From 9b69b1221b413d93b7ffc57d3db46d95cb105787 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Tue, 24 Mar 2020 14:52:50 +0800 Subject: [PATCH] fix: ack callback call twice Signed-off-by: MiaoWoo --- packages/websocket/src/socket-io/client.ts | 6 ++--- packages/websocket/src/socket-io/index.ts | 8 +++--- packages/websocket/src/socket-io/namespace.ts | 5 +--- packages/websocket/src/socket-io/parser.ts | 16 +++++------- packages/websocket/src/socket-io/socket.ts | 26 ++++++++++++++----- 5 files changed, 33 insertions(+), 28 deletions(-) diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index b1e00e7e..8f67d6bc 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -65,7 +65,7 @@ export class Client extends EventEmitter implements SocketIO.Client { packet(packet: Packet, opts?: any) { this.conn.send(parser.encode(packet)) } - onclose(reason: string) { + onclose(reason?: string) { // debug('client close with reason %s', reason); // ignore a potential subsequent `close` event this.destroy(); @@ -77,8 +77,8 @@ export class Client extends EventEmitter implements SocketIO.Client { } this.sockets = {}; // this.decoder.destroy(); // clean up decoder - }; - close() { + } + disconnect() { // if ('open' == this.conn.readyState) { // debug('forcing transport close'); this.conn.close(); diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index 564862c2..e6e42c7e 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -13,7 +13,6 @@ import { Socket } from './socket'; import { Adapter } from './adapter'; class Server implements SocketIO.Server { - private event: EventEmitter; private nettyServer: NettyWebSocketServer; private allClients: { [key: string]: Client }; @@ -29,7 +28,6 @@ class Server implements SocketIO.Server { constructor(pipeline: any, options: SocketIO.ServerOptions) { if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') } - this.event = new EventEmitter(); this.allClients = {}; this.nsps = {}; this.sockets = new Namespace('/', this); @@ -173,6 +171,8 @@ class Server implements SocketIO.Server { this.processSubPacket(packet, client); break; case PacketTypes.CLOSE: + client.onclose() + break; } } @@ -182,9 +182,9 @@ class Server implements SocketIO.Server { client.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, - data: 'not support dynamic namespace' + data: 'not support dynamic namespace: ' + packet.nsp }); - client.close(); + client.disconnect(); return; } namespace.process(packet, client); diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index de93170e..7c8b612a 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -114,10 +114,7 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { case SubPacketTypes.CONNECT: this.add(client); break; - case SubPacketTypes.DISCONNECT: - this.del(client); - break; - case SubPacketTypes.EVENT: + default: this.sockets[client.id].onpacket(packet); break; } diff --git a/packages/websocket/src/socket-io/parser.ts b/packages/websocket/src/socket-io/parser.ts index 870fe7e0..871ad489 100644 --- a/packages/websocket/src/socket-io/parser.ts +++ b/packages/websocket/src/socket-io/parser.ts @@ -28,7 +28,7 @@ export class Parser { } if (packet.sub_type == SubPacketTypes.EVENT) { if (packet.name == undefined) { throw new Error(`SubPacketTypes.EVENT name can't be empty!`) } - packet.data = [packet.name, packet.data] + packet.data = [packet.name, ...packet.data] } // json data if (null != packet.data) { @@ -85,7 +85,7 @@ export class Parser { if (i == str.length) break; } if (buf != `${Number(buf)}` || str.charAt(i) !== '-') { - throw new Error('Illegal attachments'); + return this.error('Illegal attachments'); } p.attachments = Number(buf); } @@ -105,11 +105,11 @@ export class Parser { // look up id let next = str.charAt(i + 1); - if ('' !== next && Number.isNaN(Number(next))) { + if ('' !== next && !isNaN(Number(next))) { let id = '' while (++i) { let c = str.charAt(i); - if (null == c || Number.isNaN(Number(c))) { + if (null == c || isNaN(Number(c))) { --i; break; } @@ -119,10 +119,6 @@ export class Parser { p.id = Number(id); } - // look up packet name - if (p.sub_type == SubPacketTypes.EVENT) { - - } // ignore binary packet if (p.sub_type == SubPacketTypes.BINARY_EVENT) { return this.error('not support binary parse...') @@ -134,9 +130,9 @@ export class Parser { let isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)); if (isPayloadValid) { p.name = payload[0]; - p.data = payload[1]; + p.data = payload.slice(1); } else { - return this.error('invalid payload'); + return this.error('invalid payload ' + str.substr(i)); } } diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 883e85ef..e2ee76ba 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -109,7 +109,13 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.rooms = {}; } disconnect(close?: boolean): SocketIO.Socket { - this.client.close(); + if (!this.connected) return this; + if (close) { + this.client.disconnect(); + } else { + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT }); + this.onclose('server namespace disconnect'); + } return this; } compress(compress: boolean): SocketIO.Socket { @@ -144,7 +150,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { type: PacketTypes.MESSAGE, sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, name: event, - data: args[0] + data: args } // access last argument to see if it's an ACK callback @@ -211,21 +217,27 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } onpacket(packet: Packet) { switch (packet.sub_type) { + // 2 case SubPacketTypes.EVENT: this.onevent(packet); break; + // 5 case SubPacketTypes.BINARY_EVENT: this.onevent(packet); break; + // 3 case SubPacketTypes.ACK: this.onack(packet); break; + // 6 case SubPacketTypes.BINARY_ACK: this.onack(packet); break; + // 1 case SubPacketTypes.DISCONNECT: this.ondisconnect(); break; + // 4 case SubPacketTypes.ERROR: this.onerror(new Error(packet.data)); } @@ -242,12 +254,12 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.onclose('client namespace disconnect') } onevent(packet: Packet) { - // console.debug('emitting event %j', args); if (null != packet.id) { - // console.debug('attaching ack callback to event'); - this.dispatch(packet, this.ack(packet.id)) + // debug('attaching ack callback to event'); + this.dispatch(packet, this.ack(packet.id)); + } else { + this.dispatch(packet); } - this.dispatch(packet); } ack(id: number) { let sent = false; @@ -274,7 +286,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } dispatch(packet: Packet, ack?: Function) { if (ack) { this.acks[packet.id] = ack; } - super.emit(packet.name, packet.data) + super.emit(packet.name, ...packet.data, ack) } private hasBin(obj: any) { return false;