feat: optimize websocket

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-11-13 09:40:47 +08:00
parent d769a9c8ca
commit 7f5929bc87
10 changed files with 195 additions and 126 deletions

View File

@ -3,7 +3,7 @@ const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.net
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
export abstract class TextWebSocketFrameHandlerAdapter { export abstract class TextWebSocketFrameHandlerAdapter {
private _Handler; private _Handler
constructor() { constructor() {
let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
userEventTriggered: this.userEventTriggered.bind(this), userEventTriggered: this.userEventTriggered.bind(this),
@ -13,12 +13,12 @@ export abstract class TextWebSocketFrameHandlerAdapter {
channelRead0: this.channelRead0.bind(this), channelRead0: this.channelRead0.bind(this),
exceptionCaught: this.exceptionCaught.bind(this) exceptionCaught: this.exceptionCaught.bind(this)
}) })
this._Handler = new TextWebSocketFrameHandlerAdapterImpl(); this._Handler = new TextWebSocketFrameHandlerAdapterImpl()
} }
abstract userEventTriggered(ctx: any, evt: any); abstract userEventTriggered(ctx: any, evt: any)
abstract channelRead0(ctx: any, msg: any); abstract channelRead0(ctx: any, msg: any)
abstract exceptionCaught(ctx: any, cause: Error); abstract exceptionCaught(ctx: any, cause: Error)
getHandler() { getHandler() {
return this._Handler; return this._Handler
} }
} }

View File

@ -1,17 +1,19 @@
const ChannelInboundHandlerAdapter = Java.type('io.netty.channel.ChannelInboundHandlerAdapter') const ChannelInboundHandlerAdapter = Java.type('io.netty.channel.ChannelInboundHandlerAdapter')
export abstract class WebSocketHandlerAdapter { export abstract class WebSocketHandlerAdapter {
private _Handler; private _Handler
constructor() { constructor() {
let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, { let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, {
channelRead: this.channelRead.bind(this), channelRead: this.channelRead.bind(this),
channelUnregistered: this.exceptionCaught.bind(this),
exceptionCaught: this.exceptionCaught.bind(this) exceptionCaught: this.exceptionCaught.bind(this)
}) })
this._Handler = new ChannelInboundHandlerAdapterImpl() this._Handler = new ChannelInboundHandlerAdapterImpl()
} }
abstract channelRead(ctx: any, channel: any); abstract channelRead(ctx: any, channel: any)
abstract exceptionCaught(ctx: any, cause: Error); abstract channelUnregistered(ctx: any)
abstract exceptionCaught(ctx: any, cause: Error)
getHandler() { getHandler() {
return this._Handler; return this._Handler
} }
} }

View File

@ -1,40 +1,40 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { SocketIO } from '../socket-io/interfaces'; import { SocketIO } from '../socket-io/interfaces'
import { AttributeKeys } from './constants'; import { AttributeKeys } from './constants'
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { export class NettyClient extends EventEmitter implements SocketIO.EngineSocket {
private _id: string; private _id: string
private channel: any private channel: any
server: any; server: any
readyState: string; readyState: string
remoteAddress: string; remoteAddress: string
upgraded: boolean; upgraded: boolean
request: any; request: any
transport: any; transport: any
constructor(server: any, channel: any) { constructor(server: any, channel: any) {
super(); super()
this.server = server; this.server = server
this.readyState = 'open'; this.readyState = 'open'
this.remoteAddress = channel.remoteAddress() + '' this.remoteAddress = channel.remoteAddress() + ''
this.upgraded = true; this.upgraded = true
this.request = channel.attr(AttributeKeys.Request).get(); this.request = channel.attr(AttributeKeys.Request).get()
this.transport = null; this.transport = null
this.channel = channel; this.channel = channel
this._id = channel.id(); this._id = channel.id() + ''
} }
get id() { get id() {
return this._id; return this._id
} }
send(text: string) { send(text: string) {
this.channel.writeAndFlush(new TextWebSocketFrame(text)) this.channel.writeAndFlush(new TextWebSocketFrame(text))
} }
close() { close() {
this.channel.close(); this.channel.close()
} }
} }

View File

@ -9,14 +9,14 @@ import { WebSocketDetect } from './websocket_detect'
import { WebSocketHandler } from './websocket_handler' import { WebSocketHandler } from './websocket_handler'
class NettyWebSocketServer extends EventEmitter { class NettyWebSocketServer extends EventEmitter {
private pipeline: any; private pipeline: any
private allClients: { [key: string]: NettyClient }; private clients: Map<string, NettyClient>
constructor(pipeline: any, options: ServerOptions) { constructor(pipeline: any, options: ServerOptions) {
super() super()
this.allClients = {}; this.clients = new Map()
this.pipeline = pipeline; this.pipeline = pipeline
let connectEvent = options.event; let connectEvent = options.event
try { this.pipeline.remove(Keys.Detect) } catch (error) { } try { this.pipeline.remove(Keys.Detect) } catch (error) { }
this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler())
connectEvent.on(ServerEvent.detect, (ctx, channel) => { connectEvent.on(ServerEvent.detect, (ctx, channel) => {
@ -24,22 +24,42 @@ class NettyWebSocketServer extends EventEmitter {
ctx.fireChannelRead(channel) ctx.fireChannelRead(channel)
}) })
connectEvent.on(ServerEvent.connect, (ctx) => { connectEvent.on(ServerEvent.connect, (ctx) => {
let cid = ctx?.channel().id() + ''
let nettyClient = new NettyClient(this, ctx.channel()) let nettyClient = new NettyClient(this, ctx.channel())
this.allClients[nettyClient.id] = nettyClient this.clients.set(cid, nettyClient)
this.emit(ServerEvent.connect, nettyClient) this.emit(ServerEvent.connect, nettyClient)
}) })
connectEvent.on(ServerEvent.message, (ctx, msg) => { connectEvent.on(ServerEvent.message, (ctx, msg) => {
this.emit(ServerEvent.message, this.allClients[ctx.channel().id()], msg.text()) let cid = ctx?.channel().id() + ''
if (this.clients.has(cid)) {
this.emit(ServerEvent.message, this.clients.get(cid), msg.text())
} else {
console.error(`unknow client ${ctx} reciver message ${msg.text()}`)
}
})
connectEvent.on(ServerEvent.disconnect, (ctx, cause) => {
let cid = ctx?.channel().id() + ''
if (this.clients.has(cid)) {
this.emit(ServerEvent.disconnect, this.clients.get(cid), cause)
} else {
console.error(`unknow client ${ctx} disconnect cause ${cause}`)
}
}) })
connectEvent.on(ServerEvent.error, (ctx, cause) => { connectEvent.on(ServerEvent.error, (ctx, cause) => {
this.emit(ServerEvent.error, this.allClients[ctx.channel().id()], cause) let cid = ctx?.channel().id() + ''
if (this.clients.has(cid)) {
this.emit(ServerEvent.error, this.clients.get(cid), cause)
} else {
console.error(`unknow client ${ctx} cause error ${cause}`)
console.ex(cause)
}
}) })
} }
close() { close() {
if (this.pipeline.names().contains(Keys.Detect)) { if (this.pipeline.names().contains(Keys.Detect)) {
this.pipeline.remove(Keys.Detect) this.pipeline.remove(Keys.Detect)
} }
Object.values(this.allClients).forEach(client => client.close()) this.clients.forEach(client => client.close())
} }
} }

View File

@ -3,15 +3,19 @@ import { WebSocketHandlerAdapter } from "../netty"
import { ServerEvent } from '../socket-io/constants' import { ServerEvent } from '../socket-io/constants'
export class WebSocketDetect extends WebSocketHandlerAdapter { export class WebSocketDetect extends WebSocketHandlerAdapter {
private event: EventEmitter; private event: EventEmitter
constructor(event: EventEmitter) { constructor(event: EventEmitter) {
super() super()
this.event = event; this.event = event
} }
channelRead(ctx: any, channel: any) { channelRead(ctx: any, channel: any) {
this.event.emit(ServerEvent.detect, ctx, channel); this.event.emit(ServerEvent.detect, ctx, channel)
}
channelUnregistered(ctx: any) {
this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
ctx.fireChannelUnregistered()
} }
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {
this.event.emit(ServerEvent.error, ctx, cause); this.event.emit(ServerEvent.error, ctx, cause)
} }
} }

View File

@ -13,10 +13,10 @@ const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAg
const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler') const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler')
export class WebSocketHandler extends WebSocketHandlerAdapter { export class WebSocketHandler extends WebSocketHandlerAdapter {
private options: ServerOptions; private options: ServerOptions
constructor(options: ServerOptions) { constructor(options: ServerOptions) {
super() super()
this.options = options; this.options = options
} }
channelRead(ctx: any, msg: any) { channelRead(ctx: any, msg: any) {
msg.markReaderIndex() msg.markReaderIndex()
@ -32,6 +32,7 @@ export class WebSocketHandler extends WebSocketHandlerAdapter {
pipeline.addLast('chunk', new ChunkedWriteHandler()) pipeline.addLast('chunk', new ChunkedWriteHandler())
pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024)) pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024))
pipeline.addLast('http_request', new HttpRequestHandler(this.options).getHandler()) pipeline.addLast('http_request', new HttpRequestHandler(this.options).getHandler())
// this.options.path, null, false, 655360, false, true, false, 10000
pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true)) pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true))
pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler()) pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler())
} }
@ -39,6 +40,12 @@ export class WebSocketHandler extends WebSocketHandlerAdapter {
msg.resetReaderIndex() msg.resetReaderIndex()
ctx.fireChannelRead(msg) ctx.fireChannelRead(msg)
} }
channelUnregistered(ctx: any) {
this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
ctx.fireChannelUnregistered()
}
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {
this.options.event.emit(ServerEvent.error, ctx, cause) this.options.event.emit(ServerEvent.error, ctx, cause)
} }

View File

@ -21,7 +21,7 @@ interface WebSocketServer extends EventEmitter {
class Server implements SocketIO.Server { class Server implements SocketIO.Server {
private websocketServer: WebSocketServer private websocketServer: WebSocketServer
private allClients: { [key: string]: Client } private allClients: Map<string, Client>
engine: { ws: any } engine: { ws: any }
nsps: { [namespace: string]: Namespace } nsps: { [namespace: string]: Namespace }
@ -35,7 +35,7 @@ class Server implements SocketIO.Server {
constructor(instance: any, options: ServerOptions) { constructor(instance: any, options: ServerOptions) {
if (!instance) { throw new Error('instance can\'t be undefiend!') } if (!instance) { throw new Error('instance can\'t be undefiend!') }
this.allClients = {} this.allClients = new Map()
this.nsps = {} this.nsps = {}
this.sockets = new Namespace('/', this) this.sockets = new Namespace('/', this)
this.nsps['/'] = this.sockets this.nsps['/'] = this.sockets
@ -99,18 +99,17 @@ class Server implements SocketIO.Server {
bind(srv: any): SocketIO.Server { bind(srv: any): SocketIO.Server {
throw new Error("Method not implemented.") throw new Error("Method not implemented.")
} }
onconnection(socket: Client): SocketIO.Server { onconnection(client: Client): SocketIO.Server {
this.allClients[socket.id] = socket client.packet({
socket.packet({
type: PacketTypes.OPEN, type: PacketTypes.OPEN,
data: { data: {
sid: socket.id, sid: client.id,
upgrades: [], upgrades: [],
pingInterval: 25000, pingInterval: 25000,
pingTimeout: 5000 pingTimeout: 5000
} }
}) })
this.sockets.add(socket) this.sockets.add(client)
return this return this
} }
of(nsp: string): Namespace { of(nsp: string): Namespace {
@ -164,20 +163,35 @@ class Server implements SocketIO.Server {
private initServer() { private initServer() {
this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => { this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => {
let client = new Client(this, socket) let client = new Client(this, socket)
this.allClients.set(socket.id, client)
this.onconnection(client) this.onconnection(client)
}) })
this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => { this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => {
this.processPacket(this.parser.decode(text), this.allClients[socket.id]) if (this.allClients.has(socket.id)) {
this.processPacket(this.parser.decode(text), this.allClients.get(socket.id))
} else {
console.error(`unknow engine socket ${socket.id} reciver message ${text}`)
}
}) })
this.websocketServer.on(ServerEvent.disconnect, (socket: SocketIO.EngineSocket, reason) => { this.websocketServer.on(ServerEvent.disconnect, (socket: SocketIO.EngineSocket, reason) => {
this.allClients[socket.id].onclose(reason) if (this.allClients.has(socket.id)) {
delete this.allClients[socket.id] this.allClients.get(socket.id).onclose(reason)
this.allClients.delete(socket.id)
} else {
console.error(`unknow engine socket ${socket?.id} disconnect cause ${reason}`)
}
}) })
this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => { this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => {
if (socket.listeners(ServerEvent.error).length) { if (this.allClients.has(socket?.id)) {
socket.emit(ServerEvent.error, cause) if (socket.listeners(ServerEvent.error).length) {
socket.emit(ServerEvent.error, cause)
} else {
console.error(`engine socket ${socket.id} cause error: ${cause}`)
console.ex(cause)
}
} else { } else {
console.error(`client ${socket.id} cause error: ${cause}`) console.error(`unknow engine socket ${socket?.id} cause error: ${cause}`)
console.ex(cause)
} }
}) })
} }
@ -219,5 +233,6 @@ export {
Server, Server,
Socket, Socket,
Client, Client,
Namespace,
ServerOptions ServerOptions
} }

View File

@ -1,40 +1,40 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { Client } from './client' import { Client } from './client'
import { SocketIO } from './interfaces'; import { SocketIO } from './interfaces'
import { ServerEvent } from './constants'; import { ServerEvent } from './constants'
import { Socket } from './socket'; import { Socket } from './socket'
import { Adapter } from './adapter'; import { Adapter } from './adapter'
import { Server } from './index' import { Server } from './index'
import { Packet } from './packet'; import { Packet } from './packet'
import { PacketTypes, SubPacketTypes } from './types'; import { PacketTypes, SubPacketTypes } from './types'
export class Namespace extends EventEmitter implements SocketIO.Namespace { export class Namespace extends EventEmitter implements SocketIO.Namespace {
name: string; name: string
server: Server; server: Server
sockets: { [id: string]: Socket; }; sockets: { [id: string]: Socket }
connected: { [id: string]: Socket; }; connected: { [id: string]: Socket }
adapter: SocketIO.Adapter; adapter: SocketIO.Adapter
json: SocketIO.Namespace; json: SocketIO.Namespace
fns: any[]; fns: any[]
ids: number; ids: number
rooms: string[]; rooms: string[]
flags: { [key: string]: boolean }; flags: { [key: string]: boolean }
private events = ['connect', 'connection', 'newListener'] private events = ['connect', 'connection', 'newListener']
constructor(name: string, server: Server) { constructor(name: string, server: Server) {
super(); super()
this.name = name; this.name = name
this.server = server; this.server = server
this.sockets = {}; this.sockets = {}
this.connected = {}; this.connected = {}
this.fns = []; this.fns = []
this.ids = 0; this.ids = 0
this.rooms = []; this.rooms = []
this.flags = {}; this.flags = {}
this.adapter = new Adapter(this); this.adapter = new Adapter(this)
} }
initAdapter() { initAdapter() {
// @ts-ignore // @ts-ignore
@ -42,39 +42,39 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace {
} }
add(client: Client, query?: any, callback?: () => void) { add(client: Client, query?: any, callback?: () => void) {
// client.conn.request.url(); // client.conn.request.url();
let socket = new Socket(this, client, {}); let socket = new Socket(this, client, {})
this.sockets[client.id] = socket; this.sockets[client.id] = socket
client.nsps[this.name] = socket; client.nsps[this.name] = socket
this.onconnection(socket); this.onconnection(socket)
return socket; return socket
} }
del(client: Client) { del(client: Client) {
let socket = this.sockets[client.id]; let socket = this.sockets[client.id]
socket.disconnect(); socket.disconnect()
delete this.sockets[client.id]; delete this.sockets[client.id]
} }
use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace {
throw new Error("Method not implemented."); throw new Error("Method not implemented.")
} }
to(room: string): SocketIO.Namespace { to(room: string): SocketIO.Namespace {
if (!~this.rooms.indexOf(room)) this.rooms.push(room); if (!~this.rooms.indexOf(room)) this.rooms.push(room)
return this; return this
} }
in(room: string): SocketIO.Namespace { in(room: string): SocketIO.Namespace {
return this.to(room); return this.to(room)
} }
send(...args: any[]): SocketIO.Namespace { send(...args: any[]): SocketIO.Namespace {
super.emit('message', ...args) super.emit('message', ...args)
return this; return this
} }
write(...args: any[]): SocketIO.Namespace { write(...args: any[]): SocketIO.Namespace {
return this.send(...args); return this.send(...args)
} }
emit(event: string, ...args: any[]): boolean { emit(event: string, ...args: any[]): boolean {
if (~this.events.indexOf(event)) { if (~this.events.indexOf(event)) {
super.emit(event, ...args); super.emit(event, ...args)
// @ts-ignore // @ts-ignore
return this; return this
} }
// set up packet object // set up packet object
var packet = { var packet = {
@ -85,54 +85,58 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace {
} }
if ('function' == typeof args[args.length - 1]) { if ('function' == typeof args[args.length - 1]) {
throw new Error('Callbacks are not supported when broadcasting'); throw new Error('Callbacks are not supported when broadcasting')
} }
var rooms = this.rooms.slice(0); var rooms = this.rooms.slice(0)
var flags = Object.assign({}, this.flags); var flags = Object.assign({}, this.flags)
// reset flags // reset flags
this.rooms = []; this.rooms = []
this.flags = {}; this.flags = {}
this.adapter.broadcast(packet, { this.adapter.broadcast(packet, {
rooms: rooms, rooms: rooms,
flags: flags flags: flags
}); })
// @ts-ignore // @ts-ignore
return this; return this
} }
hasBin(args: any[]) { hasBin(args: any[]) {
return false; return false
} }
clients(fn: Function): SocketIO.Namespace { clients(fn: (sockets: Socket[]) => SocketIO.Namespace): SocketIO.Namespace {
return fn(Object.values(this.sockets)) return fn(Object.values(this.sockets))
} }
compress(compress: boolean): SocketIO.Namespace { compress(compress: boolean): SocketIO.Namespace {
throw new Error("Method not implemented."); throw new Error("Method not implemented.")
} }
process(packet: Packet, client: Client) { process(packet: Packet, client: Client) {
switch (packet.sub_type) { switch (packet.sub_type) {
case SubPacketTypes.CONNECT: case SubPacketTypes.CONNECT:
this.add(client); this.add(client)
break; break
default: default:
this.sockets[client.id].onpacket(packet); this.sockets[client.id].onpacket(packet)
break; break
} }
} }
remove(socket: Socket) { remove(socket: Socket) {
if (this.sockets.hasOwnProperty(socket.id)) { if (this.sockets.hasOwnProperty(socket.id)) {
delete this.sockets[socket.id]; delete this.sockets[socket.id]
} else { } else {
// debug('ignoring remove for %s', socket.id); // debug('ignoring remove for %s', socket.id);
} }
} }
close() {
this.removeAllListeners('connect')
Object.values(this.sockets).forEach(socket => socket.disconnect(false))
}
private onconnection(socket: any) { private onconnection(socket: any) {
let client = socket as Socket; let client = socket as Socket
this.sockets[client.id] = client; this.sockets[client.id] = client
this.emit(ServerEvent.connect, socket)
client.onconnect() client.onconnect()
this.emit(ServerEvent.connect, socket); this.emit(ServerEvent.connection, socket)
this.emit(ServerEvent.connection, socket);
} }
} }

View File

@ -25,7 +25,7 @@ export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket
this.transport = null this.transport = null
this.session = session this.session = session
this._id = session.getId() this._id = session.getId() + ''
} }
get id() { get id() {

View File

@ -15,28 +15,45 @@ type TomcatWebSocketSession = javax.websocket.Session
class TomcatWebSocketServer extends EventEmitter { class TomcatWebSocketServer extends EventEmitter {
private beanFactory: any private beanFactory: any
private executor: any private executor: any
private allClients: { [key: string]: SocketIO.EngineSocket } private clients: Map<string, SocketIO.EngineSocket>
constructor(beanFactory: any, options: ServerOptions) { constructor(beanFactory: any, options: ServerOptions) {
super() super()
this.allClients = {} this.clients = new Map()
this.beanFactory = beanFactory this.beanFactory = beanFactory
this.initThreadPool() this.initThreadPool()
try { this.beanFactory.destroySingleton(ProxyBeanName) } catch (error) { } try { this.beanFactory.destroySingleton(ProxyBeanName) } catch (error) { }
let NashornWebSocketServerProxy = Java.extend(WebSocketServerProxy, { let NashornWebSocketServerProxy = Java.extend(WebSocketServerProxy, {
onOpen: (session: TomcatWebSocketSession) => { onOpen: (session: TomcatWebSocketSession) => {
let cid = `${session?.getId()}`
let tomcatClient = new TomcatClient(this, session) let tomcatClient = new TomcatClient(this, session)
this.allClients[session.getId()] = tomcatClient this.clients.set(cid, tomcatClient)
this.emit(ServerEvent.connect, tomcatClient) this.emit(ServerEvent.connect, tomcatClient)
}, },
onMessage: (session: TomcatWebSocketSession, message: string) => { onMessage: (session: TomcatWebSocketSession, message: string) => {
this.executor.execute(() => this.emit(ServerEvent.message, this.allClients[session.getId()], message)) let cid = `${session?.getId()}`
if (this.clients.has(cid)) {
this.executor.execute(() => this.emit(ServerEvent.message, this.clients.get(cid), message))
} else {
console.error(`unknow client ${session} reciver message ${message}`)
}
}, },
onClose: (session: TomcatWebSocketSession, reason: any) => { onClose: (session: TomcatWebSocketSession, reason: any) => {
this.emit(ServerEvent.disconnect, this.allClients[session.getId()], reason) let cid = `${session?.getId()}`
if (this.clients.has(cid)) {
this.emit(ServerEvent.disconnect, this.clients.get(cid), reason)
} else {
console.error(`unknow client ${session} disconnect cause ${reason}`)
}
}, },
onError: (session: TomcatWebSocketSession, error: Error) => { onError: (session: TomcatWebSocketSession, error: Error) => {
this.emit(ServerEvent.error, this.allClients[session.getId()], error) let cid = `${session?.getId()}`
if (this.clients.has(cid)) {
this.emit(ServerEvent.error, this.clients.get(cid), error)
} else {
console.error(`unknow client ${session} cause error ${error}`)
console.ex(error)
}
}, },
}) })
this.beanFactory.registerSingleton(ProxyBeanName, new NashornWebSocketServerProxy()) this.beanFactory.registerSingleton(ProxyBeanName, new NashornWebSocketServerProxy())
@ -52,7 +69,7 @@ class TomcatWebSocketServer extends EventEmitter {
this.executor.initialize() this.executor.initialize()
} }
close() { close() {
Object.values(this.allClients).forEach(client => client.close()) this.clients.forEach(client => client.close())
this.beanFactory.destroySingleton(ProxyBeanName) this.beanFactory.destroySingleton(ProxyBeanName)
this.executor.shutdown() this.executor.shutdown()
} }