feat: add netty WebSocket client
Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
parent
f42d5d12ed
commit
48cc6325b8
98
packages/websocket/src/client/index.ts
Normal file
98
packages/websocket/src/client/index.ts
Normal file
@ -0,0 +1,98 @@
|
||||
|
||||
import { EventEmitter } from 'events'
|
||||
import { Transport } from './transport'
|
||||
import { CloseEvent, ErrorEvent, Event, EventType, MessageEvent, WebSocketHeader } from './interface'
|
||||
|
||||
export class WebSocketManager {
|
||||
private clients = new Map<string, WebSocket>()
|
||||
|
||||
constructor() {
|
||||
process.on('exit', () => {
|
||||
for (const client of this.clients.values()) {
|
||||
client.close(0, `client ${client.id} close connect`)
|
||||
}
|
||||
this.clients.clear()
|
||||
})
|
||||
}
|
||||
|
||||
add(client: WebSocket) {
|
||||
this.clients.set(client.id, client)
|
||||
}
|
||||
del(client: WebSocket) {
|
||||
this.clients.delete(client.id)
|
||||
}
|
||||
}
|
||||
|
||||
export const managers = new WebSocketManager()
|
||||
|
||||
export class WebSocket extends EventEmitter {
|
||||
public static CONNECTING = 0
|
||||
public static OPEN = 1
|
||||
public static CLOSING = 2
|
||||
public static CLOSED = 3
|
||||
public binaryType: 'blob' | 'arraybuffer'
|
||||
|
||||
protected _url: string
|
||||
protected _headers: WebSocketHeader = {}
|
||||
|
||||
private client: Transport
|
||||
|
||||
constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
|
||||
super()
|
||||
this._url = url
|
||||
this._headers = headers
|
||||
try {
|
||||
let TransportImpl = require('./netty').NettyWebSocket
|
||||
this.client = new TransportImpl(url, subProtocol, headers)
|
||||
} catch (error) {
|
||||
console.error('create websocket impl error: ' + error)
|
||||
console.ex(error)
|
||||
return
|
||||
}
|
||||
this.client.on('open', (event) => {
|
||||
this.onopen?.(event)
|
||||
managers.add(this)
|
||||
})
|
||||
this.client.on('message', (event) => this.onmessage?.(event))
|
||||
this.client.on('close', (event) => {
|
||||
this.onclose?.(event)
|
||||
managers.del(this)
|
||||
})
|
||||
this.client.on('error', (event) => this.onerror?.(event))
|
||||
setTimeout(() => this.client.connect(), 20)
|
||||
}
|
||||
get id() {
|
||||
return this.client.id
|
||||
}
|
||||
get bufferedAmount() {
|
||||
throw new Error("Method not implemented.")
|
||||
}
|
||||
get extensions() {
|
||||
throw new Error("Method not implemented.")
|
||||
}
|
||||
get protocol() {
|
||||
return this.client.protocol
|
||||
}
|
||||
get readyState() {
|
||||
return this.client.readyStatus
|
||||
}
|
||||
get url() {
|
||||
return this._url
|
||||
}
|
||||
public onopen: (event: Event) => void
|
||||
public onmessage: (event: MessageEvent) => void
|
||||
public onclose: (event: CloseEvent) => void
|
||||
public onerror: (event: ErrorEvent) => void
|
||||
|
||||
addEventListener(event: EventType, callback: () => void) {
|
||||
this[`on${event.toLowerCase()}`] = callback
|
||||
this.client.on(event, callback)
|
||||
}
|
||||
public send(data: any) {
|
||||
this.client.send(data)
|
||||
}
|
||||
public close(code?: number, reason?: string) {
|
||||
this.client.close(code, reason)
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
27
packages/websocket/src/client/interface.ts
Normal file
27
packages/websocket/src/client/interface.ts
Normal file
@ -0,0 +1,27 @@
|
||||
|
||||
export interface WebSocketHeader {
|
||||
[key: string]: string
|
||||
}
|
||||
|
||||
export type EventType =
|
||||
| 'close'
|
||||
| 'error'
|
||||
| 'message'
|
||||
| 'open'
|
||||
export interface Event {
|
||||
|
||||
}
|
||||
export interface MessageEvent extends Event {
|
||||
data: any
|
||||
origin?: string
|
||||
lastEventId?: string
|
||||
source?: string
|
||||
ports?: string
|
||||
}
|
||||
export interface ErrorEvent extends Event {
|
||||
error: Error
|
||||
}
|
||||
export interface CloseEvent extends Event {
|
||||
code: number
|
||||
reason: string
|
||||
}
|
24
packages/websocket/src/client/netty/adapter/handler.ts
Normal file
24
packages/websocket/src/client/netty/adapter/handler.ts
Normal file
@ -0,0 +1,24 @@
|
||||
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
|
||||
export abstract class WebSocketClientHandlerAdapter {
|
||||
private _Handler
|
||||
constructor() {
|
||||
let WebSocketClientHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
|
||||
isSharable: this.isSharable.bind(this),
|
||||
handlerAdded: this.handlerAdded.bind(this),
|
||||
channelActive: this.channelActive.bind(this),
|
||||
channelInactive: this.channelInactive.bind(this),
|
||||
channelRead0: this.channelRead0.bind(this),
|
||||
exceptionCaught: this.exceptionCaught.bind(this)
|
||||
})
|
||||
this._Handler = new WebSocketClientHandlerAdapterImpl()
|
||||
}
|
||||
abstract isSharable(): void
|
||||
abstract handlerAdded(ctx: any): void
|
||||
abstract channelActive(ctx: any): void
|
||||
abstract channelInactive(ctx: any): void
|
||||
abstract channelRead0(ctx: any, msg: any): void
|
||||
abstract exceptionCaught(ctx: any, cause: Error): void
|
||||
getHandler() {
|
||||
return this._Handler
|
||||
}
|
||||
}
|
63
packages/websocket/src/client/netty/handler.ts
Normal file
63
packages/websocket/src/client/netty/handler.ts
Normal file
@ -0,0 +1,63 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import { NettyWebSocket } from '.'
|
||||
import { WebSocketClientHandlerAdapter } from './adapter/handler'
|
||||
|
||||
const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
|
||||
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
|
||||
const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame')
|
||||
const FullHttpResponse = Java.type('io.netty.handler.codec.http.FullHttpResponse')
|
||||
|
||||
export class WebSocketClientHandler extends WebSocketClientHandlerAdapter {
|
||||
public handshaker: any
|
||||
public handshakeFuture: any//ChannelPromise
|
||||
private client: NettyWebSocket
|
||||
constructor(handshaker: any, client: NettyWebSocket) {
|
||||
super()
|
||||
this.handshaker = handshaker
|
||||
this.client = client
|
||||
}
|
||||
isSharable() {
|
||||
return true
|
||||
}
|
||||
handlerAdded(ctx: any) {
|
||||
console.trace(`${ctx} handlerAdded`)
|
||||
this.handshakeFuture = ctx.newPromise()
|
||||
}
|
||||
channelActive(ctx: any) {
|
||||
console.trace(`${ctx} channelActive`)
|
||||
this.handshaker.handshake(ctx.channel())
|
||||
}
|
||||
channelInactive(ctx: any) {
|
||||
console.trace(`${ctx} channelInactive`)
|
||||
this.client.onclose({ code: 0, reason: 'server connection channel inactive!' })
|
||||
}
|
||||
channelRead0(ctx: any, msg: any) {
|
||||
console.trace(`${ctx} channelRead0 ${msg}`)
|
||||
let ch = ctx.channel()
|
||||
if (!this.handshaker.isHandshakeComplete()) {
|
||||
// web socket client connected
|
||||
this.handshaker.finishHandshake(ch, msg)
|
||||
this.handshakeFuture.setSuccess()
|
||||
return
|
||||
}
|
||||
|
||||
if (msg instanceof FullHttpResponse) {
|
||||
let response = msg
|
||||
throw new Error(`Unexpected FullHttpResponse (getStatus=${response.getStatus()}, content=${response.content().toString(CharsetUtil.UTF_8)})`)
|
||||
}
|
||||
|
||||
let frame = msg
|
||||
if (frame instanceof TextWebSocketFrame) {
|
||||
this.client.onmessage({ data: frame.text() })
|
||||
} else if (frame instanceof CloseWebSocketFrame) {
|
||||
this.client.onclose({ code: 0, reason: 'server send CloseWebSocketFrame!' })
|
||||
}
|
||||
}
|
||||
exceptionCaught(ctx: any, cause: Error) {
|
||||
console.trace(`${ctx} exceptionCaught ${cause}`)
|
||||
this.client.onerror({ error: cause })
|
||||
if (!this.handshakeFuture.isDone()) {
|
||||
this.handshakeFuture.setFailure(cause)
|
||||
}
|
||||
}
|
||||
}
|
88
packages/websocket/src/client/netty/index.ts
Normal file
88
packages/websocket/src/client/netty/index.ts
Normal file
@ -0,0 +1,88 @@
|
||||
|
||||
import { WebSocketHeader } from '../interface'
|
||||
import { Transport } from '../transport'
|
||||
import { WebSocketClientHandler } from './handler'
|
||||
|
||||
const URI = Java.type('java.net.URI')
|
||||
const Epoll = Java.type('io.netty.channel.epoll.Epoll')
|
||||
const Bootstrap = Java.type('io.netty.bootstrap.Bootstrap')
|
||||
const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener')
|
||||
|
||||
const NioEventLoopGroup = Java.type('io.netty.channel.nio.NioEventLoopGroup')
|
||||
const NioSocketChannel = Java.type('io.netty.channel.socket.nio.NioSocketChannel')
|
||||
|
||||
const EpollEventLoopGroup = Java.type('io.netty.channel.epoll.EpollEventLoopGroup')
|
||||
const EpollSocketChannel = Java.type('io.netty.channel.epoll.EpollSocketChannel')
|
||||
|
||||
const WebSocketClientHandshakerFactory = Java.type('io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory')
|
||||
const WebSocketVersion = Java.type('io.netty.handler.codec.http.websocketx.WebSocketVersion')
|
||||
|
||||
const HttpClientCodec = Java.type('io.netty.handler.codec.http.HttpClientCodec')
|
||||
const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAggregator')
|
||||
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
|
||||
const CloseWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.CloseWebSocketFrame')
|
||||
|
||||
const ChannelInitializer = Java.type('io.netty.channel.ChannelInitializer')
|
||||
const DefaultHttpHeaders = Java.type('io.netty.handler.codec.http.DefaultHttpHeaders')
|
||||
|
||||
const epull = Epoll.isAvailable()
|
||||
const group = epull ? new EpollEventLoopGroup() : new NioEventLoopGroup()
|
||||
const socketChannelClass = epull ? EpollSocketChannel.class : NioSocketChannel.class
|
||||
process.on('exit', () => group.shutdownGracefully())
|
||||
|
||||
export class NettyWebSocket extends Transport {
|
||||
private channel: any
|
||||
private b = new Bootstrap();
|
||||
|
||||
constructor(url: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
|
||||
super(url, subProtocol, headers)
|
||||
}
|
||||
getId() {
|
||||
return this.channel?.id() + ''
|
||||
}
|
||||
doConnect() {
|
||||
let uri = URI.create(this._url)
|
||||
let headers = new DefaultHttpHeaders()
|
||||
for (const key of Object.getOwnPropertyNames(this._headers || {})) {
|
||||
headers.add(key, this._headers[key])
|
||||
}
|
||||
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
|
||||
// If you change it to V00, ping is not supported and remember to change
|
||||
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
|
||||
let handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
|
||||
.newHandshaker(uri, WebSocketVersion.V13, null, false, headers), this)
|
||||
this.b.group(group)
|
||||
.channel(socketChannelClass)
|
||||
.handler(new ChannelInitializer({
|
||||
initChannel: (ch: any) => {
|
||||
let pipeline = ch.pipeline()
|
||||
pipeline.addLast("http-codec", new HttpClientCodec())
|
||||
pipeline.addLast("aggregator", new HttpObjectAggregator(65536))
|
||||
pipeline.addLast("websocket", handler.getHandler())
|
||||
}
|
||||
}))
|
||||
this.b.connect(uri.getHost(), uri.getPort()).addListener(new ChannelFutureListener((future: any) => {
|
||||
this.channel = future.sync().channel()
|
||||
this.onconnection({})
|
||||
handler.handshakeFuture.addListener(new ChannelFutureListener((future: any) => {
|
||||
try {
|
||||
future.sync()
|
||||
this.onconnect({})
|
||||
} catch (error) {
|
||||
console.debug(error)
|
||||
}
|
||||
}))
|
||||
}))
|
||||
}
|
||||
doSend(text: string) {
|
||||
this.channel.writeAndFlush(new TextWebSocketFrame(text))
|
||||
}
|
||||
doClose(code: number, reason: string) {
|
||||
this.channel.writeAndFlush(new CloseWebSocketFrame())
|
||||
this.channel.close()
|
||||
this.channel.closeFuture().addListener(new ChannelFutureListener(() => console.debug(`NettyWebSocket close code: ${code} reason: ${reason}`)))
|
||||
}
|
||||
getChannel() {
|
||||
return this.channel
|
||||
}
|
||||
}
|
100
packages/websocket/src/client/transport.ts
Normal file
100
packages/websocket/src/client/transport.ts
Normal file
@ -0,0 +1,100 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import { WebSocket } from './index'
|
||||
import { CloseEvent, ErrorEvent, Event, MessageEvent, WebSocketHeader } from './interface'
|
||||
|
||||
export abstract class Transport extends EventEmitter {
|
||||
protected _url: string
|
||||
protected _state: number = WebSocket.CONNECTING
|
||||
protected _protocol: string
|
||||
protected _headers: WebSocketHeader = {}
|
||||
|
||||
constructor(uri: string, subProtocol: string = '', headers: WebSocketHeader = {}) {
|
||||
super()
|
||||
this._url = uri
|
||||
this._protocol = subProtocol
|
||||
this._headers = headers
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this.getId()
|
||||
}
|
||||
|
||||
get protocol() {
|
||||
return this._protocol
|
||||
}
|
||||
|
||||
get readyStatus() {
|
||||
return this._state
|
||||
}
|
||||
|
||||
set readyStatus(state: number) {
|
||||
this._state = state
|
||||
}
|
||||
|
||||
connect() {
|
||||
try {
|
||||
this.doConnect()
|
||||
} catch (error) {
|
||||
this.onerror({ error })
|
||||
}
|
||||
}
|
||||
|
||||
send(text: string) {
|
||||
try {
|
||||
this.doSend(text)
|
||||
} catch (error) {
|
||||
this.onerror({ error })
|
||||
}
|
||||
}
|
||||
|
||||
close(code: number = 0, reason: string = '') {
|
||||
if (this.readyStatus != WebSocket.CLOSING && this.readyStatus != WebSocket.CLOSED) {
|
||||
this.readyStatus = WebSocket.CLOSING
|
||||
try {
|
||||
this.onclose({ code, reason })
|
||||
this.doClose(code, reason)
|
||||
} catch (error) {
|
||||
this.onerror({ error })
|
||||
}
|
||||
} else {
|
||||
console.debug(`${this.id} call close but state is ${this.readyStatus}`)
|
||||
}
|
||||
}
|
||||
|
||||
onconnection(event: Event) {
|
||||
this._state == WebSocket.CONNECTING
|
||||
this.emit('connecting', event)
|
||||
}
|
||||
|
||||
onconnect(event: Event) {
|
||||
console.debug(`${this.id} call onconnect`)
|
||||
if (this.readyStatus != WebSocket.OPEN) {
|
||||
this.readyStatus = WebSocket.OPEN
|
||||
this.emit('open', event)
|
||||
} else {
|
||||
console.debug(`${this.id} call onconnect but state is ${this.readyStatus}`)
|
||||
}
|
||||
}
|
||||
|
||||
onmessage(event: MessageEvent) {
|
||||
this.emit('message', event)
|
||||
}
|
||||
|
||||
onerror(event: ErrorEvent) {
|
||||
this.emit('error', event)
|
||||
}
|
||||
|
||||
onclose(event: CloseEvent) {
|
||||
if (this.readyStatus != WebSocket.CLOSED) {
|
||||
this.readyStatus = WebSocket.CLOSED
|
||||
this.emit('close', event)
|
||||
this.removeAllListeners()
|
||||
} else {
|
||||
console.debug(`${this.id} call onclose but state is ${this.readyStatus} CloseEvent[code: ${event.code}, reason: ${event.reason}]`)
|
||||
}
|
||||
}
|
||||
abstract getId()
|
||||
abstract doConnect()
|
||||
abstract doSend(text: string)
|
||||
abstract doClose(code: number, reason: string)
|
||||
}
|
Loading…
Reference in New Issue
Block a user