From b6318cb8c12a9f22b78374cd8daae89ad53fc7fa Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Mon, 23 Mar 2020 18:33:12 +0800 Subject: [PATCH] feat: complate sockt.io server Signed-off-by: MiaoWoo --- packages/websocket/src/server/client.ts | 10 ++ packages/websocket/src/server/constants.ts | 6 + packages/websocket/src/server/httprequest.ts | 2 + packages/websocket/src/server/index.ts | 3 + packages/websocket/src/socket-io/client.ts | 112 +++++++++++---- packages/websocket/src/socket-io/index.ts | 33 +++-- packages/websocket/src/socket-io/namespace.ts | 50 +++++-- packages/websocket/src/socket-io/socket.ts | 133 ++++++++++++++++-- 8 files changed, 288 insertions(+), 61 deletions(-) diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/server/client.ts index 890cdf95..91c1ac6c 100644 --- a/packages/websocket/src/server/client.ts +++ b/packages/websocket/src/server/client.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'events' import { SocketIO } from 'socket-io/interfaces'; +import { Keys, AttributeKeys } from './constants'; const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') @@ -17,6 +18,12 @@ export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { constructor(server: any, channel: any) { super(); this.server = server; + this.readyState = 'open'; + this.remoteAddress = channel.remoteAddress() + '' + this.upgraded = true; + this.request = channel.attr(AttributeKeys.Request).get(); + this.transport = null; + this.channel = channel; this._id = channel.id(); } @@ -27,4 +34,7 @@ export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { send(text: string) { this.channel.writeAndFlush(new TextWebSocketFrame(text)) } + close() { + this.channel.close(); + } } diff --git a/packages/websocket/src/server/constants.ts b/packages/websocket/src/server/constants.ts index fd278e21..81ec9c30 100644 --- a/packages/websocket/src/server/constants.ts +++ b/packages/websocket/src/server/constants.ts @@ -6,8 +6,14 @@ export enum ServerEvent { disconnect = 'disconnect' } +const AttributeKey = Java.type('io.netty.util.AttributeKey'); + export enum Keys { Detect = "miao_detect", Handler = "miaowebsocket", Default = "DefaultChannelPipeline" } + +export enum AttributeKeys { + Request = AttributeKey.valueOf('request') +} \ No newline at end of file diff --git a/packages/websocket/src/server/httprequest.ts b/packages/websocket/src/server/httprequest.ts index 141ef8c2..2fcc72cd 100644 --- a/packages/websocket/src/server/httprequest.ts +++ b/packages/websocket/src/server/httprequest.ts @@ -1,4 +1,5 @@ import { HttpRequestHandlerAdapter } from '../netty' +import { Keys, AttributeKeys } from './constants' const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse') const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse') @@ -31,6 +32,7 @@ export class HttpRequestHandler extends HttpRequestHandlerAdapter { } channelRead0(ctx: any, request: any) { if (request.getUri().startsWith(this.ws)) { + ctx.channel().attr(AttributeKeys.Request).set(request); ctx.fireChannelRead(request.retain()) } else { ctx.executor().execute(new Runnable({ diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index a380f192..ea797823 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -36,6 +36,9 @@ class NettyWebSocketServer { if (this.pipeline.names().contains(Keys.Detect)) { this.pipeline.remove(Keys.Detect) } + Object.values(this.allClients).forEach(client => { + client.close(); + }) } on(event: string, listener: (...args: any[]) => void) { diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index f301d9bc..31e398a6 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -1,54 +1,104 @@ import { EventEmitter } from 'events' import { Parser } from './parser' import { Packet } from './packet'; -import { PacketTypes, SubPacketTypes } from './types'; -import { ServerEvent, NettyClient } from '../server'; +import { NettyClient } from '../server'; import { SocketIO } from './interfaces' +import { Server, Socket } from './index'; +import { PacketTypes, SubPacketTypes } from './types'; const parser = new Parser(); export class Client extends EventEmitter implements SocketIO.Client { - private nettyClient: NettyClient; - private event: EventEmitter; - private _id: string; - - server: SocketIO.Server; - conn: SocketIO.EngineSocket; + server: Server; + conn: NettyClient; request: any; - sockets: { [id: string]: SocketIO.Socket; }; + sockets: { [id: string]: Socket; }; nsps: { [nsp: string]: SocketIO.Socket; }; + connectBuffer: any; - constructor(server: SocketIO.Server, nettyClient: NettyClient) { + constructor(server: Server, nettyClient: NettyClient) { super(); this.server = server; - this.event = new EventEmitter(); this.conn = nettyClient; - this.nettyClient = nettyClient; - this._id = this.nettyClient.id; + this.request = nettyClient.request; + this.sockets = {}; + this.nsps = {}; } get id() { - return this._id; + return this.conn.id; } - on(event: string, callback: (...args: any[]) => void) { - this.event.on(event, callback); - return this - } - emit(event: string, ...args: any[]): boolean { - this.packet({ - type: PacketTypes.MESSAGE, - sub_type: SubPacketTypes.EVENT, - name: event, - data: args[0] + connect(name, query) { + if (this.server.nsps[name]) { + // console.debug(`connecting to namespace ${name}`); + return this.doConnect(name, query); + } + this.server.checkNamespace(name, query, (dynamicNsp) => { + if (dynamicNsp) { + // console.debug('dynamic namespace %s was created', dynamicNsp.name); + this.doConnect(name, query); + } else { + // console.debug('creation of namespace %s was denied', name); + this.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.ERROR, + nsp: name, + data: 'Invalid namespace' + }); + } }) - return true; } - send(data: any) { - this.emit("message", data); - } - process(packet: Packet) { - this.event.emit(packet.name, packet.data); + doConnect(name, query) { + var nsp = this.server.of(name); + if ('/' != name && !this.nsps['/']) { + this.connectBuffer.push(name); + return; + } + var socket = nsp.add(this, query, () => { + this.sockets[socket.id] = socket; + this.nsps[nsp.name] = socket; + + if ('/' == nsp.name && this.connectBuffer.length > 0) { + this.connectBuffer.forEach(this.connect, this); + this.connectBuffer = []; + } + }); } packet(packet: Packet) { - this.nettyClient.send(parser.encode(packet)) + this.conn.send(parser.encode(packet)) } + onclose(reason: string) { + // debug('client close with reason %s', reason); + // ignore a potential subsequent `close` event + this.destroy(); + // `nsps` and `sockets` are cleaned up seamlessly + for (var id in this.sockets) { + if (this.sockets.hasOwnProperty(id)) { + this.sockets[id].onclose(reason); + } + } + this.sockets = {}; + // this.decoder.destroy(); // clean up decoder + }; + close() { + // if ('open' == this.conn.readyState) { + // debug('forcing transport close'); + this.conn.close(); + this.onclose('forced server close'); + // } + } + remove(socket: Socket) { + if (this.sockets.hasOwnProperty(socket.id)) { + var nsp = this.sockets[socket.id].nsp.name; + delete this.sockets[socket.id]; + delete this.nsps[nsp]; + } else { + // debug('ignoring remove for %s', socket.id); + } + } + destroy() { + // this.conn.removeListener('data', this.ondata); + // this.conn.removeListener('error', this.onerror); + // this.conn.removeListener('close', this.onclose); + // this.decoder.removeListener('decoded', this.ondecoded); + }; } diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index 13b3a008..49e88e7d 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -32,7 +32,7 @@ class Server implements SocketIO.Server { this.event = new EventEmitter(); this.allClients = {}; this.nsps = {}; - this.sockets = new Namespace('/'); + this.sockets = new Namespace('/', this); this.nsps['/'] = this.sockets; this.initNettyServer(pipeline, options); } @@ -84,6 +84,7 @@ class Server implements SocketIO.Server { throw new Error("Method not implemented."); } onconnection(socket: any): SocketIO.Server { + this.allClients[socket.id] = socket; socket.packet({ type: PacketTypes.OPEN, data: { @@ -96,9 +97,9 @@ class Server implements SocketIO.Server { this.sockets.add(socket); return this; } - of(nsp: string): SocketIO.Namespace { + of(nsp: string): Namespace { if (!this.nsps[nsp]) { - this.nsps[nsp] = new Namespace(nsp); + this.nsps[nsp] = new Namespace(nsp, this); } return this.nsps[nsp]; } @@ -140,6 +141,12 @@ class Server implements SocketIO.Server { compress(...args: any[]): SocketIO.Namespace { throw new Error("Method not implemented."); } + + // =============================== + checkNamespace(name, query, fn) { + fn(false); + }; + disable() { this.nettyServer.disable(); } @@ -176,18 +183,22 @@ class Server implements SocketIO.Server { } private processSubPacket(packet: Packet, client: Client) { - switch (packet.sub_type) { - case SubPacketTypes.CONNECT: - client.packet(packet); - break; - case SubPacketTypes.EVENT: - client.process(packet); - break; + let namespace = this.nsps[packet.nsp] + if (!namespace) { + client.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.ERROR, + data: 'not support dynamic namespace' + }); + client.close(); + return; } + namespace.process(packet, client); } } export { Server, + Socket, Server as SocketIOServer, Client as SocketIOClient -} \ No newline at end of file +} diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index de3dfebe..6bb8dc67 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -6,31 +6,41 @@ import { ServerEvent } from '../server'; import { Socket } from './socket'; import { Adapter } from './adapter'; import { Server } from './index' +import { Packet } from './packet'; +import { SubPacketTypes } from './types'; export class Namespace extends EventEmitter implements SocketIO.Namespace { name: string; server: Server; - sockets: { [id: string]: SocketIO.Socket; }; - connected: { [id: string]: SocketIO.Socket; }; + sockets: { [id: string]: Socket; }; + connected: { [id: string]: Socket; }; adapter: SocketIO.Adapter; json: SocketIO.Namespace; - constructor(name: string) { + constructor(name: string, server: Server) { super(); this.name = name; + this.server = server; this.sockets = {}; this.connected = {}; this.adapter = new Adapter(this); } initAdapter() { - let adp = this.server.adapter() - this.adapter = new adp() + // @ts-ignore + this.adapter = new (this.server.adapter())() } - add(client: Client) { - let nameClient = new Socket(this, client, {}); - this.sockets[client.id] = nameClient; - client.nsps[this.name] = nameClient; - this.onconnection(nameClient); + add(client: Client, query?: any, callback?: () => void) { + // client.conn.request.url(); + let socket = new Socket(this, client, {}); + this.sockets[client.id] = socket; + client.nsps[this.name] = socket; + this.onconnection(socket); + return socket; + } + del(client: Client) { + let socket = this.sockets[client.id]; + socket.disconnect(); + delete this.sockets[client.id]; } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { // TODO @@ -56,6 +66,26 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { compress(compress: boolean): SocketIO.Namespace { throw new Error("Method not implemented."); } + process(packet: Packet, client: Client) { + switch (packet.sub_type) { + case SubPacketTypes.CONNECT: + this.add(client); + break; + case SubPacketTypes.DISCONNECT: + this.del(client); + break; + case SubPacketTypes.EVENT: + this.sockets[client.id].onpacket(packet); + break; + } + } + remove(socket: Socket) { + if (this.sockets.hasOwnProperty(socket.id)) { + delete this.sockets[socket.id]; + } else { + // debug('ignoring remove for %s', socket.id); + } + } private onconnection(socket: any) { let client = socket as Socket; this.sockets[client.id] = client; diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index 8acb9b2f..7f32af15 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -1,19 +1,23 @@ import { EventEmitter } from 'events' -import { SocketIOClient } from "socket-io"; import { SocketIO } from "./interfaces"; import { Packet } from './packet'; import { PacketTypes, SubPacketTypes } from './types'; +import { Client } from './client'; +import { Namespace } from './namespace'; export class Socket extends EventEmitter implements SocketIO.Socket { - nsp: SocketIO.Namespace; + private event: EventEmitter; + + nsp: Namespace; server: SocketIO.Server; adapter: SocketIO.Adapter; id: string; request: any; - client: SocketIOClient; + client: Client; conn: SocketIO.EngineSocket; rooms: { [id: string]: string; }; + acks: { [id: string]: Function; }; connected: boolean; disconnected: boolean; handshake: SocketIO.Handshake; @@ -23,22 +27,25 @@ export class Socket extends EventEmitter implements SocketIO.Socket { fns: any[]; _rooms: string[]; - constructor(nsp, client, query) { + constructor(nsp: Namespace, client: Client, query = {}) { super(); this.nsp = nsp; this.server = nsp.server; this.adapter = this.nsp.adapter; this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id; this.client = client; + this.request = client.request; this.conn = client.conn; this.rooms = {}; - // this.acks = {}; + this.acks = {}; this.connected = true; this.disconnected = false; // this.handshake = this.buildHandshake(query); this.fns = []; // this.flags = {}; this._rooms = []; + + this.event = new EventEmitter(); } to(room: string): SocketIO.Socket { @@ -81,7 +88,8 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.rooms = {}; } disconnect(close?: boolean): SocketIO.Socket { - throw new Error("Method not implemented."); + this.client.close(); + return this; } compress(compress: boolean): SocketIO.Socket { throw new Error("Method not implemented."); @@ -90,12 +98,42 @@ export class Socket extends EventEmitter implements SocketIO.Socket { throw new Error("Method not implemented."); } - packet(packet: Packet) { - this.client.packet(packet); + // ========================================== + + buildHandshake(query): SocketIO.Handshake { + let requestQuery = this.request.uri(); + return { + headers: this.request.headers(), + time: (new Date) + '', + address: this.conn.remoteAddress, + xdomain: !!this.request.headers.origin, + secure: !!this.request.connection.encrypted, + issued: +(new Date), + url: this.request.url, + query: Object.assign(query, requestQuery) + } } + on(event: string, callback: (...args: any[]) => void) { + this.event.on(event, callback); + return this + } + emit(event: string, ...args: any[]): boolean { + this.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.EVENT, + name: event, + data: args[0] + }) + return true; + } + packet(packet: Packet) { + packet.nsp = this.nsp.name; + this.client.packet(packet); + } onconnect() { this.nsp.connected[this.id] = this; + this.client.sockets[this.id] = this; this.join(this.id); // var skip = this.nsp.name === '/' && this.nsp.fns.length === 0; // if (skip) { @@ -107,4 +145,81 @@ export class Socket extends EventEmitter implements SocketIO.Socket { }); // } } -} \ No newline at end of file + onclose(reason) { + if (!this.connected) return this; + // debug('closing socket - reason %s', reason); + this.emit('disconnecting', reason); + this.leaveAll(); + this.nsp.remove(this); + this.client.remove(this); + this.connected = false; + this.disconnected = true; + delete this.nsp.connected[this.id]; + this.emit('disconnect', reason); + }; + onpacket(packet: Packet) { + switch (packet.sub_type) { + case SubPacketTypes.EVENT: + this.onevent(packet); + break; + case SubPacketTypes.BINARY_EVENT: + this.onevent(packet); + break; + case SubPacketTypes.ACK: + this.onack(packet); + break; + case SubPacketTypes.BINARY_ACK: + this.onack(packet); + break; + case SubPacketTypes.DISCONNECT: + this.ondisconnect(); + break; + case SubPacketTypes.ERROR: + this.onerror(new Error(packet.data)); + } + } + onerror(error: Error) { + + } + ondisconnect() { + this.onclose + } + onevent(packet: Packet) { + // console.debug('emitting event %j', args); + if (null != packet.id) { + // console.debug('attaching ack callback to event'); + this.dispatch(packet, this.ack(packet.id)) + } + this.dispatch(packet); + }; + ack(id: number) { + var sent = false; + return (...args: any[]) => { + if (sent) return; + this.packet({ + id: id, + type: PacketTypes.MESSAGE, + sub_type: this.hasBin(args) ? SubPacketTypes.BINARY_ACK : SubPacketTypes.ACK, + data: args + }); + sent = true; + } + } + onack(packet: Packet) { + var ack = this.acks[packet.id]; + if ('function' == typeof ack) { + // debug('calling ack %s with %j', packet.id, packet.data); + ack.apply(this, packet.data); + delete this.acks[packet.id]; + } else { + // debug('bad ack %s', packet.id); + } + } + dispatch(packet: Packet, ack?: Function) { + if (ack) { this.acks[packet.id] = ack; } + this.event.emit(packet.name, packet.data) + } + private hasBin(obj: any) { + return false; + } +}