From 678f4ca8a4c6a5eac830ff43a74fa5f2016c7a62 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Tue, 30 Jun 2020 14:02:18 +0800 Subject: [PATCH] fix: boardcast error when socket send packet Signed-off-by: MiaoWoo --- packages/websocket/src/socket-io/index.ts | 173 +++++++------- packages/websocket/src/socket-io/socket.ts | 253 +++++++++++---------- packages/websocket/src/tomcat/client.ts | 55 ++--- 3 files changed, 247 insertions(+), 234 deletions(-) diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index 20a4d68d..dfed8c05 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -1,18 +1,18 @@ import { EventEmitter } from 'events' -import { ServerEvent } from './constants'; -import { Namespace } from './namespace'; -import { Client } from './client'; +import { ServerEvent } from './constants' +import { Namespace } from './namespace' +import { Client } from './client' import { SocketIO } from './interfaces' import { Parser } from './parser' -import { PacketTypes, SubPacketTypes } from './types'; -import { Packet } from './packet'; -import { Socket } from './socket'; -import { Adapter } from './adapter'; +import { PacketTypes, SubPacketTypes } from './types' +import { Packet } from './packet' +import { Socket } from './socket' +import { Adapter } from './adapter' interface ServerOptions extends SocketIO.ServerOptions { - event?: EventEmitter; - root?: string; + event?: EventEmitter + root?: string } interface WebSocketServer extends EventEmitter { @@ -20,87 +20,87 @@ interface WebSocketServer extends EventEmitter { } class Server implements SocketIO.Server { - private websocketServer: WebSocketServer; - private allClients: { [key: string]: Client }; + private websocketServer: WebSocketServer + private allClients: { [key: string]: Client } - engine: { ws: any; }; - nsps: { [namespace: string]: Namespace; }; - sockets: Namespace; - json: SocketIO.Server; - volatile: SocketIO.Server; - local: SocketIO.Server; + engine: { ws: any } + nsps: { [namespace: string]: Namespace } + sockets: Namespace + json: SocketIO.Server + volatile: SocketIO.Server + local: SocketIO.Server parser = new Parser(); - _adapter: Adapter; - options: ServerOptions; + _adapter: Adapter + options: ServerOptions constructor(instance: any, options: ServerOptions) { if (!instance) { throw new Error('instance can\'t be undefiend!') } - this.allClients = {}; - this.nsps = {}; - this.sockets = new Namespace('/', this); - this.nsps['/'] = this.sockets; + this.allClients = {} + this.nsps = {} + this.sockets = new Namespace('/', this) + this.nsps['/'] = this.sockets if (instance.class.name.startsWith('io.netty.channel')) { let { NettyWebSocketServer } = require("../server") this.websocketServer = new NettyWebSocketServer(instance, Object.assign({ event: new EventEmitter(), path: '/socket.io', root: root + '/wwwroot' - }, options)); + }, options)) } else { let { TomcatWebSocketServer } = require("../tomcat/server") - this.websocketServer = new TomcatWebSocketServer(instance, options); + this.websocketServer = new TomcatWebSocketServer(instance, options) } this.initServer() } checkRequest(req: any, fn: (err: any, success: boolean) => void): void { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } - serveClient(): boolean; - serveClient(v: boolean): SocketIO.Server; + serveClient(): boolean + serveClient(v: boolean): SocketIO.Server serveClient(v?: any): boolean | SocketIO.Server { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } - path(): string; - path(v: string): SocketIO.Server; + path(): string + path(v: string): SocketIO.Server path(v?: any): string | SocketIO.Server { - if (!arguments.length) return this.options.path; - this.options.path = v.replace(/\/$/, ''); - return this; + if (!arguments.length) return this.options.path + this.options.path = v.replace(/\/$/, '') + return this } - adapter(): Adapter; - adapter(v: any): SocketIO.Server; + adapter(): Adapter + adapter(v: any): SocketIO.Server adapter(v?: any): Adapter | SocketIO.Server { - if (!arguments.length) return this._adapter; - this._adapter = v; + if (!arguments.length) return this._adapter + this._adapter = v for (var i in this.nsps) { if (this.nsps.hasOwnProperty(i)) { - this.nsps[i].initAdapter(); + this.nsps[i].initAdapter() } } - return this; + return this } - origins(): string | string[]; - origins(v: string | string[]): SocketIO.Server; - origins(fn: (origin: string, callback: (error: string, success: boolean) => void) => void): SocketIO.Server; + origins(): string | string[] + origins(v: string | string[]): SocketIO.Server + origins(fn: (origin: string, callback: (error: string, success: boolean) => void) => void): SocketIO.Server origins(fn?: any): string | string[] | SocketIO.Server { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } - attach(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server; - attach(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server; + attach(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server + attach(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server attach(port: any, opts?: any): SocketIO.Server { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } - listen(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server; - listen(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server; + listen(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server + listen(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server listen(port: any, opts?: any): SocketIO.Server { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } bind(srv: any): SocketIO.Server { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } onconnection(socket: Client): SocketIO.Server { - this.allClients[socket.id] = socket; + this.allClients[socket.id] = socket socket.packet({ type: PacketTypes.OPEN, data: { @@ -110,68 +110,75 @@ class Server implements SocketIO.Server { pingTimeout: 5000 } }) - this.sockets.add(socket); - return this; + this.sockets.add(socket) + return this } of(nsp: string): Namespace { if (!this.nsps[nsp]) { - this.nsps[nsp] = new Namespace(nsp, this); + this.nsps[nsp] = new Namespace(nsp, this) } - return this.nsps[nsp]; + return this.nsps[nsp] } close(fn?: () => void): void { for (let socket in this.sockets.sockets) { this.sockets.sockets[socket].onclose() } - this.websocketServer.close(); + this.websocketServer.close() } - on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; - on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; - on(event: string, listener: Function): SocketIO.Namespace; + on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace + on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace + on(event: string, listener: Function): SocketIO.Namespace on(event: any, listener: any): SocketIO.Namespace { - return this.sockets.on(event, listener); + return this.sockets.on(event, listener) } to(room: string): SocketIO.Namespace { - return this.sockets.to(room); + return this.sockets.to(room) } in(room: string): SocketIO.Namespace { - return this.sockets.in(room); + return this.sockets.in(room) } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - return this.sockets.use(fn); + return this.sockets.use(fn) } emit(event: string, ...args: any[]): SocketIO.Namespace { // @ts-ignore - return this.sockets.emit(event, ...args); + return this.sockets.emit(event, ...args) } send(...args: any[]): SocketIO.Namespace { - return this.sockets.send(...args); + return this.sockets.send(...args) } write(...args: any[]): SocketIO.Namespace { - return this.sockets.write(...args); + return this.sockets.write(...args) } clients(...args: any[]): SocketIO.Namespace { - return this.sockets.clients(args[0]); + return this.sockets.clients(args[0]) } compress(...args: any[]): SocketIO.Namespace { return this.sockets.compress(args[0]) } // =============================== checkNamespace(name, query, fn) { - fn(false); + fn(false) }; private initServer() { this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => { - let client = new Client(this, socket); - this.onconnection(client); + let client = new Client(this, socket) + this.onconnection(client) }) this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => { - this.processPacket(this.parser.decode(text), this.allClients[socket.id]); + this.processPacket(this.parser.decode(text), this.allClients[socket.id]) + }) + this.websocketServer.on(ServerEvent.disconnect, (socket: SocketIO.EngineSocket, reason) => { + this.allClients[socket.id].onclose(reason) + delete this.allClients[socket.id] }) this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => { - console.error(`Client ${socket.id} cause error: ` + cause) - console.ex(cause) + if (socket.listeners(ServerEvent.error).length) { + socket.emit(ServerEvent.error, cause) + } else { + console.error(`client ${socket.id} cause error: ${cause}`) + } }) } @@ -182,15 +189,15 @@ class Server implements SocketIO.Server { type: PacketTypes.PONG, data: packet.data }) - break; + break case PacketTypes.UPGRADE: - break; + break case PacketTypes.MESSAGE: - this.processSubPacket(packet, client); - break; + this.processSubPacket(packet, client) + break case PacketTypes.CLOSE: client.onclose() - break; + break } } @@ -201,11 +208,11 @@ class Server implements SocketIO.Server { type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: 'not support dynamic namespace: ' + packet.nsp - }); - client.disconnect(); - return; + }) + client.disconnect() + return } - namespace.process(packet, client); + namespace.process(packet, client) } } export { diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 8fd9ea1b..595a94e6 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -1,28 +1,29 @@ import { EventEmitter } from 'events' -import { SocketIO } from "./interfaces"; -import { Packet } from './packet'; -import { PacketTypes, SubPacketTypes } from './types'; -import { Client } from './client'; -import { Namespace } from './namespace'; +import { SocketIO } from "./interfaces" +import { Packet } from './packet' +import { PacketTypes, SubPacketTypes } from './types' +import { Client } from './client' +import { Namespace } from './namespace' import * as querystring from 'querystring' +import { ServerEvent } from './constants' export class Socket extends EventEmitter implements SocketIO.Socket { - nsp: Namespace; - server: SocketIO.Server; - adapter: SocketIO.Adapter; - id: string; - request: any; - client: Client; - conn: SocketIO.EngineSocket; - rooms: { [id: string]: string; }; - acks: { [id: string]: Function; }; - connected: boolean; - disconnected: boolean; - handshake: SocketIO.Handshake; - fns: any[]; - flags: { [key: string]: boolean }; - _rooms: string[]; + nsp: Namespace + server: SocketIO.Server + adapter: SocketIO.Adapter + id: string + request: any + client: Client + conn: SocketIO.EngineSocket + rooms: { [id: string]: string } + acks: { [id: string]: Function } + connected: boolean + disconnected: boolean + handshake: SocketIO.Handshake + fns: any[] + flags: { [key: string]: boolean } + _rooms: string[] private events = [ 'error', @@ -34,113 +35,113 @@ export class Socket extends EventEmitter implements SocketIO.Socket { ] constructor(nsp: Namespace, client: Client, query = {}) { - super(); - this.nsp = nsp; - this.server = nsp.server; - this.adapter = this.nsp.adapter; - this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id; - this.client = client; - this.request = client.request; - this.conn = client.conn; - this.rooms = {}; - this.acks = {}; - this.connected = true; - this.disconnected = false; - this.handshake = this.buildHandshake(query); - this.fns = []; - this.flags = {}; - this._rooms = []; + super() + this.nsp = nsp + this.server = nsp.server + this.adapter = this.nsp.adapter + this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id + this.client = client + this.request = client.request + this.conn = client.conn + this.rooms = {} + this.acks = {} + this.connected = true + this.disconnected = false + this.handshake = this.buildHandshake(query) + this.fns = [] + this.flags = {} + this._rooms = [] } get json() { - this.flags.json = true; + this.flags.json = true return this } get volatile() { - this.flags.volatile = true; + this.flags.volatile = true return this } get broadcast() { - this.flags.broadcast = true; + this.flags.broadcast = true return this } get local() { - this.flags.local = true; + this.flags.local = true return this } to(room: string): SocketIO.Socket { - if (!~this._rooms.indexOf(room)) this._rooms.push(room); - return this; + if (!~this._rooms.indexOf(room)) this._rooms.push(room) + return this } in(room: string): SocketIO.Socket { - return this.to(room); + return this.to(room) } use(fn: (packet: SocketIO.Packet, next: (err?: any) => void) => void): SocketIO.Socket { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } send(...args: any[]): SocketIO.Socket { this.emit("message", ...args) - return this; + return this } write(...args: any[]): SocketIO.Socket { - return this.send(...args); + return this.send(...args) } join(rooms: string | string[], fn?: (err?: any) => void): SocketIO.Socket { if (!Array.isArray(rooms)) { - rooms = [rooms]; + rooms = [rooms] } rooms = rooms.filter((room) => { - return !this.rooms.hasOwnProperty(room); - }); + return !this.rooms.hasOwnProperty(room) + }) if (!rooms.length) { - fn && fn(null); - return this; + fn && fn(null) + return this } this.adapter.addAll(this.id, rooms, (err) => { if (err) return fn && fn(err); // debug('joined room %s', rooms); (rooms as Array).forEach((room) => { - this.rooms[room] = room; - }); - fn && fn(null); - }); - return this; + this.rooms[room] = room + }) + fn && fn(null) + }) + return this } leave(name: string, fn?: Function): SocketIO.Socket { - delete this.rooms[name]; + delete this.rooms[name] fn && fn(null) - return this; + return this } leaveAll(): void { - this.adapter.delAll(this.id); - this.rooms = {}; + this.adapter.delAll(this.id) + this.rooms = {} } disconnect(close?: boolean): SocketIO.Socket { - if (!this.connected) return this; + if (!this.connected) return this if (close) { - this.client.disconnect(); + this.client.disconnect() } else { - this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT }); - this.onclose('server namespace disconnect'); + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT }) + this.onclose('server namespace disconnect') } - return this; + return this } compress(compress: boolean): SocketIO.Socket { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } error(err: any): void { - this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: err }); + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: err }) } // ========================================== buildHandshake(query): SocketIO.Handshake { - let requestUri = this.request.uri(); - let headers = {}; - let nativeHeaders = this.request.headers(); + let requestUri = this.request.uri() + let headers = {} + let nativeHeaders = this.request.headers() nativeHeaders.forEach(function (header) { - headers[header.getKey()] = header.getValue(); + headers[header.getKey()] = header.getValue() }) return { headers: headers, @@ -155,9 +156,9 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } emit(event: string, ...args: any[]): boolean { if (~this.events.indexOf(event)) { - super.emit(event, ...args); + super.emit(event, ...args) // @ts-ignore - return this; + return this } let packet: Packet = { @@ -170,44 +171,48 @@ export class Socket extends EventEmitter implements SocketIO.Socket { // access last argument to see if it's an ACK callback if (typeof args[args.length - 1] === 'function') { if (this._rooms.length || this.flags.broadcast) { - throw new Error('Callbacks are not supported when broadcasting'); + throw new Error('Callbacks are not supported when broadcasting') } // debug('emitting packet with ack id %d', this.nsp.ids); - this.acks[this.nsp.ids] = args.pop(); - packet.id = this.nsp.ids++; + this.acks[this.nsp.ids] = args.pop() + packet.id = this.nsp.ids++ } - let rooms = this._rooms.slice(0); - let flags = Object.assign({}, this.flags); + let rooms = this._rooms.slice(0) + let flags = Object.assign({}, this.flags) // reset flags - this._rooms = []; - this.flags = {}; + this._rooms = [] + this.flags = {} if (rooms.length || flags.broadcast) { this.adapter.broadcast(packet, { except: [this.id], rooms: rooms, flags: flags - }); + }) } else { // dispatch packet - this.packet(packet, flags); + this.packet(packet, flags) } // @ts-ignore - return this; + return this } packet(packet: Packet, opts: any = { preEncoded: false }) { if (!opts.preEncoded) { - packet.nsp = this.nsp.name; - opts.compress = false !== opts.compress; + packet.nsp = this.nsp.name + opts.compress = false !== opts.compress + } + try { + this.client.packet(packet, opts) + } catch (error) { + this.onerror(error) } - this.client.packet(packet, opts); } onconnect() { - this.nsp.connected[this.id] = this; - this.client.sockets[this.id] = this; - this.join(this.id); + this.nsp.connected[this.id] = this + this.client.sockets[this.id] = this + this.join(this.id) // let skip = this.nsp.name === '/' && this.nsp.fns.length === 0; // if (skip) { // debug('packet already sent in initial handshake'); @@ -215,54 +220,54 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT - }); + }) // } } onclose(reason?: string) { - if (!this.connected) return this; + if (!this.connected) return this // debug('closing socket - reason %s', reason); - this.emit('disconnecting', reason); - this.leaveAll(); - this.nsp.remove(this); - this.client.remove(this); - this.connected = false; - this.disconnected = true; - delete this.nsp.connected[this.id]; - this.emit('disconnect', reason); + this.emit('disconnecting', reason) + this.leaveAll() + this.nsp.remove(this) + this.client.remove(this) + this.connected = false + this.disconnected = true + delete this.nsp.connected[this.id] + this.emit('disconnect', reason) } onpacket(packet: Packet) { switch (packet.sub_type) { // 2 case SubPacketTypes.EVENT: - this.onevent(packet); - break; + this.onevent(packet) + break // 5 case SubPacketTypes.BINARY_EVENT: - this.onevent(packet); - break; + this.onevent(packet) + break // 3 case SubPacketTypes.ACK: - this.onack(packet); - break; + this.onack(packet) + break // 6 case SubPacketTypes.BINARY_ACK: - this.onack(packet); - break; + this.onack(packet) + break // 1 case SubPacketTypes.DISCONNECT: - this.ondisconnect(); - break; + this.ondisconnect() + break // 4 case SubPacketTypes.ERROR: - this.onerror(new Error(packet.data)); + this.onerror(new Error(packet.data)) } } onerror(err: Error) { if (this.listeners('error').length) { - this.emit('error', err); + this.emit('error', err) } else { - console.error('Missing error handler on `socket`.'); - console.error(err.stack); + console.error('Missing error handler on `socket`.') + console.error(err.stack) } } ondisconnect() { @@ -271,39 +276,39 @@ export class Socket extends EventEmitter implements SocketIO.Socket { onevent(packet: Packet) { if (null != packet.id) { // debug('attaching ack callback to event'); - this.dispatch(packet, this.ack(packet.id)); + this.dispatch(packet, this.ack(packet.id)) } else { - this.dispatch(packet); + this.dispatch(packet) } } ack(id: number) { - let sent = false; + let sent = false return (...args: any[]) => { - if (sent) return; + if (sent) return this.packet({ id: id, type: PacketTypes.MESSAGE, sub_type: this.hasBin(args) ? SubPacketTypes.BINARY_ACK : SubPacketTypes.ACK, data: args - }); - sent = true; + }) + sent = true } } onack(packet: Packet) { - let ack = this.acks[packet.id]; + let ack = this.acks[packet.id] if ('function' == typeof ack) { // debug('calling ack %s with %j', packet.id, packet.data); - ack.apply(this, packet.data); - delete this.acks[packet.id]; + ack.apply(this, packet.data) + delete this.acks[packet.id] } else { // debug('bad ack %s', packet.id); } } dispatch(packet: Packet, ack?: Function) { - if (ack) { this.acks[packet.id] = ack; } + if (ack) { this.acks[packet.id] = ack } super.emit(packet.name, ...packet.data, ack) } private hasBin(obj: any) { - return false; + return false } } diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/tomcat/client.ts index 130fe274..91a6f4d3 100644 --- a/packages/websocket/src/tomcat/client.ts +++ b/packages/websocket/src/tomcat/client.ts @@ -1,44 +1,45 @@ import { EventEmitter } from 'events' -import { SocketIO } from '../socket-io/interfaces'; +import { SocketIO } from '../socket-io/interfaces' export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket { - private _id: string; - private session: any + private _id: string + private session: javax.websocket.Session - server: any; - readyState: string; - remoteAddress: string; - upgraded: boolean; - request: any; - transport: any; + server: any + readyState: string + remoteAddress: string + upgraded: boolean + request: any + transport: any - constructor(server: any, session: any) { - super(); - this.server = server; - this.readyState = 'open'; + constructor(server: any, session: javax.websocket.Session) { + super() + this.server = server + this.readyState = 'open' this.remoteAddress = session + '' - this.upgraded = true; + this.upgraded = true this.request = { - uri: () => { - return session.getRequestURI() + '' - }, - headers: () => { - return [] - } - }; - this.transport = null; + uri: () => `${session.getRequestURI()}`, + headers: () => [] + } + this.transport = null - this.session = session; - this._id = session.getId(); + this.session = session + this._id = session.getId() } get id() { - return this._id; + return this._id } send(text: string) { - Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)() + if (this.readyState == 'open') { + Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)() + } } close() { - this.session.close(); + if (this.readyState == 'open') { + this.readyState = 'close' + this.session.close() + } } }