@@ -61,7 +61,7 @@ export class WebSocket extends EventEmitter {
 | 
			
		||||
            manager.del(this)
 | 
			
		||||
        })
 | 
			
		||||
        this.client.on('error', (event) => this.onerror?.(event))
 | 
			
		||||
        setTimeout(() => this.client.connect(), 20)
 | 
			
		||||
        this.client.connect()
 | 
			
		||||
    }
 | 
			
		||||
    get id() {
 | 
			
		||||
        return this.client.id
 | 
			
		||||
 
 | 
			
		||||
@@ -20,15 +20,15 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
 | 
			
		||||
        return true
 | 
			
		||||
    }
 | 
			
		||||
    handlerAdded(ctx: any) {
 | 
			
		||||
        console.trace(`${ctx} handlerAdded`)
 | 
			
		||||
        console.debug(`${ctx} handlerAdded`)
 | 
			
		||||
        this.handshakeFuture = ctx.newPromise()
 | 
			
		||||
    }
 | 
			
		||||
    channelActive(ctx: any) {
 | 
			
		||||
        console.trace(`${ctx} channelActive`)
 | 
			
		||||
        console.debug(`${ctx} channelActive`)
 | 
			
		||||
        this.handshaker.handshake(ctx.channel())
 | 
			
		||||
    }
 | 
			
		||||
    channelInactive(ctx: any) {
 | 
			
		||||
        console.trace(`${ctx} channelInactive`)
 | 
			
		||||
        console.debug(`${ctx} channelInactive`)
 | 
			
		||||
        this.client.onclose({ code: 0, reason: 'server connection channel inactive!' })
 | 
			
		||||
    }
 | 
			
		||||
    channelRead0(ctx: any, msg: any) {
 | 
			
		||||
@@ -54,7 +54,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    exceptionCaught(ctx: any, cause: Error) {
 | 
			
		||||
        console.trace(`${ctx} exceptionCaught ${cause}`)
 | 
			
		||||
        console.debug(`${ctx} exceptionCaught ${cause}`)
 | 
			
		||||
        this.client.onerror({ error: cause })
 | 
			
		||||
        if (!this.handshakeFuture.isDone()) {
 | 
			
		||||
            this.handshakeFuture.setFailure(cause)
 | 
			
		||||
 
 | 
			
		||||
@@ -25,22 +25,51 @@ const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.Cl
 | 
			
		||||
const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer')
 | 
			
		||||
const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders')
 | 
			
		||||
 | 
			
		||||
const SslContextBuilder = Java.type('io.netty.handler.ssl.SslContextBuilder')
 | 
			
		||||
const InsecureTrustManagerFactory = Java.type('io.netty.handler.ssl.util.InsecureTrustManagerFactory')
 | 
			
		||||
 | 
			
		||||
const epull = Epoll.isAvailable()
 | 
			
		||||
const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup()
 | 
			
		||||
const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class
 | 
			
		||||
process.on('exit', () => group.shutdownGracefully())
 | 
			
		||||
 | 
			
		||||
export class NettyWebSocket extends Transport {
 | 
			
		||||
    private _uri: any
 | 
			
		||||
    private _schema: string
 | 
			
		||||
    private _host: string
 | 
			
		||||
    private _port: number
 | 
			
		||||
    private channel: any
 | 
			
		||||
    private b = new Bootstrap();
 | 
			
		||||
 | 
			
		||||
    constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
 | 
			
		||||
        super(url, subProtocol, headers)
 | 
			
		||||
        if (!url) {
 | 
			
		||||
            throw new Error("Failed to construct 'WebSocket': The URL '" + url + "' is invalid.")
 | 
			
		||||
        }
 | 
			
		||||
        this._uri = URI.create(this._url)
 | 
			
		||||
        this._schema = this._uri.getScheme() ?? 'ws'
 | 
			
		||||
        if (["wss", "ws"].indexOf(this._schema) == -1) {
 | 
			
		||||
            throw new Error("Failed to construct 'WebSocket': The URL's scheme must be either 'ws' or 'wss'. '" + this._schema + "' is not allowed.")
 | 
			
		||||
        }
 | 
			
		||||
        this._host = this._uri.getHost()
 | 
			
		||||
        if (!this._host) {
 | 
			
		||||
            throw new Error("Failed to construct 'WebSocket': The Host '" + this._host + "' is invalid.")
 | 
			
		||||
        }
 | 
			
		||||
        this._port = this._uri.getPort()
 | 
			
		||||
        if (this._port == -1) {
 | 
			
		||||
            if (this._schema == "wss") {
 | 
			
		||||
                this._port = 443
 | 
			
		||||
            } else if (this._schema == "ws") {
 | 
			
		||||
                this._port = 80
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port}`)
 | 
			
		||||
    }
 | 
			
		||||
    getId() {
 | 
			
		||||
        return this.channel?.id() + ''
 | 
			
		||||
    }
 | 
			
		||||
    doConnect() {
 | 
			
		||||
        console.debug('client NettyWebSocket doConnect', this._url)
 | 
			
		||||
        let uri = URI.create(this._url)
 | 
			
		||||
        let headers = new DefaultHttpHeaders()
 | 
			
		||||
        for (const key of Object.getOwnPropertyNames(this._headers || {})) {
 | 
			
		||||
@@ -56,12 +85,16 @@ export class NettyWebSocket extends Transport {
 | 
			
		||||
            .handler(new ChannelInitializer({
 | 
			
		||||
                initChannel: (ch: any) => {
 | 
			
		||||
                    let pipeline = ch.pipeline()
 | 
			
		||||
                    if (this._schema == "wss") {
 | 
			
		||||
                        let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
 | 
			
		||||
                        pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port))
 | 
			
		||||
                    }
 | 
			
		||||
                    pipeline.addLast("http-codec", new HttpClientCodec())
 | 
			
		||||
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536))
 | 
			
		||||
                    pipeline.addLast("websocket", handler.getHandler())
 | 
			
		||||
                }
 | 
			
		||||
            }))
 | 
			
		||||
        this.b.connect(uri.getHost(), uri.getPort()).addListener(new ChannelFutureListener((future: any) => {
 | 
			
		||||
        this.b.connect(this._host, this._port).addListener(new ChannelFutureListener((future: any) => {
 | 
			
		||||
            this.channel = future.sync().channel()
 | 
			
		||||
            this.onconnection({})
 | 
			
		||||
            handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => {
 | 
			
		||||
@@ -69,7 +102,7 @@ export class NettyWebSocket extends Transport {
 | 
			
		||||
                    future.sync()
 | 
			
		||||
                    this.onconnect({})
 | 
			
		||||
                } catch (error) {
 | 
			
		||||
                    console.debug(error)
 | 
			
		||||
                    this.onerror({ error })
 | 
			
		||||
                }
 | 
			
		||||
            }))
 | 
			
		||||
        }))
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user