From f86e1a8c949e7632a9f0b5ce5a74d68d8240442e Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Fri, 13 Nov 2020 09:40:47 +0800 Subject: [PATCH] feat: optimize websocket Signed-off-by: MiaoWoo --- .../src/netty/text_websocket_frame.ts | 12 +- packages/websocket/src/netty/websocket.ts | 10 +- packages/websocket/src/server/client.ts | 38 +++--- packages/websocket/src/server/index.ts | 38 ++++-- .../websocket/src/server/websocket_detect.ts | 12 +- .../websocket/src/server/websocket_handler.ts | 11 +- packages/websocket/src/socket-io/index.ts | 41 ++++-- packages/websocket/src/socket-io/namespace.ts | 126 +++++++++--------- packages/websocket/src/tomcat/client.ts | 2 +- packages/websocket/src/tomcat/server.ts | 31 ++++- 10 files changed, 195 insertions(+), 126 deletions(-) diff --git a/packages/websocket/src/netty/text_websocket_frame.ts b/packages/websocket/src/netty/text_websocket_frame.ts index 1aa0c151..f444ad6d 100644 --- a/packages/websocket/src/netty/text_websocket_frame.ts +++ b/packages/websocket/src/netty/text_websocket_frame.ts @@ -3,7 +3,7 @@ const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.net const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') export abstract class TextWebSocketFrameHandlerAdapter { - private _Handler; + private _Handler constructor() { let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { userEventTriggered: this.userEventTriggered.bind(this), @@ -13,12 +13,12 @@ export abstract class TextWebSocketFrameHandlerAdapter { channelRead0: this.channelRead0.bind(this), exceptionCaught: this.exceptionCaught.bind(this) }) - this._Handler = new TextWebSocketFrameHandlerAdapterImpl(); + this._Handler = new TextWebSocketFrameHandlerAdapterImpl() } - abstract userEventTriggered(ctx: any, evt: any); - abstract channelRead0(ctx: any, msg: any); - abstract exceptionCaught(ctx: any, cause: Error); + abstract userEventTriggered(ctx: any, evt: any) + abstract channelRead0(ctx: any, msg: any) + abstract exceptionCaught(ctx: any, cause: Error) getHandler() { - return this._Handler; + return this._Handler } } diff --git a/packages/websocket/src/netty/websocket.ts b/packages/websocket/src/netty/websocket.ts index ca953528..e1e15ca6 100644 --- a/packages/websocket/src/netty/websocket.ts +++ b/packages/websocket/src/netty/websocket.ts @@ -1,17 +1,19 @@ const ChannelInboundHandlerAdapter = Java.type('io.netty.channel.ChannelInboundHandlerAdapter') export abstract class WebSocketHandlerAdapter { - private _Handler; + private _Handler constructor() { let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, { channelRead: this.channelRead.bind(this), + channelUnregistered: this.exceptionCaught.bind(this), exceptionCaught: this.exceptionCaught.bind(this) }) this._Handler = new ChannelInboundHandlerAdapterImpl() } - abstract channelRead(ctx: any, channel: any); - abstract exceptionCaught(ctx: any, cause: Error); + abstract channelRead(ctx: any, channel: any) + abstract channelUnregistered(ctx: any) + abstract exceptionCaught(ctx: any, cause: Error) getHandler() { - return this._Handler; + return this._Handler } } diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/server/client.ts index 3685da14..f49df917 100644 --- a/packages/websocket/src/server/client.ts +++ b/packages/websocket/src/server/client.ts @@ -1,40 +1,40 @@ import { EventEmitter } from 'events' -import { SocketIO } from '../socket-io/interfaces'; -import { AttributeKeys } from './constants'; +import { SocketIO } from '../socket-io/interfaces' +import { AttributeKeys } from './constants' const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { - private _id: string; + private _id: string private channel: any - server: any; - readyState: string; - remoteAddress: string; - upgraded: boolean; - request: any; - transport: any; + server: any + readyState: string + remoteAddress: string + upgraded: boolean + request: any + transport: any constructor(server: any, channel: any) { - super(); - this.server = server; - this.readyState = 'open'; + super() + this.server = server + this.readyState = 'open' this.remoteAddress = channel.remoteAddress() + '' - this.upgraded = true; - this.request = channel.attr(AttributeKeys.Request).get(); - this.transport = null; + this.upgraded = true + this.request = channel.attr(AttributeKeys.Request).get() + this.transport = null - this.channel = channel; - this._id = channel.id(); + this.channel = channel + this._id = channel.id() + '' } get id() { - return this._id; + return this._id } send(text: string) { this.channel.writeAndFlush(new TextWebSocketFrame(text)) } close() { - this.channel.close(); + this.channel.close() } } diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index 4b1cfb07..7dc96fb1 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -9,14 +9,14 @@ import { WebSocketDetect } from './websocket_detect' import { WebSocketHandler } from './websocket_handler' class NettyWebSocketServer extends EventEmitter { - private pipeline: any; - private allClients: { [key: string]: NettyClient }; + private pipeline: any + private clients: Map constructor(pipeline: any, options: ServerOptions) { super() - this.allClients = {}; - this.pipeline = pipeline; - let connectEvent = options.event; + this.clients = new Map() + this.pipeline = pipeline + let connectEvent = options.event try { this.pipeline.remove(Keys.Detect) } catch (error) { } this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) connectEvent.on(ServerEvent.detect, (ctx, channel) => { @@ -24,22 +24,42 @@ class NettyWebSocketServer extends EventEmitter { ctx.fireChannelRead(channel) }) connectEvent.on(ServerEvent.connect, (ctx) => { + let cid = ctx?.channel().id() + '' let nettyClient = new NettyClient(this, ctx.channel()) - this.allClients[nettyClient.id] = nettyClient + this.clients.set(cid, nettyClient) this.emit(ServerEvent.connect, nettyClient) }) connectEvent.on(ServerEvent.message, (ctx, msg) => { - this.emit(ServerEvent.message, this.allClients[ctx.channel().id()], msg.text()) + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.message, this.clients.get(cid), msg.text()) + } else { + console.error(`unknow client ${ctx} reciver message ${msg.text()}`) + } + }) + connectEvent.on(ServerEvent.disconnect, (ctx, cause) => { + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.disconnect, this.clients.get(cid), cause) + } else { + console.error(`unknow client ${ctx} disconnect cause ${cause}`) + } }) connectEvent.on(ServerEvent.error, (ctx, cause) => { - this.emit(ServerEvent.error, this.allClients[ctx.channel().id()], cause) + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.error, this.clients.get(cid), cause) + } else { + console.error(`unknow client ${ctx} cause error ${cause}`) + console.ex(cause) + } }) } close() { if (this.pipeline.names().contains(Keys.Detect)) { this.pipeline.remove(Keys.Detect) } - Object.values(this.allClients).forEach(client => client.close()) + this.clients.forEach(client => client.close()) } } diff --git a/packages/websocket/src/server/websocket_detect.ts b/packages/websocket/src/server/websocket_detect.ts index 0cf06aab..a321e769 100644 --- a/packages/websocket/src/server/websocket_detect.ts +++ b/packages/websocket/src/server/websocket_detect.ts @@ -3,15 +3,19 @@ import { WebSocketHandlerAdapter } from "../netty" import { ServerEvent } from '../socket-io/constants' export class WebSocketDetect extends WebSocketHandlerAdapter { - private event: EventEmitter; + private event: EventEmitter constructor(event: EventEmitter) { super() - this.event = event; + this.event = event } channelRead(ctx: any, channel: any) { - this.event.emit(ServerEvent.detect, ctx, channel); + this.event.emit(ServerEvent.detect, ctx, channel) + } + channelUnregistered(ctx: any) { + this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') + ctx.fireChannelUnregistered() } exceptionCaught(ctx: any, cause: Error) { - this.event.emit(ServerEvent.error, ctx, cause); + this.event.emit(ServerEvent.error, ctx, cause) } } diff --git a/packages/websocket/src/server/websocket_handler.ts b/packages/websocket/src/server/websocket_handler.ts index cbc29ba0..e2ed4dd7 100644 --- a/packages/websocket/src/server/websocket_handler.ts +++ b/packages/websocket/src/server/websocket_handler.ts @@ -13,10 +13,10 @@ const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAg const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler') export class WebSocketHandler extends WebSocketHandlerAdapter { - private options: ServerOptions; + private options: ServerOptions constructor(options: ServerOptions) { super() - this.options = options; + this.options = options } channelRead(ctx: any, msg: any) { msg.markReaderIndex() @@ -32,6 +32,7 @@ export class WebSocketHandler extends WebSocketHandlerAdapter { pipeline.addLast('chunk', new ChunkedWriteHandler()) pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024)) pipeline.addLast('http_request', new HttpRequestHandler(this.options).getHandler()) + // this.options.path, null, false, 655360, false, true, false, 10000 pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true)) pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler()) } @@ -39,6 +40,12 @@ export class WebSocketHandler extends WebSocketHandlerAdapter { msg.resetReaderIndex() ctx.fireChannelRead(msg) } + + channelUnregistered(ctx: any) { + this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') + ctx.fireChannelUnregistered() + } + exceptionCaught(ctx: any, cause: Error) { this.options.event.emit(ServerEvent.error, ctx, cause) } diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index dfed8c05..d25a0bd9 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -21,7 +21,7 @@ interface WebSocketServer extends EventEmitter { class Server implements SocketIO.Server { private websocketServer: WebSocketServer - private allClients: { [key: string]: Client } + private allClients: Map engine: { ws: any } nsps: { [namespace: string]: Namespace } @@ -35,7 +35,7 @@ class Server implements SocketIO.Server { constructor(instance: any, options: ServerOptions) { if (!instance) { throw new Error('instance can\'t be undefiend!') } - this.allClients = {} + this.allClients = new Map() this.nsps = {} this.sockets = new Namespace('/', this) this.nsps['/'] = this.sockets @@ -99,18 +99,17 @@ class Server implements SocketIO.Server { bind(srv: any): SocketIO.Server { throw new Error("Method not implemented.") } - onconnection(socket: Client): SocketIO.Server { - this.allClients[socket.id] = socket - socket.packet({ + onconnection(client: Client): SocketIO.Server { + client.packet({ type: PacketTypes.OPEN, data: { - sid: socket.id, + sid: client.id, upgrades: [], pingInterval: 25000, pingTimeout: 5000 } }) - this.sockets.add(socket) + this.sockets.add(client) return this } of(nsp: string): Namespace { @@ -164,20 +163,35 @@ class Server implements SocketIO.Server { private initServer() { this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => { let client = new Client(this, socket) + this.allClients.set(socket.id, client) this.onconnection(client) }) this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => { - this.processPacket(this.parser.decode(text), this.allClients[socket.id]) + if (this.allClients.has(socket.id)) { + this.processPacket(this.parser.decode(text), this.allClients.get(socket.id)) + } else { + console.error(`unknow engine socket ${socket.id} reciver message ${text}`) + } }) this.websocketServer.on(ServerEvent.disconnect, (socket: SocketIO.EngineSocket, reason) => { - this.allClients[socket.id].onclose(reason) - delete this.allClients[socket.id] + if (this.allClients.has(socket.id)) { + this.allClients.get(socket.id).onclose(reason) + this.allClients.delete(socket.id) + } else { + console.error(`unknow engine socket ${socket?.id} disconnect cause ${reason}`) + } }) this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => { - if (socket.listeners(ServerEvent.error).length) { - socket.emit(ServerEvent.error, cause) + if (this.allClients.has(socket?.id)) { + if (socket.listeners(ServerEvent.error).length) { + socket.emit(ServerEvent.error, cause) + } else { + console.error(`engine socket ${socket.id} cause error: ${cause}`) + console.ex(cause) + } } else { - console.error(`client ${socket.id} cause error: ${cause}`) + console.error(`unknow engine socket ${socket?.id} cause error: ${cause}`) + console.ex(cause) } }) } @@ -219,5 +233,6 @@ export { Server, Socket, Client, + Namespace, ServerOptions } diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 5ce7debe..22248105 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -1,40 +1,40 @@ import { EventEmitter } from 'events' import { Client } from './client' -import { SocketIO } from './interfaces'; -import { ServerEvent } from './constants'; -import { Socket } from './socket'; -import { Adapter } from './adapter'; +import { SocketIO } from './interfaces' +import { ServerEvent } from './constants' +import { Socket } from './socket' +import { Adapter } from './adapter' import { Server } from './index' -import { Packet } from './packet'; -import { PacketTypes, SubPacketTypes } from './types'; +import { Packet } from './packet' +import { PacketTypes, SubPacketTypes } from './types' export class Namespace extends EventEmitter implements SocketIO.Namespace { - name: string; - server: Server; - sockets: { [id: string]: Socket; }; - connected: { [id: string]: Socket; }; - adapter: SocketIO.Adapter; - json: SocketIO.Namespace; + name: string + server: Server + sockets: { [id: string]: Socket } + connected: { [id: string]: Socket } + adapter: SocketIO.Adapter + json: SocketIO.Namespace - fns: any[]; - ids: number; - rooms: string[]; - flags: { [key: string]: boolean }; + fns: any[] + ids: number + rooms: string[] + flags: { [key: string]: boolean } private events = ['connect', 'connection', 'newListener'] constructor(name: string, server: Server) { - super(); - this.name = name; - this.server = server; - this.sockets = {}; - this.connected = {}; - this.fns = []; - this.ids = 0; - this.rooms = []; - this.flags = {}; - this.adapter = new Adapter(this); + super() + this.name = name + this.server = server + this.sockets = {} + this.connected = {} + this.fns = [] + this.ids = 0 + this.rooms = [] + this.flags = {} + this.adapter = new Adapter(this) } initAdapter() { // @ts-ignore @@ -42,39 +42,39 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { } add(client: Client, query?: any, callback?: () => void) { // client.conn.request.url(); - let socket = new Socket(this, client, {}); - this.sockets[client.id] = socket; - client.nsps[this.name] = socket; - this.onconnection(socket); - return socket; + let socket = new Socket(this, client, {}) + this.sockets[client.id] = socket + client.nsps[this.name] = socket + this.onconnection(socket) + return socket } del(client: Client) { - let socket = this.sockets[client.id]; - socket.disconnect(); - delete this.sockets[client.id]; + let socket = this.sockets[client.id] + socket.disconnect() + delete this.sockets[client.id] } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } to(room: string): SocketIO.Namespace { - if (!~this.rooms.indexOf(room)) this.rooms.push(room); - return this; + if (!~this.rooms.indexOf(room)) this.rooms.push(room) + return this } in(room: string): SocketIO.Namespace { - return this.to(room); + return this.to(room) } send(...args: any[]): SocketIO.Namespace { super.emit('message', ...args) - return this; + return this } write(...args: any[]): SocketIO.Namespace { - return this.send(...args); + return this.send(...args) } emit(event: string, ...args: any[]): boolean { if (~this.events.indexOf(event)) { - super.emit(event, ...args); + super.emit(event, ...args) // @ts-ignore - return this; + return this } // set up packet object var packet = { @@ -85,54 +85,58 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { } if ('function' == typeof args[args.length - 1]) { - throw new Error('Callbacks are not supported when broadcasting'); + throw new Error('Callbacks are not supported when broadcasting') } - var rooms = this.rooms.slice(0); - var flags = Object.assign({}, this.flags); + var rooms = this.rooms.slice(0) + var flags = Object.assign({}, this.flags) // reset flags - this.rooms = []; - this.flags = {}; + this.rooms = [] + this.flags = {} this.adapter.broadcast(packet, { rooms: rooms, flags: flags - }); + }) // @ts-ignore - return this; + return this } hasBin(args: any[]) { - return false; + return false } - clients(fn: Function): SocketIO.Namespace { + clients(fn: (sockets: Socket[]) => SocketIO.Namespace): SocketIO.Namespace { return fn(Object.values(this.sockets)) } compress(compress: boolean): SocketIO.Namespace { - throw new Error("Method not implemented."); + throw new Error("Method not implemented.") } process(packet: Packet, client: Client) { switch (packet.sub_type) { case SubPacketTypes.CONNECT: - this.add(client); - break; + this.add(client) + break default: - this.sockets[client.id].onpacket(packet); - break; + this.sockets[client.id].onpacket(packet) + break } } remove(socket: Socket) { if (this.sockets.hasOwnProperty(socket.id)) { - delete this.sockets[socket.id]; + delete this.sockets[socket.id] } else { // debug('ignoring remove for %s', socket.id); } } + close() { + this.removeAllListeners('connect') + Object.values(this.sockets).forEach(socket => socket.disconnect(false)) + } private onconnection(socket: any) { - let client = socket as Socket; - this.sockets[client.id] = client; + let client = socket as Socket + this.sockets[client.id] = client + this.emit(ServerEvent.connect, socket) client.onconnect() - this.emit(ServerEvent.connect, socket); - this.emit(ServerEvent.connection, socket); + this.emit(ServerEvent.connection, socket) } } diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/tomcat/client.ts index 91a6f4d3..4d516880 100644 --- a/packages/websocket/src/tomcat/client.ts +++ b/packages/websocket/src/tomcat/client.ts @@ -25,7 +25,7 @@ export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket this.transport = null this.session = session - this._id = session.getId() + this._id = session.getId() + '' } get id() { diff --git a/packages/websocket/src/tomcat/server.ts b/packages/websocket/src/tomcat/server.ts index db287340..adce7ca3 100644 --- a/packages/websocket/src/tomcat/server.ts +++ b/packages/websocket/src/tomcat/server.ts @@ -15,28 +15,45 @@ type TomcatWebSocketSession = javax.websocket.Session class TomcatWebSocketServer extends EventEmitter { private beanFactory: any private executor: any - private allClients: { [key: string]: SocketIO.EngineSocket } + private clients: Map constructor(beanFactory: any, options: ServerOptions) { super() - this.allClients = {} + this.clients = new Map() this.beanFactory = beanFactory this.initThreadPool() try { this.beanFactory.destroySingleton(ProxyBeanName) } catch (error) { } let NashornWebSocketServerProxy = Java.extend(WebSocketServerProxy, { onOpen: (session: TomcatWebSocketSession) => { + let cid = `${session?.getId()}` let tomcatClient = new TomcatClient(this, session) - this.allClients[session.getId()] = tomcatClient + this.clients.set(cid, tomcatClient) this.emit(ServerEvent.connect, tomcatClient) }, onMessage: (session: TomcatWebSocketSession, message: string) => { - this.executor.execute(() => this.emit(ServerEvent.message, this.allClients[session.getId()], message)) + let cid = `${session?.getId()}` + if (this.clients.has(cid)) { + this.executor.execute(() => this.emit(ServerEvent.message, this.clients.get(cid), message)) + } else { + console.error(`unknow client ${session} reciver message ${message}`) + } }, onClose: (session: TomcatWebSocketSession, reason: any) => { - this.emit(ServerEvent.disconnect, this.allClients[session.getId()], reason) + let cid = `${session?.getId()}` + if (this.clients.has(cid)) { + this.emit(ServerEvent.disconnect, this.clients.get(cid), reason) + } else { + console.error(`unknow client ${session} disconnect cause ${reason}`) + } }, onError: (session: TomcatWebSocketSession, error: Error) => { - this.emit(ServerEvent.error, this.allClients[session.getId()], error) + let cid = `${session?.getId()}` + if (this.clients.has(cid)) { + this.emit(ServerEvent.error, this.clients.get(cid), error) + } else { + console.error(`unknow client ${session} cause error ${error}`) + console.ex(error) + } }, }) this.beanFactory.registerSingleton(ProxyBeanName, new NashornWebSocketServerProxy()) @@ -52,7 +69,7 @@ class TomcatWebSocketServer extends EventEmitter { this.executor.initialize() } close() { - Object.values(this.allClients).forEach(client => client.close()) + this.clients.forEach(client => client.close()) this.beanFactory.destroySingleton(ProxyBeanName) this.executor.shutdown() }