@@ -1,4 +1,3 @@
 | 
			
		||||
import { EventEmitter } from 'events'
 | 
			
		||||
import { NettyWebSocket } from '.'
 | 
			
		||||
import { WebSocketClientHandlerAdapter } from './adapter/handler'
 | 
			
		||||
 | 
			
		||||
@@ -6,6 +5,7 @@ const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
 | 
			
		||||
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
 | 
			
		||||
const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame')
 | 
			
		||||
const FullHttpResponse = Java.type('io.netty.handler.codec.http.FullHttpResponse')
 | 
			
		||||
const DefaultChannelPromise = Java.type('io.netty.channel.DefaultChannelPromise')
 | 
			
		||||
 | 
			
		||||
export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
 | 
			
		||||
    public handshaker: any
 | 
			
		||||
@@ -21,7 +21,11 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
 | 
			
		||||
    }
 | 
			
		||||
    handlerAdded(ctx: any) {
 | 
			
		||||
        console.debug(`${ctx} handlerAdded`)
 | 
			
		||||
        this.handshakeFuture = ctx.newPromise()
 | 
			
		||||
        if (ctx.newPromise) {
 | 
			
		||||
            this.handshakeFuture = ctx.newPromise()
 | 
			
		||||
        } else {
 | 
			
		||||
            this.handshakeFuture = new DefaultChannelPromise(ctx.channel(), ctx.executor())
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    channelActive(ctx: any) {
 | 
			
		||||
        console.debug(`${ctx} channelActive`)
 | 
			
		||||
 
 | 
			
		||||
@@ -4,16 +4,12 @@ import { Transport } from '../transport'
 | 
			
		||||
import { WebSocketClientHandler } from './handler'
 | 
			
		||||
 | 
			
		||||
const URI = Java.type('java.net.URI')
 | 
			
		||||
const Epoll = Java.type('io.netty.channel.epoll.Epoll')
 | 
			
		||||
const Bootstrap = Java.type('io.netty.bootstrap.Bootstrap')
 | 
			
		||||
const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener')
 | 
			
		||||
 | 
			
		||||
const NioEventLoopGroup = Java.type('io.netty.channel.nio.NioEventLoopGroup')
 | 
			
		||||
const NioSocketChannel = Java.type('io.netty.channel.socket.nio.NioSocketChannel')
 | 
			
		||||
 | 
			
		||||
const EpollEventLoopGroup = Java.type('io.netty.channel.epoll.EpollEventLoopGroup')
 | 
			
		||||
const EpollSocketChannel = Java.type('io.netty.channel.epoll.EpollSocketChannel')
 | 
			
		||||
 | 
			
		||||
const WebSocketClientHandshakerFactory = Java.type('io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory')
 | 
			
		||||
const WebSocketVersion = Java.type('io.netty.handler.codec.http.websocketx.WebSocketVersion')
 | 
			
		||||
 | 
			
		||||
@@ -25,12 +21,35 @@ const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.Cl
 | 
			
		||||
const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer')
 | 
			
		||||
const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders')
 | 
			
		||||
 | 
			
		||||
const SslContextBuilder = Java.type('io.netty.handler.ssl.SslContextBuilder')
 | 
			
		||||
const InsecureTrustManagerFactory = Java.type('io.netty.handler.ssl.util.InsecureTrustManagerFactory')
 | 
			
		||||
const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger")
 | 
			
		||||
const channelCount = new AtomicInteger(0)
 | 
			
		||||
 | 
			
		||||
var SslContextBuilder: any
 | 
			
		||||
var InsecureTrustManagerFactory: any
 | 
			
		||||
var SSLContext: any
 | 
			
		||||
var SslHandler: any
 | 
			
		||||
try {
 | 
			
		||||
    SslContextBuilder = Java.type('io.netty.handler.ssl.SslContextBuilder')
 | 
			
		||||
    InsecureTrustManagerFactory = Java.type('io.netty.handler.ssl.util.InsecureTrustManagerFactory')
 | 
			
		||||
} catch (error) {
 | 
			
		||||
    SSLContext = Java.type('javax.net.ssl.SSLContext')
 | 
			
		||||
    SslHandler = Java.type('io.netty.handler.ssl.SslHandler')
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var group: any
 | 
			
		||||
var socketChannelClass: any
 | 
			
		||||
try {
 | 
			
		||||
    const Epoll = Java.type('io.netty.channel.epoll.Epoll')
 | 
			
		||||
    const epull = Epoll.isAvailable()
 | 
			
		||||
    const EpollEventLoopGroup = Java.type('io.netty.channel.epoll.EpollEventLoopGroup')
 | 
			
		||||
    const EpollSocketChannel = Java.type('io.netty.channel.epoll.EpollSocketChannel')
 | 
			
		||||
    group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup()
 | 
			
		||||
    socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class
 | 
			
		||||
} catch (error) {
 | 
			
		||||
    group = new NioEventLoopGroup()
 | 
			
		||||
    socketChannelClass = NioSocketChannel.class
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const epull = Epoll.isAvailable()
 | 
			
		||||
const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup()
 | 
			
		||||
const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class
 | 
			
		||||
process.on('exit', () => group.shutdownGracefully())
 | 
			
		||||
 | 
			
		||||
export class NettyWebSocket extends Transport {
 | 
			
		||||
@@ -66,7 +85,10 @@ export class NettyWebSocket extends Transport {
 | 
			
		||||
        console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port} header: ${JSON.stringify(headers)}`)
 | 
			
		||||
    }
 | 
			
		||||
    getId() {
 | 
			
		||||
        return this.channel?.id() + ''
 | 
			
		||||
        if (this.channel?.id) {
 | 
			
		||||
            return this.channel?.id() + ''
 | 
			
		||||
        }
 | 
			
		||||
        return 'NettyWebSocket#' + channelCount.incrementAndGet()
 | 
			
		||||
    }
 | 
			
		||||
    doConnect() {
 | 
			
		||||
        console.debug('client NettyWebSocket doConnect', this._url)
 | 
			
		||||
@@ -86,8 +108,14 @@ export class NettyWebSocket extends Transport {
 | 
			
		||||
                initChannel: (ch: any) => {
 | 
			
		||||
                    let pipeline = ch.pipeline()
 | 
			
		||||
                    if (this._schema == "wss") {
 | 
			
		||||
                        let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
 | 
			
		||||
                        pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port))
 | 
			
		||||
                        if (SslContextBuilder) {
 | 
			
		||||
                            let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
 | 
			
		||||
                            pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port))
 | 
			
		||||
                        } else {
 | 
			
		||||
                            let sslEngine = SSLContext.getDefault().createSSLEngine()
 | 
			
		||||
                            sslEngine.setUseClientMode(true)
 | 
			
		||||
                            pipeline.addLast("ssl", new SslHandler(sslEngine))
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    pipeline.addLast("http-codec", new HttpClientCodec())
 | 
			
		||||
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536))
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user