feat: 同步 socket.io 上游代码

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
2023-02-09 13:49:48 +08:00
parent 359aeb9d63
commit 7b85ff5b7c
35 changed files with 7612 additions and 6519 deletions

View File

@@ -1,6 +1,7 @@
import { url } from "./url"
import { Manager, ManagerOptions } from "./manager"
import { Socket, SocketOptions } from "./socket"
// import debugModule from "debug"; // debug()
const debug = require("../debug")("socket.io-client")

View File

@@ -10,7 +10,7 @@ import * as parser from "../socket.io-parser"
// import { Decoder, Encoder, Packet } from "socket.io-parser"
import { Decoder, Encoder, Packet } from "../socket.io-parser"
import { on } from "./on.js"
import { Backoff } from "./contrib/backo2"
import { Backoff } from "./contrib/backo2.js"
import {
DefaultEventsMap,
EventsMap,
@@ -470,6 +470,10 @@ export class Manager<
this.nsps[nsp] = socket
}
if (this._autoConnect) {
socket.connect()
}
return socket
}

View File

@@ -14,11 +14,69 @@ import {
// const debug = debugModule("socket.io-client:socket") // debug()
const debug = require("../debug")("socket.io-client")
type PrependTimeoutError<T extends any[]> = {
[K in keyof T]: T[K] extends (...args: infer Params) => infer Result
? (err: Error, ...args: Params) => Result
: T[K]
}
/**
* Utility type to decorate the acknowledgement callbacks with a timeout error.
*
* This is needed because the timeout() flag breaks the symmetry between the sender and the receiver:
*
* @example
* interface Events {
* "my-event": (val: string) => void;
* }
*
* socket.on("my-event", (cb) => {
* cb("123"); // one single argument here
* });
*
* socket.timeout(1000).emit("my-event", (err, val) => {
* // two arguments there (the "err" argument is not properly typed)
* });
*
*/
export type DecorateAcknowledgements<E> = {
[K in keyof E]: E[K] extends (...args: infer Params) => infer Result
? (...args: PrependTimeoutError<Params>) => Result
: E[K]
}
export type Last<T extends any[]> = T extends [...infer H, infer L] ? L : any
export type AllButLast<T extends any[]> = T extends [...infer H, infer L]
? H
: any[]
export type FirstArg<T> = T extends (arg: infer Param) => infer Result
? Param
: any
export interface SocketOptions {
/**
* the authentication payload sent when connecting to the Namespace
*/
auth: { [key: string]: any } | ((cb: (data: object) => void) => void)
auth?: { [key: string]: any } | ((cb: (data: object) => void) => void)
/**
* The maximum number of retries. Above the limit, the packet will be discarded.
*
* Using `Infinity` means the delivery guarantee is "at-least-once" (instead of "at-most-once" by default), but a
* smaller value like 10 should be sufficient in practice.
*/
retries?: number
/**
* The default timeout in milliseconds used when waiting for an acknowledgement.
*/
ackTimeout?: number
}
type QueuedPacket = {
id: number
args: unknown[]
flags: Flags
pending: boolean
tryCount: number
}
/**
@@ -39,13 +97,14 @@ interface Flags {
compress?: boolean
volatile?: boolean
timeout?: number
fromQueue?: boolean
}
export type DisconnectDescription =
| Error
| {
description: string
context?: CloseEvent | XMLHttpRequest
context?: unknown // context should be typed as CloseEvent | XMLHttpRequest, but these types are not available on non-browser platforms
}
interface SocketReservedEvents {
@@ -88,19 +147,33 @@ export class Socket<
public readonly io: Manager<ListenEvents, EmitEvents>
/**
* A unique identifier for the session.
*
* @example
* const socket = io();
*
* console.log(socket.id); // undefined
*
* socket.on("connect", () => {
* console.log(socket.id); // "G5p5..."
* });
*/
* A unique identifier for the session.
*
* @example
* const socket = io();
*
* console.log(socket.id); // undefined
*
* socket.on("connect", () => {
* console.log(socket.id); // "G5p5..."
* });
*/
public id: string
/**
* The session ID used for connection state recovery, which must not be shared (unlike {@link id}).
*
* @private
*/
private _pid: string
/**
* The offset of the last received packet, which will be sent upon reconnection to allow for the recovery of the connection state.
*
* @private
*/
private _lastOffset: string
/**
* Whether the socket is currently connected to the server.
*
@@ -116,7 +189,11 @@ export class Socket<
* });
*/
public connected: boolean = false;
/**
* Whether the connection state was recovered after a temporary disconnection. In that case, any missed packets will
* be transmitted by the server.
*/
public recovered: boolean = false;
/**
* Credentials that are sent when accessing a namespace.
*
@@ -143,8 +220,16 @@ export class Socket<
* Buffer for packets that will be sent once the socket is connected
*/
public sendBuffer: Array<Packet> = [];
/**
* The queue of packets to be sent with retry in case of failure.
*
* Packets are sent one by one, each waiting for the server acknowledgement, in order to guarantee the delivery order.
* @private
*/
private _queue: Array<QueuedPacket> = [];
private readonly nsp: string
private readonly _opts: SocketOptions
private ids: number = 0;
private acks: object = {};
@@ -163,6 +248,7 @@ export class Socket<
if (opts && opts.auth) {
this.auth = opts.auth
}
this._opts = Object.assign({}, opts)
if (this.io._autoConnect) this.open()
}
@@ -296,6 +382,12 @@ export class Socket<
}
args.unshift(ev)
if (this._opts.retries && !this.flags.fromQueue && !this.flags.volatile) {
this._addToQueue(args)
return this
}
const packet: any = {
type: PacketType.EVENT,
data: args,
@@ -339,7 +431,7 @@ export class Socket<
* @private
*/
private _registerAckCallback(id: number, ack: Function) {
const timeout = this.flags.timeout
const timeout = this.flags.timeout ?? this._opts.ackTimeout
if (timeout === undefined) {
this.acks[id] = ack
return
@@ -365,6 +457,122 @@ export class Socket<
}
}
/**
* Emits an event and waits for an acknowledgement
*
* @example
* // without timeout
* const response = await socket.emitWithAck("hello", "world");
*
* // with a specific timeout
* try {
* const response = await socket.timeout(1000).emitWithAck("hello", "world");
* } catch (err) {
* // the server did not acknowledge the event in the given delay
* }
*
* @return a Promise that will be fulfilled when the server acknowledges the event
*/
public emitWithAck<Ev extends EventNames<EmitEvents>>(
ev: Ev,
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
// the timeout flag is optional
const withErr =
this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined
return new Promise((resolve, reject) => {
args.push((arg1, arg2) => {
if (withErr) {
return arg1 ? reject(arg1) : resolve(arg2)
} else {
return resolve(arg1)
}
})
this.emit(ev, ...(args as any[] as EventParams<EmitEvents, Ev>))
})
}
/**
* Add the packet to the queue.
* @param args
* @private
*/
private _addToQueue(args: unknown[]) {
let ack
if (typeof args[args.length - 1] === "function") {
ack = args.pop()
}
const packet = {
id: this.ids++,
tryCount: 0,
pending: false,
args,
flags: Object.assign({ fromQueue: true }, this.flags),
}
args.push((err, ...responseArgs) => {
if (packet !== this._queue[0]) {
// the packet has already been acknowledged
return
}
const hasError = err !== null
if (hasError) {
if (packet.tryCount > this._opts.retries) {
debug(
"packet [%d] is discarded after %d tries",
packet.id,
packet.tryCount
)
this._queue.shift()
if (ack) {
ack(err)
}
}
} else {
debug("packet [%d] was successfully sent", packet.id)
this._queue.shift()
if (ack) {
ack(null, ...responseArgs)
}
}
packet.pending = false
return this._drainQueue()
})
this._queue.push(packet)
this._drainQueue()
}
/**
* Send the first packet of the queue, and wait for an acknowledgement from the server.
* @private
*/
private _drainQueue() {
debug("draining queue")
if (this._queue.length === 0) {
return
}
const packet = this._queue[0]
if (packet.pending) {
debug(
"packet [%d] has already been sent and is waiting for an ack",
packet.id
)
return
}
packet.pending = true
packet.tryCount++
debug("sending packet [%d] (try n°%d)", packet.id, packet.tryCount)
const currentId = this.ids
this.ids = packet.id // the same id is reused for consecutive retries, in order to allow deduplication on the server side
this.flags = packet.flags
// @ts-ignore
this.emit.apply(this, packet.args)
this.ids = currentId // restore offset
}
/**
* Sends a packet.
*
@@ -385,13 +593,28 @@ export class Socket<
debug("transport is open - connecting")
if (typeof this.auth == "function") {
this.auth((data) => {
this.packet({ type: PacketType.CONNECT, data })
this._sendConnectPacket(data as Record<string, unknown>)
})
} else {
this.packet({ type: PacketType.CONNECT, data: this.auth })
this._sendConnectPacket(this.auth)
}
}
/**
* Sends a CONNECT packet to initiate the Socket.IO session.
*
* @param data
* @private
*/
private _sendConnectPacket(data: Record<string, unknown>) {
this.packet({
type: PacketType.CONNECT,
data: this._pid
? Object.assign({ pid: this._pid, offset: this._lastOffset }, data)
: data,
})
}
/**
* Called upon engine or manager `error`.
*
@@ -435,8 +658,7 @@ export class Socket<
switch (packet.type) {
case PacketType.CONNECT:
if (packet.data && packet.data.sid) {
const id = packet.data.sid
this.onconnect(id)
this.onconnect(packet.data.sid, packet.data.pid)
} else {
this.emitReserved(
"connect_error",
@@ -503,6 +725,9 @@ export class Socket<
}
// @ts-ignore
super.emit.apply(this, args)
if (this._pid && args.length && typeof args[args.length - 1] === "string") {
this._lastOffset = args[args.length - 1]
}
}
/**
@@ -549,9 +774,11 @@ export class Socket<
*
* @private
*/
private onconnect(id: string): void {
private onconnect(id: string, pid: string) {
debug("socket connected with id %s", id)
this.id = id
this.recovered = pid && this._pid === pid
this._pid = pid // defined only if connection state recovery is enabled
this.connected = true
this.emitBuffered()
this.emitReserved("connect")
@@ -682,7 +909,9 @@ export class Socket<
*
* @returns self
*/
public timeout(timeout: number): this {
public timeout(
timeout: number
): Socket<ListenEvents, DecorateAcknowledgements<EmitEvents>> {
this.flags.timeout = timeout
return this
}