fix: ack callback call twice

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-03-24 14:52:50 +08:00
parent cbb73b451f
commit 9b69b1221b
5 changed files with 33 additions and 28 deletions

View File

@ -65,7 +65,7 @@ export class Client extends EventEmitter implements SocketIO.Client {
packet(packet: Packet, opts?: any) {
this.conn.send(parser.encode(packet))
}
onclose(reason: string) {
onclose(reason?: string) {
// debug('client close with reason %s', reason);
// ignore a potential subsequent `close` event
this.destroy();
@ -77,8 +77,8 @@ export class Client extends EventEmitter implements SocketIO.Client {
}
this.sockets = {};
// this.decoder.destroy(); // clean up decoder
};
close() {
}
disconnect() {
// if ('open' == this.conn.readyState) {
// debug('forcing transport close');
this.conn.close();

View File

@ -13,7 +13,6 @@ import { Socket } from './socket';
import { Adapter } from './adapter';
class Server implements SocketIO.Server {
private event: EventEmitter;
private nettyServer: NettyWebSocketServer;
private allClients: { [key: string]: Client };
@ -29,7 +28,6 @@ class Server implements SocketIO.Server {
constructor(pipeline: any, options: SocketIO.ServerOptions) {
if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') }
this.event = new EventEmitter();
this.allClients = {};
this.nsps = {};
this.sockets = new Namespace('/', this);
@ -173,6 +171,8 @@ class Server implements SocketIO.Server {
this.processSubPacket(packet, client);
break;
case PacketTypes.CLOSE:
client.onclose()
break;
}
}
@ -182,9 +182,9 @@ class Server implements SocketIO.Server {
client.packet({
type: PacketTypes.MESSAGE,
sub_type: SubPacketTypes.ERROR,
data: 'not support dynamic namespace'
data: 'not support dynamic namespace: ' + packet.nsp
});
client.close();
client.disconnect();
return;
}
namespace.process(packet, client);

View File

@ -114,10 +114,7 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace {
case SubPacketTypes.CONNECT:
this.add(client);
break;
case SubPacketTypes.DISCONNECT:
this.del(client);
break;
case SubPacketTypes.EVENT:
default:
this.sockets[client.id].onpacket(packet);
break;
}

View File

@ -28,7 +28,7 @@ export class Parser {
}
if (packet.sub_type == SubPacketTypes.EVENT) {
if (packet.name == undefined) { throw new Error(`SubPacketTypes.EVENT name can't be empty!`) }
packet.data = [packet.name, packet.data]
packet.data = [packet.name, ...packet.data]
}
// json data
if (null != packet.data) {
@ -85,7 +85,7 @@ export class Parser {
if (i == str.length) break;
}
if (buf != `${Number(buf)}` || str.charAt(i) !== '-') {
throw new Error('Illegal attachments');
return this.error('Illegal attachments');
}
p.attachments = Number(buf);
}
@ -105,11 +105,11 @@ export class Parser {
// look up id
let next = str.charAt(i + 1);
if ('' !== next && Number.isNaN(Number(next))) {
if ('' !== next && !isNaN(Number(next))) {
let id = ''
while (++i) {
let c = str.charAt(i);
if (null == c || Number.isNaN(Number(c))) {
if (null == c || isNaN(Number(c))) {
--i;
break;
}
@ -119,10 +119,6 @@ export class Parser {
p.id = Number(id);
}
// look up packet name
if (p.sub_type == SubPacketTypes.EVENT) {
}
// ignore binary packet
if (p.sub_type == SubPacketTypes.BINARY_EVENT) {
return this.error('not support binary parse...')
@ -134,9 +130,9 @@ export class Parser {
let isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload));
if (isPayloadValid) {
p.name = payload[0];
p.data = payload[1];
p.data = payload.slice(1);
} else {
return this.error('invalid payload');
return this.error('invalid payload ' + str.substr(i));
}
}

View File

@ -109,7 +109,13 @@ export class Socket extends EventEmitter implements SocketIO.Socket {
this.rooms = {};
}
disconnect(close?: boolean): SocketIO.Socket {
this.client.close();
if (!this.connected) return this;
if (close) {
this.client.disconnect();
} else {
this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT });
this.onclose('server namespace disconnect');
}
return this;
}
compress(compress: boolean): SocketIO.Socket {
@ -144,7 +150,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket {
type: PacketTypes.MESSAGE,
sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT,
name: event,
data: args[0]
data: args
}
// access last argument to see if it's an ACK callback
@ -211,21 +217,27 @@ export class Socket extends EventEmitter implements SocketIO.Socket {
}
onpacket(packet: Packet) {
switch (packet.sub_type) {
// 2
case SubPacketTypes.EVENT:
this.onevent(packet);
break;
// 5
case SubPacketTypes.BINARY_EVENT:
this.onevent(packet);
break;
// 3
case SubPacketTypes.ACK:
this.onack(packet);
break;
// 6
case SubPacketTypes.BINARY_ACK:
this.onack(packet);
break;
// 1
case SubPacketTypes.DISCONNECT:
this.ondisconnect();
break;
// 4
case SubPacketTypes.ERROR:
this.onerror(new Error(packet.data));
}
@ -242,12 +254,12 @@ export class Socket extends EventEmitter implements SocketIO.Socket {
this.onclose('client namespace disconnect')
}
onevent(packet: Packet) {
// console.debug('emitting event %j', args);
if (null != packet.id) {
// console.debug('attaching ack callback to event');
this.dispatch(packet, this.ack(packet.id))
// debug('attaching ack callback to event');
this.dispatch(packet, this.ack(packet.id));
} else {
this.dispatch(packet);
}
this.dispatch(packet);
}
ack(id: number) {
let sent = false;
@ -274,7 +286,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket {
}
dispatch(packet: Packet, ack?: Function) {
if (ack) { this.acks[packet.id] = ack; }
super.emit(packet.name, packet.data)
super.emit(packet.name, ...packet.data, ack)
}
private hasBin(obj: any) {
return false;