feat: add adapter and socket

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-03-23 00:58:53 +08:00
parent ace2c48ae2
commit f4b461409b
10 changed files with 429 additions and 172 deletions

View File

@ -1,26 +1,29 @@
import { EventEmitter } from 'events'
import { SocketIO } from 'socket-io/interfaces';
const TextWebSocketFrame = Java.type('io.netty.handler.codec.http.websocketx.TextWebSocketFrame')
export class NettyClient {
private event: EventEmitter
export class NettyClient extends EventEmitter implements SocketIO.EngineSocket {
private _id: string;
private channel: any
constructor(channel: any) {
server: any;
readyState: string;
remoteAddress: string;
upgraded: boolean;
request: any;
transport: any;
constructor(server: any, channel: any) {
super();
this.server = server;
this.channel = channel;
this._id = channel.id();
this.event = new EventEmitter();
}
get id() {
return this._id;
}
on(event: string, callback: (...args: any[]) => void) {
this.event.on(event, callback);
}
emit(event: string, text: string) {
this.event.emit(event, text);
}
send(text: string) {
this.channel.writeAndFlush(new TextWebSocketFrame(text))
}

View File

@ -0,0 +1,6 @@
import { EventEmitter } from 'events'
export interface NettyWebSocketServerOptions {
event: EventEmitter,
path?: string;
}

View File

@ -4,10 +4,7 @@ import { ServerEvent, Keys } from './constants'
import { WebSocketDetect } from './websocket_detect'
import { WebSocketHandler } from './websocket_handler'
import { NettyClient } from './client'
interface NettyWebSocketServerOptions {
path?: string;
}
import { NettyWebSocketServerOptions } from './config'
class NettyWebSocketServer {
private event: EventEmitter
@ -18,20 +15,20 @@ class NettyWebSocketServer {
this.event = new EventEmitter();
this.allClients = {};
this.pipeline = pipeline;
let connectEvent = new EventEmitter();
let connectEvent = options.event;
this.pipeline.addFirst(Keys.Detect, new WebSocketDetect(connectEvent).getHandler())
connectEvent.on(ServerEvent.detect, (ctx, channel) => {
channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(connectEvent).getHandler())
channel.pipeline().addFirst(Keys.Handler, new WebSocketHandler(options).getHandler())
ctx.fireChannelRead(channel)
})
connectEvent.on(ServerEvent.connect, (ctx) => {
let nettyClient = new NettyClient(ctx.channel());
let nettyClient = new NettyClient(this, ctx.channel());
this.allClients[nettyClient.id] = nettyClient;
this.event.emit(ServerEvent.connect, nettyClient);
})
connectEvent.on(ServerEvent.message, (ctx, msg) => {
let channel = ctx.channel();
this.allClients[channel.id()]?.emit(ServerEvent.message, msg.text())
this.event.emit(ServerEvent.message, this.allClients[channel.id()], msg.text())
})
}

View File

@ -1,12 +1,13 @@
import { TextWebSocketFrameHandlerAdapter } from '../netty'
import { EventEmitter } from 'events'
import { ServerEvent } from './constants'
import { NettyWebSocketServerOptions } from './config';
export class TextWebSocketFrameHandler extends TextWebSocketFrameHandlerAdapter {
private event: EventEmitter;
constructor(event: EventEmitter) {
constructor(options: NettyWebSocketServerOptions) {
super()
this.event = event;
this.event = options.event;
}
userEventTriggered(ctx: any, evt: any) {
if (evt == 'HANDSHAKE_COMPLETE') {

View File

@ -4,6 +4,7 @@ import { Keys } from './constants'
import { WebSocketHandlerAdapter } from "../netty"
import { HttpRequestHandler } from './httprequest'
import { TextWebSocketFrameHandler } from './text_websocket_frame'
import { NettyWebSocketServerOptions } from './config'
const CharsetUtil = Java.type('io.netty.util.CharsetUtil')
@ -13,10 +14,10 @@ const HttpObjectAggregator = Java.type('io.netty.handler.codec.http.HttpObjectAg
const WebSocketServerProtocolHandler = Java.type('io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler')
export class WebSocketHandler extends WebSocketHandlerAdapter {
private event: EventEmitter;
constructor(event: EventEmitter) {
private options: NettyWebSocketServerOptions;
constructor(options: NettyWebSocketServerOptions) {
super()
this.event = event;
this.options = options;
}
channelRead(ctx: any, msg: any) {
msg.markReaderIndex()
@ -32,8 +33,8 @@ export class WebSocketHandler extends WebSocketHandlerAdapter {
pipeline.addLast('chunk', new ChunkedWriteHandler())
pipeline.addLast('httpobj', new HttpObjectAggregator(64 * 1024))
pipeline.addLast('http_request', new HttpRequestHandler().getHandler())
pipeline.addLast('websocket', new WebSocketServerProtocolHandler("/ws", true))
pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.event).getHandler())
pipeline.addLast('websocket', new WebSocketServerProtocolHandler(this.options.path, true))
pipeline.addLast('websocket_handler', new TextWebSocketFrameHandler(this.options).getHandler())
}
pipeline.remove(Keys.Handler)
msg.resetReaderIndex()

View File

@ -0,0 +1,124 @@
import { EventEmitter } from 'events'
import { SocketIO } from './interfaces';
import { Namespace } from './namespace';
import { Parser } from './parser';
export class Adapter extends EventEmitter implements SocketIO.Adapter {
nsp: SocketIO.Namespace;
rooms: Rooms;
sids: { [id: string]: { [room: string]: boolean; }; };
parser: Parser
constructor(nsp: Namespace) {
super()
this.parser = nsp.server.parser;
}
add(id: string, room: string, callback?: (err?: any) => void): void {
return this.addAll(id, [room], callback);
}
/**
* Adds a socket to a list of room.
*
* @param {String} socket id
* @param {String} rooms
* @param {Function} callback
* @api public
*/
addAll(id, rooms, fn) {
for (var i = 0; i < rooms.length; i++) {
var room = rooms[i];
this.sids[id] = this.sids[id] || {};
this.sids[id][room] = true;
this.rooms[room] = this.rooms[room] || new Room();
this.rooms[room].add(id);
}
fn && fn.bind(null, null)
};
del(id: string, room: string, callback?: (err?: any) => void): void {
if (this.sids[id]) delete this.sids[id][room];
if (this.rooms.hasOwnProperty(room)) {
this.rooms[room].del(id);
if (this.rooms[room].length === 0) delete this.rooms[room];
}
callback && callback.bind(null, null)
}
delAll(id: string): void {
var rooms = this.sids[id];
if (rooms) {
for (var room in rooms) {
if (this.rooms.hasOwnProperty(room)) {
this.rooms[room].del(id);
if (this.rooms[room].length === 0) delete this.rooms[room];
}
}
}
delete this.sids[id];
}
broadcast(packet: any, opts: { rooms?: string[]; except?: string[]; flags?: { [flag: string]: boolean; }; }): void {
var rooms = opts.rooms || [];
var except = opts.except || [];
var flags = opts.flags || {};
var packetOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress
};
var ids = {};
var self = this;
var socket;
packet.nsp = this.nsp.name;
let encodedPackets = this.parser.encode(packet)
if (rooms.length) {
for (var i = 0; i < rooms.length; i++) {
var room = self.rooms[rooms[i]];
if (!room) continue;
var sockets = room.sockets;
for (var id in sockets) {
if (sockets.hasOwnProperty(id)) {
if (ids[id] || ~except.indexOf(id)) continue;
socket = self.nsp.connected[id];
if (socket) {
socket.packet(encodedPackets, packetOpts);
ids[id] = true;
}
}
}
}
} else {
for (var id in self.sids) {
if (self.sids.hasOwnProperty(id)) {
if (~except.indexOf(id)) continue;
socket = self.nsp.connected[id];
if (socket) socket.packet(encodedPackets, packetOpts);
}
}
}
}
}
class Rooms implements SocketIO.Rooms {
[room: string]: Room;
}
class Room implements SocketIO.Room {
sockets: { [id: string]: boolean; };
length: number;
constructor() {
this.sockets = {};
this.length = 0;
}
add(id) {
if (!this.sockets.hasOwnProperty(id)) {
this.sockets[id] = true;
this.length++;
}
}
del(id) {
if (this.sockets.hasOwnProperty(id)) {
delete this.sockets[id];
this.length--;
}
}
}

View File

@ -7,9 +7,9 @@ import { SocketIO } from './interfaces'
const parser = new Parser();
export class SocketIOClient implements SocketIO.Client {
export class Client extends EventEmitter implements SocketIO.Client {
private nettyClient: NettyClient;
private event: EventEmitter
private event: EventEmitter;
private _id: string;
server: SocketIO.Server;
@ -18,67 +18,37 @@ export class SocketIOClient implements SocketIO.Client {
sockets: { [id: string]: SocketIO.Socket; };
nsps: { [nsp: string]: SocketIO.Socket; };
constructor(nettyClient: NettyClient) {
this.event = new EventEmitter()
constructor(server: SocketIO.Server, nettyClient: NettyClient) {
super();
this.server = server;
this.event = new EventEmitter();
this.conn = nettyClient;
this.nettyClient = nettyClient;
this._id = this.nettyClient.id;
this.event.emit('connect', this);
this.nettyClient.on(ServerEvent.message, (text) => this.process(text))
}
get id() {
return this._id;
}
connect() {
this.packet({
type: PacketTypes.OPEN,
data: {
sid: this._id,
upgrades: [],
pingInterval: 25000,
pingTimeout: 5000
on(event: string, callback: (...args: any[]) => void) {
this.event.on(event, callback);
return this
}
})
this.packet({
type: PacketTypes.MESSAGE,
sub_type: SubPacketTypes.CONNECT
})
}
emit(event: string, data: any) {
emit(event: string, ...args: any[]): boolean {
this.packet({
type: PacketTypes.MESSAGE,
sub_type: SubPacketTypes.EVENT,
name: event,
data
data: args[0]
})
return true;
}
send(data: any) {
this.emit("message", data);
}
process(packet: Packet) {
this.event.emit(packet.name, packet.data);
}
packet(packet: Packet) {
this.nettyClient.send(parser.encode(packet))
}
private process(text: string) {
let packet = parser.decode(text);
switch (packet.type) {
case PacketTypes.OPEN:
break;
case PacketTypes.PING:
this.packet({
type: PacketTypes.PONG
})
break;
case PacketTypes.MESSAGE:
switch (packet.sub_type) {
case SubPacketTypes.CONNECT:
this.nettyClient.send(text);
break;
case SubPacketTypes.EVENT:
this.event.emit(packet.name, packet.data);
break;
}
break;
}
}
}

View File

@ -4,35 +4,37 @@ import { NettyWebSocketServer } from '../server'
import { ServerEvent } from '../server/constants';
import { Namespace } from './namespace';
import { SocketIOClient } from './client';
import { Client } from './client';
import { SocketIO } from './interfaces'
import { Parser } from './parser'
import { PacketTypes, SubPacketTypes } from './types';
import { Packet } from './packet';
import { Socket } from './socket';
import { Adapter } from './adapter';
export class SocketIOServer implements SocketIO.Server {
class Server implements SocketIO.Server {
private event: EventEmitter;
private nettyServer: NettyWebSocketServer;
private allClients: { [key: string]: SocketIOClient };
private namespaces: { [key: string]: Namespace };
private allClients: { [key: string]: Client };
engine: { ws: any; };
nsps: { [namespace: string]: SocketIO.Namespace; };
sockets: SocketIO.Namespace;
nsps: { [namespace: string]: Namespace; };
sockets: Namespace;
json: SocketIO.Server;
volatile: SocketIO.Server;
local: SocketIO.Server;
parser = new Parser();
_adapter: Adapter;
options: SocketIO.ServerOptions;
constructor(pipeline: any, options: SocketIO.ServerOptions) {
if (!pipeline) { throw new Error('Netty Pipeline can\'t be undefiend!') }
this.event = new EventEmitter();
this.allClients = {};
this.namespaces = {};
this.nettyServer = new NettyWebSocketServer(pipeline, {
});
this.nettyServer.on(ServerEvent.connect, (nettyClient) => {
let client = new SocketIOClient(nettyClient);
this.allClients[client.id] = client;
client.connect();
})
this.nsps = {};
this.sockets = new Namespace('/');
this.nsps['/'] = this.sockets;
this.initNettyServer(pipeline, options);
}
checkRequest(req: any, fn: (err: any, success: boolean) => void): void {
@ -46,12 +48,21 @@ export class SocketIOServer implements SocketIO.Server {
path(): string;
path(v: string): SocketIO.Server;
path(v?: any): string | SocketIO.Server {
if (!arguments.length) return this.options.path;
this.options.path = v.replace(/\/$/, '');
return this;
}
adapter();
adapter(): Adapter;
adapter(v: any): SocketIO.Server;
adapter(v?: any): SocketIO.Server {
throw new Error("Method not implemented.");
adapter(v?: any): Adapter | SocketIO.Server {
if (!arguments.length) return this._adapter;
this._adapter = v;
for (var i in this.nsps) {
if (this.nsps.hasOwnProperty(i)) {
this.nsps[i].initAdapter();
}
}
return this;
}
origins(): string | string[];
origins(v: string | string[]): SocketIO.Server;
@ -73,10 +84,23 @@ export class SocketIOServer implements SocketIO.Server {
throw new Error("Method not implemented.");
}
onconnection(socket: any): SocketIO.Server {
throw new Error("Method not implemented.");
socket.packet({
type: PacketTypes.OPEN,
data: {
sid: socket.id,
upgrades: [],
pingInterval: 25000,
pingTimeout: 5000
}
of(nsp: string | Function | RegExp): SocketIO.Namespace {
throw new Error("Method not implemented.");
})
this.sockets.add(socket);
return this;
}
of(nsp: string): SocketIO.Namespace {
if (!this.nsps[nsp]) {
this.nsps[nsp] = new Namespace(nsp);
}
return this.nsps[nsp];
}
close(fn?: () => void): void {
throw new Error("Method not implemented.");
@ -85,7 +109,8 @@ export class SocketIOServer implements SocketIO.Server {
on(event: "connect", listener: (socket: SocketIO.Socket) => void): SocketIO.Namespace;
on(event: string, listener: Function): SocketIO.Namespace;
on(event: any, listener: any): SocketIO.Namespace {
throw new Error("Method not implemented.");
this.event.on(event, listener);
return this.sockets;
}
to(room: string): SocketIO.Namespace {
throw new Error("Method not implemented.");
@ -97,22 +122,72 @@ export class SocketIOServer implements SocketIO.Server {
throw new Error("Method not implemented.");
}
emit(event: string, ...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
this.sockets.emit(event, ...args);
return this.sockets;
}
send(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
this.sockets.send(...args);
return this.sockets;
}
write(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
this.sockets.write(...args);
return this.sockets;
}
clients(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
this.sockets.clients(args[0]);
return this.sockets;
}
compress(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
disable() {
this.nettyServer.disable();
}
private initNettyServer(pipeline, options) {
this.nettyServer = new NettyWebSocketServer(pipeline, {
event: new EventEmitter(),
path: options.path
});
this.nettyServer.on(ServerEvent.connect, (nettyClient) => {
let client = new Client(this, nettyClient);
this.onconnection(client);
})
this.nettyServer.on(ServerEvent.message, (nettyClient, text) => {
this.processPacket(this.parser.decode(text), this.allClients[nettyClient.id]);
})
}
private processPacket(packet: Packet, client: Client) {
switch (packet.type) {
case PacketTypes.PING:
client.packet({
type: PacketTypes.PONG,
data: packet.data
})
break;
case PacketTypes.UPGRADE:
break;
case PacketTypes.MESSAGE:
this.processSubPacket(packet, client);
break;
case PacketTypes.CLOSE:
}
}
private processSubPacket(packet: Packet, client: Client) {
switch (packet.sub_type) {
case SubPacketTypes.CONNECT:
client.packet(packet);
break;
case SubPacketTypes.EVENT:
client.process(packet);
break;
}
}
}
export {
Server,
Server as SocketIOServer,
Client as SocketIOClient
}

View File

@ -1,96 +1,66 @@
import { EventEmitter } from 'events'
import { SocketIOClient } from './client'
import { Client } from './client'
import { SocketIO } from './interfaces';
import { ServerEvent } from '../server';
import { Socket } from './socket';
import { Adapter } from './adapter';
import { Server } from './index'
export class Namespace implements SocketIO.Namespace {
private event: EventEmitter;
private allClients: { [key: string]: SocketIOClient };
private roomClients: { [key: string]: Set<string> };
private clientRooms: { [key: string]: Set<string> };
export class Namespace extends EventEmitter implements SocketIO.Namespace {
name: string;
server: SocketIO.Server;
server: Server;
sockets: { [id: string]: SocketIO.Socket; };
connected: { [id: string]: SocketIO.Socket; };
adapter: SocketIO.Adapter;
json: SocketIO.Namespace;
constructor(name: string) {
super();
this.name = name;
this.event = new EventEmitter();
this.allClients = {};
this.roomClients = {};
this.clientRooms = {};
this.sockets = {};
this.connected = {};
this.adapter = new Adapter(this);
}
initAdapter() {
let adp = this.server.adapter()
this.adapter = new adp()
}
add(client: Client) {
let nameClient = new Socket(this, client, {});
this.sockets[client.id] = nameClient;
client.nsps[this.name] = nameClient;
this.onconnection(nameClient);
}
use(fn: (socket: SocketIO.Socket, fn: (err?: any) => void) => void): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
to(room: string): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
in(room: string): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
send(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
write(...args: any[]): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
on(event: "connection", listener: (socket: SocketIO.Socket) => void): this;
on(event: "connect", listener: (socket: SocketIO.Socket) => void): this;
on(event: string, listener: Function): this;
on(event: any, listener: any) {
// TODO
return this;
}
to(room: string): SocketIO.Namespace {
// TODO
return this;
}
in(room: string): SocketIO.Namespace {
return this.to(room);
}
send(...args: any[]): SocketIO.Namespace {
// TODO
return this;
}
write(...args: any[]): SocketIO.Namespace {
return this.send(...args);
}
clients(fn: Function): SocketIO.Namespace {
throw new Error("Method not implemented.");
return fn(Object.values(this.sockets))
}
compress(compress: boolean): SocketIO.Namespace {
throw new Error("Method not implemented.");
}
addListener(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
private onconnection(socket: any) {
let client = socket as Socket;
this.sockets[client.id] = client;
client.onconnect()
this.emit(ServerEvent.connect, socket);
this.emit(ServerEvent.connection, socket);
}
once(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
removeListener(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
off(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
removeAllListeners(event?: string | symbol): this {
throw new Error("Method not implemented.");
}
setMaxListeners(n: number): this {
throw new Error("Method not implemented.");
}
getMaxListeners(): number {
throw new Error("Method not implemented.");
}
listeners(event: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
rawListeners(event: string | symbol): Function[] {
throw new Error("Method not implemented.");
}
emit(event: string | symbol, ...args: any[]): boolean {
throw new Error("Method not implemented.");
}
listenerCount(type: string | symbol): number {
throw new Error("Method not implemented.");
}
prependListener(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this {
throw new Error("Method not implemented.");
}
eventNames(): (string | symbol)[] {
throw new Error("Method not implemented.");
}
}

View File

@ -0,0 +1,110 @@
import { EventEmitter } from 'events'
import { SocketIOClient } from "socket-io";
import { SocketIO } from "./interfaces";
import { Packet } from './packet';
import { PacketTypes, SubPacketTypes } from './types';
export class Socket extends EventEmitter implements SocketIO.Socket {
nsp: SocketIO.Namespace;
server: SocketIO.Server;
adapter: SocketIO.Adapter;
id: string;
request: any;
client: SocketIOClient;
conn: SocketIO.EngineSocket;
rooms: { [id: string]: string; };
connected: boolean;
disconnected: boolean;
handshake: SocketIO.Handshake;
json: SocketIO.Socket;
volatile: SocketIO.Socket;
broadcast: SocketIO.Socket;
fns: any[];
_rooms: string[];
constructor(nsp, client, query) {
super();
this.nsp = nsp;
this.server = nsp.server;
this.adapter = this.nsp.adapter;
this.id = nsp.name !== '/' ? nsp.name + '#' + client.id : client.id;
this.client = client;
this.conn = client.conn;
this.rooms = {};
// this.acks = {};
this.connected = true;
this.disconnected = false;
// this.handshake = this.buildHandshake(query);
this.fns = [];
// this.flags = {};
this._rooms = [];
}
to(room: string): SocketIO.Socket {
if (!~this._rooms.indexOf(room)) this._rooms.push(room);
return this;
}
in(room: string): SocketIO.Socket {
return this.to(room);
}
use(fn: (packet: SocketIO.Packet, next: (err?: any) => void) => void): SocketIO.Socket {
throw new Error("Method not implemented.");
}
send(...args: any[]): SocketIO.Socket {
this.emit("message", ...args)
return this;
}
write(...args: any[]): SocketIO.Socket {
return this.send(...args);
}
join(rooms: string | string[], fn?: (err?: any) => void): SocketIO.Socket {
if (!Array.isArray(rooms)) {
rooms = [rooms];
}
rooms = rooms.filter((room) => {
return !this.rooms.hasOwnProperty(room);
});
if (!rooms.length) {
fn && fn(null);
return this;
}
return this;
}
leave(name: string, fn?: Function): SocketIO.Socket {
delete this.rooms[name];
fn && fn(null)
return this;
}
leaveAll(): void {
this.adapter.delAll(this.id);
this.rooms = {};
}
disconnect(close?: boolean): SocketIO.Socket {
throw new Error("Method not implemented.");
}
compress(compress: boolean): SocketIO.Socket {
throw new Error("Method not implemented.");
}
error(err: any): void {
throw new Error("Method not implemented.");
}
packet(packet: Packet) {
this.client.packet(packet);
}
onconnect() {
this.nsp.connected[this.id] = this;
this.join(this.id);
// var skip = this.nsp.name === '/' && this.nsp.fns.length === 0;
// if (skip) {
// debug('packet already sent in initial handshake');
// } else {
this.packet({
type: PacketTypes.MESSAGE,
sub_type: SubPacketTypes.CONNECT
});
// }
}
}