From 9cfac1672abb05c379862b1ae63fe78f4aa517b6 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Tue, 8 Aug 2023 18:06:34 +0800 Subject: [PATCH] feat: optimize websocket client --- README.MD | 47 +++++++------- packages/api/src/command.ts | 37 +++++------ packages/nodejs/src/events/index.ts | 11 +++- packages/nodejs/src/fs/index.ts | 3 +- packages/polyfill/src/node-shim.ts | 9 +-- packages/websocket/src/client/index.ts | 43 +++++++----- .../websocket/src/client/netty/handler.ts | 28 +++++--- packages/websocket/src/client/netty/index.ts | 51 ++++++++------- packages/websocket/src/client/transport.ts | 65 ++++++++++++------- 9 files changed, 170 insertions(+), 124 deletions(-) diff --git a/README.MD b/README.MD index 3e34cc02..9a164e4e 100644 --- a/README.MD +++ b/README.MD @@ -8,27 +8,28 @@ ```txt └─packages - ├─api 全平台兼容的接口 - ├─core 核心代码 用于引导加载 - ├─common 公共类库代码 例如 http reflect 模块 - ├─client NodeJS的Minecraft客户端 用于调试插件 - ├─container IOC容器 用于注入具体实现 - ├─ployfill Nashorn 的一些自定义增强 - ├─nashorn Nashorn 的类型定义 - ├─bungee BungeeCordAPI内部实现 - ├─bukkit BukkitAPI内部实现 - ├─sponge SpongeAPI内部实现 - ├─nukkit NukkitAPI内部实现 - ├─plugin 插件管理器 - ├─websocket Netty的WebSocket注入 - ├─type Java的类型定义 - | ├─bungee BungeeCord类型定义 - | ├─bukkit Bukkit类型定义 - | ├─sponge Sponge类型定义 - | └─nukkit Nukkit类型定义 - └─plugins 这里当然是插件啦 - ├─bungee 只兼容BungeeCord的插件 - ├─bukkit 只兼容Bukkit的插件 - ├─sponge 只兼容Sponge的插件 - └─nukkit 只兼容Nukkit的插件 + ├─api 全平台兼容的接口 + ├─core 核心代码 用于引导加载 + ├─common 公共类库代码 例如 http reflect 模块 + ├─compile 编译器相关功能 + ├─client NodeJS 的 Minecraft 客户端 已迁移至 ms-client + ├─container IOC容器 用于注入具体实现 + ├─database 数据库相关功能 + ├─protocol 协议处理相关功能 + ├─service 服务相关功能 + ├─i18n 多语言环境相关支持 + ├─polyfill Nashorn 的一些自定义增强 + ├─nashorn Nashorn 的类型定义 + ├─nodejs NodeJS 的部分 Java 实现 + ├─bungee BungeeCordAPI 内部实现 + ├─bukkit BukkitAPI 内部实现 + ├─sponge SpongeAPI 内部实现 + ├─nukkit NukkitAPI 内部实现 + ├─molang MoLang 解析库 + ├─qrcode 二维码相关类库 + ├─plugin 插件管理器 + ├─websocket WebSocket 相关实现 + | ├─client 基于 Netty 的 WebSocket 客户端 + | └─server 基于 Netty 的 WebSocket 服务端 + └─type 类型定义 已迁移到 @javatypes ``` diff --git a/packages/api/src/command.ts b/packages/api/src/command.ts index 0f2eb3f3..3eb035a4 100644 --- a/packages/api/src/command.ts +++ b/packages/api/src/command.ts @@ -5,6 +5,13 @@ import { plugin } from './plugin' export namespace command { @injectable() export abstract class Command { + /** + * first time script engine need optimize jit code + * so ignore first slow exec notify + */ + private cacheSlowCommandKey = {}; + private cacheSlowCompleteKey = {}; + /** * 注册插件命令 * @param plugin 插件 @@ -48,6 +55,8 @@ export namespace command { let result = executor(sender, command, Java.from(args)) let cost = Date.now() - time if (cost > global.ScriptSlowExecuteTime) { + let commandKey = `${plugin.description.name}-${command}-${sender.name}` + if (!this.cacheSlowCommandKey[commandKey]) { return this.cacheSlowCommandKey[commandKey] = cost } console.i18n("ms.api.command.execute.slow", { player: sender.name, plugin: plugin.description.name, @@ -58,23 +67,17 @@ export namespace command { } return result } catch (ex: any) { - console.i18n("ms.api.command.execute.error", { + let message = i18n.translate("ms.api.command.execute.error", { player: sender.name, plugin: plugin.description.name, command, args: Java.from(args).join(' '), ex }) + console.console(message) console.ex(ex) if (sender.name != 'CONSOLE') { - console.sender(sender, [i18n.translate("ms.api.command.execute.error", { - player: sender.name, - plugin: plugin.description.name, - command, - args: Java.from(args).join(' '), - ex - }), - ...console.stack(ex)]) + console.sender(sender, [message, ...console.stack(ex)]) } return true } @@ -89,6 +92,8 @@ export namespace command { let result = this.copyPartialMatches(complete, token) let cost = Date.now() - time if (cost > global.ScriptSlowExecuteTime) { + let completerKey = `${plugin.description.name}-${command}-${sender.name}` + if (!this.cacheSlowCompleteKey[completerKey]) { return this.cacheSlowCompleteKey[completerKey] = cost } console.i18n("ms.api.command.tab.completer.slow", { player: sender.name, plugin: plugin.description.name, @@ -99,25 +104,17 @@ export namespace command { } return result } catch (ex: any) { - console.i18n("ms.api.command.tab.completer.error", { + let message = i18n.translate("ms.api.command.tab.completer.error", { player: sender.name, plugin: plugin.description.name, command, args: Java.from(args).join(' '), ex }) + console.console(message) console.ex(ex) if (sender.name != 'CONSOLE') { - console.sender(sender, [ - i18n.translate("ms.api.command.tab.completer.error", { - player: sender.name, - plugin: plugin.description.name, - command, - args: Java.from(args).join(' '), - ex - }), - ...console.stack(ex) - ]) + console.sender(sender, [message, ...console.stack(ex)]) } return [] } diff --git a/packages/nodejs/src/events/index.ts b/packages/nodejs/src/events/index.ts index 92a3e71a..eef9a117 100644 --- a/packages/nodejs/src/events/index.ts +++ b/packages/nodejs/src/events/index.ts @@ -20,6 +20,7 @@ // USE OR OTHER DEALINGS IN THE SOFTWARE. 'use strict'; +var Throwable = Java.type('java.lang.Throwable') var R = typeof Reflect === 'object' ? Reflect : null var ReflectApply = R && typeof R.apply === 'function' ? R.apply @@ -136,13 +137,19 @@ EventEmitter.prototype.emit = function emit(type) { var er; if (args.length > 0) er = args[0]; - if (er instanceof Error) { + if (er instanceof Error || er instanceof Throwable) { // Note: The comments on the `throw` lines are intentional, they show // up in Node's output if this results in an unhandled exception. throw er; // Unhandled 'error' event } + if (er.error instanceof Error || er.error instanceof Throwable) { + throw er.error; // Unhandled 'error' event + } + if (er.cause instanceof Error || er.error instanceof Throwable) { + throw er.error; // Unhandled 'error' event + } // At least give some kind of context to the user - var err = new Error('Unhandled error.' + (er ? ' (' + er.message + ')' : '')); + var err = new Error('Unhandled error.' + (er ? ' (' + (er.message || er.error || er.cause || er) + ')' : '')); // @ts-ignore err.context = er; throw err; // Unhandled 'error' event diff --git a/packages/nodejs/src/fs/index.ts b/packages/nodejs/src/fs/index.ts index ae86524e..12fe54df 100644 --- a/packages/nodejs/src/fs/index.ts +++ b/packages/nodejs/src/fs/index.ts @@ -5,6 +5,7 @@ const Path = Java.type("java.nio.file.Path"); const JavaString = Java.type("java.lang.String"); const File = Java.type("java.io.File"); const Files = Java.type("java.nio.file.Files"); +const Paths = Java.type("java.nio.file.Paths"); const Collector = Java.type("java.util.stream.Collector") const separatorChar = File.separatorChar; const StandardCopyOption = Java.type("java.nio.file.StandardCopyOption"); @@ -37,7 +38,7 @@ function javaFile(...opts: any[]) { } export function renameSync(oldPath: PathLike, newPath: PathLike): void { - + Files.move(Paths.get(oldPath), Paths.get(oldPath), StandardCopyOption['ATOMIC_MOVE']) } export function truncateSync() { diff --git a/packages/polyfill/src/node-shim.ts b/packages/polyfill/src/node-shim.ts index 5fdfb82f..9837a9ef 100644 --- a/packages/polyfill/src/node-shim.ts +++ b/packages/polyfill/src/node-shim.ts @@ -126,6 +126,7 @@ class Process extends EventEmitter { } class EventLoop { + private threadCount = new AtomicInteger(0) private eventLoopMainThread = undefined private eventLoopTaskQueue = new DelayQueue() private taskExecuteTimeout = 3000 @@ -134,10 +135,10 @@ class EventLoop { constructor() { this.taskExecuteTimeout = parseInt(process.env.MS_TASK_EXECUTE_TIMEOUT) || 3000 this.fixedThreadPool = new ThreadPoolExecutor( - 1, 1, 0, TimeUnit.SECONDS, + 8, 16, 0, TimeUnit.SECONDS, new LinkedBlockingQueue(1024), new ThreadFactory((run: any) => { - let thread = new Thread(run, "@ccms/event-loop") + let thread = new Thread(run, "@ccms/event-loop-" + this.threadCount.incrementAndGet()) thread.setDaemon(true) return thread })) @@ -198,7 +199,7 @@ class EventLoop { try { callback.apply(undefined, args) } catch (cause: any) { - cause = cause.getCause && cause.getCause() || cause + cause = cause.getCause ? cause.getCause() : cause try { process.emit('error', cause) } catch (error: any) { @@ -213,7 +214,7 @@ class EventLoop { return console.warn(`FixedThreadPool isInterrupted exit! Task ${name} exec exit!`) } if (error instanceof TimeoutException) { - return console.warn(`Task ${name} => ${callback} exec time greater than ${this.taskExecuteTimeout}s!`) + return console.warn(`Task ${name} => ${callback} exec time greater than ${this.taskExecuteTimeout}ms!`) } throw error.getCause && error.getCause() || error } diff --git a/packages/websocket/src/client/index.ts b/packages/websocket/src/client/index.ts index 6753beea..287fccca 100644 --- a/packages/websocket/src/client/index.ts +++ b/packages/websocket/src/client/index.ts @@ -28,12 +28,14 @@ export class WebSocketManager { export const manager = new WebSocketManager() export class WebSocket extends EventEmitter { + public static manager: WebSocketManager = manager + public static CONNECTING = 0 public static OPEN = 1 public static CLOSING = 2 public static CLOSED = 3 + public binaryType: 'blob' | 'arraybuffer' - protected manager: WebSocketManager protected _url: string protected _headers: WebSocketHeader = {} @@ -42,7 +44,6 @@ export class WebSocket extends EventEmitter { constructor(url: string, subProtocol: string | string[] = '', headers: WebSocketHeader = {}) { super() - this.manager = manager this._url = url this._headers = headers try { @@ -54,7 +55,14 @@ export class WebSocket extends EventEmitter { console.ex(error) return } + // mamanger connected client manager.add(this) + this.client.on(ClientEvent.close, (_) => manager.del(this)) + // add event forward + this.client.on(ClientEvent.open, (event) => this.onopen?.(event)) + this.client.on(ClientEvent.message, (event) => this.onmessage?.(event)) + this.client.on(ClientEvent.close, (event) => this.onclose?.(event)) + this.client.on(ClientEvent.error, (event) => this.onerror?.(event)) setTimeout(() => this.client.connect(), 20) } get id() { @@ -70,30 +78,31 @@ export class WebSocket extends EventEmitter { return this.client.protocol } get readyState() { - return this.client.readyStatus + return this.client.readyState } get url() { return this._url } - set onopen(func: (event: Event) => void) { - this.client.on(ClientEvent.open, func) + + public onopen: (event: Event) => void + public onmessage: (event: MessageEvent) => void + public onclose: (event: CloseEvent) => void + public onerror: (event: ErrorEvent) => void + + public on(eventName: string | symbol, listener: (...args: any[]) => void): this { + this.client.on(eventName, listener) + return this } - set onmessage(func: (event: MessageEvent) => void) { - this.client.on(ClientEvent.message, func) - } - set onclose(func: (event: CloseEvent) => void) { - this.client.on(ClientEvent.close, func) - manager.del(this) - } - set onerror(func: (event: ErrorEvent) => void) { - this.client.on(ClientEvent.error, func) + public emit(eventName: string | symbol, ...args: any[]): boolean { + return this.client.emit(eventName, ...args) } + public send(data: any) { - this.client.send(data) + return this.client.send(data) } + public close(code?: number, reason?: string) { - this.client.close(code, reason) - manager.del(this) + return this.client.close(code, reason) } } global.setGlobal('WebSocket', WebSocket) diff --git a/packages/websocket/src/client/netty/handler.ts b/packages/websocket/src/client/netty/handler.ts index 5c418fbe..6a3219cc 100644 --- a/packages/websocket/src/client/netty/handler.ts +++ b/packages/websocket/src/client/netty/handler.ts @@ -1,6 +1,9 @@ import { NettyWebSocket } from '.' import { WebSocketClientHandlerAdapter } from './adapter/handler' +const Throwable = Java.type('java.lang.Throwable') +const RuntimeException = Java.type('java.lang.RuntimeException') + const CharsetUtil = Java.type('io.netty.util.CharsetUtil') const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame') @@ -20,6 +23,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { return true } handlerAdded(ctx: any) { + this.client.onconnection({}) if (ctx.newPromise) { this.handshakeFuture = ctx.newPromise() } else { @@ -28,18 +32,21 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { } channelActive(ctx: any) { this.handshaker.handshake(ctx.channel()) + setTimeout(() => { + this.abortHandshake(new Error('handshake timed out.')) + }, 10000) } channelInactive(ctx: any) { - if (this.client.readyStatus != WebSocket.CLOSED) { - this.client.onclose({ code: 1006, reason: 'client connection channel inactive.' }) - } + this.client.close(1006, 'connection was closed abnormally.', true) } channelRead0(ctx: any, msg: any) { let ch = ctx.channel() if (!this.handshaker.isHandshakeComplete()) { - // web socket client connected + console.debug(`Netty Handler channelRead0 websocket client connected`) + // websocket client connected this.handshaker.finishHandshake(ch, msg) this.handshakeFuture.setSuccess() + this.client.onconnect({}) return } @@ -52,14 +59,19 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { if (frame instanceof TextWebSocketFrame) { this.client.onmessage({ data: frame.text() }) } else if (frame instanceof CloseWebSocketFrame) { - this.client.close(1000, 'server close connection.') + this.client.receiverClose(frame.statusCode(), frame.reasonText()) } } + abortHandshake(reason: Error) { + if (this.handshakeFuture.isDone()) { return } + if (!(reason instanceof Throwable)) { + reason = new RuntimeException(reason) + } + this.handshakeFuture.setFailure(reason) + } exceptionCaught(ctx: any, cause: Error) { console.debug(`${ctx} exceptionCaught ${cause}`) + this.client.abortHandshake(cause) this.client.onerror({ error: cause }) - if (!this.handshakeFuture.isDone()) { - this.handshakeFuture.setFailure(cause) - } } } diff --git a/packages/websocket/src/client/netty/index.ts b/packages/websocket/src/client/netty/index.ts index e55ebb48..f6585300 100644 --- a/packages/websocket/src/client/netty/index.ts +++ b/packages/websocket/src/client/netty/index.ts @@ -60,6 +60,8 @@ export class NettyWebSocket extends Transport { private channel: any private b: any + private handler: any + constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) { super(url, subProtocol, headers) if (!url) { @@ -88,7 +90,6 @@ export class NettyWebSocket extends Transport { return `${this.channel?.id()}` || `NettyWebSocket#${channelCount.incrementAndGet()}` } doConnect() { - console.debug('client NettyWebSocket doConnect', this._url) let uri = URI.create(this._url) let headers = new DefaultHttpHeaders() for (const key of Object.getOwnPropertyNames(this._headers || {})) { @@ -97,7 +98,7 @@ export class NettyWebSocket extends Transport { // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. - let handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory + this.handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory .newHandshaker(uri, WebSocketVersion.V13, null, false, headers), this) this.b = new Bootstrap() this.b.group(group) @@ -108,7 +109,7 @@ export class NettyWebSocket extends Transport { if (this._schema == "wss") { if (SslContextBuilder) { let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build() - pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port)) + pipeline.addLast('ssl', sslCtx.newHandler(ch.alloc(), this._host, this._port)) } else { let sslEngine = SSLContext.getDefault().createSSLEngine() sslEngine.setUseClientMode(true) @@ -117,37 +118,37 @@ export class NettyWebSocket extends Transport { } pipeline.addLast("http-codec", new HttpClientCodec()) pipeline.addLast("aggregator", new HttpObjectAggregator(65536)) - pipeline.addLast("websocket", handler.getHandler()) + pipeline.addLast("websocket", this.handler.getHandler()) } })) - this.b.connect(this._host, this._port).addListener(new ChannelFutureListener((future: any) => { - try { - this.channel = future.sync().channel() - this.onconnection({}) - handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => { - try { - future.sync() - // only trigger onconnect when not have error - this.onconnect({}) - } catch (error: any) { - // ignore error exceptionCaught from handler - // this.onerror({ error }) - } - })) - } catch (error: any) { - this.onerror({ error }) - } - })) + try { + this.channel = this.b.connect(this._host, this._port).sync().channel() + this.handler.handshakeFuture.sync() + } catch (error) { + // ignore connect error + // tigger error at handshakeFuture + } } doSend(text: string) { this.channel.writeAndFlush(new TextWebSocketFrame(text)) } - doClose(code: number, reason: string) { - this.channel.writeAndFlush(new CloseWebSocketFrame()) - this.channel.closeFuture().addListener(new ChannelFutureListener(() => { + doClose(code: number, reason: string, wasClean: boolean = false) { + console.debug(`Netty Client doClose code: ${code} reason: ${reason}`) + if (this.readyState == WebSocket.CLOSING) { + if (!this._closeFrameSent) { + console.debug(`Netty Client doClose send close frame`) + this.channel?.writeAndFlush(new CloseWebSocketFrame(code, reason)) + this._closeFrameSent = true + } + if (!this._closeFrameReceived && !wasClean) { return console.debug(`Netty Client doClose wait server send close`) } + } + this.channel?.closeFuture().addListener(new ChannelFutureListener(() => { this.onclose({ code, reason }) })) } + abortHandshake(reason: Error): void { + this.handler.abortHandshake(reason) + } getChannel() { return this.channel } diff --git a/packages/websocket/src/client/transport.ts b/packages/websocket/src/client/transport.ts index ff666f03..d28bfde4 100644 --- a/packages/websocket/src/client/transport.ts +++ b/packages/websocket/src/client/transport.ts @@ -8,6 +8,9 @@ export abstract class Transport extends EventEmitter { protected _protocol: string protected _headers: WebSocketHeader = {} + protected _closeFrameReceived = false; + protected _closeFrameSent = false; + constructor(uri: string, subProtocol: string = '', headers: WebSocketHeader = {}) { super() this._url = uri @@ -23,24 +26,22 @@ export abstract class Transport extends EventEmitter { return this._protocol } - get readyStatus() { + get readyState() { return this._state } - set readyStatus(state: number) { + set readyState(state: number) { this._state = state } connect() { - try { - this.doConnect() - } catch (error: any) { - console.ex(error) - this.onerror({ error }) - } + this.doConnect() } send(text: string) { + if (this.readyState === WebSocket.CONNECTING) { + throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); + } try { this.doSend(text) } catch (error: any) { @@ -48,16 +49,24 @@ export abstract class Transport extends EventEmitter { } } - close(code: number = 1000, reason: string = '') { - if (this.readyStatus < WebSocket.CLOSING) { - this.readyStatus = WebSocket.CLOSING - try { - this.doClose(code, reason) - } catch (error: any) { - this.onerror({ error }) + close(code: number = 1000, reason: string = '', wasClean: boolean = false) { + if (this.readyState === WebSocket.CLOSED) return; + if (this.readyState === WebSocket.CONNECTING) { + const msg = 'WebSocket was closed before the connection was established'; + this.abortHandshake(new Error(msg)); + return; + } + if (this.readyState === WebSocket.CLOSING) { + if (this._closeFrameSent && this._closeFrameReceived) { + this.onclose({ code, reason }); } - } else { - console.debug(`WebSocket Transport ${this.id} call close code ${code} reason ${reason} but state is ${this.readyStatus}`) + return; + } + this.readyState = WebSocket.CLOSING + try { + this.doClose(code, reason, wasClean) + } catch (error: any) { + this.onerror({ error }) } } @@ -67,11 +76,11 @@ export abstract class Transport extends EventEmitter { } onconnect(event: Event) { - if (this.readyStatus != WebSocket.OPEN) { - this.readyStatus = WebSocket.OPEN + if (this.readyState != WebSocket.OPEN) { + this.readyState = WebSocket.OPEN this.emit(ClientEvent.open, event) } else { - console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyStatus}`) + console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyState}`) } } @@ -84,16 +93,24 @@ export abstract class Transport extends EventEmitter { } onclose(event: CloseEvent) { - if (this.readyStatus != WebSocket.CLOSED) { - this.readyStatus = WebSocket.CLOSED + console.debug(`WebSocket Transport ${this.id} call onclose CloseEvent[code: ${event.code}, reason: ${event.reason}]`) + if (this.readyState != WebSocket.CLOSED) { + this.readyState = WebSocket.CLOSED this.emit(ClientEvent.close, event) } else { - console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) + console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyState} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) } } + receiverClose(code: number, reason: string) { + console.debug(`Netty Handler receeve close code: ${code} reason: ${reason}`) + this._closeFrameReceived = true; + this.close(code, reason) + } + abstract getId(): string abstract doConnect(): void abstract doSend(text: string): void - abstract doClose(code: number, reason: string): void + abstract doClose(code: number, reason: string, wasClean?: boolean): void + abstract abortHandshake(reason: Error): void }