Files
ms/packages/websocket/src/engine.io/socket.ts
2021-08-03 16:59:43 +08:00

531 lines
16 KiB
TypeScript

import { EventEmitter } from "events"
import { Server } from "./server"
import { Transport } from "./transport"
import type { Request } from "../server/request"
// const debug = require("debug")("engine:socket")
export class Socket extends EventEmitter {
public id: string
private server: Server
private upgrading = false
private upgraded = false
public readyState = "opening"
private writeBuffer = []
private packetsFn = []
private sentCallbackFn = []
private cleanupFn = []
public request: Request
public protocol: number
public remoteAddress: any
public transport: Transport
private checkIntervalTimer: NodeJS.Timeout
private upgradeTimeoutTimer: NodeJS.Timeout
private pingTimeoutTimer: NodeJS.Timeout
private pingIntervalTimer: NodeJS.Timeout
/**
* Client class (abstract).
*
* @api private
*/
constructor(id: string, server: Server, transport: Transport, req: Request, protocol: number) {
super()
this.id = id
this.server = server
this.request = req
this.protocol = protocol
// Cache IP since it might not be in the req later
if (req.websocket && req.websocket._socket) {
this.remoteAddress = req.websocket._socket.remoteAddress
} else {
this.remoteAddress = req.connection.remoteAddress
}
this.checkIntervalTimer = null
this.upgradeTimeoutTimer = null
this.pingTimeoutTimer = null
this.pingIntervalTimer = null
this.setTransport(transport)
this.onOpen()
}
/**
* Called upon transport considered open.
*
* @api private
*/
onOpen() {
this.readyState = "open"
// sends an `open` packet
this.transport.sid = this.id
this.sendPacket(
"open",
JSON.stringify({
sid: this.id,
upgrades: this.getAvailableUpgrades(),
pingInterval: this.server.opts.pingInterval,
pingTimeout: this.server.opts.pingTimeout
})
)
if (this.server.opts.initialPacket) {
this.sendPacket("message", this.server.opts.initialPacket)
}
this.emit("open")
if (this.protocol === 3) {
// in protocol v3, the client sends a ping, and the server answers with a pong
this.resetPingTimeout(
this.server.opts.pingInterval + this.server.opts.pingTimeout
)
} else {
// in protocol v4, the server sends a ping, and the client answers with a pong
this.schedulePing()
}
}
/**
* Called upon transport packet.
*
* @param {Object} packet
* @api private
*/
onPacket(packet: { type: any; data: any }) {
if ("open" !== this.readyState) {
console.debug("packet received with closed socket")
return
}
// export packet event
// debug(`received packet ${packet.type}`)
this.emit("packet", packet)
// Reset ping timeout on any packet, incoming data is a good sign of
// other side's liveness
this.resetPingTimeout(
this.server.opts.pingInterval + this.server.opts.pingTimeout
)
switch (packet.type) {
case "ping":
if (this.transport.protocol !== 3) {
this.onError("invalid heartbeat direction")
return
}
// debug("got ping")
this.sendPacket("pong")
this.emit("heartbeat")
break
case "pong":
if (this.transport.protocol === 3) {
this.onError("invalid heartbeat direction")
return
}
// debug("got pong")
this.schedulePing()
this.emit("heartbeat")
break
case "error":
this.onClose("parse error")
break
case "message":
this.emit("data", packet.data)
this.emit("message", packet.data)
break
}
}
/**
* Called upon transport error.
*
* @param {Error} error object
* @api private
*/
onError(err: string) {
// debug("transport error")
this.onClose("transport error", err)
}
/**
* Pings client every `this.pingInterval` and expects response
* within `this.pingTimeout` or closes connection.
*
* @api private
*/
schedulePing() {
clearTimeout(this.pingIntervalTimer)
this.pingIntervalTimer = setTimeout(() => {
// debug(
// "writing ping packet - expecting pong within %sms",
// this.server.opts.pingTimeout
// )
this.sendPacket("ping")
this.resetPingTimeout(this.server.opts.pingTimeout)
}, this.server.opts.pingInterval)
}
/**
* Resets ping timeout.
*
* @api private
*/
resetPingTimeout(timeout: number) {
clearTimeout(this.pingTimeoutTimer)
this.pingTimeoutTimer = setTimeout(() => {
if (this.readyState === "closed") return
this.onClose("ping timeout")
}, timeout)
}
/**
* Attaches handlers for the given transport.
*
* @param {Transport} transport
* @api private
*/
setTransport(transport: Transport) {
console.debug(`engine.io socket ${this.id} set transport ${transport.name}`)
const onError = this.onError.bind(this)
const onPacket = this.onPacket.bind(this)
const flush = this.flush.bind(this)
const onClose = this.onClose.bind(this, "transport close")
this.transport = transport
this.transport.once("error", onError)
this.transport.on("packet", onPacket)
this.transport.on("drain", flush)
this.transport.once("close", onClose)
// this function will manage packet events (also message callbacks)
this.setupSendCallback()
this.cleanupFn.push(function () {
transport.removeListener("error", onError)
transport.removeListener("packet", onPacket)
transport.removeListener("drain", flush)
transport.removeListener("close", onClose)
})
}
/**
* Upgrades socket to the given transport
*
* @param {Transport} transport
* @api private
*/
maybeUpgrade(transport: Transport) {
console.debug(
'might upgrade socket transport from "', this.transport.name, '" to "', transport.name, '"'
)
this.upgrading = true
// set transport upgrade timer
this.upgradeTimeoutTimer = setTimeout(() => {
console.debug("client did not complete upgrade - closing transport")
cleanup()
if ("open" === transport.readyState) {
transport.close()
}
}, this.server.opts.upgradeTimeout)
const onPacket = (packet: { type: string; data: string }) => {
if ("ping" === packet.type && "probe" === packet.data) {
transport.send([{ type: "pong", data: "probe" }])
this.emit("upgrading", transport)
clearInterval(this.checkIntervalTimer)
this.checkIntervalTimer = setInterval(check, 100)
} else if ("upgrade" === packet.type && this.readyState !== "closed") {
// debug("got upgrade packet - upgrading")
cleanup()
this.transport.discard()
this.upgraded = true
this.clearTransport()
this.setTransport(transport)
this.emit("upgrade", transport)
this.flush()
if (this.readyState === "closing") {
transport.close(() => {
this.onClose("forced close")
})
}
} else {
cleanup()
transport.close()
}
}
// we force a polling cycle to ensure a fast upgrade
const check = () => {
if ("polling" === this.transport.name && this.transport.writable) {
// debug("writing a noop packet to polling for fast upgrade")
this.transport.send([{ type: "noop" }])
}
}
const cleanup = () => {
this.upgrading = false
clearInterval(this.checkIntervalTimer)
this.checkIntervalTimer = null
clearTimeout(this.upgradeTimeoutTimer)
this.upgradeTimeoutTimer = null
transport.removeListener("packet", onPacket)
transport.removeListener("close", onTransportClose)
transport.removeListener("error", onError)
this.removeListener("close", onClose)
}
const onError = (err: string) => {
// debug("client did not complete upgrade - %s", err)
cleanup()
transport.close()
transport = null
}
const onTransportClose = () => {
onError("transport closed")
}
const onClose = () => {
onError("socket closed")
}
transport.on("packet", onPacket)
transport.once("close", onTransportClose)
transport.once("error", onError)
this.once("close", onClose)
}
/**
* Clears listeners and timers associated with current transport.
*
* @api private
*/
clearTransport() {
let cleanup: () => void
const toCleanUp = this.cleanupFn.length
for (let i = 0; i < toCleanUp; i++) {
cleanup = this.cleanupFn.shift()
cleanup()
}
// silence further transport errors and prevent uncaught exceptions
this.transport.on("error", function () {
// debug("error triggered by discarded transport")
})
// ensure transport won't stay open
this.transport.close()
clearTimeout(this.pingTimeoutTimer)
}
/**
* Called upon transport considered closed.
* Possible reasons: `ping timeout`, `client error`, `parse error`,
* `transport error`, `server close`, `transport close`
*/
onClose(reason: string, description?: string) {
if ("closed" !== this.readyState) {
this.readyState = "closed"
// clear timers
clearTimeout(this.pingIntervalTimer)
clearTimeout(this.pingTimeoutTimer)
clearInterval(this.checkIntervalTimer)
this.checkIntervalTimer = null
clearTimeout(this.upgradeTimeoutTimer)
// clean writeBuffer in next tick, so developers can still
// grab the writeBuffer on 'close' event
process.nextTick(() => {
this.writeBuffer = []
})
this.packetsFn = []
this.sentCallbackFn = []
this.clearTransport()
this.emit("close", reason, description)
}
}
/**
* Setup and manage send callback
*
* @api private
*/
setupSendCallback() {
// the message was sent successfully, execute the callback
const onDrain = () => {
if (this.sentCallbackFn.length > 0) {
const seqFn = this.sentCallbackFn.splice(0, 1)[0]
if ("function" === typeof seqFn) {
// debug("executing send callback")
seqFn(this.transport)
} else if (Array.isArray(seqFn)) {
// debug("executing batch send callback")
const l = seqFn.length
let i = 0
for (; i < l; i++) {
if ("function" === typeof seqFn[i]) {
seqFn[i](this.transport)
}
}
}
}
}
this.transport.on("drain", onDrain)
this.cleanupFn.push(() => {
this.transport.removeListener("drain", onDrain)
})
}
/**
* Sends a message packet.
*
* @param {String} message
* @param {Object} options
* @param {Function} callback
* @return {Socket} for chaining
* @api public
*/
send(data: any, options: any, callback: any) {
this.sendPacket("message", data, options, callback)
return this
}
write(data: any, options: any, callback?: any) {
this.sendPacket("message", data, options, callback)
return this
}
/**
* Sends a packet.
*
* @param {String} packet type
* @param {String} optional, data
* @param {Object} options
* @api private
*/
sendPacket(type: string, data?: string, options?: { compress?: any }, callback?: undefined) {
if ("function" === typeof options) {
callback = options
options = null
}
options = options || {}
options.compress = false !== options.compress
if ("closing" !== this.readyState && "closed" !== this.readyState) {
// console.debug('sending packet "%s" (%s)', type, data)
const packet: any = {
type: type,
options: options
}
if (data) packet.data = data
// exports packetCreate event
this.emit("packetCreate", packet)
this.writeBuffer.push(packet)
// add send callback to object, if defined
if (callback) this.packetsFn.push(callback)
this.flush()
}
}
/**
* Attempts to flush the packets buffer.
*
* @api private
*/
flush() {
if (
"closed" !== this.readyState &&
this.transport.writable &&
this.writeBuffer.length
) {
console.trace("flushing buffer to transport")
this.emit("flush", this.writeBuffer)
this.server.emit("flush", this, this.writeBuffer)
const wbuf = this.writeBuffer
this.writeBuffer = []
if (!this.transport.supportsFraming) {
this.sentCallbackFn.push(this.packetsFn)
} else {
this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn)
}
this.packetsFn = []
this.transport.send(wbuf)
this.emit("drain")
this.server.emit("drain", this)
}
}
/**
* Get available upgrades for this socket.
*
* @api private
*/
getAvailableUpgrades() {
const availableUpgrades = []
const allUpgrades = this.server.upgrades(this.transport.name)
let i = 0
const l = allUpgrades.length
for (; i < l; ++i) {
const upg = allUpgrades[i]
if (this.server.opts.transports.indexOf(upg) !== -1) {
availableUpgrades.push(upg)
}
}
return availableUpgrades
}
/**
* Closes the socket and underlying transport.
*
* @param {Boolean} optional, discard
* @return {Socket} for chaining
* @api public
*/
close(discard?: any) {
if ("open" !== this.readyState) return
this.readyState = "closing"
if (this.writeBuffer.length) {
this.once("drain", this.closeTransport.bind(this, discard))
return
}
this.closeTransport(discard)
}
/**
* Closes the underlying transport.
*
* @param {Boolean} discard
* @api private
*/
closeTransport(discard: any) {
if (discard) this.transport.discard()
this.transport.close(this.onClose.bind(this, "forced close"))
}
}