feat: 完善client相关功能 重构server部分

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
2021-08-14 12:43:20 +08:00
parent 774763be13
commit 53502e12cf
34 changed files with 2893 additions and 20 deletions

View File

@@ -49,9 +49,6 @@ export abstract class WebSocketServer extends EventEmitter {
this.execute(handler, (websocket) => websocket.emit(ServerEvent.disconnect, cause))
}
protected onerror(handler: any, error: Error) {
if (global.debug) {
console.ex(error)
}
this.execute(handler, (websocket) => websocket.emit(ServerEvent.error, error))
}
protected execute(handler: any, callback: (websocket: WebSocketClient) => void) {
@@ -82,9 +79,9 @@ export const attach = (instance, options) => {
}, options)
let WebSocketServerImpl = undefined
if (instance.class.name.startsWith('io.netty.channel')) {
WebSocketServerImpl = require("../netty").NettyWebSocketServer
WebSocketServerImpl = require("./netty").NettyWebSocketServer
} else {
WebSocketServerImpl = require("../tomcat").TomcatWebSocketServer
WebSocketServerImpl = require("./tomcat").TomcatWebSocketServer
}
return new WebSocketServerImpl(instance, options)
}

View File

@@ -0,0 +1,21 @@
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
const FullHttpRequestMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.FullHttpRequest'))
export abstract class HttpRequestHandlerAdapter {
private _Handler
constructor() {
let HttpRequestHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
isSharable: () => true,
acceptInboundMessage: (msg: any) => {
return FullHttpRequestMatcher.match(msg)
},
channelRead0: this.channelRead0.bind(this)
})
this._Handler = new HttpRequestHandlerAdapterImpl()
}
abstract channelRead0(ctx: any, request: any)
getHandler() {
return this._Handler
}
}

View File

@@ -0,0 +1,3 @@
export * from './text_websocket_frame'
export * from './websocket'
export * from './httprequest'

View File

@@ -0,0 +1,25 @@
const TypeParameterMatcher = Java.type('io.netty.util.internal.TypeParameterMatcher')
const TextWebSocketFrameMatcher = TypeParameterMatcher.get(base.getClass('io.netty.handler.codec.http.websocketx.TextWebSocketFrame'))
const SimpleChannelInboundHandler = Java.type('io.netty.channel.SimpleChannelInboundHandler')
export abstract class TextWebSocketFrameHandlerAdapter {
private _Handler
constructor() {
let TextWebSocketFrameHandlerAdapterImpl = Java.extend(SimpleChannelInboundHandler, {
isSharable: () => true,
userEventTriggered: this.userEventTriggered.bind(this),
acceptInboundMessage: (msg: any) => {
return TextWebSocketFrameMatcher.match(msg)
},
channelRead0: this.channelRead0.bind(this),
exceptionCaught: this.exceptionCaught.bind(this)
})
this._Handler = new TextWebSocketFrameHandlerAdapterImpl()
}
abstract userEventTriggered(ctx: any, evt: any)
abstract channelRead0(ctx: any, msg: any)
abstract exceptionCaught(ctx: any, cause: Error)
getHandler() {
return this._Handler
}
}

View File

@@ -0,0 +1,26 @@
const ChannelInboundHandlerAdapter = Java.type('io.netty.channel.ChannelInboundHandlerAdapter')
export abstract class WebSocketHandlerAdapter {
private _Handler
constructor() {
let ChannelInboundHandlerAdapterImpl = Java.extend(ChannelInboundHandlerAdapter, {
isSharable: () => true,
channelRead: this.channelRead.bind(this),
channelInactive: this.channelInactive.bind(this),
channelUnregistered: this.exceptionCaught.bind(this),
exceptionCaught: this.exceptionCaught.bind(this)
})
this._Handler = new ChannelInboundHandlerAdapterImpl()
}
abstract channelRead(ctx: any, channel: any)
channelInactive(ctx: any) {
ctx.fireChannelInactive()
}
channelUnregistered(ctx: any) {
ctx.fireChannelUnregistered()
}
abstract exceptionCaught(ctx: any, cause: Error)
getHandler() {
return this._Handler
}
}

View File

@@ -0,0 +1,26 @@
import { WebSocketClient } from '../client'
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
export class NettyClient extends WebSocketClient {
private channel: any
constructor(channel: any) {
super()
this.id = channel.id() + ''
this.channel = channel
}
send(text: string, opts?: any, callback?: (err?: Error) => void) {
try {
this.channel.writeAndFlush(new TextWebSocketFrame(text))
callback?.()
} catch (error) {
callback?.(error)
}
}
close() {
this.channel.close()
}
}

View File

@@ -0,0 +1,13 @@
const AttributeKey = Java.type('io.netty.util.AttributeKey');
export enum Keys {
Detect = "miao_detect",
Handler = "miaowebsocket",
Default = "DefaultChannelPipeline"
}
let RequestAttributeKey: any
try { RequestAttributeKey = AttributeKey.valueOf('request') } catch (error) { }
export enum AttributeKeys {
Request = RequestAttributeKey
}

View File

@@ -0,0 +1,62 @@
import { HttpRequestHandlerAdapter } from './adapter'
import { AttributeKeys } from './constants'
import type { JavaServerOptions } from '../'
const DefaultHttpResponse = Java.type('io.netty.handler.codec.http.DefaultHttpResponse')
const DefaultFullHttpResponse = Java.type('io.netty.handler.codec.http.DefaultFullHttpResponse')
const HttpHeaders = Java.type('io.netty.handler.codec.http.HttpHeaders')
const HttpVersion = Java.type('io.netty.handler.codec.http.HttpVersion')
const HttpResponseStatus = Java.type('io.netty.handler.codec.http.HttpResponseStatus')
const LastHttpContent = Java.type('io.netty.handler.codec.http.LastHttpContent')
const File = Java.type('java.io.File')
const Runnable = Java.type('java.lang.Runnable')
const RandomAccessFile = Java.type('java.io.RandomAccessFile')
const DefaultFileRegion = Java.type('io.netty.channel.DefaultFileRegion')
const ChannelFutureListener = Java.type('io.netty.channel.ChannelFutureListener')
export class HttpRequestHandler extends HttpRequestHandlerAdapter {
private ws: string
private root: string
constructor(options: JavaServerOptions) {
super()
this.root = options.root
this.ws = options.path
}
channelRead0(ctx: any, request: any) {
if (request.getUri().startsWith(this.ws)) {
ctx.channel().attr(AttributeKeys.Request).set(request)
ctx.fireChannelRead(request.retain())
} else {
ctx.executor().execute(new Runnable({
run: () => {
if (HttpHeaders.is100ContinueExpected(request)) {
ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE))
}
let filename = request.getUri().split('?')[0].substr(1)
let file = new File(this.root, filename || 'index.html')
if (!file.exists() || !file.isFile()) {
ctx.write(new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.NOT_FOUND))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE)
return
}
let response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK)
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html charset=UTF-8")
let raf = new RandomAccessFile(file, 'r')
let keepAlive = HttpHeaders.isKeepAlive(request)
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length())
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
}
ctx.write(response)
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, raf.length()))
let future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE)
}
}
}))
}
}
}

View File

@@ -0,0 +1,76 @@
import { ServerEvent, WebSocketServer } from '../'
import { Request } from '../request'
import { NettyClient } from './client'
import { AttributeKeys, Keys } from './constants'
import { WebSocketDetect } from './websocket_detect'
import { WebSocketHandler } from './websocket_handler'
import type { JavaServerOptions } from '../'
class NettyWebSocketServer extends WebSocketServer {
constructor(pipeline: any, options: JavaServerOptions) {
super(pipeline, options)
}
protected initialize() {
let connectEvent = this.options.event
try { this.instance.remove(Keys.Detect) } catch (error) { }
this.instance.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler())
connectEvent.on(ServerEvent.detect, (ctx, channel) => {
channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(this.options).getHandler())
ctx.fireChannelRead(channel)
})
connectEvent.on(ServerEvent.connect, (ctx) => {
this.onconnect(ctx)
})
connectEvent.on(ServerEvent.message, (ctx, msg) => {
this.onmessage(ctx, msg.text())
})
connectEvent.on(ServerEvent.disconnect, (ctx, cause) => {
this.ondisconnect(ctx, cause)
})
connectEvent.on(ServerEvent.error, (ctx, error) => {
this.onerror(ctx, error)
})
}
protected getId(ctx: any) {
try {
return ctx.channel().id() + ''
} catch (error) {
console.log(Object.toString.apply(ctx))
console.ex(error)
}
}
protected getRequest(ctx) {
let channel = ctx.channel()
let req = channel.attr(AttributeKeys.Request).get()
let headers = {}
let nativeHeaders = req.headers()
nativeHeaders.forEach(function (header) {
headers[header.getKey()] = header.getValue()
})
let request = new Request(req.uri(), req.method().name(), headers)
request.connection = {
remoteAddress: channel.remoteAddress() + ''
}
return request
}
protected getSocket(ctx) {
return new NettyClient(ctx.channel())
}
protected doClose() {
if (this.instance.names().contains(Keys.Detect)) {
this.instance.remove(Keys.Detect)
}
}
}
export {
NettyWebSocketServer,
NettyClient
}

View File

@@ -0,0 +1,25 @@
import { EventEmitter } from 'events'
import { ServerEvent } from '../'
import { TextWebSocketFrameHandlerAdapter } from './adapter'
import type { JavaServerOptions } from '../'
export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter {
private event: EventEmitter
constructor(options: JavaServerOptions) {
super()
this.event = options.event
}
userEventTriggered(ctx: any, evt: any) {
if (evt == 'HANDSHAKE_COMPLETE') {
this.event.emit(ServerEvent.connect, ctx)
}
}
channelRead0(ctx: any, msg: any) {
this.event.emit(ServerEvent.message, ctx, msg)
}
exceptionCaught(ctx: any, cause: Error) {
this.event.emit(ServerEvent.error, ctx, cause)
}
}

View File

@@ -0,0 +1,18 @@
import { EventEmitter } from 'events'
import { WebSocketHandlerAdapter } from "./adapter"
import { ServerEvent } from '../'
export class WebSocketDetect extends WebSocketHandlerAdapter {
private event: EventEmitter
constructor(event: EventEmitter) {
super()
this.event = event
}
channelRead(ctx: any, channel: any) {
this.event.emit(ServerEvent.detect, ctx, channel)
}
exceptionCaught(ctx: any, cause: Error) {
this.event.emit(ServerEvent.error, ctx, cause)
}
}

View File

@@ -0,0 +1,58 @@
import { ServerEvent } from '../'
import { Keys } from './constants'
import { HttpRequestHandler } from './httprequest'
import { WebSocketHandlerAdapter } from "./adapter"
import { TextWebSocketFrameHandler } from './text_websocket_frame'
import type { JavaServerOptions } from '../'
const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
const HttpServerCodec = Java.type('io.netty.handler.codec.http.HttpServerCodec')
const ChunkedWriteHandler = Java.type('io.netty.handler.stream.ChunkedWriteHandler')
const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAggregator')
const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler')
export class WebSocketHandler extends WebSocketHandlerAdapter {
private options: JavaServerOptions
constructor(options: JavaServerOptions) {
super()
this.options = options
}
channelRead(ctx: any, msg: any) {
msg.markReaderIndex()
let message: string = msg.toString(CharsetUtil.UTF_8)
let channel = ctx.channel()
let pipeline = channel.pipeline()
if (message.indexOf('HTTP/1.1') > 0) {
pipeline.names().forEach(f => {
if (f == Keys.Handler || f.indexOf(Keys.Default) > -1) { return }
pipeline.remove(f)
})
pipeline.addLast('http', new HttpServerCodec())
pipeline.addLast('chunk', new ChunkedWriteHandler())
pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024))
pipeline.addLast('http_request', new HttpRequestHandler(this.options).getHandler())
// this.options.path, null, false, 655360, false, true, false, 10000
pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true))
pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler())
}
pipeline.remove(Keys.Handler)
msg.resetReaderIndex()
ctx.fireChannelRead(msg)
}
channelInactive(ctx: any) {
this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelInactive')
super.channelInactive(ctx)
}
channelUnregistered(ctx: any) {
this.options.event.emit(ServerEvent.disconnect, ctx, 'netty channelUnregistered')
super.channelUnregistered(ctx)
}
exceptionCaught(ctx: any, cause: Error) {
this.options.event.emit(ServerEvent.error, ctx, cause)
}
}

View File

@@ -0,0 +1,24 @@
import { WebSocketClient } from '../client'
export class TomcatClient extends WebSocketClient {
private session: javax.websocket.Session
constructor(session: javax.websocket.Session) {
super()
this.id = session.getId() + ''
this.session = session
}
send(text: string, opts?: any, callback?: (err?: Error) => void) {
Java.synchronized(() => {
try {
this.session.getBasicRemote().sendText(text)
callback?.()
} catch (error) {
callback?.(error)
}
}, this.session)()
}
close() {
this.session.close()
}
}

View File

@@ -0,0 +1 @@
export const ProxyBeanName = "webSocketServerProxy"

View File

@@ -0,0 +1,75 @@
import { JavaServerOptions, WebSocketServer } from '../'
import { Request } from '../request'
import { TomcatClient } from './client'
import { ProxyBeanName } from './constants'
const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor')
type TomcatWebSocketSession = javax.websocket.Session
class TomcatWebSocketServer extends WebSocketServer {
private executor: any
constructor(beanFactory: any, options: JavaServerOptions) {
super(beanFactory, options)
}
protected initialize(): void {
this.initThreadPool()
try { this.instance.destroySingleton(ProxyBeanName) } catch (error) { }
let NashornWebSocketServerProxy = Java.extend(Java.type("pw.yumc.MiaoScript.websocket.WebSocketProxy"), {
onOpen: (session: TomcatWebSocketSession) => {
this.onconnect(session)
},
onMessage: (session: TomcatWebSocketSession, message: string) => {
this.onmessage(session, message)
},
onClose: (session: TomcatWebSocketSession, reason: any) => {
this.ondisconnect(session, reason)
},
onError: (session: TomcatWebSocketSession, error: Error) => {
this.onerror(session, error)
},
})
this.instance.registerSingleton(ProxyBeanName, new NashornWebSocketServerProxy())
}
protected getId(session) {
return session?.getId() + ''
}
protected getRequest(session) {
let request = new Request(session.getRequestURI(), "GET")
request.connection = {
remoteAddress: ''
}
return request
}
protected getSocket(session) {
return new TomcatClient(session)
}
protected doClose() {
this.instance.destroySingleton(ProxyBeanName)
this.executor.shutdown()
}
private initThreadPool() {
const ThreadPoolTaskExecutor = Java.type('org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor')
this.executor = new ThreadPoolTaskExecutor()
this.executor.setCorePoolSize(10)
this.executor.setMaxPoolSize(100)
this.executor.setQueueCapacity(500)
this.executor.setKeepAliveSeconds(60)
this.executor.setThreadNamePrefix("@ccms/websocket-")
this.executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
this.executor.initialize()
}
}
export {
TomcatWebSocketServer,
TomcatClient
}