From cbb73b451fbf1dc2768a2b3deba3bf21ba8c1c81 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Tue, 24 Mar 2020 13:27:11 +0800 Subject: [PATCH] feat: optimize framework add error catch(only log) Signed-off-by: MiaoWoo --- packages/websocket/package.json | 9 +- .../src/netty/text_websocket_frame.ts | 4 +- packages/websocket/src/server/index.ts | 20 +-- .../src/server/text_websocket_frame.ts | 3 + packages/websocket/src/socket-io/adapter.ts | 17 +-- packages/websocket/src/socket-io/client.ts | 7 +- packages/websocket/src/socket-io/index.ts | 36 +++--- packages/websocket/src/socket-io/namespace.ts | 51 +++++++- packages/websocket/src/socket-io/parser.ts | 26 ++-- packages/websocket/src/socket-io/socket.ts | 117 +++++++++++++----- 10 files changed, 191 insertions(+), 99 deletions(-) diff --git a/packages/websocket/package.json b/packages/websocket/package.json index 5f3e2328..c9a6a2af 100644 --- a/packages/websocket/package.json +++ b/packages/websocket/package.json @@ -1,7 +1,7 @@ { "name": "@ms/websocket", "version": "0.3.1", - "description": "MiaoScript api package", + "description": "MiaoScript websocket package", "keywords": [ "miaoscript", "minecraft", @@ -27,9 +27,6 @@ "typescript": "^3.8.3" }, "dependencies": { - "@ms/container": "^0.3.2", - "@ms/nashorn": "^0.3.1", - "socket.io-parser": "^3.4.0" - }, - "gitHead": "781524f83e52cad26d7c480513e3c525df867121" + "@ms/nashorn": "^0.3.1" + } } diff --git a/packages/websocket/src/netty/text_websocket_frame.ts b/packages/websocket/src/netty/text_websocket_frame.ts index 27c29ddc..1aa0c151 100644 --- a/packages/websocket/src/netty/text_websocket_frame.ts +++ b/packages/websocket/src/netty/text_websocket_frame.ts @@ -10,12 +10,14 @@ export abstract class TextWebSocketFrameHandlerAdapter { acceptInboundMessage: (msg: any) => { return TextWebSocketFrameMatcher.match(msg) }, - channelRead0: this.channelRead0.bind(this) + channelRead0: this.channelRead0.bind(this), + exceptionCaught: this.exceptionCaught.bind(this) }) this._Handler = new TextWebSocketFrameHandlerAdapterImpl(); } abstract userEventTriggered(ctx: any, evt: any); abstract channelRead0(ctx: any, msg: any); + abstract exceptionCaught(ctx: any, cause: Error); getHandler() { return this._Handler; } diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index ea797823..099e87b3 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -6,13 +6,12 @@ import { WebSocketHandler } from './websocket_handler' import { NettyClient } from './client' import { NettyWebSocketServerOptions } from './config' -class NettyWebSocketServer { - private event: EventEmitter +class NettyWebSocketServer extends EventEmitter { private pipeline: any; private allClients: { [key: string]: NettyClient }; constructor(pipeline: any, options: NettyWebSocketServerOptions) { - this.event = new EventEmitter(); + super() this.allClients = {}; this.pipeline = pipeline; let connectEvent = options.event; @@ -24,25 +23,18 @@ class NettyWebSocketServer { connectEvent.on(ServerEvent.connect, (ctx) => { let nettyClient = new NettyClient(this, ctx.channel()); this.allClients[nettyClient.id] = nettyClient; - this.event.emit(ServerEvent.connect, nettyClient); + this.emit(ServerEvent.connect, nettyClient); }) connectEvent.on(ServerEvent.message, (ctx, msg) => { let channel = ctx.channel(); - this.event.emit(ServerEvent.message, this.allClients[channel.id()], msg.text()) + this.emit(ServerEvent.message, this.allClients[channel.id()], msg.text()) }) } - - disable() { + close() { if (this.pipeline.names().contains(Keys.Detect)) { this.pipeline.remove(Keys.Detect) } - Object.values(this.allClients).forEach(client => { - client.close(); - }) - } - - on(event: string, listener: (...args: any[]) => void) { - this.event.on(event, listener) + Object.values(this.allClients).forEach(client => client.close()) } } diff --git a/packages/websocket/src/server/text_websocket_frame.ts b/packages/websocket/src/server/text_websocket_frame.ts index fb2a3f61..396fa1d3 100644 --- a/packages/websocket/src/server/text_websocket_frame.ts +++ b/packages/websocket/src/server/text_websocket_frame.ts @@ -17,4 +17,7 @@ export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter channelRead0(ctx: any, msg: any) { this.event.emit(ServerEvent.message, ctx, msg) } + exceptionCaught(ctx: any, cause: Error) { + console.ex(cause) + } } diff --git a/packages/websocket/src/socket-io/adapter.ts b/packages/websocket/src/socket-io/adapter.ts index db0cc308..4c670caa 100644 --- a/packages/websocket/src/socket-io/adapter.ts +++ b/packages/websocket/src/socket-io/adapter.ts @@ -2,14 +2,18 @@ import { EventEmitter } from 'events' import { SocketIO } from './interfaces'; import { Namespace } from './namespace'; import { Parser } from './parser'; +import { Socket } from './socket'; export class Adapter extends EventEmitter implements SocketIO.Adapter { - nsp: SocketIO.Namespace; + nsp: Namespace; rooms: Rooms; sids: { [id: string]: { [room: string]: boolean; }; }; parser: Parser constructor(nsp: Namespace) { super() + this.nsp = nsp; + this.rooms = new Rooms(); + this.sids = {}; this.parser = nsp.server.parser; } add(id: string, room: string, callback?: (err?: any) => void): void { @@ -23,7 +27,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { * @param {Function} callback * @api public */ - addAll(id, rooms, fn) { + addAll(id: string, rooms: string | any[], fn: { (err?: any): void; bind?: any; }) { for (var i = 0; i < rooms.length; i++) { var room = rooms[i]; this.sids[id] = this.sids[id] || {}; @@ -42,7 +46,6 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { } callback && callback.bind(null, null) } - delAll(id: string): void { var rooms = this.sids[id]; if (rooms) { @@ -66,10 +69,10 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { }; var ids = {}; var self = this; - var socket; + var socket: Socket; packet.nsp = this.nsp.name; - let encodedPackets = this.parser.encode(packet) + // let encodedPackets = this.parser.encode(packet) if (rooms.length) { for (var i = 0; i < rooms.length; i++) { var room = self.rooms[rooms[i]]; @@ -80,7 +83,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { if (ids[id] || ~except.indexOf(id)) continue; socket = self.nsp.connected[id]; if (socket) { - socket.packet(encodedPackets, packetOpts); + socket.packet(packet, packetOpts); ids[id] = true; } } @@ -91,7 +94,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { if (self.sids.hasOwnProperty(id)) { if (~except.indexOf(id)) continue; socket = self.nsp.connected[id]; - if (socket) socket.packet(encodedPackets, packetOpts); + if (socket) socket.packet(packet, packetOpts); } } } diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index 31e398a6..b1e00e7e 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -9,6 +9,7 @@ import { PacketTypes, SubPacketTypes } from './types'; const parser = new Parser(); export class Client extends EventEmitter implements SocketIO.Client { + id: string; server: Server; conn: NettyClient; request: any; @@ -20,13 +21,11 @@ export class Client extends EventEmitter implements SocketIO.Client { super(); this.server = server; this.conn = nettyClient; + this.id = this.conn.id + ''; this.request = nettyClient.request; this.sockets = {}; this.nsps = {}; } - get id() { - return this.conn.id; - } connect(name, query) { if (this.server.nsps[name]) { // console.debug(`connecting to namespace ${name}`); @@ -63,7 +62,7 @@ export class Client extends EventEmitter implements SocketIO.Client { } }); } - packet(packet: Packet) { + packet(packet: Packet, opts?: any) { this.conn.send(parser.encode(packet)) } onclose(reason: string) { diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index 49e88e7d..564862c2 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -83,7 +83,7 @@ class Server implements SocketIO.Server { bind(srv: any): SocketIO.Server { throw new Error("Method not implemented."); } - onconnection(socket: any): SocketIO.Server { + onconnection(socket: Client): SocketIO.Server { this.allClients[socket.id] = socket; socket.packet({ type: PacketTypes.OPEN, @@ -104,53 +104,47 @@ class Server implements SocketIO.Server { return this.nsps[nsp]; } close(fn?: () => void): void { - throw new Error("Method not implemented."); + for (let socket in this.sockets.sockets) { + this.sockets.sockets[socket].onclose() + } + this.nettyServer.close(); } on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; on(event: string, listener: Function): SocketIO.Namespace; on(event: any, listener: any): SocketIO.Namespace { - this.event.on(event, listener); - return this.sockets; + return this.sockets.on(event, listener); } to(room: string): SocketIO.Namespace { - throw new Error("Method not implemented."); + return this.sockets.to(room); } in(room: string): SocketIO.Namespace { - throw new Error("Method not implemented."); + return this.sockets.in(room); } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - throw new Error("Method not implemented."); + return this.sockets.use(fn); } emit(event: string, ...args: any[]): SocketIO.Namespace { - this.sockets.emit(event, ...args); - return this.sockets; + // @ts-ignore + return this.sockets.emit(event, ...args); } send(...args: any[]): SocketIO.Namespace { - this.sockets.send(...args); - return this.sockets; + return this.sockets.send(...args); } write(...args: any[]): SocketIO.Namespace { - this.sockets.write(...args); - return this.sockets; + return this.sockets.write(...args); } clients(...args: any[]): SocketIO.Namespace { - this.sockets.clients(args[0]); - return this.sockets; + return this.sockets.clients(args[0]); } compress(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); + return this.sockets.compress(args[0]) } - // =============================== checkNamespace(name, query, fn) { fn(false); }; - disable() { - this.nettyServer.disable(); - } - private initNettyServer(pipeline, options) { this.nettyServer = new NettyWebSocketServer(pipeline, { event: new EventEmitter(), diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 6bb8dc67..de93170e 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -17,12 +17,23 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { adapter: SocketIO.Adapter; json: SocketIO.Namespace; + fns: any[]; + ids: number; + rooms: string[]; + flags: { [key: string]: boolean }; + + private events = ['connect', 'connection', 'newListener'] + constructor(name: string, server: Server) { super(); this.name = name; this.server = server; this.sockets = {}; this.connected = {}; + this.fns = []; + this.ids = 0; + this.rooms = []; + this.flags = {}; this.adapter = new Adapter(this); } initAdapter() { @@ -43,23 +54,55 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { delete this.sockets[client.id]; } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - // TODO - return this; + throw new Error("Method not implemented."); } to(room: string): SocketIO.Namespace { - // TODO + if (!~this.rooms.indexOf(room)) this.rooms.push(room); return this; } in(room: string): SocketIO.Namespace { return this.to(room); } send(...args: any[]): SocketIO.Namespace { - // TODO + super.emit('message', ...args) return this; } write(...args: any[]): SocketIO.Namespace { return this.send(...args); } + emit(event: string, ...args: any[]): boolean { + if (~this.events.indexOf(event)) { + super.emit(event, ...args); + // @ts-ignore + return this; + } + // set up packet object + var packet = { + type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, + data: args + } + + if ('function' == typeof args[args.length - 1]) { + throw new Error('Callbacks are not supported when broadcasting'); + } + + var rooms = this.rooms.slice(0); + var flags = Object.assign({}, this.flags); + + // reset flags + this.rooms = []; + this.flags = {}; + + this.adapter.broadcast(packet, { + rooms: rooms, + flags: flags + }); + // @ts-ignore + return this; + } + hasBin(args: any[]) { + return false; + } clients(fn: Function): SocketIO.Namespace { return fn(Object.values(this.sockets)) } diff --git a/packages/websocket/src/socket-io/parser.ts b/packages/websocket/src/socket-io/parser.ts index 38778b80..870fe7e0 100644 --- a/packages/websocket/src/socket-io/parser.ts +++ b/packages/websocket/src/socket-io/parser.ts @@ -3,8 +3,9 @@ import { PacketTypes, SubPacketTypes } from "./types"; export class Parser { encode(packet: Packet): string { + let origin = JSON.stringify(packet) // first is type - var str = '' + packet.type; + let str = '' + packet.type; if (packet.type == PacketTypes.PONG) { if (packet.data) { str += packet.data }; return str; @@ -26,18 +27,19 @@ export class Parser { str += packet.id; } if (packet.sub_type == SubPacketTypes.EVENT) { + if (packet.name == undefined) { throw new Error(`SubPacketTypes.EVENT name can't be empty!`) } packet.data = [packet.name, packet.data] } // json data if (null != packet.data) { - var payload = this.tryStringify(packet.data); + let payload = this.tryStringify(packet.data); if (payload !== false) { str += payload; } else { return '4"encode error"' } } - console.debug(`encoded ${JSON.stringify(packet)} as ${str}`); + console.debug(`encoded ${origin} as ${str}`); return str; } tryStringify(str) { @@ -48,14 +50,14 @@ export class Parser { } } decode(str: string): Packet { - var i = 0; + let i = 0; // ignore parse binary // if ((frame.getByte(0) == 'b' && frame.getByte(1) == '4') // || frame.getByte(0) == 4 || frame.getByte(0) == 1) { // return parseBinary(head, frame); // } // look up type - var p: Packet = { + let p: Packet = { type: Number(str.charAt(i)) }; if (null == PacketTypes[p.type]) { @@ -77,7 +79,7 @@ export class Parser { } // look up attachments if type binary if ([SubPacketTypes.BINARY_ACK, SubPacketTypes.BINARY_EVENT].includes(p.sub_type)) { - var buf = ''; + let buf = ''; while (str.charAt(++i) !== '-') { buf += str.charAt(i); if (i == str.length) break; @@ -92,7 +94,7 @@ export class Parser { if ('/' === str.charAt(i + 1)) { p.nsp = ''; while (++i) { - var c = str.charAt(i); + let c = str.charAt(i); if (',' === c) break; p.nsp += c; if (i === str.length) break; @@ -102,11 +104,11 @@ export class Parser { } // look up id - var next = str.charAt(i + 1); + let next = str.charAt(i + 1); if ('' !== next && Number.isNaN(Number(next))) { - var id = '' + let id = '' while (++i) { - var c = str.charAt(i); + let c = str.charAt(i); if (null == c || Number.isNaN(Number(c))) { --i; break; @@ -128,8 +130,8 @@ export class Parser { // look up json data if (str.charAt(++i)) { - var payload = this.tryParse(str.substr(i)); - var isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)); + let payload = this.tryParse(str.substr(i)); + let isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)); if (isPayloadValid) { p.name = payload[0]; p.data = payload[1]; diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 7f32af15..883e85ef 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -7,8 +7,6 @@ import { Client } from './client'; import { Namespace } from './namespace'; export class Socket extends EventEmitter implements SocketIO.Socket { - private event: EventEmitter; - nsp: Namespace; server: SocketIO.Server; adapter: SocketIO.Adapter; @@ -21,12 +19,19 @@ export class Socket extends EventEmitter implements SocketIO.Socket { connected: boolean; disconnected: boolean; handshake: SocketIO.Handshake; - json: SocketIO.Socket; - volatile: SocketIO.Socket; - broadcast: SocketIO.Socket; fns: any[]; + flags: { [key: string]: boolean }; _rooms: string[]; + private events = [ + 'error', + 'connect', + 'disconnect', + 'disconnecting', + 'newListener', + 'removeListener' + ] + constructor(nsp: Namespace, client: Client, query = {}) { super(); this.nsp = nsp; @@ -42,10 +47,26 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.disconnected = false; // this.handshake = this.buildHandshake(query); this.fns = []; - // this.flags = {}; + this.flags = {}; this._rooms = []; + } - this.event = new EventEmitter(); + get json() { + this.flags.json = true; + return this + } + + get volatile() { + this.flags.volatile = true; + return this + } + get broadcast() { + this.flags.broadcast = true; + return this + } + get local() { + this.flags.local = true; + return this } to(room: string): SocketIO.Socket { @@ -95,11 +116,10 @@ export class Socket extends EventEmitter implements SocketIO.Socket { throw new Error("Method not implemented."); } error(err: any): void { - throw new Error("Method not implemented."); + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: err }); } // ========================================== - buildHandshake(query): SocketIO.Handshake { let requestQuery = this.request.uri(); return { @@ -113,29 +133,61 @@ export class Socket extends EventEmitter implements SocketIO.Socket { query: Object.assign(query, requestQuery) } } - - on(event: string, callback: (...args: any[]) => void) { - this.event.on(event, callback); - return this - } emit(event: string, ...args: any[]): boolean { - this.packet({ + if (~this.events.indexOf(event)) { + super.emit(event, ...args); + // @ts-ignore + return this; + } + + let packet: Packet = { type: PacketTypes.MESSAGE, - sub_type: SubPacketTypes.EVENT, + sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, name: event, data: args[0] - }) - return true; + } + + // access last argument to see if it's an ACK callback + if (typeof args[args.length - 1] === 'function') { + if (this._rooms.length || this.flags.broadcast) { + throw new Error('Callbacks are not supported when broadcasting'); + } + // debug('emitting packet with ack id %d', this.nsp.ids); + this.acks[this.nsp.ids] = args.pop(); + packet.id = this.nsp.ids++; + } + + let rooms = this._rooms.slice(0); + let flags = Object.assign({}, this.flags); + + // reset flags + this._rooms = []; + this.flags = {}; + + if (rooms.length || flags.broadcast) { + this.adapter.broadcast(packet, { + except: [this.id], + rooms: rooms, + flags: flags + }); + } else { + // dispatch packet + this.packet(packet, flags); + } + // @ts-ignore + return this; } - packet(packet: Packet) { + packet(packet: Packet, opts?: any) { packet.nsp = this.nsp.name; - this.client.packet(packet); + opts = opts || {}; + opts.compress = false !== opts.compress; + this.client.packet(packet, opts); } onconnect() { this.nsp.connected[this.id] = this; this.client.sockets[this.id] = this; this.join(this.id); - // var skip = this.nsp.name === '/' && this.nsp.fns.length === 0; + // let skip = this.nsp.name === '/' && this.nsp.fns.length === 0; // if (skip) { // debug('packet already sent in initial handshake'); // } else { @@ -145,7 +197,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { }); // } } - onclose(reason) { + onclose(reason?: string) { if (!this.connected) return this; // debug('closing socket - reason %s', reason); this.emit('disconnecting', reason); @@ -156,7 +208,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.disconnected = true; delete this.nsp.connected[this.id]; this.emit('disconnect', reason); - }; + } onpacket(packet: Packet) { switch (packet.sub_type) { case SubPacketTypes.EVENT: @@ -178,11 +230,16 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.onerror(new Error(packet.data)); } } - onerror(error: Error) { - + onerror(err: Error) { + if (this.listeners('error').length) { + this.emit('error', err); + } else { + console.error('Missing error handler on `socket`.'); + console.error(err.stack); + } } ondisconnect() { - this.onclose + this.onclose('client namespace disconnect') } onevent(packet: Packet) { // console.debug('emitting event %j', args); @@ -191,9 +248,9 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.dispatch(packet, this.ack(packet.id)) } this.dispatch(packet); - }; + } ack(id: number) { - var sent = false; + let sent = false; return (...args: any[]) => { if (sent) return; this.packet({ @@ -206,7 +263,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } } onack(packet: Packet) { - var ack = this.acks[packet.id]; + let ack = this.acks[packet.id]; if ('function' == typeof ack) { // debug('calling ack %s with %j', packet.id, packet.data); ack.apply(this, packet.data); @@ -217,7 +274,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } dispatch(packet: Packet, ack?: Function) { if (ack) { this.acks[packet.id] = ack; } - this.event.emit(packet.name, packet.data) + super.emit(packet.name, packet.data) } private hasBin(obj: any) { return false;