feat: optimize websocket client
This commit is contained in:
@@ -28,12 +28,14 @@ export class WebSocketManager {
|
||||
export const manager = new WebSocketManager()
|
||||
|
||||
export class WebSocket extends EventEmitter {
|
||||
public static manager: WebSocketManager = manager
|
||||
|
||||
public static CONNECTING = 0
|
||||
public static OPEN = 1
|
||||
public static CLOSING = 2
|
||||
public static CLOSED = 3
|
||||
|
||||
public binaryType: 'blob' | 'arraybuffer'
|
||||
protected manager: WebSocketManager
|
||||
|
||||
protected _url: string
|
||||
protected _headers: WebSocketHeader = {}
|
||||
@@ -42,7 +44,6 @@ export class WebSocket extends EventEmitter {
|
||||
|
||||
constructor(url: string, subProtocol: string | string[] = '', headers: WebSocketHeader = {}) {
|
||||
super()
|
||||
this.manager = manager
|
||||
this._url = url
|
||||
this._headers = headers
|
||||
try {
|
||||
@@ -54,7 +55,14 @@ export class WebSocket extends EventEmitter {
|
||||
console.ex(error)
|
||||
return
|
||||
}
|
||||
// mamanger connected client
|
||||
manager.add(this)
|
||||
this.client.on(ClientEvent.close, (_) => manager.del(this))
|
||||
// add event forward
|
||||
this.client.on(ClientEvent.open, (event) => this.onopen?.(event))
|
||||
this.client.on(ClientEvent.message, (event) => this.onmessage?.(event))
|
||||
this.client.on(ClientEvent.close, (event) => this.onclose?.(event))
|
||||
this.client.on(ClientEvent.error, (event) => this.onerror?.(event))
|
||||
setTimeout(() => this.client.connect(), 20)
|
||||
}
|
||||
get id() {
|
||||
@@ -70,30 +78,31 @@ export class WebSocket extends EventEmitter {
|
||||
return this.client.protocol
|
||||
}
|
||||
get readyState() {
|
||||
return this.client.readyStatus
|
||||
return this.client.readyState
|
||||
}
|
||||
get url() {
|
||||
return this._url
|
||||
}
|
||||
set onopen(func: (event: Event) => void) {
|
||||
this.client.on(ClientEvent.open, func)
|
||||
|
||||
public onopen: (event: Event) => void
|
||||
public onmessage: (event: MessageEvent) => void
|
||||
public onclose: (event: CloseEvent) => void
|
||||
public onerror: (event: ErrorEvent) => void
|
||||
|
||||
public on(eventName: string | symbol, listener: (...args: any[]) => void): this {
|
||||
this.client.on(eventName, listener)
|
||||
return this
|
||||
}
|
||||
set onmessage(func: (event: MessageEvent) => void) {
|
||||
this.client.on(ClientEvent.message, func)
|
||||
}
|
||||
set onclose(func: (event: CloseEvent) => void) {
|
||||
this.client.on(ClientEvent.close, func)
|
||||
manager.del(this)
|
||||
}
|
||||
set onerror(func: (event: ErrorEvent) => void) {
|
||||
this.client.on(ClientEvent.error, func)
|
||||
public emit(eventName: string | symbol, ...args: any[]): boolean {
|
||||
return this.client.emit(eventName, ...args)
|
||||
}
|
||||
|
||||
public send(data: any) {
|
||||
this.client.send(data)
|
||||
return this.client.send(data)
|
||||
}
|
||||
|
||||
public close(code?: number, reason?: string) {
|
||||
this.client.close(code, reason)
|
||||
manager.del(this)
|
||||
return this.client.close(code, reason)
|
||||
}
|
||||
}
|
||||
global.setGlobal('WebSocket', WebSocket)
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import { NettyWebSocket } from '.'
|
||||
import { WebSocketClientHandlerAdapter } from './adapter/handler'
|
||||
|
||||
const Throwable = Java.type('java.lang.Throwable')
|
||||
const RuntimeException = Java.type('java.lang.RuntimeException')
|
||||
|
||||
const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
|
||||
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
|
||||
const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame')
|
||||
@@ -20,6 +23,7 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
|
||||
return true
|
||||
}
|
||||
handlerAdded(ctx: any) {
|
||||
this.client.onconnection({})
|
||||
if (ctx.newPromise) {
|
||||
this.handshakeFuture = ctx.newPromise()
|
||||
} else {
|
||||
@@ -28,18 +32,21 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
|
||||
}
|
||||
channelActive(ctx: any) {
|
||||
this.handshaker.handshake(ctx.channel())
|
||||
setTimeout(() => {
|
||||
this.abortHandshake(new Error('handshake timed out.'))
|
||||
}, 10000)
|
||||
}
|
||||
channelInactive(ctx: any) {
|
||||
if (this.client.readyStatus != WebSocket.CLOSED) {
|
||||
this.client.onclose({ code: 1006, reason: 'client connection channel inactive.' })
|
||||
}
|
||||
this.client.close(1006, 'connection was closed abnormally.', true)
|
||||
}
|
||||
channelRead0(ctx: any, msg: any) {
|
||||
let ch = ctx.channel()
|
||||
if (!this.handshaker.isHandshakeComplete()) {
|
||||
// web socket client connected
|
||||
console.debug(`Netty Handler channelRead0 websocket client connected`)
|
||||
// websocket client connected
|
||||
this.handshaker.finishHandshake(ch, msg)
|
||||
this.handshakeFuture.setSuccess()
|
||||
this.client.onconnect({})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -52,14 +59,19 @@ export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
|
||||
if (frame instanceof TextWebSocketFrame) {
|
||||
this.client.onmessage({ data: frame.text() })
|
||||
} else if (frame instanceof CloseWebSocketFrame) {
|
||||
this.client.close(1000, 'server close connection.')
|
||||
this.client.receiverClose(frame.statusCode(), frame.reasonText())
|
||||
}
|
||||
}
|
||||
abortHandshake(reason: Error) {
|
||||
if (this.handshakeFuture.isDone()) { return }
|
||||
if (!(reason instanceof Throwable)) {
|
||||
reason = new RuntimeException(reason)
|
||||
}
|
||||
this.handshakeFuture.setFailure(reason)
|
||||
}
|
||||
exceptionCaught(ctx: any, cause: Error) {
|
||||
console.debug(`${ctx} exceptionCaught ${cause}`)
|
||||
this.client.abortHandshake(cause)
|
||||
this.client.onerror({ error: cause })
|
||||
if (!this.handshakeFuture.isDone()) {
|
||||
this.handshakeFuture.setFailure(cause)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,8 @@ export class NettyWebSocket extends Transport {
|
||||
private channel: any
|
||||
private b: any
|
||||
|
||||
private handler: any
|
||||
|
||||
constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
|
||||
super(url, subProtocol, headers)
|
||||
if (!url) {
|
||||
@@ -88,7 +90,6 @@ export class NettyWebSocket extends Transport {
|
||||
return `${this.channel?.id()}` || `NettyWebSocket#${channelCount.incrementAndGet()}`
|
||||
}
|
||||
doConnect() {
|
||||
console.debug('client NettyWebSocket doConnect', this._url)
|
||||
let uri = URI.create(this._url)
|
||||
let headers = new DefaultHttpHeaders()
|
||||
for (const key of Object.getOwnPropertyNames(this._headers || {})) {
|
||||
@@ -97,7 +98,7 @@ export class NettyWebSocket extends Transport {
|
||||
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
|
||||
// If you change it to V00, ping is not supported and remember to change
|
||||
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
|
||||
let handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
|
||||
this.handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
|
||||
.newHandshaker(uri, WebSocketVersion.V13, null, false, headers), this)
|
||||
this.b = new Bootstrap()
|
||||
this.b.group(group)
|
||||
@@ -108,7 +109,7 @@ export class NettyWebSocket extends Transport {
|
||||
if (this._schema == "wss") {
|
||||
if (SslContextBuilder) {
|
||||
let sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
|
||||
pipeline.addLast(sslCtx.newHandler(ch.alloc(), this._host, this._port))
|
||||
pipeline.addLast('ssl', sslCtx.newHandler(ch.alloc(), this._host, this._port))
|
||||
} else {
|
||||
let sslEngine = SSLContext.getDefault().createSSLEngine()
|
||||
sslEngine.setUseClientMode(true)
|
||||
@@ -117,37 +118,37 @@ export class NettyWebSocket extends Transport {
|
||||
}
|
||||
pipeline.addLast("http-codec", new HttpClientCodec())
|
||||
pipeline.addLast("aggregator", new HttpObjectAggregator(65536))
|
||||
pipeline.addLast("websocket", handler.getHandler())
|
||||
pipeline.addLast("websocket", this.handler.getHandler())
|
||||
}
|
||||
}))
|
||||
this.b.connect(this._host, this._port).addListener(new ChannelFutureListener((future: any) => {
|
||||
try {
|
||||
this.channel = future.sync().channel()
|
||||
this.onconnection({})
|
||||
handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => {
|
||||
try {
|
||||
future.sync()
|
||||
// only trigger onconnect when not have error
|
||||
this.onconnect({})
|
||||
} catch (error: any) {
|
||||
// ignore error exceptionCaught from handler
|
||||
// this.onerror({ error })
|
||||
}
|
||||
}))
|
||||
} catch (error: any) {
|
||||
this.onerror({ error })
|
||||
}
|
||||
}))
|
||||
try {
|
||||
this.channel = this.b.connect(this._host, this._port).sync().channel()
|
||||
this.handler.handshakeFuture.sync()
|
||||
} catch (error) {
|
||||
// ignore connect error
|
||||
// tigger error at handshakeFuture
|
||||
}
|
||||
}
|
||||
doSend(text: string) {
|
||||
this.channel.writeAndFlush(new TextWebSocketFrame(text))
|
||||
}
|
||||
doClose(code: number, reason: string) {
|
||||
this.channel.writeAndFlush(new CloseWebSocketFrame())
|
||||
this.channel.closeFuture().addListener(new ChannelFutureListener(() => {
|
||||
doClose(code: number, reason: string, wasClean: boolean = false) {
|
||||
console.debug(`Netty Client doClose code: ${code} reason: ${reason}`)
|
||||
if (this.readyState == WebSocket.CLOSING) {
|
||||
if (!this._closeFrameSent) {
|
||||
console.debug(`Netty Client doClose send close frame`)
|
||||
this.channel?.writeAndFlush(new CloseWebSocketFrame(code, reason))
|
||||
this._closeFrameSent = true
|
||||
}
|
||||
if (!this._closeFrameReceived && !wasClean) { return console.debug(`Netty Client doClose wait server send close`) }
|
||||
}
|
||||
this.channel?.closeFuture().addListener(new ChannelFutureListener(() => {
|
||||
this.onclose({ code, reason })
|
||||
}))
|
||||
}
|
||||
abortHandshake(reason: Error): void {
|
||||
this.handler.abortHandshake(reason)
|
||||
}
|
||||
getChannel() {
|
||||
return this.channel
|
||||
}
|
||||
|
||||
@@ -8,6 +8,9 @@ export abstract class Transport extends EventEmitter {
|
||||
protected _protocol: string
|
||||
protected _headers: WebSocketHeader = {}
|
||||
|
||||
protected _closeFrameReceived = false;
|
||||
protected _closeFrameSent = false;
|
||||
|
||||
constructor(uri: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
|
||||
super()
|
||||
this._url = uri
|
||||
@@ -23,24 +26,22 @@ export abstract class Transport extends EventEmitter {
|
||||
return this._protocol
|
||||
}
|
||||
|
||||
get readyStatus() {
|
||||
get readyState() {
|
||||
return this._state
|
||||
}
|
||||
|
||||
set readyStatus(state: number) {
|
||||
set readyState(state: number) {
|
||||
this._state = state
|
||||
}
|
||||
|
||||
connect() {
|
||||
try {
|
||||
this.doConnect()
|
||||
} catch (error: any) {
|
||||
console.ex(error)
|
||||
this.onerror({ error })
|
||||
}
|
||||
this.doConnect()
|
||||
}
|
||||
|
||||
send(text: string) {
|
||||
if (this.readyState === WebSocket.CONNECTING) {
|
||||
throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
|
||||
}
|
||||
try {
|
||||
this.doSend(text)
|
||||
} catch (error: any) {
|
||||
@@ -48,16 +49,24 @@ export abstract class Transport extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
close(code: number = 1000, reason: string = '') {
|
||||
if (this.readyStatus < WebSocket.CLOSING) {
|
||||
this.readyStatus = WebSocket.CLOSING
|
||||
try {
|
||||
this.doClose(code, reason)
|
||||
} catch (error: any) {
|
||||
this.onerror({ error })
|
||||
close(code: number = 1000, reason: string = '', wasClean: boolean = false) {
|
||||
if (this.readyState === WebSocket.CLOSED) return;
|
||||
if (this.readyState === WebSocket.CONNECTING) {
|
||||
const msg = 'WebSocket was closed before the connection was established';
|
||||
this.abortHandshake(new Error(msg));
|
||||
return;
|
||||
}
|
||||
if (this.readyState === WebSocket.CLOSING) {
|
||||
if (this._closeFrameSent && this._closeFrameReceived) {
|
||||
this.onclose({ code, reason });
|
||||
}
|
||||
} else {
|
||||
console.debug(`WebSocket Transport ${this.id} call close code ${code} reason ${reason} but state is ${this.readyStatus}`)
|
||||
return;
|
||||
}
|
||||
this.readyState = WebSocket.CLOSING
|
||||
try {
|
||||
this.doClose(code, reason, wasClean)
|
||||
} catch (error: any) {
|
||||
this.onerror({ error })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,11 +76,11 @@ export abstract class Transport extends EventEmitter {
|
||||
}
|
||||
|
||||
onconnect(event: Event) {
|
||||
if (this.readyStatus != WebSocket.OPEN) {
|
||||
this.readyStatus = WebSocket.OPEN
|
||||
if (this.readyState != WebSocket.OPEN) {
|
||||
this.readyState = WebSocket.OPEN
|
||||
this.emit(ClientEvent.open, event)
|
||||
} else {
|
||||
console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyStatus}`)
|
||||
console.debug(`WebSocket Transport ${this.id} call onconnect but state is ${this.readyState}`)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,16 +93,24 @@ export abstract class Transport extends EventEmitter {
|
||||
}
|
||||
|
||||
onclose(event: CloseEvent) {
|
||||
if (this.readyStatus != WebSocket.CLOSED) {
|
||||
this.readyStatus = WebSocket.CLOSED
|
||||
console.debug(`WebSocket Transport ${this.id} call onclose CloseEvent[code: ${event.code}, reason: ${event.reason}]`)
|
||||
if (this.readyState != WebSocket.CLOSED) {
|
||||
this.readyState = WebSocket.CLOSED
|
||||
this.emit(ClientEvent.close, event)
|
||||
} else {
|
||||
console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`)
|
||||
console.debug(`WebSocket Transport ${this.id} call onclose but state is ${this.readyState} CloseEvent[code: ${event.code}, reason: ${event.reason}]`)
|
||||
}
|
||||
}
|
||||
|
||||
receiverClose(code: number, reason: string) {
|
||||
console.debug(`Netty Handler receeve close code: ${code} reason: ${reason}`)
|
||||
this._closeFrameReceived = true;
|
||||
this.close(code, reason)
|
||||
}
|
||||
|
||||
abstract getId(): string
|
||||
abstract doConnect(): void
|
||||
abstract doSend(text: string): void
|
||||
abstract doClose(code: number, reason: string): void
|
||||
abstract doClose(code: number, reason: string, wasClean?: boolean): void
|
||||
abstract abortHandshake(reason: Error): void
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user