@@ -1,6 +1,8 @@
|
||||
import { EventEmitter } from "events"
|
||||
|
||||
export type SocketId = string
|
||||
// we could extend the Room type to "string | number", but that would be a breaking change
|
||||
// related: https://github.com/socketio/socket.io-redis-adapter/issues/418
|
||||
export type Room = string
|
||||
|
||||
export interface BroadcastFlags {
|
||||
@@ -9,11 +11,12 @@ export interface BroadcastFlags {
|
||||
local?: boolean
|
||||
broadcast?: boolean
|
||||
binary?: boolean
|
||||
timeout?: number
|
||||
}
|
||||
|
||||
export interface BroadcastOptions {
|
||||
rooms: Set<Room>
|
||||
except?: Set<SocketId>
|
||||
except?: Set<Room>
|
||||
flags?: BroadcastFlags
|
||||
}
|
||||
|
||||
@@ -42,6 +45,15 @@ export class Adapter extends EventEmitter {
|
||||
*/
|
||||
public close(): Promise<void> | void { }
|
||||
|
||||
/**
|
||||
* Returns the number of Socket.IO servers in the cluster
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
public serverCount(): Promise<number> {
|
||||
return Promise.resolve(1)
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a socket to a list of room.
|
||||
*
|
||||
@@ -82,14 +94,14 @@ export class Adapter extends EventEmitter {
|
||||
this._del(room, id)
|
||||
}
|
||||
|
||||
private _del(room, id) {
|
||||
if (this.rooms.has(room)) {
|
||||
const deleted = this.rooms.get(room).delete(id)
|
||||
private _del(room: Room, id: SocketId) {
|
||||
const _room = this.rooms.get(room)
|
||||
if (_room != null) {
|
||||
const deleted = _room.delete(id)
|
||||
if (deleted) {
|
||||
this.emit("leave-room", room, id)
|
||||
}
|
||||
if (this.rooms.get(room).size === 0) {
|
||||
this.rooms.delete(room)
|
||||
if (_room.size === 0 && this.rooms.delete(room)) {
|
||||
this.emit("delete-room", room)
|
||||
}
|
||||
}
|
||||
@@ -126,7 +138,7 @@ export class Adapter extends EventEmitter {
|
||||
*/
|
||||
public broadcast(packet: any, opts: BroadcastOptions): void {
|
||||
const flags = opts.flags || {}
|
||||
const basePacketOpts = {
|
||||
const packetOpts = {
|
||||
preEncoded: true,
|
||||
volatile: flags.volatile,
|
||||
compress: flags.compress
|
||||
@@ -135,22 +147,65 @@ export class Adapter extends EventEmitter {
|
||||
packet.nsp = this.nsp.name
|
||||
const encodedPackets = this.encoder.encode(packet)
|
||||
|
||||
const packetOpts = encodedPackets.map(encodedPacket => {
|
||||
if (typeof encodedPacket === "string") {
|
||||
return {
|
||||
...basePacketOpts,
|
||||
wsPreEncoded: "4" + encodedPacket // "4" being the "message" packet type in Engine.IO
|
||||
}
|
||||
} else {
|
||||
return basePacketOpts
|
||||
this.apply(opts, socket => {
|
||||
if (typeof socket.notifyOutgoingListeners === "function") {
|
||||
socket.notifyOutgoingListeners(packet)
|
||||
}
|
||||
|
||||
socket.client.writeToEngine(encodedPackets, packetOpts)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcasts a packet and expects multiple acknowledgements.
|
||||
*
|
||||
* Options:
|
||||
* - `flags` {Object} flags for this packet
|
||||
* - `except` {Array} sids that should be excluded
|
||||
* - `rooms` {Array} list of rooms to broadcast to
|
||||
*
|
||||
* @param {Object} packet the packet object
|
||||
* @param {Object} opts the options
|
||||
* @param clientCountCallback - the number of clients that received the packet
|
||||
* @param ack - the callback that will be called for each client response
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
public broadcastWithAck(
|
||||
packet: any,
|
||||
opts: BroadcastOptions,
|
||||
clientCountCallback: (clientCount: number) => void,
|
||||
ack: (...args: any[]) => void
|
||||
) {
|
||||
const flags = opts.flags || {}
|
||||
const packetOpts = {
|
||||
preEncoded: true,
|
||||
volatile: flags.volatile,
|
||||
compress: flags.compress
|
||||
}
|
||||
|
||||
packet.nsp = this.nsp.name
|
||||
// we can use the same id for each packet, since the _ids counter is common (no duplicate)
|
||||
packet.id = this.nsp._ids++
|
||||
|
||||
const encodedPackets = this.encoder.encode(packet)
|
||||
|
||||
let clientCount = 0
|
||||
|
||||
this.apply(opts, socket => {
|
||||
for (let i = 0; i < encodedPackets.length; i++) {
|
||||
socket.client.writeToEngine(encodedPackets[i], packetOpts[i])
|
||||
// track the total number of acknowledgements that are expected
|
||||
clientCount++
|
||||
// call the ack callback for each client response
|
||||
socket.acks.set(packet.id, ack)
|
||||
|
||||
if (typeof socket.notifyOutgoingListeners === "function") {
|
||||
socket.notifyOutgoingListeners(packet)
|
||||
}
|
||||
|
||||
socket.client.writeToEngine(encodedPackets, packetOpts)
|
||||
})
|
||||
|
||||
clientCountCallback(clientCount)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -272,7 +327,7 @@ export class Adapter extends EventEmitter {
|
||||
* @param packet - an array of arguments, which may include an acknowledgement callback at the end
|
||||
*/
|
||||
public serverSideEmit(packet: any[]): void {
|
||||
throw new Error(
|
||||
console.warn(
|
||||
"this adapter does not support the serverSideEmit() functionality"
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user