ms/packages/websocket/src/engine.io-client/socket.ts

689 lines
19 KiB
TypeScript

import transports from "./transports"
// const transports = require("./transports/index")
const Emitter = require("component-emitter")
const debug = (...args: any) => console.debug('engine.io-client:socket', ...args)//require("debug")("engine.io-client:socket")
import parser from "../engine.io-parser"
const parseuri = require("parseuri")
const parseqs = require("parseqs")
import { installTimerFunctions } from "./util"
export class Socket extends Emitter {
/**
* Socket constructor.
*
* @param {String|Object} uri or options
* @param {Object} options
* @api public
*/
constructor(uri, opts: any = {}) {
super()
if (uri && "object" === typeof uri) {
opts = uri
uri = null
}
if (uri) {
uri = parseuri(uri)
opts.hostname = uri.host
opts.secure = uri.protocol === "https" || uri.protocol === "wss"
opts.port = uri.port
if (uri.query) opts.query = uri.query
} else if (opts.host) {
opts.hostname = parseuri(opts.host).host
}
installTimerFunctions(this, opts)
this.secure =
null != opts.secure
? opts.secure
: typeof location !== "undefined" && "https:" === location.protocol
if (opts.hostname && !opts.port) {
// if no port is specified manually, use the protocol default
opts.port = this.secure ? "443" : "80"
}
this.hostname =
opts.hostname ||
(typeof location !== "undefined" ? location.hostname : "localhost")
this.port =
opts.port ||
(typeof location !== "undefined" && location.port
? location.port
: this.secure
? 443
: 80)
this.transports = ["websocket"]
this.readyState = ""
this.writeBuffer = []
this.prevBufferLen = 0
this.opts = Object.assign(
{
path: "/engine.io",
agent: false,
withCredentials: false,
upgrade: true,
jsonp: true,
timestampParam: "t",
rememberUpgrade: false,
rejectUnauthorized: true,
perMessageDeflate: {
threshold: 1024
},
transportOptions: {},
closeOnBeforeunload: true
},
opts
)
this.opts.path = this.opts.path.replace(/\/$/, "") + "/"
if (typeof this.opts.query === "string") {
this.opts.query = parseqs.decode(this.opts.query)
}
// set on handshake
this.id = null
this.upgrades = null
this.pingInterval = null
this.pingTimeout = null
// set on heartbeat
this.pingTimeoutTimer = null
if (typeof addEventListener === "function") {
if (this.opts.closeOnBeforeunload) {
// Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
// ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
// closed/reloaded)
addEventListener(
"beforeunload",
() => {
if (this.transport) {
// silently close the transport
this.transport.removeAllListeners()
this.transport.close()
}
},
false
)
}
if (this.hostname !== "localhost") {
this.offlineEventListener = () => {
this.onClose("transport close")
}
addEventListener("offline", this.offlineEventListener, false)
}
}
this.open()
}
/**
* Creates transport of the given type.
*
* @param {String} transport name
* @return {Transport}
* @api private
*/
createTransport(name, opt?) {
if (name != 'websocket') {
throw new Error('Only Support WebSocket in MiaoScript!')
}
debug('creating transport "%s"', name)
const query: any = clone(this.opts.query)
// append engine.io protocol identifier
query.EIO = parser.protocol
// transport name
query.transport = name
// session id if we already have one
if (this.id) query.sid = this.id
const opts = Object.assign(
{},
this.opts.transportOptions[name],
this.opts,
{
query,
socket: this,
hostname: this.hostname,
secure: this.secure,
port: this.port
}
)
debug("options: %j", JSON.stringify(opts))
debug("new func", transports[name])
return new transports[name](opts)
}
/**
* Initializes transport to use and starts probe.
*
* @api private
*/
open() {
let transport
if (
this.opts.rememberUpgrade &&
Socket.priorWebsocketSuccess &&
this.transports.indexOf("websocket") !== -1
) {
transport = "websocket"
} else if (0 === this.transports.length) {
// Emit error on next tick so it can be listened to
this.setTimeoutFn(() => {
this.emit("error", "No transports available")
}, 0)
return
} else {
transport = this.transports[0]
}
this.readyState = "opening"
// Retry with the next transport if the transport is disabled (jsonp: false)
try {
transport = this.createTransport(transport)
} catch (error: any) {
debug("error while creating transport: %s", error)
this.transports.shift()
this.open()
return
}
transport.open()
this.setTransport(transport)
}
/**
* Sets the current transport. Disables the existing one (if any).
*
* @api private
*/
setTransport(transport) {
debug("setting transport %s", transport.name)
if (this.transport) {
debug("clearing existing transport %s", this.transport.name)
this.transport.removeAllListeners()
}
// set up transport
this.transport = transport
// set up transport listeners
transport
.on("drain", this.onDrain.bind(this))
.on("packet", this.onPacket.bind(this))
.on("error", this.onError.bind(this))
.on("close", () => {
this.onClose("transport close")
})
}
/**
* Probes a transport.
*
* @param {String} transport name
* @api private
*/
probe(name) {
debug('probing transport "%s"', name)
let transport = this.createTransport(name, { probe: 1 })
let failed = false
Socket.priorWebsocketSuccess = false
const onTransportOpen = () => {
if (failed) return
debug('probe transport "%s" opened', name)
transport.send([{ type: "ping", data: "probe" }])
transport.once("packet", msg => {
if (failed) return
if ("pong" === msg.type && "probe" === msg.data) {
debug('probe transport "%s" pong', name)
this.upgrading = true
this.emit("upgrading", transport)
if (!transport) return
Socket.priorWebsocketSuccess = "websocket" === transport.name
debug('pausing current transport "%s"', this.transport.name)
this.transport.pause(() => {
if (failed) return
if ("closed" === this.readyState) return
debug("changing transport and sending upgrade packet")
cleanup()
this.setTransport(transport)
transport.send([{ type: "upgrade" }])
this.emit("upgrade", transport)
transport = null
this.upgrading = false
this.flush()
})
} else {
debug('probe transport "%s" failed', name)
const err: any = new Error("probe error")
err.transport = transport.name
this.emit("upgradeError", err)
}
})
}
function freezeTransport() {
if (failed) return
// Any callback called by transport should be ignored since now
failed = true
cleanup()
transport.close()
transport = null
}
// Handle any error that happens while probing
const onerror = err => {
const error: any = new Error("probe error: " + err)
error.transport = transport.name
freezeTransport()
debug('probe transport "%s" failed because of error: %s', name, err)
this.emit("upgradeError", error)
}
function onTransportClose() {
onerror("transport closed")
}
// When the socket is closed while we're probing
function onclose() {
onerror("socket closed")
}
// When the socket is upgraded while we're probing
function onupgrade(to) {
if (transport && to.name !== transport.name) {
debug('"%s" works - aborting "%s"', to.name, transport.name)
freezeTransport()
}
}
// Remove all listeners on the transport and on self
const cleanup = () => {
transport.removeListener("open", onTransportOpen)
transport.removeListener("error", onerror)
transport.removeListener("close", onTransportClose)
this.removeListener("close", onclose)
this.removeListener("upgrading", onupgrade)
}
transport.once("open", onTransportOpen)
transport.once("error", onerror)
transport.once("close", onTransportClose)
this.once("close", onclose)
this.once("upgrading", onupgrade)
transport.open()
}
/**
* Called when connection is deemed open.
*
* @api public
*/
onOpen() {
debug("socket open")
this.readyState = "open"
Socket.priorWebsocketSuccess = "websocket" === this.transport.name
this.emit("open")
this.flush()
// we check for `readyState` in case an `open`
// listener already closed the socket
if (
"open" === this.readyState &&
this.opts.upgrade &&
this.transport.pause
) {
debug("starting upgrade probes")
let i = 0
const l = this.upgrades.length
for (; i < l; i++) {
this.probe(this.upgrades[i])
}
}
}
/**
* Handles a packet.
*
* @api private
*/
onPacket(packet) {
if (
"opening" === this.readyState ||
"open" === this.readyState ||
"closing" === this.readyState
) {
debug('socket receive: type "%s", data "%s"', packet.type, packet.data)
this.emit("packet", packet)
// Socket is live - any packet counts
this.emit("heartbeat")
switch (packet.type) {
case "open":
this.onHandshake(JSON.parse(packet.data))
break
case "ping":
this.resetPingTimeout()
this.sendPacket("pong")
this.emit("ping")
this.emit("pong")
break
case "error":
const err: any = new Error("server error")
err.code = packet.data
this.onError(err)
break
case "message":
this.emit("data", packet.data)
this.emit("message", packet.data)
break
}
} else {
debug('packet received with socket readyState "%s"', this.readyState)
}
}
/**
* Called upon handshake completion.
*
* @param {Object} handshake obj
* @api private
*/
onHandshake(data) {
this.emit("handshake", data)
this.id = data.sid
this.transport.query.sid = data.sid
this.upgrades = this.filterUpgrades(data.upgrades)
this.pingInterval = data.pingInterval
this.pingTimeout = data.pingTimeout
this.onOpen()
// In case open handler closes socket
if ("closed" === this.readyState) return
this.resetPingTimeout()
}
/**
* Sets and resets ping timeout timer based on server pings.
*
* @api private
*/
resetPingTimeout() {
this.clearTimeoutFn(this.pingTimeoutTimer)
this.pingTimeoutTimer = this.setTimeoutFn(() => {
this.onClose("ping timeout")
}, this.pingInterval + this.pingTimeout)
if (this.opts.autoUnref) {
this.pingTimeoutTimer.unref()
}
}
/**
* Called on `drain` event
*
* @api private
*/
onDrain() {
this.writeBuffer.splice(0, this.prevBufferLen)
// setting prevBufferLen = 0 is very important
// for example, when upgrading, upgrade packet is sent over,
// and a nonzero prevBufferLen could cause problems on `drain`
this.prevBufferLen = 0
if (0 === this.writeBuffer.length) {
this.emit("drain")
} else {
this.flush()
}
}
/**
* Flush write buffers.
*
* @api private
*/
flush() {
if (
"closed" !== this.readyState &&
this.transport.writable &&
!this.upgrading &&
this.writeBuffer.length
) {
debug("flushing %d packets in socket", this.writeBuffer.length)
this.transport.send(this.writeBuffer)
// keep track of current length of writeBuffer
// splice writeBuffer and callbackBuffer on `drain`
this.prevBufferLen = this.writeBuffer.length
this.emit("flush")
}
}
/**
* Sends a message.
*
* @param {String} message.
* @param {Function} callback function.
* @param {Object} options.
* @return {Socket} for chaining.
* @api public
*/
write(msg, options, fn) {
this.sendPacket("message", msg, options, fn)
return this
}
send(msg, options, fn) {
this.sendPacket("message", msg, options, fn)
return this
}
/**
* Sends a packet.
*
* @param {String} packet type.
* @param {String} data.
* @param {Object} options.
* @param {Function} callback function.
* @api private
*/
sendPacket(type, data?, options?, fn?) {
if ("function" === typeof data) {
fn = data
data = undefined
}
if ("function" === typeof options) {
fn = options
options = null
}
if ("closing" === this.readyState || "closed" === this.readyState) {
return
}
options = options || {}
options.compress = false !== options.compress
const packet = {
type: type,
data: data,
options: options
}
this.emit("packetCreate", packet)
this.writeBuffer.push(packet)
if (fn) this.once("flush", fn)
this.flush()
}
/**
* Closes the connection.
*
* @api private
*/
close() {
const close = () => {
this.onClose("forced close")
debug("socket closing - telling transport to close")
this.transport.close()
}
const cleanupAndClose = () => {
this.removeListener("upgrade", cleanupAndClose)
this.removeListener("upgradeError", cleanupAndClose)
close()
}
const waitForUpgrade = () => {
// wait for upgrade to finish since we can't send packets while pausing a transport
this.once("upgrade", cleanupAndClose)
this.once("upgradeError", cleanupAndClose)
}
if ("opening" === this.readyState || "open" === this.readyState) {
this.readyState = "closing"
if (this.writeBuffer.length) {
this.once("drain", () => {
if (this.upgrading) {
waitForUpgrade()
} else {
close()
}
})
} else if (this.upgrading) {
waitForUpgrade()
} else {
close()
}
}
return this
}
/**
* Called upon transport error
*
* @api private
*/
onError(err) {
debug("socket error %j", err)
Socket.priorWebsocketSuccess = false
this.emit("error", err)
this.onClose("transport error", err)
}
/**
* Called upon transport close.
*
* @api private
*/
onClose(reason, desc?) {
if (
"opening" === this.readyState ||
"open" === this.readyState ||
"closing" === this.readyState
) {
debug('socket close with reason: "%s"', reason)
// clear timers
this.clearTimeoutFn(this.pingIntervalTimer)
this.clearTimeoutFn(this.pingTimeoutTimer)
// stop event from firing again for transport
this.transport.removeAllListeners("close")
// ensure transport won't stay open
this.transport.close()
// ignore further transport communication
this.transport.removeAllListeners()
if (typeof removeEventListener === "function") {
removeEventListener("offline", this.offlineEventListener, false)
}
// set ready state
this.readyState = "closed"
// clear session id
this.id = null
// emit close event
this.emit("close", reason, desc)
// clean buffers after, so users can still
// grab the buffers on `close` event
this.writeBuffer = []
this.prevBufferLen = 0
}
}
/**
* Filters upgrades, returning only those matching client transports.
*
* @param {Array} server upgrades
* @api private
*
*/
filterUpgrades(upgrades) {
const filteredUpgrades = []
let i = 0
const j = upgrades.length
for (; i < j; i++) {
if (~this.transports.indexOf(upgrades[i]))
filteredUpgrades.push(upgrades[i])
}
return filteredUpgrades
}
}
Socket.priorWebsocketSuccess = false
/**
* Protocol version.
*
* @api public
*/
Socket.protocol = parser.protocol // this is an int
function clone(obj) {
const o = {}
for (let i in obj) {
if (obj.hasOwnProperty(i)) {
o[i] = obj[i]
}
}
return o
}