feat: support full nodejs event loop

Signed-off-by: MiaoWoo <admin@yumc.pw>
This commit is contained in:
MiaoWoo 2020-11-20 10:28:30 +08:00
parent 75b34bfc48
commit cd57944cb8
3 changed files with 103 additions and 45 deletions

View File

@ -158,9 +158,11 @@ export class MiaoScriptConsole implements Console {
var { fileName, lineNumber } = this.readSourceMap(trace.fileName, trace.lineNumber) var { fileName, lineNumber } = this.readSourceMap(trace.fileName, trace.lineNumber)
if (fileName.startsWith(root)) { fileName = fileName.split(root)[1] } if (fileName.startsWith(root)) { fileName = fileName.split(root)[1] }
} else { } else {
for (let prefix in ignoreLogPrefix) { if (!global.debug) {
if (className.startsWith(ignoreLogPrefix[prefix])) { for (let prefix in ignoreLogPrefix) {
return if (className.startsWith(ignoreLogPrefix[prefix])) {
return
}
} }
} }
} }

View File

@ -67,6 +67,7 @@ declare global {
interface Core { interface Core {
getClass(name: String): any getClass(name: String): any
getProxyClass(): any getProxyClass(): any
getJavaScriptTaskClass(): any
getInstance(): any getInstance(): any
read(path: string): string read(path: string): string
save(path: string, content: string): void save(path: string, content: string): void

View File

@ -5,13 +5,15 @@ const ThreadGroup = Java.type("java.lang.ThreadGroup")
const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger") const AtomicInteger = Java.type("java.util.concurrent.atomic.AtomicInteger")
const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor') const ThreadPoolExecutor = Java.type('java.util.concurrent.ThreadPoolExecutor')
const LinkedBlockingQueue = Java.type("java.util.concurrent.LinkedBlockingQueue") const LinkedBlockingQueue = Java.type("java.util.concurrent.LinkedBlockingQueue")
const TimeUnit = Java.type("java.util.concurrent.TimeUnit") const TimeUnit = Java.type('java.util.concurrent.TimeUnit')
const DelayQueue = Java.type('java.util.concurrent.DelayQueue')
const JavaScriptTask = Java.type(base.getJavaScriptTaskClass().name)
const threadCount = new AtomicInteger(0) const threadCount = new AtomicInteger(0)
const threadGroup = new ThreadGroup("@ccms/ployfill-micro-task") const threadGroup = new ThreadGroup("@ccms/ployfill-micro-task")
const microTaskPool = new ThreadPoolExecutor( const microTaskPool = new ThreadPoolExecutor(
10, 100, 60, Packages.java.util.concurrent.TimeUnit.SECONDS, 100, 200, 60, Packages.java.util.concurrent.TimeUnit.SECONDS,
new LinkedBlockingQueue(500), new LinkedBlockingQueue(300),
(run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet()), (run: any) => new Thread(threadGroup, run, "@ccms/micro-task-" + threadCount.incrementAndGet()),
new ThreadPoolExecutor.CallerRunsPolicy() new ThreadPoolExecutor.CallerRunsPolicy()
) )
@ -38,52 +40,105 @@ class Process extends EventEmitter {
nextTick(func: Function) { nextTick(func: Function) {
microTaskPool.execute(func) microTaskPool.execute(func)
} }
queueMicrotask(func: Function) {
microTaskPool.execute(func)
}
exit(code: number) { exit(code: number) {
process.emit('exit', code) process.emit('exit', code)
microTaskPool.shutdown() microTaskPool.shutdown()
console.log('await microTaskPool termination...') console.log('process exit await microTaskPool termination...')
microTaskPool.awaitTermination(5000, TimeUnit.MILLISECONDS) microTaskPool.awaitTermination(5000, TimeUnit.MILLISECONDS)
} }
} }
const timeoutCount = new AtomicInteger(0)
const timeoutTasks = [] class EventLoop {
function setTimeout(func: Function, time: number, ...args: any[]) { private eventLoopMainThread = undefined
let taskId = timeoutCount.incrementAndGet() private eventLoopTaskQueue = new DelayQueue()
timeoutTasks[taskId] = func
process.nextTick(() => { constructor() {
Thread.sleep(time) this.eventLoopMainThread = new Thread(() => {
if (timeoutTasks[taskId]) { func(...args) } let task = undefined
}) try {
return taskId while ((task = this.eventLoopTaskQueue.take()) != undefined) {
} try {
function clearTimeout(taskId: number) { task.getTask()()
delete timeoutTasks[taskId] } catch (error) {
} try {
const intervalCount = new AtomicInteger(0) process.emit('error', error)
const intervalTasks = [] } catch (error) {
function setInterval(func: Function, time: number, ...args: any[]) { console.error(error)
let taskId = intervalCount.incrementAndGet() console.ex(error)
intervalTasks[taskId] = func }
process.nextTick(() => { }
Thread.sleep(time) }
while (intervalTasks[taskId]) { } catch (error) {
func(...args) console.log(`EventLoop Thread isInterrupted exit! remainTask: ${this.eventLoopTaskQueue.size()}`)
Thread.sleep(time) this.eventLoopTaskQueue.clear()
this.eventLoopTaskQueue = undefined
this.timeoutCount = undefined
this.timeoutTasks = undefined
this.intervalCount = undefined
this.intervalTasks = undefined
this.eventLoopMainThread = undefined
}
}, "@ccms/node-shim/event-loop")
this.eventLoopMainThread.setDaemon(true)
process.on('exit', () => this.eventLoopMainThread.interrupt())
}
startEventLoop() {
this.eventLoopMainThread.start()
}
private putDelayTask(callback: Function, ms: number) {
this.eventLoopTaskQueue.put(new JavaScriptTask(callback, ms))
}
private timeoutCount = new AtomicInteger(0)
private timeoutTasks = []
setTimeout(callback: (...args: any[]) => void, ms: number, ...args: any[]) {
let taskId = this.timeoutCount.incrementAndGet()
this.timeoutTasks[taskId] = callback
console.trace(`create setTimeout task ${taskId} => ${callback}`)
this.putDelayTask(() => {
if (this.timeoutTasks[taskId]) {
callback(...args)
} else {
console.trace(`ignore setTimeout ${ms}ms task ${taskId} because it's cancelled!`)
}
}, ms)
return taskId
}
clearTimeout(taskId: number) {
delete this.timeoutTasks[taskId]
}
private intervalCount = new AtomicInteger(0)
private intervalTasks = []
setInterval(callback: (...args: any[]) => void, ms: number, ...args: any[]) {
let taskId = this.intervalCount.incrementAndGet()
this.intervalTasks[taskId] = callback
console.trace(`create setInterval ${ms}ms task ${taskId} => ${callback}`)
let intervalTask = () => {
if (this.intervalTasks[taskId]) {
callback(...args)
this.putDelayTask(intervalTask, ms)
} else {
console.trace(`ignore setInterval task ${taskId} because it's cancelled!`)
}
} }
}) this.putDelayTask(intervalTask, ms)
return taskId return taskId
} }
function clearInterval(taskId: number) { clearInterval(taskId: number) {
delete intervalTasks[taskId] delete this.intervalTasks[taskId]
}
} }
global.setGlobal('process', new Process(), {}) global.setGlobal('process', new Process(), {})
const eventLoop = new EventLoop()
global.setGlobal('eventLoop', eventLoop, {})
eventLoop.startEventLoop()
global.setGlobal('queueMicrotask', (func: any) => microTaskPool.execute(func), {}) global.setGlobal('queueMicrotask', (func: any) => microTaskPool.execute(func), {})
global.setGlobal('setTimeout', setTimeout, {}) global.setGlobal('setTimeout', eventLoop.setTimeout.bind(eventLoop), {})
global.setGlobal('clearTimeout', clearTimeout, {}) global.setGlobal('clearTimeout', eventLoop.clearTimeout.bind(eventLoop), {})
global.setGlobal('setImmediate', (func: Function, ...args: any[]) => setTimeout(func, 0, ...args), {}) global.setGlobal('setImmediate', (callback: (...args: any[]) => void, ...args: any[]) => eventLoop.setTimeout(callback, 0, ...args), {})
global.setGlobal('clearImmediate ', clearTimeout, {}) global.setGlobal('clearImmediate ', eventLoop.clearTimeout.bind(eventLoop), {})
global.setGlobal('setInterval', setInterval, {}) global.setGlobal('setInterval', eventLoop.setInterval.bind(eventLoop), {})
global.setGlobal('clearInterval', clearInterval, {}) global.setGlobal('clearInterval', eventLoop.clearInterval.bind(eventLoop), {})