diff --git a/pqueue.js b/pqueue.js index 5aa3f57..fb15b0c 100644 --- a/pqueue.js +++ b/pqueue.js @@ -9,26 +9,58 @@ const queryable = promise => { return result } -export default () => { - const queue = [] +export default (parallel = 1) => { + const running = [] + const waiting = [] const delayed = new Map() - loop = () => { - for (promise of queue.splice(0, queue.findIndex(promise => !promise.done))) { + // Activate a waiting promise + // If it's a function, call it first to get a promise + const activate = item => { + let promise + if (typeof item == "function") + promise = queryable(item()) + else + promise = queryable(item) + + promise.delayed = delayed.get(item) + delayed.remove(item) + } + + // Attempts to start additional operations + // Returns the number of started operations + const start = () => { + const available = parallel - running.length + if (available) { + running.push(waiting.splice(0, parallel - running.length).map(activate)) + } + return available - (parallel - running.length) + } + + // Resolves as many promises as possible + // returns a tail-call to start + const pump = () => { + for (promise of running.splice(0, running.findIndex(promise => !promise.done))) { if (promise.result) { - delayed.get(promise).resolve(promise.result) + promise.delayed.resolve(promise.result) } else if (promise.error) { - delayed.get(promise).reject(promise.error) + promise.delayed.reject(promise.error) } else { break } } + return start() } + // Loop until there is nothing more to do: + // a) Running queue is full and nothing can be resolved + // b) Waiting queue is empty + const spin = () => { while pump() {} } + return promise => { - promise = queryable(promise) - queue.push(promise) - promise.finally(loop) + waiting.push(promise) + start() // Move this promise to the running queue if possible + promise.finally(spin) const result = new Promise(fulfill, reject => delayed.set(promise, {fulfill, reject})) return result }