From 894d5d43e6ff798a896a9e8738ae24e5090cd3bf Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Fri, 20 Nov 2020 14:21:57 +0800 Subject: [PATCH] perf: optimize websocket logic Signed-off-by: MiaoWoo --- packages/websocket/src/interfaces.ts | 5 - .../websocket/src/netty/adapter/websocket.ts | 8 +- packages/websocket/src/netty/client.ts | 37 +---- .../websocket/src/netty/websocket_detect.ts | 10 -- .../websocket/src/netty/websocket_handler.ts | 8 +- packages/websocket/src/socket-io/client.ts | 152 +++++++++++++++--- packages/websocket/src/socket-io/constants.ts | 5 +- packages/websocket/src/socket-io/index.ts | 135 +++++++--------- packages/websocket/src/socket-io/namespace.ts | 24 +-- packages/websocket/src/socket-io/socket.ts | 43 ++--- packages/websocket/src/tomcat/client.ts | 39 +---- packages/websocket/src/transport.ts | 35 ++++ 12 files changed, 273 insertions(+), 228 deletions(-) delete mode 100644 packages/websocket/src/interfaces.ts create mode 100644 packages/websocket/src/transport.ts diff --git a/packages/websocket/src/interfaces.ts b/packages/websocket/src/interfaces.ts deleted file mode 100644 index f583f6fc..00000000 --- a/packages/websocket/src/interfaces.ts +++ /dev/null @@ -1,5 +0,0 @@ -export interface InnerClient { - id: string - send(text: string) - close() -} diff --git a/packages/websocket/src/netty/adapter/websocket.ts b/packages/websocket/src/netty/adapter/websocket.ts index f2dd285e..6873dd7d 100644 --- a/packages/websocket/src/netty/adapter/websocket.ts +++ b/packages/websocket/src/netty/adapter/websocket.ts @@ -12,8 +12,12 @@ export abstract class WebSocketHandlerAdapter { this._Handler = new ChannelInboundHandlerAdapterImpl() } abstract channelRead(ctx: any, channel: any) - abstract channelInactive(ctx: any) - abstract channelUnregistered(ctx: any) + channelInactive(ctx: any) { + ctx.fireChannelInactive() + } + channelUnregistered(ctx: any) { + ctx.fireChannelUnregistered() + } abstract exceptionCaught(ctx: any, cause: Error) getHandler() { return this._Handler diff --git a/packages/websocket/src/netty/client.ts b/packages/websocket/src/netty/client.ts index 4882c955..616adb9e 100644 --- a/packages/websocket/src/netty/client.ts +++ b/packages/websocket/src/netty/client.ts @@ -1,45 +1,24 @@ -import { EventEmitter } from 'events' -import { InnerClient } from '../interfaces' +import { Transport } from '../transport' import { AttributeKeys } from './constants' const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') -export class NettyClient extends EventEmitter implements InnerClient { - private _id: string +export class NettyClient extends Transport { private channel: any - server: any - readyState: string - remoteAddress: string - upgraded: boolean - request: any - constructor(server: any, channel: any) { - super() - this.server = server - this.readyState = 'open' + super(server) this.remoteAddress = channel.remoteAddress() + '' - this.upgraded = true this.request = channel.attr(AttributeKeys.Request).get() - this.channel = channel this._id = channel.id() + '' + this.channel = channel } - get id() { - return this._id + doSend(text: string) { + this.channel.writeAndFlush(new TextWebSocketFrame(text)) } - send(text: string) { - if (this.readyState == 'open') { - this.channel.writeAndFlush(new TextWebSocketFrame(text)) - } else { - console.debug(`send message ${text} to close client ${this._id}`) - } - } - close() { - if (this.readyState = 'open') { - this.channel.close() - this.readyState = 'close' - } + doClose() { + this.channel.close() } } diff --git a/packages/websocket/src/netty/websocket_detect.ts b/packages/websocket/src/netty/websocket_detect.ts index a5a510ec..6b29df01 100644 --- a/packages/websocket/src/netty/websocket_detect.ts +++ b/packages/websocket/src/netty/websocket_detect.ts @@ -11,16 +11,6 @@ export class WebSocketDetect extends WebSocketHandlerAdapter { channelRead(ctx: any, channel: any) { this.event.emit(ServerEvent.detect, ctx, channel) } - channelInactive(ctx: any) { - console.debug('WebSocketDetect channelUnregistered ' + ctx) - this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') - ctx.channelInactive() - } - channelUnregistered(ctx: any) { - console.debug('WebSocketDetect channelUnregistered ' + ctx) - this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') - ctx.fireChannelUnregistered() - } exceptionCaught(ctx: any, cause: Error) { this.event.emit(ServerEvent.error, ctx, cause) } diff --git a/packages/websocket/src/netty/websocket_handler.ts b/packages/websocket/src/netty/websocket_handler.ts index ba30bb43..812cc4e0 100644 --- a/packages/websocket/src/netty/websocket_handler.ts +++ b/packages/websocket/src/netty/websocket_handler.ts @@ -43,14 +43,14 @@ export class WebSocketHandler extends WebSocketHandlerAdapter { channelInactive(ctx: any) { console.debug('WebSocketHandler channelInactive ' + ctx) - this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') - ctx.channelInactive() + this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelInactive') + super.channelInactive(ctx) } channelUnregistered(ctx: any) { console.debug('WebSocketHandler channelUnregistered ' + ctx) - this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') - ctx.fireChannelUnregistered() + this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelUnregistered') + super.channelUnregistered(ctx) } exceptionCaught(ctx: any, cause: Error) { diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index 6b529af9..9ed2fcb4 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -5,11 +5,12 @@ import { Namespace, Server, Socket } from './index' import { PacketTypes, SubPacketTypes } from './types' import { ServerEvent } from './constants' import { SocketId } from './adapter' +import { Transport } from '../transport' const parser = new Parser() -export class Client extends EventEmitter implements Client { - public readonly conn +export class Client extends EventEmitter { + public readonly conn: Transport /** * @private */ @@ -21,6 +22,11 @@ export class Client extends EventEmitter implements Client { private nsps: Map private connectTimeout: NodeJS.Timeout + private checkIntervalTimer: NodeJS.Timeout + private upgradeTimeoutTimer: NodeJS.Timeout + private pingTimeoutTimer: NodeJS.Timeout + private pingIntervalTimer: NodeJS.Timeout + constructor(server: Server, conn) { super() this.server = server @@ -32,6 +38,9 @@ export class Client extends EventEmitter implements Client { // ============================= this.sockets = new Map() this.nsps = new Map() + // ================== engine.io + this.onOpen() + // ================== Transport this.conn.on(ServerEvent.disconnect, (reason) => { this.onclose(reason) }) @@ -73,7 +82,7 @@ export class Client extends EventEmitter implements Client { * @private */ private connect(name: string, auth: object = {}) { - console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps[name]}`) + console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps.has(name)}`) if (this.server._nsps.has(name)) { return this.doConnect(name, auth) } @@ -103,9 +112,7 @@ export class Client extends EventEmitter implements Client { const nsp = this.server.of(name) const socket = nsp._add(this, auth, () => { - console.debug(`doConnect set sockets ${socket.id}`) this.sockets.set(socket.id, socket) - console.debug(`doConnect set nsps ${nsp.name}`) this.nsps.set(nsp.name, socket) }) } @@ -128,12 +135,16 @@ export class Client extends EventEmitter implements Client { */ _remove(socket: Socket) { if (this.sockets.has(socket.id)) { - const nsp = this.sockets.get(socket.id).nsp.name this.sockets.delete(socket.id) - this.nsps.delete(nsp) + this.nsps.delete(socket.nsp.name) } else { console.debug(`ignoring remove for ${socket.id}`,) } + process.nextTick(() => { + if (this.sockets.size == 0) { + this.onclose('no live socket') + } + }) } /** * Closes the underlying connection. @@ -141,6 +152,7 @@ export class Client extends EventEmitter implements Client { * @private */ private close() { + console.debug(`client ${this.id} close`) if ("open" == this.conn.readyState) { console.debug("forcing transport close") this.onclose("forced server close") @@ -154,7 +166,7 @@ export class Client extends EventEmitter implements Client { * @param {Object} opts * @private */ - _packet(packet, opts = { preEncoded: false }) { + _packet(packet: Packet, opts = { preEncoded: false }) { // opts = opts || {} // const self = this @@ -178,7 +190,11 @@ export class Client extends EventEmitter implements Client { // } else { // debug("ignoring packet write %j", packet) // } - this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet)) + if ("open" == this.conn.readyState) { + this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet)) + } else { + console.debug(`ignoring write packet ${JSON.stringify(packet)} to client ${this.id} is already close!`) + } } /** * Called with incoming transport data. @@ -202,15 +218,14 @@ export class Client extends EventEmitter implements Client { if (SubPacketTypes.CONNECT == packet.sub_type) { this.connect(packet.nsp, packet.data) } else { - const socket = this.nsps.get(packet.nsp) - if (socket) { - process.nextTick(function () { + process.nextTick(() => { + const socket = this.nsps.get(packet.nsp) + if (socket) { socket._onpacket(packet) - }) - } else { - console.debug(`client ${this.id} no socket for namespace ${packet.nsp} avalibe socket: `) - this.nsps.forEach((v, k) => console.debug(`- ${k} => ${v}`)) - } + } else { + console.debug(`client ${this.id} no socket for namespace ${packet.nsp}.`) + } + }) } } /** @@ -226,9 +241,16 @@ export class Client extends EventEmitter implements Client { this.conn.close() } onclose(reason?: string) { + this.conn.readyState = "closing" + // ======= engine.io + this.onClose(reason) + // cleanup connectTimeout + if (this.connectTimeout) { + clearTimeout(this.connectTimeout) + this.connectTimeout = null + } console.debug(`client ${this.id} close with reason ${reason}`) // ignore a potential subsequent `close` event - this.destroy() // `nsps` and `sockets` are cleaned up seamlessly for (const socket of this.sockets.values()) { socket._onclose(reason) @@ -242,5 +264,97 @@ export class Client extends EventEmitter implements Client { // this.conn.removeListener('error', this.onerror); // this.conn.removeListener('close', this.onclose); // this.decoder.removeListener('decoded', this.ondecoded); - }; + } + + //================== engine.io + onOpen() { + this.conn.readyState = "open" + this._packet({ + type: PacketTypes.OPEN, + data: { + sid: this.id, + upgrades: [], + pingInterval: this.server.options.pingInterval, + pingTimeout: this.server.options.pingTimeout + } + }) + this.schedulePing() + } + + onPacket(packet: Packet) { + if ("open" === this.conn.readyState) { + // export packet event + // debug("packet") + // this.emit("packet", packet) + + // Reset ping timeout on any packet, incoming data is a good sign of + // other side's liveness + this.resetPingTimeout(this.server.options.pingInterval + this.server.options.pingTimeout * 2) + switch (packet.type) { + case PacketTypes.PING: + this._packet({ + type: PacketTypes.PONG, + data: packet.data + }) + break + case PacketTypes.PONG: + this.schedulePing() + break + case PacketTypes.UPGRADE: + break + case PacketTypes.MESSAGE: + this.ondecoded(packet) + break + case PacketTypes.CLOSE: + this.onclose() + break + default: + console.log(`client ${this.id} reciver unknow packet type: ${packet.type}`) + } + } else { + console.debug(`packet received with closed client ${this.id}`) + } + } + /** + * Called upon transport considered closed. + * Possible reasons: `ping timeout`, `client error`, `parse error`, + * `transport error`, `server close`, `transport close` + */ + onClose(reason, description?: string) { + // if ("closed" !== this.conn.readyState) { + clearTimeout(this.pingIntervalTimer) + clearTimeout(this.pingTimeoutTimer) + + clearInterval(this.checkIntervalTimer) + this.checkIntervalTimer = null + clearTimeout(this.upgradeTimeoutTimer) + // this.emit("close", reason, description) + // } + } + /** + * Pings client every `this.pingInterval` and expects response + * within `this.pingTimeout` or closes connection. + * + * @api private + */ + schedulePing() { + clearTimeout(this.pingIntervalTimer) + this.pingIntervalTimer = setTimeout(() => { + this.resetPingTimeout(this.server.options.pingTimeout) + process.nextTick(() => this._packet({ type: PacketTypes.PING })) + }, this.server.options.pingInterval) + } + + /** + * Resets ping timeout. + * + * @api private + */ + resetPingTimeout(timeout) { + clearTimeout(this.pingTimeoutTimer) + this.pingTimeoutTimer = setTimeout(() => { + if (this.conn.readyState === "closed") return + this.onclose("ping timeout") + }, timeout) + } } diff --git a/packages/websocket/src/socket-io/constants.ts b/packages/websocket/src/socket-io/constants.ts index 03a43fd3..ba29e36c 100644 --- a/packages/websocket/src/socket-io/constants.ts +++ b/packages/websocket/src/socket-io/constants.ts @@ -4,5 +4,6 @@ export enum ServerEvent { connection = 'connection', message = 'message', error = 'error', - disconnect = 'disconnect' -} \ No newline at end of file + disconnecting = 'disconnecting', + disconnect = 'disconnect', +} diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index d6313e25..06286ab1 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -8,7 +8,7 @@ import { PacketTypes, SubPacketTypes } from './types' import { Packet } from './packet' import { Socket } from './socket' import { Adapter } from './adapter' -import { InnerClient } from '../interfaces' +import { Transport } from '../transport' import { ParentNamespace } from './parent-namespace' interface EngineOptions { @@ -144,7 +144,7 @@ class Server { /** * @private */ - readonly _parser: Parser + _parser: Parser private readonly encoder /** @@ -172,29 +172,33 @@ class Server { */ _connectTimeout: number - options: ServerOptions + options: Partial private websocketServer: WebSocketServer private allClients: Map constructor(instance: any, options: Partial) { if (!instance) { throw new Error('instance can\'t be undefiend!') } - this.allClients = new Map() - this._nsps = new Map() - this.connectTimeout(options.connectTimeout || 45000) - this._parser = options.parser || new Parser() - this.adapter(options.adapter || Adapter) + this.options = Object.assign({ + event: new EventEmitter(), + path: '/socket.io', + root: root + '/wwwroot', + serveClient: false, + connectTimeout: 45000, + wsEngine: process.env.EIO_WS_ENGINE || "ws", + pingTimeout: 5000, + pingInterval: 25000, + upgradeTimeout: 10000, + maxHttpBufferSize: 1e6, + transports: 'websocket', + allowUpgrades: true, + httpCompression: { + threshold: 1024 + }, + cors: false + }, options) + this.initServerConfig() this.sockets = this.of('/') - if (instance.class.name.startsWith('io.netty.channel')) { - let { NettyWebSocketServer } = require("../netty") - this.websocketServer = new NettyWebSocketServer(instance, Object.assign({ - event: new EventEmitter(), - path: '/socket.io', - root: root + '/wwwroot' - }, options)) - } else { - let { TomcatWebSocketServer } = require("../tomcat") - this.websocketServer = new TomcatWebSocketServer(instance, options) - } + this.selectServerImpl(instance) this.initServer() } /** @@ -496,15 +500,6 @@ class Server { console.debug(`incoming connection with id ${conn.id}`) let client = new Client(this, conn) this.allClients.set(conn.id, client) - client._packet({ - type: PacketTypes.OPEN, - data: { - sid: client.id, - upgrades: [], - pingInterval: 25000, - pingTimeout: 5000 - } - }) return this } // of(nsp: string): Namespace { @@ -560,8 +555,9 @@ class Server { return nsp } close(fn?: () => void): void { - for (const socket of this.sockets.sockets.values()) { - socket._onclose("server shutting down") + this.clients.length + for (const client of this.allClients.values()) { + client._disconnect() } // this.engine.close() @@ -605,70 +601,57 @@ class Server { return this.sockets.compress(args[0]) } // =============================== + private initServerConfig() { + this.allClients = new Map() + this._nsps = new Map() + this.connectTimeout(this.options.connectTimeout || 45000) + this._parser = this.options.parser || new Parser() + this.adapter(this.options.adapter || Adapter) + } + private selectServerImpl(instance: any) { + let WebSocketServerImpl = undefined + if (instance.class.name.startsWith('io.netty.channel')) { + WebSocketServerImpl = require("../netty").NettyWebSocketServer + } else { + WebSocketServerImpl = require("../tomcat").TomcatWebSocketServer + } + this.websocketServer = new WebSocketServerImpl(instance, this.options) + } private initServer() { - this.websocketServer.on(ServerEvent.connect, (innerClient: InnerClient) => { - this.onconnection(innerClient) + this.websocketServer.on(ServerEvent.connect, (transport: Transport) => { + this.onconnection(transport) }) - this.websocketServer.on(ServerEvent.message, (innerClient: InnerClient, text) => { - if (this.allClients.has(innerClient.id)) { - this.processPacket(this._parser.decode(text), this.allClients.get(innerClient.id)) + this.websocketServer.on(ServerEvent.message, (transport: Transport, text) => { + if (this.allClients.has(transport.id)) { + let client = this.allClients.get(transport.id) + client.onPacket(this._parser.decode(text)) } else { - console.error(`unknow engine socket ${innerClient.id} reciver message ${text}`) + console.error(`unknow transport ${transport.id} reciver message ${text}`) } }) - this.websocketServer.on(ServerEvent.disconnect, (innerClient: InnerClient, reason) => { - if (this.allClients.has(innerClient.id)) { - this.allClients.get(innerClient.id).onclose(reason) - this.allClients.delete(innerClient.id) + this.websocketServer.on(ServerEvent.disconnect, (transport: Transport, reason) => { + if (this.allClients.has(transport.id)) { + this.allClients.get(transport.id).onclose(reason) + this.allClients.delete(transport.id) } else { - console.error(`unknow engine innerClient ${innerClient?.id} disconnect cause ${reason}`) + console.error(`unknow transport ${transport?.id} disconnect cause ${reason}`) } }) - this.websocketServer.on(ServerEvent.error, (innerClient: InnerClient, cause) => { - if (this.allClients.has(innerClient?.id)) { - let client = this.allClients.get(innerClient?.id) + this.websocketServer.on(ServerEvent.error, (transport: Transport, cause) => { + if (this.allClients.has(transport?.id)) { + let client = this.allClients.get(transport?.id) if (client.listeners(ServerEvent.error).length) { client.emit(ServerEvent.error, cause) } else { - console.error(`engine innerClient ${innerClient.id} cause error: ${cause}`) + console.error(`client ${client.id} cause error: ${cause}`) console.ex(cause) } } else { - console.error(`unknow innerClient ${innerClient?.id} cause error: ${cause}`) + console.error(`unknow transport ${transport?.id} cause error: ${cause}`) console.ex(cause) } }) } - - private processPacket(packet: Packet, client: Client) { - switch (packet.type) { - case PacketTypes.PING: - client._packet({ - type: PacketTypes.PONG, - data: packet.data - }) - break - case PacketTypes.UPGRADE: - break - case PacketTypes.MESSAGE: - this.processSubPacket(packet, client) - break - case PacketTypes.CLOSE: - client.onclose() - break - } - } - - private processSubPacket(packet: Packet, client: Client) { - switch (packet.sub_type) { - case SubPacketTypes.CONNECT: - client.doConnect(packet.nsp, {}) - break - default: - client.ondecoded(packet) - break - } - } } /** diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 95caca35..8fa4c8c9 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -58,7 +58,6 @@ export class Namespace extends EventEmitter { public use( fn: (socket: Socket, next: (err?: ExtendedError) => void) => void ): Namespace { - throw new Error("Method not implemented.") this._fns.push(fn) return this } @@ -96,8 +95,8 @@ export class Namespace extends EventEmitter { return this.to(name) } _add(client: Client, query?: any, fn?: () => void) { - console.debug(`adding socket to nsp ${this.name}`) const socket = new Socket(this, client, query || {}) + console.debug(`client ${client.id} adding socket ${socket.id} to nsp ${this.name}`) this.run(socket, err => { process.nextTick(() => { if ("open" == client.conn.readyState) { @@ -137,7 +136,7 @@ export class Namespace extends EventEmitter { console.debug(`namespace ${this.name} remove socket ${socket.id}`) this.sockets.delete(socket.id) } else { - console.debug(`ignoring remove for ${socket.id}`) + console.debug(`namespace ${this.name} ignoring remove for ${socket.id}`) } } emit(event: string, ...args: any[]): boolean { @@ -185,9 +184,7 @@ export class Namespace extends EventEmitter { */ public allSockets(): Promise> { if (!this.adapter) { - throw new Error( - "No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?" - ) + throw new Error("No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?") } const rooms = new Set(this._rooms) this._rooms.clear() @@ -233,24 +230,9 @@ export class Namespace extends EventEmitter { hasBin(args: any[]) { return false } - del(client: Client) { - let socket = this.sockets[client.id] - socket.disconnect() - delete this.sockets[client.id] - } clients(fn: (sockets: Socket[]) => Namespace): Namespace { return fn(Object.values(this.sockets)) } - process(packet: Packet, client: Client) { - switch (packet.sub_type) { - case SubPacketTypes.CONNECT: - client.doConnect(this.name, {}) - break - default: - this.sockets.get(client.id)._onpacket(packet) - break - } - } close() { this.removeAllListeners(ServerEvent.connect) this.removeAllListeners(ServerEvent.connection) diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 540a8bee..dd085c93 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -88,15 +88,6 @@ export class Socket extends EventEmitter { private _rooms: Set = new Set(); private _anyListeners: Array<(...args: any[]) => void> - private events = [ - 'connect', - "connect_error", - 'disconnect', - 'disconnecting', - 'newListener', - 'removeListener' - ] - constructor(nsp: Namespace, client: Client, auth = {}) { super() this.nsp = nsp @@ -114,12 +105,6 @@ export class Socket extends EventEmitter { this._rooms = new Set() } emit(event: string, ...args: any[]): boolean { - if (~this.events.indexOf(event)) { - super.emit(event, ...args) - // @ts-ignore - return this - } - let packet: Packet = { type: PacketTypes.MESSAGE, sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, @@ -213,9 +198,9 @@ export class Socket extends EventEmitter { * @private */ _onconnect(): void { - console.debug("socket connected - writing packet") + console.debug(`socket ${this.id} connected - writing packet`) this.join(this.id) - this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT }) + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT, data: { sid: this.id } }) } _onpacket(packet: Packet) { switch (packet.sub_type) { @@ -246,7 +231,7 @@ export class Socket extends EventEmitter { } onevent(packet: Packet) { if (null != packet.id) { - console.debug('attaching ack callback to event') + console.trace(`attaching ack ${packet.id} callback to client ${this.id} event`) this.dispatch(packet, this.ack(packet.id)) } else { this.dispatch(packet) @@ -266,13 +251,13 @@ export class Socket extends EventEmitter { } } onack(packet: Packet) { - let ack = this.acks[packet.id] + let ack = this.acks.get(packet.id) if ('function' == typeof ack) { - console.debug('calling ack %s with %j', packet.id, packet.data) + console.trace(`calling ack ${packet.id} on socket ${this.id} with ${packet.data}`) ack.apply(this, packet.data) - delete this.acks[packet.id] + this.acks.delete(packet.id) } else { - console.debug('bad ack %s', packet.id) + console.trace(`bad ack ${packet.id} on socket ${this.id}`) } } /** @@ -281,7 +266,7 @@ export class Socket extends EventEmitter { * @private */ private ondisconnect(): void { - console.debug("got disconnect packet") + console.debug(`socket ${this.id} got disconnect packet`) this._onclose("client namespace disconnect") } @@ -294,7 +279,7 @@ export class Socket extends EventEmitter { if (this.listeners("error").length) { super.emit("error", err) } else { - console.error("Missing error handler on `socket`.") + console.error(`Missing error handler on 'socket(${this.id})'.`) console.error(err.stack) } } @@ -308,15 +293,15 @@ export class Socket extends EventEmitter { * @private */ _onclose(reason: string) { - console.debug(`closing socket - reason: ${reason} connected: ${this.connected}`) if (!this.connected) return this - this.emit('disconnecting', reason) + console.debug(`closing socket ${this.id} - reason: ${reason} connected: ${this.connected}`) + super.emit(ServerEvent.disconnecting, reason) this.leaveAll() this.nsp._remove(this) this.client._remove(this) this.connected = false this.disconnected = true - this.emit('disconnect', reason) + super.emit(ServerEvent.disconnect, reason) } /** @@ -496,8 +481,8 @@ export class Socket extends EventEmitter { this._onerror(error) } } - dispatch(packet: Packet, ack?: Function) { - if (ack) { this.acks[packet.id] = ack } + dispatch(packet: Packet, ack?: () => void) { + if (ack) { this.acks.set(packet.id, ack) } super.emit(packet.name, ...packet.data, ack) } private hasBin(obj: any) { diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/tomcat/client.ts index 3bbfae14..cf19e8f9 100644 --- a/packages/websocket/src/tomcat/client.ts +++ b/packages/websocket/src/tomcat/client.ts @@ -1,45 +1,22 @@ -import { EventEmitter } from 'events' -import { InnerClient } from '../interfaces' +import { Transport } from '../transport' -export class TomcatClient extends EventEmitter implements InnerClient { - private _id: string +export class TomcatClient extends Transport { private session: javax.websocket.Session - server: any - readyState: string - remoteAddress: string - upgraded: boolean - request: any - constructor(server: any, session: javax.websocket.Session) { - super() - this.server = server - this.readyState = 'open' + super(server) this.remoteAddress = session + '' - this.upgraded = true this.request = { uri: () => `${session.getRequestURI()}`, headers: () => [] } - - this.session = session this._id = session.getId() + '' + this.session = session } - - get id() { - return this._id + doSend(text: string) { + Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)() } - send(text: string) { - if (this.readyState == 'open') { - Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)() - } else { - console.debug(`send message ${text} to close client ${this._id}`) - } - } - close() { - if (this.readyState == 'open') { - this.readyState = 'close' - this.session.close() - } + doClose() { + this.session.close() } } diff --git a/packages/websocket/src/transport.ts b/packages/websocket/src/transport.ts new file mode 100644 index 00000000..66d96cb1 --- /dev/null +++ b/packages/websocket/src/transport.ts @@ -0,0 +1,35 @@ +import { EventEmitter } from 'events' + +export abstract class Transport extends EventEmitter { + protected _id: string + + server: any + readyState: 'opening' | 'open' | 'closing' | 'closed' + remoteAddress: string + upgraded: boolean + request: any + + constructor(server: any) { + super() + this.server = server + this.readyState = 'open' + this.upgraded = true + } + get id() { + return this._id + } + send(text: string) { + if (this.readyState == 'open') { + this.doSend(text) + } else { + console.debug(`send message ${text} to close client ${this._id}`) + } + } + close() { + if ("closed" === this.readyState || "closing" === this.readyState) { return } + this.doClose() + this.readyState = 'closed' + } + abstract doSend(text: string) + abstract doClose() +}