diff --git a/packages/websocket/src/client/index.ts b/packages/websocket/src/client/index.ts index 7cfbb9d9..7dd1339b 100644 --- a/packages/websocket/src/client/index.ts +++ b/packages/websocket/src/client/index.ts @@ -61,7 +61,7 @@ export class WebSocket extends EventEmitter { manager.del(this) }) this.client.on('error', (event) => this.onerror?.(event)) - setTimeout(() => this.client.connect(), 20) + this.client.connect() } get id() { return this.client.id diff --git a/packages/websocket/src/client/netty/handler.ts b/packages/websocket/src/client/netty/handler.ts index b47ea567..308013f9 100644 --- a/packages/websocket/src/client/netty/handler.ts +++ b/packages/websocket/src/client/netty/handler.ts @@ -20,15 +20,15 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { return true } handlerAdded(ctx: any) { - console.trace(`${ctx} handlerAdded`) + console.debug(`${ctx} handlerAdded`) this.handshakeFuture = ctx.newPromise() } channelActive(ctx: any) { - console.trace(`${ctx} channelActive`) + console.debug(`${ctx} channelActive`) this.handshaker.handshake(ctx.channel()) } channelInactive(ctx: any) { - console.trace(`${ctx} channelInactive`) + console.debug(`${ctx} channelInactive`) this.client.onclose({ code: 0, reason: 'server connection channel inactive!' }) } channelRead0(ctx: any, msg: any) { @@ -54,7 +54,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { } } exceptionCaught(ctx: any, cause: Error) { - console.trace(`${ctx} exceptionCaught ${cause}`) + console.debug(`${ctx} exceptionCaught ${cause}`) this.client.onerror({ error: cause }) if (!this.handshakeFuture.isDone()) { this.handshakeFuture.setFailure(cause) diff --git a/packages/websocket/src/client/netty/index.ts b/packages/websocket/src/client/netty/index.ts index cf50e671..9aea6470 100644 --- a/packages/websocket/src/client/netty/index.ts +++ b/packages/websocket/src/client/netty/index.ts @@ -25,22 +25,51 @@ const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.Cl const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer') const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders') +const SslContextBuilder = Java.type('io.netty.handler.ssl.SslContextBuilder') +const InsecureTrustManagerFactory = Java.type('io.netty.handler.ssl.util.InsecureTrustManagerFactory') + const epull = Epoll.isAvailable() const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup() const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class process.on('exit', () => group.shutdownGracefully()) export class NettyWebSocket extends Transport { + private _uri: any + private _schema: string + private _host: string + private _port: number private channel: any private b = new Bootstrap(); constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) { super(url, subProtocol, headers) + if (!url) { + throw new Error("Failed to construct 'WebSocket': The URL '" + url + "' is invalid.") + } + this._uri = URI.create(this._url) + this._schema = this._uri.getScheme() ?? 'ws' + if (["wss", "ws"].indexOf(this._schema) == -1) { + throw new Error("Failed to construct 'WebSocket': The URL's scheme must be either 'ws' or 'wss'. '" + this._schema + "' is not allowed.") + } + this._host = this._uri.getHost() + if (!this._host) { + throw new Error("Failed to construct 'WebSocket': The Host '" + this._host + "' is invalid.") + } + this._port = this._uri.getPort() + if (this._port == -1) { + if (this._schema == "wss") { + this._port = 443 + } else if (this._schema == "ws") { + this._port = 80 + } + } + console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port}`) } getId() { return this.channel?.id() + '' } doConnect() { + console.debug('client NettyWebSocket doConnect', this._url) let uri = URI.create(this._url) let headers = new DefaultHttpHeaders() for (const key of Object.getOwnPropertyNames(this._headers || {})) { @@ -56,12 +85,16 @@ export class NettyWebSocket extends Transport { .handler(new ChannelInitializer({ initChannel: (ch: any) => { let pipeline = ch.pipeline() + if (this._schema == "wss") { + let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build() + pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port)) + } pipeline.addLast("http-codec", new HttpClientCodec()) pipeline.addLast("aggregator", new HttpObjectAggregator(65536)) pipeline.addLast("websocket", handler.getHandler()) } })) - this.b.connect(uri.getHost(), uri.getPort()).addListener(new ChannelFutureListener((future: any) => { + this.b.connect(this._host, this._port).addListener(new ChannelFutureListener((future: any) => { this.channel = future.sync().channel() this.onconnection({}) handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => { @@ -69,7 +102,7 @@ export class NettyWebSocket extends Transport { future.sync() this.onconnect({}) } catch (error) { - console.debug(error) + this.onerror({ error }) } })) }))