feat: js task limit exec time default 3s

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-12-17 17:00:00 +08:00
parent ede3f83dd5
commit c61f87a449
2 changed files with 86 additions and 26 deletions

View File

@ -4,9 +4,9 @@ import i18n from '@ccms/i18n'
let ployfillStartTime = new Date().getTime() let ployfillStartTime = new Date().getTime()
i18n.initialize() i18n.initialize()
console.i18n("ms.ployfill.initialize") console.i18n("ms.ployfill.initialize")
require('./es5-ext') import './es5-ext'
require('./node-shim') import './node-shim'
require('core-js') import 'core-js'
//@ts-ignore //@ts-ignore
process.on('exit', () => require.disable()) process.on('exit', () => require.disable())
global.setGlobal('Proxy', require('./proxy').Proxy) global.setGlobal('Proxy', require('./proxy').Proxy)

View File

@ -1,8 +1,14 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
const System = Java.type('java.lang.System')
const Thread = Java.type('java.lang.Thread') const Thread = Java.type('java.lang.Thread')
const Runnable = Java.type('java.lang.Runnable')
const InterruptedException = Java.type('java.lang.InterruptedException')
const ThreadGroup = Java.type("java.lang.ThreadGroup") const ThreadGroup = Java.type("java.lang.ThreadGroup")
const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger") const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger")
const Callable = Java.type('java.util.concurrent.Callable')
const ThreadFactory = Java.type("java.util.concurrent.ThreadFactory")
const TimeoutException = Java.type('java.util.concurrent.TimeoutException')
const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor') const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor')
const LinkedBlockingQueue = Java.type("java.util.concurrent.LinkedBlockingQueue") const LinkedBlockingQueue = Java.type("java.util.concurrent.LinkedBlockingQueue")
const TimeUnit = Java.type('java.util.concurrent.TimeUnit') const TimeUnit = Java.type('java.util.concurrent.TimeUnit')
@ -12,18 +18,24 @@ const JavaScriptTask = Java.type(base.getJavaScriptTaskClass().name)
const threadCount = new AtomicInteger(0) const threadCount = new AtomicInteger(0)
const threadGroup = new ThreadGroup("@ccms/ployfill-micro-task") const threadGroup = new ThreadGroup("@ccms/ployfill-micro-task")
const microTaskPool = new ThreadPoolExecutor( const microTaskPool = new ThreadPoolExecutor(
100, 200, 60, Packages.java.util.concurrent.TimeUnit.SECONDS, 100, 200, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue(300), new LinkedBlockingQueue(300),
(run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet()), new ThreadFactory((run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet()))
new ThreadPoolExecutor.CallerRunsPolicy()
) )
class Process extends EventEmitter { class Process extends EventEmitter {
env = { env = {
__noSuchProperty__: (prop) => { __noSuchProperty__: (prop) => {
return Packages.java.lang.System.getenv(prop) return System.getenv(prop)
} }
} }
platform = Packages.java.lang.System.getProperty("os.name") platform = System.getProperty("os.name")
constructor() {
super()
this.on('exit', () => {
console.log(`await microTaskPool termination! queueTask: ${microTaskPool.shutdownNow().size()} remainTask: ${threadGroup.activeCount()}`)
microTaskPool.awaitTermination(3000, TimeUnit.MILLISECONDS)
})
}
on(event: string | symbol, listener: (...args: any[]) => void) { on(event: string | symbol, listener: (...args: any[]) => void) {
return super.on(event, (...args) => { return super.on(event, (...args) => {
try { try {
@ -41,29 +53,40 @@ class Process extends EventEmitter {
microTaskPool.execute(func) microTaskPool.execute(func)
} }
exit(code: number) { exit(code: number) {
process.emit('exit', code) console.log(`process exit by code ${code}!`)
console.log(`process exit await microTaskPool termination! queueTask: ${microTaskPool.shutdownNow().size()} remainTask: ${threadGroup.activeCount()}`) this.emit('exit', code)
microTaskPool.awaitTermination(3000, TimeUnit.MILLISECONDS)
} }
} }
class EventLoop { class EventLoop {
private eventLoopMainThread = undefined private eventLoopMainThread = undefined
private eventLoopTaskQueue = new DelayQueue() private eventLoopTaskQueue = new DelayQueue()
private taskExecTimeout = 3
private fixedThreadPool = undefined
constructor() { constructor() {
this.taskExecTimeout = parseInt(process.env.MS_NODE_EVENT_LOOP_TIMEOUT) || 3
this.fixedThreadPool = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue(300),
new ThreadFactory((run: any) => {
let thread = new Thread(run, "@ccms/node-shim/event-loop-exec")
thread.setDaemon(true)
return thread
}))
this.eventLoopMainThread = new Thread(() => { this.eventLoopMainThread = new Thread(() => {
let task = undefined let task = undefined
try { try {
while ((task = this.eventLoopTaskQueue.take()) != undefined) { while (!this.eventLoopMainThread.isInterrupted()) {
task = this.eventLoopTaskQueue.take()
try { try {
task.getTask()() task.getTask()()
} catch (error) { } catch (cause) {
try { try {
process.emit('error', error) process.emit('error', cause)
} catch (error) { } catch (error) {
console.error(error) console.error(cause)
console.ex(error) console.ex(cause)
} }
} }
} }
@ -79,15 +102,52 @@ class EventLoop {
} }
}, "@ccms/node-shim/event-loop") }, "@ccms/node-shim/event-loop")
this.eventLoopMainThread.setDaemon(true) this.eventLoopMainThread.setDaemon(true)
process.on('exit', () => this.eventLoopMainThread.interrupt()) process.on('exit', () => {
this.eventLoopMainThread.interrupt()
this.fixedThreadPool.shutdownNow()
console.log(`await fixedThreadPool termination!`)
this.fixedThreadPool.awaitTermination(3000, TimeUnit.MILLISECONDS)
})
} }
startEventLoop() { startEventLoop() {
this.eventLoopMainThread.start() this.eventLoopMainThread.start()
} }
private putDelayTask(callback: Function, ms: number) { private putDelayTask(id: number, callback: Function, ms: number) {
this.eventLoopTaskQueue.put(new JavaScriptTask(callback, ms)) this.eventLoopTaskQueue.put(new JavaScriptTask(id, callback, ms))
}
private limitTimeTask(name: string, callback: Function, ...args: any[]) {
if (!callback) {
throw new Error(`task ${name} callback function can't be null!`)
}
if (this.fixedThreadPool.isShutdown()) { return console.warn(`FixedThreadPool isTerminated! ignore Task ${name}!`) }
try {
this.fixedThreadPool.submit(new Callable({
call: () => {
try {
callback.apply(undefined, args)
} catch (cause) {
cause = cause.getCause && cause.getCause() || cause
try {
process.emit('error', cause)
} catch (error) {
console.error(cause)
console.ex(cause)
}
}
}
})).get(this.taskExecTimeout, TimeUnit.SECONDS)
} catch (error) {
if (error instanceof InterruptedException) {
return console.warn(`FixedThreadPool isInterrupted exit! Task ${name} exec exit!`)
}
if (error instanceof TimeoutException) {
return console.warn(`Task ${name} => ${callback} exec time greater than ${this.taskExecTimeout}s!`)
}
throw error.getCause && error.getCause() || error
}
} }
private timeoutCount = new AtomicInteger(0) private timeoutCount = new AtomicInteger(0)
@ -96,9 +156,9 @@ class EventLoop {
let taskId = this.timeoutCount.incrementAndGet() let taskId = this.timeoutCount.incrementAndGet()
this.timeoutTasks[taskId] = callback this.timeoutTasks[taskId] = callback
console.trace(`create setTimeout task ${taskId} => ${callback}`) console.trace(`create setTimeout task ${taskId} => ${callback}`)
this.putDelayTask(() => { this.putDelayTask(taskId, () => {
if (this.timeoutTasks[taskId]) { if (this.timeoutTasks[taskId]) {
callback(...args) this.limitTimeTask(`setTimeout-${taskId}`, callback, ...args)
} else { } else {
console.trace(`ignore setTimeout ${ms}ms task ${taskId} because it's cancelled!`) console.trace(`ignore setTimeout ${ms}ms task ${taskId} because it's cancelled!`)
} }
@ -117,13 +177,13 @@ class EventLoop {
console.trace(`create setInterval ${ms}ms task ${taskId} => ${callback}`) console.trace(`create setInterval ${ms}ms task ${taskId} => ${callback}`)
let intervalTask = () => { let intervalTask = () => {
if (this.intervalTasks[taskId]) { if (this.intervalTasks[taskId]) {
callback(...args) this.limitTimeTask(`setInterval-${taskId}`, callback, ...args)
this.putDelayTask(intervalTask, ms) this.putDelayTask(taskId, intervalTask, ms)
} else { } else {
console.trace(`ignore setInterval task ${taskId} because it's cancelled!`) console.trace(`ignore setInterval task ${taskId} because it's cancelled!`)
} }
} }
this.putDelayTask(intervalTask, ms) this.putDelayTask(taskId, intervalTask, ms)
return taskId return taskId
} }
clearInterval(taskId: number) { clearInterval(taskId: number) {
@ -137,7 +197,7 @@ eventLoop.startEventLoop()
global.setGlobal('queueMicrotask', (func: any) => microTaskPool.execute(func), {}) global.setGlobal('queueMicrotask', (func: any) => microTaskPool.execute(func), {})
global.setGlobal('setTimeout', eventLoop.setTimeout.bind(eventLoop), {}) global.setGlobal('setTimeout', eventLoop.setTimeout.bind(eventLoop), {})
global.setGlobal('clearTimeout', eventLoop.clearTimeout.bind(eventLoop), {}) global.setGlobal('clearTimeout', eventLoop.clearTimeout.bind(eventLoop), {})
global.setGlobal('setImmediate', (callback: (...args: any[]) => void, ...args: any[]) => eventLoop.setTimeout(callback, 0, ...args), {})
global.setGlobal('clearImmediate ', eventLoop.clearTimeout.bind(eventLoop), {})
global.setGlobal('setInterval', eventLoop.setInterval.bind(eventLoop), {}) global.setGlobal('setInterval', eventLoop.setInterval.bind(eventLoop), {})
global.setGlobal('clearInterval', eventLoop.clearInterval.bind(eventLoop), {}) global.setGlobal('clearInterval', eventLoop.clearInterval.bind(eventLoop), {})
global.setGlobal('setImmediate', (callback: (...args: any[]) => void, ...args: any[]) => eventLoop.setTimeout(callback, 0, ...args), { writable: true })
global.setGlobal('clearImmediate ', eventLoop.clearTimeout.bind(eventLoop), { writable: true })