perf: optimize websocket logic

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-11-20 14:21:57 +08:00
parent f31726e98b
commit af1e64767a
12 changed files with 273 additions and 228 deletions

View File

@ -1,5 +0,0 @@
export interface InnerClient {
id: string
send(text: string)
close()
}

View File

@ -12,8 +12,12 @@ export abstract class WebSocketHandlerAdapter {
this._Handler = new ChannelInboundHandlerAdapterImpl() this._Handler = new ChannelInboundHandlerAdapterImpl()
} }
abstract channelRead(ctx: any, channel: any) abstract channelRead(ctx: any, channel: any)
abstract channelInactive(ctx: any) channelInactive(ctx: any) {
abstract channelUnregistered(ctx: any) ctx.fireChannelInactive()
}
channelUnregistered(ctx: any) {
ctx.fireChannelUnregistered()
}
abstract exceptionCaught(ctx: any, cause: Error) abstract exceptionCaught(ctx: any, cause: Error)
getHandler() { getHandler() {
return this._Handler return this._Handler

View File

@ -1,45 +1,24 @@
import { EventEmitter } from 'events' import { Transport } from '../transport'
import { InnerClient } from '../interfaces'
import { AttributeKeys } from './constants' import { AttributeKeys } from './constants'
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
export class NettyClient extends EventEmitter implements InnerClient { export class NettyClient extends Transport {
private _id: string
private channel: any private channel: any
server: any
readyState: string
remoteAddress: string
upgraded: boolean
request: any
constructor(server: any, channel: any) { constructor(server: any, channel: any) {
super() super(server)
this.server = server
this.readyState = 'open'
this.remoteAddress = channel.remoteAddress() + '' this.remoteAddress = channel.remoteAddress() + ''
this.upgraded = true
this.request = channel.attr(AttributeKeys.Request).get() this.request = channel.attr(AttributeKeys.Request).get()
this.channel = channel
this._id = channel.id() + '' this._id = channel.id() + ''
this.channel = channel
} }
get id() { doSend(text: string) {
return this._id this.channel.writeAndFlush(new TextWebSocketFrame(text))
} }
send(text: string) { doClose() {
if (this.readyState == 'open') { this.channel.close()
this.channel.writeAndFlush(new TextWebSocketFrame(text))
} else {
console.debug(`send message ${text} to close client ${this._id}`)
}
}
close() {
if (this.readyState = 'open') {
this.channel.close()
this.readyState = 'close'
}
} }
} }

View File

@ -11,16 +11,6 @@ export class WebSocketDetect extends WebSocketHandlerAdapter {
channelRead(ctx: any, channel: any) { channelRead(ctx: any, channel: any) {
this.event.emit(ServerEvent.detect, ctx, channel) this.event.emit(ServerEvent.detect, ctx, channel)
} }
channelInactive(ctx: any) {
console.debug('WebSocketDetect channelUnregistered ' + ctx)
this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
ctx.channelInactive()
}
channelUnregistered(ctx: any) {
console.debug('WebSocketDetect channelUnregistered ' + ctx)
this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
ctx.fireChannelUnregistered()
}
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {
this.event.emit(ServerEvent.error, ctx, cause) this.event.emit(ServerEvent.error, ctx, cause)
} }

View File

@ -43,14 +43,14 @@ export class WebSocketHandler extends WebSocketHandlerAdapter {
channelInactive(ctx: any) { channelInactive(ctx: any) {
console.debug('WebSocketHandler channelInactive ' + ctx) console.debug('WebSocketHandler channelInactive ' + ctx)
this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelInactive')
ctx.channelInactive() super.channelInactive(ctx)
} }
channelUnregistered(ctx: any) { channelUnregistered(ctx: any) {
console.debug('WebSocketHandler channelUnregistered ' + ctx) console.debug('WebSocketHandler channelUnregistered ' + ctx)
this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect') this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelUnregistered')
ctx.fireChannelUnregistered() super.channelUnregistered(ctx)
} }
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {

View File

@ -5,11 +5,12 @@ import { Namespace, Server, Socket } from './index'
import { PacketTypes, SubPacketTypes } from './types' import { PacketTypes, SubPacketTypes } from './types'
import { ServerEvent } from './constants' import { ServerEvent } from './constants'
import { SocketId } from './adapter' import { SocketId } from './adapter'
import { Transport } from '../transport'
const parser = new Parser() const parser = new Parser()
export class Client extends EventEmitter implements Client { export class Client extends EventEmitter {
public readonly conn public readonly conn: Transport
/** /**
* @private * @private
*/ */
@ -21,6 +22,11 @@ export class Client extends EventEmitter implements Client {
private nsps: Map<string, Socket> private nsps: Map<string, Socket>
private connectTimeout: NodeJS.Timeout private connectTimeout: NodeJS.Timeout
private checkIntervalTimer: NodeJS.Timeout
private upgradeTimeoutTimer: NodeJS.Timeout
private pingTimeoutTimer: NodeJS.Timeout
private pingIntervalTimer: NodeJS.Timeout
constructor(server: Server, conn) { constructor(server: Server, conn) {
super() super()
this.server = server this.server = server
@ -32,6 +38,9 @@ export class Client extends EventEmitter implements Client {
// ============================= // =============================
this.sockets = new Map() this.sockets = new Map()
this.nsps = new Map() this.nsps = new Map()
// ================== engine.io
this.onOpen()
// ================== Transport
this.conn.on(ServerEvent.disconnect, (reason) => { this.conn.on(ServerEvent.disconnect, (reason) => {
this.onclose(reason) this.onclose(reason)
}) })
@ -73,7 +82,7 @@ export class Client extends EventEmitter implements Client {
* @private * @private
*/ */
private connect(name: string, auth: object = {}) { private connect(name: string, auth: object = {}) {
console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps[name]}`) console.debug(`client ${this.id} connecting to namespace ${name} has: ${this.server._nsps.has(name)}`)
if (this.server._nsps.has(name)) { if (this.server._nsps.has(name)) {
return this.doConnect(name, auth) return this.doConnect(name, auth)
} }
@ -103,9 +112,7 @@ export class Client extends EventEmitter implements Client {
const nsp = this.server.of(name) const nsp = this.server.of(name)
const socket = nsp._add(this, auth, () => { const socket = nsp._add(this, auth, () => {
console.debug(`doConnect set sockets ${socket.id}`)
this.sockets.set(socket.id, socket) this.sockets.set(socket.id, socket)
console.debug(`doConnect set nsps ${nsp.name}`)
this.nsps.set(nsp.name, socket) this.nsps.set(nsp.name, socket)
}) })
} }
@ -128,12 +135,16 @@ export class Client extends EventEmitter implements Client {
*/ */
_remove(socket: Socket) { _remove(socket: Socket) {
if (this.sockets.has(socket.id)) { if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id).nsp.name
this.sockets.delete(socket.id) this.sockets.delete(socket.id)
this.nsps.delete(nsp) this.nsps.delete(socket.nsp.name)
} else { } else {
console.debug(`ignoring remove for ${socket.id}`,) console.debug(`ignoring remove for ${socket.id}`,)
} }
process.nextTick(() => {
if (this.sockets.size == 0) {
this.onclose('no live socket')
}
})
} }
/** /**
* Closes the underlying connection. * Closes the underlying connection.
@ -141,6 +152,7 @@ export class Client extends EventEmitter implements Client {
* @private * @private
*/ */
private close() { private close() {
console.debug(`client ${this.id} close`)
if ("open" == this.conn.readyState) { if ("open" == this.conn.readyState) {
console.debug("forcing transport close") console.debug("forcing transport close")
this.onclose("forced server close") this.onclose("forced server close")
@ -154,7 +166,7 @@ export class Client extends EventEmitter implements Client {
* @param {Object} opts * @param {Object} opts
* @private * @private
*/ */
_packet(packet, opts = { preEncoded: false }) { _packet(packet: Packet, opts = { preEncoded: false }) {
// opts = opts || {} // opts = opts || {}
// const self = this // const self = this
@ -178,7 +190,11 @@ export class Client extends EventEmitter implements Client {
// } else { // } else {
// debug("ignoring packet write %j", packet) // debug("ignoring packet write %j", packet)
// } // }
this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet)) if ("open" == this.conn.readyState) {
this.conn.send(opts.preEncoded ? packet as unknown as string : parser.encode(packet))
} else {
console.debug(`ignoring write packet ${JSON.stringify(packet)} to client ${this.id} is already close!`)
}
} }
/** /**
* Called with incoming transport data. * Called with incoming transport data.
@ -202,15 +218,14 @@ export class Client extends EventEmitter implements Client {
if (SubPacketTypes.CONNECT == packet.sub_type) { if (SubPacketTypes.CONNECT == packet.sub_type) {
this.connect(packet.nsp, packet.data) this.connect(packet.nsp, packet.data)
} else { } else {
const socket = this.nsps.get(packet.nsp) process.nextTick(() => {
if (socket) { const socket = this.nsps.get(packet.nsp)
process.nextTick(function () { if (socket) {
socket._onpacket(packet) socket._onpacket(packet)
}) } else {
} else { console.debug(`client ${this.id} no socket for namespace ${packet.nsp}.`)
console.debug(`client ${this.id} no socket for namespace ${packet.nsp} avalibe socket: `) }
this.nsps.forEach((v, k) => console.debug(`- ${k} => ${v}`)) })
}
} }
} }
/** /**
@ -226,9 +241,16 @@ export class Client extends EventEmitter implements Client {
this.conn.close() this.conn.close()
} }
onclose(reason?: string) { onclose(reason?: string) {
this.conn.readyState = "closing"
// ======= engine.io
this.onClose(reason)
// cleanup connectTimeout
if (this.connectTimeout) {
clearTimeout(this.connectTimeout)
this.connectTimeout = null
}
console.debug(`client ${this.id} close with reason ${reason}`) console.debug(`client ${this.id} close with reason ${reason}`)
// ignore a potential subsequent `close` event // ignore a potential subsequent `close` event
this.destroy()
// `nsps` and `sockets` are cleaned up seamlessly // `nsps` and `sockets` are cleaned up seamlessly
for (const socket of this.sockets.values()) { for (const socket of this.sockets.values()) {
socket._onclose(reason) socket._onclose(reason)
@ -242,5 +264,97 @@ export class Client extends EventEmitter implements Client {
// this.conn.removeListener('error', this.onerror); // this.conn.removeListener('error', this.onerror);
// this.conn.removeListener('close', this.onclose); // this.conn.removeListener('close', this.onclose);
// this.decoder.removeListener('decoded', this.ondecoded); // this.decoder.removeListener('decoded', this.ondecoded);
}; }
//================== engine.io
onOpen() {
this.conn.readyState = "open"
this._packet({
type: PacketTypes.OPEN,
data: {
sid: this.id,
upgrades: [],
pingInterval: this.server.options.pingInterval,
pingTimeout: this.server.options.pingTimeout
}
})
this.schedulePing()
}
onPacket(packet: Packet) {
if ("open" === this.conn.readyState) {
// export packet event
// debug("packet")
// this.emit("packet", packet)
// Reset ping timeout on any packet, incoming data is a good sign of
// other side's liveness
this.resetPingTimeout(this.server.options.pingInterval + this.server.options.pingTimeout * 2)
switch (packet.type) {
case PacketTypes.PING:
this._packet({
type: PacketTypes.PONG,
data: packet.data
})
break
case PacketTypes.PONG:
this.schedulePing()
break
case PacketTypes.UPGRADE:
break
case PacketTypes.MESSAGE:
this.ondecoded(packet)
break
case PacketTypes.CLOSE:
this.onclose()
break
default:
console.log(`client ${this.id} reciver unknow packet type: ${packet.type}`)
}
} else {
console.debug(`packet received with closed client ${this.id}`)
}
}
/**
* Called upon transport considered closed.
* Possible reasons: `ping timeout`, `client error`, `parse error`,
* `transport error`, `server close`, `transport close`
*/
onClose(reason, description?: string) {
// if ("closed" !== this.conn.readyState) {
clearTimeout(this.pingIntervalTimer)
clearTimeout(this.pingTimeoutTimer)
clearInterval(this.checkIntervalTimer)
this.checkIntervalTimer = null
clearTimeout(this.upgradeTimeoutTimer)
// this.emit("close", reason, description)
// }
}
/**
* Pings client every `this.pingInterval` and expects response
* within `this.pingTimeout` or closes connection.
*
* @api private
*/
schedulePing() {
clearTimeout(this.pingIntervalTimer)
this.pingIntervalTimer = setTimeout(() => {
this.resetPingTimeout(this.server.options.pingTimeout)
process.nextTick(() => this._packet({ type: PacketTypes.PING }))
}, this.server.options.pingInterval)
}
/**
* Resets ping timeout.
*
* @api private
*/
resetPingTimeout(timeout) {
clearTimeout(this.pingTimeoutTimer)
this.pingTimeoutTimer = setTimeout(() => {
if (this.conn.readyState === "closed") return
this.onclose("ping timeout")
}, timeout)
}
} }

View File

@ -4,5 +4,6 @@ export enum ServerEvent {
connection = 'connection', connection = 'connection',
message = 'message', message = 'message',
error = 'error', error = 'error',
disconnect = 'disconnect' disconnecting = 'disconnecting',
disconnect = 'disconnect',
} }

View File

@ -8,7 +8,7 @@ import { PacketTypes, SubPacketTypes } from './types'
import { Packet } from './packet' import { Packet } from './packet'
import { Socket } from './socket' import { Socket } from './socket'
import { Adapter } from './adapter' import { Adapter } from './adapter'
import { InnerClient } from '../interfaces' import { Transport } from '../transport'
import { ParentNamespace } from './parent-namespace' import { ParentNamespace } from './parent-namespace'
interface EngineOptions { interface EngineOptions {
@ -144,7 +144,7 @@ class Server {
/** /**
* @private * @private
*/ */
readonly _parser: Parser _parser: Parser
private readonly encoder private readonly encoder
/** /**
@ -172,29 +172,33 @@ class Server {
*/ */
_connectTimeout: number _connectTimeout: number
options: ServerOptions options: Partial<ServerOptions>
private websocketServer: WebSocketServer private websocketServer: WebSocketServer
private allClients: Map<string, Client> private allClients: Map<string, Client>
constructor(instance: any, options: Partial<ServerOptions>) { constructor(instance: any, options: Partial<ServerOptions>) {
if (!instance) { throw new Error('instance can\'t be undefiend!') } if (!instance) { throw new Error('instance can\'t be undefiend!') }
this.allClients = new Map() this.options = Object.assign({
this._nsps = new Map() event: new EventEmitter(),
this.connectTimeout(options.connectTimeout || 45000) path: '/socket.io',
this._parser = options.parser || new Parser() root: root + '/wwwroot',
this.adapter(options.adapter || Adapter) serveClient: false,
connectTimeout: 45000,
wsEngine: process.env.EIO_WS_ENGINE || "ws",
pingTimeout: 5000,
pingInterval: 25000,
upgradeTimeout: 10000,
maxHttpBufferSize: 1e6,
transports: 'websocket',
allowUpgrades: true,
httpCompression: {
threshold: 1024
},
cors: false
}, options)
this.initServerConfig()
this.sockets = this.of('/') this.sockets = this.of('/')
if (instance.class.name.startsWith('io.netty.channel')) { this.selectServerImpl(instance)
let { NettyWebSocketServer } = require("../netty")
this.websocketServer = new NettyWebSocketServer(instance, Object.assign({
event: new EventEmitter(),
path: '/socket.io',
root: root + '/wwwroot'
}, options))
} else {
let { TomcatWebSocketServer } = require("../tomcat")
this.websocketServer = new TomcatWebSocketServer(instance, options)
}
this.initServer() this.initServer()
} }
/** /**
@ -496,15 +500,6 @@ class Server {
console.debug(`incoming connection with id ${conn.id}`) console.debug(`incoming connection with id ${conn.id}`)
let client = new Client(this, conn) let client = new Client(this, conn)
this.allClients.set(conn.id, client) this.allClients.set(conn.id, client)
client._packet({
type: PacketTypes.OPEN,
data: {
sid: client.id,
upgrades: [],
pingInterval: 25000,
pingTimeout: 5000
}
})
return this return this
} }
// of(nsp: string): Namespace { // of(nsp: string): Namespace {
@ -560,8 +555,9 @@ class Server {
return nsp return nsp
} }
close(fn?: () => void): void { close(fn?: () => void): void {
for (const socket of this.sockets.sockets.values()) { this.clients.length
socket._onclose("server shutting down") for (const client of this.allClients.values()) {
client._disconnect()
} }
// this.engine.close() // this.engine.close()
@ -605,70 +601,57 @@ class Server {
return this.sockets.compress(args[0]) return this.sockets.compress(args[0])
} }
// =============================== // ===============================
private initServerConfig() {
this.allClients = new Map()
this._nsps = new Map()
this.connectTimeout(this.options.connectTimeout || 45000)
this._parser = this.options.parser || new Parser()
this.adapter(this.options.adapter || Adapter)
}
private selectServerImpl(instance: any) {
let WebSocketServerImpl = undefined
if (instance.class.name.startsWith('io.netty.channel')) {
WebSocketServerImpl = require("../netty").NettyWebSocketServer
} else {
WebSocketServerImpl = require("../tomcat").TomcatWebSocketServer
}
this.websocketServer = new WebSocketServerImpl(instance, this.options)
}
private initServer() { private initServer() {
this.websocketServer.on(ServerEvent.connect, (innerClient: InnerClient) => { this.websocketServer.on(ServerEvent.connect, (transport: Transport) => {
this.onconnection(innerClient) this.onconnection(transport)
}) })
this.websocketServer.on(ServerEvent.message, (innerClient: InnerClient, text) => { this.websocketServer.on(ServerEvent.message, (transport: Transport, text) => {
if (this.allClients.has(innerClient.id)) { if (this.allClients.has(transport.id)) {
this.processPacket(this._parser.decode(text), this.allClients.get(innerClient.id)) let client = this.allClients.get(transport.id)
client.onPacket(this._parser.decode(text))
} else { } else {
console.error(`unknow engine socket ${innerClient.id} reciver message ${text}`) console.error(`unknow transport ${transport.id} reciver message ${text}`)
} }
}) })
this.websocketServer.on(ServerEvent.disconnect, (innerClient: InnerClient, reason) => { this.websocketServer.on(ServerEvent.disconnect, (transport: Transport, reason) => {
if (this.allClients.has(innerClient.id)) { if (this.allClients.has(transport.id)) {
this.allClients.get(innerClient.id).onclose(reason) this.allClients.get(transport.id).onclose(reason)
this.allClients.delete(innerClient.id) this.allClients.delete(transport.id)
} else { } else {
console.error(`unknow engine innerClient ${innerClient?.id} disconnect cause ${reason}`) console.error(`unknow transport ${transport?.id} disconnect cause ${reason}`)
} }
}) })
this.websocketServer.on(ServerEvent.error, (innerClient: InnerClient, cause) => { this.websocketServer.on(ServerEvent.error, (transport: Transport, cause) => {
if (this.allClients.has(innerClient?.id)) { if (this.allClients.has(transport?.id)) {
let client = this.allClients.get(innerClient?.id) let client = this.allClients.get(transport?.id)
if (client.listeners(ServerEvent.error).length) { if (client.listeners(ServerEvent.error).length) {
client.emit(ServerEvent.error, cause) client.emit(ServerEvent.error, cause)
} else { } else {
console.error(`engine innerClient ${innerClient.id} cause error: ${cause}`) console.error(`client ${client.id} cause error: ${cause}`)
console.ex(cause) console.ex(cause)
} }
} else { } else {
console.error(`unknow innerClient ${innerClient?.id} cause error: ${cause}`) console.error(`unknow transport ${transport?.id} cause error: ${cause}`)
console.ex(cause) console.ex(cause)
} }
}) })
} }
private processPacket(packet: Packet, client: Client) {
switch (packet.type) {
case PacketTypes.PING:
client._packet({
type: PacketTypes.PONG,
data: packet.data
})
break
case PacketTypes.UPGRADE:
break
case PacketTypes.MESSAGE:
this.processSubPacket(packet, client)
break
case PacketTypes.CLOSE:
client.onclose()
break
}
}
private processSubPacket(packet: Packet, client: Client) {
switch (packet.sub_type) {
case SubPacketTypes.CONNECT:
client.doConnect(packet.nsp, {})
break
default:
client.ondecoded(packet)
break
}
}
} }
/** /**

View File

@ -58,7 +58,6 @@ export class Namespace extends EventEmitter {
public use( public use(
fn: (socket: Socket, next: (err?: ExtendedError) => void) => void fn: (socket: Socket, next: (err?: ExtendedError) => void) => void
): Namespace { ): Namespace {
throw new Error("Method not implemented.")
this._fns.push(fn) this._fns.push(fn)
return this return this
} }
@ -96,8 +95,8 @@ export class Namespace extends EventEmitter {
return this.to(name) return this.to(name)
} }
_add(client: Client, query?: any, fn?: () => void) { _add(client: Client, query?: any, fn?: () => void) {
console.debug(`adding socket to nsp ${this.name}`)
const socket = new Socket(this, client, query || {}) const socket = new Socket(this, client, query || {})
console.debug(`client ${client.id} adding socket ${socket.id} to nsp ${this.name}`)
this.run(socket, err => { this.run(socket, err => {
process.nextTick(() => { process.nextTick(() => {
if ("open" == client.conn.readyState) { if ("open" == client.conn.readyState) {
@ -137,7 +136,7 @@ export class Namespace extends EventEmitter {
console.debug(`namespace ${this.name} remove socket ${socket.id}`) console.debug(`namespace ${this.name} remove socket ${socket.id}`)
this.sockets.delete(socket.id) this.sockets.delete(socket.id)
} else { } else {
console.debug(`ignoring remove for ${socket.id}`) console.debug(`namespace ${this.name} ignoring remove for ${socket.id}`)
} }
} }
emit(event: string, ...args: any[]): boolean { emit(event: string, ...args: any[]): boolean {
@ -185,9 +184,7 @@ export class Namespace extends EventEmitter {
*/ */
public allSockets(): Promise<Set<SocketId>> { public allSockets(): Promise<Set<SocketId>> {
if (!this.adapter) { if (!this.adapter) {
throw new Error( throw new Error("No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?")
"No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?"
)
} }
const rooms = new Set(this._rooms) const rooms = new Set(this._rooms)
this._rooms.clear() this._rooms.clear()
@ -233,24 +230,9 @@ export class Namespace extends EventEmitter {
hasBin(args: any[]) { hasBin(args: any[]) {
return false return false
} }
del(client: Client) {
let socket = this.sockets[client.id]
socket.disconnect()
delete this.sockets[client.id]
}
clients(fn: (sockets: Socket[]) => Namespace): Namespace { clients(fn: (sockets: Socket[]) => Namespace): Namespace {
return fn(Object.values(this.sockets)) return fn(Object.values(this.sockets))
} }
process(packet: Packet, client: Client) {
switch (packet.sub_type) {
case SubPacketTypes.CONNECT:
client.doConnect(this.name, {})
break
default:
this.sockets.get(client.id)._onpacket(packet)
break
}
}
close() { close() {
this.removeAllListeners(ServerEvent.connect) this.removeAllListeners(ServerEvent.connect)
this.removeAllListeners(ServerEvent.connection) this.removeAllListeners(ServerEvent.connection)

View File

@ -88,15 +88,6 @@ export class Socket extends EventEmitter {
private _rooms: Set<Room> = new Set(); private _rooms: Set<Room> = new Set();
private _anyListeners: Array<(...args: any[]) => void> private _anyListeners: Array<(...args: any[]) => void>
private events = [
'connect',
"connect_error",
'disconnect',
'disconnecting',
'newListener',
'removeListener'
]
constructor(nsp: Namespace, client: Client, auth = {}) { constructor(nsp: Namespace, client: Client, auth = {}) {
super() super()
this.nsp = nsp this.nsp = nsp
@ -114,12 +105,6 @@ export class Socket extends EventEmitter {
this._rooms = new Set() this._rooms = new Set()
} }
emit(event: string, ...args: any[]): boolean { emit(event: string, ...args: any[]): boolean {
if (~this.events.indexOf(event)) {
super.emit(event, ...args)
// @ts-ignore
return this
}
let packet: Packet = { let packet: Packet = {
type: PacketTypes.MESSAGE, type: PacketTypes.MESSAGE,
sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT, sub_type: (this.flags.binary !== undefined ? this.flags.binary : this.hasBin(args)) ? SubPacketTypes.BINARY_EVENT : SubPacketTypes.EVENT,
@ -213,9 +198,9 @@ export class Socket extends EventEmitter {
* @private * @private
*/ */
_onconnect(): void { _onconnect(): void {
console.debug("socket connected - writing packet") console.debug(`socket ${this.id} connected - writing packet`)
this.join(this.id) this.join(this.id)
this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT }) this.packet({ type: PacketTypes.MESSAGE, sub_type: SubPacketTypes.CONNECT, data: { sid: this.id } })
} }
_onpacket(packet: Packet) { _onpacket(packet: Packet) {
switch (packet.sub_type) { switch (packet.sub_type) {
@ -246,7 +231,7 @@ export class Socket extends EventEmitter {
} }
onevent(packet: Packet) { onevent(packet: Packet) {
if (null != packet.id) { if (null != packet.id) {
console.debug('attaching ack callback to event') console.trace(`attaching ack ${packet.id} callback to client ${this.id} event`)
this.dispatch(packet, this.ack(packet.id)) this.dispatch(packet, this.ack(packet.id))
} else { } else {
this.dispatch(packet) this.dispatch(packet)
@ -266,13 +251,13 @@ export class Socket extends EventEmitter {
} }
} }
onack(packet: Packet) { onack(packet: Packet) {
let ack = this.acks[packet.id] let ack = this.acks.get(packet.id)
if ('function' == typeof ack) { if ('function' == typeof ack) {
console.debug('calling ack %s with %j', packet.id, packet.data) console.trace(`calling ack ${packet.id} on socket ${this.id} with ${packet.data}`)
ack.apply(this, packet.data) ack.apply(this, packet.data)
delete this.acks[packet.id] this.acks.delete(packet.id)
} else { } else {
console.debug('bad ack %s', packet.id) console.trace(`bad ack ${packet.id} on socket ${this.id}`)
} }
} }
/** /**
@ -281,7 +266,7 @@ export class Socket extends EventEmitter {
* @private * @private
*/ */
private ondisconnect(): void { private ondisconnect(): void {
console.debug("got disconnect packet") console.debug(`socket ${this.id} got disconnect packet`)
this._onclose("client namespace disconnect") this._onclose("client namespace disconnect")
} }
@ -294,7 +279,7 @@ export class Socket extends EventEmitter {
if (this.listeners("error").length) { if (this.listeners("error").length) {
super.emit("error", err) super.emit("error", err)
} else { } else {
console.error("Missing error handler on `socket`.") console.error(`Missing error handler on 'socket(${this.id})'.`)
console.error(err.stack) console.error(err.stack)
} }
} }
@ -308,15 +293,15 @@ export class Socket extends EventEmitter {
* @private * @private
*/ */
_onclose(reason: string) { _onclose(reason: string) {
console.debug(`closing socket - reason: ${reason} connected: ${this.connected}`)
if (!this.connected) return this if (!this.connected) return this
this.emit('disconnecting', reason) console.debug(`closing socket ${this.id} - reason: ${reason} connected: ${this.connected}`)
super.emit(ServerEvent.disconnecting, reason)
this.leaveAll() this.leaveAll()
this.nsp._remove(this) this.nsp._remove(this)
this.client._remove(this) this.client._remove(this)
this.connected = false this.connected = false
this.disconnected = true this.disconnected = true
this.emit('disconnect', reason) super.emit(ServerEvent.disconnect, reason)
} }
/** /**
@ -496,8 +481,8 @@ export class Socket extends EventEmitter {
this._onerror(error) this._onerror(error)
} }
} }
dispatch(packet: Packet, ack?: Function) { dispatch(packet: Packet, ack?: () => void) {
if (ack) { this.acks[packet.id] = ack } if (ack) { this.acks.set(packet.id, ack) }
super.emit(packet.name, ...packet.data, ack) super.emit(packet.name, ...packet.data, ack)
} }
private hasBin(obj: any) { private hasBin(obj: any) {

View File

@ -1,45 +1,22 @@
import { EventEmitter } from 'events' import { Transport } from '../transport'
import { InnerClient } from '../interfaces'
export class TomcatClient extends EventEmitter implements InnerClient { export class TomcatClient extends Transport {
private _id: string
private session: javax.websocket.Session private session: javax.websocket.Session
server: any
readyState: string
remoteAddress: string
upgraded: boolean
request: any
constructor(server: any, session: javax.websocket.Session) { constructor(server: any, session: javax.websocket.Session) {
super() super(server)
this.server = server
this.readyState = 'open'
this.remoteAddress = session + '' this.remoteAddress = session + ''
this.upgraded = true
this.request = { this.request = {
uri: () => `${session.getRequestURI()}`, uri: () => `${session.getRequestURI()}`,
headers: () => [] headers: () => []
} }
this.session = session
this._id = session.getId() + '' this._id = session.getId() + ''
this.session = session
} }
doSend(text: string) {
get id() { Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)()
return this._id
} }
send(text: string) { doClose() {
if (this.readyState == 'open') { this.session.close()
Java.synchronized(() => this.session.getBasicRemote().sendText(text), this.session)()
} else {
console.debug(`send message ${text} to close client ${this._id}`)
}
}
close() {
if (this.readyState == 'open') {
this.readyState = 'close'
this.session.close()
}
} }
} }

View File

@ -0,0 +1,35 @@
import { EventEmitter } from 'events'
export abstract class Transport extends EventEmitter {
protected _id: string
server: any
readyState: 'opening' | 'open' | 'closing' | 'closed'
remoteAddress: string
upgraded: boolean
request: any
constructor(server: any) {
super()
this.server = server
this.readyState = 'open'
this.upgraded = true
}
get id() {
return this._id
}
send(text: string) {
if (this.readyState == 'open') {
this.doSend(text)
} else {
console.debug(`send message ${text} to close client ${this._id}`)
}
}
close() {
if ("closed" === this.readyState || "closing" === this.readyState) { return }
this.doClose()
this.readyState = 'closed'
}
abstract doSend(text: string)
abstract doClose()
}