From 8731f5d9ef90c0b2be596689fcb1d220463b4861 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Thu, 14 May 2020 19:03:41 +0800 Subject: [PATCH] feat: support handshark & boardcast Signed-off-by: MiaoWoo --- packages/websocket/src/socket-io/adapter.ts | 6 +-- packages/websocket/src/socket-io/client.ts | 4 +- .../websocket/src/socket-io/interfaces.ts | 10 +++++ packages/websocket/src/socket-io/namespace.ts | 6 ++- packages/websocket/src/socket-io/parser.ts | 5 +++ packages/websocket/src/socket-io/socket.ts | 39 +++++++++++++------ 6 files changed, 51 insertions(+), 19 deletions(-) diff --git a/packages/websocket/src/socket-io/adapter.ts b/packages/websocket/src/socket-io/adapter.ts index 4c670caa..3e4940a7 100644 --- a/packages/websocket/src/socket-io/adapter.ts +++ b/packages/websocket/src/socket-io/adapter.ts @@ -72,7 +72,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { var socket: Socket; packet.nsp = this.nsp.name; - // let encodedPackets = this.parser.encode(packet) + let encodedPackets = this.parser.encode(packet) if (rooms.length) { for (var i = 0; i < rooms.length; i++) { var room = self.rooms[rooms[i]]; @@ -83,7 +83,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { if (ids[id] || ~except.indexOf(id)) continue; socket = self.nsp.connected[id]; if (socket) { - socket.packet(packet, packetOpts); + socket.packet(encodedPackets as any, packetOpts); ids[id] = true; } } @@ -94,7 +94,7 @@ export class Adapter extends EventEmitter implements SocketIO.Adapter { if (self.sids.hasOwnProperty(id)) { if (~except.indexOf(id)) continue; socket = self.nsp.connected[id]; - if (socket) socket.packet(packet, packetOpts); + if (socket) socket.packet(encodedPackets as any, packetOpts); } } } diff --git a/packages/websocket/src/socket-io/client.ts b/packages/websocket/src/socket-io/client.ts index 8f67d6bc..fdefb266 100644 --- a/packages/websocket/src/socket-io/client.ts +++ b/packages/websocket/src/socket-io/client.ts @@ -62,8 +62,8 @@ export class Client extends EventEmitter implements SocketIO.Client { } }); } - packet(packet: Packet, opts?: any) { - this.conn.send(parser.encode(packet)) + packet(packet: Packet, opts: any = { preEncoded: false }) { + this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet)) } onclose(reason?: string) { // debug('client close with reason %s', reason); diff --git a/packages/websocket/src/socket-io/interfaces.ts b/packages/websocket/src/socket-io/interfaces.ts index c509cfc8..04661f0f 100644 --- a/packages/websocket/src/socket-io/interfaces.ts +++ b/packages/websocket/src/socket-io/interfaces.ts @@ -727,6 +727,16 @@ export declare namespace SocketIO { */ del(id: string, room: string, callback?: (err?: any) => void): void; + /** + * Adds a socket to a list of room. + * + * @param {String} socket id + * @param {String} rooms + * @param {Function} callback + * @api public + */ + addAll(id: string, rooms: string | any[], fn: { (err?: any): void; bind?: any; }); + /** * Removes a socket from all the rooms that it's joined * @param id The ID of the socket that we're removing diff --git a/packages/websocket/src/socket-io/namespace.ts b/packages/websocket/src/socket-io/namespace.ts index 7c8b612a..7f63bd2f 100644 --- a/packages/websocket/src/socket-io/namespace.ts +++ b/packages/websocket/src/socket-io/namespace.ts @@ -7,7 +7,7 @@ import { Socket } from './socket'; import { Adapter } from './adapter'; import { Server } from './index' import { Packet } from './packet'; -import { SubPacketTypes } from './types'; +import { PacketTypes, SubPacketTypes } from './types'; export class Namespace extends EventEmitter implements SocketIO.Namespace { name: string; @@ -78,7 +78,9 @@ export class Namespace extends EventEmitter implements SocketIO.Namespace { } // set up packet object var packet = { - type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, + type: PacketTypes.MESSAGE, + sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, + name: event, data: args } diff --git a/packages/websocket/src/socket-io/parser.ts b/packages/websocket/src/socket-io/parser.ts index 44df4cc7..bd4c1f39 100644 --- a/packages/websocket/src/socket-io/parser.ts +++ b/packages/websocket/src/socket-io/parser.ts @@ -103,6 +103,11 @@ export class Parser { p.nsp = '/'; } + // handle namespace query + if (p.nsp.indexOf('?') !== -1) { + p.nsp = p.nsp.split('?')[0]; + } + // look up id let next = str.charAt(i + 1); if ('' !== next && !isNaN(Number(next))) { diff --git a/packages/websocket/src/socket-io/socket.ts b/packages/websocket/src/socket-io/socket.ts index e2ee76ba..8fd9ea1b 100644 --- a/packages/websocket/src/socket-io/socket.ts +++ b/packages/websocket/src/socket-io/socket.ts @@ -5,6 +5,7 @@ import { Packet } from './packet'; import { PacketTypes, SubPacketTypes } from './types'; import { Client } from './client'; import { Namespace } from './namespace'; +import * as querystring from 'querystring' export class Socket extends EventEmitter implements SocketIO.Socket { nsp: Namespace; @@ -45,7 +46,7 @@ export class Socket extends EventEmitter implements SocketIO.Socket { this.acks = {}; this.connected = true; this.disconnected = false; - // this.handshake = this.buildHandshake(query); + this.handshake = this.buildHandshake(query); this.fns = []; this.flags = {}; this._rooms = []; @@ -97,6 +98,14 @@ export class Socket extends EventEmitter implements SocketIO.Socket { fn && fn(null); return this; } + this.adapter.addAll(this.id, rooms, (err) => { + if (err) return fn && fn(err); + // debug('joined room %s', rooms); + (rooms as Array).forEach((room) => { + this.rooms[room] = room; + }); + fn && fn(null); + }); return this; } leave(name: string, fn?: Function): SocketIO.Socket { @@ -127,16 +136,21 @@ export class Socket extends EventEmitter implements SocketIO.Socket { // ========================================== buildHandshake(query): SocketIO.Handshake { - let requestQuery = this.request.uri(); + let requestUri = this.request.uri(); + let headers = {}; + let nativeHeaders = this.request.headers(); + nativeHeaders.forEach(function (header) { + headers[header.getKey()] = header.getValue(); + }) return { - headers: this.request.headers(), + headers: headers, time: (new Date) + '', - address: this.conn.remoteAddress, - xdomain: !!this.request.headers.origin, - secure: !!this.request.connection.encrypted, + address: this.conn.remoteAddress + '', + xdomain: !!headers['origin'], + secure: false, issued: +(new Date), - url: this.request.url, - query: Object.assign(query, requestQuery) + url: requestUri, + query: Object.assign(query, querystring.parse(requestUri.indexOf('?') != -1 ? requestUri.split('?')[1] : '')) } } emit(event: string, ...args: any[]): boolean { @@ -183,10 +197,11 @@ export class Socket extends EventEmitter implements SocketIO.Socket { // @ts-ignore return this; } - packet(packet: Packet, opts?: any) { - packet.nsp = this.nsp.name; - opts = opts || {}; - opts.compress = false !== opts.compress; + packet(packet: Packet, opts: any = { preEncoded: false }) { + if (!opts.preEncoded) { + packet.nsp = this.nsp.name; + opts.compress = false !== opts.compress; + } this.client.packet(packet, opts); } onconnect() {