import { EventEmitter } from "events"
import { Server } from "./server"
import { Transport } from "./transport"
import type { Request } from "../server/request"
// const debug = require("debug")("engine:socket")

export class Socket extends EventEmitter {
  public id: string
  private server: Server
  private upgrading = false
  private upgraded = false
  public readyState = "opening"
  private writeBuffer = []
  private packetsFn = []
  private sentCallbackFn = []
  private cleanupFn = []
  public request: Request
  public protocol: number
  public remoteAddress: any
  public transport: Transport

  private checkIntervalTimer: NodeJS.Timeout
  private upgradeTimeoutTimer: NodeJS.Timeout
  private pingTimeoutTimer: NodeJS.Timeout
  private pingIntervalTimer: NodeJS.Timeout

  /**
   * Client class (abstract).
   *
   * @api private
   */
  constructor(id: string, server: Server, transport: Transport, req: Request, protocol: number) {
    super()
    this.id = id
    this.server = server
    this.request = req
    this.protocol = protocol

    // Cache IP since it might not be in the req later
    if (req.websocket && req.websocket._socket) {
      this.remoteAddress = req.websocket._socket.remoteAddress
    } else {
      this.remoteAddress = req.connection.remoteAddress
    }

    this.checkIntervalTimer = null
    this.upgradeTimeoutTimer = null
    this.pingTimeoutTimer = null
    this.pingIntervalTimer = null

    this.setTransport(transport)
    this.onOpen()
  }

  /**
   * Called upon transport considered open.
   *
   * @api private
   */
  onOpen() {
    this.readyState = "open"

    // sends an `open` packet
    this.transport.sid = this.id
    this.sendPacket(
      "open",
      JSON.stringify({
        sid: this.id,
        upgrades: this.getAvailableUpgrades(),
        pingInterval: this.server.opts.pingInterval,
        pingTimeout: this.server.opts.pingTimeout
      })
    )

    if (this.server.opts.initialPacket) {
      this.sendPacket("message", this.server.opts.initialPacket)
    }

    this.emit("open")

    if (this.protocol === 3) {
      // in protocol v3, the client sends a ping, and the server answers with a pong
      this.resetPingTimeout(
        this.server.opts.pingInterval + this.server.opts.pingTimeout
      )
    } else {
      // in protocol v4, the server sends a ping, and the client answers with a pong
      this.schedulePing()
    }
  }

  /**
   * Called upon transport packet.
   *
   * @param {Object} packet
   * @api private
   */
  onPacket(packet: { type: any; data: any }) {
    if ("open" !== this.readyState) {
      console.debug("packet received with closed socket")
      return
    }
    // export packet event
    // debug(`received packet ${packet.type}`)
    this.emit("packet", packet)

    // Reset ping timeout on any packet, incoming data is a good sign of
    // other side's liveness
    this.resetPingTimeout(
      this.server.opts.pingInterval + this.server.opts.pingTimeout
    )

    switch (packet.type) {
      case "ping":
        if (this.transport.protocol !== 3) {
          this.onError("invalid heartbeat direction")
          return
        }
        // debug("got ping")
        this.sendPacket("pong")
        this.emit("heartbeat")
        break

      case "pong":
        if (this.transport.protocol === 3) {
          this.onError("invalid heartbeat direction")
          return
        }
        // debug("got pong")
        this.schedulePing()
        this.emit("heartbeat")
        break

      case "error":
        this.onClose("parse error")
        break

      case "message":
        this.emit("data", packet.data)
        this.emit("message", packet.data)
        break
    }
  }

  /**
   * Called upon transport error.
   *
   * @param {Error} error object
   * @api private
   */
  onError(err: string) {
    // debug("transport error")
    this.onClose("transport error", err)
  }

  /**
   * Pings the client after `this.server.opts.pingInterval` ms and expects a
   * response within `this.server.opts.pingTimeout` ms, or closes the connection.
   *
   * @api private
   */
  schedulePing() {
    clearTimeout(this.pingIntervalTimer)
    this.pingIntervalTimer = setTimeout(() => {
      // debug(
      //   "writing ping packet - expecting pong within %sms",
      //   this.server.opts.pingTimeout
      // )
      this.sendPacket("ping")
      this.resetPingTimeout(this.server.opts.pingTimeout)
    }, this.server.opts.pingInterval)
  }

  /**
   * Resets the ping timeout; the socket closes with `ping timeout` if it
   * expires.
   *
   * @param {Number} timeout delay in milliseconds
   * @api private
   */
  resetPingTimeout(timeout: number) {
    clearTimeout(this.pingTimeoutTimer)
    this.pingTimeoutTimer = setTimeout(() => {
      if (this.readyState === "closed") return
      this.onClose("ping timeout")
    }, timeout)
  }

  /**
   * Attaches handlers for the given transport.
   *
   * @param {Transport} transport
   * @api private
   */
  setTransport(transport: Transport) {
    console.debug(`engine.io socket ${this.id} set transport ${transport.name}`)
    const onError = this.onError.bind(this)
    const onPacket = this.onPacket.bind(this)
    const flush = this.flush.bind(this)
    const onClose = this.onClose.bind(this, "transport close")

    this.transport = transport
    this.transport.once("error", onError)
    this.transport.on("packet", onPacket)
    this.transport.on("drain", flush)
    this.transport.once("close", onClose)
    // this function will manage packet events (also message callbacks)
    this.setupSendCallback()

    this.cleanupFn.push(function () {
      transport.removeListener("error", onError)
      transport.removeListener("packet", onPacket)
      transport.removeListener("drain", flush)
      transport.removeListener("close", onClose)
    })
  }

  /**
   * Upgrades socket to the given transport
   *
   * @param {Transport} transport
   * @api private
   */
  maybeUpgrade(transport: Transport) {
    console.debug(
      `might upgrade socket transport from "${this.transport.name}" to "${transport.name}"`
    )

    this.upgrading = true

    // set transport upgrade timer
    this.upgradeTimeoutTimer = setTimeout(() => {
      console.debug("client did not complete upgrade - closing transport")
      cleanup()
      if ("open" === transport.readyState) {
        transport.close()
      }
    }, this.server.opts.upgradeTimeout)

    const onPacket = (packet: { type: string; data: string }) => {
      if ("ping" === packet.type && "probe" === packet.data) {
        transport.send([{ type: "pong", data: "probe" }])
        this.emit("upgrading", transport)
        clearInterval(this.checkIntervalTimer)
        this.checkIntervalTimer = setInterval(check, 100)
      } else if ("upgrade" === packet.type && this.readyState !== "closed") {
        // debug("got upgrade packet - upgrading")
        cleanup()
        this.transport.discard()
        this.upgraded = true
        this.clearTransport()
        this.setTransport(transport)
        this.emit("upgrade", transport)
        this.flush()
        if (this.readyState === "closing") {
          transport.close(() => {
            this.onClose("forced close")
          })
        }
      } else {
        cleanup()
        transport.close()
      }
    }

    // we force a polling cycle to ensure a fast upgrade
    const check = () => {
      if ("polling" === this.transport.name && this.transport.writable) {
        // debug("writing a noop packet to polling for fast upgrade")
        this.transport.send([{ type: "noop" }])
      }
    }

    const cleanup = () => {
      this.upgrading = false

      clearInterval(this.checkIntervalTimer)
      this.checkIntervalTimer = null

      clearTimeout(this.upgradeTimeoutTimer)
      this.upgradeTimeoutTimer = null

      transport.removeListener("packet", onPacket)
      transport.removeListener("close", onTransportClose)
      transport.removeListener("error", onError)
      this.removeListener("close", onClose)
    }

    const onError = (err: string) => {
      // debug("client did not complete upgrade - %s", err)
      cleanup()
      transport.close()
      transport = null
    }

    const onTransportClose = () => {
      onError("transport closed")
    }

    const onClose = () => {
      onError("socket closed")
    }

    transport.on("packet", onPacket)
    transport.once("close", onTransportClose)
    transport.once("error", onError)

    this.once("close", onClose)
  }

  /**
   * Clears listeners and timers associated with current transport.
   *
   * @api private
   */
  clearTransport() {
    let cleanup: () => void

    const toCleanUp = this.cleanupFn.length

    for (let i = 0; i < toCleanUp; i++) {
      cleanup = this.cleanupFn.shift()
      cleanup()
    }

    // silence further transport errors and prevent uncaught exceptions
    this.transport.on("error", function () {
      // debug("error triggered by discarded transport")
    })

    // ensure transport won't stay open
    this.transport.close()

    clearTimeout(this.pingTimeoutTimer)
  }

  /**
   * Called upon transport considered closed.
   * Possible reasons: `ping timeout`, `client error`, `parse error`,
   * `transport error`, `server close`, `transport close`, `forced close`
   */
  onClose(reason: string, description?: string) {
    if ("closed" !== this.readyState) {
      this.readyState = "closed"

      // clear timers
      clearTimeout(this.pingIntervalTimer)
      clearTimeout(this.pingTimeoutTimer)

      clearInterval(this.checkIntervalTimer)
      this.checkIntervalTimer = null
      clearTimeout(this.upgradeTimeoutTimer)
      // clean writeBuffer in next tick, so developers can still
      // grab the writeBuffer on 'close' event
      process.nextTick(() => {
        this.writeBuffer = []
      })
      this.packetsFn = []
      this.sentCallbackFn = []
      this.clearTransport()
      this.emit("close", reason, description)
    }
  }

  /**
   * Setup and manage send callback
   *
   * @api private
   */
  setupSendCallback() {
    // the message was sent successfully, execute the callback
    const onDrain = () => {
      if (this.sentCallbackFn.length > 0) {
        const seqFn = this.sentCallbackFn.splice(0, 1)[0]
        if ("function" === typeof seqFn) {
          // debug("executing send callback")
          seqFn(this.transport)
        } else if (Array.isArray(seqFn)) {
          // debug("executing batch send callback")
          const l = seqFn.length
          let i = 0
          for (; i < l; i++) {
            if ("function" === typeof seqFn[i]) {
              seqFn[i](this.transport)
            }
          }
        }
      }
    }

    this.transport.on("drain", onDrain)

    this.cleanupFn.push(() => {
      this.transport.removeListener("drain", onDrain)
    })
  }

  /**
   * Sends a message packet.
   *
   * @param {String} message
   * @param {Object} options
   * @param {Function} callback
   * @return {Socket} for chaining
   * @api public
   */
  send(data: any, options: any, callback: any) {
    this.sendPacket("message", data, options, callback)
    return this
  }

  /**
   * Alias of `send`: sends a message packet.
   *
   * @return {Socket} for chaining
   * @api public
   */
  write(data: any, options: any, callback?: any) {
    this.sendPacket("message", data, options, callback)
    return this
  }

  /**
   * Sends a packet.
   *
   * @param {String} packet type
   * @param {String} optional, data
   * @param {Object} options (a function passed here is treated as the callback)
   * @param {Function} callback invoked once the packet has been sent
   * @api private
   */
  sendPacket(type: string, data?: string, options?: { compress?: any }, callback?: any) {
    if ("function" === typeof options) {
      callback = options
      options = null
    }

    options = options || {}
    options.compress = false !== options.compress

    if ("closing" !== this.readyState && "closed" !== this.readyState) {
      // console.debug('sending packet "%s" (%s)', type, data)

      const packet: any = {
        type: type,
        options: options
      }
      if (data) packet.data = data

      // exports packetCreate event
      this.emit("packetCreate", packet)

      this.writeBuffer.push(packet)

      // add send callback to object, if defined
      if (callback) this.packetsFn.push(callback)

      this.flush()
    }
  }

  /**
   * Attempts to flush the packets buffer.
   *
   * @api private
   */
  flush() {
    if (
      "closed" !== this.readyState &&
      this.transport.writable &&
      this.writeBuffer.length
    ) {
      console.debug("flushing buffer to transport")
      this.emit("flush", this.writeBuffer)
      this.server.emit("flush", this, this.writeBuffer)
      const wbuf = this.writeBuffer
      this.writeBuffer = []
      if (!this.transport.supportsFraming) {
        // without framing, the whole batch drains at once: store the callbacks as one array
        this.sentCallbackFn.push(this.packetsFn)
      } else {
        this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn)
      }
      this.packetsFn = []
      this.transport.send(wbuf)
      this.emit("drain")
      this.server.emit("drain", this)
    }
  }

  /**
   * Get available upgrades for this socket.
   *
   * @api private
   */
  getAvailableUpgrades() {
    const availableUpgrades = []
    const allUpgrades = this.server.upgrades(this.transport.name)
    let i = 0
    const l = allUpgrades.length
    for (; i < l; ++i) {
      const upg = allUpgrades[i]
      if (this.server.opts.transports.indexOf(upg) !== -1) {
        availableUpgrades.push(upg)
      }
    }
    return availableUpgrades
  }

  /**
   * Closes the socket and underlying transport.
   *
   * @param {Boolean} optional, discard
   * @return {Socket} for chaining
   * @api public
   */
  close(discard?: any) {
    if ("open" !== this.readyState) return

    this.readyState = "closing"

    if (this.writeBuffer.length) {
      this.once("drain", this.closeTransport.bind(this, discard))
      return
    }

    this.closeTransport(discard)
  }

  /**
   * Closes the underlying transport.
   *
   * @param {Boolean} discard
   * @api private
   */
  closeTransport(discard: any) {
    if (discard) this.transport.discard()
    this.transport.close(this.onClose.bind(this, "forced close"))
  }
}
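
/*
 * Illustrative sketch, not part of the original module. The comments in
 * onOpen() describe who sends the ping in each protocol revision; the helper
 * below restates that timing as a pure function so it can be checked in
 * isolation. The name `heartbeatPlan` and its return shape are hypothetical.
 * The arithmetic is taken from onOpen(), schedulePing() and resetPingTimeout(),
 * and assumes no packet arrives from the client after the socket opens.
 */
function heartbeatPlan(
  protocol: number,
  pingInterval: number,
  pingTimeout: number
): { serverPingsAfter: number | null; closesAfter: number } {
  if (protocol === 3) {
    // v3: the client pings; the server only arms a single timeout of
    // pingInterval + pingTimeout and waits for incoming packets
    return { serverPingsAfter: null, closesAfter: pingInterval + pingTimeout }
  }
  // v4: the server pings after pingInterval, then allows pingTimeout for the pong
  return { serverPingsAfter: pingInterval, closesAfter: pingInterval + pingTimeout }
}

// Usage (the values are arbitrary and only for illustration):
// heartbeatPlan(4, 25000, 20000) gives { serverPingsAfter: 25000, closesAfter: 45000 }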