From c61f87a449b66751462ddc8a1291a9c1e54e2eee Mon Sep 17 00:00:00 2001 From: MiaoWoo Date: Thu, 17 Dec 2020 17:00:00 +0800 Subject: [PATCH] feat: js task limit exec time default 3s Signed-off-by: MiaoWoo --- packages/ployfill/src/index.ts | 6 +- packages/ployfill/src/node-shim.ts | 106 ++++++++++++++++++++++------- 2 files changed, 86 insertions(+), 26 deletions(-) diff --git a/packages/ployfill/src/index.ts b/packages/ployfill/src/index.ts index 3260df29..2d1fec8d 100644 --- a/packages/ployfill/src/index.ts +++ b/packages/ployfill/src/index.ts @@ -4,9 +4,9 @@ import i18n from '@ccms/i18n' let ployfillStartTime = new Date().getTime() i18n.initialize() console.i18n("ms.ployfill.initialize") -require('./es5-ext') -require('./node-shim') -require('core-js') +import './es5-ext' +import './node-shim' +import 'core-js' //@ts-ignore process.on('exit', () => require.disable()) global.setGlobal('Proxy', require('./proxy').Proxy) diff --git a/packages/ployfill/src/node-shim.ts b/packages/ployfill/src/node-shim.ts index a9efdd41..cb5212f9 100644 --- a/packages/ployfill/src/node-shim.ts +++ b/packages/ployfill/src/node-shim.ts @@ -1,8 +1,14 @@ import { EventEmitter } from 'events' +const System = Java.type('java.lang.System') const Thread = Java.type('java.lang.Thread') +const Runnable = Java.type('java.lang.Runnable') +const InterruptedException = Java.type('java.lang.InterruptedException') const ThreadGroup = Java.type("java.lang.ThreadGroup") const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger") +const Callable = Java.type('java.util.concurrent.Callable') +const ThreadFactory = Java.type("java.util.concurrent.ThreadFactory") +const TimeoutException = Java.type('java.util.concurrent.TimeoutException') const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor') const LinkedBlockingQueue = Java.type("java.util.concurrent.LinkedBlockingQueue") const TimeUnit = Java.type('java.util.concurrent.TimeUnit') @@ -12,18 +18,24 @@ const JavaScriptTask = Java.type(base.getJavaScriptTaskClass().name) const threadCount = new AtomicInteger(0) const threadGroup = new ThreadGroup("@ccms/ployfill-micro-task") const microTaskPool = new ThreadPoolExecutor( - 100, 200, 60, Packages.java.util.concurrent.TimeUnit.SECONDS, + 100, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(300), - (run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet()), - new ThreadPoolExecutor.CallerRunsPolicy() + new ThreadFactory((run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet())) ) class Process extends EventEmitter { env = { __noSuchProperty__: (prop) => { - return Packages.java.lang.System.getenv(prop) + return System.getenv(prop) } } - platform = Packages.java.lang.System.getProperty("os.name") + platform = System.getProperty("os.name") + constructor() { + super() + this.on('exit', () => { + console.log(`await microTaskPool termination! queueTask: ${microTaskPool.shutdownNow().size()} remainTask: ${threadGroup.activeCount()}`) + microTaskPool.awaitTermination(3000, TimeUnit.MILLISECONDS) + }) + } on(event: string | symbol, listener: (...args: any[]) => void) { return super.on(event, (...args) => { try { @@ -41,29 +53,40 @@ class Process extends EventEmitter { microTaskPool.execute(func) } exit(code: number) { - process.emit('exit', code) - console.log(`process exit await microTaskPool termination! queueTask: ${microTaskPool.shutdownNow().size()} remainTask: ${threadGroup.activeCount()}`) - microTaskPool.awaitTermination(3000, TimeUnit.MILLISECONDS) + console.log(`process exit by code ${code}!`) + this.emit('exit', code) } } class EventLoop { private eventLoopMainThread = undefined private eventLoopTaskQueue = new DelayQueue() + private taskExecTimeout = 3 + private fixedThreadPool = undefined constructor() { + this.taskExecTimeout = parseInt(process.env.MS_NODE_EVENT_LOOP_TIMEOUT) || 3 + this.fixedThreadPool = new ThreadPoolExecutor( + 1, 1, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue(300), + new ThreadFactory((run: any) => { + let thread = new Thread(run, "@ccms/node-shim/event-loop-exec") + thread.setDaemon(true) + return thread + })) this.eventLoopMainThread = new Thread(() => { let task = undefined try { - while ((task = this.eventLoopTaskQueue.take()) != undefined) { + while (!this.eventLoopMainThread.isInterrupted()) { + task = this.eventLoopTaskQueue.take() try { task.getTask()() - } catch (error) { + } catch (cause) { try { - process.emit('error', error) + process.emit('error', cause) } catch (error) { - console.error(error) - console.ex(error) + console.error(cause) + console.ex(cause) } } } @@ -79,15 +102,52 @@ class EventLoop { } }, "@ccms/node-shim/event-loop") this.eventLoopMainThread.setDaemon(true) - process.on('exit', () => this.eventLoopMainThread.interrupt()) + process.on('exit', () => { + this.eventLoopMainThread.interrupt() + this.fixedThreadPool.shutdownNow() + console.log(`await fixedThreadPool termination!`) + this.fixedThreadPool.awaitTermination(3000, TimeUnit.MILLISECONDS) + }) } startEventLoop() { this.eventLoopMainThread.start() } - private putDelayTask(callback: Function, ms: number) { - this.eventLoopTaskQueue.put(new JavaScriptTask(callback, ms)) + private putDelayTask(id: number, callback: Function, ms: number) { + this.eventLoopTaskQueue.put(new JavaScriptTask(id, callback, ms)) + } + + private limitTimeTask(name: string, callback: Function, ...args: any[]) { + if (!callback) { + throw new Error(`task ${name} callback function can't be null!`) + } + if (this.fixedThreadPool.isShutdown()) { return console.warn(`FixedThreadPool isTerminated! ignore Task ${name}!`) } + try { + this.fixedThreadPool.submit(new Callable({ + call: () => { + try { + callback.apply(undefined, args) + } catch (cause) { + cause = cause.getCause && cause.getCause() || cause + try { + process.emit('error', cause) + } catch (error) { + console.error(cause) + console.ex(cause) + } + } + } + })).get(this.taskExecTimeout, TimeUnit.SECONDS) + } catch (error) { + if (error instanceof InterruptedException) { + return console.warn(`FixedThreadPool isInterrupted exit! Task ${name} exec exit!`) + } + if (error instanceof TimeoutException) { + return console.warn(`Task ${name} => ${callback} exec time greater than ${this.taskExecTimeout}s!`) + } + throw error.getCause && error.getCause() || error + } } private timeoutCount = new AtomicInteger(0) @@ -96,9 +156,9 @@ class EventLoop { let taskId = this.timeoutCount.incrementAndGet() this.timeoutTasks[taskId] = callback console.trace(`create setTimeout task ${taskId} => ${callback}`) - this.putDelayTask(() => { + this.putDelayTask(taskId, () => { if (this.timeoutTasks[taskId]) { - callback(...args) + this.limitTimeTask(`setTimeout-${taskId}`, callback, ...args) } else { console.trace(`ignore setTimeout ${ms}ms task ${taskId} because it's cancelled!`) } @@ -117,13 +177,13 @@ class EventLoop { console.trace(`create setInterval ${ms}ms task ${taskId} => ${callback}`) let intervalTask = () => { if (this.intervalTasks[taskId]) { - callback(...args) - this.putDelayTask(intervalTask, ms) + this.limitTimeTask(`setInterval-${taskId}`, callback, ...args) + this.putDelayTask(taskId, intervalTask, ms) } else { console.trace(`ignore setInterval task ${taskId} because it's cancelled!`) } } - this.putDelayTask(intervalTask, ms) + this.putDelayTask(taskId, intervalTask, ms) return taskId } clearInterval(taskId: number) { @@ -137,7 +197,7 @@ eventLoop.startEventLoop() global.setGlobal('queueMicrotask', (func: any) => microTaskPool.execute(func), {}) global.setGlobal('setTimeout', eventLoop.setTimeout.bind(eventLoop), {}) global.setGlobal('clearTimeout', eventLoop.clearTimeout.bind(eventLoop), {}) -global.setGlobal('setImmediate', (callback: (...args: any[]) => void, ...args: any[]) => eventLoop.setTimeout(callback, 0, ...args), {}) -global.setGlobal('clearImmediate ', eventLoop.clearTimeout.bind(eventLoop), {}) global.setGlobal('setInterval', eventLoop.setInterval.bind(eventLoop), {}) global.setGlobal('clearInterval', eventLoop.clearInterval.bind(eventLoop), {}) +global.setGlobal('setImmediate', (callback: (...args: any[]) => void, ...args: any[]) => eventLoop.setTimeout(callback, 0, ...args), { writable: true }) +global.setGlobal('clearImmediate ', eventLoop.clearTimeout.bind(eventLoop), { writable: true })