diff --git a/packages/websocket/src/server/client.ts b/packages/websocket/src/server/client.ts index e9742db9..890cdf95 100644 --- a/packages/websocket/src/server/client.ts +++ b/packages/websocket/src/server/client.ts @@ -1,26 +1,29 @@ import { EventEmitter } from 'events' +import { SocketIO } from 'socket-io/interfaces'; const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') -export class NettyClient { - private event: EventEmitter - +export class NettyClient extends EventEmitter implements SocketIO.EngineSocket { private _id: string; private channel: any - constructor(channel: any) { + + server: any; + readyState: string; + remoteAddress: string; + upgraded: boolean; + request: any; + transport: any; + + constructor(server: any, channel: any) { + super(); + this.server = server; this.channel = channel; this._id = channel.id(); - this.event = new EventEmitter(); } + get id() { return this._id; } - on(event: string, callback: (...args: any[]) => void) { - this.event.on(event, callback); - } - emit(event: string, text: string) { - this.event.emit(event, text); - } send(text: string) { this.channel.writeAndFlush(new TextWebSocketFrame(text)) } diff --git a/packages/websocket/src/server/config.ts b/packages/websocket/src/server/config.ts new file mode 100644 index 00000000..87a892ae --- /dev/null +++ b/packages/websocket/src/server/config.ts @@ -0,0 +1,6 @@ +import { EventEmitter } from 'events' + +export interface NettyWebSocketServerOptions { + event: EventEmitter, + path?: string; +} diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index f0ad5b22..a380f192 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -4,10 +4,7 @@ import { ServerEvent, Keys } from './constants' import { WebSocketDetect } from './websocket_detect' import { WebSocketHandler } from './websocket_handler' import { NettyClient } from './client' - -interface NettyWebSocketServerOptions { - path?: string; -} +import { NettyWebSocketServerOptions } from './config' class NettyWebSocketServer { private event: EventEmitter @@ -18,20 +15,20 @@ class NettyWebSocketServer { this.event = new EventEmitter(); this.allClients = {}; this.pipeline = pipeline; - let connectEvent = new EventEmitter(); + let connectEvent = options.event; this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler()) connectEvent.on(ServerEvent.detect, (ctx, channel) => { - channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(connectEvent).getHandler()) + channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(options).getHandler()) ctx.fireChannelRead(channel) }) connectEvent.on(ServerEvent.connect, (ctx) => { - let nettyClient = new NettyClient(ctx.channel()); + let nettyClient = new NettyClient(this, ctx.channel()); this.allClients[nettyClient.id] = nettyClient; this.event.emit(ServerEvent.connect, nettyClient); }) connectEvent.on(ServerEvent.message, (ctx, msg) => { let channel = ctx.channel(); - this.allClients[channel.id()]?.emit(ServerEvent.message, msg.text()) + this.event.emit(ServerEvent.message, this.allClients[channel.id()], msg.text()) }) } diff --git a/packages/websocket/src/server/text_websocket_frame.ts b/packages/websocket/src/server/text_websocket_frame.ts index 31429edc..fb2a3f61 100644 --- a/packages/websocket/src/server/text_websocket_frame.ts +++ b/packages/websocket/src/server/text_websocket_frame.ts @@ -1,12 +1,13 @@ import { TextWebSocketFrameHandlerAdapter } from '../netty' import { EventEmitter } from 'events' import { ServerEvent } from './constants' +import { NettyWebSocketServerOptions } from './config'; export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { private event: EventEmitter; - constructor(event: EventEmitter) { + constructor(options: NettyWebSocketServerOptions) { super() - this.event = event; + this.event = options.event; } userEventTriggered(ctx: any, evt: any) { if (evt == 'HANDSHAKE_COMPLETE') { diff --git a/packages/websocket/src/server/websocket_handler.ts b/packages/websocket/src/server/websocket_handler.ts index c3ea273a..0cfce45c 100644 --- a/packages/websocket/src/server/websocket_handler.ts +++ b/packages/websocket/src/server/websocket_handler.ts @@ -4,6 +4,7 @@ import { Keys } from './constants' import { WebSocketHandlerAdapter } from "../netty" import { HttpRequestHandler } from './httprequest' import { TextWebSocketFrameHandler } from './text_websocket_frame' +import { NettyWebSocketServerOptions } from './config' const CharsetUtil = Java.type('io.netty.util.CharsetUtil') @@ -13,10 +14,10 @@ const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAg const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler') export class WebSocketHandler extends WebSocketHandlerAdapter { - private event: EventEmitter; - constructor(event: EventEmitter) { + private options: NettyWebSocketServerOptions; + constructor(options: NettyWebSocketServerOptions) { super() - this.event = event; + this.options = options; } channelRead(ctx: any, msg: any) { msg.markReaderIndex() @@ -32,8 +33,8 @@ export class WebSocketHandler extends WebSocketHandlerAdapter { pipeline.addLast('chunk', new ChunkedWriteHandler()) pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024)) pipeline.addLast('http_request', new HttpRequestHandler().getHandler()) - pipeline.addLast('websocket', new WebSocketServerProtocolHandler("/ws", true)) - pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.event).getHandler()) + pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true)) + pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler()) } pipeline.remove(Keys.Handler) msg.resetReaderIndex() diff --git a/packages/websocket/src/socket-io/adapter.ts b/packages/websocket/src/socket-io/adapter.ts new file mode 100644 index 00000000..db0cc308 --- /dev/null +++ b/packages/websocket/src/socket-io/adapter.ts @@ -0,0 +1,124 @@ +import { EventEmitter } from 'events' +import { SocketIO } from './interfaces'; +import { Namespace } from './namespace'; +import { Parser } from './parser'; + +export class Adapter extends EventEmitter implements SocketIO.Adapter { + nsp: SocketIO.Namespace; + rooms: Rooms; + sids: { [id: string]: { [room: string]: boolean; }; }; + parser: Parser + constructor(nsp: Namespace) { + super() + this.parser = nsp.server.parser; + } + add(id: string, room: string, callback?: (err?: any) => void): void { + return this.addAll(id, [room], callback); + } + /** + * Adds a socket to a list of room. + * + * @param {String} socket id + * @param {String} rooms + * @param {Function} callback + * @api public + */ + addAll(id, rooms, fn) { + for (var i = 0; i < rooms.length; i++) { + var room = rooms[i]; + this.sids[id] = this.sids[id] || {}; + this.sids[id][room] = true; + this.rooms[room] = this.rooms[room] || new Room(); + this.rooms[room].add(id); + } + fn && fn.bind(null, null) + }; + del(id: string, room: string, callback?: (err?: any) => void): void { + if (this.sids[id]) delete this.sids[id][room]; + + if (this.rooms.hasOwnProperty(room)) { + this.rooms[room].del(id); + if (this.rooms[room].length === 0) delete this.rooms[room]; + } + callback && callback.bind(null, null) + } + + delAll(id: string): void { + var rooms = this.sids[id]; + if (rooms) { + for (var room in rooms) { + if (this.rooms.hasOwnProperty(room)) { + this.rooms[room].del(id); + if (this.rooms[room].length === 0) delete this.rooms[room]; + } + } + } + delete this.sids[id]; + } + broadcast(packet: any, opts: { rooms?: string[]; except?: string[]; flags?: { [flag: string]: boolean; }; }): void { + var rooms = opts.rooms || []; + var except = opts.except || []; + var flags = opts.flags || {}; + var packetOpts = { + preEncoded: true, + volatile: flags.volatile, + compress: flags.compress + }; + var ids = {}; + var self = this; + var socket; + + packet.nsp = this.nsp.name; + let encodedPackets = this.parser.encode(packet) + if (rooms.length) { + for (var i = 0; i < rooms.length; i++) { + var room = self.rooms[rooms[i]]; + if (!room) continue; + var sockets = room.sockets; + for (var id in sockets) { + if (sockets.hasOwnProperty(id)) { + if (ids[id] || ~except.indexOf(id)) continue; + socket = self.nsp.connected[id]; + if (socket) { + socket.packet(encodedPackets, packetOpts); + ids[id] = true; + } + } + } + } + } else { + for (var id in self.sids) { + if (self.sids.hasOwnProperty(id)) { + if (~except.indexOf(id)) continue; + socket = self.nsp.connected[id]; + if (socket) socket.packet(encodedPackets, packetOpts); + } + } + } + } +} + +class Rooms implements SocketIO.Rooms { + [room: string]: Room; +} + +class Room implements SocketIO.Room { + sockets: { [id: string]: boolean; }; + length: number; + constructor() { + this.sockets = {}; + this.length = 0; + } + add(id) { + if (!this.sockets.hasOwnProperty(id)) { + this.sockets[id] = true; + this.length++; + } + } + del(id) { + if (this.sockets.hasOwnProperty(id)) { + delete this.sockets[id]; + this.length--; + } + } +} \ No newline at end of file diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index 7b85de70..f301d9bc 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -7,9 +7,9 @@ import { SocketIO } from './interfaces' const parser = new Parser(); -export class SocketIOClient implements SocketIO.Client { +export class Client extends EventEmitter implements SocketIO.Client { private nettyClient: NettyClient; - private event: EventEmitter + private event: EventEmitter; private _id: string; server: SocketIO.Server; @@ -18,67 +18,37 @@ export class SocketIOClient implements SocketIO.Client { sockets: { [id: string]: SocketIO.Socket; }; nsps: { [nsp: string]: SocketIO.Socket; }; - constructor(nettyClient: NettyClient) { - this.event = new EventEmitter() + constructor(server: SocketIO.Server, nettyClient: NettyClient) { + super(); + this.server = server; + this.event = new EventEmitter(); + this.conn = nettyClient; this.nettyClient = nettyClient; this._id = this.nettyClient.id; - this.event.emit('connect', this); - this.nettyClient.on(ServerEvent.message, (text) => this.process(text)) } - get id() { return this._id; } - connect() { - this.packet({ - type: PacketTypes.OPEN, - data: { - sid: this._id, - upgrades: [], - pingInterval: 25000, - pingTimeout: 5000 - } - }) - this.packet({ - type: PacketTypes.MESSAGE, - sub_type: SubPacketTypes.CONNECT - }) + on(event: string, callback: (...args: any[]) => void) { + this.event.on(event, callback); + return this } - emit(event: string, data: any) { + emit(event: string, ...args: any[]): boolean { this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.EVENT, name: event, - data + data: args[0] }) + return true; } send(data: any) { this.emit("message", data); } + process(packet: Packet) { + this.event.emit(packet.name, packet.data); + } packet(packet: Packet) { this.nettyClient.send(parser.encode(packet)) } - - private process(text: string) { - let packet = parser.decode(text); - switch (packet.type) { - case PacketTypes.OPEN: - break; - case PacketTypes.PING: - this.packet({ - type: PacketTypes.PONG - }) - break; - case PacketTypes.MESSAGE: - switch (packet.sub_type) { - case SubPacketTypes.CONNECT: - this.nettyClient.send(text); - break; - case SubPacketTypes.EVENT: - this.event.emit(packet.name, packet.data); - break; - } - break; - } - } } diff --git a/packages/websocket/src/socket-io/index.ts b/packages/websocket/src/socket-io/index.ts index 0b1a7cc5..13b3a008 100644 --- a/packages/websocket/src/socket-io/index.ts +++ b/packages/websocket/src/socket-io/index.ts @@ -4,35 +4,37 @@ import { NettyWebSocketServer } from '../server' import { ServerEvent } from '../server/constants'; import { Namespace } from './namespace'; -import { SocketIOClient } from './client'; +import { Client } from './client'; import { SocketIO } from './interfaces' +import { Parser } from './parser' +import { PacketTypes, SubPacketTypes } from './types'; +import { Packet } from './packet'; +import { Socket } from './socket'; +import { Adapter } from './adapter'; -export class SocketIOServer implements SocketIO.Server { +class Server implements SocketIO.Server { private event: EventEmitter; private nettyServer: NettyWebSocketServer; - private allClients: { [key: string]: SocketIOClient }; - private namespaces: { [key: string]: Namespace }; + private allClients: { [key: string]: Client }; engine: { ws: any; }; - nsps: { [namespace: string]: SocketIO.Namespace; }; - sockets: SocketIO.Namespace; + nsps: { [namespace: string]: Namespace; }; + sockets: Namespace; json: SocketIO.Server; volatile: SocketIO.Server; local: SocketIO.Server; + parser = new Parser(); + _adapter: Adapter; + options: SocketIO.ServerOptions; constructor(pipeline: any, options: SocketIO.ServerOptions) { if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') } this.event = new EventEmitter(); this.allClients = {}; - this.namespaces = {}; - this.nettyServer = new NettyWebSocketServer(pipeline, { - - }); - this.nettyServer.on(ServerEvent.connect, (nettyClient) => { - let client = new SocketIOClient(nettyClient); - this.allClients[client.id] = client; - client.connect(); - }) + this.nsps = {}; + this.sockets = new Namespace('/'); + this.nsps['/'] = this.sockets; + this.initNettyServer(pipeline, options); } checkRequest(req: any, fn: (err: any, success: boolean) => void): void { @@ -46,12 +48,21 @@ export class SocketIOServer implements SocketIO.Server { path(): string; path(v: string): SocketIO.Server; path(v?: any): string | SocketIO.Server { + if (!arguments.length) return this.options.path; + this.options.path = v.replace(/\/$/, ''); return this; } - adapter(); + adapter(): Adapter; adapter(v: any): SocketIO.Server; - adapter(v?: any): SocketIO.Server { - throw new Error("Method not implemented."); + adapter(v?: any): Adapter | SocketIO.Server { + if (!arguments.length) return this._adapter; + this._adapter = v; + for (var i in this.nsps) { + if (this.nsps.hasOwnProperty(i)) { + this.nsps[i].initAdapter(); + } + } + return this; } origins(): string | string[]; origins(v: string | string[]): SocketIO.Server; @@ -73,10 +84,23 @@ export class SocketIOServer implements SocketIO.Server { throw new Error("Method not implemented."); } onconnection(socket: any): SocketIO.Server { - throw new Error("Method not implemented."); + socket.packet({ + type: PacketTypes.OPEN, + data: { + sid: socket.id, + upgrades: [], + pingInterval: 25000, + pingTimeout: 5000 + } + }) + this.sockets.add(socket); + return this; } - of(nsp: string | Function | RegExp): SocketIO.Namespace { - throw new Error("Method not implemented."); + of(nsp: string): SocketIO.Namespace { + if (!this.nsps[nsp]) { + this.nsps[nsp] = new Namespace(nsp); + } + return this.nsps[nsp]; } close(fn?: () => void): void { throw new Error("Method not implemented."); @@ -85,7 +109,8 @@ export class SocketIOServer implements SocketIO.Server { on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace; on(event: string, listener: Function): SocketIO.Namespace; on(event: any, listener: any): SocketIO.Namespace { - throw new Error("Method not implemented."); + this.event.on(event, listener); + return this.sockets; } to(room: string): SocketIO.Namespace { throw new Error("Method not implemented."); @@ -97,22 +122,72 @@ export class SocketIOServer implements SocketIO.Server { throw new Error("Method not implemented."); } emit(event: string, ...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); + this.sockets.emit(event, ...args); + return this.sockets; } send(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); + this.sockets.send(...args); + return this.sockets; } write(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); + this.sockets.write(...args); + return this.sockets; } clients(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); + this.sockets.clients(args[0]); + return this.sockets; } compress(...args: any[]): SocketIO.Namespace { throw new Error("Method not implemented."); } - disable() { this.nettyServer.disable(); } + + private initNettyServer(pipeline, options) { + this.nettyServer = new NettyWebSocketServer(pipeline, { + event: new EventEmitter(), + path: options.path + }); + this.nettyServer.on(ServerEvent.connect, (nettyClient) => { + let client = new Client(this, nettyClient); + this.onconnection(client); + }) + this.nettyServer.on(ServerEvent.message, (nettyClient, text) => { + this.processPacket(this.parser.decode(text), this.allClients[nettyClient.id]); + }) + } + + private processPacket(packet: Packet, client: Client) { + switch (packet.type) { + case PacketTypes.PING: + client.packet({ + type: PacketTypes.PONG, + data: packet.data + }) + break; + case PacketTypes.UPGRADE: + break; + case PacketTypes.MESSAGE: + this.processSubPacket(packet, client); + break; + case PacketTypes.CLOSE: + } + } + + private processSubPacket(packet: Packet, client: Client) { + switch (packet.sub_type) { + case SubPacketTypes.CONNECT: + client.packet(packet); + break; + case SubPacketTypes.EVENT: + client.process(packet); + break; + } + } +} +export { + Server, + Server as SocketIOServer, + Client as SocketIOClient } \ No newline at end of file diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index d63d102d..de3dfebe 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -1,96 +1,66 @@ import { EventEmitter } from 'events' -import { SocketIOClient } from './client' +import { Client } from './client' import { SocketIO } from './interfaces'; +import { ServerEvent } from '../server'; +import { Socket } from './socket'; +import { Adapter } from './adapter'; +import { Server } from './index' -export class Namespace implements SocketIO.Namespace { - private event: EventEmitter; - private allClients: { [key: string]: SocketIOClient }; - private roomClients: { [key: string]: Set }; - private clientRooms: { [key: string]: Set }; - +export class Namespace extends EventEmitter implements SocketIO.Namespace { name: string; - server: SocketIO.Server; + server: Server; sockets: { [id: string]: SocketIO.Socket; }; connected: { [id: string]: SocketIO.Socket; }; adapter: SocketIO.Adapter; json: SocketIO.Namespace; constructor(name: string) { + super(); this.name = name; - this.event = new EventEmitter(); - this.allClients = {}; - this.roomClients = {}; - this.clientRooms = {}; + this.sockets = {}; + this.connected = {}; + this.adapter = new Adapter(this); + } + initAdapter() { + let adp = this.server.adapter() + this.adapter = new adp() + } + add(client: Client) { + let nameClient = new Socket(this, client, {}); + this.sockets[client.id] = nameClient; + client.nsps[this.name] = nameClient; + this.onconnection(nameClient); } use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace { - throw new Error("Method not implemented."); - } - to(room: string): SocketIO.Namespace { - throw new Error("Method not implemented."); - } - in(room: string): SocketIO.Namespace { - throw new Error("Method not implemented."); - } - send(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); - } - write(...args: any[]): SocketIO.Namespace { - throw new Error("Method not implemented."); - } - on(event: "connection", listener: (socket: SocketIO.Socket) => void): this; - on(event: "connect", listener: (socket: SocketIO.Socket) => void): this; - on(event: string, listener: Function): this; - on(event: any, listener: any) { + // TODO return this; } + to(room: string): SocketIO.Namespace { + // TODO + return this; + } + in(room: string): SocketIO.Namespace { + return this.to(room); + } + send(...args: any[]): SocketIO.Namespace { + // TODO + return this; + } + write(...args: any[]): SocketIO.Namespace { + return this.send(...args); + } clients(fn: Function): SocketIO.Namespace { - throw new Error("Method not implemented."); + return fn(Object.values(this.sockets)) } compress(compress: boolean): SocketIO.Namespace { throw new Error("Method not implemented."); } - addListener(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); + private onconnection(socket: any) { + let client = socket as Socket; + this.sockets[client.id] = client; + client.onconnect() + this.emit(ServerEvent.connect, socket); + this.emit(ServerEvent.connection, socket); } - once(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); - } - removeListener(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); - } - off(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); - } - removeAllListeners(event?: string | symbol): this { - throw new Error("Method not implemented."); - } - setMaxListeners(n: number): this { - throw new Error("Method not implemented."); - } - getMaxListeners(): number { - throw new Error("Method not implemented."); - } - listeners(event: string | symbol): Function[] { - throw new Error("Method not implemented."); - } - rawListeners(event: string | symbol): Function[] { - throw new Error("Method not implemented."); - } - emit(event: string | symbol, ...args: any[]): boolean { - throw new Error("Method not implemented."); - } - listenerCount(type: string | symbol): number { - throw new Error("Method not implemented."); - } - prependListener(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); - } - prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this { - throw new Error("Method not implemented."); - } - eventNames(): (string | symbol)[] { - throw new Error("Method not implemented."); - } - } diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts new file mode 100644 index 00000000..8acb9b2f --- /dev/null +++ b/packages/websocket/src/socket-io/socket.ts @@ -0,0 +1,110 @@ +import { EventEmitter } from 'events' + +import { SocketIOClient } from "socket-io"; +import { SocketIO } from "./interfaces"; +import { Packet } from './packet'; +import { PacketTypes, SubPacketTypes } from './types'; + +export class Socket extends EventEmitter implements SocketIO.Socket { + nsp: SocketIO.Namespace; + server: SocketIO.Server; + adapter: SocketIO.Adapter; + id: string; + request: any; + client: SocketIOClient; + conn: SocketIO.EngineSocket; + rooms: { [id: string]: string; }; + connected: boolean; + disconnected: boolean; + handshake: SocketIO.Handshake; + json: SocketIO.Socket; + volatile: SocketIO.Socket; + broadcast: SocketIO.Socket; + fns: any[]; + _rooms: string[]; + + constructor(nsp, client, query) { + super(); + this.nsp = nsp; + this.server = nsp.server; + this.adapter = this.nsp.adapter; + this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id; + this.client = client; + this.conn = client.conn; + this.rooms = {}; + // this.acks = {}; + this.connected = true; + this.disconnected = false; + // this.handshake = this.buildHandshake(query); + this.fns = []; + // this.flags = {}; + this._rooms = []; + } + + to(room: string): SocketIO.Socket { + if (!~this._rooms.indexOf(room)) this._rooms.push(room); + return this; + } + in(room: string): SocketIO.Socket { + return this.to(room); + } + use(fn: (packet: SocketIO.Packet, next: (err?: any) => void) => void): SocketIO.Socket { + throw new Error("Method not implemented."); + } + send(...args: any[]): SocketIO.Socket { + this.emit("message", ...args) + return this; + } + write(...args: any[]): SocketIO.Socket { + return this.send(...args); + } + join(rooms: string | string[], fn?: (err?: any) => void): SocketIO.Socket { + if (!Array.isArray(rooms)) { + rooms = [rooms]; + } + rooms = rooms.filter((room) => { + return !this.rooms.hasOwnProperty(room); + }); + if (!rooms.length) { + fn && fn(null); + return this; + } + return this; + } + leave(name: string, fn?: Function): SocketIO.Socket { + delete this.rooms[name]; + fn && fn(null) + return this; + } + leaveAll(): void { + this.adapter.delAll(this.id); + this.rooms = {}; + } + disconnect(close?: boolean): SocketIO.Socket { + throw new Error("Method not implemented."); + } + compress(compress: boolean): SocketIO.Socket { + throw new Error("Method not implemented."); + } + error(err: any): void { + throw new Error("Method not implemented."); + } + + packet(packet: Packet) { + this.client.packet(packet); + } + + onconnect() { + this.nsp.connected[this.id] = this; + this.join(this.id); + // var skip = this.nsp.name === '/' && this.nsp.fns.length === 0; + // if (skip) { + // debug('packet already sent in initial handshake'); + // } else { + this.packet({ + type: PacketTypes.MESSAGE, + sub_type: SubPacketTypes.CONNECT + }); + // } + } +} \ No newline at end of file