From cf3016e27e73ecb3d8704e0ae1d0270f0b6bbec9 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Tue, 26 May 2020 15:45:40 +0800 Subject: [PATCH] feat: support tomcat websocket Signed-off-by: MiaoWoo --- packages/websocket/src/server/client.ts | 2 +- packages/websocket/src/server/constants.ts | 9 --- packages/websocket/src/server/index.ts | 6 +- .../src/server/text_websocket_frame.ts | 2 +- .../websocket/src/server/websocket_detect.ts | 2 +- .../websocket/src/server/websocket_handler.ts | 4 +- packages/websocket/src/socket-io/client.ts | 24 ++++--- packages/websocket/src/socket-io/constants.ts | 8 +++ packages/websocket/src/socket-io/index.ts | 48 +++++++------ .../websocket/src/socket-io/interfaces.ts | 10 +++ packages/websocket/src/socket-io/namespace.ts | 2 +- packages/websocket/src/tomcat/client.ts | 44 ++++++++++++ packages/websocket/src/tomcat/constants.ts | 1 + packages/websocket/src/tomcat/server.ts | 69 +++++++++++++++++++ 14 files changed, 185 insertions(+), 46 deletions(-) create mode 100644 packages/websocket/src/socket-io/constants.ts create mode 100644 packages/websocket/src/tomcat/client.ts create mode 100644 packages/websocket/src/tomcat/constants.ts create mode 100644 packages/websocket/src/tomcat/server.ts diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/server/client.ts index 0fa2f45d..3685da14 100644 --- a/packages/websocket/src/server/client.ts +++ b/packages/websocket/src/server/client.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events' -import { SocketIO } from 'socket-io/interfaces'; +import { SocketIO } from '../socket-io/interfaces'; import { AttributeKeys } from './constants'; const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') diff --git a/packages/websocket/src/server/constants.ts b/packages/websocket/src/server/constants.ts index ac359bad..249709d0 100644 --- a/packages/websocket/src/server/constants.ts +++ b/packages/websocket/src/server/constants.ts @@ -1,12 +1,3 @@ -export enum ServerEvent { - detect = 'detect', - connect = 'connect', - connection = 'connection', - message = 'message', - error = 'error', - disconnect = 'disconnect' -} - const AttributeKey = Java.type('io.netty.util.AttributeKey'); export enum Keys { diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index d2a6496f..4b1cfb07 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -1,8 +1,10 @@ import { EventEmitter } from 'events' -import { NettyClient } from './client' import { ServerOptions } from '../socket-io' -import { ServerEvent, Keys } from './constants' +import { ServerEvent } from '../socket-io/constants' + +import { NettyClient } from './client' +import { Keys } from './constants' import { WebSocketDetect } from './websocket_detect' import { WebSocketHandler } from './websocket_handler' diff --git a/packages/websocket/src/server/text_websocket_frame.ts b/packages/websocket/src/server/text_websocket_frame.ts index a566633b..dd2e82b5 100644 --- a/packages/websocket/src/server/text_websocket_frame.ts +++ b/packages/websocket/src/server/text_websocket_frame.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events' -import { ServerEvent } from './constants' import { ServerOptions } from '../socket-io'; +import { ServerEvent } from '../socket-io/constants' import { TextWebSocketFrameHandlerAdapter } from '../netty' export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { diff --git a/packages/websocket/src/server/websocket_detect.ts b/packages/websocket/src/server/websocket_detect.ts index db5edc49..0cf06aab 100644 --- a/packages/websocket/src/server/websocket_detect.ts +++ b/packages/websocket/src/server/websocket_detect.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events' import { WebSocketHandlerAdapter } from "../netty" -import { ServerEvent } from './constants' +import { ServerEvent } from '../socket-io/constants' export class WebSocketDetect extends WebSocketHandlerAdapter { private event: EventEmitter; diff --git a/packages/websocket/src/server/websocket_handler.ts b/packages/websocket/src/server/websocket_handler.ts index 11f9d9f8..cbc29ba0 100644 --- a/packages/websocket/src/server/websocket_handler.ts +++ b/packages/websocket/src/server/websocket_handler.ts @@ -1,5 +1,7 @@ import { ServerOptions } from '../socket-io' -import { Keys, ServerEvent } from './constants' +import { ServerEvent } from '../socket-io/constants' + +import { Keys } from './constants' import { HttpRequestHandler } from './httprequest' import { WebSocketHandlerAdapter } from "../netty" import { TextWebSocketFrameHandler } from './text_websocket_frame' diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index fdefb266..79c07178 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -1,30 +1,34 @@ import { EventEmitter } from 'events' import { Parser } from './parser' import { Packet } from './packet'; -import { NettyClient } from '../server'; import { SocketIO } from './interfaces' import { Server, Socket } from './index'; import { PacketTypes, SubPacketTypes } from './types'; +import { ServerEvent } from './constants'; const parser = new Parser(); export class Client extends EventEmitter implements SocketIO.Client { id: string; server: Server; - conn: NettyClient; + conn: SocketIO.EngineSocket; request: any; sockets: { [id: string]: Socket; }; nsps: { [nsp: string]: SocketIO.Socket; }; connectBuffer: any; - constructor(server: Server, nettyClient: NettyClient) { + constructor(server: Server, engine: SocketIO.EngineSocket) { super(); this.server = server; - this.conn = nettyClient; + this.conn = engine; this.id = this.conn.id + ''; - this.request = nettyClient.request; + this.request = engine.request; this.sockets = {}; this.nsps = {}; + + this.conn.on(ServerEvent.disconnect, (reason) => { + this.onclose(reason) + }) } connect(name, query) { if (this.server.nsps[name]) { @@ -79,11 +83,11 @@ export class Client extends EventEmitter implements SocketIO.Client { // this.decoder.destroy(); // clean up decoder } disconnect() { - // if ('open' == this.conn.readyState) { - // debug('forcing transport close'); - this.conn.close(); - this.onclose('forced server close'); - // } + if ('open' == this.conn.readyState) { + // debug('forcing transport close'); + this.conn.close(); + this.onclose('forced server close'); + } } remove(socket: Socket) { if (this.sockets.hasOwnProperty(socket.id)) { diff --git a/packages/websocket/src/socket-io/constants.ts b/packages/websocket/src/socket-io/constants.ts new file mode 100644 index 00000000..03a43fd3 --- /dev/null +++ b/packages/websocket/src/socket-io/constants.ts @@ -0,0 +1,8 @@ +export enum ServerEvent { + detect = 'detect', + connect = 'connect', + connection = 'connection', + message = 'message', + error = 'error', + disconnect = 'disconnect' +} \ No newline at end of file diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index efd1e641..20a4d68d 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -1,8 +1,6 @@ import { EventEmitter } from 'events' -import { NettyWebSocketServer, NettyClient } from '../server' -import { ServerEvent } from '../server/constants'; - +import { ServerEvent } from './constants'; import { Namespace } from './namespace'; import { Client } from './client'; import { SocketIO } from './interfaces' @@ -17,8 +15,12 @@ interface ServerOptions extends SocketIO.ServerOptions { root?: string; } +interface WebSocketServer extends EventEmitter { + close(): void +} + class Server implements SocketIO.Server { - private nettyServer: NettyWebSocketServer; + private websocketServer: WebSocketServer; private allClients: { [key: string]: Client }; engine: { ws: any; }; @@ -31,17 +33,24 @@ class Server implements SocketIO.Server { _adapter: Adapter; options: ServerOptions; - constructor(pipeline: any, options: ServerOptions) { - if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') } + constructor(instance: any, options: ServerOptions) { + if (!instance) { throw new Error('instance can\'t be undefiend!') } this.allClients = {}; this.nsps = {}; this.sockets = new Namespace('/', this); this.nsps['/'] = this.sockets; - this.initNettyServer(pipeline, Object.assign({ - event: new EventEmitter(), - path: '/socket.io', - root: root + '/wwwroot' - }, options)); + if (instance.class.name.startsWith('io.netty.channel')) { + let { NettyWebSocketServer } = require("../server") + this.websocketServer = new NettyWebSocketServer(instance, Object.assign({ + event: new EventEmitter(), + path: '/socket.io', + root: root + '/wwwroot' + }, options)); + } else { + let { TomcatWebSocketServer } = require("../tomcat/server") + this.websocketServer = new TomcatWebSocketServer(instance, options); + } + this.initServer() } checkRequest(req: any, fn: (err: any, success: boolean) => void): void { @@ -114,7 +123,7 @@ class Server implements SocketIO.Server { for (let socket in this.sockets.sockets) { this.sockets.sockets[socket].onclose() } - this.nettyServer.close(); + this.websocketServer.close(); } on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; @@ -152,17 +161,16 @@ class Server implements SocketIO.Server { fn(false); }; - private initNettyServer(pipeline, options) { - this.nettyServer = new NettyWebSocketServer(pipeline, options); - this.nettyServer.on(ServerEvent.connect, (nettyClient: NettyClient) => { - let client = new Client(this, nettyClient); + private initServer() { + this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => { + let client = new Client(this, socket); this.onconnection(client); }) - this.nettyServer.on(ServerEvent.message, (nettyClient: NettyClient, text) => { - this.processPacket(this.parser.decode(text), this.allClients[nettyClient.id]); + this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => { + this.processPacket(this.parser.decode(text), this.allClients[socket.id]); }) - this.nettyServer.on(ServerEvent.error, (nettyClient: NettyClient, cause) => { - console.error(`Client ${nettyClient.id} cause error: ` + cause) + this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => { + console.error(`Client ${socket.id} cause error: ` + cause) console.ex(cause) }) } diff --git a/packages/websocket/src/socket-io/interfaces.ts b/packages/websocket/src/socket-io/interfaces.ts index 04661f0f..f3da485a 100644 --- a/packages/websocket/src/socket-io/interfaces.ts +++ b/packages/websocket/src/socket-io/interfaces.ts @@ -830,5 +830,15 @@ export declare namespace SocketIO { * (Transport): transport reference */ transport: any; + + /** + * send + */ + send(text: string); + + /** + * close + */ + close(); } } diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 7f63bd2f..5ce7debe 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events' import { Client } from './client' import { SocketIO } from './interfaces'; -import { ServerEvent } from '../server'; +import { ServerEvent } from './constants'; import { Socket } from './socket'; import { Adapter } from './adapter'; import { Server } from './index' diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/tomcat/client.ts new file mode 100644 index 00000000..95d0b270 --- /dev/null +++ b/packages/websocket/src/tomcat/client.ts @@ -0,0 +1,44 @@ +import { EventEmitter } from 'events' +import { SocketIO } from '../socket-io/interfaces'; + +export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket { + private _id: string; + private session: any + + server: any; + readyState: string; + remoteAddress: string; + upgraded: boolean; + request: any; + transport: any; + + constructor(server: any, session: any) { + super(); + this.server = server; + this.readyState = 'open'; + this.remoteAddress = session + '' + this.upgraded = true; + this.request = { + uri: () => { + return session.getRequestURI() + '' + }, + headers: () => { + return [] + } + }; + this.transport = null; + + this.session = session; + this._id = session.getId(); + } + + get id() { + return this._id; + } + send(text: string) { + this.session.getBasicRemote().sendText(text) + } + close() { + this.session.close(); + } +} diff --git a/packages/websocket/src/tomcat/constants.ts b/packages/websocket/src/tomcat/constants.ts new file mode 100644 index 00000000..25a81a68 --- /dev/null +++ b/packages/websocket/src/tomcat/constants.ts @@ -0,0 +1 @@ +export const ProxyBeanName = "webSocketServerProxy" diff --git a/packages/websocket/src/tomcat/server.ts b/packages/websocket/src/tomcat/server.ts new file mode 100644 index 00000000..64bb896e --- /dev/null +++ b/packages/websocket/src/tomcat/server.ts @@ -0,0 +1,69 @@ +import { EventEmitter } from 'events' + +import { ServerOptions } from '../socket-io' +import { ServerEvent } from '../socket-io/constants' +import { SocketIO } from '../socket-io/interfaces' +import { ProxyBeanName } from './constants' +import { TomcatClient } from './client' + +const WebSocketServerProxy = Java.type("com.sixi.framework.scriptservice.websocket.WebSocketServerProxy") +const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor') +const ThreadPoolTaskExecutor = Java.type('org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor') + +interface TomcatWebSocketSession { + getId: () => number +} + +class TomcatWebSocketServer extends EventEmitter { + private beanFactory: any + private executor: any + private allClients: { [key: string]: SocketIO.EngineSocket } + + constructor(beanFactory: any, options: ServerOptions) { + super() + this.allClients = {} + this.beanFactory = beanFactory + this.initThreadPool() + try { this.beanFactory.destroySingleton(ProxyBeanName) } catch (error) { } + let NashornWebSocketServerProxy = Java.extend(WebSocketServerProxy, { + onOpen: (session: TomcatWebSocketSession) => { + let tomcatClient = new TomcatClient(this, session) + this.allClients[tomcatClient.id] = tomcatClient + this.emit(ServerEvent.connect, tomcatClient) + }, + onMessage: (message: any, session: TomcatWebSocketSession) => { + this.executor.execute(() => { + this.emit(ServerEvent.message, this.allClients[session.getId()], message) + }) + }, + onClose: (session: TomcatWebSocketSession, reason: any) => { + this.emit(ServerEvent.disconnect, this.allClients[session.getId()], reason) + }, + onError: (session: TomcatWebSocketSession, error: any) => { + this.emit(ServerEvent.error, this.allClients[session.getId()], error) + }, + }) + this.beanFactory.registerSingleton(ProxyBeanName, new NashornWebSocketServerProxy()) + } + private initThreadPool() { + this.executor = new ThreadPoolTaskExecutor() + this.executor.setCorePoolSize(10) + this.executor.setMaxPoolSize(100) + this.executor.setQueueCapacity(500) + this.executor.setKeepAliveSeconds(60) + this.executor.setThreadNamePrefix("@ccms/websocket-") + this.executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()) + this.executor.initialize() + } + close() { + Object.values(this.allClients).forEach(client => client.close()) + this.beanFactory.destroySingleton(ProxyBeanName) + this.executor.shutdown() + } +} + +export { + TomcatWebSocketServer, + ServerEvent, + TomcatClient +}