refactor(websocket): upgrade socket.io
Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
20
packages/websocket/src/netty/adapter/httprequest.ts
Normal file
20
packages/websocket/src/netty/adapter/httprequest.ts
Normal file
@ -0,0 +1,20 @@
|
||||
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
|
||||
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
|
||||
const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest'))
|
||||
|
||||
export abstract class HttpRequestHandlerAdapter {
|
||||
private _Handler;
|
||||
constructor() {
|
||||
let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
|
||||
acceptInboundMessage: (msg: any) => {
|
||||
return FullHttpRequestMatcher.match(msg)
|
||||
},
|
||||
channelRead0: this.channelRead0.bind(this)
|
||||
})
|
||||
this._Handler = new HttpRequestHandlerAdapterImpl();
|
||||
}
|
||||
abstract channelRead0(ctx: any, request: any);
|
||||
getHandler() {
|
||||
return this._Handler;
|
||||
}
|
||||
}
|
3
packages/websocket/src/netty/adapter/index.ts
Normal file
3
packages/websocket/src/netty/adapter/index.ts
Normal file
@ -0,0 +1,3 @@
|
||||
export * from './text_websocket_frame'
|
||||
export * from './websocket'
|
||||
export * from './httprequest'
|
24
packages/websocket/src/netty/adapter/text_websocket_frame.ts
Normal file
24
packages/websocket/src/netty/adapter/text_websocket_frame.ts
Normal file
@ -0,0 +1,24 @@
|
||||
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
|
||||
const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame'))
|
||||
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
|
||||
|
||||
export abstract class TextWebSocketFrameHandlerAdapter {
|
||||
private _Handler
|
||||
constructor() {
|
||||
let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
|
||||
userEventTriggered: this.userEventTriggered.bind(this),
|
||||
acceptInboundMessage: (msg: any) => {
|
||||
return TextWebSocketFrameMatcher.match(msg)
|
||||
},
|
||||
channelRead0: this.channelRead0.bind(this),
|
||||
exceptionCaught: this.exceptionCaught.bind(this)
|
||||
})
|
||||
this._Handler = new TextWebSocketFrameHandlerAdapterImpl()
|
||||
}
|
||||
abstract userEventTriggered(ctx: any, evt: any)
|
||||
abstract channelRead0(ctx: any, msg: any)
|
||||
abstract exceptionCaught(ctx: any, cause: Error)
|
||||
getHandler() {
|
||||
return this._Handler
|
||||
}
|
||||
}
|
@ -5,12 +5,14 @@ export abstract class WebSocketHandlerAdapter {
|
||||
constructor() {
|
||||
let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, {
|
||||
channelRead: this.channelRead.bind(this),
|
||||
channelInactive: this.channelInactive.bind(this),
|
||||
channelUnregistered: this.exceptionCaught.bind(this),
|
||||
exceptionCaught: this.exceptionCaught.bind(this)
|
||||
})
|
||||
this._Handler = new ChannelInboundHandlerAdapterImpl()
|
||||
}
|
||||
abstract channelRead(ctx: any, channel: any)
|
||||
abstract channelInactive(ctx: any)
|
||||
abstract channelUnregistered(ctx: any)
|
||||
abstract exceptionCaught(ctx: any, cause: Error)
|
||||
getHandler() {
|
45
packages/websocket/src/netty/client.ts
Normal file
45
packages/websocket/src/netty/client.ts
Normal file
@ -0,0 +1,45 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import { InnerClient } from '../interfaces'
|
||||
import { AttributeKeys } from './constants'
|
||||
|
||||
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
|
||||
|
||||
export class NettyClient extends EventEmitter implements InnerClient {
|
||||
private _id: string
|
||||
private channel: any
|
||||
|
||||
server: any
|
||||
readyState: string
|
||||
remoteAddress: string
|
||||
upgraded: boolean
|
||||
request: any
|
||||
|
||||
constructor(server: any, channel: any) {
|
||||
super()
|
||||
this.server = server
|
||||
this.readyState = 'open'
|
||||
this.remoteAddress = channel.remoteAddress() + ''
|
||||
this.upgraded = true
|
||||
this.request = channel.attr(AttributeKeys.Request).get()
|
||||
|
||||
this.channel = channel
|
||||
this._id = channel.id() + ''
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this._id
|
||||
}
|
||||
send(text: string) {
|
||||
if (this.readyState == 'open') {
|
||||
this.channel.writeAndFlush(new TextWebSocketFrame(text))
|
||||
} else {
|
||||
console.debug(`send message ${text} to close client ${this._id}`)
|
||||
}
|
||||
}
|
||||
close() {
|
||||
if (this.readyState = 'open') {
|
||||
this.channel.close()
|
||||
this.readyState = 'close'
|
||||
}
|
||||
}
|
||||
}
|
13
packages/websocket/src/netty/constants.ts
Normal file
13
packages/websocket/src/netty/constants.ts
Normal file
@ -0,0 +1,13 @@
|
||||
const AttributeKey = Java.type('io.netty.util.AttributeKey');
|
||||
|
||||
export enum Keys {
|
||||
Detect = "miao_detect",
|
||||
Handler = "miaowebsocket",
|
||||
Default = "DefaultChannelPipeline"
|
||||
}
|
||||
|
||||
let RequestAttributeKey: any
|
||||
try { RequestAttributeKey = AttributeKey.valueOf('request') } catch (error) { }
|
||||
export enum AttributeKeys {
|
||||
Request = RequestAttributeKey
|
||||
}
|
@ -1,20 +1,61 @@
|
||||
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
|
||||
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
|
||||
const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest'))
|
||||
import { HttpRequestHandlerAdapter } from './adapter'
|
||||
import { AttributeKeys } from './constants'
|
||||
import { ServerOptions } from 'socket-io'
|
||||
|
||||
export abstract class HttpRequestHandlerAdapter {
|
||||
private _Handler;
|
||||
constructor() {
|
||||
let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
|
||||
acceptInboundMessage: (msg: any) => {
|
||||
return FullHttpRequestMatcher.match(msg)
|
||||
},
|
||||
channelRead0: this.channelRead0.bind(this)
|
||||
})
|
||||
this._Handler = new HttpRequestHandlerAdapterImpl();
|
||||
const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse')
|
||||
const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse')
|
||||
const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders')
|
||||
const HttpVersion = Java.type('io.netty.handler.codec.http.HttpVersion')
|
||||
const HttpResponseStatus = Java.type('io.netty.handler.codec.http.HttpResponseStatus')
|
||||
const LastHttpContent = Java.type('io.netty.handler.codec.http.LastHttpContent')
|
||||
|
||||
const File = Java.type('java.io.File')
|
||||
const Runnable = Java.type('java.lang.Runnable')
|
||||
const RandomAccessFile = Java.type('java.io.RandomAccessFile')
|
||||
const DefaultFileRegion = Java.type('io.netty.channel.DefaultFileRegion')
|
||||
const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener')
|
||||
|
||||
export class HttpRequestHandler extends HttpRequestHandlerAdapter {
|
||||
private ws: string
|
||||
private root: string
|
||||
constructor(options: ServerOptions) {
|
||||
super()
|
||||
this.root = options.root
|
||||
this.ws = options.path
|
||||
}
|
||||
abstract channelRead0(ctx: any, request: any);
|
||||
getHandler() {
|
||||
return this._Handler;
|
||||
channelRead0(ctx: any, request: any) {
|
||||
if (request.getUri().startsWith(this.ws)) {
|
||||
ctx.channel().attr(AttributeKeys.Request).set(request)
|
||||
ctx.fireChannelRead(request.retain())
|
||||
} else {
|
||||
ctx.executor().execute(new Runnable({
|
||||
run: () => {
|
||||
if (HttpHeaders.is100ContinueExpected(request)) {
|
||||
ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE))
|
||||
}
|
||||
let filename = request.getUri().split('?')[0].substr(1)
|
||||
let file = new File(this.root, filename || 'index.html')
|
||||
if (!file.exists() || !file.isFile()) {
|
||||
ctx.write(new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.NOT_FOUND))
|
||||
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE)
|
||||
return
|
||||
}
|
||||
let response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK)
|
||||
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html charset=UTF-8")
|
||||
let raf = new RandomAccessFile(file, 'r')
|
||||
let keepAlive = HttpHeaders.isKeepAlive(request)
|
||||
if (keepAlive) {
|
||||
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length())
|
||||
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
|
||||
}
|
||||
ctx.write(response)
|
||||
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, raf.length()))
|
||||
let future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
|
||||
if (!keepAlive) {
|
||||
future.addListener(ChannelFutureListener.CLOSE)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,70 @@
|
||||
export * from './text_websocket_frame'
|
||||
export * from './websocket'
|
||||
export * from './httprequest'
|
||||
import { EventEmitter } from 'events'
|
||||
|
||||
import { ServerOptions } from '../socket-io'
|
||||
import { ServerEvent } from '../socket-io/constants'
|
||||
|
||||
import { NettyClient } from './client'
|
||||
import { Keys } from './constants'
|
||||
import { WebSocketDetect } from './websocket_detect'
|
||||
import { WebSocketHandler } from './websocket_handler'
|
||||
|
||||
class NettyWebSocketServer extends EventEmitter {
|
||||
private pipeline: any
|
||||
private clients: Map<string, NettyClient>
|
||||
|
||||
constructor(pipeline: any, options: ServerOptions) {
|
||||
super()
|
||||
this.clients = new Map()
|
||||
this.pipeline = pipeline
|
||||
let connectEvent = options.event
|
||||
try { this.pipeline.remove(Keys.Detect) } catch (error) { }
|
||||
this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler())
|
||||
connectEvent.on(ServerEvent.detect, (ctx, channel) => {
|
||||
channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(options).getHandler())
|
||||
ctx.fireChannelRead(channel)
|
||||
})
|
||||
connectEvent.on(ServerEvent.connect, (ctx) => {
|
||||
let cid = ctx?.channel().id() + ''
|
||||
let nettyClient = new NettyClient(this, ctx.channel())
|
||||
this.clients.set(cid, nettyClient)
|
||||
this.emit(ServerEvent.connect, nettyClient)
|
||||
})
|
||||
connectEvent.on(ServerEvent.message, (ctx, msg) => {
|
||||
let cid = ctx?.channel().id() + ''
|
||||
if (this.clients.has(cid)) {
|
||||
this.emit(ServerEvent.message, this.clients.get(cid), msg.text())
|
||||
} else {
|
||||
console.error(`unknow client ${ctx} reciver message ${msg.text()}`)
|
||||
}
|
||||
})
|
||||
connectEvent.on(ServerEvent.disconnect, (ctx, cause) => {
|
||||
let cid = ctx?.channel().id() + ''
|
||||
if (this.clients.has(cid)) {
|
||||
this.emit(ServerEvent.disconnect, this.clients.get(cid), cause)
|
||||
} else {
|
||||
console.error(`unknow client ${ctx} disconnect cause ${cause}`)
|
||||
}
|
||||
})
|
||||
connectEvent.on(ServerEvent.error, (ctx, cause) => {
|
||||
let cid = ctx?.channel().id() + ''
|
||||
if (this.clients.has(cid)) {
|
||||
this.emit(ServerEvent.error, this.clients.get(cid), cause)
|
||||
} else {
|
||||
console.error(`unknow client ${ctx} cause error ${cause}`)
|
||||
console.ex(cause)
|
||||
}
|
||||
})
|
||||
}
|
||||
close() {
|
||||
if (this.pipeline.names().contains(Keys.Detect)) {
|
||||
this.pipeline.remove(Keys.Detect)
|
||||
}
|
||||
this.clients.forEach(client => client.close())
|
||||
}
|
||||
}
|
||||
|
||||
export {
|
||||
NettyWebSocketServer,
|
||||
ServerEvent,
|
||||
NettyClient
|
||||
}
|
||||
|
@ -1,24 +1,23 @@
|
||||
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
|
||||
const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame'))
|
||||
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
|
||||
import { EventEmitter } from 'events'
|
||||
import { ServerOptions } from '../socket-io'
|
||||
import { ServerEvent } from '../socket-io/constants'
|
||||
import { TextWebSocketFrameHandlerAdapter } from './adapter'
|
||||
|
||||
export abstract class TextWebSocketFrameHandlerAdapter {
|
||||
private _Handler
|
||||
constructor() {
|
||||
let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
|
||||
userEventTriggered: this.userEventTriggered.bind(this),
|
||||
acceptInboundMessage: (msg: any) => {
|
||||
return TextWebSocketFrameMatcher.match(msg)
|
||||
},
|
||||
channelRead0: this.channelRead0.bind(this),
|
||||
exceptionCaught: this.exceptionCaught.bind(this)
|
||||
})
|
||||
this._Handler = new TextWebSocketFrameHandlerAdapterImpl()
|
||||
export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter {
|
||||
private event: EventEmitter
|
||||
constructor(options: ServerOptions) {
|
||||
super()
|
||||
this.event = options.event
|
||||
}
|
||||
abstract userEventTriggered(ctx: any, evt: any)
|
||||
abstract channelRead0(ctx: any, msg: any)
|
||||
abstract exceptionCaught(ctx: any, cause: Error)
|
||||
getHandler() {
|
||||
return this._Handler
|
||||
userEventTriggered(ctx: any, evt: any) {
|
||||
if (evt == 'HANDSHAKE_COMPLETE') {
|
||||
this.event.emit(ServerEvent.connect, ctx)
|
||||
}
|
||||
}
|
||||
channelRead0(ctx: any, msg: any) {
|
||||
this.event.emit(ServerEvent.message, ctx, msg)
|
||||
}
|
||||
exceptionCaught(ctx: any, cause: Error) {
|
||||
this.event.emit(ServerEvent.error, ctx, cause)
|
||||
}
|
||||
}
|
||||
|
27
packages/websocket/src/netty/websocket_detect.ts
Normal file
27
packages/websocket/src/netty/websocket_detect.ts
Normal file
@ -0,0 +1,27 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import { WebSocketHandlerAdapter } from "./adapter"
|
||||
import { ServerEvent } from '../socket-io/constants'
|
||||
|
||||
export class WebSocketDetect extends WebSocketHandlerAdapter {
|
||||
private event: EventEmitter
|
||||
constructor(event: EventEmitter) {
|
||||
super()
|
||||
this.event = event
|
||||
}
|
||||
channelRead(ctx: any, channel: any) {
|
||||
this.event.emit(ServerEvent.detect, ctx, channel)
|
||||
}
|
||||
channelInactive(ctx: any) {
|
||||
console.debug('WebSocketDetect channelUnregistered ' + ctx)
|
||||
this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
|
||||
ctx.channelInactive()
|
||||
}
|
||||
channelUnregistered(ctx: any) {
|
||||
console.debug('WebSocketDetect channelUnregistered ' + ctx)
|
||||
this.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
|
||||
ctx.fireChannelUnregistered()
|
||||
}
|
||||
exceptionCaught(ctx: any, cause: Error) {
|
||||
this.event.emit(ServerEvent.error, ctx, cause)
|
||||
}
|
||||
}
|
59
packages/websocket/src/netty/websocket_handler.ts
Normal file
59
packages/websocket/src/netty/websocket_handler.ts
Normal file
@ -0,0 +1,59 @@
|
||||
import { ServerOptions } from '../socket-io'
|
||||
import { ServerEvent } from '../socket-io/constants'
|
||||
|
||||
import { Keys } from './constants'
|
||||
import { HttpRequestHandler } from './httprequest'
|
||||
import { WebSocketHandlerAdapter } from "./adapter"
|
||||
import { TextWebSocketFrameHandler } from './text_websocket_frame'
|
||||
|
||||
const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
|
||||
const HttpServerCodec = Java.type('io.netty.handler.codec.http.HttpServerCodec')
|
||||
const ChunkedWriteHandler = Java.type('io.netty.handler.stream.ChunkedWriteHandler')
|
||||
const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAggregator')
|
||||
const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler')
|
||||
|
||||
export class WebSocketHandler extends WebSocketHandlerAdapter {
|
||||
private options: ServerOptions
|
||||
constructor(options: ServerOptions) {
|
||||
super()
|
||||
this.options = options
|
||||
}
|
||||
channelRead(ctx: any, msg: any) {
|
||||
msg.markReaderIndex()
|
||||
let message: string = msg.toString(CharsetUtil.UTF_8)
|
||||
let channel = ctx.channel()
|
||||
let pipeline = channel.pipeline()
|
||||
if (message.indexOf('HTTP/1.1') > 0) {
|
||||
pipeline.names().forEach(f => {
|
||||
if (f == Keys.Handler || f.indexOf(Keys.Default) > -1) { return }
|
||||
pipeline.remove(f)
|
||||
})
|
||||
pipeline.addLast('http', new HttpServerCodec())
|
||||
pipeline.addLast('chunk', new ChunkedWriteHandler())
|
||||
pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024))
|
||||
pipeline.addLast('http_request', new HttpRequestHandler(this.options).getHandler())
|
||||
// this.options.path, null, false, 655360, false, true, false, 10000
|
||||
pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true))
|
||||
pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler())
|
||||
}
|
||||
pipeline.remove(Keys.Handler)
|
||||
msg.resetReaderIndex()
|
||||
ctx.fireChannelRead(msg)
|
||||
}
|
||||
|
||||
channelInactive(ctx: any) {
|
||||
console.debug('WebSocketHandler channelInactive ' + ctx)
|
||||
this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
|
||||
ctx.channelInactive()
|
||||
}
|
||||
|
||||
channelUnregistered(ctx: any) {
|
||||
console.debug('WebSocketHandler channelUnregistered ' + ctx)
|
||||
this.options.event.emit(ServerEvent.disconnect, ctx, 'client disconnect')
|
||||
ctx.fireChannelUnregistered()
|
||||
}
|
||||
|
||||
exceptionCaught(ctx: any, cause: Error) {
|
||||
this.options.event.emit(ServerEvent.error, ctx, cause)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user