From 23bc6068b5b37dc2dbba8669bdf1aded848eecd0 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Sun, 30 Jul 2023 16:46:34 +0800 Subject: [PATCH] feat: optimize websocket client --- package.json | 6 +-- packages/websocket/src/client/index.ts | 37 +++++++++---------- packages/websocket/src/client/interface.ts | 16 ++++++-- .../websocket/src/client/netty/handler.ts | 10 ++--- packages/websocket/src/client/netty/index.ts | 10 ++--- packages/websocket/src/client/transport.ts | 34 ++++++++--------- packages/websocket/src/index.ts | 6 +-- packages/websocket/src/server/index.ts | 35 +++++++++--------- script/push.sh | 3 +- 9 files changed, 78 insertions(+), 79 deletions(-) diff --git a/package.json b/package.json index f23c674b..69017e0e 100644 --- a/package.json +++ b/package.json @@ -13,9 +13,9 @@ "ug": "yarn upgrade-interactive", "np": "./script/push.sh", "lsp": "npm login -scope=@ccms", - "lp": "lerna publish --verify-access --force-publish", - "lpb": "lerna publish --preid beta --dist-tag beta --verify-access --force-publish", - "lpc": "lerna publish --canary --preid beta --pre-dist-tag beta --verify-access --force-publish", + "lp": "lerna publish --force-publish", + "lpb": "lerna publish --preid beta --dist-tag beta --force-publish", + "lpc": "lerna publish --canary --preid beta --pre-dist-tag beta --force-publish", "lpf": "lerna publish from-package --yes", "sync": "./script/sync.sh" }, diff --git a/packages/websocket/src/client/index.ts b/packages/websocket/src/client/index.ts index 05a76c40..6753beea 100644 --- a/packages/websocket/src/client/index.ts +++ b/packages/websocket/src/client/index.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'events' import { Transport } from './transport' -import { CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface' +import { ClientEvent, CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface' export class WebSocketManager { private clients = new Map() @@ -18,7 +18,9 @@ export class WebSocketManager { add(client: WebSocket) { this.clients.set(client.id, client) } + del(client: WebSocket) { + client.removeAllListeners() this.clients.delete(client.id) } } @@ -52,16 +54,7 @@ export class WebSocket extends EventEmitter { console.ex(error) return } - this.client.on('open', (event) => { - this.onopen?.(event) - manager.add(this) - }) - this.client.on('message', (event) => this.onmessage?.(event)) - this.client.on('close', (event) => { - this.onclose?.(event) - manager.del(this) - }) - this.client.on('error', (event) => this.onerror?.(event)) + manager.add(this) setTimeout(() => this.client.connect(), 20) } get id() { @@ -82,21 +75,25 @@ export class WebSocket extends EventEmitter { get url() { return this._url } - public onopen: (event: Event) => void - public onmessage: (event: MessageEvent) => void - public onclose: (event: CloseEvent) => void - public onerror: (event: ErrorEvent) => void - - addEventListener(event: EventType, callback: () => void) { - this[`on${event.toLowerCase()}`] = callback - this.client.on(event, callback) + set onopen(func: (event: Event) => void) { + this.client.on(ClientEvent.open, func) + } + set onmessage(func: (event: MessageEvent) => void) { + this.client.on(ClientEvent.message, func) + } + set onclose(func: (event: CloseEvent) => void) { + this.client.on(ClientEvent.close, func) + manager.del(this) + } + set onerror(func: (event: ErrorEvent) => void) { + this.client.on(ClientEvent.error, func) } public send(data: any) { this.client.send(data) } public close(code?: number, reason?: string) { this.client.close(code, reason) - this.removeAllListeners() + manager.del(this) } } global.setGlobal('WebSocket', WebSocket) diff --git a/packages/websocket/src/client/interface.ts b/packages/websocket/src/client/interface.ts index 27494afb..050b2e64 100644 --- a/packages/websocket/src/client/interface.ts +++ b/packages/websocket/src/client/interface.ts @@ -4,10 +4,18 @@ export interface WebSocketHeader { } export type EventType = - | 'close' - | 'error' - | 'message' - | 'open' + | ClientEvent.open + | ClientEvent.message + | ClientEvent.close + | ClientEvent.error + +export enum ClientEvent { + open = 'open', + message = 'message', + close = 'close', + error = 'error', +} + export interface Event { } diff --git a/packages/websocket/src/client/netty/handler.ts b/packages/websocket/src/client/netty/handler.ts index 60d2af2e..5c418fbe 100644 --- a/packages/websocket/src/client/netty/handler.ts +++ b/packages/websocket/src/client/netty/handler.ts @@ -20,7 +20,6 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { return true } handlerAdded(ctx: any) { - console.debug(`${ctx} handlerAdded`) if (ctx.newPromise) { this.handshakeFuture = ctx.newPromise() } else { @@ -28,15 +27,14 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { } } channelActive(ctx: any) { - console.debug(`${ctx} channelActive`) this.handshaker.handshake(ctx.channel()) } channelInactive(ctx: any) { - console.debug(`${ctx} channelInactive`) - this.client.onclose({ code: 0, reason: 'client connection channel inactive!' }) + if (this.client.readyStatus != WebSocket.CLOSED) { + this.client.onclose({ code: 1006, reason: 'client connection channel inactive.' }) + } } channelRead0(ctx: any, msg: any) { - console.trace(`${ctx} channelRead0 ${msg}`) let ch = ctx.channel() if (!this.handshaker.isHandshakeComplete()) { // web socket client connected @@ -54,7 +52,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { if (frame instanceof TextWebSocketFrame) { this.client.onmessage({ data: frame.text() }) } else if (frame instanceof CloseWebSocketFrame) { - this.client.onclose({ code: 0, reason: 'server close connection!' }) + this.client.close(1000, 'server close connection.') } } exceptionCaught(ctx: any, cause: Error) { diff --git a/packages/websocket/src/client/netty/index.ts b/packages/websocket/src/client/netty/index.ts index 1a79595e..e55ebb48 100644 --- a/packages/websocket/src/client/netty/index.ts +++ b/packages/websocket/src/client/netty/index.ts @@ -85,10 +85,7 @@ export class NettyWebSocket extends Transport { console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port} header: ${JSON.stringify(headers)}`) } getId() { - if (this.channel?.id) { - return this.channel?.id() + '' - } - return 'NettyWebSocket#' + channelCount.incrementAndGet() + return `${this.channel?.id()}` || `NettyWebSocket#${channelCount.incrementAndGet()}` } doConnect() { console.debug('client NettyWebSocket doConnect', this._url) @@ -147,8 +144,9 @@ export class NettyWebSocket extends Transport { } doClose(code: number, reason: string) { this.channel.writeAndFlush(new CloseWebSocketFrame()) - this.channel.close() - this.channel.closeFuture().addListener(new ChannelFutureListener(() => console.debug(`NettyWebSocket close code: ${code} reason: ${reason}`))) + this.channel.closeFuture().addListener(new ChannelFutureListener(() => { + this.onclose({ code, reason }) + })) } getChannel() { return this.channel diff --git a/packages/websocket/src/client/transport.ts b/packages/websocket/src/client/transport.ts index 8cfc9aa7..ff666f03 100644 --- a/packages/websocket/src/client/transport.ts +++ b/packages/websocket/src/client/transport.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events' import { WebSocket } from './index' -import { CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface' +import { ClientEvent, CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface' export abstract class Transport extends EventEmitter { protected _url: string @@ -32,7 +32,6 @@ export abstract class Transport extends EventEmitter { } connect() { - console.debug(`client Transport connect`) try { this.doConnect() } catch (error: any) { @@ -49,19 +48,16 @@ export abstract class Transport extends EventEmitter { } } - close(code: number = 0, reason: string = '') { - if (this.readyStatus != WebSocket.CLOSING && this.readyStatus != WebSocket.CLOSED) { + close(code: number = 1000, reason: string = '') { + if (this.readyStatus < WebSocket.CLOSING) { this.readyStatus = WebSocket.CLOSING try { - this.onclose({ code, reason }) this.doClose(code, reason) } catch (error: any) { this.onerror({ error }) - } finally { - this.removeAllListeners() } } else { - console.debug(`${this.id} call close but state is ${this.readyStatus}`) + console.debug(`WebSocket Transport ${this.id} call close code ${code} reason ${reason} but state is ${this.readyStatus}`) } } @@ -73,31 +69,31 @@ export abstract class Transport extends EventEmitter { onconnect(event: Event) { if (this.readyStatus != WebSocket.OPEN) { this.readyStatus = WebSocket.OPEN - this.emit('open', event) + this.emit(ClientEvent.open, event) } else { - console.debug(`${this.id} call onconnect but state is ${this.readyStatus}`) + console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyStatus}`) } } onmessage(event: MessageEvent) { - this.emit('message', event) + this.emit(ClientEvent.message, event) } onerror(event: ErrorEvent) { - this.emit('error', event) + this.emit(ClientEvent.error, event) } onclose(event: CloseEvent) { if (this.readyStatus != WebSocket.CLOSED) { this.readyStatus = WebSocket.CLOSED - this.emit('close', event) - this.removeAllListeners() + this.emit(ClientEvent.close, event) } else { - console.debug(`${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) + console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) } } - abstract getId() - abstract doConnect() - abstract doSend(text: string) - abstract doClose(code: number, reason: string) + + abstract getId(): string + abstract doConnect(): void + abstract doSend(text: string): void + abstract doClose(code: number, reason: string): void } diff --git a/packages/websocket/src/index.ts b/packages/websocket/src/index.ts index d401b77c..0fcc876c 100644 --- a/packages/websocket/src/index.ts +++ b/packages/websocket/src/index.ts @@ -1,7 +1,7 @@ /// /// -import * as server from './server' +import { WebSocketServer } from './server' import { Server, ServerOptions } from './socket.io' interface SocketIOStatic { @@ -45,7 +45,7 @@ let singletonServer: Server let io: SocketStatic = function io(pipeline: any, options: Partial, singleton = true) { if (singleton) { if (!singletonServer) { - singletonServer = new Server(server.attach(pipeline, options), options) + singletonServer = new Server(WebSocketServer.attach(pipeline, options), options) process.emit('websocket.create', singletonServer) process.on('exit', () => { singletonServer.close() @@ -53,7 +53,7 @@ let io: SocketStatic = function io(pipeline: any, options: Partial + + public static attach(instance, options) { + if (!instance) { throw new Error('instance can\'t be undefiend!') } + options = Object.assign({ + event: new EventEmitter(), + path: '/ws', + root: root + Java.type("java.io.File").separatorChar + 'wwwroot', + }, options) + let WebSocketServerImpl = undefined + if (instance.class.name.startsWith('io.netty.channel')) { + WebSocketServerImpl = require("./netty").NettyWebSocketServer + } else { + WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer + } + console.debug('create websocket server from ' + WebSocketServerImpl.name) + return new WebSocketServerImpl(instance, options) + } + constructor(instance: any, options: JavaServerOptions) { super() this.instance = instance @@ -69,20 +87,3 @@ export abstract class WebSocketServer extends EventEmitter { protected abstract getSocket(handler: any): WebSocketClient protected abstract doClose(): void } - -export const attach = (instance, options) => { - if (!instance) { throw new Error('instance can\'t be undefiend!') } - options = Object.assign({ - event: new EventEmitter(), - path: '/ws', - root: root + Java.type("java.io.File").separatorChar + 'wwwroot', - }, options) - let WebSocketServerImpl = undefined - if (instance.class.name.startsWith('io.netty.channel')) { - WebSocketServerImpl = require("./netty").NettyWebSocketServer - } else { - WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer - } - console.debug('create websocket server from ' + WebSocketServerImpl.name) - return new WebSocketServerImpl(instance, options) -} diff --git a/script/push.sh b/script/push.sh index 6b93bdc8..89351360 100755 --- a/script/push.sh +++ b/script/push.sh @@ -1,7 +1,8 @@ #!/bin/bash +DISTTAG=${1:latest} for package in `ls packages`; do echo $package pushd packages/$package - npm publish --access=public --registry https://registry.npmjs.org + npm publish --tag ${DISTTAG} --access=public --registry https://registry.npmjs.org popd done