Refactor pqueue and add more features
This commit is contained in:
parent
8ea23e620d
commit
e56a9a164d
1 changed files with 41 additions and 9 deletions
50
pqueue.js
50
pqueue.js
|
@ -9,26 +9,58 @@ const queryable = promise => {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
export default () => {
|
export default (parallel = 1) => {
|
||||||
const queue = []
|
const running = []
|
||||||
|
const waiting = []
|
||||||
const delayed = new Map()
|
const delayed = new Map()
|
||||||
|
|
||||||
loop = () => {
|
// Activate a waiting promise
|
||||||
for (promise of queue.splice(0, queue.findIndex(promise => !promise.done))) {
|
// If it's a function, call it first to get a promise
|
||||||
|
const activate = item => {
|
||||||
|
let promise
|
||||||
|
if (typeof item == "function")
|
||||||
|
promise = queryable(item())
|
||||||
|
else
|
||||||
|
promise = queryable(item)
|
||||||
|
|
||||||
|
promise.delayed = delayed.get(item)
|
||||||
|
delayed.remove(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempts to start additional operations
|
||||||
|
// Returns the number of started operations
|
||||||
|
const start = () => {
|
||||||
|
const available = parallel - running.length
|
||||||
|
if (available) {
|
||||||
|
running.push(waiting.splice(0, parallel - running.length).map(activate))
|
||||||
|
}
|
||||||
|
return available - (parallel - running.length)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolves as many promises as possible
|
||||||
|
// returns a tail-call to start
|
||||||
|
const pump = () => {
|
||||||
|
for (promise of running.splice(0, running.findIndex(promise => !promise.done))) {
|
||||||
if (promise.result) {
|
if (promise.result) {
|
||||||
delayed.get(promise).resolve(promise.result)
|
promise.delayed.resolve(promise.result)
|
||||||
} else if (promise.error) {
|
} else if (promise.error) {
|
||||||
delayed.get(promise).reject(promise.error)
|
promise.delayed.reject(promise.error)
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Loop until there is nothing more to do:
|
||||||
|
// a) Running queue is full and nothing can be resolved
|
||||||
|
// b) Waiting queue is empty
|
||||||
|
const spin = () => { while pump() {} }
|
||||||
|
|
||||||
return promise => {
|
return promise => {
|
||||||
promise = queryable(promise)
|
waiting.push(promise)
|
||||||
queue.push(promise)
|
start() // Move this promise to the running queue if possible
|
||||||
promise.finally(loop)
|
promise.finally(spin)
|
||||||
const result = new Promise(fulfill, reject => delayed.set(promise, {fulfill, reject}))
|
const result = new Promise(fulfill, reject => delayed.set(promise, {fulfill, reject}))
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue