perf: optimize websocket logic

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
2020-11-20 14:21:57 +08:00
parent cd57944cb8
commit 894d5d43e6
12 changed files with 273 additions and 228 deletions

View File

@@ -5,11 +5,12 @@ import { Namespace, Server, Socket } from './index'
import { PacketTypes, SubPacketTypes } from './types'
import { ServerEvent } from './constants'
import { SocketId } from './adapter'
import { Transport } from '../transport'
const parser = new Parser()
export class Client extends EventEmitter implements Client {
public readonly conn
export class Client extends EventEmitter {
public readonly conn: Transport
/**
* @private
*/
@@ -21,6 +22,11 @@ export class Client extends EventEmitter implements Client {
private nsps: Map<string, Socket>
private connectTimeout: NodeJS.Timeout
private checkIntervalTimer: NodeJS.Timeout
private upgradeTimeoutTimer: NodeJS.Timeout
private pingTimeoutTimer: NodeJS.Timeout
private pingIntervalTimer: NodeJS.Timeout
constructor(server: Server, conn) {
super()
this.server = server
@@ -32,6 +38,9 @@ export class Client extends EventEmitter implements Client {
// =============================
this.sockets = new Map()
this.nsps = new Map()
// ================== engine.io
this.onOpen()
// ================== Transport
this.conn.on(ServerEvent.disconnect, (reason) => {
this.onclose(reason)
})
@@ -73,7 +82,7 @@ export class Client extends EventEmitter implements Client {
* @private
*/
private connect(name: string, auth: object = {}) {
console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps[name]}`)
console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps.has(name)}`)
if (this.server._nsps.has(name)) {
return this.doConnect(name, auth)
}
@@ -103,9 +112,7 @@ export class Client extends EventEmitter implements Client {
const nsp = this.server.of(name)
const socket = nsp._add(this, auth, () => {
console.debug(`doConnect set sockets ${socket.id}`)
this.sockets.set(socket.id, socket)
console.debug(`doConnect set nsps ${nsp.name}`)
this.nsps.set(nsp.name, socket)
})
}
@@ -128,12 +135,16 @@ export class Client extends EventEmitter implements Client {
*/
_remove(socket: Socket) {
if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id).nsp.name
this.sockets.delete(socket.id)
this.nsps.delete(nsp)
this.nsps.delete(socket.nsp.name)
} else {
console.debug(`ignoring remove for ${socket.id}`,)
}
process.nextTick(() => {
if (this.sockets.size == 0) {
this.onclose('no live socket')
}
})
}
/**
* Closes the underlying connection.
@@ -141,6 +152,7 @@ export class Client extends EventEmitter implements Client {
* @private
*/
private close() {
console.debug(`client ${this.id} close`)
if ("open" == this.conn.readyState) {
console.debug("forcing transport close")
this.onclose("forced server close")
@@ -154,7 +166,7 @@ export class Client extends EventEmitter implements Client {
* @param {Object} opts
* @private
*/
_packet(packet, opts = { preEncoded: false }) {
_packet(packet: Packet, opts = { preEncoded: false }) {
// opts = opts || {}
// const self = this
@@ -178,7 +190,11 @@ export class Client extends EventEmitter implements Client {
// } else {
// debug("ignoring packet write %j", packet)
// }
this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet))
if ("open" == this.conn.readyState) {
this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet))
} else {
console.debug(`ignoring write packet ${JSON.stringify(packet)} to client ${this.id} is already close!`)
}
}
/**
* Called with incoming transport data.
@@ -202,15 +218,14 @@ export class Client extends EventEmitter implements Client {
if (SubPacketTypes.CONNECT == packet.sub_type) {
this.connect(packet.nsp, packet.data)
} else {
const socket = this.nsps.get(packet.nsp)
if (socket) {
process.nextTick(function () {
process.nextTick(() => {
const socket = this.nsps.get(packet.nsp)
if (socket) {
socket._onpacket(packet)
})
} else {
console.debug(`client ${this.id} no socket for namespace ${packet.nsp} avalibe socket: `)
this.nsps.forEach((v, k) => console.debug(`- ${k} => ${v}`))
}
} else {
console.debug(`client ${this.id} no socket for namespace ${packet.nsp}.`)
}
})
}
}
/**
@@ -226,9 +241,16 @@ export class Client extends EventEmitter implements Client {
this.conn.close()
}
onclose(reason?: string) {
this.conn.readyState = "closing"
// ======= engine.io
this.onClose(reason)
// cleanup connectTimeout
if (this.connectTimeout) {
clearTimeout(this.connectTimeout)
this.connectTimeout = null
}
console.debug(`client ${this.id} close with reason ${reason}`)
// ignore a potential subsequent `close` event
this.destroy()
// `nsps` and `sockets` are cleaned up seamlessly
for (const socket of this.sockets.values()) {
socket._onclose(reason)
@@ -242,5 +264,97 @@ export class Client extends EventEmitter implements Client {
// this.conn.removeListener('error', this.onerror);
// this.conn.removeListener('close', this.onclose);
// this.decoder.removeListener('decoded', this.ondecoded);
};
}
//================== engine.io
onOpen() {
this.conn.readyState = "open"
this._packet({
type: PacketTypes.OPEN,
data: {
sid: this.id,
upgrades: [],
pingInterval: this.server.options.pingInterval,
pingTimeout: this.server.options.pingTimeout
}
})
this.schedulePing()
}
onPacket(packet: Packet) {
if ("open" === this.conn.readyState) {
// export packet event
// debug("packet")
// this.emit("packet", packet)
// Reset ping timeout on any packet, incoming data is a good sign of
// other side's liveness
this.resetPingTimeout(this.server.options.pingInterval + this.server.options.pingTimeout * 2)
switch (packet.type) {
case PacketTypes.PING:
this._packet({
type: PacketTypes.PONG,
data: packet.data
})
break
case PacketTypes.PONG:
this.schedulePing()
break
case PacketTypes.UPGRADE:
break
case PacketTypes.MESSAGE:
this.ondecoded(packet)
break
case PacketTypes.CLOSE:
this.onclose()
break
default:
console.log(`client ${this.id} reciver unknow packet type: ${packet.type}`)
}
} else {
console.debug(`packet received with closed client ${this.id}`)
}
}
/**
* Called upon transport considered closed.
* Possible reasons: `ping timeout`, `client error`, `parse error`,
* `transport error`, `server close`, `transport close`
*/
onClose(reason, description?: string) {
// if ("closed" !== this.conn.readyState) {
clearTimeout(this.pingIntervalTimer)
clearTimeout(this.pingTimeoutTimer)
clearInterval(this.checkIntervalTimer)
this.checkIntervalTimer = null
clearTimeout(this.upgradeTimeoutTimer)
// this.emit("close", reason, description)
// }
}
/**
* Pings client every `this.pingInterval` and expects response
* within `this.pingTimeout` or closes connection.
*
* @api private
*/
schedulePing() {
clearTimeout(this.pingIntervalTimer)
this.pingIntervalTimer = setTimeout(() => {
this.resetPingTimeout(this.server.options.pingTimeout)
process.nextTick(() => this._packet({ type: PacketTypes.PING }))
}, this.server.options.pingInterval)
}
/**
* Resets ping timeout.
*
* @api private
*/
resetPingTimeout(timeout) {
clearTimeout(this.pingTimeoutTimer)
this.pingTimeoutTimer = setTimeout(() => {
if (this.conn.readyState === "closed") return
this.onclose("ping timeout")
}, timeout)
}
}