feat: optimize websocket client

This commit is contained in:
MiaoWoo 2023-07-30 16:46:34 +08:00
parent 27b428fbe2
commit 23bc6068b5
9 changed files with 78 additions and 79 deletions

View File

@ -13,9 +13,9 @@
"ug": "yarn upgrade-interactive", "ug": "yarn upgrade-interactive",
"np": "./script/push.sh", "np": "./script/push.sh",
"lsp": "npm login -scope=@ccms", "lsp": "npm login -scope=@ccms",
"lp": "lerna publish --verify-access --force-publish", "lp": "lerna publish --force-publish",
"lpb": "lerna publish --preid beta --dist-tag beta --verify-access --force-publish", "lpb": "lerna publish --preid beta --dist-tag beta --force-publish",
"lpc": "lerna publish --canary --preid beta --pre-dist-tag beta --verify-access --force-publish", "lpc": "lerna publish --canary --preid beta --pre-dist-tag beta --force-publish",
"lpf": "lerna publish from-package --yes", "lpf": "lerna publish from-package --yes",
"sync": "./script/sync.sh" "sync": "./script/sync.sh"
}, },

View File

@ -1,7 +1,7 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { Transport } from './transport' import { Transport } from './transport'
import { CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface' import { ClientEvent, CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface'
export class WebSocketManager { export class WebSocketManager {
private clients = new Map<string, WebSocket>() private clients = new Map<string, WebSocket>()
@ -18,7 +18,9 @@ export class WebSocketManager {
add(client: WebSocket) { add(client: WebSocket) {
this.clients.set(client.id, client) this.clients.set(client.id, client)
} }
del(client: WebSocket) { del(client: WebSocket) {
client.removeAllListeners()
this.clients.delete(client.id) this.clients.delete(client.id)
} }
} }
@ -52,16 +54,7 @@ export class WebSocket extends EventEmitter {
console.ex(error) console.ex(error)
return return
} }
this.client.on('open', (event) => {
this.onopen?.(event)
manager.add(this) manager.add(this)
})
this.client.on('message', (event) => this.onmessage?.(event))
this.client.on('close', (event) => {
this.onclose?.(event)
manager.del(this)
})
this.client.on('error', (event) => this.onerror?.(event))
setTimeout(() => this.client.connect(), 20) setTimeout(() => this.client.connect(), 20)
} }
get id() { get id() {
@ -82,21 +75,25 @@ export class WebSocket extends EventEmitter {
get url() { get url() {
return this._url return this._url
} }
public onopen: (event: Event) => void set onopen(func: (event: Event) => void) {
public onmessage: (event: MessageEvent) => void this.client.on(ClientEvent.open, func)
public onclose: (event: CloseEvent) => void }
public onerror: (event: ErrorEvent) => void set onmessage(func: (event: MessageEvent) => void) {
this.client.on(ClientEvent.message, func)
addEventListener(event: EventType, callback: () => void) { }
this[`on${event.toLowerCase()}`] = callback set onclose(func: (event: CloseEvent) => void) {
this.client.on(event, callback) this.client.on(ClientEvent.close, func)
manager.del(this)
}
set onerror(func: (event: ErrorEvent) => void) {
this.client.on(ClientEvent.error, func)
} }
public send(data: any) { public send(data: any) {
this.client.send(data) this.client.send(data)
} }
public close(code?: number, reason?: string) { public close(code?: number, reason?: string) {
this.client.close(code, reason) this.client.close(code, reason)
this.removeAllListeners() manager.del(this)
} }
} }
global.setGlobal('WebSocket', WebSocket) global.setGlobal('WebSocket', WebSocket)

View File

@ -4,10 +4,18 @@ export interface WebSocketHeader {
} }
export type EventType = export type EventType =
| 'close' | ClientEvent.open
| 'error' | ClientEvent.message
| 'message' | ClientEvent.close
| 'open' | ClientEvent.error
export enum ClientEvent {
open = 'open',
message = 'message',
close = 'close',
error = 'error',
}
export interface Event { export interface Event {
} }

View File

@ -20,7 +20,6 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
return true return true
} }
handlerAdded(ctx: any) { handlerAdded(ctx: any) {
console.debug(`${ctx} handlerAdded`)
if (ctx.newPromise) { if (ctx.newPromise) {
this.handshakeFuture = ctx.newPromise() this.handshakeFuture = ctx.newPromise()
} else { } else {
@ -28,15 +27,14 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
} }
} }
channelActive(ctx: any) { channelActive(ctx: any) {
console.debug(`${ctx} channelActive`)
this.handshaker.handshake(ctx.channel()) this.handshaker.handshake(ctx.channel())
} }
channelInactive(ctx: any) { channelInactive(ctx: any) {
console.debug(`${ctx} channelInactive`) if (this.client.readyStatus != WebSocket.CLOSED) {
this.client.onclose({ code: 0, reason: 'client connection channel inactive!' }) this.client.onclose({ code: 1006, reason: 'client connection channel inactive.' })
}
} }
channelRead0(ctx: any, msg: any) { channelRead0(ctx: any, msg: any) {
console.trace(`${ctx} channelRead0 ${msg}`)
let ch = ctx.channel() let ch = ctx.channel()
if (!this.handshaker.isHandshakeComplete()) { if (!this.handshaker.isHandshakeComplete()) {
// web socket client connected // web socket client connected
@ -54,7 +52,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
if (frame instanceof TextWebSocketFrame) { if (frame instanceof TextWebSocketFrame) {
this.client.onmessage({ data: frame.text() }) this.client.onmessage({ data: frame.text() })
} else if (frame instanceof CloseWebSocketFrame) { } else if (frame instanceof CloseWebSocketFrame) {
this.client.onclose({ code: 0, reason: 'server close connection!' }) this.client.close(1000, 'server close connection.')
} }
} }
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {

View File

@ -85,10 +85,7 @@ export class NettyWebSocket extends Transport {
console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port} header: ${JSON.stringify(headers)}`) console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port} header: ${JSON.stringify(headers)}`)
} }
getId() { getId() {
if (this.channel?.id) { return `${this.channel?.id()}` || `NettyWebSocket#${channelCount.incrementAndGet()}`
return this.channel?.id() + ''
}
return 'NettyWebSocket#' + channelCount.incrementAndGet()
} }
doConnect() { doConnect() {
console.debug('client NettyWebSocket doConnect', this._url) console.debug('client NettyWebSocket doConnect', this._url)
@ -147,8 +144,9 @@ export class NettyWebSocket extends Transport {
} }
doClose(code: number, reason: string) { doClose(code: number, reason: string) {
this.channel.writeAndFlush(new CloseWebSocketFrame()) this.channel.writeAndFlush(new CloseWebSocketFrame())
this.channel.close() this.channel.closeFuture().addListener(new ChannelFutureListener(() => {
this.channel.closeFuture().addListener(new ChannelFutureListener(() => console.debug(`NettyWebSocket close code: ${code} reason: ${reason}`))) this.onclose({ code, reason })
}))
} }
getChannel() { getChannel() {
return this.channel return this.channel

View File

@ -1,6 +1,6 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { WebSocket } from './index' import { WebSocket } from './index'
import { CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface' import { ClientEvent, CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface'
export abstract class Transport extends EventEmitter { export abstract class Transport extends EventEmitter {
protected _url: string protected _url: string
@ -32,7 +32,6 @@ export abstract class Transport extends EventEmitter {
} }
connect() { connect() {
console.debug(`client Transport connect`)
try { try {
this.doConnect() this.doConnect()
} catch (error: any) { } catch (error: any) {
@ -49,19 +48,16 @@ export abstract class Transport extends EventEmitter {
} }
} }
close(code: number = 0, reason: string = '') { close(code: number = 1000, reason: string = '') {
if (this.readyStatus != WebSocket.CLOSING && this.readyStatus != WebSocket.CLOSED) { if (this.readyStatus < WebSocket.CLOSING) {
this.readyStatus = WebSocket.CLOSING this.readyStatus = WebSocket.CLOSING
try { try {
this.onclose({ code, reason })
this.doClose(code, reason) this.doClose(code, reason)
} catch (error: any) { } catch (error: any) {
this.onerror({ error }) this.onerror({ error })
} finally {
this.removeAllListeners()
} }
} else { } else {
console.debug(`${this.id} call close but state is ${this.readyStatus}`) console.debug(`WebSocket Transport ${this.id} call close code ${code} reason ${reason} but state is ${this.readyStatus}`)
} }
} }
@ -73,31 +69,31 @@ export abstract class Transport extends EventEmitter {
onconnect(event: Event) { onconnect(event: Event) {
if (this.readyStatus != WebSocket.OPEN) { if (this.readyStatus != WebSocket.OPEN) {
this.readyStatus = WebSocket.OPEN this.readyStatus = WebSocket.OPEN
this.emit('open', event) this.emit(ClientEvent.open, event)
} else { } else {
console.debug(`${this.id} call onconnect but state is ${this.readyStatus}`) console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyStatus}`)
} }
} }
onmessage(event: MessageEvent) { onmessage(event: MessageEvent) {
this.emit('message', event) this.emit(ClientEvent.message, event)
} }
onerror(event: ErrorEvent) { onerror(event: ErrorEvent) {
this.emit('error', event) this.emit(ClientEvent.error, event)
} }
onclose(event: CloseEvent) { onclose(event: CloseEvent) {
if (this.readyStatus != WebSocket.CLOSED) { if (this.readyStatus != WebSocket.CLOSED) {
this.readyStatus = WebSocket.CLOSED this.readyStatus = WebSocket.CLOSED
this.emit('close', event) this.emit(ClientEvent.close, event)
this.removeAllListeners()
} else { } else {
console.debug(`${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`)
} }
} }
abstract getId()
abstract doConnect() abstract getId(): string
abstract doSend(text: string) abstract doConnect(): void
abstract doClose(code: number, reason: string) abstract doSend(text: string): void
abstract doClose(code: number, reason: string): void
} }

View File

@ -1,7 +1,7 @@
/// <reference types="@ccms/nashorn" /> /// <reference types="@ccms/nashorn" />
/// <reference types="@javatypes/tomcat-websocket-api" /> /// <reference types="@javatypes/tomcat-websocket-api" />
import * as server from './server' import { WebSocketServer } from './server'
import { Server, ServerOptions } from './socket.io' import { Server, ServerOptions } from './socket.io'
interface SocketIOStatic { interface SocketIOStatic {
@ -45,7 +45,7 @@ let singletonServer: Server
let io: SocketStatic = function io(pipeline: any, options: Partial<JavaServerOptions>, singleton = true) { let io: SocketStatic = function io(pipeline: any, options: Partial<JavaServerOptions>, singleton = true) {
if (singleton) { if (singleton) {
if (!singletonServer) { if (!singletonServer) {
singletonServer = new Server(server.attach(pipeline, options), options) singletonServer = new Server(WebSocketServer.attach(pipeline, options), options)
process.emit('websocket.create', singletonServer) process.emit('websocket.create', singletonServer)
process.on('exit', () => { process.on('exit', () => {
singletonServer.close() singletonServer.close()
@ -53,7 +53,7 @@ let io: SocketStatic = function io(pipeline: any, options: Partial<JavaServerOpt
} }
return singletonServer return singletonServer
} }
return new Server(server.attach(pipeline, options), options) return new Server(WebSocketServer.attach(pipeline, options), options)
} }
io.Instance = Symbol("@ccms/websocket") io.Instance = Symbol("@ccms/websocket")
export default io export default io

View File

@ -26,6 +26,24 @@ export abstract class WebSocketServer extends EventEmitter {
protected instance: any protected instance: any
protected options: JavaServerOptions protected options: JavaServerOptions
private clients: Map<string, WebSocketClient> private clients: Map<string, WebSocketClient>
public static attach(instance, options) {
if (!instance) { throw new Error('instance can\'t be undefiend!') }
options = Object.assign({
event: new EventEmitter(),
path: '/ws',
root: root + Java.type("java.io.File").separatorChar + 'wwwroot',
}, options)
let WebSocketServerImpl = undefined
if (instance.class.name.startsWith('io.netty.channel')) {
WebSocketServerImpl = require("./netty").NettyWebSocketServer
} else {
WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer
}
console.debug('create websocket server from ' + WebSocketServerImpl.name)
return new WebSocketServerImpl(instance, options)
}
constructor(instance: any, options: JavaServerOptions) { constructor(instance: any, options: JavaServerOptions) {
super() super()
this.instance = instance this.instance = instance
@ -69,20 +87,3 @@ export abstract class WebSocketServer extends EventEmitter {
protected abstract getSocket(handler: any): WebSocketClient protected abstract getSocket(handler: any): WebSocketClient
protected abstract doClose(): void protected abstract doClose(): void
} }
export const attach = (instance, options) => {
if (!instance) { throw new Error('instance can\'t be undefiend!') }
options = Object.assign({
event: new EventEmitter(),
path: '/ws',
root: root + Java.type("java.io.File").separatorChar + 'wwwroot',
}, options)
let WebSocketServerImpl = undefined
if (instance.class.name.startsWith('io.netty.channel')) {
WebSocketServerImpl = require("./netty").NettyWebSocketServer
} else {
WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer
}
console.debug('create websocket server from ' + WebSocketServerImpl.name)
return new WebSocketServerImpl(instance, options)
}

View File

@ -1,7 +1,8 @@
#!/bin/bash #!/bin/bash
DISTTAG=${1:latest}
for package in `ls packages`; do for package in `ls packages`; do
echo $package echo $package
pushd packages/$package pushd packages/$package
npm publish --access=public --registry https://registry.npmjs.org npm publish --tag ${DISTTAG} --access=public --registry https://registry.npmjs.org
popd popd
done done