feat: 优化WebSocket客户端 支持WSS

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2021-08-05 14:35:17 +08:00
parent 2fd10140cf
commit cabf09e247
3 changed files with 40 additions and 7 deletions

View File

@ -61,7 +61,7 @@ export class WebSocket extends EventEmitter {
manager.del(this) manager.del(this)
}) })
this.client.on('error', (event) => this.onerror?.(event)) this.client.on('error', (event) => this.onerror?.(event))
setTimeout(() => this.client.connect(), 20) this.client.connect()
} }
get id() { get id() {
return this.client.id return this.client.id

View File

@ -20,15 +20,15 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
return true return true
} }
handlerAdded(ctx: any) { handlerAdded(ctx: any) {
console.trace(`${ctx} handlerAdded`) console.debug(`${ctx} handlerAdded`)
this.handshakeFuture = ctx.newPromise() this.handshakeFuture = ctx.newPromise()
} }
channelActive(ctx: any) { channelActive(ctx: any) {
console.trace(`${ctx} channelActive`) console.debug(`${ctx} channelActive`)
this.handshaker.handshake(ctx.channel()) this.handshaker.handshake(ctx.channel())
} }
channelInactive(ctx: any) { channelInactive(ctx: any) {
console.trace(`${ctx} channelInactive`) console.debug(`${ctx} channelInactive`)
this.client.onclose({ code: 0, reason: 'server connection channel inactive!' }) this.client.onclose({ code: 0, reason: 'server connection channel inactive!' })
} }
channelRead0(ctx: any, msg: any) { channelRead0(ctx: any, msg: any) {
@ -54,7 +54,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
} }
} }
exceptionCaught(ctx: any, cause: Error) { exceptionCaught(ctx: any, cause: Error) {
console.trace(`${ctx} exceptionCaught ${cause}`) console.debug(`${ctx} exceptionCaught ${cause}`)
this.client.onerror({ error: cause }) this.client.onerror({ error: cause })
if (!this.handshakeFuture.isDone()) { if (!this.handshakeFuture.isDone()) {
this.handshakeFuture.setFailure(cause) this.handshakeFuture.setFailure(cause)

View File

@ -25,22 +25,51 @@ const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.Cl
const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer') const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer')
const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders') const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders')
const SslContextBuilder = Java.type('io.netty.handler.ssl.SslContextBuilder')
const InsecureTrustManagerFactory = Java.type('io.netty.handler.ssl.util.InsecureTrustManagerFactory')
const epull = Epoll.isAvailable() const epull = Epoll.isAvailable()
const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup() const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup()
const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class
process.on('exit', () => group.shutdownGracefully()) process.on('exit', () => group.shutdownGracefully())
export class NettyWebSocket extends Transport { export class NettyWebSocket extends Transport {
private _uri: any
private _schema: string
private _host: string
private _port: number
private channel: any private channel: any
private b = new Bootstrap(); private b = new Bootstrap();
constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) { constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
super(url, subProtocol, headers) super(url, subProtocol, headers)
if (!url) {
throw new Error("Failed to construct 'WebSocket': The URL '" + url + "' is invalid.")
}
this._uri = URI.create(this._url)
this._schema = this._uri.getScheme() ?? 'ws'
if (["wss", "ws"].indexOf(this._schema) == -1) {
throw new Error("Failed to construct 'WebSocket': The URL's scheme must be either 'ws' or 'wss'. '" + this._schema + "' is not allowed.")
}
this._host = this._uri.getHost()
if (!this._host) {
throw new Error("Failed to construct 'WebSocket': The Host '" + this._host + "' is invalid.")
}
this._port = this._uri.getPort()
if (this._port == -1) {
if (this._schema == "wss") {
this._port = 443
} else if (this._schema == "ws") {
this._port = 80
}
}
console.debug(`constructor NettyWebSocket url: ${url} scheme: ${this._schema} host: ${this._host} port: ${this._port}`)
} }
getId() { getId() {
return this.channel?.id() + '' return this.channel?.id() + ''
} }
doConnect() { doConnect() {
console.debug('client NettyWebSocket doConnect', this._url)
let uri = URI.create(this._url) let uri = URI.create(this._url)
let headers = new DefaultHttpHeaders() let headers = new DefaultHttpHeaders()
for (const key of Object.getOwnPropertyNames(this._headers || {})) { for (const key of Object.getOwnPropertyNames(this._headers || {})) {
@ -56,12 +85,16 @@ export class NettyWebSocket extends Transport {
.handler(new ChannelInitializer({ .handler(new ChannelInitializer({
initChannel: (ch: any) => { initChannel: (ch: any) => {
let pipeline = ch.pipeline() let pipeline = ch.pipeline()
if (this._schema == "wss") {
let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port))
}
pipeline.addLast("http-codec", new HttpClientCodec()) pipeline.addLast("http-codec", new HttpClientCodec())
pipeline.addLast("aggregator", new HttpObjectAggregator(65536)) pipeline.addLast("aggregator", new HttpObjectAggregator(65536))
pipeline.addLast("websocket", handler.getHandler()) pipeline.addLast("websocket", handler.getHandler())
} }
})) }))
this.b.connect(uri.getHost(), uri.getPort()).addListener(new ChannelFutureListener((future: any) => { this.b.connect(this._host, this._port).addListener(new ChannelFutureListener((future: any) => {
this.channel = future.sync().channel() this.channel = future.sync().channel()
this.onconnection({}) this.onconnection({})
handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => { handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => {
@ -69,7 +102,7 @@ export class NettyWebSocket extends Transport {
future.sync() future.sync()
this.onconnect({}) this.onconnect({})
} catch (error) { } catch (error) {
console.debug(error) this.onerror({ error })
} }
})) }))
})) }))