From b98409abe780da797411297ffab02a083306e50f Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Wed, 17 Jun 2020 18:27:36 +0800 Subject: [PATCH] feat: add amqp package Signed-off-by: MiaoWoo --- packages/amqp/.npmignore | 1 + packages/amqp/package.json | 32 +++++++++ packages/amqp/src/adapter/admin.ts | 14 ++++ packages/amqp/src/adapter/callback.ts | 30 +++++++++ packages/amqp/src/adapter/connection.ts | 31 +++++++++ packages/amqp/src/adapter/container.ts | 30 +++++++++ packages/amqp/src/adapter/converter.ts | 37 ++++++++++ packages/amqp/src/adapter/index.ts | 7 ++ packages/amqp/src/adapter/listener.ts | 49 ++++++++++++++ packages/amqp/src/adapter/template.ts | 27 ++++++++ packages/amqp/src/admin.ts | 90 +++++++++++++++++++++++++ packages/amqp/src/constants.ts | 3 + packages/amqp/src/index.ts | 34 ++++++++++ packages/amqp/src/listener.ts | 5 ++ packages/amqp/src/manager.ts | 18 +++++ packages/amqp/tsconfig.json | 7 ++ 16 files changed, 415 insertions(+) create mode 120000 packages/amqp/.npmignore create mode 100644 packages/amqp/package.json create mode 100644 packages/amqp/src/adapter/admin.ts create mode 100644 packages/amqp/src/adapter/callback.ts create mode 100644 packages/amqp/src/adapter/connection.ts create mode 100644 packages/amqp/src/adapter/container.ts create mode 100644 packages/amqp/src/adapter/converter.ts create mode 100644 packages/amqp/src/adapter/index.ts create mode 100644 packages/amqp/src/adapter/listener.ts create mode 100644 packages/amqp/src/adapter/template.ts create mode 100644 packages/amqp/src/admin.ts create mode 100644 packages/amqp/src/constants.ts create mode 100644 packages/amqp/src/index.ts create mode 100644 packages/amqp/src/listener.ts create mode 100644 packages/amqp/src/manager.ts create mode 100644 packages/amqp/tsconfig.json diff --git a/packages/amqp/.npmignore b/packages/amqp/.npmignore new file mode 120000 index 00000000..b4359f69 --- /dev/null +++ b/packages/amqp/.npmignore @@ -0,0 +1 @@ +../../.npmignore \ No newline at end of file diff --git a/packages/amqp/package.json b/packages/amqp/package.json new file mode 100644 index 00000000..b64cef2f --- /dev/null +++ b/packages/amqp/package.json @@ -0,0 +1,32 @@ +{ + "name": "@ccms/amqp", + "version": "0.7.0", + "description": "MiaoScript amqp package", + "keywords": [ + "miaoscript", + "minecraft", + "bukkit", + "sponge" + ], + "author": "MiaoWoo ", + "homepage": "https://github.com/circlecloud/ms.git", + "license": "ISC", + "main": "dist/index.js", + "scripts": { + "clean": "rimraf dist", + "watch": "tsc --watch", + "build": "yarn clean && tsc", + "test": "echo \"Error: run tests from root\" && exit 1" + }, + "dependencies": { + "@ccms/api": "^0.7.0", + "@ccms/nashorn": "^0.7.0", + "@ccms/common": "^0.7.0", + "@ccms/container": "^0.7.0" + }, + "devDependencies": { + "reflect-metadata": "^0.1.13", + "rimraf": "^3.0.2", + "typescript": "^3.9.2" + } +} \ No newline at end of file diff --git a/packages/amqp/src/adapter/admin.ts b/packages/amqp/src/adapter/admin.ts new file mode 100644 index 00000000..d032c9d4 --- /dev/null +++ b/packages/amqp/src/adapter/admin.ts @@ -0,0 +1,14 @@ +import { ConnectionFactoryAdapter } from "./connection" +import { RabbitTemplateAdapter } from "./template" + +export const RabbitAdmin = Java.type('org.springframework.amqp.rabbit.core.RabbitAdmin') + +export class RabbitAdminAdapter { + private _Handler: org.springframework.amqp.rabbit.core.RabbitAdmin + constructor(config: RabbitTemplateAdapter | org.springframework.amqp.rabbit.core.RabbitTemplate | ConnectionFactoryAdapter | org.springframework.amqp.rabbit.connection.ConnectionFactory) { + this._Handler = new RabbitAdmin((config instanceof RabbitTemplateAdapter || config instanceof ConnectionFactoryAdapter) ? config.getHandler() : config) + } + getHandler(): org.springframework.amqp.rabbit.core.RabbitAdmin { + return this._Handler + } +} diff --git a/packages/amqp/src/adapter/callback.ts b/packages/amqp/src/adapter/callback.ts new file mode 100644 index 00000000..d4936a94 --- /dev/null +++ b/packages/amqp/src/adapter/callback.ts @@ -0,0 +1,30 @@ +export const ConfirmCallback = Java.type('org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback') +export const ReturnCallback = Java.type('org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback') + +export abstract class ConfirmCallbackAdapter { + private _Handler: org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback + constructor() { + let ConfirmCallbackImpl = Java.extend(ReturnCallback, { + confirm: (correlationData: org.springframework.amqp.rabbit.connection.CorrelationData, ack: boolean, cause: string) => this.confirm(correlationData, ack, cause) + }) + this._Handler = new ConfirmCallbackImpl() + } + abstract confirm(correlationData: org.springframework.amqp.rabbit.connection.CorrelationData, ack: boolean, cause: string) + getHandler() { + return this._Handler + } +} + +export abstract class ReturnCallbackAdapter { + private _Handler: org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback + constructor() { + let ReturnCallbackImpl = Java.extend(ReturnCallback, { + returnedMessage: (message: org.springframework.amqp.core.Message, replyCode: number, replyText: string, exchange: string, routingKey: string) => this.returnedMessage(message, replyCode, replyText, exchange, routingKey) + }) + this._Handler = new ReturnCallbackImpl() + } + abstract returnedMessage(message: org.springframework.amqp.core.Message, replyCode: number, replyText: string, exchange: string, routingKey: string) + getHandler() { + return this._Handler + } +} diff --git a/packages/amqp/src/adapter/connection.ts b/packages/amqp/src/adapter/connection.ts new file mode 100644 index 00000000..b18fb5f9 --- /dev/null +++ b/packages/amqp/src/adapter/connection.ts @@ -0,0 +1,31 @@ +import threadPool from '@ccms/common/dist/thread-pool' + +export const ConnectionFactory = Java.type('org.springframework.amqp.rabbit.connection.ConnectionFactory') +const CachingConnectionFactory = Java.type('org.springframework.amqp.rabbit.connection.CachingConnectionFactory') +interface ConnectionConfig { + url: string, + username?: string, + password?: string, + publisherConfirms?: boolean + cacheSize?: number +} + +export class ConnectionFactoryAdapter { + private _Handler: org.springframework.amqp.rabbit.connection.CachingConnectionFactory + + constructor(config: ConnectionConfig) { + if (!config.url) { throw new Error('Connection UrI Can\'t be undefiend!') } + config = { publisherConfirms: true, cacheSize: 50, ...config } + this._Handler = new CachingConnectionFactory(new java.net.URI(config.url)) + config.username && this._Handler.setUsername(config.username) + config.password && this._Handler.setPassword(config.password) + this._Handler.setPublisherConfirms(config.publisherConfirms) + this._Handler.setExecutor(threadPool.create({ + groupName: '@ccms/amqp-rabbit' + })) + this._Handler.setChannelCacheSize(config.cacheSize) + } + getHandler() { + return this._Handler + } +} diff --git a/packages/amqp/src/adapter/container.ts b/packages/amqp/src/adapter/container.ts new file mode 100644 index 00000000..a2491dd9 --- /dev/null +++ b/packages/amqp/src/adapter/container.ts @@ -0,0 +1,30 @@ +import { ConnectionFactoryAdapter, ConnectionFactory } from "./connection" +import { ChannelAwareMessageListenerAdapter, ChannelAwareMessageListener } from "./listener" + +export const SimpleMessageListenerContainer = org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer +export const AcknowledgeMode = org.springframework.amqp.core.AcknowledgeMode +interface ContainerConfig { + connectionFactory: ConnectionFactoryAdapter | typeof ConnectionFactory + queueNames: string[] + messageListener: ChannelAwareMessageListenerAdapter | typeof ChannelAwareMessageListener + maxConcurrentConsumers?: number + concurrentConsumers?: number + acknowledgeMode?: org.springframework.amqp.core.AcknowledgeMode +} + +export class MessageListenerContainerAdapter { + private _Handler: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer + constructor(config: ContainerConfig) { + config = { concurrentConsumers: 5, maxConcurrentConsumers: 10, acknowledgeMode: AcknowledgeMode.AUTO, ...config } + this._Handler = new SimpleMessageListenerContainer(config.connectionFactory instanceof ConnectionFactoryAdapter ? config.connectionFactory.getHandler() : config.connectionFactory) + //@ts-ignore + this._Handler.setQueueNames(config.queueNames) + this._Handler.setMaxConcurrentConsumers(config.maxConcurrentConsumers) + this._Handler.setConcurrentConsumers(config.concurrentConsumers) + this._Handler.setAcknowledgeMode(config.acknowledgeMode) + this._Handler.setMessageListener(config.messageListener instanceof ChannelAwareMessageListenerAdapter ? config.messageListener.getHandler() : config.messageListener) + } + getHandler() { + return this._Handler + } +} \ No newline at end of file diff --git a/packages/amqp/src/adapter/converter.ts b/packages/amqp/src/adapter/converter.ts new file mode 100644 index 00000000..dfe2685d --- /dev/null +++ b/packages/amqp/src/adapter/converter.ts @@ -0,0 +1,37 @@ +const JavaString = Java.type('java.lang.String') +const MessageProperties = org.springframework.amqp.core.MessageProperties +const Message = org.springframework.amqp.core.Message + +export const MessageConverter = Java.type('org.springframework.amqp.support.converter.MessageConverter') + +export class MessageConverterAdapter { + private _Handler: org.springframework.amqp.support.converter.MessageConverter + constructor() { + var MessageConverterImpl = Java.extend(MessageConverter, { + toMessage: (object: any, messageProperties: org.springframework.amqp.core.MessageProperties) => this.toMessage(object, messageProperties), + fromMessage: (message: org.springframework.amqp.core.Message) => this.fromMessage(message) + }) + this._Handler = new MessageConverterImpl() + } + toMessage(object: any, messageProperties: org.springframework.amqp.core.MessageProperties) { + if (typeof object == "string") { + messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) + return new Message(new JavaString(object).getBytes(), messageProperties) + } else { + messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON) + return new Message(new JavaString(JSON.stringify(object)).getBytes(), messageProperties) + } + } + fromMessage(message: org.springframework.amqp.core.Message) { + if (message.getMessageProperties().getContentType() == MessageProperties.CONTENT_TYPE_TEXT_PLAIN) { + return new JavaString(message.getBody(), message.getMessageProperties().getContentEncoding()) + } + if (message.getMessageProperties().getContentType() == MessageProperties.CONTENT_TYPE_JSON) { + return JSON.parse(new JavaString(message.getBody(), message.getMessageProperties().getContentEncoding())) + } + return message + } + getHandler() { + return this._Handler + } +} diff --git a/packages/amqp/src/adapter/index.ts b/packages/amqp/src/adapter/index.ts new file mode 100644 index 00000000..979742a8 --- /dev/null +++ b/packages/amqp/src/adapter/index.ts @@ -0,0 +1,7 @@ +export * from './admin' +export * from './callback' +export * from './connection' +export * from './container' +export * from './converter' +export * from './listener' +export * from './template' diff --git a/packages/amqp/src/adapter/listener.ts b/packages/amqp/src/adapter/listener.ts new file mode 100644 index 00000000..45e87262 --- /dev/null +++ b/packages/amqp/src/adapter/listener.ts @@ -0,0 +1,49 @@ +const JavaString = Java.type('java.lang.String') +export const MessageProperties = org.springframework.amqp.core.MessageProperties +export const ChannelAwareMessageListener = Java.type('org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener') +export const AmqpRejectAndDontRequeueException = org.springframework.amqp.AmqpRejectAndDontRequeueException +export const Channel: com.rabbitmq.client.Channel = Java.type('com.rabbitmq.client.Channel') +export const Message = org.springframework.amqp.core.Message +export type MessageHandler = (content: T, message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel) => any +export abstract class ChannelAwareMessageListenerAdapter { + private _Handler: org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener + constructor(manual: boolean = false) { + let ChannelAwareMessageListenerImpl = Java.extend(ChannelAwareMessageListener, { + onMessage: (message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel) => { + let content = new JavaString(message.getBody(), message.getMessageProperties().getContentEncoding() || "UTF-8") + try { + if (message.getMessageProperties().getContentType() == MessageProperties.CONTENT_TYPE_JSON) { + content = JSON.parse(content) + } + } catch (error) { + if (manual) { + channel.basicReject(message.getMessageProperties().getDeliveryTag(), true) + } else { + throw new AmqpRejectAndDontRequeueException(`${error}`, error) + } + } + manual ? this.manualOnMessage(content, message, channel) : this.onMessage(content, message, channel) + } + }) + this._Handler = new ChannelAwareMessageListenerImpl() + } + abstract onMessage(content: T, message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel): any + onError(error: Error, message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel): any { } + getHandler() { + return this._Handler + } + private manualOnMessage(content: T, message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel) { + let deliveryTag = message.getMessageProperties().getDeliveryTag() + try { + if (this.onMessage(content, message, channel)) { + channel.basicAck(deliveryTag, false) + } else if (message.getMessageProperties().getRedelivered()) { + channel.basicReject(deliveryTag, true) + } else { + channel.basicNack(deliveryTag, false, true) + } + } catch (error) { + channel.basicReject(deliveryTag, this.onError(error, message, channel)) + } + } +} diff --git a/packages/amqp/src/adapter/template.ts b/packages/amqp/src/adapter/template.ts new file mode 100644 index 00000000..b1085386 --- /dev/null +++ b/packages/amqp/src/adapter/template.ts @@ -0,0 +1,27 @@ +import { MessageConverterAdapter } from "./converter" +import { ConnectionFactoryAdapter } from "./connection" +import { ConfirmCallbackAdapter, ReturnCallbackAdapter } from "./callback" + +export const RabbitTemplate = Java.type('org.springframework.amqp.rabbit.core.RabbitTemplate') + +interface TemplateConfig { + connectionFactory: ConnectionFactoryAdapter | org.springframework.amqp.rabbit.connection.ConnectionFactory + confirmCallback?: ConfirmCallbackAdapter + returnCallback?: ReturnCallbackAdapter + messageConverter?: MessageConverterAdapter +} +export class RabbitTemplateAdapter { + private _Handler: org.springframework.amqp.rabbit.core.RabbitTemplate + constructor(config: TemplateConfig) { + config = { messageConverter: new MessageConverterAdapter(), ...config } + console.debug(JSON.stringify(config)) + this._Handler = new RabbitTemplate(config.connectionFactory instanceof ConnectionFactoryAdapter ? config.connectionFactory.getHandler() : config.connectionFactory) + config.returnCallback && this._Handler.setReturnCallback(config.returnCallback.getHandler()) + config.confirmCallback && this._Handler.setConfirmCallback(config.confirmCallback.getHandler()) + config.messageConverter && this._Handler.setMessageConverter(config.messageConverter.getHandler()) + } + + getHandler() { + return this._Handler + } +} diff --git a/packages/amqp/src/admin.ts b/packages/amqp/src/admin.ts new file mode 100644 index 00000000..d60e3499 --- /dev/null +++ b/packages/amqp/src/admin.ts @@ -0,0 +1,90 @@ +import { JSClass } from "@ccms/container" +import { RabbitAdminAdapter, MessageListenerContainerAdapter, AcknowledgeMode, MessageHandler, ChannelAwareMessageListenerAdapter } from "./adapter" + +const RabbitAdmin = Java.type('org.springframework.amqp.rabbit.core.RabbitAdmin') + +export class AmqpAdmin { + @JSClass('org.springframework.amqp.core.TopicExchange') + private TopicExchange: typeof org.springframework.amqp.core.TopicExchange + @JSClass('org.springframework.amqp.core.Queue') + private Queue: typeof org.springframework.amqp.core.Queue + @JSClass('org.springframework.amqp.core.Binding') + private Binding: typeof org.springframework.amqp.core.Binding + + @JSClass('org.springframework.amqp.core.Binding.DestinationType') + private DestinationType: typeof org.springframework.amqp.core.Binding.DestinationType + + private amqpAdmin: org.springframework.amqp.rabbit.core.RabbitAdmin + + constructor(amqpAdmin: org.springframework.amqp.rabbit.core.RabbitAdmin | any) { + if (amqpAdmin instanceof RabbitAdmin) { + this.amqpAdmin = amqpAdmin + } else if (amqpAdmin instanceof RabbitAdminAdapter) { + this.amqpAdmin = amqpAdmin.getHandler() + } else { + this.amqpAdmin = new RabbitAdminAdapter(amqpAdmin).getHandler() + } + } + + getHandler() { + return this.amqpAdmin + } + + getQueueProperties(name: string) { + return this.amqpAdmin.getQueueProperties(name) + } + + declareExchange(name: string) { + let exchange = new this.TopicExchange(name, true, false) + this.amqpAdmin.declareExchange(exchange) + return this + } + + declareQueue(name: string) { + let queue = new this.Queue(name, true) + this.amqpAdmin.declareQueue(queue) + return this + } + + declareBinding(queue: string, exchange: string, routerKey: string, argument: any = null) { + let binding = new this.Binding(queue, this.DestinationType.QUEUE, exchange, routerKey, argument) + this.amqpAdmin.declareBinding(binding) + return this + } + + declareQueueAndBindExchange(queue: string, exchange: string, routerKey: string) { + return this.declareQueue(queue).declareExchange(exchange).declareBinding(queue, exchange, routerKey) + } + + createContainer(queue: string, listener: MessageHandler, acknowledgeMode = AcknowledgeMode.AUTO) { + let connection = this.amqpAdmin.getRabbitTemplate().getConnectionFactory() + return new MessageListenerContainerAdapter({ + connectionFactory: connection, + queueNames: [queue], + messageListener: new SimpleMessageHandler(listener), + acknowledgeMode: acknowledgeMode + }).getHandler() + } + + send() + send() { + + } + + getTemplate() { + return this.amqpAdmin.getRabbitTemplate() + } + + close() { + this.getTemplate().stop() + } +} + +export class SimpleMessageHandler extends ChannelAwareMessageListenerAdapter { + constructor(private handler: MessageHandler) { + super() + } + onMessage(content: any, message: org.springframework.amqp.core.Message, channel: com.rabbitmq.client.Channel) { + return this.handler(content, message, channel) + } +} \ No newline at end of file diff --git a/packages/amqp/src/constants.ts b/packages/amqp/src/constants.ts new file mode 100644 index 00000000..b21a8a87 --- /dev/null +++ b/packages/amqp/src/constants.ts @@ -0,0 +1,3 @@ +export const METADATA_KEY = { + +} \ No newline at end of file diff --git a/packages/amqp/src/index.ts b/packages/amqp/src/index.ts new file mode 100644 index 00000000..207ff799 --- /dev/null +++ b/packages/amqp/src/index.ts @@ -0,0 +1,34 @@ +/// +/// +/// +/// +/// + +import { amqp } from '@ccms/api' +import { getContainer, reduceMetadata } from '@ccms/container' + +function init() { + const beanFactory = base.getInstance().getAutowireCapableBeanFactory() + getContainer().bind(amqp.rabbit.Template).toDynamicValue((ctx) => { + let metadata = reduceMetadata(ctx) + if (!metadata.named) { + return beanFactory.getBean('rabbitTemplate') + } else { + return beanFactory.getBean(`${metadata.named}-rabbitTemplate`) + } + }) + getContainer().bind(amqp.rabbit.Admin).toDynamicValue((ctx) => { + let metadata = reduceMetadata(ctx) + if (!metadata.named) { + return beanFactory.getBean('rabbitAdmin') + } else { + return beanFactory.getBean(`${metadata.named}-rabbitAdmin`) + } + }) +} +init() + +export * from './admin' +export * from './adapter' +export * from './manager' +export * from './constants' diff --git a/packages/amqp/src/listener.ts b/packages/amqp/src/listener.ts new file mode 100644 index 00000000..cc493772 --- /dev/null +++ b/packages/amqp/src/listener.ts @@ -0,0 +1,5 @@ +export class AmqpListener { + constructor() { + + } +} \ No newline at end of file diff --git a/packages/amqp/src/manager.ts b/packages/amqp/src/manager.ts new file mode 100644 index 00000000..4ad8d0f2 --- /dev/null +++ b/packages/amqp/src/manager.ts @@ -0,0 +1,18 @@ +import { amqp } from "@ccms/api" +import { provideSingleton } from "@ccms/container" + +import { ConnectionFactoryAdapter, RabbitTemplateAdapter, RabbitAdminAdapter } from "./adapter" +import { AmqpAdmin } from "./admin" + +@provideSingleton(amqp.Manager) +export class AmqpManager { + createConnection(url: string, username: string, password: string) { + return new ConnectionFactoryAdapter({ url, username, password }) + } + createTemplate(adapter: ConnectionFactoryAdapter) { + return new RabbitTemplateAdapter({ connectionFactory: adapter }) + } + createAdmin(adapter: RabbitTemplateAdapter | ConnectionFactoryAdapter) { + return new AmqpAdmin(new RabbitAdminAdapter(adapter)) + } +} diff --git a/packages/amqp/tsconfig.json b/packages/amqp/tsconfig.json new file mode 100644 index 00000000..7aae5d2b --- /dev/null +++ b/packages/amqp/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "baseUrl": "src", + "outDir": "dist" + } +} \ No newline at end of file