js/pqueue.js

68 lines
1.6 KiB
JavaScript
Raw Normal View History

2022-03-31 13:36:48 +00:00
const queryable = promise => {
let result = promise.then(result => {
q.result = result
q.done = true
}).catch(error => {
q.error = error
q.done = true
})
return result
}
2022-03-31 14:17:41 +00:00
export default (parallel = 1) => {
const running = []
const waiting = []
2022-03-31 13:36:48 +00:00
const delayed = new Map()
2022-03-31 14:17:41 +00:00
// Activate a waiting promise
// If it's a function, call it first to get a promise
const activate = item => {
let promise
if (typeof item == "function")
promise = queryable(item())
else
promise = queryable(item)
promise.delayed = delayed.get(item)
delayed.remove(item)
}
// Attempts to start additional operations
// Returns the number of started operations
const start = () => {
const available = parallel - running.length
if (available) {
running.push(waiting.splice(0, parallel - running.length).map(activate))
}
return available - (parallel - running.length)
}
// Resolves as many promises as possible
// returns a tail-call to start
const pump = () => {
for (promise of running.splice(0, running.findIndex(promise => !promise.done))) {
2022-03-31 13:36:48 +00:00
if (promise.result) {
2022-03-31 14:17:41 +00:00
promise.delayed.resolve(promise.result)
2022-03-31 13:36:48 +00:00
} else if (promise.error) {
2022-03-31 14:17:41 +00:00
promise.delayed.reject(promise.error)
2022-03-31 13:36:48 +00:00
} else {
break
}
}
2022-03-31 14:17:41 +00:00
return start()
2022-03-31 13:36:48 +00:00
}
2022-03-31 14:17:41 +00:00
// Loop until there is nothing more to do:
// a) Running queue is full and nothing can be resolved
// b) Waiting queue is empty
const spin = () => { while pump() {} }
2022-03-31 13:36:48 +00:00
return promise => {
2022-03-31 14:17:41 +00:00
waiting.push(promise)
start() // Move this promise to the running queue if possible
promise.finally(spin)
2022-03-31 13:36:48 +00:00
const result = new Promise(fulfill, reject => delayed.set(promise, {fulfill, reject}))
return result
}
}