From 47478e13aaf4a7a1d5521a3ee9cd8319bf039ad1 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Wed, 18 Nov 2020 16:21:56 +0800 Subject: [PATCH] refactor(websocket): upgrade socket.io Signed-off-by: MiaoWoo --- packages/websocket/src/index.ts | 56 +- packages/websocket/src/interfaces.ts | 5 + .../src/netty/adapter/httprequest.ts | 20 + packages/websocket/src/netty/adapter/index.ts | 3 + .../src/netty/adapter/text_websocket_frame.ts | 24 + .../src/netty/{ => adapter}/websocket.ts | 2 + .../websocket/src/{server => netty}/client.ts | 17 +- .../src/{server => netty}/constants.ts | 0 packages/websocket/src/netty/httprequest.ts | 73 +- packages/websocket/src/netty/index.ts | 73 +- .../src/netty/text_websocket_frame.ts | 39 +- .../src/{server => netty}/websocket_detect.ts | 8 +- .../{server => netty}/websocket_handler.ts | 9 +- packages/websocket/src/server/httprequest.ts | 61 -- packages/websocket/src/server/index.ts | 70 -- .../src/server/text_websocket_frame.ts | 23 - packages/websocket/src/socket-io/adapter.ts | 221 +++-- packages/websocket/src/socket-io/client.ts | 289 ++++-- packages/websocket/src/socket-io/index.ts | 680 +++++++++++--- .../websocket/src/socket-io/interfaces.ts | 844 ------------------ packages/websocket/src/socket-io/namespace.ts | 285 ++++-- .../src/socket-io/parent-namespace.ts | 40 + packages/websocket/src/socket-io/parser.ts | 107 +-- packages/websocket/src/socket-io/socket.ts | 564 ++++++++---- packages/websocket/src/tomcat/client.ts | 8 +- .../src/tomcat/{server.ts => index.ts} | 3 +- 26 files changed, 1844 insertions(+), 1680 deletions(-) create mode 100644 packages/websocket/src/interfaces.ts create mode 100644 packages/websocket/src/netty/adapter/httprequest.ts create mode 100644 packages/websocket/src/netty/adapter/index.ts create mode 100644 packages/websocket/src/netty/adapter/text_websocket_frame.ts rename packages/websocket/src/netty/{ => adapter}/websocket.ts (87%) rename packages/websocket/src/{server => netty}/client.ts (63%) rename packages/websocket/src/{server => netty}/constants.ts (100%) rename packages/websocket/src/{server => netty}/websocket_detect.ts (66%) rename packages/websocket/src/{server => netty}/websocket_handler.ts (87%) delete mode 100644 packages/websocket/src/server/httprequest.ts delete mode 100644 packages/websocket/src/server/index.ts delete mode 100644 packages/websocket/src/server/text_websocket_frame.ts delete mode 100644 packages/websocket/src/socket-io/interfaces.ts create mode 100644 packages/websocket/src/socket-io/parent-namespace.ts rename packages/websocket/src/tomcat/{server.ts => index.ts} (96%) diff --git a/packages/websocket/src/index.ts b/packages/websocket/src/index.ts index 5797d100..ac3ae019 100644 --- a/packages/websocket/src/index.ts +++ b/packages/websocket/src/index.ts @@ -4,43 +4,43 @@ import { Server, ServerOptions } from './socket-io' interface SocketIOStatic { - /** - * Default Server constructor - */ - (): Server + /** + * Default Server constructor + */ + (): Server - /** - * Creates a new Server - * @param srv The HTTP server that we're going to bind to - * @param opts An optional parameters object - */ - (srv: any, opts?: ServerOptions): Server + /** + * Creates a new Server + * @param srv The HTTP server that we're going to bind to + * @param opts An optional parameters object + */ + (srv: any, opts?: Partial): Server - /** - * Creates a new Server - * @param port A port to bind to, as a number, or a string - * @param An optional parameters object - */ - (port: string | number, opts?: ServerOptions): Server + /** + * Creates a new Server + * @param port A port to bind to, as a number, or a string + * @param An optional parameters object + */ + (port: string | number, opts?: Partial): Server - /** - * Creates a new Server - * @param A parameters object - */ - (opts: ServerOptions): Server + /** + * Creates a new Server + * @param A parameters object + */ + (opts: Partial): Server - /** - * Backwards compatibility - * @see io().listen() - */ - listen?: SocketIOStatic + /** + * Backwards compatibility + * @see io().listen() + */ + listen?: SocketIOStatic } type SocketStatic = SocketIOStatic & { Instance?: symbol } // @ts-ignore -let io: SocketStatic = function (pipeline: any, options: ServerOptions) { - return new Server(pipeline, options) +let io: SocketStatic = function (pipeline: any, options: Partial) { + return new Server(pipeline, options) } io.Instance = Symbol("@ccms/websocket") export default io diff --git a/packages/websocket/src/interfaces.ts b/packages/websocket/src/interfaces.ts new file mode 100644 index 00000000..f583f6fc --- /dev/null +++ b/packages/websocket/src/interfaces.ts @@ -0,0 +1,5 @@ +export interface InnerClient { + id: string + send(text: string) + close() +} diff --git a/packages/websocket/src/netty/adapter/httprequest.ts b/packages/websocket/src/netty/adapter/httprequest.ts new file mode 100644 index 00000000..b6523605 --- /dev/null +++ b/packages/websocket/src/netty/adapter/httprequest.ts @@ -0,0 +1,20 @@ +const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher') +const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') +const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest')) + +export abstract class HttpRequestHandlerAdapter { + private _Handler; + constructor() { + let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { + acceptInboundMessage: (msg: any) => { + return FullHttpRequestMatcher.match(msg) + }, + channelRead0: this.channelRead0.bind(this) + }) + this._Handler = new HttpRequestHandlerAdapterImpl(); + } + abstract channelRead0(ctx: any, request: any); + getHandler() { + return this._Handler; + } +} diff --git a/packages/websocket/src/netty/adapter/index.ts b/packages/websocket/src/netty/adapter/index.ts new file mode 100644 index 00000000..366082f5 --- /dev/null +++ b/packages/websocket/src/netty/adapter/index.ts @@ -0,0 +1,3 @@ +export * from './text_websocket_frame' +export * from './websocket' +export * from './httprequest' \ No newline at end of file diff --git a/packages/websocket/src/netty/adapter/text_websocket_frame.ts b/packages/websocket/src/netty/adapter/text_websocket_frame.ts new file mode 100644 index 00000000..f444ad6d --- /dev/null +++ b/packages/websocket/src/netty/adapter/text_websocket_frame.ts @@ -0,0 +1,24 @@ +const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher') +const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')) +const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') + +export abstract class TextWebSocketFrameHandlerAdapter { + private _Handler + constructor() { + let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { + userEventTriggered: this.userEventTriggered.bind(this), + acceptInboundMessage: (msg: any) => { + return TextWebSocketFrameMatcher.match(msg) + }, + channelRead0: this.channelRead0.bind(this), + exceptionCaught: this.exceptionCaught.bind(this) + }) + this._Handler = new TextWebSocketFrameHandlerAdapterImpl() + } + abstract userEventTriggered(ctx: any, evt: any) + abstract channelRead0(ctx: any, msg: any) + abstract exceptionCaught(ctx: any, cause: Error) + getHandler() { + return this._Handler + } +} diff --git a/packages/websocket/src/netty/websocket.ts b/packages/websocket/src/netty/adapter/websocket.ts similarity index 87% rename from packages/websocket/src/netty/websocket.ts rename to packages/websocket/src/netty/adapter/websocket.ts index e1e15ca6..f2dd285e 100644 --- a/packages/websocket/src/netty/websocket.ts +++ b/packages/websocket/src/netty/adapter/websocket.ts @@ -5,12 +5,14 @@ export abstract class WebSocketHandlerAdapter { constructor() { let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, { channelRead: this.channelRead.bind(this), + channelInactive: this.channelInactive.bind(this), channelUnregistered: this.exceptionCaught.bind(this), exceptionCaught: this.exceptionCaught.bind(this) }) this._Handler = new ChannelInboundHandlerAdapterImpl() } abstract channelRead(ctx: any, channel: any) + abstract channelInactive(ctx: any) abstract channelUnregistered(ctx: any) abstract exceptionCaught(ctx: any, cause: Error) getHandler() { diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/netty/client.ts similarity index 63% rename from packages/websocket/src/server/client.ts rename to packages/websocket/src/netty/client.ts index f49df917..4882c955 100644 --- a/packages/websocket/src/server/client.ts +++ b/packages/websocket/src/netty/client.ts @@ -1,10 +1,10 @@ import { EventEmitter } from 'events' -import { SocketIO } from '../socket-io/interfaces' +import { InnerClient } from '../interfaces' import { AttributeKeys } from './constants' const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') -export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { +export class NettyClient extends EventEmitter implements InnerClient { private _id: string private channel: any @@ -13,7 +13,6 @@ export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { remoteAddress: string upgraded: boolean request: any - transport: any constructor(server: any, channel: any) { super() @@ -22,7 +21,6 @@ export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { this.remoteAddress = channel.remoteAddress() + '' this.upgraded = true this.request = channel.attr(AttributeKeys.Request).get() - this.transport = null this.channel = channel this._id = channel.id() + '' @@ -32,9 +30,16 @@ export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { return this._id } send(text: string) { - this.channel.writeAndFlush(new TextWebSocketFrame(text)) + if (this.readyState == 'open') { + this.channel.writeAndFlush(new TextWebSocketFrame(text)) + } else { + console.debug(`send message ${text} to close client ${this._id}`) + } } close() { - this.channel.close() + if (this.readyState = 'open') { + this.channel.close() + this.readyState = 'close' + } } } diff --git a/packages/websocket/src/server/constants.ts b/packages/websocket/src/netty/constants.ts similarity index 100% rename from packages/websocket/src/server/constants.ts rename to packages/websocket/src/netty/constants.ts diff --git a/packages/websocket/src/netty/httprequest.ts b/packages/websocket/src/netty/httprequest.ts index b6523605..89fc9a26 100644 --- a/packages/websocket/src/netty/httprequest.ts +++ b/packages/websocket/src/netty/httprequest.ts @@ -1,20 +1,61 @@ -const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher') -const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') -const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest')) +import { HttpRequestHandlerAdapter } from './adapter' +import { AttributeKeys } from './constants' +import { ServerOptions } from 'socket-io' -export abstract class HttpRequestHandlerAdapter { - private _Handler; - constructor() { - let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { - acceptInboundMessage: (msg: any) => { - return FullHttpRequestMatcher.match(msg) - }, - channelRead0: this.channelRead0.bind(this) - }) - this._Handler = new HttpRequestHandlerAdapterImpl(); +const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse') +const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse') +const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders') +const HttpVersion = Java.type('io.netty.handler.codec.http.HttpVersion') +const HttpResponseStatus = Java.type('io.netty.handler.codec.http.HttpResponseStatus') +const LastHttpContent = Java.type('io.netty.handler.codec.http.LastHttpContent') + +const File = Java.type('java.io.File') +const Runnable = Java.type('java.lang.Runnable') +const RandomAccessFile = Java.type('java.io.RandomAccessFile') +const DefaultFileRegion = Java.type('io.netty.channel.DefaultFileRegion') +const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener') + +export class HttpRequestHandler extends HttpRequestHandlerAdapter { + private ws: string + private root: string + constructor(options: ServerOptions) { + super() + this.root = options.root + this.ws = options.path } - abstract channelRead0(ctx: any, request: any); - getHandler() { - return this._Handler; + channelRead0(ctx: any, request: any) { + if (request.getUri().startsWith(this.ws)) { + ctx.channel().attr(AttributeKeys.Request).set(request) + ctx.fireChannelRead(request.retain()) + } else { + ctx.executor().execute(new Runnable({ + run: () => { + if (HttpHeaders.is100ContinueExpected(request)) { + ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)) + } + let filename = request.getUri().split('?')[0].substr(1) + let file = new File(this.root, filename || 'index.html') + if (!file.exists() || !file.isFile()) { + ctx.write(new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.NOT_FOUND)) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE) + return + } + let response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK) + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html charset=UTF-8") + let raf = new RandomAccessFile(file, 'r') + let keepAlive = HttpHeaders.isKeepAlive(request) + if (keepAlive) { + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()) + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) + } + ctx.write(response) + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, raf.length())) + let future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + if (!keepAlive) { + future.addListener(ChannelFutureListener.CLOSE) + } + } + })) + } } } diff --git a/packages/websocket/src/netty/index.ts b/packages/websocket/src/netty/index.ts index 366082f5..7dc96fb1 100644 --- a/packages/websocket/src/netty/index.ts +++ b/packages/websocket/src/netty/index.ts @@ -1,3 +1,70 @@ -export * from './text_websocket_frame' -export * from './websocket' -export * from './httprequest' \ No newline at end of file +import { EventEmitter } from 'events' + +import { ServerOptions } from '../socket-io' +import { ServerEvent } from '../socket-io/constants' + +import { NettyClient } from './client' +import { Keys } from './constants' +import { WebSocketDetect } from './websocket_detect' +import { WebSocketHandler } from './websocket_handler' + +class NettyWebSocketServer extends EventEmitter { + private pipeline: any + private clients: Map + + constructor(pipeline: any, options: ServerOptions) { + super() + this.clients = new Map() + this.pipeline = pipeline + let connectEvent = options.event + try { this.pipeline.remove(Keys.Detect) } catch (error) { } + this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) + connectEvent.on(ServerEvent.detect, (ctx, channel) => { + channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(options).getHandler()) + ctx.fireChannelRead(channel) + }) + connectEvent.on(ServerEvent.connect, (ctx) => { + let cid = ctx?.channel().id() + '' + let nettyClient = new NettyClient(this, ctx.channel()) + this.clients.set(cid, nettyClient) + this.emit(ServerEvent.connect, nettyClient) + }) + connectEvent.on(ServerEvent.message, (ctx, msg) => { + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.message, this.clients.get(cid), msg.text()) + } else { + console.error(`unknow client ${ctx} reciver message ${msg.text()}`) + } + }) + connectEvent.on(ServerEvent.disconnect, (ctx, cause) => { + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.disconnect, this.clients.get(cid), cause) + } else { + console.error(`unknow client ${ctx} disconnect cause ${cause}`) + } + }) + connectEvent.on(ServerEvent.error, (ctx, cause) => { + let cid = ctx?.channel().id() + '' + if (this.clients.has(cid)) { + this.emit(ServerEvent.error, this.clients.get(cid), cause) + } else { + console.error(`unknow client ${ctx} cause error ${cause}`) + console.ex(cause) + } + }) + } + close() { + if (this.pipeline.names().contains(Keys.Detect)) { + this.pipeline.remove(Keys.Detect) + } + this.clients.forEach(client => client.close()) + } +} + +export { + NettyWebSocketServer, + ServerEvent, + NettyClient +} diff --git a/packages/websocket/src/netty/text_websocket_frame.ts b/packages/websocket/src/netty/text_websocket_frame.ts index f444ad6d..deaec789 100644 --- a/packages/websocket/src/netty/text_websocket_frame.ts +++ b/packages/websocket/src/netty/text_websocket_frame.ts @@ -1,24 +1,23 @@ -const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher') -const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')) -const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') +import { EventEmitter } from 'events' +import { ServerOptions } from '../socket-io' +import { ServerEvent } from '../socket-io/constants' +import { TextWebSocketFrameHandlerAdapter } from './adapter' -export abstract class TextWebSocketFrameHandlerAdapter { - private _Handler - constructor() { - let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { - userEventTriggered: this.userEventTriggered.bind(this), - acceptInboundMessage: (msg: any) => { - return TextWebSocketFrameMatcher.match(msg) - }, - channelRead0: this.channelRead0.bind(this), - exceptionCaught: this.exceptionCaught.bind(this) - }) - this._Handler = new TextWebSocketFrameHandlerAdapterImpl() +export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { + private event: EventEmitter + constructor(options: ServerOptions) { + super() + this.event = options.event } - abstract userEventTriggered(ctx: any, evt: any) - abstract channelRead0(ctx: any, msg: any) - abstract exceptionCaught(ctx: any, cause: Error) - getHandler() { - return this._Handler + userEventTriggered(ctx: any, evt: any) { + if (evt == 'HANDSHAKE_COMPLETE') { + this.event.emit(ServerEvent.connect, ctx) + } + } + channelRead0(ctx: any, msg: any) { + this.event.emit(ServerEvent.message, ctx, msg) + } + exceptionCaught(ctx: any, cause: Error) { + this.event.emit(ServerEvent.error, ctx, cause) } } diff --git a/packages/websocket/src/server/websocket_detect.ts b/packages/websocket/src/netty/websocket_detect.ts similarity index 66% rename from packages/websocket/src/server/websocket_detect.ts rename to packages/websocket/src/netty/websocket_detect.ts index a321e769..a5a510ec 100644 --- a/packages/websocket/src/server/websocket_detect.ts +++ b/packages/websocket/src/netty/websocket_detect.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events' -import { WebSocketHandlerAdapter } from "../netty" +import { WebSocketHandlerAdapter } from "./adapter" import { ServerEvent } from '../socket-io/constants' export class WebSocketDetect extends WebSocketHandlerAdapter { @@ -11,7 +11,13 @@ export class WebSocketDetect extends WebSocketHandlerAdapter { channelRead(ctx: any, channel: any) { this.event.emit(ServerEvent.detect, ctx, channel) } + channelInactive(ctx: any) { + console.debug('WebSocketDetect channelUnregistered ' + ctx) + this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') + ctx.channelInactive() + } channelUnregistered(ctx: any) { + console.debug('WebSocketDetect channelUnregistered ' + ctx) this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') ctx.fireChannelUnregistered() } diff --git a/packages/websocket/src/server/websocket_handler.ts b/packages/websocket/src/netty/websocket_handler.ts similarity index 87% rename from packages/websocket/src/server/websocket_handler.ts rename to packages/websocket/src/netty/websocket_handler.ts index e2ed4dd7..ba30bb43 100644 --- a/packages/websocket/src/server/websocket_handler.ts +++ b/packages/websocket/src/netty/websocket_handler.ts @@ -3,7 +3,7 @@ import { ServerEvent } from '../socket-io/constants' import { Keys } from './constants' import { HttpRequestHandler } from './httprequest' -import { WebSocketHandlerAdapter } from "../netty" +import { WebSocketHandlerAdapter } from "./adapter" import { TextWebSocketFrameHandler } from './text_websocket_frame' const CharsetUtil = Java.type('io.netty.util.CharsetUtil') @@ -41,7 +41,14 @@ export class WebSocketHandler extends WebSocketHandlerAdapter { ctx.fireChannelRead(msg) } + channelInactive(ctx: any) { + console.debug('WebSocketHandler channelInactive ' + ctx) + this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') + ctx.channelInactive() + } + channelUnregistered(ctx: any) { + console.debug('WebSocketHandler channelUnregistered ' + ctx) this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') ctx.fireChannelUnregistered() } diff --git a/packages/websocket/src/server/httprequest.ts b/packages/websocket/src/server/httprequest.ts deleted file mode 100644 index 1acae6b5..00000000 --- a/packages/websocket/src/server/httprequest.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { HttpRequestHandlerAdapter } from '../netty' -import { AttributeKeys } from './constants' -import { ServerOptions } from 'socket-io' - -const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse') -const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse') -const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders') -const HttpVersion = Java.type('io.netty.handler.codec.http.HttpVersion') -const HttpResponseStatus = Java.type('io.netty.handler.codec.http.HttpResponseStatus') -const LastHttpContent = Java.type('io.netty.handler.codec.http.LastHttpContent') - -const File = Java.type('java.io.File') -const Runnable = Java.type('java.lang.Runnable') -const RandomAccessFile = Java.type('java.io.RandomAccessFile') -const DefaultFileRegion = Java.type('io.netty.channel.DefaultFileRegion') -const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener') - -export class HttpRequestHandler extends HttpRequestHandlerAdapter { - private ws: string; - private root: string; - constructor(options: ServerOptions) { - super() - this.root = options.root; - this.ws = options.path; - } - channelRead0(ctx: any, request: any) { - if (request.getUri().startsWith(this.ws)) { - ctx.channel().attr(AttributeKeys.Request).set(request); - ctx.fireChannelRead(request.retain()) - } else { - ctx.executor().execute(new Runnable({ - run: () => { - if (HttpHeaders.is100ContinueExpected(request)) { - ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)) - } - let filename = request.getUri().split('?')[0].substr(1) - let file = new File(this.root, filename || 'index.html') - if (!file.exists() || !file.isFile()) { - ctx.write(new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.NOT_FOUND)) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE) - return - } - let response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK) - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html charset=UTF-8") - let raf = new RandomAccessFile(file, 'r') - let keepAlive = HttpHeaders.isKeepAlive(request) - if (keepAlive) { - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()) - response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) - } - ctx.write(response) - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, raf.length())) - let future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - if (!keepAlive) { - future.addListener(ChannelFutureListener.CLOSE) - } - } - })) - } - } -} diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts deleted file mode 100644 index 7dc96fb1..00000000 --- a/packages/websocket/src/server/index.ts +++ /dev/null @@ -1,70 +0,0 @@ -import { EventEmitter } from 'events' - -import { ServerOptions } from '../socket-io' -import { ServerEvent } from '../socket-io/constants' - -import { NettyClient } from './client' -import { Keys } from './constants' -import { WebSocketDetect } from './websocket_detect' -import { WebSocketHandler } from './websocket_handler' - -class NettyWebSocketServer extends EventEmitter { - private pipeline: any - private clients: Map - - constructor(pipeline: any, options: ServerOptions) { - super() - this.clients = new Map() - this.pipeline = pipeline - let connectEvent = options.event - try { this.pipeline.remove(Keys.Detect) } catch (error) { } - this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) - connectEvent.on(ServerEvent.detect, (ctx, channel) => { - channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(options).getHandler()) - ctx.fireChannelRead(channel) - }) - connectEvent.on(ServerEvent.connect, (ctx) => { - let cid = ctx?.channel().id() + '' - let nettyClient = new NettyClient(this, ctx.channel()) - this.clients.set(cid, nettyClient) - this.emit(ServerEvent.connect, nettyClient) - }) - connectEvent.on(ServerEvent.message, (ctx, msg) => { - let cid = ctx?.channel().id() + '' - if (this.clients.has(cid)) { - this.emit(ServerEvent.message, this.clients.get(cid), msg.text()) - } else { - console.error(`unknow client ${ctx} reciver message ${msg.text()}`) - } - }) - connectEvent.on(ServerEvent.disconnect, (ctx, cause) => { - let cid = ctx?.channel().id() + '' - if (this.clients.has(cid)) { - this.emit(ServerEvent.disconnect, this.clients.get(cid), cause) - } else { - console.error(`unknow client ${ctx} disconnect cause ${cause}`) - } - }) - connectEvent.on(ServerEvent.error, (ctx, cause) => { - let cid = ctx?.channel().id() + '' - if (this.clients.has(cid)) { - this.emit(ServerEvent.error, this.clients.get(cid), cause) - } else { - console.error(`unknow client ${ctx} cause error ${cause}`) - console.ex(cause) - } - }) - } - close() { - if (this.pipeline.names().contains(Keys.Detect)) { - this.pipeline.remove(Keys.Detect) - } - this.clients.forEach(client => client.close()) - } -} - -export { - NettyWebSocketServer, - ServerEvent, - NettyClient -} diff --git a/packages/websocket/src/server/text_websocket_frame.ts b/packages/websocket/src/server/text_websocket_frame.ts deleted file mode 100644 index dd2e82b5..00000000 --- a/packages/websocket/src/server/text_websocket_frame.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { EventEmitter } from 'events' -import { ServerOptions } from '../socket-io'; -import { ServerEvent } from '../socket-io/constants' -import { TextWebSocketFrameHandlerAdapter } from '../netty' - -export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { - private event: EventEmitter; - constructor(options: ServerOptions) { - super() - this.event = options.event; - } - userEventTriggered(ctx: any, evt: any) { - if (evt == 'HANDSHAKE_COMPLETE') { - this.event.emit(ServerEvent.connect, ctx) - } - } - channelRead0(ctx: any, msg: any) { - this.event.emit(ServerEvent.message, ctx, msg) - } - exceptionCaught(ctx: any, cause: Error) { - this.event.emit(ServerEvent.error, ctx, cause) - } -} diff --git a/packages/websocket/src/socket-io/adapter.ts b/packages/websocket/src/socket-io/adapter.ts index 3e4940a7..efd25120 100644 --- a/packages/websocket/src/socket-io/adapter.ts +++ b/packages/websocket/src/socket-io/adapter.ts @@ -1,24 +1,39 @@ import { EventEmitter } from 'events' -import { SocketIO } from './interfaces'; -import { Namespace } from './namespace'; -import { Parser } from './parser'; -import { Socket } from './socket'; +import { Namespace } from './namespace' +import { Parser } from './parser' +import { Socket } from './socket' -export class Adapter extends EventEmitter implements SocketIO.Adapter { - nsp: Namespace; - rooms: Rooms; - sids: { [id: string]: { [room: string]: boolean; }; }; +export type SocketId = string +export type Room = string + +export interface BroadcastFlags { + volatile?: boolean + compress?: boolean + local?: boolean + broadcast?: boolean + binary?: boolean +} + +export interface BroadcastOptions { + rooms: Set + except?: Set + flags?: BroadcastFlags +} + +export class Adapter extends EventEmitter implements Adapter { + rooms: Map> + sids: Map> + private readonly encoder: Parser parser: Parser - constructor(nsp: Namespace) { + + constructor(readonly nsp: Namespace) { super() - this.nsp = nsp; - this.rooms = new Rooms(); - this.sids = {}; - this.parser = nsp.server.parser; - } - add(id: string, room: string, callback?: (err?: any) => void): void { - return this.addAll(id, [room], callback); + this.rooms = new Map() + this.sids = new Map() + this.parser = nsp.server._parser + this.encoder = this.parser } + /** * Adds a socket to a list of room. * @@ -27,101 +42,123 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { * @param {Function} callback * @api public */ - addAll(id: string, rooms: string | any[], fn: { (err?: any): void; bind?: any; }) { - for (var i = 0; i < rooms.length; i++) { - var room = rooms[i]; - this.sids[id] = this.sids[id] || {}; - this.sids[id][room] = true; - this.rooms[room] = this.rooms[room] || new Room(); - this.rooms[room].add(id); - } - fn && fn.bind(null, null) - }; - del(id: string, room: string, callback?: (err?: any) => void): void { - if (this.sids[id]) delete this.sids[id][room]; + addAll(id: SocketId, rooms: Set): Promise | void { + for (const room of rooms) { + if (!this.sids.has(id)) { + this.sids.set(id, new Set()) + } + this.sids.get(id).add(room) - if (this.rooms.hasOwnProperty(room)) { - this.rooms[room].del(id); - if (this.rooms[room].length === 0) delete this.rooms[room]; + if (!this.rooms.has(room)) { + this.rooms.set(room, new Set()) + } + this.rooms.get(room).add(id) + } + } + del(id: string, room: string, callback?: (err?: any) => void): void { + if (this.sids.has(id)) { + this.sids.get(id).delete(room) + } + + if (this.rooms.has(room)) { + this.rooms.get(room).delete(id) + if (this.rooms.get(room).size === 0) this.rooms.delete(room) } callback && callback.bind(null, null) } delAll(id: string): void { - var rooms = this.sids[id]; - if (rooms) { - for (var room in rooms) { - if (this.rooms.hasOwnProperty(room)) { - this.rooms[room].del(id); - if (this.rooms[room].length === 0) delete this.rooms[room]; - } + if (!this.sids.has(id)) { + return + } + + for (const room of this.sids.get(id)) { + if (this.rooms.has(room)) { + this.rooms.get(room).delete(id) + if (this.rooms.get(room).size === 0) this.rooms.delete(room) } } - delete this.sids[id]; + + this.sids.delete(id) } - broadcast(packet: any, opts: { rooms?: string[]; except?: string[]; flags?: { [flag: string]: boolean; }; }): void { - var rooms = opts.rooms || []; - var except = opts.except || []; - var flags = opts.flags || {}; - var packetOpts = { + + /** + * Broadcasts a packet. + * + * Options: + * - `flags` {Object} flags for this packet + * - `except` {Array} sids that should be excluded + * - `rooms` {Array} list of rooms to broadcast to + * + * @param {Object} packet the packet object + * @param {Object} opts the options + * @public + */ + public broadcast(packet: any, opts: BroadcastOptions): void { + const rooms = opts.rooms + const except = opts.except || new Set() + const flags = opts.flags || {} + const packetOpts = { preEncoded: true, volatile: flags.volatile, compress: flags.compress - }; - var ids = {}; - var self = this; - var socket: Socket; + } + const ids = new Set() - packet.nsp = this.nsp.name; - let encodedPackets = this.parser.encode(packet) - if (rooms.length) { - for (var i = 0; i < rooms.length; i++) { - var room = self.rooms[rooms[i]]; - if (!room) continue; - var sockets = room.sockets; - for (var id in sockets) { - if (sockets.hasOwnProperty(id)) { - if (ids[id] || ~except.indexOf(id)) continue; - socket = self.nsp.connected[id]; - if (socket) { - socket.packet(encodedPackets as any, packetOpts); - ids[id] = true; - } + packet.nsp = this.nsp.name + const encodedPackets = this.encoder.encode(packet) + + if (rooms.size) { + for (const room of rooms) { + if (!this.rooms.has(room)) continue + + for (const id of this.rooms.get(room)) { + if (ids.has(id) || except.has(id)) continue + const socket = this.nsp.sockets.get(id) + if (socket) { + socket.packet(encodedPackets as any, packetOpts) + ids.add(id) } } } } else { - for (var id in self.sids) { - if (self.sids.hasOwnProperty(id)) { - if (~except.indexOf(id)) continue; - socket = self.nsp.connected[id]; - if (socket) socket.packet(encodedPackets as any, packetOpts); - } + for (const [id] of this.sids) { + if (except.has(id)) continue + const socket = this.nsp.sockets.get(id) + if (socket) socket.packet(encodedPackets as any, packetOpts) } } } -} -class Rooms implements SocketIO.Rooms { - [room: string]: Room; -} + /** + * Gets a list of sockets by sid. + * + * @param {Set} rooms the explicit set of rooms to check. + */ + public sockets(rooms: Set): Promise> { + const sids = new Set() + if (rooms.size) { + for (const room of rooms) { + if (!this.rooms.has(room)) continue + for (const id of this.rooms.get(room)) { + if (this.nsp.sockets.has(id)) { + sids.add(id) + } + } + } + } else { + for (const [id] of this.sids) { + if (this.nsp.sockets.has(id)) sids.add(id) + } + } + return Promise.resolve(sids) + } -class Room implements SocketIO.Room { - sockets: { [id: string]: boolean; }; - length: number; - constructor() { - this.sockets = {}; - this.length = 0; + /** + * Gets the list of rooms a given socket has joined. + * + * @param {SocketId} id the socket id + */ + public socketRooms(id: SocketId): Set | undefined { + return this.sids.get(id) } - add(id) { - if (!this.sockets.hasOwnProperty(id)) { - this.sockets[id] = true; - this.length++; - } - } - del(id) { - if (this.sockets.hasOwnProperty(id)) { - delete this.sockets[id]; - this.length--; - } - } -} \ No newline at end of file +} diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index 79c07178..6b529af9 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -1,103 +1,242 @@ import { EventEmitter } from 'events' import { Parser } from './parser' -import { Packet } from './packet'; -import { SocketIO } from './interfaces' -import { Server, Socket } from './index'; -import { PacketTypes, SubPacketTypes } from './types'; -import { ServerEvent } from './constants'; +import { Packet } from './packet' +import { Namespace, Server, Socket } from './index' +import { PacketTypes, SubPacketTypes } from './types' +import { ServerEvent } from './constants' +import { SocketId } from './adapter' -const parser = new Parser(); +const parser = new Parser() -export class Client extends EventEmitter implements SocketIO.Client { - id: string; - server: Server; - conn: SocketIO.EngineSocket; - request: any; - sockets: { [id: string]: Socket; }; - nsps: { [nsp: string]: SocketIO.Socket; }; - connectBuffer: any; - - constructor(server: Server, engine: SocketIO.EngineSocket) { - super(); - this.server = server; - this.conn = engine; - this.id = this.conn.id + ''; - this.request = engine.request; - this.sockets = {}; - this.nsps = {}; +export class Client extends EventEmitter implements Client { + public readonly conn + /** + * @private + */ + readonly id: string + private readonly server: Server + // private readonly encoder: Encoder + private readonly decoder: any + private sockets: Map + private nsps: Map + private connectTimeout: NodeJS.Timeout + constructor(server: Server, conn) { + super() + this.server = server + this.conn = conn + // this.encoder = server.encoder + this.decoder = server._parser + this.id = this.conn.id + '' + this.setup() + // ============================= + this.sockets = new Map() + this.nsps = new Map() this.conn.on(ServerEvent.disconnect, (reason) => { this.onclose(reason) }) } - connect(name, query) { - if (this.server.nsps[name]) { - // console.debug(`connecting to namespace ${name}`); - return this.doConnect(name, query); - } - this.server.checkNamespace(name, query, (dynamicNsp) => { - if (dynamicNsp) { - // console.debug('dynamic namespace %s was created', dynamicNsp.name); - this.doConnect(name, query); + /** + * @return the reference to the request that originated the Engine.IO connection + * + * @public + */ + public get request(): any /**IncomingMessage */ { + return this.conn.request + } + /** + * Sets up event listeners. + * + * @private + */ + private setup() { + // @ts-ignore + // this.decoder.on("decoded", this.ondecoded) + this.conn.on("data", this.ondata.bind(this)) + this.conn.on("error", this.onerror.bind(this)) + this.conn.on("close", this.onclose.bind(this)) + console.debug(`setup client ${this.id}`) + this.connectTimeout = setTimeout(() => { + if (this.nsps.size === 0) { + console.debug("no namespace joined yet, close the client") + this.close() } else { - // console.debug('creation of namespace %s was denied', name); - this.packet({ + console.debug("the client has already joined a namespace, nothing to do") + } + }, this.server._connectTimeout) + } + /** + * Connects a client to a namespace. + * + * @param {String} name - the namespace + * @param {Object} auth - the auth parameters + * @private + */ + private connect(name: string, auth: object = {}) { + console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps[name]}`) + if (this.server._nsps.has(name)) { + return this.doConnect(name, auth) + } + + this.server._checkNamespace(name, auth, (dynamicNsp: Namespace) => { + if (dynamicNsp) { + console.debug(`dynamic namespace ${dynamicNsp.name} was created`) + this.doConnect(name, auth) + } else { + console.debug(`creation of namespace ${name} was denied`) + this._packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, nsp: name, - data: 'Invalid namespace' - }); + data: { + message: "Invalid namespace" + } + }) } }) } - doConnect(name, query) { - var nsp = this.server.of(name); - if ('/' != name && !this.nsps['/']) { - this.connectBuffer.push(name); - return; + doConnect(name, auth: object) { + if (this.connectTimeout) { + clearTimeout(this.connectTimeout) + this.connectTimeout = null } - var socket = nsp.add(this, query, () => { - this.sockets[socket.id] = socket; - this.nsps[nsp.name] = socket; + const nsp = this.server.of(name) - if ('/' == nsp.name && this.connectBuffer.length > 0) { - this.connectBuffer.forEach(this.connect, this); - this.connectBuffer = []; - } - }); + const socket = nsp._add(this, auth, () => { + console.debug(`doConnect set sockets ${socket.id}`) + this.sockets.set(socket.id, socket) + console.debug(`doConnect set nsps ${nsp.name}`) + this.nsps.set(nsp.name, socket) + }) } - packet(packet: Packet, opts: any = { preEncoded: false }) { + /** + * Disconnects from all namespaces and closes transport. + * + * @private + */ + _disconnect() { + for (const socket of this.sockets.values()) { + socket.disconnect() + } + this.sockets.clear() + this.close() + } + /** + * Removes a socket. Called by each `Socket`. + * + * @private + */ + _remove(socket: Socket) { + if (this.sockets.has(socket.id)) { + const nsp = this.sockets.get(socket.id).nsp.name + this.sockets.delete(socket.id) + this.nsps.delete(nsp) + } else { + console.debug(`ignoring remove for ${socket.id}`,) + } + } + /** + * Closes the underlying connection. + * + * @private + */ + private close() { + if ("open" == this.conn.readyState) { + console.debug("forcing transport close") + this.onclose("forced server close") + this.conn.close() + } + } + /** + * Writes a packet to the transport. + * + * @param {Object} packet object + * @param {Object} opts + * @private + */ + _packet(packet, opts = { preEncoded: false }) { + // opts = opts || {} + // const self = this + + // // this writes to the actual connection + // function writeToEngine(encodedPackets) { + // if (opts.volatile && !self.conn.transport.writable) return + // for (let i = 0; i < encodedPackets.length; i++) { + // self.conn.write(encodedPackets[i], { compress: opts.compress }) + // } + // } + + // if ("open" == this.conn.readyState) { + // debug("writing packet %j", packet) + // if (!opts.preEncoded) { + // // not broadcasting, need to encode + // writeToEngine(this.encoder.encode(packet)) // encode, then write results to engine + // } else { + // // a broadcast pre-encodes a packet + // writeToEngine(packet) + // } + // } else { + // debug("ignoring packet write %j", packet) + // } this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet)) } - onclose(reason?: string) { - // debug('client close with reason %s', reason); - // ignore a potential subsequent `close` event - this.destroy(); - // `nsps` and `sockets` are cleaned up seamlessly - for (var id in this.sockets) { - if (this.sockets.hasOwnProperty(id)) { - this.sockets[id].onclose(reason); + /** + * Called with incoming transport data. + * + * @private + */ + private ondata(data) { + // try/catch is needed for protocol violations (GH-1880) + try { + this.decoder.add(data) + } catch (e) { + this.onerror(e) + } + } + /** + * Called when parser fully decodes a packet. + * + * @private + */ + ondecoded(packet: Packet) { + if (SubPacketTypes.CONNECT == packet.sub_type) { + this.connect(packet.nsp, packet.data) + } else { + const socket = this.nsps.get(packet.nsp) + if (socket) { + process.nextTick(function () { + socket._onpacket(packet) + }) + } else { + console.debug(`client ${this.id} no socket for namespace ${packet.nsp} avalibe socket: `) + this.nsps.forEach((v, k) => console.debug(`- ${k} => ${v}`)) } } - this.sockets = {}; + } + /** + * Handles an error. + * + * @param {Object} err object + * @private + */ + private onerror(err) { + for (const socket of this.sockets.values()) { + socket._onerror(err) + } + this.conn.close() + } + onclose(reason?: string) { + console.debug(`client ${this.id} close with reason ${reason}`) + // ignore a potential subsequent `close` event + this.destroy() + // `nsps` and `sockets` are cleaned up seamlessly + for (const socket of this.sockets.values()) { + socket._onclose(reason) + } + this.sockets.clear() // this.decoder.destroy(); // clean up decoder } - disconnect() { - if ('open' == this.conn.readyState) { - // debug('forcing transport close'); - this.conn.close(); - this.onclose('forced server close'); - } - } - remove(socket: Socket) { - if (this.sockets.hasOwnProperty(socket.id)) { - var nsp = this.sockets[socket.id].nsp.name; - delete this.sockets[socket.id]; - delete this.nsps[nsp]; - } else { - // debug('ignoring remove for %s', socket.id); - } - } + destroy() { // this.conn.removeListener('data', this.ondata); // this.conn.removeListener('error', this.onerror); diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index d25a0bd9..d6313e25 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -3,104 +3,500 @@ import { EventEmitter } from 'events' import { ServerEvent } from './constants' import { Namespace } from './namespace' import { Client } from './client' -import { SocketIO } from './interfaces' import { Parser } from './parser' import { PacketTypes, SubPacketTypes } from './types' import { Packet } from './packet' import { Socket } from './socket' import { Adapter } from './adapter' +import { InnerClient } from '../interfaces' +import { ParentNamespace } from './parent-namespace' -interface ServerOptions extends SocketIO.ServerOptions { +interface EngineOptions { + /** + * how many ms without a pong packet to consider the connection closed + * @default 5000 + */ + pingTimeout: number + /** + * how many ms before sending a new ping packet + * @default 25000 + */ + pingInterval: number + /** + * how many ms before an uncompleted transport upgrade is cancelled + * @default 10000 + */ + upgradeTimeout: number + /** + * how many bytes or characters a message can be, before closing the session (to avoid DoS). + * @default 1e5 (100 KB) + */ + maxHttpBufferSize: number + /** + * A function that receives a given handshake or upgrade request as its first parameter, + * and can decide whether to continue or not. The second argument is a function that needs + * to be called with the decided information: fn(err, success), where success is a boolean + * value where false means that the request is rejected, and err is an error code. + */ + // allowRequest: ( + // req: http.IncomingMessage, + // fn: (err: string | null | undefined, success: boolean) => void + // ) => void + /** + * the low-level transports that are enabled + * @default ["polling", "websocket"] + */ + // transports: Transport[] + /** + * whether to allow transport upgrades + * @default true + */ + allowUpgrades: boolean + /** + * parameters of the WebSocket permessage-deflate extension (see ws module api docs). Set to false to disable. + * @default false + */ + perMessageDeflate: boolean | object + /** + * parameters of the http compression for the polling transports (see zlib api docs). Set to false to disable. + * @default true + */ + httpCompression: boolean | object + /** + * what WebSocket server implementation to use. Specified module must + * conform to the ws interface (see ws module api docs). Default value is ws. + * An alternative c++ addon is also available by installing uws module. + */ + wsEngine: string + /** + * an optional packet which will be concatenated to the handshake packet emitted by Engine.IO. + */ + initialPacket: any + /** + * configuration of the cookie that contains the client sid to send as part of handshake response headers. This cookie + * might be used for sticky-session. Defaults to not sending any cookie. + * @default false + */ + // cookie: CookieSerializeOptions | boolean + /** + * the options that will be forwarded to the cors module + */ + // cors: CorsOptions +} + +interface AttachOptions { + /** + * name of the path to capture + * @default "/engine.io" + */ + path: string + /** + * destroy unhandled upgrade requests + * @default true + */ + destroyUpgrade: boolean + /** + * milliseconds after which unhandled requests are ended + * @default 1000 + */ + destroyUpgradeTimeout: number +} + +interface EngineAttachOptions extends EngineOptions, AttachOptions { } + +interface ServerOptions extends EngineAttachOptions { event?: EventEmitter root?: string + /** + * name of the path to capture + * @default "/socket.io" + */ + path: string + /** + * whether to serve the client files + * @default true + */ + serveClient: boolean + /** + * the adapter to use + * @default the in-memory adapter (https://github.com/socketio/socket.io-adapter) + */ + adapter: any + /** + * the parser to use + * @default the default parser (https://github.com/socketio/socket.io-parser) + */ + parser: any + /** + * how many ms before a client without namespace is closed + * @default 45000 + */ + connectTimeout: number } interface WebSocketServer extends EventEmitter { close(): void } -class Server implements SocketIO.Server { +class Server { + public readonly sockets: Namespace + + /** + * @private + */ + readonly _parser: Parser + private readonly encoder + + /** + * @private + */ + _nsps: Map + private parentNsps: Map< + | string + | RegExp + | (( + name: string, + query: object, + fn: (err: Error, success: boolean) => void + ) => void), + ParentNamespace + > = new Map(); + private _adapter: Adapter + // private _serveClient: boolean; + private eio + private engine: { ws: any } + private _path: string + private clientPathRegex: RegExp + /** + * @private + */ + _connectTimeout: number + + options: ServerOptions private websocketServer: WebSocketServer private allClients: Map - engine: { ws: any } - nsps: { [namespace: string]: Namespace } - sockets: Namespace - json: SocketIO.Server - volatile: SocketIO.Server - local: SocketIO.Server - parser = new Parser(); - _adapter: Adapter - options: ServerOptions - - constructor(instance: any, options: ServerOptions) { + constructor(instance: any, options: Partial) { if (!instance) { throw new Error('instance can\'t be undefiend!') } this.allClients = new Map() - this.nsps = {} - this.sockets = new Namespace('/', this) - this.nsps['/'] = this.sockets + this._nsps = new Map() + this.connectTimeout(options.connectTimeout || 45000) + this._parser = options.parser || new Parser() + this.adapter(options.adapter || Adapter) + this.sockets = this.of('/') if (instance.class.name.startsWith('io.netty.channel')) { - let { NettyWebSocketServer } = require("../server") + let { NettyWebSocketServer } = require("../netty") this.websocketServer = new NettyWebSocketServer(instance, Object.assign({ event: new EventEmitter(), path: '/socket.io', root: root + '/wwwroot' }, options)) } else { - let { TomcatWebSocketServer } = require("../tomcat/server") + let { TomcatWebSocketServer } = require("../tomcat") this.websocketServer = new TomcatWebSocketServer(instance, options) } this.initServer() } + /** + * Sets/gets whether client code is being served. + * + * @param {Boolean} v - whether to serve client code + * @return {Server|Boolean} self when setting or value when getting + * @public + */ + public serveClient(v: boolean): Server + public serveClient(): boolean + public serveClient(v?: boolean): Server | boolean { + throw new Error("Method not implemented.") + } + /** + * Executes the middleware for an incoming namespace not already created on the server. + * + * @param {String} name - name of incoming namespace + * @param {Object} auth - the auth parameters + * @param {Function} fn - callback + * + * @private + */ + _checkNamespace( + name: string, + auth: object, + fn: (nsp: Namespace) => void + ) { + // if (this.parentNsps.size === 0) return fn(false) - checkRequest(req: any, fn: (err: any, success: boolean) => void): void { - throw new Error("Method not implemented.") - } - serveClient(): boolean - serveClient(v: boolean): SocketIO.Server - serveClient(v?: any): boolean | SocketIO.Server { - throw new Error("Method not implemented.") + // const keysIterator = this.parentNsps.keys() + + // const run = () => { + // let nextFn = keysIterator.next() + // if (nextFn.done) { + // return fn(false) + // } + // nextFn.value(name, auth, (err, allow) => { + // if (err || !allow) { + // run() + // } else { + // fn(this.parentNsps.get(nextFn.value).createChild(name)) + // } + // }) + // } + fn(undefined) } + /** + * Sets the client serving path. + * + * @param {String} v pathname + * @return {Server|String} self when setting or value when getting + * @public + */ path(): string - path(v: string): SocketIO.Server - path(v?: any): string | SocketIO.Server { - if (!arguments.length) return this.options.path - this.options.path = v.replace(/\/$/, '') + path(v: string): Server + path(v?: any): string | Server { + if (!arguments.length) return this._path + + this._path = v.replace(/\/$/, "") + + const escapedPath = this._path.replace(/[-\/\\^$*+?.()|[\]{}]/g, "\\$&") + this.clientPathRegex = new RegExp( + "^" + + escapedPath + + "/socket\\.io(\\.min|\\.msgpack\\.min)?\\.js(\\.map)?$" + ) return this } - adapter(): Adapter - adapter(v: any): SocketIO.Server - adapter(v?: any): Adapter | SocketIO.Server { + /** + * Set the delay after which a client without namespace is closed + * @param v + * @public + */ + public connectTimeout(v: number): Server + public connectTimeout(): number + public connectTimeout(v?: number): Server | number { + if (v === undefined) return this._connectTimeout + this._connectTimeout = v + return this + } + /** + * Sets the adapter for rooms. + * + * @param {Adapter} v pathname + * @return {Server|Adapter} self when setting or value when getting + * @public + */ + public adapter(): any + public adapter(v: any) + public adapter(v?): Server | any { if (!arguments.length) return this._adapter this._adapter = v - for (var i in this.nsps) { - if (this.nsps.hasOwnProperty(i)) { - this.nsps[i].initAdapter() - } + for (const nsp of this._nsps.values()) { + nsp._initAdapter() } return this } - origins(): string | string[] - origins(v: string | string[]): SocketIO.Server - origins(fn: (origin: string, callback: (error: string, success: boolean) => void) => void): SocketIO.Server - origins(fn?: any): string | string[] | SocketIO.Server { - throw new Error("Method not implemented.") - } - attach(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server - attach(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server - attach(port: any, opts?: any): SocketIO.Server { - throw new Error("Method not implemented.") - } - listen(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server - listen(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server - listen(port: any, opts?: any): SocketIO.Server { - throw new Error("Method not implemented.") - } - bind(srv: any): SocketIO.Server { - throw new Error("Method not implemented.") - } - onconnection(client: Client): SocketIO.Server { - client.packet({ + // /** + // * Attaches socket.io to a server or port. + // * + // * @param {http.Server|Number} srv - server or port + // * @param {Object} opts - options passed to engine.io + // * @return {Server} self + // * @public + // */ + // public listen(srv: http.Server, opts?: Partial): Server + // public listen(srv: number, opts?: Partial): Server + // public listen(srv: any, opts: Partial = {}): Server { + // return this.attach(srv, opts) + // } + + // /** + // * Attaches socket.io to a server or port. + // * + // * @param {http.Server|Number} srv - server or port + // * @param {Object} opts - options passed to engine.io + // * @return {Server} self + // * @public + // */ + // public attach(srv: http.Server, opts?: Partial): Server + // public attach(port: number, opts?: Partial): Server + // public attach(srv: any, opts: Partial = {}): Server { + // if ("function" == typeof srv) { + // const msg = + // "You are trying to attach socket.io to an express " + + // "request handler function. Please pass a http.Server instance." + // throw new Error(msg) + // } + + // // handle a port as a string + // if (Number(srv) == srv) { + // srv = Number(srv) + // } + + // if ("number" == typeof srv) { + // debug("creating http server and binding to %d", srv) + // const port = srv + // srv = http.createServer((req, res) => { + // res.writeHead(404) + // res.end() + // }) + // srv.listen(port) + // } + + // // set engine.io path to `/socket.io` + // opts.path = opts.path || this._path + + // this.initEngine(srv, opts) + + // return this + // } + // /** + // * Initialize engine + // * + // * @param srv - the server to attach to + // * @param opts - options passed to engine.io + // * @private + // */ + // private initEngine(srv: http.Server, opts: Partial) { + // // initialize engine + // debug("creating engine.io instance with opts %j", opts) + // this.eio = engine.attach(srv, opts) + + // // attach static file serving + // if (this._serveClient) this.attachServe(srv) + + // // Export http server + // this.httpServer = srv + + // // bind to engine events + // this.bind(this.eio) + // } + + // /** + // * Attaches the static file serving. + // * + // * @param {Function|http.Server} srv http server + // * @private + // */ + // private attachServe(srv) { + // debug("attaching client serving req handler") + + // const evs = srv.listeners("request").slice(0) + // srv.removeAllListeners("request") + // srv.on("request", (req, res) => { + // if (this.clientPathRegex.test(req.url)) { + // this.serve(req, res) + // } else { + // for (let i = 0; i < evs.length; i++) { + // evs[i].call(srv, req, res) + // } + // } + // }) + // } + // /** + // * Handles a request serving of client source and map + // * + // * @param {http.IncomingMessage} req + // * @param {http.ServerResponse} res + // * @private + // */ + // private serve(req: http.IncomingMessage, res: http.ServerResponse) { + // const filename = req.url.replace(this._path, "") + // const isMap = dotMapRegex.test(filename) + // const type = isMap ? "map" : "source" + + // // Per the standard, ETags must be quoted: + // // https://tools.ietf.org/html/rfc7232#section-2.3 + // const expectedEtag = '"' + clientVersion + '"' + + // const etag = req.headers["if-none-match"] + // if (etag) { + // if (expectedEtag == etag) { + // debug("serve client %s 304", type) + // res.writeHead(304) + // res.end() + // return + // } + // } + + // debug("serve client %s", type) + + // res.setHeader("Cache-Control", "public, max-age=0") + // res.setHeader( + // "Content-Type", + // "application/" + (isMap ? "json" : "javascript") + // ) + // res.setHeader("ETag", expectedEtag) + + // if (!isMap) { + // res.setHeader("X-SourceMap", filename.substring(1) + ".map") + // } + // Server.sendFile(filename, req, res) + // } + + // /** + // * @param filename + // * @param req + // * @param res + // * @private + // */ + // private static sendFile( + // filename: string, + // req: http.IncomingMessage, + // res: http.ServerResponse + // ) { + // const readStream = createReadStream( + // path.join(__dirname, "../client-dist/", filename) + // ) + // const encoding = accepts(req).encodings(["br", "gzip", "deflate"]) + + // const onError = err => { + // if (err) { + // res.end() + // } + // } + + // switch (encoding) { + // case "br": + // res.writeHead(200, { "content-encoding": "br" }) + // readStream.pipe(createBrotliCompress()).pipe(res) + // pipeline(readStream, createBrotliCompress(), res, onError) + // break + // case "gzip": + // res.writeHead(200, { "content-encoding": "gzip" }) + // pipeline(readStream, createGzip(), res, onError) + // break + // case "deflate": + // res.writeHead(200, { "content-encoding": "deflate" }) + // pipeline(readStream, createDeflate(), res, onError) + // break + // default: + // res.writeHead(200) + // pipeline(readStream, res, onError) + // } + // } + + // /** + // * Binds socket.io to an engine.io instance. + // * + // * @param {engine.Server} engine engine.io (or compatible) server + // * @return {Server} self + // * @public + // */ + // public bind(engine): Server { + // this.engine = engine + // this.engine.on("connection", this.onconnection.bind(this)) + // return this + // } + /** + * Called with each incoming transport connection. + * + * @param {engine.Socket} conn + * @return {Server} self + * @private + */ + private onconnection(conn): Server { + console.debug(`incoming connection with id ${conn.id}`) + let client = new Client(this, conn) + this.allClients.set(conn.id, client) + client._packet({ type: PacketTypes.OPEN, data: { sid: client.id, @@ -109,88 +505,136 @@ class Server implements SocketIO.Server { pingTimeout: 5000 } }) - this.sockets.add(client) return this } - of(nsp: string): Namespace { - if (!this.nsps[nsp]) { - this.nsps[nsp] = new Namespace(nsp, this) + // of(nsp: string): Namespace { + // if (!this._nsps.has(nsp)) { + // console.debug(`create Namespace ${nsp}`) + // this._nsps.set(nsp, new Namespace(this, nsp)) + // } + // return this._nsps.get(nsp) + // } + /** + * Looks up a namespace. + * + * @param {String|RegExp|Function} name nsp name + * @param {Function} [fn] optional, nsp `connection` ev handler + * @public + */ + public of( + name: + | string + | RegExp + | (( + name: string, + query: object, + fn: (err: Error, success: boolean) => void + ) => void), + fn?: (socket: Socket) => void + ) { + if (typeof name === "function" || name instanceof RegExp) { + const parentNsp = new ParentNamespace(this) + console.debug(`initializing parent namespace ${parentNsp.name}`) + if (typeof name === "function") { + this.parentNsps.set(name, parentNsp) + } else { + this.parentNsps.set( + (nsp, conn, next) => next(null, (name as RegExp).test(nsp)), + parentNsp + ) + } + if (fn) { + // @ts-ignore + parentNsp.on("connect", fn) + } + return parentNsp } - return this.nsps[nsp] + if (String(name)[0] !== "/") name = "/" + name + let nsp = this._nsps.get(name) + if (!nsp) { + console.debug(`initializing namespace ${name}`) + nsp = new Namespace(this, name) + this._nsps.set(name, nsp) + } + if (fn) nsp.on("connect", fn) + return nsp } close(fn?: () => void): void { - for (let socket in this.sockets.sockets) { - this.sockets.sockets[socket].onclose() + for (const socket of this.sockets.sockets.values()) { + socket._onclose("server shutting down") } + + // this.engine.close() this.websocketServer.close() + + // if (this.httpServer) { + // this.httpServer.close(fn) + // } else { + fn && fn() + // } } - on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace - on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace - on(event: string, listener: Function): SocketIO.Namespace - on(event: any, listener: any): SocketIO.Namespace { + on(event: "connection", listener: (socket: Socket) => void): Namespace + on(event: "connect", listener: (socket: Socket) => void): Namespace + on(event: string, listener: Function): Namespace + on(event: any, listener: any): Namespace { return this.sockets.on(event, listener) } - to(room: string): SocketIO.Namespace { + to(room: string): Namespace { return this.sockets.to(room) } - in(room: string): SocketIO.Namespace { + in(room: string): Namespace { return this.sockets.in(room) } - use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { + use(fn: (socket: Socket, fn: (err?: any) => void) => void): Namespace { return this.sockets.use(fn) } - emit(event: string, ...args: any[]): SocketIO.Namespace { + emit(event: string, ...args: any[]): Namespace { // @ts-ignore return this.sockets.emit(event, ...args) } - send(...args: any[]): SocketIO.Namespace { + send(...args: any[]): Namespace { return this.sockets.send(...args) } - write(...args: any[]): SocketIO.Namespace { + write(...args: any[]): Namespace { return this.sockets.write(...args) } - clients(...args: any[]): SocketIO.Namespace { + clients(...args: any[]): Namespace { return this.sockets.clients(args[0]) } - compress(...args: any[]): SocketIO.Namespace { + compress(...args: any[]): Namespace { return this.sockets.compress(args[0]) } // =============================== - checkNamespace(name, query, fn) { - fn(false) - }; - private initServer() { - this.websocketServer.on(ServerEvent.connect, (socket: SocketIO.EngineSocket) => { - let client = new Client(this, socket) - this.allClients.set(socket.id, client) - this.onconnection(client) + this.websocketServer.on(ServerEvent.connect, (innerClient: InnerClient) => { + this.onconnection(innerClient) }) - this.websocketServer.on(ServerEvent.message, (socket: SocketIO.EngineSocket, text) => { - if (this.allClients.has(socket.id)) { - this.processPacket(this.parser.decode(text), this.allClients.get(socket.id)) + this.websocketServer.on(ServerEvent.message, (innerClient: InnerClient, text) => { + if (this.allClients.has(innerClient.id)) { + this.processPacket(this._parser.decode(text), this.allClients.get(innerClient.id)) } else { - console.error(`unknow engine socket ${socket.id} reciver message ${text}`) + console.error(`unknow engine socket ${innerClient.id} reciver message ${text}`) } }) - this.websocketServer.on(ServerEvent.disconnect, (socket: SocketIO.EngineSocket, reason) => { - if (this.allClients.has(socket.id)) { - this.allClients.get(socket.id).onclose(reason) - this.allClients.delete(socket.id) + this.websocketServer.on(ServerEvent.disconnect, (innerClient: InnerClient, reason) => { + if (this.allClients.has(innerClient.id)) { + this.allClients.get(innerClient.id).onclose(reason) + this.allClients.delete(innerClient.id) } else { - console.error(`unknow engine socket ${socket?.id} disconnect cause ${reason}`) + console.error(`unknow engine innerClient ${innerClient?.id} disconnect cause ${reason}`) } }) - this.websocketServer.on(ServerEvent.error, (socket: SocketIO.EngineSocket, cause) => { - if (this.allClients.has(socket?.id)) { - if (socket.listeners(ServerEvent.error).length) { - socket.emit(ServerEvent.error, cause) + this.websocketServer.on(ServerEvent.error, (innerClient: InnerClient, cause) => { + if (this.allClients.has(innerClient?.id)) { + let client = this.allClients.get(innerClient?.id) + if (client.listeners(ServerEvent.error).length) { + client.emit(ServerEvent.error, cause) } else { - console.error(`engine socket ${socket.id} cause error: ${cause}`) + console.error(`engine innerClient ${innerClient.id} cause error: ${cause}`) console.ex(cause) } } else { - console.error(`unknow engine socket ${socket?.id} cause error: ${cause}`) + console.error(`unknow innerClient ${innerClient?.id} cause error: ${cause}`) console.ex(cause) } }) @@ -199,7 +643,7 @@ class Server implements SocketIO.Server { private processPacket(packet: Packet, client: Client) { switch (packet.type) { case PacketTypes.PING: - client.packet({ + client._packet({ type: PacketTypes.PONG, data: packet.data }) @@ -216,19 +660,33 @@ class Server implements SocketIO.Server { } private processSubPacket(packet: Packet, client: Client) { - let namespace = this.nsps[packet.nsp] - if (!namespace) { - client.packet({ - type: PacketTypes.MESSAGE, - sub_type: SubPacketTypes.ERROR, - data: 'not support dynamic namespace: ' + packet.nsp - }) - client.disconnect() - return + switch (packet.sub_type) { + case SubPacketTypes.CONNECT: + client.doConnect(packet.nsp, {}) + break + default: + client.ondecoded(packet) + break } - namespace.process(packet, client) } } + +/** + * Expose main namespace (/). + */ + +const emitterMethods = Object.keys(EventEmitter.prototype).filter(function ( + key +) { + return typeof EventEmitter.prototype[key] === "function" +}) + +emitterMethods.forEach(function (fn) { + Server.prototype[fn] = function () { + return this.sockets[fn].apply(this.sockets, arguments) + } +}) + export { Server, Socket, diff --git a/packages/websocket/src/socket-io/interfaces.ts b/packages/websocket/src/socket-io/interfaces.ts deleted file mode 100644 index f3da485a..00000000 --- a/packages/websocket/src/socket-io/interfaces.ts +++ /dev/null @@ -1,844 +0,0 @@ -export declare namespace SocketIO { - interface Server { - engine: { ws: any }; - - /** - * A dictionary of all the namespaces currently on this Server - */ - nsps: { [namespace: string]: Namespace }; - - /** - * The default '/' Namespace - */ - sockets: Namespace; - - /** - * Sets the 'json' flag when emitting an event - */ - json: Server; - - /** - * Sets a modifier for a subsequent event emission that the event data may be lost if the clients are not ready to receive messages - */ - volatile: Server; - - /** - * Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node - */ - local: Server; - - /** - * Server request verification function, that checks for allowed origins - * @param req The http.IncomingMessage request - * @param fn The callback to be called. It should take one parameter, err, - * which will be null if there was no problem, and one parameter, success, - * of type boolean - */ - checkRequest(req: any, fn: (err: any, success: boolean) => void): void; - - /** - * Gets whether we're serving the client.js file or not - * @default true - */ - serveClient(): boolean; - - /** - * Sets whether we're serving the client.js file or not - * @param v True if we want to serve the file, false otherwise - * @default true - * @return This Server - */ - serveClient(v: boolean): Server; - - /** - * Gets the client serving path - * @default '/socket.io' - */ - path(): string; - - /** - * Sets the client serving path - * @param v The path to serve the client file on - * @default '/socket.io' - * @return This Server - */ - path(v: string): Server; - - /** - * Gets the adapter that we're going to use for handling rooms - * @default typeof Adapter - */ - adapter(): any; - - /** - * Sets the adapter (class) that we're going to use for handling rooms - * @param v The class for the adapter to create - * @default typeof Adapter - * @return This Server - */ - adapter(v: any): Server; - - /** - * Gets the allowed origins for requests - * @default "*:*" - */ - origins(): string | string[]; - - /** - * Sets the allowed origins for requests - * @param v The allowed origins, in host:port form - * @default "*:*" - * return This Server - */ - origins(v: string | string[]): Server; - - /** - * Provides a function taking two arguments origin:String - * and callback(error, success), where success is a boolean - * value indicating whether origin is allowed or not. If - * success is set to false, error must be provided as a string - * value that will be appended to the server response, e.g. “Origin not allowed”. - * @param fn The function that will be called to check the origin - * return This Server - */ - origins(fn: (origin: string, callback: (error: string | null, success: boolean) => void) => void): Server; - - /** - * Attaches socket.io to a server - * @param srv The http.Server that we want to attach to - * @param opts An optional parameters object - * @return This Server - */ - attach(srv: any, opts?: ServerOptions): Server; - - /** - * Attaches socket.io to a port - * @param port The port that we want to attach to - * @param opts An optional parameters object - * @return This Server - */ - attach(port: number, opts?: ServerOptions): Server; - - /** - * @see attach( srv, opts ) - */ - listen(srv: any, opts?: ServerOptions): Server; - - /** - * @see attach( port, opts ) - */ - listen(port: number, opts?: ServerOptions): Server; - - /** - * Binds socket.io to an engine.io instance - * @param src The Engine.io (or compatible) server to bind to - * @return This Server - */ - bind(srv: any): Server; - - /** - * Called with each incoming connection - * @param socket The Engine.io Socket - * @return This Server - */ - onconnection(socket: any): Server; - - /** - * Looks up/creates a Namespace - * @param nsp The name of the NameSpace to look up/create. Should start - * with a '/' - * @return The Namespace - */ - of(nsp: string | RegExp | Function): Namespace; - - /** - * Closes the server connection - */ - close(fn?: () => void): void; - - /** - * The event fired when we get a new connection - * @param event The event being fired: 'connection' - * @param listener A listener that should take one parameter of type Socket - * @return The default '/' Namespace - */ - on(event: 'connection', listener: (socket: Socket) => void): Namespace; - - /** - * @see on( 'connection', listener ) - */ - on(event: 'connect', listener: (socket: Socket) => void): Namespace; - - /** - * Base 'on' method to add a listener for an event - * @param event The event that we want to add a listener for - * @param listener The callback to call when we get the event. The parameters - * for the callback depend on the event - * @return The default '/' Namespace - */ - on(event: string, listener: Function): Namespace; - - /** - * Targets a room when emitting to the default '/' Namespace - * @param room The name of the room that we're targeting - * @return The default '/' Namespace - */ - to(room: string): Namespace; - - /** - * @see to( room ) - */ - in(room: string): Namespace; - - /** - * Registers a middleware function, which is a function that gets executed - * for every incoming Socket, on the default '/' Namespace - * @param fn The function to call when we get a new incoming socket. It should - * take one parameter of type Socket, and one callback function to call to - * execute the next middleware function. The callback can take one optional - * parameter, err, if there was an error. Errors passed to middleware callbacks - * are sent as special 'error' packets to clients - * @return The default '/' Namespace - */ - use(fn: (socket: Socket, fn: (err?: any) => void) => void): Namespace; - - /** - * Emits an event to the default Namespace - * @param event The event that we want to emit - * @param args Any number of optional arguments to pass with the event. If the - * last argument is a function, it will be called as an ack. The ack should - * take whatever data was sent with the packet - * @return The default '/' Namespace - */ - emit(event: string, ...args: any[]): Namespace; - - /** - * Sends a 'message' event - * @see emit( event, ...args ) - * @return The default '/' Namespace - */ - send(...args: any[]): Namespace; - - /** - * @see send( ...args ) - */ - write(...args: any[]): Namespace; - - /** - * Gets a list of clients - * @return The default '/' Namespace - */ - clients(...args: any[]): Namespace; - - /** - * Sets the compress flag - * @return The default '/' Namespace - */ - compress(...args: any[]): Namespace; - } - - /** - * Options to pass to our server when creating it - */ - interface ServerOptions { - - /** - * The path to server the client file to - * @default '/socket.io' - */ - path?: string; - - /** - * Should we serve the client file? - * @default true - */ - serveClient?: boolean; - - /** - * The adapter to use for handling rooms. NOTE: this should be a class, - * not an object - * @default typeof Adapter - */ - adapter?: Adapter; - - /** - * Accepted origins - * @default '*:*' - */ - origins?: string | string[]; - - /** - * How many milliseconds without a pong packed to consider the connection closed (engine.io) - * @default 60000 - */ - pingTimeout?: number; - - /** - * How many milliseconds before sending a new ping packet (keep-alive) (engine.io) - * @default 25000 - */ - pingInterval?: number; - - /** - * How many bytes or characters a message can be when polling, before closing the session - * (to avoid Dos) (engine.io) - * @default 10E7 - */ - maxHttpBufferSize?: number; - - /** - * A function that receives a given handshake or upgrade request as its first parameter, - * and can decide whether to continue or not. The second argument is a function that needs - * to be called with the decided information: fn( err, success ), where success is a boolean - * value where false means that the request is rejected, and err is an error code (engine.io) - * @default null - */ - allowRequest?: (request: any, callback: (err: number, success: boolean) => void) => void; - - /** - * Transports to allow connections to (engine.io) - * @default ['polling','websocket'] - */ - transports?: string[]; - - /** - * Whether to allow transport upgrades (engine.io) - * @default true - */ - allowUpgrades?: boolean; - - /** - * parameters of the WebSocket permessage-deflate extension (see ws module). - * Set to false to disable (engine.io) - * @default true - */ - perMessageDeflate?: Object | boolean; - - /** - * Parameters of the http compression for the polling transports (see zlib). - * Set to false to disable, or set an object with parameter "threshold:number" - * to only compress data if the byte size is above this value (1024) (engine.io) - * @default true|1024 - */ - httpCompression?: Object | boolean; - - /** - * Name of the HTTP cookie that contains the client sid to send as part of - * handshake response headers. Set to false to not send one (engine.io) - * @default "io" - */ - cookie?: string | boolean; - - /** - * Whether to let engine.io handle the OPTIONS requests. - * You can also pass a custom function to handle the requests - * @default true - */ - handlePreflightRequest?: ((req: any, res: any) => void) | boolean; - } - - /** - * The Namespace, sandboxed environments for sockets, each connection - * to a Namespace requires a new Socket - */ - interface Namespace extends NodeJS.EventEmitter { - - /** - * The name of the NameSpace - */ - name: string; - - /** - * The controller Server for this Namespace - */ - server: Server; - - /** - * A dictionary of all the Sockets connected to this Namespace, where - * the Socket ID is the key - */ - sockets: { [id: string]: Socket }; - - /** - * A dictionary of all the Sockets connected to this Namespace, where - * the Socket ID is the key - */ - connected: { [id: string]: Socket }; - - /** - * The Adapter that we're using to handle dealing with rooms etc - */ - adapter: Adapter; - - /** - * Sets the 'json' flag when emitting an event - */ - json: Namespace; - - /** - * Registers a middleware function, which is a function that gets executed - * for every incoming Socket - * @param fn The function to call when we get a new incoming socket. It should - * take one parameter of type Socket, and one callback function to call to - * execute the next middleware function. The callback can take one optional - * parameter, err, if there was an error. Errors passed to middleware callbacks - * are sent as special 'error' packets to clients - * @return This Namespace - */ - use(fn: (socket: Socket, fn: (err?: any) => void) => void): Namespace; - - /** - * Targets a room when emitting - * @param room The name of the room that we're targeting - * @return This Namespace - */ - to(room: string): Namespace; - - /** - * @see to( room ) - */ - in(room: string): Namespace; - - /** - * Sends a 'message' event - * @see emit( event, ...args ) - * @return This Namespace - */ - send(...args: any[]): Namespace; - - /** - * @see send( ...args ) - */ - write(...args: any[]): Namespace; - - /** - * The event fired when we get a new connection - * @param event The event being fired: 'connection' - * @param listener A listener that should take one parameter of type Socket - * @return This Namespace - */ - on(event: 'connection', listener: (socket: Socket) => void): this; - - /** - * @see on( 'connection', listener ) - */ - on(event: 'connect', listener: (socket: Socket) => void): this; - - /** - * Base 'on' method to add a listener for an event - * @param event The event that we want to add a listener for - * @param listener The callback to call when we get the event. The parameters - * for the callback depend on the event - * @ This Namespace - */ - on(event: string, listener: Function): this; - - /** - * Gets a list of clients. - * @return This Namespace - */ - clients(fn: Function): Namespace; - - /** - * Sets the compress flag. - * @param compress If `true`, compresses the sending data - * @return This Namespace - */ - compress(compress: boolean): Namespace; - } - - interface Packet extends Array { - /** - * Event name - */ - [0]: string; - /** - * Packet data - */ - [1]: any; - /** - * Ack function - */ - [2]: (...args: any[]) => void; - } - - /** - * The socket, which handles our connection for a namespace. NOTE: while - * we technically extend NodeJS.EventEmitter, we're not putting it here - * as we have a problem with the emit() event (as it's overridden with a - * different return) - */ - interface Socket extends NodeJS.EventEmitter { - - /** - * The namespace that this socket is for - */ - nsp: Namespace; - - /** - * The Server that our namespace is in - */ - server: Server; - - /** - * The Adapter that we use to handle our rooms - */ - adapter: Adapter; - - /** - * The unique ID for this Socket. Regenerated at every connection. This is - * also the name of the room that the Socket automatically joins on connection - */ - id: string; - - /** - * The http.IncomingMessage request sent with the connection. Useful - * for recovering headers etc - */ - request: any; - - /** - * The Client associated with this Socket - */ - client: Client; - - /** - * The underlying Engine.io Socket instance - */ - conn: EngineSocket; - - /** - * The list of rooms that this Socket is currently in, where - * the ID the the room ID - */ - rooms: { [id: string]: string }; - - /** - * Is the Socket currently connected? - */ - connected: boolean; - - /** - * Is the Socket currently disconnected? - */ - disconnected: boolean; - - /** - * The object used when negociating the handshake - */ - handshake: Handshake; - /** - * Sets the 'json' flag when emitting an event - */ - json: Socket; - - /** - * Sets the 'volatile' flag when emitting an event. Volatile messages are - * messages that can be dropped because of network issues and the like. Use - * for high-volume/real-time messages where you don't need to receive *all* - * of them - */ - volatile: Socket; - - /** - * Sets the 'broadcast' flag when emitting an event. Broadcasting an event - * will send it to all the other sockets in the namespace except for yourself - */ - broadcast: Socket; - - /** - * Targets a room when broadcasting - * @param room The name of the room that we're targeting - * @return This Socket - */ - to(room: string): Socket; - - /** - * @see to( room ) - */ - in(room: string): Socket; - - /** - * Registers a middleware, which is a function that gets executed for every incoming Packet and receives as parameter the packet and a function to optionally defer execution to the next registered middleware. - * - * Errors passed to middleware callbacks are sent as special error packets to clients. - */ - use(fn: (packet: Packet, next: (err?: any) => void) => void): Socket; - - /** - * Sends a 'message' event - * @see emit( event, ...args ) - */ - send(...args: any[]): Socket; - - /** - * @see send( ...args ) - */ - write(...args: any[]): Socket; - - /** - * Joins a room. You can join multiple rooms, and by default, on connection, - * you join a room with the same name as your ID - * @param name The name of the room that we want to join - * @param fn An optional callback to call when we've joined the room. It should - * take an optional parameter, err, of a possible error - * @return This Socket - */ - join(name: string | string[], fn?: (err?: any) => void): Socket; - - /** - * Leaves a room - * @param name The name of the room to leave - * @param fn An optional callback to call when we've left the room. It should - * take on optional parameter, err, of a possible error - */ - leave(name: string, fn?: Function): Socket; - - /** - * Leaves all the rooms that we've joined - */ - leaveAll(): void; - - /** - * Disconnects this Socket - * @param close If true, also closes the underlying connection - * @return This Socket - */ - disconnect(close?: boolean): Socket; - - /** - * Returns all the callbacks for a particular event - * @param event The event that we're looking for the callbacks of - * @return An array of callback Functions, or an empty array if we don't have any - */ - listeners(event: string): Function[]; - - /** - * Sets the compress flag - * @param compress If `true`, compresses the sending data - * @return This Socket - */ - compress(compress: boolean): Socket; - - /** - * Emits the error - * @param err Error message= - */ - error(err: any): void; - } - - interface Handshake { - /** - * The headers passed along with the request. e.g. 'host', - * 'connection', 'accept', 'referer', 'cookie' - */ - headers: any; - - /** - * The current time, as a string - */ - time: string; - - /** - * The remote address of the connection request - */ - address: string; - - /** - * Is this a cross-domain request? - */ - xdomain: boolean; - - /** - * Is this a secure request? - */ - secure: boolean; - - /** - * The timestamp for when this was issued - */ - issued: number; - - /** - * The request url - */ - url: string; - - /** - * Any query string parameters in the request url - */ - query: any; - } - - /** - * The interface describing a room - */ - interface Room { - sockets: { [id: string]: boolean }; - length: number; - } - - /** - * The interface describing a dictionary of rooms - * Where room is the name of the room - */ - - interface Rooms { - [room: string]: Room; - } - - /** - * The interface used when dealing with rooms etc - */ - interface Adapter extends NodeJS.EventEmitter { - - /** - * The namespace that this adapter is for - */ - nsp: Namespace; - - /** - * A dictionary of all the rooms that we have in this namespace - */ - rooms: Rooms; - - /** - * A dictionary of all the socket ids that we're dealing with, and all - * the rooms that the socket is currently in - */ - sids: { [id: string]: { [room: string]: boolean } }; - - /** - * Adds a socket to a room. If the room doesn't exist, it's created - * @param id The ID of the socket to add - * @param room The name of the room to add the socket to - * @param callback An optional callback to call when the socket has been - * added. It should take an optional parameter, error, if there was a problem - */ - add(id: string, room: string, callback?: (err?: any) => void): void; - - /** - * Removes a socket from a room. If there are no more sockets in the room, - * the room is deleted - * @param id The ID of the socket that we're removing - * @param room The name of the room to remove the socket from - * @param callback An optional callback to call when the socket has been - * removed. It should take on optional parameter, error, if there was a problem - */ - del(id: string, room: string, callback?: (err?: any) => void): void; - - /** - * Adds a socket to a list of room. - * - * @param {String} socket id - * @param {String} rooms - * @param {Function} callback - * @api public - */ - addAll(id: string, rooms: string | any[], fn: { (err?: any): void; bind?: any; }); - - /** - * Removes a socket from all the rooms that it's joined - * @param id The ID of the socket that we're removing - */ - delAll(id: string): void; - - /** - * Broadcasts a packet - * @param packet The packet to broadcast - * @param opts Any options to send along: - * - rooms: An optional list of rooms to broadcast to. If empty, the packet is broadcast to all sockets - * - except: A list of Socket IDs to exclude - * - flags: Any flags that we want to send along ('json', 'volatile', 'broadcast') - */ - broadcast(packet: any, opts: { rooms?: string[]; except?: string[]; flags?: { [flag: string]: boolean } }): void; - } - - /** - * The client behind each socket (can have multiple sockets) - */ - interface Client { - /** - * The Server that this client belongs to - */ - server: Server; - - /** - * The underlying Engine.io Socket instance - */ - conn: EngineSocket; - - /** - * The ID for this client. Regenerated at every connection - */ - id: string; - - /** - * The http.IncomingMessage request sent with the connection. Useful - * for recovering headers etc - */ - request: any; - - /** - * The dictionary of sockets currently connect via this client (i.e. to different - * namespaces) where the Socket ID is the key - */ - sockets: { [id: string]: Socket }; - - /** - * A dictionary of all the namespaces for this client, with the Socket that - * deals with that namespace - */ - nsps: { [nsp: string]: Socket }; - } - - /** - * A reference to the underlying engine.io Socket connection. - */ - interface EngineSocket extends NodeJS.EventEmitter { - /** - * The ID for this socket - matches Client.id - */ - id: string; - - /** - * The Engine.io Server for this socket - */ - server: any; - - /** - * The ready state for the client. Either 'opening', 'open', 'closing', or 'closed' - */ - readyState: string; - - /** - * The remote IP for this connection - */ - remoteAddress: string; - - /** - * whether the transport has been upgraded - */ - upgraded: boolean; - - /** - * (http.IncomingMessage): request that originated the Socket - */ - request: any; - - /** - * (Transport): transport reference - */ - transport: any; - - /** - * send - */ - send(text: string); - - /** - * close - */ - close(); - } -} diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 22248105..95caca35 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -1,85 +1,153 @@ import { EventEmitter } from 'events' import { Client } from './client' -import { SocketIO } from './interfaces' import { ServerEvent } from './constants' -import { Socket } from './socket' -import { Adapter } from './adapter' +import { RESERVED_EVENTS, Socket } from './socket' +import { Adapter, Room, SocketId } from './adapter' import { Server } from './index' import { Packet } from './packet' import { PacketTypes, SubPacketTypes } from './types' -export class Namespace extends EventEmitter implements SocketIO.Namespace { - name: string - server: Server - sockets: { [id: string]: Socket } - connected: { [id: string]: Socket } - adapter: SocketIO.Adapter - json: SocketIO.Namespace +export interface ExtendedError extends Error { + data?: any +} - fns: any[] - ids: number - rooms: string[] - flags: { [key: string]: boolean } +export class Namespace extends EventEmitter { + public readonly name: string + public readonly sockets: Map - private events = ['connect', 'connection', 'newListener'] + public adapter: Adapter - constructor(name: string, server: Server) { + /** @private */ + readonly server: Server + json: Namespace + + /** @private */ + _fns: Array< + (socket: Socket, next: (err: ExtendedError) => void) => void + > = []; + + /** @private */ + _rooms: Set + + /** @private */ + _flags: any = {} + + /** @private */ + _ids: number = 0 + + constructor(server: Server, name: string) { super() - this.name = name this.server = server - this.sockets = {} - this.connected = {} - this.fns = [] - this.ids = 0 - this.rooms = [] - this.flags = {} - this.adapter = new Adapter(this) + this.name = name + '' + this._initAdapter() + // ======================= + this.sockets = new Map() + this._rooms = new Set() } - initAdapter() { + _initAdapter() { // @ts-ignore - this.adapter = new (this.server.adapter())() + this.adapter = new (this.server.adapter())(this) } - add(client: Client, query?: any, callback?: () => void) { - // client.conn.request.url(); - let socket = new Socket(this, client, {}) - this.sockets[client.id] = socket - client.nsps[this.name] = socket - this.onconnection(socket) + /** + * Sets up namespace middleware. + * + * @return {Namespace} self + * @public + */ + public use( + fn: (socket: Socket, next: (err?: ExtendedError) => void) => void + ): Namespace { + throw new Error("Method not implemented.") + this._fns.push(fn) + return this + } + /** + * Executes the middleware for an incoming client. + * + * @param {Socket} socket - the socket that will get added + * @param {Function} fn - last fn call in the middleware + * @private + */ + private run(socket: Socket, fn: (err: ExtendedError) => void) { + const fns = this._fns.slice(0) + if (!fns.length) return fn(null) + + function run(i) { + fns[i](socket, function (err) { + // upon error, short-circuit + if (err) return fn(err) + + // if no middleware left, summon callback + if (!fns[i + 1]) return fn(null) + + // go on to next + run(i + 1) + }) + } + + run(0) + } + to(name: string): Namespace { + this._rooms.add(name) + return this + } + in(name: string): Namespace { + return this.to(name) + } + _add(client: Client, query?: any, fn?: () => void) { + console.debug(`adding socket to nsp ${this.name}`) + const socket = new Socket(this, client, query || {}) + this.run(socket, err => { + process.nextTick(() => { + if ("open" == client.conn.readyState) { + if (err) + return socket._error({ + message: err.message, + data: err.data + }) + + // track socket + this.sockets.set(socket.id, socket) + + // it's paramount that the internal `onconnect` logic + // fires before user-set events to prevent state order + // violations (such as a disconnection before the connection + // logic is complete) + socket._onconnect() + if (fn) fn() + + // fire user-set events + super.emit(ServerEvent.connect, socket) + super.emit(ServerEvent.connection, socket) + } else { + console.debug(`next called after client ${client.id} was closed - ignoring socket`) + } + }) + }) return socket } - del(client: Client) { - let socket = this.sockets[client.id] - socket.disconnect() - delete this.sockets[client.id] - } - use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - throw new Error("Method not implemented.") - } - to(room: string): SocketIO.Namespace { - if (!~this.rooms.indexOf(room)) this.rooms.push(room) - return this - } - in(room: string): SocketIO.Namespace { - return this.to(room) - } - send(...args: any[]): SocketIO.Namespace { - super.emit('message', ...args) - return this - } - write(...args: any[]): SocketIO.Namespace { - return this.send(...args) + /** + * Removes a client. Called by each `Socket`. + * + * @private + */ + _remove(socket: Socket): void { + if (this.sockets.has(socket.id)) { + console.debug(`namespace ${this.name} remove socket ${socket.id}`) + this.sockets.delete(socket.id) + } else { + console.debug(`ignoring remove for ${socket.id}`) + } } emit(event: string, ...args: any[]): boolean { - if (~this.events.indexOf(event)) { - super.emit(event, ...args) - // @ts-ignore - return this + if (RESERVED_EVENTS.has(event)) { + throw new Error(`"${event}" is a reserved event name`) } // set up packet object var packet = { type: PacketTypes.MESSAGE, - sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, + sub_type: (this._flags.binary !== undefined ? this._flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, name: event, data: args } @@ -88,55 +156,104 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { throw new Error('Callbacks are not supported when broadcasting') } - var rooms = this.rooms.slice(0) - var flags = Object.assign({}, this.flags) + var rooms = new Set(this._rooms) + var flags = Object.assign({}, this._flags) // reset flags - this.rooms = [] - this.flags = {} + this._rooms.clear() + this._flags = {} this.adapter.broadcast(packet, { - rooms: rooms, + rooms: new Set(rooms), flags: flags }) // @ts-ignore return this } + send(...args: any[]): Namespace { + this.emit('message', ...args) + return this + } + write(...args: any[]): Namespace { + return this.send(...args) + } + /** + * Gets a list of clients. + * + * @return {Namespace} self + * @public + */ + public allSockets(): Promise> { + if (!this.adapter) { + throw new Error( + "No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?" + ) + } + const rooms = new Set(this._rooms) + this._rooms.clear() + return this.adapter.sockets(rooms) + } + + /** + * Sets the compress flag. + * + * @param {Boolean} compress - if `true`, compresses the sending data + * @return {Namespace} self + * @public + */ + public compress(compress: boolean): Namespace { + this._flags.compress = compress + return this + } + + /** + * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to + * receive messages (because of network slowness or other issues, or because they’re connected through long polling + * and is in the middle of a request-response cycle). + * + * @return {Namespace} self + * @public + */ + public get volatile(): Namespace { + this._flags.volatile = true + return this + } + + /** + * Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node. + * + * @return {Namespace} self + * @public + */ + public get local(): Namespace { + this._flags.local = true + return this + } + hasBin(args: any[]) { return false } - clients(fn: (sockets: Socket[]) => SocketIO.Namespace): SocketIO.Namespace { - return fn(Object.values(this.sockets)) + del(client: Client) { + let socket = this.sockets[client.id] + socket.disconnect() + delete this.sockets[client.id] } - compress(compress: boolean): SocketIO.Namespace { - throw new Error("Method not implemented.") + clients(fn: (sockets: Socket[]) => Namespace): Namespace { + return fn(Object.values(this.sockets)) } process(packet: Packet, client: Client) { switch (packet.sub_type) { case SubPacketTypes.CONNECT: - this.add(client) + client.doConnect(this.name, {}) break default: - this.sockets[client.id].onpacket(packet) + this.sockets.get(client.id)._onpacket(packet) break } } - remove(socket: Socket) { - if (this.sockets.hasOwnProperty(socket.id)) { - delete this.sockets[socket.id] - } else { - // debug('ignoring remove for %s', socket.id); - } - } close() { - this.removeAllListeners('connect') + this.removeAllListeners(ServerEvent.connect) + this.removeAllListeners(ServerEvent.connection) Object.values(this.sockets).forEach(socket => socket.disconnect(false)) } - private onconnection(socket: any) { - let client = socket as Socket - this.sockets[client.id] = client - this.emit(ServerEvent.connect, socket) - client.onconnect() - this.emit(ServerEvent.connection, socket) - } } diff --git a/packages/websocket/src/socket-io/parent-namespace.ts b/packages/websocket/src/socket-io/parent-namespace.ts new file mode 100644 index 00000000..88e33d6d --- /dev/null +++ b/packages/websocket/src/socket-io/parent-namespace.ts @@ -0,0 +1,40 @@ +import { Namespace } from "./namespace" + +export class ParentNamespace extends Namespace { + private static count: number = 0; + private children: Set = new Set(); + + constructor(server) { + super(server, "/_" + ParentNamespace.count++) + } + + _initAdapter() { } + + public emit(...args: any[]): boolean { + this.children.forEach(nsp => { + nsp._rooms = this._rooms + nsp._flags = this._flags + nsp.emit.apply(nsp, args as any) + }) + this._rooms.clear() + this._flags = {} + + return true + } + + createChild(name) { + const namespace = new Namespace(this.server, name) + namespace._fns = this._fns.slice(0) + this.listeners("connect").forEach(listener => + // @ts-ignore + namespace.on("connect", listener) + ) + this.listeners("connection").forEach(listener => + // @ts-ignore + namespace.on("connection", listener) + ) + this.children.add(namespace) + this.server._nsps.set(name, namespace) + return namespace + } +} diff --git a/packages/websocket/src/socket-io/parser.ts b/packages/websocket/src/socket-io/parser.ts index bd4c1f39..c86308ac 100644 --- a/packages/websocket/src/socket-io/parser.ts +++ b/packages/websocket/src/socket-io/parser.ts @@ -1,30 +1,31 @@ -import { Packet } from "./packet"; -import { PacketTypes, SubPacketTypes } from "./types"; +import { EventEmitter } from 'events' +import { Packet } from "./packet" +import { PacketTypes, SubPacketTypes } from "./types" -export class Parser { +export class Parser extends EventEmitter { encode(packet: Packet): string { let origin = JSON.stringify(packet) // first is type - let str = '' + packet.type; + let str = '' + packet.type if (packet.type == PacketTypes.PONG) { if (packet.data) { str += packet.data }; - return str; + return str } if (packet.sub_type != undefined) { - str += packet.sub_type; + str += packet.sub_type } // attachments if we have them if ([SubPacketTypes.BINARY_EVENT, SubPacketTypes.BINARY_ACK].includes(packet.sub_type)) { - str += packet.attachments + '-'; + str += packet.attachments + '-' } // if we have a namespace other than `/` // we append it followed by a comma `,` if (packet.nsp && '/' !== packet.nsp) { - str += packet.nsp + ','; + str += packet.nsp + ',' } // immediately followed by the id if (null != packet.id) { - str += packet.id; + str += packet.id } if (packet.sub_type == SubPacketTypes.EVENT) { if (packet.name == undefined) { throw new Error(`SubPacketTypes.EVENT name can't be empty!`) } @@ -32,25 +33,25 @@ export class Parser { } // json data if (null != packet.data) { - let payload = this.tryStringify(packet.data); + let payload = this.tryStringify(packet.data) if (payload !== false) { - str += payload; + str += payload } else { return '4"encode error"' } } - console.trace(`encoded ${origin} as ${str}`); - return str; + console.trace(`encoded ${origin} as ${str}`) + return str } tryStringify(str: any) { try { - return JSON.stringify(str); + return JSON.stringify(str) } catch (e) { - return false; + return false } } decode(str: string): Packet { - let i = 0; + let i = 0 // ignore parse binary // if ((frame.getByte(0) == 'b' && frame.getByte(1) == '4') // || frame.getByte(0) == 4 || frame.getByte(0) == 1) { @@ -59,69 +60,69 @@ export class Parser { // look up type let p: Packet = { type: Number(str.charAt(i)) - }; + } if (null == PacketTypes[p.type]) { - return this.error('unknown packet type ' + p.type); + return this.error('unknown packet type ' + p.type) } // if str empty return if (str.length == i + 1) { - return p; + return p } // if is ping packet read data and return if (PacketTypes.PING == p.type) { - p.data = str.substr(++i); - return p; + p.data = str.substr(++i) + return p } // look up sub type - p.sub_type = Number(str.charAt(++i)); + p.sub_type = Number(str.charAt(++i)) if (null == PacketTypes[p.sub_type]) { - return this.error('unknown sub packet type ' + p.type); + return this.error('unknown sub packet type ' + p.type) } // look up attachments if type binary if ([SubPacketTypes.BINARY_ACK, SubPacketTypes.BINARY_EVENT].includes(p.sub_type)) { - let buf = ''; + let buf = '' while (str.charAt(++i) !== '-') { - buf += str.charAt(i); - if (i == str.length) break; + buf += str.charAt(i) + if (i == str.length) break } if (buf != `${Number(buf)}` || str.charAt(i) !== '-') { - return this.error('Illegal attachments'); + return this.error('Illegal attachments') } - p.attachments = Number(buf); + p.attachments = Number(buf) } // look up namespace (if any) if ('/' === str.charAt(i + 1)) { - p.nsp = ''; + p.nsp = '' while (++i) { - let c = str.charAt(i); - if (',' === c) break; - p.nsp += c; - if (i === str.length) break; + let c = str.charAt(i) + if (',' === c) break + p.nsp += c + if (i === str.length) break } } else { - p.nsp = '/'; + p.nsp = '/' } // handle namespace query if (p.nsp.indexOf('?') !== -1) { - p.nsp = p.nsp.split('?')[0]; + p.nsp = p.nsp.split('?')[0] } // look up id - let next = str.charAt(i + 1); + let next = str.charAt(i + 1) if ('' !== next && !isNaN(Number(next))) { let id = '' while (++i) { - let c = str.charAt(i); + let c = str.charAt(i) if (null == c || isNaN(Number(c))) { - --i; - break; + --i + break } - id += str.charAt(i); - if (i === str.length) break; + id += str.charAt(i) + if (i === str.length) break } - p.id = Number(id); + p.id = Number(id) } // ignore binary packet @@ -131,25 +132,25 @@ export class Parser { // look up json data if (str.charAt(++i)) { - let payload = this.tryParse(str.substr(i)); - let isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)); + let payload = this.tryParse(str.substr(i)) + let isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)) if (isPayloadValid) { - p.name = payload[0]; - p.data = payload.slice(1); + p.name = payload[0] + p.data = payload.slice(1) } else { - return this.error('invalid payload ' + str.substr(i)); + return this.error('invalid payload ' + str.substr(i)) } } - console.trace(`decoded ${str} as ${JSON.stringify(p)}`); - return p; + console.trace(`decoded ${str} as ${JSON.stringify(p)}`) + return p } tryParse(str: string) { try { - return JSON.parse(str); + return JSON.parse(str) } catch (e) { - return false; + return false } } @@ -158,6 +159,6 @@ export class Parser { type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: 'parser error: ' + error - }; + } } -} \ No newline at end of file +} diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 595a94e6..540a8bee 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -1,158 +1,117 @@ import { EventEmitter } from 'events' -import { SocketIO } from "./interfaces" import { Packet } from './packet' import { PacketTypes, SubPacketTypes } from './types' import { Client } from './client' import { Namespace } from './namespace' import * as querystring from 'querystring' import { ServerEvent } from './constants' +import { Adapter, BroadcastFlags, Room, SocketId } from './adapter' +import { Server } from 'index' -export class Socket extends EventEmitter implements SocketIO.Socket { +export const RESERVED_EVENTS = new Set([ + "connect", + "connect_error", + "disconnect", + "disconnecting", + // EventEmitter reserved events: https://nodejs.org/api/events.html#events_event_newlistener + "newListener", + "removeListener" +]) + +/** + * The handshake details + */ +export interface Handshake { + /** + * The headers sent as part of the handshake + */ + headers: object + + /** + * The date of creation (as string) + */ + time: string + + /** + * The ip of the client + */ + address: string + + /** + * Whether the connection is cross-domain + */ + xdomain: boolean + + /** + * Whether the connection is secure + */ + secure: boolean + + /** + * The date of creation (as unix timestamp) + */ + issued: number + + /** + * The request URL string + */ + url: string + + /** + * The query object + */ + query: any + + /** + * The auth object + */ + auth: any +} +export class Socket extends EventEmitter { nsp: Namespace - server: SocketIO.Server - adapter: SocketIO.Adapter - id: string - request: any + + public readonly id: SocketId + public readonly handshake: Handshake + + public connected: boolean + public disconnected: boolean + + private readonly server: Server + private readonly adapter: Adapter + client: Client - conn: SocketIO.EngineSocket - rooms: { [id: string]: string } - acks: { [id: string]: Function } - connected: boolean - disconnected: boolean - handshake: SocketIO.Handshake + private acks: Map void> + fns: any[] - flags: { [key: string]: boolean } - _rooms: string[] + private flags: BroadcastFlags = {}; + private _rooms: Set = new Set(); + private _anyListeners: Array<(...args: any[]) => void> private events = [ - 'error', 'connect', + "connect_error", 'disconnect', 'disconnecting', 'newListener', 'removeListener' ] - constructor(nsp: Namespace, client: Client, query = {}) { + constructor(nsp: Namespace, client: Client, auth = {}) { super() this.nsp = nsp this.server = nsp.server this.adapter = this.nsp.adapter this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id this.client = client - this.request = client.request - this.conn = client.conn - this.rooms = {} - this.acks = {} + this.acks = new Map() this.connected = true this.disconnected = false - this.handshake = this.buildHandshake(query) + this.handshake = this.buildHandshake(auth) + this.fns = [] this.flags = {} - this._rooms = [] - } - - get json() { - this.flags.json = true - return this - } - - get volatile() { - this.flags.volatile = true - return this - } - get broadcast() { - this.flags.broadcast = true - return this - } - get local() { - this.flags.local = true - return this - } - - to(room: string): SocketIO.Socket { - if (!~this._rooms.indexOf(room)) this._rooms.push(room) - return this - } - in(room: string): SocketIO.Socket { - return this.to(room) - } - use(fn: (packet: SocketIO.Packet, next: (err?: any) => void) => void): SocketIO.Socket { - throw new Error("Method not implemented.") - } - send(...args: any[]): SocketIO.Socket { - this.emit("message", ...args) - return this - } - write(...args: any[]): SocketIO.Socket { - return this.send(...args) - } - join(rooms: string | string[], fn?: (err?: any) => void): SocketIO.Socket { - if (!Array.isArray(rooms)) { - rooms = [rooms] - } - rooms = rooms.filter((room) => { - return !this.rooms.hasOwnProperty(room) - }) - if (!rooms.length) { - fn && fn(null) - return this - } - this.adapter.addAll(this.id, rooms, (err) => { - if (err) return fn && fn(err); - // debug('joined room %s', rooms); - (rooms as Array).forEach((room) => { - this.rooms[room] = room - }) - fn && fn(null) - }) - return this - } - leave(name: string, fn?: Function): SocketIO.Socket { - delete this.rooms[name] - fn && fn(null) - return this - } - leaveAll(): void { - this.adapter.delAll(this.id) - this.rooms = {} - } - disconnect(close?: boolean): SocketIO.Socket { - if (!this.connected) return this - if (close) { - this.client.disconnect() - } else { - this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT }) - this.onclose('server namespace disconnect') - } - return this - } - compress(compress: boolean): SocketIO.Socket { - throw new Error("Method not implemented.") - } - error(err: any): void { - this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: err }) - } - - // ========================================== - buildHandshake(query): SocketIO.Handshake { - let requestUri = this.request.uri() - let headers = {} - let nativeHeaders = this.request.headers() - nativeHeaders.forEach(function (header) { - headers[header.getKey()] = header.getValue() - }) - return { - headers: headers, - time: (new Date) + '', - address: this.conn.remoteAddress + '', - xdomain: !!headers['origin'], - secure: false, - issued: +(new Date), - url: requestUri, - query: Object.assign(query, querystring.parse(requestUri.indexOf('?') != -1 ? requestUri.split('?')[1] : '')) - } + this._rooms = new Set() } emit(event: string, ...args: any[]): boolean { if (~this.events.indexOf(event)) { @@ -169,25 +128,26 @@ export class Socket extends EventEmitter implements SocketIO.Socket { } // access last argument to see if it's an ACK callback - if (typeof args[args.length - 1] === 'function') { - if (this._rooms.length || this.flags.broadcast) { - throw new Error('Callbacks are not supported when broadcasting') + if (typeof args[args.length - 1] === "function") { + if (this._rooms.size || this.flags.broadcast) { + throw new Error("Callbacks are not supported when broadcasting") } - // debug('emitting packet with ack id %d', this.nsp.ids); - this.acks[this.nsp.ids] = args.pop() - packet.id = this.nsp.ids++ + + // console.debug("emitting packet with ack id %d", this.nsp._ids) + this.acks.set(this.nsp._ids, args.pop()) + packet.id = this.nsp._ids++ } - let rooms = this._rooms.slice(0) - let flags = Object.assign({}, this.flags) + const rooms = new Set(this._rooms) + const flags = Object.assign({}, this.flags) // reset flags - this._rooms = [] + this._rooms.clear() this.flags = {} - if (rooms.length || flags.broadcast) { + if (rooms.size || flags.broadcast) { this.adapter.broadcast(packet, { - except: [this.id], + except: new Set([this.id]), rooms: rooms, flags: flags }) @@ -195,47 +155,69 @@ export class Socket extends EventEmitter implements SocketIO.Socket { // dispatch packet this.packet(packet, flags) } - // @ts-ignore + return true + } + to(name: Room): Socket { + this._rooms.add(name) return this } - packet(packet: Packet, opts: any = { preEncoded: false }) { - if (!opts.preEncoded) { - packet.nsp = this.nsp.name - opts.compress = false !== opts.compress - } - try { - this.client.packet(packet, opts) - } catch (error) { - this.onerror(error) - } + in(room: string): Socket { + return this.to(room) } - onconnect() { - this.nsp.connected[this.id] = this - this.client.sockets[this.id] = this + use(fn: (packet: Packet, next: (err?: any) => void) => void): Socket { + throw new Error("Method not implemented.") + } + send(...args: any[]): Socket { + this.emit("message", ...args) + return this + } + write(...args: any[]): Socket { + return this.send(...args) + } + public join(rooms: Room | Array): Promise | void { + console.debug(`join room ${rooms}`) + + return this.adapter.addAll( + this.id, + new Set(Array.isArray(rooms) ? rooms : [rooms]) + ) + } + /** + * Leaves a room. + * + * @param {String} room + * @return a Promise or nothing, depending on the adapter + * @public + */ + public leave(room: string): Promise | void { + console.debug(`leave room ${room}`) + + return this.adapter.del(this.id, room) + } + + /** + * Leave all rooms. + * + * @private + */ + private leaveAll(): void { + this.adapter.delAll(this.id) + } + + /** + * Called by `Namespace` upon successful + * middleware execution (ie: authorization). + * Socket is added to namespace array before + * call to join, so adapters can access it. + * + * @private + */ + _onconnect(): void { + console.debug("socket connected - writing packet") this.join(this.id) - // let skip = this.nsp.name === '/' && this.nsp.fns.length === 0; - // if (skip) { - // debug('packet already sent in initial handshake'); - // } else { - this.packet({ - type: PacketTypes.MESSAGE, - sub_type: SubPacketTypes.CONNECT - }) - // } + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT }) } - onclose(reason?: string) { - if (!this.connected) return this - // debug('closing socket - reason %s', reason); - this.emit('disconnecting', reason) - this.leaveAll() - this.nsp.remove(this) - this.client.remove(this) - this.connected = false - this.disconnected = true - delete this.nsp.connected[this.id] - this.emit('disconnect', reason) - } - onpacket(packet: Packet) { + _onpacket(packet: Packet) { switch (packet.sub_type) { // 2 case SubPacketTypes.EVENT: @@ -259,23 +241,12 @@ export class Socket extends EventEmitter implements SocketIO.Socket { break // 4 case SubPacketTypes.ERROR: - this.onerror(new Error(packet.data)) + this._onerror(new Error(packet.data)) } } - onerror(err: Error) { - if (this.listeners('error').length) { - this.emit('error', err) - } else { - console.error('Missing error handler on `socket`.') - console.error(err.stack) - } - } - ondisconnect() { - this.onclose('client namespace disconnect') - } onevent(packet: Packet) { if (null != packet.id) { - // debug('attaching ack callback to event'); + console.debug('attaching ack callback to event') this.dispatch(packet, this.ack(packet.id)) } else { this.dispatch(packet) @@ -297,11 +268,232 @@ export class Socket extends EventEmitter implements SocketIO.Socket { onack(packet: Packet) { let ack = this.acks[packet.id] if ('function' == typeof ack) { - // debug('calling ack %s with %j', packet.id, packet.data); + console.debug('calling ack %s with %j', packet.id, packet.data) ack.apply(this, packet.data) delete this.acks[packet.id] } else { - // debug('bad ack %s', packet.id); + console.debug('bad ack %s', packet.id) + } + } + /** + * Called upon client disconnect packet. + * + * @private + */ + private ondisconnect(): void { + console.debug("got disconnect packet") + this._onclose("client namespace disconnect") + } + + /** + * Handles a client error. + * + * @private + */ + _onerror(err): void { + if (this.listeners("error").length) { + super.emit("error", err) + } else { + console.error("Missing error handler on `socket`.") + console.error(err.stack) + } + } + + /** + * Called upon closing. Called by `Client`. + * + * @param {String} reason + * @throw {Error} optional error object + * + * @private + */ + _onclose(reason: string) { + console.debug(`closing socket - reason: ${reason} connected: ${this.connected}`) + if (!this.connected) return this + this.emit('disconnecting', reason) + this.leaveAll() + this.nsp._remove(this) + this.client._remove(this) + this.connected = false + this.disconnected = true + this.emit('disconnect', reason) + } + + /** + * Produces an `error` packet. + * + * @param {Object} err - error object + * + * @private + */ + _error(err) { + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.ERROR, data: err }) + } + disconnect(close?: boolean): Socket { + if (!this.connected) return this + if (close) { + this.client._disconnect() + } else { + this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.DISCONNECT }) + this._onclose('server namespace disconnect') + } + return this + } + + compress(compress: boolean): Socket { + throw new Error("Method not implemented.") + } + + /** + * Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to + * receive messages (because of network slowness or other issues, or because they’re connected through long polling + * and is in the middle of a request-response cycle). + * + * @return {Socket} self + * @public + */ + public get volatile(): Socket { + this.flags.volatile = true + return this + } + + /** + * Sets a modifier for a subsequent event emission that the event data will only be broadcast to every sockets but the + * sender. + * + * @return {Socket} self + * @public + */ + public get broadcast(): Socket { + this.flags.broadcast = true + return this + } + + /** + * Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node. + * + * @return {Socket} self + * @public + */ + public get local(): Socket { + this.flags.local = true + return this + } + + /** + * A reference to the request that originated the underlying Engine.IO Socket. + * + * @public + */ + public get request(): any { + return this.client.request + } + + /** + * A reference to the underlying Client transport connection (Engine.IO Socket object). + * + * @public + */ + public get conn() { + return this.client.conn + } + + /** + * @public + */ + public get rooms(): Set { + return this.adapter.socketRooms(this.id) || new Set() + } + + /** + * Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the + * callback. + * + * @param listener + * @public + */ + public onAny(listener: (...args: any[]) => void): Socket { + this._anyListeners = this._anyListeners || [] + this._anyListeners.push(listener) + return this + } + + /** + * Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the + * callback. The listener is added to the beginning of the listeners array. + * + * @param listener + * @public + */ + public prependAny(listener: (...args: any[]) => void): Socket { + this._anyListeners = this._anyListeners || [] + this._anyListeners.unshift(listener) + return this + } + + /** + * Removes the listener that will be fired when any event is emitted. + * + * @param listener + * @public + */ + public offAny(listener?: (...args: any[]) => void): Socket { + if (!this._anyListeners) { + return this + } + if (listener) { + const listeners = this._anyListeners + for (let i = 0; i < listeners.length; i++) { + if (listener === listeners[i]) { + listeners.splice(i, 1) + return this + } + } + } else { + this._anyListeners = [] + } + return this + } + + /** + * Returns an array of listeners that are listening for any event that is specified. This array can be manipulated, + * e.g. to remove listeners. + * + * @public + */ + public listenersAny() { + return this._anyListeners || [] + } + + // ========================================== + buildHandshake(auth): Handshake { + let requestUri = this.request.uri() + let headers = {} + let nativeHeaders = this.request.headers() + nativeHeaders.forEach(function (header) { + headers[header.getKey()] = header.getValue() + }) + return { + headers: headers, + time: new Date() + '', + address: this.conn.remoteAddress + '', + xdomain: !!headers['origin'], + secure: false, + issued: +new Date(), + url: requestUri, + query: querystring.parse(requestUri.indexOf('?') != -1 ? requestUri.split('?')[1] : ''), + auth + } + } + packet(packet: Packet, opts: any = { preEncoded: false }) { + if (!opts.preEncoded) { + packet.nsp = this.nsp.name + opts.compress = false !== opts.compress + } + try { + this.client._packet(packet, opts) + } catch (error) { + this._onerror(error) } } dispatch(packet: Packet, ack?: Function) { diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/tomcat/client.ts index 4d516880..3bbfae14 100644 --- a/packages/websocket/src/tomcat/client.ts +++ b/packages/websocket/src/tomcat/client.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'events' -import { SocketIO } from '../socket-io/interfaces' +import { InnerClient } from '../interfaces' -export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket { +export class TomcatClient extends EventEmitter implements InnerClient { private _id: string private session: javax.websocket.Session @@ -10,7 +10,6 @@ export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket remoteAddress: string upgraded: boolean request: any - transport: any constructor(server: any, session: javax.websocket.Session) { super() @@ -22,7 +21,6 @@ export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket uri: () => `${session.getRequestURI()}`, headers: () => [] } - this.transport = null this.session = session this._id = session.getId() + '' @@ -34,6 +32,8 @@ export class TomcatClient extends EventEmitter implements SocketIO.EngineSocket send(text: string) { if (this.readyState == 'open') { Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)() + } else { + console.debug(`send message ${text} to close client ${this._id}`) } } close() { diff --git a/packages/websocket/src/tomcat/server.ts b/packages/websocket/src/tomcat/index.ts similarity index 96% rename from packages/websocket/src/tomcat/server.ts rename to packages/websocket/src/tomcat/index.ts index adce7ca3..7b855d49 100644 --- a/packages/websocket/src/tomcat/server.ts +++ b/packages/websocket/src/tomcat/index.ts @@ -2,7 +2,6 @@ import { EventEmitter } from 'events' import { ServerOptions } from '../socket-io' import { ServerEvent } from '../socket-io/constants' -import { SocketIO } from '../socket-io/interfaces' import { ProxyBeanName } from './constants' import { TomcatClient } from './client' @@ -15,7 +14,7 @@ type TomcatWebSocketSession = javax.websocket.Session class TomcatWebSocketServer extends EventEmitter { private beanFactory: any private executor: any - private clients: Map + private clients: Map constructor(beanFactory: any, options: ServerOptions) { super()