From 2fcadeda4e2e921218996a11b5bb6d4268b2de83 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Sat, 21 Mar 2020 15:47:42 +0800 Subject: [PATCH] feat: complate socket.io base framework Signed-off-by: MiaoWoo --- packages/websocket/package.json | 4 +- packages/websocket/src/index.ts | 2 + packages/websocket/src/netty/httprequest.ts | 11 +- packages/websocket/src/netty/index.ts | 3 + .../src/netty/text_websocket_frame.ts | 9 +- packages/websocket/src/netty/websocket.ts | 11 +- packages/websocket/src/server/client.ts | 27 + packages/websocket/src/server/constants.ts | 13 + packages/websocket/src/server/httprequest.ts | 66 ++ packages/websocket/src/server/index.ts | 53 ++ .../src/server/text_websocket_frame.ts | 19 + .../websocket/src/server/websocket_detect.ts | 14 + .../websocket/src/server/websocket_handler.ts | 42 + packages/websocket/src/socket-io/client.ts | 84 ++ packages/websocket/src/socket-io/index.ts | 118 +++ .../websocket/src/socket-io/interfaces.ts | 824 ++++++++++++++++++ packages/websocket/src/socket-io/namespace.ts | 96 ++ packages/websocket/src/socket-io/packet.ts | 11 + packages/websocket/src/socket-io/parser.ts | 160 ++++ packages/websocket/src/socket-io/types.ts | 18 + 20 files changed, 1569 insertions(+), 16 deletions(-) create mode 100644 packages/websocket/src/netty/index.ts create mode 100644 packages/websocket/src/server/client.ts create mode 100644 packages/websocket/src/server/constants.ts create mode 100644 packages/websocket/src/server/httprequest.ts create mode 100644 packages/websocket/src/server/index.ts create mode 100644 packages/websocket/src/server/text_websocket_frame.ts create mode 100644 packages/websocket/src/server/websocket_detect.ts create mode 100644 packages/websocket/src/server/websocket_handler.ts create mode 100644 packages/websocket/src/socket-io/client.ts create mode 100644 packages/websocket/src/socket-io/index.ts create mode 100644 packages/websocket/src/socket-io/interfaces.ts create mode 100644 packages/websocket/src/socket-io/namespace.ts create mode 100644 packages/websocket/src/socket-io/packet.ts create mode 100644 packages/websocket/src/socket-io/parser.ts create mode 100644 packages/websocket/src/socket-io/types.ts diff --git a/packages/websocket/package.json b/packages/websocket/package.json index b537bef7..5f3e2328 100644 --- a/packages/websocket/package.json +++ b/packages/websocket/package.json @@ -27,7 +27,9 @@ "typescript": "^3.8.3" }, "dependencies": { - "@ms/nashorn": "^0.3.1" + "@ms/container": "^0.3.2", + "@ms/nashorn": "^0.3.1", + "socket.io-parser": "^3.4.0" }, "gitHead": "781524f83e52cad26d7c480513e3c525df867121" } diff --git a/packages/websocket/src/index.ts b/packages/websocket/src/index.ts index 19d4efa2..3e400609 100644 --- a/packages/websocket/src/index.ts +++ b/packages/websocket/src/index.ts @@ -1 +1,3 @@ /// + +export * from './socket-io' \ No newline at end of file diff --git a/packages/websocket/src/netty/httprequest.ts b/packages/websocket/src/netty/httprequest.ts index 7942d180..b6523605 100644 --- a/packages/websocket/src/netty/httprequest.ts +++ b/packages/websocket/src/netty/httprequest.ts @@ -2,18 +2,19 @@ const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatc const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest')) -export default abstract class HttpRequestHandlerAdapter { +export abstract class HttpRequestHandlerAdapter { private _Handler; constructor() { - this._Handler == Java.extend(SimpleChannelInboundHandler, { + let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { acceptInboundMessage: (msg: any) => { return FullHttpRequestMatcher.match(msg) }, - channelRead0: this.channelRead0 + channelRead0: this.channelRead0.bind(this) }) + this._Handler = new HttpRequestHandlerAdapterImpl(); } - abstract channelRead0(ctx: any, msg: any); + abstract channelRead0(ctx: any, request: any); getHandler() { return this._Handler; } -} \ No newline at end of file +} diff --git a/packages/websocket/src/netty/index.ts b/packages/websocket/src/netty/index.ts new file mode 100644 index 00000000..366082f5 --- /dev/null +++ b/packages/websocket/src/netty/index.ts @@ -0,0 +1,3 @@ +export * from './text_websocket_frame' +export * from './websocket' +export * from './httprequest' \ No newline at end of file diff --git a/packages/websocket/src/netty/text_websocket_frame.ts b/packages/websocket/src/netty/text_websocket_frame.ts index c7760a28..27c29ddc 100644 --- a/packages/websocket/src/netty/text_websocket_frame.ts +++ b/packages/websocket/src/netty/text_websocket_frame.ts @@ -2,16 +2,17 @@ const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatc const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')) const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') -export default abstract class TextWebSocketFrameHandlerAdapter { +export abstract class TextWebSocketFrameHandlerAdapter { private _Handler; constructor() { - this._Handler == Java.extend(SimpleChannelInboundHandler, { - userEventTriggered: this.userEventTriggered, + let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { + userEventTriggered: this.userEventTriggered.bind(this), acceptInboundMessage: (msg: any) => { return TextWebSocketFrameMatcher.match(msg) }, - channelRead0: this.channelRead0 + channelRead0: this.channelRead0.bind(this) }) + this._Handler = new TextWebSocketFrameHandlerAdapterImpl(); } abstract userEventTriggered(ctx: any, evt: any); abstract channelRead0(ctx: any, msg: any); diff --git a/packages/websocket/src/netty/websocket.ts b/packages/websocket/src/netty/websocket.ts index 53692c4d..aae1b61e 100644 --- a/packages/websocket/src/netty/websocket.ts +++ b/packages/websocket/src/netty/websocket.ts @@ -1,15 +1,14 @@ -const MiaoWebSocket = 'miaowebsocket' -const CharsetUtil = Java.type('io.netty.util.CharsetUtil') const ChannelInboundHandlerAdapter = Java.type('io.netty.channel.ChannelInboundHandlerAdapter') -export default abstract class WebSocketHandlerAdapter { +export abstract class WebSocketHandlerAdapter { private _Handler; constructor() { - this._Handler = Java.extend(ChannelInboundHandlerAdapter, { - channelRead: this.channelRead + let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, { + channelRead: this.channelRead.bind(this) }) + this._Handler = new ChannelInboundHandlerAdapterImpl() } - abstract channelRead(ctx: any, msg: any); + abstract channelRead(ctx: any, channel: any); getHandler() { return this._Handler; } diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/server/client.ts new file mode 100644 index 00000000..e9742db9 --- /dev/null +++ b/packages/websocket/src/server/client.ts @@ -0,0 +1,27 @@ +import { EventEmitter } from 'events' + +const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') + +export class NettyClient { + private event: EventEmitter + + private _id: string; + private channel: any + constructor(channel: any) { + this.channel = channel; + this._id = channel.id(); + this.event = new EventEmitter(); + } + get id() { + return this._id; + } + on(event: string, callback: (...args: any[]) => void) { + this.event.on(event, callback); + } + emit(event: string, text: string) { + this.event.emit(event, text); + } + send(text: string) { + this.channel.writeAndFlush(new TextWebSocketFrame(text)) + } +} diff --git a/packages/websocket/src/server/constants.ts b/packages/websocket/src/server/constants.ts new file mode 100644 index 00000000..fd278e21 --- /dev/null +++ b/packages/websocket/src/server/constants.ts @@ -0,0 +1,13 @@ +export enum ServerEvent { + detect = 'detect', + connect = 'connect', + connection = 'connection', + message = 'message', + disconnect = 'disconnect' +} + +export enum Keys { + Detect = "miao_detect", + Handler = "miaowebsocket", + Default = "DefaultChannelPipeline" +} diff --git a/packages/websocket/src/server/httprequest.ts b/packages/websocket/src/server/httprequest.ts new file mode 100644 index 00000000..141ef8c2 --- /dev/null +++ b/packages/websocket/src/server/httprequest.ts @@ -0,0 +1,66 @@ +import { HttpRequestHandlerAdapter } from '../netty' + +const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse') +const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse') +const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders') +const HttpVersion = Java.type('io.netty.handler.codec.http.HttpVersion') +const HttpResponseStatus = Java.type('io.netty.handler.codec.http.HttpResponseStatus') +const LastHttpContent = Java.type('io.netty.handler.codec.http.LastHttpContent') + +const File = Java.type('java.io.File') +const Runnable = Java.type('java.lang.Runnable') +const RandomAccessFile = Java.type('java.io.RandomAccessFile') +const DefaultFileRegion = Java.type('io.netty.channel.DefaultFileRegion') +const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener') + +export type HttpRequestConfig = { + root?: string; + ws?: string; +} + +export class HttpRequestHandler extends HttpRequestHandlerAdapter { + private ws: string; + private root: string; + constructor(config: HttpRequestConfig = { + root: root + '/wwwroot', + ws: '/ws' + }) { + super() + this.root = config.root; + this.ws = config.ws; + } + channelRead0(ctx: any, request: any) { + if (request.getUri().startsWith(this.ws)) { + ctx.fireChannelRead(request.retain()) + } else { + ctx.executor().execute(new Runnable({ + run: () => { + if (HttpHeaders.is100ContinueExpected(request)) { + ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)) + } + let filename = request.getUri().split('?')[0].substr(1) + let file = new File(this.root, filename || 'index.html') + if (!file.exists() || !file.isFile()) { + ctx.write(new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.NOT_FOUND)) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE) + return + } + let response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK) + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html charset=UTF-8") + let raf = new RandomAccessFile(file, 'r') + let keepAlive = HttpHeaders.isKeepAlive(request) + if (keepAlive) { + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()) + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) + } + ctx.write(response) + ctx.write(new DefaultFileRegion(raf.getChannel(), 0, raf.length())) + let future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + if (!keepAlive) { + future.addListener(ChannelFutureListener.CLOSE) + } + } + })) + } + } +} diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts new file mode 100644 index 00000000..f0ad5b22 --- /dev/null +++ b/packages/websocket/src/server/index.ts @@ -0,0 +1,53 @@ +import { EventEmitter } from 'events' + +import { ServerEvent, Keys } from './constants' +import { WebSocketDetect } from './websocket_detect' +import { WebSocketHandler } from './websocket_handler' +import { NettyClient } from './client' + +interface NettyWebSocketServerOptions { + path?: string; +} + +class NettyWebSocketServer { + private event: EventEmitter + private pipeline: any; + private allClients: { [key: string]: NettyClient }; + + constructor(pipeline: any, options: NettyWebSocketServerOptions) { + this.event = new EventEmitter(); + this.allClients = {}; + this.pipeline = pipeline; + let connectEvent = new EventEmitter(); + this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) + connectEvent.on(ServerEvent.detect, (ctx, channel) => { + channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(connectEvent).getHandler()) + ctx.fireChannelRead(channel) + }) + connectEvent.on(ServerEvent.connect, (ctx) => { + let nettyClient = new NettyClient(ctx.channel()); + this.allClients[nettyClient.id] = nettyClient; + this.event.emit(ServerEvent.connect, nettyClient); + }) + connectEvent.on(ServerEvent.message, (ctx, msg) => { + let channel = ctx.channel(); + this.allClients[channel.id()]?.emit(ServerEvent.message, msg.text()) + }) + } + + disable() { + if (this.pipeline.names().contains(Keys.Detect)) { + this.pipeline.remove(Keys.Detect) + } + } + + on(event: string, listener: (...args: any[]) => void) { + this.event.on(event, listener) + } +} + +export { + NettyWebSocketServer, + ServerEvent, + NettyClient +}; \ No newline at end of file diff --git a/packages/websocket/src/server/text_websocket_frame.ts b/packages/websocket/src/server/text_websocket_frame.ts new file mode 100644 index 00000000..31429edc --- /dev/null +++ b/packages/websocket/src/server/text_websocket_frame.ts @@ -0,0 +1,19 @@ +import { TextWebSocketFrameHandlerAdapter } from '../netty' +import { EventEmitter } from 'events' +import { ServerEvent } from './constants' + +export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { + private event: EventEmitter; + constructor(event: EventEmitter) { + super() + this.event = event; + } + userEventTriggered(ctx: any, evt: any) { + if (evt == 'HANDSHAKE_COMPLETE') { + this.event.emit(ServerEvent.connect, ctx) + } + } + channelRead0(ctx: any, msg: any) { + this.event.emit(ServerEvent.message, ctx, msg) + } +} diff --git a/packages/websocket/src/server/websocket_detect.ts b/packages/websocket/src/server/websocket_detect.ts new file mode 100644 index 00000000..620be384 --- /dev/null +++ b/packages/websocket/src/server/websocket_detect.ts @@ -0,0 +1,14 @@ +import { EventEmitter } from 'events' +import { WebSocketHandlerAdapter } from "../netty" +import { ServerEvent } from './constants' + +export class WebSocketDetect extends WebSocketHandlerAdapter { + private event: EventEmitter; + constructor(event: EventEmitter) { + super() + this.event = event; + } + channelRead(ctx: any, channel: any) { + this.event.emit(ServerEvent.detect, ctx, channel); + } +} diff --git a/packages/websocket/src/server/websocket_handler.ts b/packages/websocket/src/server/websocket_handler.ts new file mode 100644 index 00000000..c3ea273a --- /dev/null +++ b/packages/websocket/src/server/websocket_handler.ts @@ -0,0 +1,42 @@ +import { EventEmitter } from 'events' + +import { Keys } from './constants' +import { WebSocketHandlerAdapter } from "../netty" +import { HttpRequestHandler } from './httprequest' +import { TextWebSocketFrameHandler } from './text_websocket_frame' + +const CharsetUtil = Java.type('io.netty.util.CharsetUtil') + +const HttpServerCodec = Java.type('io.netty.handler.codec.http.HttpServerCodec') +const ChunkedWriteHandler = Java.type('io.netty.handler.stream.ChunkedWriteHandler') +const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAggregator') +const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler') + +export class WebSocketHandler extends WebSocketHandlerAdapter { + private event: EventEmitter; + constructor(event: EventEmitter) { + super() + this.event = event; + } + channelRead(ctx: any, msg: any) { + msg.markReaderIndex() + let message: string = msg.toString(CharsetUtil.UTF_8) + let channel = ctx.channel() + let pipeline = channel.pipeline() + if (message.indexOf('HTTP/1.1') > 0) { + pipeline.names().forEach(f => { + if (f == Keys.Handler || f.indexOf(Keys.Default) > -1) { return } + pipeline.remove(f) + }) + pipeline.addLast('http', new HttpServerCodec()) + pipeline.addLast('chunk', new ChunkedWriteHandler()) + pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024)) + pipeline.addLast('http_request', new HttpRequestHandler().getHandler()) + pipeline.addLast('websocket', new WebSocketServerProtocolHandler("/ws", true)) + pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.event).getHandler()) + } + pipeline.remove(Keys.Handler) + msg.resetReaderIndex() + ctx.fireChannelRead(msg) + } +} diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts new file mode 100644 index 00000000..7b85de70 --- /dev/null +++ b/packages/websocket/src/socket-io/client.ts @@ -0,0 +1,84 @@ +import { EventEmitter } from 'events' +import { Parser } from './parser' +import { Packet } from './packet'; +import { PacketTypes, SubPacketTypes } from './types'; +import { ServerEvent, NettyClient } from '../server'; +import { SocketIO } from './interfaces' + +const parser = new Parser(); + +export class SocketIOClient implements SocketIO.Client { + private nettyClient: NettyClient; + private event: EventEmitter + private _id: string; + + server: SocketIO.Server; + conn: SocketIO.EngineSocket; + request: any; + sockets: { [id: string]: SocketIO.Socket; }; + nsps: { [nsp: string]: SocketIO.Socket; }; + + constructor(nettyClient: NettyClient) { + this.event = new EventEmitter() + this.nettyClient = nettyClient; + this._id = this.nettyClient.id; + this.event.emit('connect', this); + this.nettyClient.on(ServerEvent.message, (text) => this.process(text)) + } + + get id() { + return this._id; + } + connect() { + this.packet({ + type: PacketTypes.OPEN, + data: { + sid: this._id, + upgrades: [], + pingInterval: 25000, + pingTimeout: 5000 + } + }) + this.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.CONNECT + }) + } + emit(event: string, data: any) { + this.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.EVENT, + name: event, + data + }) + } + send(data: any) { + this.emit("message", data); + } + packet(packet: Packet) { + this.nettyClient.send(parser.encode(packet)) + } + + private process(text: string) { + let packet = parser.decode(text); + switch (packet.type) { + case PacketTypes.OPEN: + break; + case PacketTypes.PING: + this.packet({ + type: PacketTypes.PONG + }) + break; + case PacketTypes.MESSAGE: + switch (packet.sub_type) { + case SubPacketTypes.CONNECT: + this.nettyClient.send(text); + break; + case SubPacketTypes.EVENT: + this.event.emit(packet.name, packet.data); + break; + } + break; + } + } +} diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts new file mode 100644 index 00000000..0b1a7cc5 --- /dev/null +++ b/packages/websocket/src/socket-io/index.ts @@ -0,0 +1,118 @@ +import { EventEmitter } from 'events' + +import { NettyWebSocketServer } from '../server' +import { ServerEvent } from '../server/constants'; + +import { Namespace } from './namespace'; +import { SocketIOClient } from './client'; +import { SocketIO } from './interfaces' + +export class SocketIOServer implements SocketIO.Server { + private event: EventEmitter; + private nettyServer: NettyWebSocketServer; + private allClients: { [key: string]: SocketIOClient }; + private namespaces: { [key: string]: Namespace }; + + engine: { ws: any; }; + nsps: { [namespace: string]: SocketIO.Namespace; }; + sockets: SocketIO.Namespace; + json: SocketIO.Server; + volatile: SocketIO.Server; + local: SocketIO.Server; + + constructor(pipeline: any, options: SocketIO.ServerOptions) { + if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') } + this.event = new EventEmitter(); + this.allClients = {}; + this.namespaces = {}; + this.nettyServer = new NettyWebSocketServer(pipeline, { + + }); + this.nettyServer.on(ServerEvent.connect, (nettyClient) => { + let client = new SocketIOClient(nettyClient); + this.allClients[client.id] = client; + client.connect(); + }) + } + + checkRequest(req: any, fn: (err: any, success: boolean) => void): void { + throw new Error("Method not implemented."); + } + serveClient(): boolean; + serveClient(v: boolean): SocketIO.Server; + serveClient(v?: any): boolean | SocketIO.Server { + throw new Error("Method not implemented."); + } + path(): string; + path(v: string): SocketIO.Server; + path(v?: any): string | SocketIO.Server { + return this; + } + adapter(); + adapter(v: any): SocketIO.Server; + adapter(v?: any): SocketIO.Server { + throw new Error("Method not implemented."); + } + origins(): string | string[]; + origins(v: string | string[]): SocketIO.Server; + origins(fn: (origin: string, callback: (error: string, success: boolean) => void) => void): SocketIO.Server; + origins(fn?: any): string | string[] | SocketIO.Server { + throw new Error("Method not implemented."); + } + attach(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server; + attach(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server; + attach(port: any, opts?: any): SocketIO.Server { + throw new Error("Method not implemented."); + } + listen(srv: any, opts?: SocketIO.ServerOptions): SocketIO.Server; + listen(port: number, opts?: SocketIO.ServerOptions): SocketIO.Server; + listen(port: any, opts?: any): SocketIO.Server { + throw new Error("Method not implemented."); + } + bind(srv: any): SocketIO.Server { + throw new Error("Method not implemented."); + } + onconnection(socket: any): SocketIO.Server { + throw new Error("Method not implemented."); + } + of(nsp: string | Function | RegExp): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + close(fn?: () => void): void { + throw new Error("Method not implemented."); + } + on(event: "connection", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; + on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; + on(event: string, listener: Function): SocketIO.Namespace; + on(event: any, listener: any): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + to(room: string): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + in(room: string): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + emit(event: string, ...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + send(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + write(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + clients(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + compress(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + + disable() { + this.nettyServer.disable(); + } +} \ No newline at end of file diff --git a/packages/websocket/src/socket-io/interfaces.ts b/packages/websocket/src/socket-io/interfaces.ts new file mode 100644 index 00000000..c509cfc8 --- /dev/null +++ b/packages/websocket/src/socket-io/interfaces.ts @@ -0,0 +1,824 @@ +export declare namespace SocketIO { + interface Server { + engine: { ws: any }; + + /** + * A dictionary of all the namespaces currently on this Server + */ + nsps: { [namespace: string]: Namespace }; + + /** + * The default '/' Namespace + */ + sockets: Namespace; + + /** + * Sets the 'json' flag when emitting an event + */ + json: Server; + + /** + * Sets a modifier for a subsequent event emission that the event data may be lost if the clients are not ready to receive messages + */ + volatile: Server; + + /** + * Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node + */ + local: Server; + + /** + * Server request verification function, that checks for allowed origins + * @param req The http.IncomingMessage request + * @param fn The callback to be called. It should take one parameter, err, + * which will be null if there was no problem, and one parameter, success, + * of type boolean + */ + checkRequest(req: any, fn: (err: any, success: boolean) => void): void; + + /** + * Gets whether we're serving the client.js file or not + * @default true + */ + serveClient(): boolean; + + /** + * Sets whether we're serving the client.js file or not + * @param v True if we want to serve the file, false otherwise + * @default true + * @return This Server + */ + serveClient(v: boolean): Server; + + /** + * Gets the client serving path + * @default '/socket.io' + */ + path(): string; + + /** + * Sets the client serving path + * @param v The path to serve the client file on + * @default '/socket.io' + * @return This Server + */ + path(v: string): Server; + + /** + * Gets the adapter that we're going to use for handling rooms + * @default typeof Adapter + */ + adapter(): any; + + /** + * Sets the adapter (class) that we're going to use for handling rooms + * @param v The class for the adapter to create + * @default typeof Adapter + * @return This Server + */ + adapter(v: any): Server; + + /** + * Gets the allowed origins for requests + * @default "*:*" + */ + origins(): string | string[]; + + /** + * Sets the allowed origins for requests + * @param v The allowed origins, in host:port form + * @default "*:*" + * return This Server + */ + origins(v: string | string[]): Server; + + /** + * Provides a function taking two arguments origin:String + * and callback(error, success), where success is a boolean + * value indicating whether origin is allowed or not. If + * success is set to false, error must be provided as a string + * value that will be appended to the server response, e.g. “Origin not allowed”. + * @param fn The function that will be called to check the origin + * return This Server + */ + origins(fn: (origin: string, callback: (error: string | null, success: boolean) => void) => void): Server; + + /** + * Attaches socket.io to a server + * @param srv The http.Server that we want to attach to + * @param opts An optional parameters object + * @return This Server + */ + attach(srv: any, opts?: ServerOptions): Server; + + /** + * Attaches socket.io to a port + * @param port The port that we want to attach to + * @param opts An optional parameters object + * @return This Server + */ + attach(port: number, opts?: ServerOptions): Server; + + /** + * @see attach( srv, opts ) + */ + listen(srv: any, opts?: ServerOptions): Server; + + /** + * @see attach( port, opts ) + */ + listen(port: number, opts?: ServerOptions): Server; + + /** + * Binds socket.io to an engine.io instance + * @param src The Engine.io (or compatible) server to bind to + * @return This Server + */ + bind(srv: any): Server; + + /** + * Called with each incoming connection + * @param socket The Engine.io Socket + * @return This Server + */ + onconnection(socket: any): Server; + + /** + * Looks up/creates a Namespace + * @param nsp The name of the NameSpace to look up/create. Should start + * with a '/' + * @return The Namespace + */ + of(nsp: string | RegExp | Function): Namespace; + + /** + * Closes the server connection + */ + close(fn?: () => void): void; + + /** + * The event fired when we get a new connection + * @param event The event being fired: 'connection' + * @param listener A listener that should take one parameter of type Socket + * @return The default '/' Namespace + */ + on(event: 'connection', listener: (socket: Socket) => void): Namespace; + + /** + * @see on( 'connection', listener ) + */ + on(event: 'connect', listener: (socket: Socket) => void): Namespace; + + /** + * Base 'on' method to add a listener for an event + * @param event The event that we want to add a listener for + * @param listener The callback to call when we get the event. The parameters + * for the callback depend on the event + * @return The default '/' Namespace + */ + on(event: string, listener: Function): Namespace; + + /** + * Targets a room when emitting to the default '/' Namespace + * @param room The name of the room that we're targeting + * @return The default '/' Namespace + */ + to(room: string): Namespace; + + /** + * @see to( room ) + */ + in(room: string): Namespace; + + /** + * Registers a middleware function, which is a function that gets executed + * for every incoming Socket, on the default '/' Namespace + * @param fn The function to call when we get a new incoming socket. It should + * take one parameter of type Socket, and one callback function to call to + * execute the next middleware function. The callback can take one optional + * parameter, err, if there was an error. Errors passed to middleware callbacks + * are sent as special 'error' packets to clients + * @return The default '/' Namespace + */ + use(fn: (socket: Socket, fn: (err?: any) => void) => void): Namespace; + + /** + * Emits an event to the default Namespace + * @param event The event that we want to emit + * @param args Any number of optional arguments to pass with the event. If the + * last argument is a function, it will be called as an ack. The ack should + * take whatever data was sent with the packet + * @return The default '/' Namespace + */ + emit(event: string, ...args: any[]): Namespace; + + /** + * Sends a 'message' event + * @see emit( event, ...args ) + * @return The default '/' Namespace + */ + send(...args: any[]): Namespace; + + /** + * @see send( ...args ) + */ + write(...args: any[]): Namespace; + + /** + * Gets a list of clients + * @return The default '/' Namespace + */ + clients(...args: any[]): Namespace; + + /** + * Sets the compress flag + * @return The default '/' Namespace + */ + compress(...args: any[]): Namespace; + } + + /** + * Options to pass to our server when creating it + */ + interface ServerOptions { + + /** + * The path to server the client file to + * @default '/socket.io' + */ + path?: string; + + /** + * Should we serve the client file? + * @default true + */ + serveClient?: boolean; + + /** + * The adapter to use for handling rooms. NOTE: this should be a class, + * not an object + * @default typeof Adapter + */ + adapter?: Adapter; + + /** + * Accepted origins + * @default '*:*' + */ + origins?: string | string[]; + + /** + * How many milliseconds without a pong packed to consider the connection closed (engine.io) + * @default 60000 + */ + pingTimeout?: number; + + /** + * How many milliseconds before sending a new ping packet (keep-alive) (engine.io) + * @default 25000 + */ + pingInterval?: number; + + /** + * How many bytes or characters a message can be when polling, before closing the session + * (to avoid Dos) (engine.io) + * @default 10E7 + */ + maxHttpBufferSize?: number; + + /** + * A function that receives a given handshake or upgrade request as its first parameter, + * and can decide whether to continue or not. The second argument is a function that needs + * to be called with the decided information: fn( err, success ), where success is a boolean + * value where false means that the request is rejected, and err is an error code (engine.io) + * @default null + */ + allowRequest?: (request: any, callback: (err: number, success: boolean) => void) => void; + + /** + * Transports to allow connections to (engine.io) + * @default ['polling','websocket'] + */ + transports?: string[]; + + /** + * Whether to allow transport upgrades (engine.io) + * @default true + */ + allowUpgrades?: boolean; + + /** + * parameters of the WebSocket permessage-deflate extension (see ws module). + * Set to false to disable (engine.io) + * @default true + */ + perMessageDeflate?: Object | boolean; + + /** + * Parameters of the http compression for the polling transports (see zlib). + * Set to false to disable, or set an object with parameter "threshold:number" + * to only compress data if the byte size is above this value (1024) (engine.io) + * @default true|1024 + */ + httpCompression?: Object | boolean; + + /** + * Name of the HTTP cookie that contains the client sid to send as part of + * handshake response headers. Set to false to not send one (engine.io) + * @default "io" + */ + cookie?: string | boolean; + + /** + * Whether to let engine.io handle the OPTIONS requests. + * You can also pass a custom function to handle the requests + * @default true + */ + handlePreflightRequest?: ((req: any, res: any) => void) | boolean; + } + + /** + * The Namespace, sandboxed environments for sockets, each connection + * to a Namespace requires a new Socket + */ + interface Namespace extends NodeJS.EventEmitter { + + /** + * The name of the NameSpace + */ + name: string; + + /** + * The controller Server for this Namespace + */ + server: Server; + + /** + * A dictionary of all the Sockets connected to this Namespace, where + * the Socket ID is the key + */ + sockets: { [id: string]: Socket }; + + /** + * A dictionary of all the Sockets connected to this Namespace, where + * the Socket ID is the key + */ + connected: { [id: string]: Socket }; + + /** + * The Adapter that we're using to handle dealing with rooms etc + */ + adapter: Adapter; + + /** + * Sets the 'json' flag when emitting an event + */ + json: Namespace; + + /** + * Registers a middleware function, which is a function that gets executed + * for every incoming Socket + * @param fn The function to call when we get a new incoming socket. It should + * take one parameter of type Socket, and one callback function to call to + * execute the next middleware function. The callback can take one optional + * parameter, err, if there was an error. Errors passed to middleware callbacks + * are sent as special 'error' packets to clients + * @return This Namespace + */ + use(fn: (socket: Socket, fn: (err?: any) => void) => void): Namespace; + + /** + * Targets a room when emitting + * @param room The name of the room that we're targeting + * @return This Namespace + */ + to(room: string): Namespace; + + /** + * @see to( room ) + */ + in(room: string): Namespace; + + /** + * Sends a 'message' event + * @see emit( event, ...args ) + * @return This Namespace + */ + send(...args: any[]): Namespace; + + /** + * @see send( ...args ) + */ + write(...args: any[]): Namespace; + + /** + * The event fired when we get a new connection + * @param event The event being fired: 'connection' + * @param listener A listener that should take one parameter of type Socket + * @return This Namespace + */ + on(event: 'connection', listener: (socket: Socket) => void): this; + + /** + * @see on( 'connection', listener ) + */ + on(event: 'connect', listener: (socket: Socket) => void): this; + + /** + * Base 'on' method to add a listener for an event + * @param event The event that we want to add a listener for + * @param listener The callback to call when we get the event. The parameters + * for the callback depend on the event + * @ This Namespace + */ + on(event: string, listener: Function): this; + + /** + * Gets a list of clients. + * @return This Namespace + */ + clients(fn: Function): Namespace; + + /** + * Sets the compress flag. + * @param compress If `true`, compresses the sending data + * @return This Namespace + */ + compress(compress: boolean): Namespace; + } + + interface Packet extends Array { + /** + * Event name + */ + [0]: string; + /** + * Packet data + */ + [1]: any; + /** + * Ack function + */ + [2]: (...args: any[]) => void; + } + + /** + * The socket, which handles our connection for a namespace. NOTE: while + * we technically extend NodeJS.EventEmitter, we're not putting it here + * as we have a problem with the emit() event (as it's overridden with a + * different return) + */ + interface Socket extends NodeJS.EventEmitter { + + /** + * The namespace that this socket is for + */ + nsp: Namespace; + + /** + * The Server that our namespace is in + */ + server: Server; + + /** + * The Adapter that we use to handle our rooms + */ + adapter: Adapter; + + /** + * The unique ID for this Socket. Regenerated at every connection. This is + * also the name of the room that the Socket automatically joins on connection + */ + id: string; + + /** + * The http.IncomingMessage request sent with the connection. Useful + * for recovering headers etc + */ + request: any; + + /** + * The Client associated with this Socket + */ + client: Client; + + /** + * The underlying Engine.io Socket instance + */ + conn: EngineSocket; + + /** + * The list of rooms that this Socket is currently in, where + * the ID the the room ID + */ + rooms: { [id: string]: string }; + + /** + * Is the Socket currently connected? + */ + connected: boolean; + + /** + * Is the Socket currently disconnected? + */ + disconnected: boolean; + + /** + * The object used when negociating the handshake + */ + handshake: Handshake; + /** + * Sets the 'json' flag when emitting an event + */ + json: Socket; + + /** + * Sets the 'volatile' flag when emitting an event. Volatile messages are + * messages that can be dropped because of network issues and the like. Use + * for high-volume/real-time messages where you don't need to receive *all* + * of them + */ + volatile: Socket; + + /** + * Sets the 'broadcast' flag when emitting an event. Broadcasting an event + * will send it to all the other sockets in the namespace except for yourself + */ + broadcast: Socket; + + /** + * Targets a room when broadcasting + * @param room The name of the room that we're targeting + * @return This Socket + */ + to(room: string): Socket; + + /** + * @see to( room ) + */ + in(room: string): Socket; + + /** + * Registers a middleware, which is a function that gets executed for every incoming Packet and receives as parameter the packet and a function to optionally defer execution to the next registered middleware. + * + * Errors passed to middleware callbacks are sent as special error packets to clients. + */ + use(fn: (packet: Packet, next: (err?: any) => void) => void): Socket; + + /** + * Sends a 'message' event + * @see emit( event, ...args ) + */ + send(...args: any[]): Socket; + + /** + * @see send( ...args ) + */ + write(...args: any[]): Socket; + + /** + * Joins a room. You can join multiple rooms, and by default, on connection, + * you join a room with the same name as your ID + * @param name The name of the room that we want to join + * @param fn An optional callback to call when we've joined the room. It should + * take an optional parameter, err, of a possible error + * @return This Socket + */ + join(name: string | string[], fn?: (err?: any) => void): Socket; + + /** + * Leaves a room + * @param name The name of the room to leave + * @param fn An optional callback to call when we've left the room. It should + * take on optional parameter, err, of a possible error + */ + leave(name: string, fn?: Function): Socket; + + /** + * Leaves all the rooms that we've joined + */ + leaveAll(): void; + + /** + * Disconnects this Socket + * @param close If true, also closes the underlying connection + * @return This Socket + */ + disconnect(close?: boolean): Socket; + + /** + * Returns all the callbacks for a particular event + * @param event The event that we're looking for the callbacks of + * @return An array of callback Functions, or an empty array if we don't have any + */ + listeners(event: string): Function[]; + + /** + * Sets the compress flag + * @param compress If `true`, compresses the sending data + * @return This Socket + */ + compress(compress: boolean): Socket; + + /** + * Emits the error + * @param err Error message= + */ + error(err: any): void; + } + + interface Handshake { + /** + * The headers passed along with the request. e.g. 'host', + * 'connection', 'accept', 'referer', 'cookie' + */ + headers: any; + + /** + * The current time, as a string + */ + time: string; + + /** + * The remote address of the connection request + */ + address: string; + + /** + * Is this a cross-domain request? + */ + xdomain: boolean; + + /** + * Is this a secure request? + */ + secure: boolean; + + /** + * The timestamp for when this was issued + */ + issued: number; + + /** + * The request url + */ + url: string; + + /** + * Any query string parameters in the request url + */ + query: any; + } + + /** + * The interface describing a room + */ + interface Room { + sockets: { [id: string]: boolean }; + length: number; + } + + /** + * The interface describing a dictionary of rooms + * Where room is the name of the room + */ + + interface Rooms { + [room: string]: Room; + } + + /** + * The interface used when dealing with rooms etc + */ + interface Adapter extends NodeJS.EventEmitter { + + /** + * The namespace that this adapter is for + */ + nsp: Namespace; + + /** + * A dictionary of all the rooms that we have in this namespace + */ + rooms: Rooms; + + /** + * A dictionary of all the socket ids that we're dealing with, and all + * the rooms that the socket is currently in + */ + sids: { [id: string]: { [room: string]: boolean } }; + + /** + * Adds a socket to a room. If the room doesn't exist, it's created + * @param id The ID of the socket to add + * @param room The name of the room to add the socket to + * @param callback An optional callback to call when the socket has been + * added. It should take an optional parameter, error, if there was a problem + */ + add(id: string, room: string, callback?: (err?: any) => void): void; + + /** + * Removes a socket from a room. If there are no more sockets in the room, + * the room is deleted + * @param id The ID of the socket that we're removing + * @param room The name of the room to remove the socket from + * @param callback An optional callback to call when the socket has been + * removed. It should take on optional parameter, error, if there was a problem + */ + del(id: string, room: string, callback?: (err?: any) => void): void; + + /** + * Removes a socket from all the rooms that it's joined + * @param id The ID of the socket that we're removing + */ + delAll(id: string): void; + + /** + * Broadcasts a packet + * @param packet The packet to broadcast + * @param opts Any options to send along: + * - rooms: An optional list of rooms to broadcast to. If empty, the packet is broadcast to all sockets + * - except: A list of Socket IDs to exclude + * - flags: Any flags that we want to send along ('json', 'volatile', 'broadcast') + */ + broadcast(packet: any, opts: { rooms?: string[]; except?: string[]; flags?: { [flag: string]: boolean } }): void; + } + + /** + * The client behind each socket (can have multiple sockets) + */ + interface Client { + /** + * The Server that this client belongs to + */ + server: Server; + + /** + * The underlying Engine.io Socket instance + */ + conn: EngineSocket; + + /** + * The ID for this client. Regenerated at every connection + */ + id: string; + + /** + * The http.IncomingMessage request sent with the connection. Useful + * for recovering headers etc + */ + request: any; + + /** + * The dictionary of sockets currently connect via this client (i.e. to different + * namespaces) where the Socket ID is the key + */ + sockets: { [id: string]: Socket }; + + /** + * A dictionary of all the namespaces for this client, with the Socket that + * deals with that namespace + */ + nsps: { [nsp: string]: Socket }; + } + + /** + * A reference to the underlying engine.io Socket connection. + */ + interface EngineSocket extends NodeJS.EventEmitter { + /** + * The ID for this socket - matches Client.id + */ + id: string; + + /** + * The Engine.io Server for this socket + */ + server: any; + + /** + * The ready state for the client. Either 'opening', 'open', 'closing', or 'closed' + */ + readyState: string; + + /** + * The remote IP for this connection + */ + remoteAddress: string; + + /** + * whether the transport has been upgraded + */ + upgraded: boolean; + + /** + * (http.IncomingMessage): request that originated the Socket + */ + request: any; + + /** + * (Transport): transport reference + */ + transport: any; + } +} diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts new file mode 100644 index 00000000..d63d102d --- /dev/null +++ b/packages/websocket/src/socket-io/namespace.ts @@ -0,0 +1,96 @@ +import { EventEmitter } from 'events' + +import { SocketIOClient } from './client' +import { SocketIO } from './interfaces'; + +export class Namespace implements SocketIO.Namespace { + private event: EventEmitter; + private allClients: { [key: string]: SocketIOClient }; + private roomClients: { [key: string]: Set }; + private clientRooms: { [key: string]: Set }; + + name: string; + server: SocketIO.Server; + sockets: { [id: string]: SocketIO.Socket; }; + connected: { [id: string]: SocketIO.Socket; }; + adapter: SocketIO.Adapter; + json: SocketIO.Namespace; + + constructor(name: string) { + this.name = name; + this.event = new EventEmitter(); + this.allClients = {}; + this.roomClients = {}; + this.clientRooms = {}; + } + use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + to(room: string): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + in(room: string): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + send(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + write(...args: any[]): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + on(event: "connection", listener: (socket: SocketIO.Socket) => void): this; + on(event: "connect", listener: (socket: SocketIO.Socket) => void): this; + on(event: string, listener: Function): this; + on(event: any, listener: any) { + return this; + } + clients(fn: Function): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + compress(compress: boolean): SocketIO.Namespace { + throw new Error("Method not implemented."); + } + addListener(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + once(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + removeListener(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + off(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + removeAllListeners(event?: string | symbol): this { + throw new Error("Method not implemented."); + } + setMaxListeners(n: number): this { + throw new Error("Method not implemented."); + } + getMaxListeners(): number { + throw new Error("Method not implemented."); + } + listeners(event: string | symbol): Function[] { + throw new Error("Method not implemented."); + } + rawListeners(event: string | symbol): Function[] { + throw new Error("Method not implemented."); + } + emit(event: string | symbol, ...args: any[]): boolean { + throw new Error("Method not implemented."); + } + listenerCount(type: string | symbol): number { + throw new Error("Method not implemented."); + } + prependListener(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this { + throw new Error("Method not implemented."); + } + eventNames(): (string | symbol)[] { + throw new Error("Method not implemented."); + } + +} diff --git a/packages/websocket/src/socket-io/packet.ts b/packages/websocket/src/socket-io/packet.ts new file mode 100644 index 00000000..1ce868b2 --- /dev/null +++ b/packages/websocket/src/socket-io/packet.ts @@ -0,0 +1,11 @@ +import { PacketTypes, SubPacketTypes } from './types' + +export interface Packet { + type: PacketTypes; + sub_type?: SubPacketTypes; + nsp?: string; + id?: number; + name?: string; + data?: any; + attachments?: any; +} diff --git a/packages/websocket/src/socket-io/parser.ts b/packages/websocket/src/socket-io/parser.ts new file mode 100644 index 00000000..38778b80 --- /dev/null +++ b/packages/websocket/src/socket-io/parser.ts @@ -0,0 +1,160 @@ +import { Packet } from "./packet"; +import { PacketTypes, SubPacketTypes } from "./types"; + +export class Parser { + encode(packet: Packet): string { + // first is type + var str = '' + packet.type; + if (packet.type == PacketTypes.PONG) { + if (packet.data) { str += packet.data }; + return str; + } + if (packet.sub_type != undefined) { + str += packet.sub_type; + } + // attachments if we have them + if ([SubPacketTypes.BINARY_EVENT, SubPacketTypes.BINARY_ACK].includes(packet.sub_type)) { + str += packet.attachments + '-'; + } + // if we have a namespace other than `/` + // we append it followed by a comma `,` + if (packet.nsp && '/' !== packet.nsp) { + str += packet.nsp + ','; + } + // immediately followed by the id + if (null != packet.id) { + str += packet.id; + } + if (packet.sub_type == SubPacketTypes.EVENT) { + packet.data = [packet.name, packet.data] + } + // json data + if (null != packet.data) { + var payload = this.tryStringify(packet.data); + if (payload !== false) { + str += payload; + } else { + return '4"encode error"' + } + } + console.debug(`encoded ${JSON.stringify(packet)} as ${str}`); + return str; + } + tryStringify(str) { + try { + return JSON.stringify(str); + } catch (e) { + return false; + } + } + decode(str: string): Packet { + var i = 0; + // ignore parse binary + // if ((frame.getByte(0) == 'b' && frame.getByte(1) == '4') + // || frame.getByte(0) == 4 || frame.getByte(0) == 1) { + // return parseBinary(head, frame); + // } + // look up type + var p: Packet = { + type: Number(str.charAt(i)) + }; + if (null == PacketTypes[p.type]) { + return this.error('unknown packet type ' + p.type); + } + // if str empty return + if (str.length == i + 1) { + return p; + } + // if is ping packet read data and return + if (PacketTypes.PING == p.type) { + p.data = str.substr(++i); + return p; + } + // look up sub type + p.sub_type = Number(str.charAt(++i)); + if (null == PacketTypes[p.sub_type]) { + return this.error('unknown sub packet type ' + p.type); + } + // look up attachments if type binary + if ([SubPacketTypes.BINARY_ACK, SubPacketTypes.BINARY_EVENT].includes(p.sub_type)) { + var buf = ''; + while (str.charAt(++i) !== '-') { + buf += str.charAt(i); + if (i == str.length) break; + } + if (buf != `${Number(buf)}` || str.charAt(i) !== '-') { + throw new Error('Illegal attachments'); + } + p.attachments = Number(buf); + } + + // look up namespace (if any) + if ('/' === str.charAt(i + 1)) { + p.nsp = ''; + while (++i) { + var c = str.charAt(i); + if (',' === c) break; + p.nsp += c; + if (i === str.length) break; + } + } else { + p.nsp = '/'; + } + + // look up id + var next = str.charAt(i + 1); + if ('' !== next && Number.isNaN(Number(next))) { + var id = '' + while (++i) { + var c = str.charAt(i); + if (null == c || Number.isNaN(Number(c))) { + --i; + break; + } + id += str.charAt(i); + if (i === str.length) break; + } + p.id = Number(id); + } + + // look up packet name + if (p.sub_type == SubPacketTypes.EVENT) { + + } + // ignore binary packet + if (p.sub_type == SubPacketTypes.BINARY_EVENT) { + return this.error('not support binary parse...') + } + + // look up json data + if (str.charAt(++i)) { + var payload = this.tryParse(str.substr(i)); + var isPayloadValid = payload !== false && (p.sub_type == SubPacketTypes.ERROR || Array.isArray(payload)); + if (isPayloadValid) { + p.name = payload[0]; + p.data = payload[1]; + } else { + return this.error('invalid payload'); + } + } + + console.debug(`decoded ${str} as ${JSON.stringify(p)}`); + return p; + } + + tryParse(str) { + try { + return JSON.parse(str); + } catch (e) { + return false; + } + } + + error(error: string): Packet { + return { + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.ERROR, + data: 'parser error: ' + error + }; + } +} \ No newline at end of file diff --git a/packages/websocket/src/socket-io/types.ts b/packages/websocket/src/socket-io/types.ts new file mode 100644 index 00000000..39758563 --- /dev/null +++ b/packages/websocket/src/socket-io/types.ts @@ -0,0 +1,18 @@ +export enum PacketTypes { + OPEN, + CLOSE, + PING, + PONG, + MESSAGE, + UPGRADE, + NOOP, +} +export enum SubPacketTypes { + CONNECT, + DISCONNECT, + EVENT, + ACK, + ERROR, + BINARY_EVENT, + BINARY_ACK +}