From 48cc6325b8208d66ab0f165439703ba6276054a8 Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Thu, 17 Dec 2020 17:00:27 +0800 Subject: [PATCH] feat: add netty WebSocket client Signed-off-by: MiaoWoo --- packages/websocket/src/client/index.ts | 98 +++++++++++++++++ packages/websocket/src/client/interface.ts | 27 +++++ .../src/client/netty/adapter/handler.ts | 24 +++++ .../websocket/src/client/netty/handler.ts | 63 +++++++++++ packages/websocket/src/client/netty/index.ts | 88 +++++++++++++++ packages/websocket/src/client/transport.ts | 100 ++++++++++++++++++ 6 files changed, 400 insertions(+) create mode 100644 packages/websocket/src/client/index.ts create mode 100644 packages/websocket/src/client/interface.ts create mode 100644 packages/websocket/src/client/netty/adapter/handler.ts create mode 100644 packages/websocket/src/client/netty/handler.ts create mode 100644 packages/websocket/src/client/netty/index.ts create mode 100644 packages/websocket/src/client/transport.ts diff --git a/packages/websocket/src/client/index.ts b/packages/websocket/src/client/index.ts new file mode 100644 index 00000000..19e82c03 --- /dev/null +++ b/packages/websocket/src/client/index.ts @@ -0,0 +1,98 @@ + +import { EventEmitter } from 'events' +import { Transport } from './transport' +import { CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface' + +export class WebSocketManager { + private clients = new Map() + + constructor() { + process.on('exit', () => { + for (const client of this.clients.values()) { + client.close(0, `client ${client.id} close connect`) + } + this.clients.clear() + }) + } + + add(client: WebSocket) { + this.clients.set(client.id, client) + } + del(client: WebSocket) { + this.clients.delete(client.id) + } +} + +export const managers = new WebSocketManager() + +export class WebSocket extends EventEmitter { + public static CONNECTING = 0 + public static OPEN = 1 + public static CLOSING = 2 + public static CLOSED = 3 + public binaryType: 'blob' | 'arraybuffer' + + protected _url: string + protected _headers: WebSocketHeader = {} + + private client: Transport + + constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) { + super() + this._url = url + this._headers = headers + try { + let TransportImpl = require('./netty').NettyWebSocket + this.client = new TransportImpl(url, subProtocol, headers) + } catch (error) { + console.error('create websocket impl error: ' + error) + console.ex(error) + return + } + this.client.on('open', (event) => { + this.onopen?.(event) + managers.add(this) + }) + this.client.on('message', (event) => this.onmessage?.(event)) + this.client.on('close', (event) => { + this.onclose?.(event) + managers.del(this) + }) + this.client.on('error', (event) => this.onerror?.(event)) + setTimeout(() => this.client.connect(), 20) + } + get id() { + return this.client.id + } + get bufferedAmount() { + throw new Error("Method not implemented.") + } + get extensions() { + throw new Error("Method not implemented.") + } + get protocol() { + return this.client.protocol + } + get readyState() { + return this.client.readyStatus + } + get url() { + return this._url + } + public onopen: (event: Event) => void + public onmessage: (event: MessageEvent) => void + public onclose: (event: CloseEvent) => void + public onerror: (event: ErrorEvent) => void + + addEventListener(event: EventType, callback: () => void) { + this[`on${event.toLowerCase()}`] = callback + this.client.on(event, callback) + } + public send(data: any) { + this.client.send(data) + } + public close(code?: number, reason?: string) { + this.client.close(code, reason) + this.removeAllListeners() + } +} diff --git a/packages/websocket/src/client/interface.ts b/packages/websocket/src/client/interface.ts new file mode 100644 index 00000000..27494afb --- /dev/null +++ b/packages/websocket/src/client/interface.ts @@ -0,0 +1,27 @@ + +export interface WebSocketHeader { + [key: string]: string +} + +export type EventType = + | 'close' + | 'error' + | 'message' + | 'open' +export interface Event { + +} +export interface MessageEvent extends Event { + data: any + origin?: string + lastEventId?: string + source?: string + ports?: string +} +export interface ErrorEvent extends Event { + error: Error +} +export interface CloseEvent extends Event { + code: number + reason: string +} diff --git a/packages/websocket/src/client/netty/adapter/handler.ts b/packages/websocket/src/client/netty/adapter/handler.ts new file mode 100644 index 00000000..5d386ae7 --- /dev/null +++ b/packages/websocket/src/client/netty/adapter/handler.ts @@ -0,0 +1,24 @@ +const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler') +export abstract class WebSocketClientHandlerAdapter { + private _Handler + constructor() { + let WebSocketClientHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, { + isSharable: this.isSharable.bind(this), + handlerAdded: this.handlerAdded.bind(this), + channelActive: this.channelActive.bind(this), + channelInactive: this.channelInactive.bind(this), + channelRead0: this.channelRead0.bind(this), + exceptionCaught: this.exceptionCaught.bind(this) + }) + this._Handler = new WebSocketClientHandlerAdapterImpl() + } + abstract isSharable(): void + abstract handlerAdded(ctx: any): void + abstract channelActive(ctx: any): void + abstract channelInactive(ctx: any): void + abstract channelRead0(ctx: any, msg: any): void + abstract exceptionCaught(ctx: any, cause: Error): void + getHandler() { + return this._Handler + } +} diff --git a/packages/websocket/src/client/netty/handler.ts b/packages/websocket/src/client/netty/handler.ts new file mode 100644 index 00000000..b47ea567 --- /dev/null +++ b/packages/websocket/src/client/netty/handler.ts @@ -0,0 +1,63 @@ +import { EventEmitter } from 'events' +import { NettyWebSocket } from '.' +import { WebSocketClientHandlerAdapter } from './adapter/handler' + +const CharsetUtil = Java.type('io.netty.util.CharsetUtil') +const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') +const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame') +const FullHttpResponse = Java.type('io.netty.handler.codec.http.FullHttpResponse') + +export class WebSocketClientHandler extends WebSocketClientHandlerAdapter { + public handshaker: any + public handshakeFuture: any//ChannelPromise + private client: NettyWebSocket + constructor(handshaker: any, client: NettyWebSocket) { + super() + this.handshaker = handshaker + this.client = client + } + isSharable() { + return true + } + handlerAdded(ctx: any) { + console.trace(`${ctx} handlerAdded`) + this.handshakeFuture = ctx.newPromise() + } + channelActive(ctx: any) { + console.trace(`${ctx} channelActive`) + this.handshaker.handshake(ctx.channel()) + } + channelInactive(ctx: any) { + console.trace(`${ctx} channelInactive`) + this.client.onclose({ code: 0, reason: 'server connection channel inactive!' }) + } + channelRead0(ctx: any, msg: any) { + console.trace(`${ctx} channelRead0 ${msg}`) + let ch = ctx.channel() + if (!this.handshaker.isHandshakeComplete()) { + // web socket client connected + this.handshaker.finishHandshake(ch, msg) + this.handshakeFuture.setSuccess() + return + } + + if (msg instanceof FullHttpResponse) { + let response = msg + throw new Error(`Unexpected FullHttpResponse (getStatus=${response.getStatus()}, content=${response.content().toString(CharsetUtil.UTF_8)})`) + } + + let frame = msg + if (frame instanceof TextWebSocketFrame) { + this.client.onmessage({ data: frame.text() }) + } else if (frame instanceof CloseWebSocketFrame) { + this.client.onclose({ code: 0, reason: 'server send CloseWebSocketFrame!' }) + } + } + exceptionCaught(ctx: any, cause: Error) { + console.trace(`${ctx} exceptionCaught ${cause}`) + this.client.onerror({ error: cause }) + if (!this.handshakeFuture.isDone()) { + this.handshakeFuture.setFailure(cause) + } + } +} diff --git a/packages/websocket/src/client/netty/index.ts b/packages/websocket/src/client/netty/index.ts new file mode 100644 index 00000000..cf50e671 --- /dev/null +++ b/packages/websocket/src/client/netty/index.ts @@ -0,0 +1,88 @@ + +import { WebSocketHeader } from '../interface' +import { Transport } from '../transport' +import { WebSocketClientHandler } from './handler' + +const URI = Java.type('java.net.URI') +const Epoll = Java.type('io.netty.channel.epoll.Epoll') +const Bootstrap = Java.type('io.netty.bootstrap.Bootstrap') +const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener') + +const NioEventLoopGroup = Java.type('io.netty.channel.nio.NioEventLoopGroup') +const NioSocketChannel = Java.type('io.netty.channel.socket.nio.NioSocketChannel') + +const EpollEventLoopGroup = Java.type('io.netty.channel.epoll.EpollEventLoopGroup') +const EpollSocketChannel = Java.type('io.netty.channel.epoll.EpollSocketChannel') + +const WebSocketClientHandshakerFactory = Java.type('io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory') +const WebSocketVersion = Java.type('io.netty.handler.codec.http.websocketx.WebSocketVersion') + +const HttpClientCodec = Java.type('io.netty.handler.codec.http.HttpClientCodec') +const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAggregator') +const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame') +const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame') + +const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer') +const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders') + +const epull = Epoll.isAvailable() +const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup() +const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class +process.on('exit', () => group.shutdownGracefully()) + +export class NettyWebSocket extends Transport { + private channel: any + private b = new Bootstrap(); + + constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) { + super(url, subProtocol, headers) + } + getId() { + return this.channel?.id() + '' + } + doConnect() { + let uri = URI.create(this._url) + let headers = new DefaultHttpHeaders() + for (const key of Object.getOwnPropertyNames(this._headers || {})) { + headers.add(key, this._headers[key]) + } + // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. + // If you change it to V00, ping is not supported and remember to change + // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. + let handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory + .newHandshaker(uri, WebSocketVersion.V13, null, false, headers), this) + this.b.group(group) + .channel(socketChannelClass) + .handler(new ChannelInitializer({ + initChannel: (ch: any) => { + let pipeline = ch.pipeline() + pipeline.addLast("http-codec", new HttpClientCodec()) + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)) + pipeline.addLast("websocket", handler.getHandler()) + } + })) + this.b.connect(uri.getHost(), uri.getPort()).addListener(new ChannelFutureListener((future: any) => { + this.channel = future.sync().channel() + this.onconnection({}) + handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => { + try { + future.sync() + this.onconnect({}) + } catch (error) { + console.debug(error) + } + })) + })) + } + doSend(text: string) { + this.channel.writeAndFlush(new TextWebSocketFrame(text)) + } + doClose(code: number, reason: string) { + this.channel.writeAndFlush(new CloseWebSocketFrame()) + this.channel.close() + this.channel.closeFuture().addListener(new ChannelFutureListener(() => console.debug(`NettyWebSocket close code: ${code} reason: ${reason}`))) + } + getChannel() { + return this.channel + } +} diff --git a/packages/websocket/src/client/transport.ts b/packages/websocket/src/client/transport.ts new file mode 100644 index 00000000..30d086ef --- /dev/null +++ b/packages/websocket/src/client/transport.ts @@ -0,0 +1,100 @@ +import { EventEmitter } from 'events' +import { WebSocket } from './index' +import { CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface' + +export abstract class Transport extends EventEmitter { + protected _url: string + protected _state: number = WebSocket.CONNECTING + protected _protocol: string + protected _headers: WebSocketHeader = {} + + constructor(uri: string, subProtocol: string = '', headers: WebSocketHeader = {}) { + super() + this._url = uri + this._protocol = subProtocol + this._headers = headers + } + + get id() { + return this.getId() + } + + get protocol() { + return this._protocol + } + + get readyStatus() { + return this._state + } + + set readyStatus(state: number) { + this._state = state + } + + connect() { + try { + this.doConnect() + } catch (error) { + this.onerror({ error }) + } + } + + send(text: string) { + try { + this.doSend(text) + } catch (error) { + this.onerror({ error }) + } + } + + close(code: number = 0, reason: string = '') { + if (this.readyStatus != WebSocket.CLOSING && this.readyStatus != WebSocket.CLOSED) { + this.readyStatus = WebSocket.CLOSING + try { + this.onclose({ code, reason }) + this.doClose(code, reason) + } catch (error) { + this.onerror({ error }) + } + } else { + console.debug(`${this.id} call close but state is ${this.readyStatus}`) + } + } + + onconnection(event: Event) { + this._state == WebSocket.CONNECTING + this.emit('connecting', event) + } + + onconnect(event: Event) { + console.debug(`${this.id} call onconnect`) + if (this.readyStatus != WebSocket.OPEN) { + this.readyStatus = WebSocket.OPEN + this.emit('open', event) + } else { + console.debug(`${this.id} call onconnect but state is ${this.readyStatus}`) + } + } + + onmessage(event: MessageEvent) { + this.emit('message', event) + } + + onerror(event: ErrorEvent) { + this.emit('error', event) + } + + onclose(event: CloseEvent) { + if (this.readyStatus != WebSocket.CLOSED) { + this.readyStatus = WebSocket.CLOSED + this.emit('close', event) + this.removeAllListeners() + } else { + console.debug(`${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`) + } + } + abstract getId() + abstract doConnect() + abstract doSend(text: string) + abstract doClose(code: number, reason: string) +}