From 53502e12cf959ded80d5dd073c20c7d8fa0c387e Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Sat, 14 Aug 2021 12:43:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E5=96=84client=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E5=8A=9F=E8=83=BD=20=E9=87=8D=E6=9E=84server=E9=83=A8?= =?UTF-8?q?=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: MiaoWoo --- packages/websocket/package.json | 4 + packages/websocket/src/client/index.ts | 5 +- packages/websocket/src/client/netty/index.ts | 2 +- packages/websocket/src/client/transport.ts | 2 + packages/websocket/src/debug.ts | 1 + .../websocket/src/engine.io-client/index.ts | 16 + .../websocket/src/engine.io-client/socket.ts | 688 +++++++++++++++ .../src/engine.io-client/transport.ts | 119 +++ .../src/engine.io-client/transports/index.ts | 4 + .../engine.io-client/transports/websocket.ts | 259 ++++++ .../websocket/src/engine.io-client/util.ts | 23 + packages/websocket/src/index.ts | 1 + packages/websocket/src/server/index.ts | 7 +- .../{ => server}/netty/adapter/httprequest.ts | 0 .../src/{ => server}/netty/adapter/index.ts | 0 .../netty/adapter/text_websocket_frame.ts | 0 .../{ => server}/netty/adapter/websocket.ts | 0 .../src/{ => server}/netty/client.ts | 2 +- .../src/{ => server}/netty/constants.ts | 0 .../src/{ => server}/netty/httprequest.ts | 4 +- .../websocket/src/{ => server}/netty/index.ts | 6 +- .../netty/text_websocket_frame.ts | 4 +- .../{ => server}/netty/websocket_detect.ts | 2 +- .../{ => server}/netty/websocket_handler.ts | 4 +- .../src/{ => server}/tomcat/client.ts | 2 +- .../src/{ => server}/tomcat/constants.ts | 0 .../src/{ => server}/tomcat/index.ts | 4 +- .../websocket/src/socket.io-client/index.ts | 105 +++ .../websocket/src/socket.io-client/manager.ts | 816 ++++++++++++++++++ packages/websocket/src/socket.io-client/on.ts | 14 + .../websocket/src/socket.io-client/socket.ts | 558 ++++++++++++ .../src/socket.io-client/typed-events.ts | 157 ++++ .../websocket/src/socket.io-client/url.ts | 97 +++ packages/websocket/tsconfig.json | 7 +- 34 files changed, 2893 insertions(+), 20 deletions(-) create mode 100644 packages/websocket/src/debug.ts create mode 100644 packages/websocket/src/engine.io-client/index.ts create mode 100644 packages/websocket/src/engine.io-client/socket.ts create mode 100644 packages/websocket/src/engine.io-client/transport.ts create mode 100755 packages/websocket/src/engine.io-client/transports/index.ts create mode 100644 packages/websocket/src/engine.io-client/transports/websocket.ts create mode 100644 packages/websocket/src/engine.io-client/util.ts rename packages/websocket/src/{ => server}/netty/adapter/httprequest.ts (100%) rename packages/websocket/src/{ => server}/netty/adapter/index.ts (100%) rename packages/websocket/src/{ => server}/netty/adapter/text_websocket_frame.ts (100%) rename packages/websocket/src/{ => server}/netty/adapter/websocket.ts (100%) rename packages/websocket/src/{ => server}/netty/client.ts (92%) rename packages/websocket/src/{ => server}/netty/constants.ts (100%) rename packages/websocket/src/{ => server}/netty/httprequest.ts (98%) rename packages/websocket/src/{ => server}/netty/index.ts (94%) rename packages/websocket/src/{ => server}/netty/text_websocket_frame.ts (89%) rename packages/websocket/src/{ => server}/netty/websocket_detect.ts (92%) rename packages/websocket/src/{ => server}/netty/websocket_handler.ts (96%) rename packages/websocket/src/{ => server}/tomcat/client.ts (92%) rename packages/websocket/src/{ => server}/tomcat/constants.ts (100%) rename packages/websocket/src/{ => server}/tomcat/index.ts (95%) create mode 100644 packages/websocket/src/socket.io-client/index.ts create mode 100644 packages/websocket/src/socket.io-client/manager.ts create mode 100644 packages/websocket/src/socket.io-client/on.ts create mode 100644 packages/websocket/src/socket.io-client/socket.ts create mode 100644 packages/websocket/src/socket.io-client/typed-events.ts create mode 100644 packages/websocket/src/socket.io-client/url.ts diff --git a/packages/websocket/package.json b/packages/websocket/package.json index ea4420c8..565dc96e 100644 --- a/packages/websocket/package.json +++ b/packages/websocket/package.json @@ -18,6 +18,10 @@ "build": "yarn clean && tsc", "test": "echo \"Error: run tests from root\" && exit 1" }, + "dependencies": { + "backo2": "^1.0.2", + "parseuri": "^0.0.6" + }, "devDependencies": { "@ccms/nashorn": "^0.16.0", "@javatypes/tomcat-websocket-api": "^0.0.3", diff --git a/packages/websocket/src/client/index.ts b/packages/websocket/src/client/index.ts index 7dd1339b..5d6d4c4e 100644 --- a/packages/websocket/src/client/index.ts +++ b/packages/websocket/src/client/index.ts @@ -46,22 +46,25 @@ export class WebSocket extends EventEmitter { try { let TransportImpl = require('./netty').NettyWebSocket this.client = new TransportImpl(url, subProtocol, headers) + console.debug('create websocket from ' + this.client.constructor.name) } catch (error) { console.error('create websocket impl error: ' + error) console.ex(error) return } this.client.on('open', (event) => { + console.debug('client WebSocket call open', this.onopen) this.onopen?.(event) manager.add(this) }) this.client.on('message', (event) => this.onmessage?.(event)) this.client.on('close', (event) => { + console.log('client WebSocket call close', this.onclose) this.onclose?.(event) manager.del(this) }) this.client.on('error', (event) => this.onerror?.(event)) - this.client.connect() + setTimeout(() => this.client.connect(), 20) } get id() { return this.client.id diff --git a/packages/websocket/src/client/netty/index.ts b/packages/websocket/src/client/netty/index.ts index 9aea6470..4857d44a 100644 --- a/packages/websocket/src/client/netty/index.ts +++ b/packages/websocket/src/client/netty/index.ts @@ -63,7 +63,7 @@ export class NettyWebSocket extends Transport { this._port = 80 } } - console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port}`) + console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port} header: ${JSON.stringify(headers)}`) } getId() { return this.channel?.id() + '' diff --git a/packages/websocket/src/client/transport.ts b/packages/websocket/src/client/transport.ts index 30d086ef..90b5bac2 100644 --- a/packages/websocket/src/client/transport.ts +++ b/packages/websocket/src/client/transport.ts @@ -32,9 +32,11 @@ export abstract class Transport extends EventEmitter { } connect() { + console.debug(`client Transport connect`) try { this.doConnect() } catch (error) { + console.ex(error) this.onerror({ error }) } } diff --git a/packages/websocket/src/debug.ts b/packages/websocket/src/debug.ts new file mode 100644 index 00000000..c0e345d0 --- /dev/null +++ b/packages/websocket/src/debug.ts @@ -0,0 +1 @@ +export = (namepsace) => (...args) => { }//console.debug(namepsace, ...args) diff --git a/packages/websocket/src/engine.io-client/index.ts b/packages/websocket/src/engine.io-client/index.ts new file mode 100644 index 00000000..30fd00be --- /dev/null +++ b/packages/websocket/src/engine.io-client/index.ts @@ -0,0 +1,16 @@ +import { Socket } from './socket' + +export default (uri, opts) => new Socket(uri, opts) + +/** + * Expose deps for legacy compatibility + * and standalone browser access. + */ +const protocol = Socket.protocol // this is an int +export { Socket, protocol } +// module.exports.Transport = require("./transport") +// module.exports.transports = require("./transports/index") +// module.exports.parser = require("../engine.io-parser") +export * from './transport' +export * from './transports/index' +export * from '../engine.io-parser' diff --git a/packages/websocket/src/engine.io-client/socket.ts b/packages/websocket/src/engine.io-client/socket.ts new file mode 100644 index 00000000..1a11b538 --- /dev/null +++ b/packages/websocket/src/engine.io-client/socket.ts @@ -0,0 +1,688 @@ +import transports from "./transports" +// const transports = require("./transports/index") +const Emitter = require("component-emitter") +const debug = (...args: any) => console.debug('engine.io-client:socket', ...args)//require("debug")("engine.io-client:socket") +import parser from "../engine.io-parser" +const parseuri = require("parseuri") +const parseqs = require("parseqs") +import { installTimerFunctions } from "./util" + +export class Socket extends Emitter { + /** + * Socket constructor. + * + * @param {String|Object} uri or options + * @param {Object} options + * @api public + */ + constructor(uri, opts: any = {}) { + super() + + if (uri && "object" === typeof uri) { + opts = uri + uri = null + } + + if (uri) { + uri = parseuri(uri) + opts.hostname = uri.host + opts.secure = uri.protocol === "https" || uri.protocol === "wss" + opts.port = uri.port + if (uri.query) opts.query = uri.query + } else if (opts.host) { + opts.hostname = parseuri(opts.host).host + } + + installTimerFunctions(this, opts) + + this.secure = + null != opts.secure + ? opts.secure + : typeof location !== "undefined" && "https:" === location.protocol + + if (opts.hostname && !opts.port) { + // if no port is specified manually, use the protocol default + opts.port = this.secure ? "443" : "80" + } + + this.hostname = + opts.hostname || + (typeof location !== "undefined" ? location.hostname : "localhost") + this.port = + opts.port || + (typeof location !== "undefined" && location.port + ? location.port + : this.secure + ? 443 + : 80) + + this.transports = ["websocket"] + this.readyState = "" + this.writeBuffer = [] + this.prevBufferLen = 0 + + this.opts = Object.assign( + { + path: "/engine.io", + agent: false, + withCredentials: false, + upgrade: true, + jsonp: true, + timestampParam: "t", + rememberUpgrade: false, + rejectUnauthorized: true, + perMessageDeflate: { + threshold: 1024 + }, + transportOptions: {}, + closeOnBeforeunload: true + }, + opts + ) + + this.opts.path = this.opts.path.replace(/\/$/, "") + "/" + + if (typeof this.opts.query === "string") { + this.opts.query = parseqs.decode(this.opts.query) + } + + // set on handshake + this.id = null + this.upgrades = null + this.pingInterval = null + this.pingTimeout = null + + // set on heartbeat + this.pingTimeoutTimer = null + + if (typeof addEventListener === "function") { + if (this.opts.closeOnBeforeunload) { + // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener + // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is + // closed/reloaded) + addEventListener( + "beforeunload", + () => { + if (this.transport) { + // silently close the transport + this.transport.removeAllListeners() + this.transport.close() + } + }, + false + ) + } + if (this.hostname !== "localhost") { + this.offlineEventListener = () => { + this.onClose("transport close") + } + addEventListener("offline", this.offlineEventListener, false) + } + } + + this.open() + } + + /** + * Creates transport of the given type. + * + * @param {String} transport name + * @return {Transport} + * @api private + */ + createTransport(name, opt?) { + if (name != 'websocket') { + throw new Error('Only Support WebSocket in MiaoScript!') + } + debug('creating transport "%s"', name) + const query: any = clone(this.opts.query) + + // append engine.io protocol identifier + query.EIO = parser.protocol + + // transport name + query.transport = name + + // session id if we already have one + if (this.id) query.sid = this.id + + const opts = Object.assign( + {}, + this.opts.transportOptions[name], + this.opts, + { + query, + socket: this, + hostname: this.hostname, + secure: this.secure, + port: this.port + } + ) + + debug("options: %j", JSON.stringify(opts)) + debug("new func", transports[name]) + return new transports[name](opts) + } + + /** + * Initializes transport to use and starts probe. + * + * @api private + */ + open() { + let transport + if ( + this.opts.rememberUpgrade && + Socket.priorWebsocketSuccess && + this.transports.indexOf("websocket") !== -1 + ) { + transport = "websocket" + } else if (0 === this.transports.length) { + // Emit error on next tick so it can be listened to + this.setTimeoutFn(() => { + this.emit("error", "No transports available") + }, 0) + return + } else { + transport = this.transports[0] + } + this.readyState = "opening" + + // Retry with the next transport if the transport is disabled (jsonp: false) + try { + transport = this.createTransport(transport) + } catch (e) { + debug("error while creating transport: %s", e) + this.transports.shift() + this.open() + return + } + + transport.open() + this.setTransport(transport) + } + + /** + * Sets the current transport. Disables the existing one (if any). + * + * @api private + */ + setTransport(transport) { + debug("setting transport %s", transport.name) + + if (this.transport) { + debug("clearing existing transport %s", this.transport.name) + this.transport.removeAllListeners() + } + + // set up transport + this.transport = transport + + // set up transport listeners + transport + .on("drain", this.onDrain.bind(this)) + .on("packet", this.onPacket.bind(this)) + .on("error", this.onError.bind(this)) + .on("close", () => { + this.onClose("transport close") + }) + } + + /** + * Probes a transport. + * + * @param {String} transport name + * @api private + */ + probe(name) { + debug('probing transport "%s"', name) + let transport = this.createTransport(name, { probe: 1 }) + let failed = false + + Socket.priorWebsocketSuccess = false + + const onTransportOpen = () => { + if (failed) return + + debug('probe transport "%s" opened', name) + transport.send([{ type: "ping", data: "probe" }]) + transport.once("packet", msg => { + if (failed) return + if ("pong" === msg.type && "probe" === msg.data) { + debug('probe transport "%s" pong', name) + this.upgrading = true + this.emit("upgrading", transport) + if (!transport) return + Socket.priorWebsocketSuccess = "websocket" === transport.name + + debug('pausing current transport "%s"', this.transport.name) + this.transport.pause(() => { + if (failed) return + if ("closed" === this.readyState) return + debug("changing transport and sending upgrade packet") + + cleanup() + + this.setTransport(transport) + transport.send([{ type: "upgrade" }]) + this.emit("upgrade", transport) + transport = null + this.upgrading = false + this.flush() + }) + } else { + debug('probe transport "%s" failed', name) + const err: any = new Error("probe error") + err.transport = transport.name + this.emit("upgradeError", err) + } + }) + } + + function freezeTransport() { + if (failed) return + + // Any callback called by transport should be ignored since now + failed = true + + cleanup() + + transport.close() + transport = null + } + + // Handle any error that happens while probing + const onerror = err => { + const error: any = new Error("probe error: " + err) + error.transport = transport.name + + freezeTransport() + + debug('probe transport "%s" failed because of error: %s', name, err) + + this.emit("upgradeError", error) + } + + function onTransportClose() { + onerror("transport closed") + } + + // When the socket is closed while we're probing + function onclose() { + onerror("socket closed") + } + + // When the socket is upgraded while we're probing + function onupgrade(to) { + if (transport && to.name !== transport.name) { + debug('"%s" works - aborting "%s"', to.name, transport.name) + freezeTransport() + } + } + + // Remove all listeners on the transport and on self + const cleanup = () => { + transport.removeListener("open", onTransportOpen) + transport.removeListener("error", onerror) + transport.removeListener("close", onTransportClose) + this.removeListener("close", onclose) + this.removeListener("upgrading", onupgrade) + } + + transport.once("open", onTransportOpen) + transport.once("error", onerror) + transport.once("close", onTransportClose) + + this.once("close", onclose) + this.once("upgrading", onupgrade) + + transport.open() + } + + /** + * Called when connection is deemed open. + * + * @api public + */ + onOpen() { + debug("socket open") + this.readyState = "open" + Socket.priorWebsocketSuccess = "websocket" === this.transport.name + this.emit("open") + this.flush() + + // we check for `readyState` in case an `open` + // listener already closed the socket + if ( + "open" === this.readyState && + this.opts.upgrade && + this.transport.pause + ) { + debug("starting upgrade probes") + let i = 0 + const l = this.upgrades.length + for (; i < l; i++) { + this.probe(this.upgrades[i]) + } + } + } + + /** + * Handles a packet. + * + * @api private + */ + onPacket(packet) { + if ( + "opening" === this.readyState || + "open" === this.readyState || + "closing" === this.readyState + ) { + debug('socket receive: type "%s", data "%s"', packet.type, packet.data) + + this.emit("packet", packet) + + // Socket is live - any packet counts + this.emit("heartbeat") + + switch (packet.type) { + case "open": + this.onHandshake(JSON.parse(packet.data)) + break + + case "ping": + this.resetPingTimeout() + this.sendPacket("pong") + this.emit("ping") + this.emit("pong") + break + + case "error": + const err: any = new Error("server error") + err.code = packet.data + this.onError(err) + break + + case "message": + this.emit("data", packet.data) + this.emit("message", packet.data) + break + } + } else { + debug('packet received with socket readyState "%s"', this.readyState) + } + } + + /** + * Called upon handshake completion. + * + * @param {Object} handshake obj + * @api private + */ + onHandshake(data) { + this.emit("handshake", data) + this.id = data.sid + this.transport.query.sid = data.sid + this.upgrades = this.filterUpgrades(data.upgrades) + this.pingInterval = data.pingInterval + this.pingTimeout = data.pingTimeout + this.onOpen() + // In case open handler closes socket + if ("closed" === this.readyState) return + this.resetPingTimeout() + } + + /** + * Sets and resets ping timeout timer based on server pings. + * + * @api private + */ + resetPingTimeout() { + this.clearTimeoutFn(this.pingTimeoutTimer) + this.pingTimeoutTimer = this.setTimeoutFn(() => { + this.onClose("ping timeout") + }, this.pingInterval + this.pingTimeout) + if (this.opts.autoUnref) { + this.pingTimeoutTimer.unref() + } + } + + /** + * Called on `drain` event + * + * @api private + */ + onDrain() { + this.writeBuffer.splice(0, this.prevBufferLen) + + // setting prevBufferLen = 0 is very important + // for example, when upgrading, upgrade packet is sent over, + // and a nonzero prevBufferLen could cause problems on `drain` + this.prevBufferLen = 0 + + if (0 === this.writeBuffer.length) { + this.emit("drain") + } else { + this.flush() + } + } + + /** + * Flush write buffers. + * + * @api private + */ + flush() { + if ( + "closed" !== this.readyState && + this.transport.writable && + !this.upgrading && + this.writeBuffer.length + ) { + debug("flushing %d packets in socket", this.writeBuffer.length) + this.transport.send(this.writeBuffer) + // keep track of current length of writeBuffer + // splice writeBuffer and callbackBuffer on `drain` + this.prevBufferLen = this.writeBuffer.length + this.emit("flush") + } + } + + /** + * Sends a message. + * + * @param {String} message. + * @param {Function} callback function. + * @param {Object} options. + * @return {Socket} for chaining. + * @api public + */ + write(msg, options, fn) { + this.sendPacket("message", msg, options, fn) + return this + } + + send(msg, options, fn) { + this.sendPacket("message", msg, options, fn) + return this + } + + /** + * Sends a packet. + * + * @param {String} packet type. + * @param {String} data. + * @param {Object} options. + * @param {Function} callback function. + * @api private + */ + sendPacket(type, data?, options?, fn?) { + if ("function" === typeof data) { + fn = data + data = undefined + } + + if ("function" === typeof options) { + fn = options + options = null + } + + if ("closing" === this.readyState || "closed" === this.readyState) { + return + } + + options = options || {} + options.compress = false !== options.compress + + const packet = { + type: type, + data: data, + options: options + } + this.emit("packetCreate", packet) + this.writeBuffer.push(packet) + if (fn) this.once("flush", fn) + this.flush() + } + + /** + * Closes the connection. + * + * @api private + */ + close() { + const close = () => { + this.onClose("forced close") + debug("socket closing - telling transport to close") + this.transport.close() + } + + const cleanupAndClose = () => { + this.removeListener("upgrade", cleanupAndClose) + this.removeListener("upgradeError", cleanupAndClose) + close() + } + + const waitForUpgrade = () => { + // wait for upgrade to finish since we can't send packets while pausing a transport + this.once("upgrade", cleanupAndClose) + this.once("upgradeError", cleanupAndClose) + } + + if ("opening" === this.readyState || "open" === this.readyState) { + this.readyState = "closing" + + if (this.writeBuffer.length) { + this.once("drain", () => { + if (this.upgrading) { + waitForUpgrade() + } else { + close() + } + }) + } else if (this.upgrading) { + waitForUpgrade() + } else { + close() + } + } + + return this + } + + /** + * Called upon transport error + * + * @api private + */ + onError(err) { + debug("socket error %j", err) + Socket.priorWebsocketSuccess = false + this.emit("error", err) + this.onClose("transport error", err) + } + + /** + * Called upon transport close. + * + * @api private + */ + onClose(reason, desc?) { + if ( + "opening" === this.readyState || + "open" === this.readyState || + "closing" === this.readyState + ) { + debug('socket close with reason: "%s"', reason) + + // clear timers + this.clearTimeoutFn(this.pingIntervalTimer) + this.clearTimeoutFn(this.pingTimeoutTimer) + + // stop event from firing again for transport + this.transport.removeAllListeners("close") + + // ensure transport won't stay open + this.transport.close() + + // ignore further transport communication + this.transport.removeAllListeners() + + if (typeof removeEventListener === "function") { + removeEventListener("offline", this.offlineEventListener, false) + } + + // set ready state + this.readyState = "closed" + + // clear session id + this.id = null + + // emit close event + this.emit("close", reason, desc) + + // clean buffers after, so users can still + // grab the buffers on `close` event + this.writeBuffer = [] + this.prevBufferLen = 0 + } + } + + /** + * Filters upgrades, returning only those matching client transports. + * + * @param {Array} server upgrades + * @api private + * + */ + filterUpgrades(upgrades) { + const filteredUpgrades = [] + let i = 0 + const j = upgrades.length + for (; i < j; i++) { + if (~this.transports.indexOf(upgrades[i])) + filteredUpgrades.push(upgrades[i]) + } + return filteredUpgrades + } +} + +Socket.priorWebsocketSuccess = false + +/** + * Protocol version. + * + * @api public + */ + +Socket.protocol = parser.protocol // this is an int + +function clone(obj) { + const o = {} + for (let i in obj) { + if (obj.hasOwnProperty(i)) { + o[i] = obj[i] + } + } + return o +} diff --git a/packages/websocket/src/engine.io-client/transport.ts b/packages/websocket/src/engine.io-client/transport.ts new file mode 100644 index 00000000..03561387 --- /dev/null +++ b/packages/websocket/src/engine.io-client/transport.ts @@ -0,0 +1,119 @@ +import parser from "../engine.io-parser" +const Emitter = require("component-emitter") +import { installTimerFunctions } from "./util" +const debug = (...args: any) => console.debug('engine.io-client:transport', ...args)//require("debug")("engine.io-client:transport") + +export class Transport extends Emitter { + /** + * Transport abstract constructor. + * + * @param {Object} options. + * @api private + */ + constructor(opts) { + super() + installTimerFunctions(this, opts) + + this.opts = opts + this.query = opts.query + this.readyState = "" + this.socket = opts.socket + } + + /** + * Emits an error. + * + * @param {String} str + * @return {Transport} for chaining + * @api public + */ + onError(msg, desc) { + const err: any = new Error(msg) + err.type = "TransportError" + err.description = desc + this.emit("error", err) + return this + } + + /** + * Opens the transport. + * + * @api public + */ + open() { + if ("closed" === this.readyState || "" === this.readyState) { + this.readyState = "opening" + this.doOpen() + } + + return this + } + + /** + * Closes the transport. + * + * @api private + */ + close() { + if ("opening" === this.readyState || "open" === this.readyState) { + this.doClose() + this.onClose() + } + + return this + } + + /** + * Sends multiple packets. + * + * @param {Array} packets + * @api private + */ + send(packets) { + if ("open" === this.readyState) { + this.write(packets) + } else { + // this might happen if the transport was silently closed in the beforeunload event handler + debug("transport is not open, discarding packets") + } + } + + /** + * Called upon open + * + * @api private + */ + onOpen() { + this.readyState = "open" + this.writable = true + this.emit("open") + } + + /** + * Called with data. + * + * @param {String} data + * @api private + */ + onData(data) { + const packet = parser.decodePacket(data, this.socket.binaryType) + this.onPacket(packet) + } + + /** + * Called with a decoded packet. + */ + onPacket(packet) { + this.emit("packet", packet) + } + + /** + * Called upon close. + * + * @api private + */ + onClose() { + this.readyState = "closed" + this.emit("close") + } +} diff --git a/packages/websocket/src/engine.io-client/transports/index.ts b/packages/websocket/src/engine.io-client/transports/index.ts new file mode 100755 index 00000000..c279d9d8 --- /dev/null +++ b/packages/websocket/src/engine.io-client/transports/index.ts @@ -0,0 +1,4 @@ +import { WS } from "./websocket" +export default { + 'websocket': WS +} diff --git a/packages/websocket/src/engine.io-client/transports/websocket.ts b/packages/websocket/src/engine.io-client/transports/websocket.ts new file mode 100644 index 00000000..3d6ec804 --- /dev/null +++ b/packages/websocket/src/engine.io-client/transports/websocket.ts @@ -0,0 +1,259 @@ +import { Transport } from '../transport' +// const Transport = require("../transport") +import parser from '../../engine.io-parser' +// const parser = require("../engine.io-parser") +const parseqs = require("parseqs") +const yeast = require("yeast") +import { pick } from '../util' +// const { pick } = require("../util") +import { WebSocket } from '../../client' +const usingBrowserWebSocket = true +// const { +// WebSocket, +// usingBrowserWebSocket, +// defaultBinaryType, +// nextTick +// } = require("./websocket-constructor") + +const debug = (...args: any) => console.debug('engine.io-client:websocket', ...args)//require("debug")("engine.io-client:websocket") + +// detect ReactNative environment +const isReactNative = + typeof navigator !== "undefined" && + typeof navigator.product === "string" && + navigator.product.toLowerCase() === "reactnative" + +export class WS extends Transport { + /** + * WebSocket transport constructor. + * + * @api {Object} connection options + * @api public + */ + constructor(opts) { + super(opts) + + this.supportsBinary = !opts.forceBase64 + } + + /** + * Transport name. + * + * @api public + */ + get name() { + return "websocket" + } + + /** + * Opens socket. + * + * @api private + */ + doOpen() { + if (!this.check()) { + // let probe timeout + return + } + + const uri = this.uri() + const protocols = this.opts.protocols + + // React Native only supports the 'headers' option, and will print a warning if anything else is passed + const opts = isReactNative + ? {} + : pick( + this.opts, + "agent", + "perMessageDeflate", + "pfx", + "key", + "passphrase", + "cert", + "ca", + "ciphers", + "rejectUnauthorized", + "localAddress", + "protocolVersion", + "origin", + "maxPayload", + "family", + "checkServerIdentity" + ) + + if (this.opts.extraHeaders) { + opts.headers = this.opts.extraHeaders + } + + try { + this.ws = new WebSocket(uri, protocols) + // usingBrowserWebSocket && !isReactNative + // ? protocols + // ? new WebSocket(uri, protocols) + // : new WebSocket(uri) + // : new WebSocket(uri, protocols, opts) + } catch (err) { + return this.emit("error", err) + } + + this.ws.binaryType = this.socket.binaryType || 'arraybuffer' + + this.addEventListeners() + } + + /** + * Adds event listeners to the socket + * + * @api private + */ + addEventListeners() { + this.ws.onopen = () => { + if (this.opts.autoUnref) { + this.ws._socket.unref() + } + this.onOpen() + } + this.ws.onclose = this.onClose.bind(this) + this.ws.onmessage = ev => this.onData(ev.data) + this.ws.onerror = e => this.onError("websocket error", e) + } + + /** + * Writes data to socket. + * + * @param {Array} array of packets. + * @api private + */ + write(packets) { + this.writable = false + + // encodePacket efficient as it uses WS framing + // no need for encodePayload + for (let i = 0; i < packets.length; i++) { + const packet = packets[i] + const lastPacket = i === packets.length - 1 + + parser.encodePacket(packet, this.supportsBinary, data => { + // always create a new object (GH-437) + const opts: any = {} + if (!usingBrowserWebSocket) { + if (packet.options) { + opts.compress = packet.options.compress + } + + if (this.opts.perMessageDeflate) { + const len = + "string" === typeof data ? Buffer.byteLength(data) : data.length + if (len < this.opts.perMessageDeflate.threshold) { + opts.compress = false + } + } + } + + // Sometimes the websocket has already been closed but the browser didn't + // have a chance of informing us about it yet, in that case send will + // throw an error + try { + if (usingBrowserWebSocket) { + // TypeError is thrown when passing the second argument on Safari + this.ws.send(data) + } else { + this.ws.send(data, opts) + } + } catch (e) { + debug("websocket closed before onclose event") + } + + if (lastPacket) { + // fake drain + // defer to next tick to allow Socket to clear writeBuffer + process.nextTick(() => { + this.writable = true + this.emit("drain") + }, this.setTimeoutFn) + } + }) + } + } + + /** + * Called upon close + * + * @api private + */ + onClose() { + Transport.prototype.onClose.call(this) + } + + /** + * Closes socket. + * + * @api private + */ + doClose() { + if (typeof this.ws !== "undefined") { + this.ws.close() + this.ws = null + } + } + + /** + * Generates uri for connection. + * + * @api private + */ + uri() { + let query = this.query || {} + const schema = this.opts.secure ? "wss" : "ws" + let port = "" + + // avoid port if default for schema + if ( + this.opts.port && + (("wss" === schema && Number(this.opts.port) !== 443) || + ("ws" === schema && Number(this.opts.port) !== 80)) + ) { + port = ":" + this.opts.port + } + + // append timestamp to URI + if (this.opts.timestampRequests) { + query[this.opts.timestampParam] = yeast() + } + + // communicate binary support capabilities + if (!this.supportsBinary) { + query.b64 = 1 + } + + query = parseqs.encode(query) + + // prepend ? to query + if (query.length) { + query = "?" + query + } + + const ipv6 = this.opts.hostname.indexOf(":") !== -1 + return ( + schema + + "://" + + (ipv6 ? "[" + this.opts.hostname + "]" : this.opts.hostname) + + port + + this.opts.path + + query + ) + } + + /** + * Feature detection for WebSocket. + * + * @return {Boolean} whether this transport is available. + * @api public + */ + check() { + return ( + !!WebSocket && + !("__initialize" in WebSocket && this.name === WS.prototype.name) + ) + } +} diff --git a/packages/websocket/src/engine.io-client/util.ts b/packages/websocket/src/engine.io-client/util.ts new file mode 100644 index 00000000..878b0c91 --- /dev/null +++ b/packages/websocket/src/engine.io-client/util.ts @@ -0,0 +1,23 @@ +const pick = (obj, ...attr) => { + return attr.reduce((acc, k) => { + if (obj.hasOwnProperty(k)) { + acc[k] = obj[k] + } + return acc + }, {}) +} + +// Keep a reference to the real timeout functions so they can be used when overridden +const NATIVE_SET_TIMEOUT = setTimeout +const NATIVE_CLEAR_TIMEOUT = clearTimeout + +const installTimerFunctions = (obj, opts) => { + if (opts.useNativeTimers) { + obj.setTimeoutFn = NATIVE_SET_TIMEOUT.bind(globalThis) + obj.clearTimeoutFn = NATIVE_CLEAR_TIMEOUT.bind(globalThis) + } else { + obj.setTimeoutFn = setTimeout.bind(globalThis) + obj.clearTimeoutFn = clearTimeout.bind(globalThis) + } +} +export { pick, installTimerFunctions } diff --git a/packages/websocket/src/index.ts b/packages/websocket/src/index.ts index 68be7294..9b39efee 100644 --- a/packages/websocket/src/index.ts +++ b/packages/websocket/src/index.ts @@ -48,3 +48,4 @@ export * from './socket.io' export * from './client' export * from './server' export * from './engine.io/transport' +export * as client from './socket.io-client' diff --git a/packages/websocket/src/server/index.ts b/packages/websocket/src/server/index.ts index c888d85e..b56fb19b 100644 --- a/packages/websocket/src/server/index.ts +++ b/packages/websocket/src/server/index.ts @@ -49,9 +49,6 @@ export abstract class WebSocketServer extends EventEmitter { this.execute(handler, (websocket) => websocket.emit(ServerEvent.disconnect, cause)) } protected onerror(handler: any, error: Error) { - if (global.debug) { - console.ex(error) - } this.execute(handler, (websocket) => websocket.emit(ServerEvent.error, error)) } protected execute(handler: any, callback: (websocket: WebSocketClient) => void) { @@ -82,9 +79,9 @@ export const attach = (instance, options) => { }, options) let WebSocketServerImpl = undefined if (instance.class.name.startsWith('io.netty.channel')) { - WebSocketServerImpl = require("../netty").NettyWebSocketServer + WebSocketServerImpl = require("./netty").NettyWebSocketServer } else { - WebSocketServerImpl = require("../tomcat").TomcatWebSocketServer + WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer } return new WebSocketServerImpl(instance, options) } diff --git a/packages/websocket/src/netty/adapter/httprequest.ts b/packages/websocket/src/server/netty/adapter/httprequest.ts similarity index 100% rename from packages/websocket/src/netty/adapter/httprequest.ts rename to packages/websocket/src/server/netty/adapter/httprequest.ts diff --git a/packages/websocket/src/netty/adapter/index.ts b/packages/websocket/src/server/netty/adapter/index.ts similarity index 100% rename from packages/websocket/src/netty/adapter/index.ts rename to packages/websocket/src/server/netty/adapter/index.ts diff --git a/packages/websocket/src/netty/adapter/text_websocket_frame.ts b/packages/websocket/src/server/netty/adapter/text_websocket_frame.ts similarity index 100% rename from packages/websocket/src/netty/adapter/text_websocket_frame.ts rename to packages/websocket/src/server/netty/adapter/text_websocket_frame.ts diff --git a/packages/websocket/src/netty/adapter/websocket.ts b/packages/websocket/src/server/netty/adapter/websocket.ts similarity index 100% rename from packages/websocket/src/netty/adapter/websocket.ts rename to packages/websocket/src/server/netty/adapter/websocket.ts diff --git a/packages/websocket/src/netty/client.ts b/packages/websocket/src/server/netty/client.ts similarity index 92% rename from packages/websocket/src/netty/client.ts rename to packages/websocket/src/server/netty/client.ts index be267ccc..5b9f18a8 100644 --- a/packages/websocket/src/netty/client.ts +++ b/packages/websocket/src/server/netty/client.ts @@ -1,4 +1,4 @@ -import { WebSocketClient } from '../server/client' +import { WebSocketClient } from '../client' const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') diff --git a/packages/websocket/src/netty/constants.ts b/packages/websocket/src/server/netty/constants.ts similarity index 100% rename from packages/websocket/src/netty/constants.ts rename to packages/websocket/src/server/netty/constants.ts diff --git a/packages/websocket/src/netty/httprequest.ts b/packages/websocket/src/server/netty/httprequest.ts similarity index 98% rename from packages/websocket/src/netty/httprequest.ts rename to packages/websocket/src/server/netty/httprequest.ts index d7eb07f4..211e34c8 100644 --- a/packages/websocket/src/netty/httprequest.ts +++ b/packages/websocket/src/server/netty/httprequest.ts @@ -1,8 +1,8 @@ -import { JavaServerOptions } from '../server' - import { HttpRequestHandlerAdapter } from './adapter' import { AttributeKeys } from './constants' +import type { JavaServerOptions } from '../' + const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse') const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse') const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders') diff --git a/packages/websocket/src/netty/index.ts b/packages/websocket/src/server/netty/index.ts similarity index 94% rename from packages/websocket/src/netty/index.ts rename to packages/websocket/src/server/netty/index.ts index 86bb4336..df4a9e4d 100644 --- a/packages/websocket/src/netty/index.ts +++ b/packages/websocket/src/server/netty/index.ts @@ -1,11 +1,13 @@ -import { JavaServerOptions, ServerEvent, WebSocketServer } from '../server' -import { Request } from '../server/request' +import { ServerEvent, WebSocketServer } from '../' +import { Request } from '../request' import { NettyClient } from './client' import { AttributeKeys, Keys } from './constants' import { WebSocketDetect } from './websocket_detect' import { WebSocketHandler } from './websocket_handler' +import type { JavaServerOptions } from '../' + class NettyWebSocketServer extends WebSocketServer { constructor(pipeline: any, options: JavaServerOptions) { super(pipeline, options) diff --git a/packages/websocket/src/netty/text_websocket_frame.ts b/packages/websocket/src/server/netty/text_websocket_frame.ts similarity index 89% rename from packages/websocket/src/netty/text_websocket_frame.ts rename to packages/websocket/src/server/netty/text_websocket_frame.ts index e0d1fd7f..c5f4dd2f 100644 --- a/packages/websocket/src/netty/text_websocket_frame.ts +++ b/packages/websocket/src/server/netty/text_websocket_frame.ts @@ -1,8 +1,10 @@ import { EventEmitter } from 'events' -import { JavaServerOptions, ServerEvent } from '../server' +import { ServerEvent } from '../' import { TextWebSocketFrameHandlerAdapter } from './adapter' +import type { JavaServerOptions } from '../' + export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter { private event: EventEmitter constructor(options: JavaServerOptions) { diff --git a/packages/websocket/src/netty/websocket_detect.ts b/packages/websocket/src/server/netty/websocket_detect.ts similarity index 92% rename from packages/websocket/src/netty/websocket_detect.ts rename to packages/websocket/src/server/netty/websocket_detect.ts index 4122a3fb..b2657941 100644 --- a/packages/websocket/src/netty/websocket_detect.ts +++ b/packages/websocket/src/server/netty/websocket_detect.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'events' import { WebSocketHandlerAdapter } from "./adapter" -import { ServerEvent } from '../server' +import { ServerEvent } from '../' export class WebSocketDetect extends WebSocketHandlerAdapter { private event: EventEmitter diff --git a/packages/websocket/src/netty/websocket_handler.ts b/packages/websocket/src/server/netty/websocket_handler.ts similarity index 96% rename from packages/websocket/src/netty/websocket_handler.ts rename to packages/websocket/src/server/netty/websocket_handler.ts index 1b40898b..446f0ebf 100644 --- a/packages/websocket/src/netty/websocket_handler.ts +++ b/packages/websocket/src/server/netty/websocket_handler.ts @@ -1,10 +1,12 @@ -import { JavaServerOptions, ServerEvent } from '../server' +import { ServerEvent } from '../' import { Keys } from './constants' import { HttpRequestHandler } from './httprequest' import { WebSocketHandlerAdapter } from "./adapter" import { TextWebSocketFrameHandler } from './text_websocket_frame' +import type { JavaServerOptions } from '../' + const CharsetUtil = Java.type('io.netty.util.CharsetUtil') const HttpServerCodec = Java.type('io.netty.handler.codec.http.HttpServerCodec') const ChunkedWriteHandler = Java.type('io.netty.handler.stream.ChunkedWriteHandler') diff --git a/packages/websocket/src/tomcat/client.ts b/packages/websocket/src/server/tomcat/client.ts similarity index 92% rename from packages/websocket/src/tomcat/client.ts rename to packages/websocket/src/server/tomcat/client.ts index ba1a3263..6ad726cd 100644 --- a/packages/websocket/src/tomcat/client.ts +++ b/packages/websocket/src/server/tomcat/client.ts @@ -1,4 +1,4 @@ -import { WebSocketClient } from '../server/client' +import { WebSocketClient } from '../client' export class TomcatClient extends WebSocketClient { private session: javax.websocket.Session diff --git a/packages/websocket/src/tomcat/constants.ts b/packages/websocket/src/server/tomcat/constants.ts similarity index 100% rename from packages/websocket/src/tomcat/constants.ts rename to packages/websocket/src/server/tomcat/constants.ts diff --git a/packages/websocket/src/tomcat/index.ts b/packages/websocket/src/server/tomcat/index.ts similarity index 95% rename from packages/websocket/src/tomcat/index.ts rename to packages/websocket/src/server/tomcat/index.ts index 7cd65590..ccceb44c 100644 --- a/packages/websocket/src/tomcat/index.ts +++ b/packages/websocket/src/server/tomcat/index.ts @@ -1,5 +1,5 @@ -import { JavaServerOptions, WebSocketServer } from '../server' -import { Request } from '../server/request' +import { JavaServerOptions, WebSocketServer } from '../' +import { Request } from '../request' import { TomcatClient } from './client' import { ProxyBeanName } from './constants' diff --git a/packages/websocket/src/socket.io-client/index.ts b/packages/websocket/src/socket.io-client/index.ts new file mode 100644 index 00000000..28fc606c --- /dev/null +++ b/packages/websocket/src/socket.io-client/index.ts @@ -0,0 +1,105 @@ +import { url } from "./url" +import { Manager, ManagerOptions } from "./manager" +import { Socket, SocketOptions } from "./socket" + +const debug = require("../debug")("socket.io-client") + +/** + * Module exports. + */ + +module.exports = exports = lookup + +/** + * Managers cache. + */ +const cache: Record = (exports.managers = {}) + +/** + * Looks up an existing `Manager` for multiplexing. + * If the user summons: + * + * `io('http://localhost/a');` + * `io('http://localhost/b');` + * + * We reuse the existing instance based on same scheme/port/host, + * and we initialize sockets for each namespace. + * + * @public + */ +function lookup(opts?: Partial): Socket +function lookup( + uri: string, + opts?: Partial +): Socket +function lookup( + uri: string | Partial, + opts?: Partial +): Socket +function lookup( + uri: string | Partial, + opts?: Partial +): Socket { + if (typeof uri === "object") { + opts = uri + uri = undefined + } + + opts = opts || {} + + const parsed = url(uri as string, opts.path || "/socket.io") + const source = parsed.source + const id = parsed.id + const path = parsed.path + const sameNamespace = cache[id] && path in cache[id]["nsps"] + const newConnection = + opts.forceNew || + opts["force new connection"] || + false === opts.multiplex || + sameNamespace + + let io: Manager + + if (newConnection) { + debug("ignoring socket cache for %s", source) + io = new Manager(source, opts) + } else { + if (!cache[id]) { + debug("new io instance for %s", source) + cache[id] = new Manager(source, opts) + } + io = cache[id] + } + if (parsed.query && !opts.query) { + opts.query = parsed.queryKey + } + return io.socket(parsed.path, opts) +} + +/** + * Protocol version. + * + * @public + */ + +export { protocol } from "../socket.io-parser" + +/** + * `connect`. + * + * @param {String} uri + * @public + */ + +exports.connect = lookup + +/** + * Expose constructors for standalone build. + * + * @public + */ + +export { Manager, ManagerOptions } from "./manager" +export { Socket } from "./socket" +export { lookup as io, SocketOptions } +export default lookup diff --git a/packages/websocket/src/socket.io-client/manager.ts b/packages/websocket/src/socket.io-client/manager.ts new file mode 100644 index 00000000..8e5dfe74 --- /dev/null +++ b/packages/websocket/src/socket.io-client/manager.ts @@ -0,0 +1,816 @@ +import eio from "../engine.io-client" +import { Socket, SocketOptions } from "./socket" +import * as parser from "../socket.io-parser" +import { Decoder, Encoder, Packet } from "../socket.io-parser" +import { on } from "./on" +import * as Backoff from "backo2" +import { + DefaultEventsMap, + EventsMap, + StrictEventEmitter, +} from "./typed-events" + +const debug = require("../debug")("socket.io-client") + +interface EngineOptions { + /** + * The host that we're connecting to. Set from the URI passed when connecting + */ + host: string + + /** + * The hostname for our connection. Set from the URI passed when connecting + */ + hostname: string + + /** + * If this is a secure connection. Set from the URI passed when connecting + */ + secure: boolean + + /** + * The port for our connection. Set from the URI passed when connecting + */ + port: string + + /** + * Any query parameters in our uri. Set from the URI passed when connecting + */ + query: { [key: string]: string } + + /** + * `http.Agent` to use, defaults to `false` (NodeJS only) + */ + agent: string | boolean + + /** + * Whether the client should try to upgrade the transport from + * long-polling to something better. + * @default true + */ + upgrade: boolean + + /** + * Forces JSONP for polling transport. + */ + forceJSONP: boolean + + /** + * Determines whether to use JSONP when necessary for polling. If + * disabled (by settings to false) an error will be emitted (saying + * "No transports available") if no other transports are available. + * If another transport is available for opening a connection (e.g. + * WebSocket) that transport will be used instead. + * @default true + */ + jsonp: boolean + + /** + * Forces base 64 encoding for polling transport even when XHR2 + * responseType is available and WebSocket even if the used standard + * supports binary. + */ + forceBase64: boolean + + /** + * Enables XDomainRequest for IE8 to avoid loading bar flashing with + * click sound. default to `false` because XDomainRequest has a flaw + * of not sending cookie. + * @default false + */ + enablesXDR: boolean + + /** + * The param name to use as our timestamp key + * @default 't' + */ + timestampParam: string + + /** + * Whether to add the timestamp with each transport request. Note: this + * is ignored if the browser is IE or Android, in which case requests + * are always stamped + * @default false + */ + timestampRequests: boolean + + /** + * A list of transports to try (in order). Engine.io always attempts to + * connect directly with the first one, provided the feature detection test + * for it passes. + * @default ['polling','websocket'] + */ + transports: string[] + + /** + * The port the policy server listens on + * @default 843 + */ + policyPost: number + + /** + * If true and if the previous websocket connection to the server succeeded, + * the connection attempt will bypass the normal upgrade process and will + * initially try websocket. A connection attempt following a transport error + * will use the normal upgrade process. It is recommended you turn this on + * only when using SSL/TLS connections, or if you know that your network does + * not block websockets. + * @default false + */ + rememberUpgrade: boolean + + /** + * Are we only interested in transports that support binary? + */ + onlyBinaryUpgrades: boolean + + /** + * Timeout for xhr-polling requests in milliseconds (0) (only for polling transport) + */ + requestTimeout: number + + /** + * Transport options for Node.js client (headers etc) + */ + transportOptions: Object + + /** + * (SSL) Certificate, Private key and CA certificates to use for SSL. + * Can be used in Node.js client environment to manually specify + * certificate information. + */ + pfx: string + + /** + * (SSL) Private key to use for SSL. Can be used in Node.js client + * environment to manually specify certificate information. + */ + key: string + + /** + * (SSL) A string or passphrase for the private key or pfx. Can be + * used in Node.js client environment to manually specify certificate + * information. + */ + passphrase: string + + /** + * (SSL) Public x509 certificate to use. Can be used in Node.js client + * environment to manually specify certificate information. + */ + cert: string + + /** + * (SSL) An authority certificate or array of authority certificates to + * check the remote host against.. Can be used in Node.js client + * environment to manually specify certificate information. + */ + ca: string | string[] + + /** + * (SSL) A string describing the ciphers to use or exclude. Consult the + * [cipher format list] + * (http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT) for + * details on the format.. Can be used in Node.js client environment to + * manually specify certificate information. + */ + ciphers: string + + /** + * (SSL) If true, the server certificate is verified against the list of + * supplied CAs. An 'error' event is emitted if verification fails. + * Verification happens at the connection level, before the HTTP request + * is sent. Can be used in Node.js client environment to manually specify + * certificate information. + */ + rejectUnauthorized: boolean + + /** + * Headers that will be passed for each request to the server (via xhr-polling and via websockets). + * These values then can be used during handshake or for special proxies. + */ + extraHeaders?: { [header: string]: string } + + /** + * Whether to include credentials (cookies, authorization headers, TLS + * client certificates, etc.) with cross-origin XHR polling requests + * @default false + */ + withCredentials: boolean + + /** + * Whether to automatically close the connection whenever the beforeunload event is received. + * @default true + */ + closeOnBeforeunload: boolean +} + +export interface ManagerOptions extends EngineOptions { + /** + * Should we force a new Manager for this connection? + * @default false + */ + forceNew: boolean + + /** + * Should we multiplex our connection (reuse existing Manager) ? + * @default true + */ + multiplex: boolean + + /** + * The path to get our client file from, in the case of the server + * serving it + * @default '/socket.io' + */ + path: string + + /** + * Should we allow reconnections? + * @default true + */ + reconnection: boolean + + /** + * How many reconnection attempts should we try? + * @default Infinity + */ + reconnectionAttempts: number + + /** + * The time delay in milliseconds between reconnection attempts + * @default 1000 + */ + reconnectionDelay: number + + /** + * The max time delay in milliseconds between reconnection attempts + * @default 5000 + */ + reconnectionDelayMax: number + + /** + * Used in the exponential backoff jitter when reconnecting + * @default 0.5 + */ + randomizationFactor: number + + /** + * The timeout in milliseconds for our connection attempt + * @default 20000 + */ + timeout: number + + /** + * Should we automatically connect? + * @default true + */ + autoConnect: boolean + + /** + * weather we should unref the reconnect timer when it is + * create automatically + * @default false + */ + autoUnref: boolean + + /** + * the parser to use. Defaults to an instance of the Parser that ships with socket.io. + */ + parser: any +} + +interface ManagerReservedEvents { + open: () => void + error: (err: Error) => void + ping: () => void + packet: (packet: Packet) => void + close: (reason: string) => void + reconnect_failed: () => void + reconnect_attempt: (attempt: number) => void + reconnect_error: (err: Error) => void + reconnect: (attempt: number) => void +} + +export class Manager< + ListenEvents extends EventsMap = DefaultEventsMap, + EmitEvents extends EventsMap = ListenEvents + > extends StrictEventEmitter<{}, {}, ManagerReservedEvents> { + /** + * The Engine.IO client instance + * + * @public + */ + public engine: any + /** + * @private + */ + _autoConnect: boolean + /** + * @private + */ + _readyState: "opening" | "open" | "closed" + /** + * @private + */ + _reconnecting: boolean + + private readonly uri: string + public opts: Partial + + private nsps: Record = {}; + private subs: Array> = []; + private backoff: Backoff + private _reconnection: boolean + private _reconnectionAttempts: number + private _reconnectionDelay: number + private _randomizationFactor: number + private _reconnectionDelayMax: number + private _timeout: any + + private encoder: Encoder + private decoder: Decoder + private skipReconnect: boolean + + /** + * `Manager` constructor. + * + * @param uri - engine instance or engine uri/opts + * @param opts - options + * @public + */ + constructor(opts: Partial) + constructor(uri?: string, opts?: Partial) + constructor( + uri?: string | Partial, + opts?: Partial + ) + constructor( + uri?: string | Partial, + opts?: Partial + ) { + super() + if (uri && "object" === typeof uri) { + opts = uri + uri = undefined + } + opts = opts || {} + + opts.path = opts.path || "/socket.io" + this.opts = opts + this.reconnection(opts.reconnection !== false) + this.reconnectionAttempts(opts.reconnectionAttempts || Infinity) + this.reconnectionDelay(opts.reconnectionDelay || 1000) + this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000) + this.randomizationFactor(opts.randomizationFactor ?? 0.5) + this.backoff = new Backoff({ + min: this.reconnectionDelay(), + max: this.reconnectionDelayMax(), + jitter: this.randomizationFactor(), + }) + this.timeout(null == opts.timeout ? 20000 : opts.timeout) + this._readyState = "closed" + this.uri = uri as string + const _parser = opts.parser || parser + this.encoder = new _parser.Encoder() + this.decoder = new _parser.Decoder() + this._autoConnect = opts.autoConnect !== false + if (this._autoConnect) this.open() + } + + /** + * Sets the `reconnection` config. + * + * @param {Boolean} v - true/false if it should automatically reconnect + * @return {Manager} self or value + * @public + */ + public reconnection(v: boolean): this + public reconnection(): boolean + public reconnection(v?: boolean): this | boolean + public reconnection(v?: boolean): this | boolean { + if (!arguments.length) return this._reconnection + this._reconnection = !!v + return this + } + + /** + * Sets the reconnection attempts config. + * + * @param {Number} v - max reconnection attempts before giving up + * @return {Manager} self or value + * @public + */ + public reconnectionAttempts(v: number): this + public reconnectionAttempts(): number + public reconnectionAttempts(v?: number): this | number + public reconnectionAttempts(v?: number): this | number { + if (v === undefined) return this._reconnectionAttempts + this._reconnectionAttempts = v + return this + } + + /** + * Sets the delay between reconnections. + * + * @param {Number} v - delay + * @return {Manager} self or value + * @public + */ + public reconnectionDelay(v: number): this + public reconnectionDelay(): number + public reconnectionDelay(v?: number): this | number + public reconnectionDelay(v?: number): this | number { + if (v === undefined) return this._reconnectionDelay + this._reconnectionDelay = v + this.backoff?.setMin(v) + return this + } + + /** + * Sets the randomization factor + * + * @param v - the randomization factor + * @return self or value + * @public + */ + public randomizationFactor(v: number): this + public randomizationFactor(): number + public randomizationFactor(v?: number): this | number + public randomizationFactor(v?: number): this | number { + if (v === undefined) return this._randomizationFactor + this._randomizationFactor = v + this.backoff?.setJitter(v) + return this + } + + /** + * Sets the maximum delay between reconnections. + * + * @param v - delay + * @return self or value + * @public + */ + public reconnectionDelayMax(v: number): this + public reconnectionDelayMax(): number + public reconnectionDelayMax(v?: number): this | number + public reconnectionDelayMax(v?: number): this | number { + if (v === undefined) return this._reconnectionDelayMax + this._reconnectionDelayMax = v + this.backoff?.setMax(v) + return this + } + + /** + * Sets the connection timeout. `false` to disable + * + * @param v - connection timeout + * @return self or value + * @public + */ + public timeout(v: number | boolean): this + public timeout(): number | boolean + public timeout(v?: number | boolean): this | number | boolean + public timeout(v?: number | boolean): this | number | boolean { + if (!arguments.length) return this._timeout + this._timeout = v + return this + } + + /** + * Starts trying to reconnect if reconnection is enabled and we have not + * started reconnecting yet + * + * @private + */ + private maybeReconnectOnOpen() { + // Only try to reconnect if it's the first time we're connecting + if ( + !this._reconnecting && + this._reconnection && + this.backoff.attempts === 0 + ) { + // keeps reconnection from firing twice for the same reconnection loop + this.reconnect() + } + } + + /** + * Sets the current transport `socket`. + * + * @param {Function} fn - optional, callback + * @return self + * @public + */ + public open(fn?: (err?: Error) => void): this { + debug("readyState %s", this._readyState) + if (~this._readyState.indexOf("open")) return this + + debug("opening %s", this.uri) + // @ts-ignore + this.engine = eio(this.uri, this.opts) + const socket = this.engine + const self = this + this._readyState = "opening" + this.skipReconnect = false + + // emit `open` + const openSubDestroy = on(socket, "open", function () { + self.onopen() + fn && fn() + }) + + // emit `error` + const errorSub = on(socket, "error", (err) => { + debug("error") + self.cleanup() + self._readyState = "closed" + this.emitReserved("error", err) + if (fn) { + fn(err) + } else { + // Only do this if there is no fn to handle the error + self.maybeReconnectOnOpen() + } + }) + + if (false !== this._timeout) { + const timeout = this._timeout + debug("connect attempt will timeout after %d", timeout) + + if (timeout === 0) { + openSubDestroy() // prevents a race condition with the 'open' event + } + + // set timer + const timer = setTimeout(() => { + debug("connect attempt timed out after %d", timeout) + openSubDestroy() + socket.close() + socket.emit("error", new Error("timeout")) + }, timeout) + + if (this.opts.autoUnref) { + timer.unref() + } + + this.subs.push(function subDestroy(): void { + clearTimeout(timer) + }) + } + + this.subs.push(openSubDestroy) + this.subs.push(errorSub) + + return this + } + + /** + * Alias for open() + * + * @return self + * @public + */ + public connect(fn?: (err?: Error) => void): this { + return this.open(fn) + } + + /** + * Called upon transport open. + * + * @private + */ + private onopen(): void { + debug("open") + + // clear old subs + this.cleanup() + + // mark as open + this._readyState = "open" + this.emitReserved("open") + + // add new subs + const socket = this.engine + this.subs.push( + on(socket, "ping", this.onping.bind(this)), + on(socket, "data", this.ondata.bind(this)), + on(socket, "error", this.onerror.bind(this)), + on(socket, "close", this.onclose.bind(this)), + on(this.decoder, "decoded", this.ondecoded.bind(this)) + ) + } + + /** + * Called upon a ping. + * + * @private + */ + private onping(): void { + this.emitReserved("ping") + } + + /** + * Called with data. + * + * @private + */ + private ondata(data): void { + this.decoder.add(data) + } + + /** + * Called when parser fully decodes a packet. + * + * @private + */ + private ondecoded(packet): void { + this.emitReserved("packet", packet) + } + + /** + * Called upon socket error. + * + * @private + */ + private onerror(err): void { + debug("error", err) + this.emitReserved("error", err) + } + + /** + * Creates a new socket for the given `nsp`. + * + * @return {Socket} + * @public + */ + public socket(nsp: string, opts?: Partial): Socket { + let socket = this.nsps[nsp] + if (!socket) { + socket = new Socket(this, nsp, opts) + this.nsps[nsp] = socket + } + + return socket + } + + /** + * Called upon a socket close. + * + * @param socket + * @private + */ + _destroy(socket: Socket): void { + const nsps = Object.keys(this.nsps) + + for (const nsp of nsps) { + const socket = this.nsps[nsp] + + if (socket.active) { + debug("socket %s is still active, skipping close", nsp) + return + } + } + + this._close() + } + + /** + * Writes a packet. + * + * @param packet + * @private + */ + _packet(packet: Partial): void { + debug("writing packet %j", packet) + + const encodedPackets = this.encoder.encode(packet as Packet) + for (let i = 0; i < encodedPackets.length; i++) { + this.engine.write(encodedPackets[i], packet.options) + } + } + + /** + * Clean up transport subscriptions and packet buffer. + * + * @private + */ + private cleanup(): void { + debug("cleanup") + + this.subs.forEach((subDestroy) => subDestroy()) + this.subs.length = 0 + + this.decoder.destroy() + } + + /** + * Close the current socket. + * + * @private + */ + _close(): void { + debug("disconnect") + this.skipReconnect = true + this._reconnecting = false + if ("opening" === this._readyState) { + // `onclose` will not fire because + // an open event never happened + this.cleanup() + } + this.backoff.reset() + this._readyState = "closed" + if (this.engine) this.engine.close() + } + + /** + * Alias for close() + * + * @private + */ + private disconnect(): void { + return this._close() + } + + /** + * Called upon engine close. + * + * @private + */ + private onclose(reason: string): void { + debug("onclose") + + this.cleanup() + this.backoff.reset() + this._readyState = "closed" + this.emitReserved("close", reason) + + if (this._reconnection && !this.skipReconnect) { + this.reconnect() + } + } + + /** + * Attempt a reconnection. + * + * @private + */ + private reconnect(): this | void { + if (this._reconnecting || this.skipReconnect) return this + + const self = this + + if (this.backoff.attempts >= this._reconnectionAttempts) { + debug("reconnect failed") + this.backoff.reset() + this.emitReserved("reconnect_failed") + this._reconnecting = false + } else { + const delay = this.backoff.duration() + debug("will wait %dms before reconnect attempt", delay) + + this._reconnecting = true + const timer = setTimeout(() => { + if (self.skipReconnect) return + + debug("attempting reconnect") + this.emitReserved("reconnect_attempt", self.backoff.attempts) + + // check again for the case socket closed in above events + if (self.skipReconnect) return + + self.open((err) => { + if (err) { + debug("reconnect attempt error") + self._reconnecting = false + self.reconnect() + this.emitReserved("reconnect_error", err) + } else { + debug("reconnect success") + self.onreconnect() + } + }) + }, delay) + + if (this.opts.autoUnref) { + timer.unref() + } + + this.subs.push(function subDestroy() { + clearTimeout(timer) + }) + } + } + + /** + * Called upon successful reconnect. + * + * @private + */ + private onreconnect(): void { + const attempt = this.backoff.attempts + this._reconnecting = false + this.backoff.reset() + this.emitReserved("reconnect", attempt) + } +} diff --git a/packages/websocket/src/socket.io-client/on.ts b/packages/websocket/src/socket.io-client/on.ts new file mode 100644 index 00000000..7f27647b --- /dev/null +++ b/packages/websocket/src/socket.io-client/on.ts @@ -0,0 +1,14 @@ +// import type * as Emitter from "component-emitter"; +import { EventEmitter } from "events" +import { StrictEventEmitter } from "./typed-events" + +export function on( + obj: EventEmitter | StrictEventEmitter, + ev: string, + fn: (err?: any) => any +): VoidFunction { + obj.on(ev, fn) + return function subDestroy(): void { + obj.off(ev, fn) + } +} diff --git a/packages/websocket/src/socket.io-client/socket.ts b/packages/websocket/src/socket.io-client/socket.ts new file mode 100644 index 00000000..67ea5b9c --- /dev/null +++ b/packages/websocket/src/socket.io-client/socket.ts @@ -0,0 +1,558 @@ +import { Packet, PacketType } from "../socket.io-parser" +import { on } from "./on" +import { Manager } from "./manager" +import { + DefaultEventsMap, + EventNames, + EventParams, + EventsMap, + StrictEventEmitter, +} from "./typed-events" + +const debug = require("../debug")("socket.io-client") + +export interface SocketOptions { + /** + * the authentication payload sent when connecting to the Namespace + */ + auth: { [key: string]: any } | ((cb: (data: object) => void) => void) +} + +/** + * Internal events. + * These events can't be emitted by the user. + */ +const RESERVED_EVENTS = Object.freeze({ + connect: 1, + connect_error: 1, + disconnect: 1, + disconnecting: 1, + // EventEmitter reserved events: https://nodejs.org/api/events.html#events_event_newlistener + newListener: 1, + removeListener: 1, +}) + +interface Flags { + compress?: boolean + volatile?: boolean +} + +interface SocketReservedEvents { + connect: () => void + connect_error: (err: Error) => void + disconnect: (reason: Socket.DisconnectReason) => void +} + +export class Socket< + ListenEvents extends EventsMap = DefaultEventsMap, + EmitEvents extends EventsMap = ListenEvents + > extends StrictEventEmitter { + public readonly io: Manager + + public id: string + public connected: boolean + public disconnected: boolean + + public auth: { [key: string]: any } | ((cb: (data: object) => void) => void) + public receiveBuffer: Array> = []; + public sendBuffer: Array = []; + + private readonly nsp: string + + private ids: number = 0; + private acks: object = {}; + private flags: Flags = {}; + private subs?: Array + private _anyListeners: Array<(...args: any[]) => void> + + /** + * `Socket` constructor. + * + * @public + */ + constructor(io: Manager, nsp: string, opts?: Partial) { + super() + this.io = io + this.nsp = nsp + this.ids = 0 + this.acks = {} + this.receiveBuffer = [] + this.sendBuffer = [] + this.connected = false + this.disconnected = true + this.flags = {} + if (opts && opts.auth) { + this.auth = opts.auth + } + if (this.io._autoConnect) this.open() + } + + /** + * Subscribe to open, close and packet events + * + * @private + */ + private subEvents(): void { + if (this.subs) return + + const io = this.io + this.subs = [ + on(io, "open", this.onopen.bind(this)), + on(io, "packet", this.onpacket.bind(this)), + on(io, "error", this.onerror.bind(this)), + on(io, "close", this.onclose.bind(this)), + ] + } + + /** + * Whether the Socket will try to reconnect when its Manager connects or reconnects + */ + public get active(): boolean { + return !!this.subs + } + + /** + * "Opens" the socket. + * + * @public + */ + public connect(): this { + if (this.connected) return this + + this.subEvents() + if (!this.io["_reconnecting"]) this.io.open() // ensure open + if ("open" === this.io._readyState) this.onopen() + return this + } + + /** + * Alias for connect() + */ + public open(): this { + return this.connect() + } + + /** + * Sends a `message` event. + * + * @return self + * @public + */ + public send(...args: any[]): this { + args.unshift("message") + // @ts-ignore + this.emit.apply(this, args) + return this + } + + /** + * Override `emit`. + * If the event is in `events`, it's emitted normally. + * + * @return self + * @public + */ + public emit>( + ev: Ev, + ...args: EventParams + ): this { + if (RESERVED_EVENTS.hasOwnProperty(ev)) { + throw new Error('"' + ev + '" is a reserved event name') + } + + args.unshift(ev) + const packet: any = { + type: PacketType.EVENT, + data: args, + } + + packet.options = {} + packet.options.compress = this.flags.compress !== false + + // event ack callback + if ("function" === typeof args[args.length - 1]) { + debug("emitting packet with ack id %d", this.ids) + this.acks[this.ids] = args.pop() + packet.id = this.ids++ + } + + const isTransportWritable = + this.io.engine && + this.io.engine.transport && + this.io.engine.transport.writable + + const discardPacket = + this.flags.volatile && (!isTransportWritable || !this.connected) + if (discardPacket) { + debug("discard packet as the transport is not currently writable") + } else if (this.connected) { + this.packet(packet) + } else { + this.sendBuffer.push(packet) + } + + this.flags = {} + + return this + } + + /** + * Sends a packet. + * + * @param packet + * @private + */ + private packet(packet: Partial): void { + packet.nsp = this.nsp + this.io._packet(packet) + } + + /** + * Called upon engine `open`. + * + * @private + */ + private onopen(): void { + debug("transport is open - connecting") + if (typeof this.auth == "function") { + this.auth((data) => { + this.packet({ type: PacketType.CONNECT, data }) + }) + } else { + this.packet({ type: PacketType.CONNECT, data: this.auth }) + } + } + + /** + * Called upon engine or manager `error`. + * + * @param err + * @private + */ + private onerror(err: Error): void { + if (!this.connected) { + this.emitReserved("connect_error", err) + } + } + + /** + * Called upon engine `close`. + * + * @param reason + * @private + */ + private onclose(reason: Socket.DisconnectReason): void { + debug("close (%s)", reason) + this.connected = false + this.disconnected = true + delete this.id + this.emitReserved("disconnect", reason) + } + + /** + * Called with socket packet. + * + * @param packet + * @private + */ + private onpacket(packet: Packet): void { + const sameNamespace = packet.nsp === this.nsp + + if (!sameNamespace) return + + switch (packet.type) { + case PacketType.CONNECT: + if (packet.data && packet.data.sid) { + const id = packet.data.sid + this.onconnect(id) + } else { + this.emitReserved( + "connect_error", + new Error( + "It seems you are trying to reach a Socket.IO server in v2.x with a v3.x client, but they are not compatible (more information here: https://socket.io/docs/v3/migrating-from-2-x-to-3-0/)" + ) + ) + } + break + + case PacketType.EVENT: + this.onevent(packet) + break + + case PacketType.BINARY_EVENT: + this.onevent(packet) + break + + case PacketType.ACK: + this.onack(packet) + break + + case PacketType.BINARY_ACK: + this.onack(packet) + break + + case PacketType.DISCONNECT: + this.ondisconnect() + break + + case PacketType.CONNECT_ERROR: + const err = new Error(packet.data.message) + // @ts-ignore + err.data = packet.data.data + this.emitReserved("connect_error", err) + break + } + } + + /** + * Called upon a server event. + * + * @param packet + * @private + */ + private onevent(packet: Packet): void { + const args: Array = packet.data || [] + debug("emitting event %j", args) + + if (null != packet.id) { + debug("attaching ack callback to event") + args.push(this.ack(packet.id)) + } + + if (this.connected) { + this.emitEvent(args) + } else { + this.receiveBuffer.push(Object.freeze(args)) + } + } + + private emitEvent(args: ReadonlyArray): void { + if (this._anyListeners && this._anyListeners.length) { + const listeners = this._anyListeners.slice() + for (const listener of listeners) { + // @ts-ignore + listener.apply(this, args) + } + } + // @ts-ignore + super.emit.apply(this, args) + } + + /** + * Produces an ack callback to emit with an event. + * + * @private + */ + private ack(id: number): (...args: any[]) => void { + const self = this + let sent = false + return function (...args: any[]) { + // prevent double callbacks + if (sent) return + sent = true + debug("sending ack %j", args) + + self.packet({ + type: PacketType.ACK, + id: id, + data: args, + }) + } + } + + /** + * Called upon a server acknowlegement. + * + * @param packet + * @private + */ + private onack(packet: Packet): void { + const ack = this.acks[packet.id] + if ("function" === typeof ack) { + debug("calling ack %s with %j", packet.id, packet.data) + ack.apply(this, packet.data) + delete this.acks[packet.id] + } else { + debug("bad ack %s", packet.id) + } + } + + /** + * Called upon server connect. + * + * @private + */ + private onconnect(id: string): void { + debug("socket connected with id %s", id) + this.id = id + this.connected = true + this.disconnected = false + this.emitBuffered() + this.emitReserved("connect") + } + + /** + * Emit buffered events (received and emitted). + * + * @private + */ + private emitBuffered(): void { + this.receiveBuffer.forEach((args) => this.emitEvent(args)) + this.receiveBuffer = [] + + this.sendBuffer.forEach((packet) => this.packet(packet)) + this.sendBuffer = [] + } + + /** + * Called upon server disconnect. + * + * @private + */ + private ondisconnect(): void { + debug("server disconnect (%s)", this.nsp) + this.destroy() + this.onclose("io server disconnect") + } + + /** + * Called upon forced client/server side disconnections, + * this method ensures the manager stops tracking us and + * that reconnections don't get triggered for this. + * + * @private + */ + private destroy(): void { + if (this.subs) { + // clean subscriptions to avoid reconnections + this.subs.forEach((subDestroy) => subDestroy()) + this.subs = undefined + } + this.io["_destroy"](this) + } + + /** + * Disconnects the socket manually. + * + * @return self + * @public + */ + public disconnect(): this { + if (this.connected) { + debug("performing disconnect (%s)", this.nsp) + this.packet({ type: PacketType.DISCONNECT }) + } + + // remove socket from pool + this.destroy() + + if (this.connected) { + // fire events + this.onclose("io client disconnect") + } + return this + } + + /** + * Alias for disconnect() + * + * @return self + * @public + */ + public close(): this { + return this.disconnect() + } + + /** + * Sets the compress flag. + * + * @param compress - if `true`, compresses the sending data + * @return self + * @public + */ + public compress(compress: boolean): this { + this.flags.compress = compress + return this + } + + /** + * Sets a modifier for a subsequent event emission that the event message will be dropped when this socket is not + * ready to send messages. + * + * @returns self + * @public + */ + public get volatile(): this { + this.flags.volatile = true + return this + } + + /** + * Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the + * callback. + * + * @param listener + * @public + */ + public onAny(listener: (...args: any[]) => void): this { + this._anyListeners = this._anyListeners || [] + this._anyListeners.push(listener) + return this + } + + /** + * Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the + * callback. The listener is added to the beginning of the listeners array. + * + * @param listener + * @public + */ + public prependAny(listener: (...args: any[]) => void): this { + this._anyListeners = this._anyListeners || [] + this._anyListeners.unshift(listener) + return this + } + + /** + * Removes the listener that will be fired when any event is emitted. + * + * @param listener + * @public + */ + public offAny(listener?: (...args: any[]) => void): this { + if (!this._anyListeners) { + return this + } + if (listener) { + const listeners = this._anyListeners + for (let i = 0; i < listeners.length; i++) { + if (listener === listeners[i]) { + listeners.splice(i, 1) + return this + } + } + } else { + this._anyListeners = [] + } + return this + } + + /** + * Returns an array of listeners that are listening for any event that is specified. This array can be manipulated, + * e.g. to remove listeners. + * + * @public + */ + public listenersAny() { + return this._anyListeners || [] + } +} + +export namespace Socket { + export type DisconnectReason = + | "io server disconnect" + | "io client disconnect" + | "ping timeout" + | "transport close" + | "transport error" +} diff --git a/packages/websocket/src/socket.io-client/typed-events.ts b/packages/websocket/src/socket.io-client/typed-events.ts new file mode 100644 index 00000000..997415ce --- /dev/null +++ b/packages/websocket/src/socket.io-client/typed-events.ts @@ -0,0 +1,157 @@ +import { EventEmitter } from "events" + +/** + * An events map is an interface that maps event names to their value, which + * represents the type of the `on` listener. + */ +export interface EventsMap { + [event: string]: any +} + +/** + * The default events map, used if no EventsMap is given. Using this EventsMap + * is equivalent to accepting all event names, and any data. + */ +export interface DefaultEventsMap { + [event: string]: (...args: any[]) => void +} + +/** + * Returns a union type containing all the keys of an event map. + */ +export type EventNames = keyof Map & (string | symbol) + +/** The tuple type representing the parameters of an event listener */ +export type EventParams< + Map extends EventsMap, + Ev extends EventNames + > = Parameters + +/** + * The event names that are either in ReservedEvents or in UserEvents + */ +export type ReservedOrUserEventNames< + ReservedEventsMap extends EventsMap, + UserEvents extends EventsMap + > = EventNames | EventNames + +/** + * Type of a listener of a user event or a reserved event. If `Ev` is in + * `ReservedEvents`, the reserved event listener is returned. + */ +export type ReservedOrUserListener< + ReservedEvents extends EventsMap, + UserEvents extends EventsMap, + Ev extends ReservedOrUserEventNames + > = FallbackToUntypedListener< + Ev extends EventNames + ? ReservedEvents[Ev] + : Ev extends EventNames + ? UserEvents[Ev] + : never + > + +/** + * Returns an untyped listener type if `T` is `never`; otherwise, returns `T`. + * + * This is a hack to mitigate https://github.com/socketio/socket.io/issues/3833. + * Needed because of https://github.com/microsoft/TypeScript/issues/41778 + */ +type FallbackToUntypedListener = [T] extends [never] + ? (...args: any[]) => void + : T + +/** + * Strictly typed version of an `EventEmitter`. A `TypedEventEmitter` takes type + * parameters for mappings of event names to event data types, and strictly + * types method calls to the `EventEmitter` according to these event maps. + * + * @typeParam ListenEvents - `EventsMap` of user-defined events that can be + * listened to with `on` or `once` + * @typeParam EmitEvents - `EventsMap` of user-defined events that can be + * emitted with `emit` + * @typeParam ReservedEvents - `EventsMap` of reserved events, that can be + * emitted by socket.io with `emitReserved`, and can be listened to with + * `listen`. + */ +export abstract class StrictEventEmitter< + ListenEvents extends EventsMap, + EmitEvents extends EventsMap, + ReservedEvents extends EventsMap = {} + > extends EventEmitter { + /** + * Adds the `listener` function as an event listener for `ev`. + * + * @param ev Name of the event + * @param listener Callback function + */ + on>( + ev: Ev, + listener: ReservedOrUserListener + ): this { + super.on(ev as string, listener) + return this + } + + /** + * Adds a one-time `listener` function as an event listener for `ev`. + * + * @param ev Name of the event + * @param listener Callback function + */ + once>( + ev: Ev, + listener: ReservedOrUserListener + ): this { + super.once(ev as string, listener) + return this + } + + /** + * Emits an event. + * + * @param ev Name of the event + * @param args Values to send to listeners of this event + */ + // @ts-ignore + emit>( + ev: Ev, + ...args: EventParams + ): this { + super.emit(ev as string, ...args) + return this + } + + /** + * Emits a reserved event. + * + * This method is `protected`, so that only a class extending + * `StrictEventEmitter` can emit its own reserved events. + * + * @param ev Reserved event name + * @param args Arguments to emit along with the event + */ + protected emitReserved>( + ev: Ev, + ...args: EventParams + ): this { + super.emit(ev as string, ...args) + return this + } + + /** + * Returns the listeners listening to an event. + * + * @param event Event name + * @returns Array of listeners subscribed to `event` + */ + listeners>( + event: Ev + ): ReservedOrUserListener[] { + return super.listeners(event as string) as ReservedOrUserListener< + ReservedEvents, + ListenEvents, + Ev + >[] + } +} diff --git a/packages/websocket/src/socket.io-client/url.ts b/packages/websocket/src/socket.io-client/url.ts new file mode 100644 index 00000000..00729fc4 --- /dev/null +++ b/packages/websocket/src/socket.io-client/url.ts @@ -0,0 +1,97 @@ +import * as parseuri from "parseuri" + +const debug = require("../debug")("socket.io-client") + +type ParsedUrl = { + source: string + protocol: string + authority: string + userInfo: string + user: string + password: string + host: string + port: string + relative: string + path: string + directory: string + file: string + query: string + anchor: string + pathNames: Array + queryKey: { [key: string]: string } + + // Custom properties (not native to parseuri): + id: string + href: string +} + +/** + * URL parser. + * + * @param uri - url + * @param path - the request path of the connection + * @param loc - An object meant to mimic window.location. + * Defaults to window.location. + * @public + */ + +export function url( + uri: string | ParsedUrl, + path: string = "", + loc?: Location +): ParsedUrl { + let obj = uri as ParsedUrl + + // default to window.location + loc = loc || (typeof location !== "undefined" && location) + if (null == uri) uri = loc.protocol + "//" + loc.host + + // relative path support + if (typeof uri === "string") { + if ("/" === uri.charAt(0)) { + if ("/" === uri.charAt(1)) { + uri = loc.protocol + uri + } else { + uri = loc.host + uri + } + } + + if (!/^(https?|wss?):\/\//.test(uri)) { + debug("protocol-less url %s", uri) + if ("undefined" !== typeof loc) { + uri = loc.protocol + "//" + uri + } else { + uri = "https://" + uri + } + } + + // parse + debug("parse %s", uri) + obj = parseuri(uri) as ParsedUrl + } + + // make sure we treat `localhost:80` and `localhost` equally + if (!obj.port) { + if (/^(http|ws)$/.test(obj.protocol)) { + obj.port = "80" + } else if (/^(http|ws)s$/.test(obj.protocol)) { + obj.port = "443" + } + } + + obj.path = obj.path || "/" + + const ipv6 = obj.host.indexOf(":") !== -1 + const host = ipv6 ? "[" + obj.host + "]" : obj.host + + // define unique id + obj.id = obj.protocol + "://" + host + ":" + obj.port + path + // define href + obj.href = + obj.protocol + + "://" + + host + + (loc && loc.port === obj.port ? "" : ":" + obj.port) + + return obj +} diff --git a/packages/websocket/tsconfig.json b/packages/websocket/tsconfig.json index 7aae5d2b..1f9478b0 100644 --- a/packages/websocket/tsconfig.json +++ b/packages/websocket/tsconfig.json @@ -2,6 +2,9 @@ "extends": "../../tsconfig.json", "compilerOptions": { "baseUrl": "src", - "outDir": "dist" + "outDir": "dist", + "lib": [ + "dom" + ] } -} \ No newline at end of file +}