import transports from "./transports" // const transports = require("./transports/index") const Emitter = require("component-emitter") const debug = (...args: any) => console.debug('engine.io-client:socket', ...args)//require("debug")("engine.io-client:socket") import parser from "../engine.io-parser" const parseuri = require("parseuri") const parseqs = require("parseqs") import { installTimerFunctions } from "./util" export class Socket extends Emitter { /** * Socket constructor. * * @param {String|Object} uri or options * @param {Object} options * @api public */ constructor(uri, opts: any = {}) { super() if (uri && "object" === typeof uri) { opts = uri uri = null } if (uri) { uri = parseuri(uri) opts.hostname = uri.host opts.secure = uri.protocol === "https" || uri.protocol === "wss" opts.port = uri.port if (uri.query) opts.query = uri.query } else if (opts.host) { opts.hostname = parseuri(opts.host).host } installTimerFunctions(this, opts) this.secure = null != opts.secure ? opts.secure : typeof location !== "undefined" && "https:" === location.protocol if (opts.hostname && !opts.port) { // if no port is specified manually, use the protocol default opts.port = this.secure ? "443" : "80" } this.hostname = opts.hostname || (typeof location !== "undefined" ? location.hostname : "localhost") this.port = opts.port || (typeof location !== "undefined" && location.port ? location.port : this.secure ? 443 : 80) this.transports = ["websocket"] this.readyState = "" this.writeBuffer = [] this.prevBufferLen = 0 this.opts = Object.assign( { path: "/engine.io", agent: false, withCredentials: false, upgrade: true, jsonp: true, timestampParam: "t", rememberUpgrade: false, rejectUnauthorized: true, perMessageDeflate: { threshold: 1024 }, transportOptions: {}, closeOnBeforeunload: true }, opts ) this.opts.path = this.opts.path.replace(/\/$/, "") + "/" if (typeof this.opts.query === "string") { this.opts.query = parseqs.decode(this.opts.query) } // set on handshake this.id = null this.upgrades = null this.pingInterval = null this.pingTimeout = null // set on heartbeat this.pingTimeoutTimer = null if (typeof addEventListener === "function") { if (this.opts.closeOnBeforeunload) { // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is // closed/reloaded) addEventListener( "beforeunload", () => { if (this.transport) { // silently close the transport this.transport.removeAllListeners() this.transport.close() } }, false ) } if (this.hostname !== "localhost") { this.offlineEventListener = () => { this.onClose("transport close") } addEventListener("offline", this.offlineEventListener, false) } } this.open() } /** * Creates transport of the given type. * * @param {String} transport name * @return {Transport} * @api private */ createTransport(name, opt?) { if (name != 'websocket') { throw new Error('Only Support WebSocket in MiaoScript!') } debug('creating transport "%s"', name) const query: any = clone(this.opts.query) // append engine.io protocol identifier query.EIO = parser.protocol // transport name query.transport = name // session id if we already have one if (this.id) query.sid = this.id const opts = Object.assign( {}, this.opts.transportOptions[name], this.opts, { query, socket: this, hostname: this.hostname, secure: this.secure, port: this.port } ) debug("options: %j", JSON.stringify(opts)) debug("new func", transports[name]) return new transports[name](opts) } /** * Initializes transport to use and starts probe. * * @api private */ open() { let transport if ( this.opts.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf("websocket") !== -1 ) { transport = "websocket" } else if (0 === this.transports.length) { // Emit error on next tick so it can be listened to this.setTimeoutFn(() => { this.emit("error", "No transports available") }, 0) return } else { transport = this.transports[0] } this.readyState = "opening" // Retry with the next transport if the transport is disabled (jsonp: false) try { transport = this.createTransport(transport) } catch (e) { debug("error while creating transport: %s", e) this.transports.shift() this.open() return } transport.open() this.setTransport(transport) } /** * Sets the current transport. Disables the existing one (if any). * * @api private */ setTransport(transport) { debug("setting transport %s", transport.name) if (this.transport) { debug("clearing existing transport %s", this.transport.name) this.transport.removeAllListeners() } // set up transport this.transport = transport // set up transport listeners transport .on("drain", this.onDrain.bind(this)) .on("packet", this.onPacket.bind(this)) .on("error", this.onError.bind(this)) .on("close", () => { this.onClose("transport close") }) } /** * Probes a transport. * * @param {String} transport name * @api private */ probe(name) { debug('probing transport "%s"', name) let transport = this.createTransport(name, { probe: 1 }) let failed = false Socket.priorWebsocketSuccess = false const onTransportOpen = () => { if (failed) return debug('probe transport "%s" opened', name) transport.send([{ type: "ping", data: "probe" }]) transport.once("packet", msg => { if (failed) return if ("pong" === msg.type && "probe" === msg.data) { debug('probe transport "%s" pong', name) this.upgrading = true this.emit("upgrading", transport) if (!transport) return Socket.priorWebsocketSuccess = "websocket" === transport.name debug('pausing current transport "%s"', this.transport.name) this.transport.pause(() => { if (failed) return if ("closed" === this.readyState) return debug("changing transport and sending upgrade packet") cleanup() this.setTransport(transport) transport.send([{ type: "upgrade" }]) this.emit("upgrade", transport) transport = null this.upgrading = false this.flush() }) } else { debug('probe transport "%s" failed', name) const err: any = new Error("probe error") err.transport = transport.name this.emit("upgradeError", err) } }) } function freezeTransport() { if (failed) return // Any callback called by transport should be ignored since now failed = true cleanup() transport.close() transport = null } // Handle any error that happens while probing const onerror = err => { const error: any = new Error("probe error: " + err) error.transport = transport.name freezeTransport() debug('probe transport "%s" failed because of error: %s', name, err) this.emit("upgradeError", error) } function onTransportClose() { onerror("transport closed") } // When the socket is closed while we're probing function onclose() { onerror("socket closed") } // When the socket is upgraded while we're probing function onupgrade(to) { if (transport && to.name !== transport.name) { debug('"%s" works - aborting "%s"', to.name, transport.name) freezeTransport() } } // Remove all listeners on the transport and on self const cleanup = () => { transport.removeListener("open", onTransportOpen) transport.removeListener("error", onerror) transport.removeListener("close", onTransportClose) this.removeListener("close", onclose) this.removeListener("upgrading", onupgrade) } transport.once("open", onTransportOpen) transport.once("error", onerror) transport.once("close", onTransportClose) this.once("close", onclose) this.once("upgrading", onupgrade) transport.open() } /** * Called when connection is deemed open. * * @api public */ onOpen() { debug("socket open") this.readyState = "open" Socket.priorWebsocketSuccess = "websocket" === this.transport.name this.emit("open") this.flush() // we check for `readyState` in case an `open` // listener already closed the socket if ( "open" === this.readyState && this.opts.upgrade && this.transport.pause ) { debug("starting upgrade probes") let i = 0 const l = this.upgrades.length for (; i < l; i++) { this.probe(this.upgrades[i]) } } } /** * Handles a packet. * * @api private */ onPacket(packet) { if ( "opening" === this.readyState || "open" === this.readyState || "closing" === this.readyState ) { debug('socket receive: type "%s", data "%s"', packet.type, packet.data) this.emit("packet", packet) // Socket is live - any packet counts this.emit("heartbeat") switch (packet.type) { case "open": this.onHandshake(JSON.parse(packet.data)) break case "ping": this.resetPingTimeout() this.sendPacket("pong") this.emit("ping") this.emit("pong") break case "error": const err: any = new Error("server error") err.code = packet.data this.onError(err) break case "message": this.emit("data", packet.data) this.emit("message", packet.data) break } } else { debug('packet received with socket readyState "%s"', this.readyState) } } /** * Called upon handshake completion. * * @param {Object} handshake obj * @api private */ onHandshake(data) { this.emit("handshake", data) this.id = data.sid this.transport.query.sid = data.sid this.upgrades = this.filterUpgrades(data.upgrades) this.pingInterval = data.pingInterval this.pingTimeout = data.pingTimeout this.onOpen() // In case open handler closes socket if ("closed" === this.readyState) return this.resetPingTimeout() } /** * Sets and resets ping timeout timer based on server pings. * * @api private */ resetPingTimeout() { this.clearTimeoutFn(this.pingTimeoutTimer) this.pingTimeoutTimer = this.setTimeoutFn(() => { this.onClose("ping timeout") }, this.pingInterval + this.pingTimeout) if (this.opts.autoUnref) { this.pingTimeoutTimer.unref() } } /** * Called on `drain` event * * @api private */ onDrain() { this.writeBuffer.splice(0, this.prevBufferLen) // setting prevBufferLen = 0 is very important // for example, when upgrading, upgrade packet is sent over, // and a nonzero prevBufferLen could cause problems on `drain` this.prevBufferLen = 0 if (0 === this.writeBuffer.length) { this.emit("drain") } else { this.flush() } } /** * Flush write buffers. * * @api private */ flush() { if ( "closed" !== this.readyState && this.transport.writable && !this.upgrading && this.writeBuffer.length ) { debug("flushing %d packets in socket", this.writeBuffer.length) this.transport.send(this.writeBuffer) // keep track of current length of writeBuffer // splice writeBuffer and callbackBuffer on `drain` this.prevBufferLen = this.writeBuffer.length this.emit("flush") } } /** * Sends a message. * * @param {String} message. * @param {Function} callback function. * @param {Object} options. * @return {Socket} for chaining. * @api public */ write(msg, options, fn) { this.sendPacket("message", msg, options, fn) return this } send(msg, options, fn) { this.sendPacket("message", msg, options, fn) return this } /** * Sends a packet. * * @param {String} packet type. * @param {String} data. * @param {Object} options. * @param {Function} callback function. * @api private */ sendPacket(type, data?, options?, fn?) { if ("function" === typeof data) { fn = data data = undefined } if ("function" === typeof options) { fn = options options = null } if ("closing" === this.readyState || "closed" === this.readyState) { return } options = options || {} options.compress = false !== options.compress const packet = { type: type, data: data, options: options } this.emit("packetCreate", packet) this.writeBuffer.push(packet) if (fn) this.once("flush", fn) this.flush() } /** * Closes the connection. * * @api private */ close() { const close = () => { this.onClose("forced close") debug("socket closing - telling transport to close") this.transport.close() } const cleanupAndClose = () => { this.removeListener("upgrade", cleanupAndClose) this.removeListener("upgradeError", cleanupAndClose) close() } const waitForUpgrade = () => { // wait for upgrade to finish since we can't send packets while pausing a transport this.once("upgrade", cleanupAndClose) this.once("upgradeError", cleanupAndClose) } if ("opening" === this.readyState || "open" === this.readyState) { this.readyState = "closing" if (this.writeBuffer.length) { this.once("drain", () => { if (this.upgrading) { waitForUpgrade() } else { close() } }) } else if (this.upgrading) { waitForUpgrade() } else { close() } } return this } /** * Called upon transport error * * @api private */ onError(err) { debug("socket error %j", err) Socket.priorWebsocketSuccess = false this.emit("error", err) this.onClose("transport error", err) } /** * Called upon transport close. * * @api private */ onClose(reason, desc?) { if ( "opening" === this.readyState || "open" === this.readyState || "closing" === this.readyState ) { debug('socket close with reason: "%s"', reason) // clear timers this.clearTimeoutFn(this.pingIntervalTimer) this.clearTimeoutFn(this.pingTimeoutTimer) // stop event from firing again for transport this.transport.removeAllListeners("close") // ensure transport won't stay open this.transport.close() // ignore further transport communication this.transport.removeAllListeners() if (typeof removeEventListener === "function") { removeEventListener("offline", this.offlineEventListener, false) } // set ready state this.readyState = "closed" // clear session id this.id = null // emit close event this.emit("close", reason, desc) // clean buffers after, so users can still // grab the buffers on `close` event this.writeBuffer = [] this.prevBufferLen = 0 } } /** * Filters upgrades, returning only those matching client transports. * * @param {Array} server upgrades * @api private * */ filterUpgrades(upgrades) { const filteredUpgrades = [] let i = 0 const j = upgrades.length for (; i < j; i++) { if (~this.transports.indexOf(upgrades[i])) filteredUpgrades.push(upgrades[i]) } return filteredUpgrades } } Socket.priorWebsocketSuccess = false /** * Protocol version. * * @api public */ Socket.protocol = parser.protocol // this is an int function clone(obj) { const o = {} for (let i in obj) { if (obj.hasOwnProperty(i)) { o[i] = obj[i] } } return o }