You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
80 lines
1.9 KiB
80 lines
1.9 KiB
const createJob = require('./createJob'); |
|
const { log } = require('./utils/log'); |
|
const getId = require('./utils/getId'); |
|
|
|
let schedulerCounter = 0; |
|
|
|
module.exports = () => { |
|
const id = getId('Scheduler', schedulerCounter); |
|
const workers = {}; |
|
const runningWorkers = {}; |
|
let jobQueue = []; |
|
|
|
schedulerCounter += 1; |
|
|
|
const getQueueLen = () => jobQueue.length; |
|
const getNumWorkers = () => Object.keys(workers).length; |
|
|
|
const dequeue = () => { |
|
if (jobQueue.length !== 0) { |
|
const wIds = Object.keys(workers); |
|
for (let i = 0; i < wIds.length; i += 1) { |
|
if (typeof runningWorkers[wIds[i]] === 'undefined') { |
|
jobQueue[0](workers[wIds[i]]); |
|
break; |
|
} |
|
} |
|
} |
|
}; |
|
|
|
const queue = (action, payload) => ( |
|
new Promise((resolve, reject) => { |
|
const job = createJob({ action, payload }); |
|
jobQueue.push(async (w) => { |
|
jobQueue.shift(); |
|
runningWorkers[w.id] = job; |
|
try { |
|
resolve(await w[action].apply(this, [...payload, job.id])); |
|
} catch (err) { |
|
reject(err); |
|
} finally { |
|
delete runningWorkers[w.id]; |
|
dequeue(); |
|
} |
|
}); |
|
log(`[${id}]: Add ${job.id} to JobQueue`); |
|
log(`[${id}]: JobQueue length=${jobQueue.length}`); |
|
dequeue(); |
|
}) |
|
); |
|
|
|
const addWorker = (w) => { |
|
workers[w.id] = w; |
|
log(`[${id}]: Add ${w.id}`); |
|
log(`[${id}]: Number of workers=${getNumWorkers()}`); |
|
dequeue(); |
|
return w.id; |
|
}; |
|
|
|
const addJob = async (action, ...payload) => { |
|
if (getNumWorkers() === 0) { |
|
throw Error(`[${id}]: You need to have at least one worker before adding jobs`); |
|
} |
|
return queue(action, payload); |
|
}; |
|
|
|
const terminate = async () => { |
|
Object.keys(workers).forEach(async (wid) => { |
|
await workers[wid].terminate(); |
|
}); |
|
jobQueue = []; |
|
}; |
|
|
|
return { |
|
addWorker, |
|
addJob, |
|
terminate, |
|
getQueueLen, |
|
getNumWorkers, |
|
}; |
|
};
|
|
|