|
|
|
const createJob = require('./createJob');
|
|
|
|
const log = require('./utils/log');
|
|
|
|
const getId = require('./utils/getId');
|
|
|
|
|
|
|
|
let schedulerCounter = 0;
|
|
|
|
|
|
|
|
module.exports = () => {
|
|
|
|
const id = getId('Scheduler', schedulerCounter);
|
|
|
|
const workers = {};
|
|
|
|
const runningWorkers = {};
|
|
|
|
let jobQueue = [];
|
|
|
|
|
|
|
|
schedulerCounter += 1;
|
|
|
|
|
|
|
|
const getQueueLen = () => jobQueue.length;
|
|
|
|
const getNumWorkers = () => Object.keys(workers).length;
|
|
|
|
|
|
|
|
const dequeue = () => {
|
|
|
|
if (jobQueue.length !== 0) {
|
|
|
|
const wIds = Object.keys(workers);
|
|
|
|
for (let i = 0; i < wIds.length; i += 1) {
|
|
|
|
if (typeof runningWorkers[wIds[i]] === 'undefined') {
|
|
|
|
jobQueue[0](workers[wIds[i]]);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
const queue = (action, payload) => (
|
|
|
|
new Promise((resolve, reject) => {
|
|
|
|
const job = createJob({ action, payload });
|
|
|
|
jobQueue.push(async (w) => {
|
|
|
|
jobQueue.shift();
|
|
|
|
runningWorkers[w.id] = job;
|
|
|
|
try {
|
|
|
|
resolve(await w[action].apply(this, [...payload, job.id]));
|
|
|
|
} catch (err) {
|
|
|
|
reject(err);
|
|
|
|
} finally {
|
|
|
|
delete runningWorkers[w.id];
|
|
|
|
dequeue();
|
|
|
|
}
|
|
|
|
});
|
|
|
|
log(`[${id}]: Add ${job.id} to JobQueue`);
|
|
|
|
log(`[${id}]: JobQueue length=${jobQueue.length}`);
|
|
|
|
dequeue();
|
|
|
|
})
|
|
|
|
);
|
|
|
|
|
|
|
|
const addWorker = (w) => {
|
|
|
|
workers[w.id] = w;
|
|
|
|
log(`[${id}]: Add ${w.id}`);
|
|
|
|
log(`[${id}]: Number of workers=${getNumWorkers()}`);
|
|
|
|
dequeue();
|
|
|
|
return w.id;
|
|
|
|
};
|
|
|
|
|
|
|
|
const addJob = async (action, ...payload) => {
|
|
|
|
if (getNumWorkers() === 0) {
|
|
|
|
throw Error(`[${id}]: You need to have at least one worker before adding jobs`);
|
|
|
|
}
|
|
|
|
return queue(action, payload);
|
|
|
|
};
|
|
|
|
|
|
|
|
const terminate = async () => {
|
|
|
|
Object.keys(workers).forEach(async (wid) => {
|
|
|
|
await workers[wid].terminate();
|
|
|
|
});
|
|
|
|
jobQueue = [];
|
|
|
|
};
|
|
|
|
|
|
|
|
return {
|
|
|
|
addWorker,
|
|
|
|
addJob,
|
|
|
|
terminate,
|
|
|
|
getQueueLen,
|
|
|
|
getNumWorkers,
|
|
|
|
};
|
|
|
|
};
|