Async task queue with concurrency control, priority, pause/resume, backpressure, timeout, retry, and stats. Zero dependencies. TypeScript-first.
Install
```sh
npm install @corvid-agent/queue
```
Usage
Queue
Run async tasks with controlled concurrency:
```typescript
import { Queue } from "@corvid-agent/queue";

const queue = new Queue({ concurrency: 3 });

// Add a task: returns a promise that resolves with the result
const result = await queue.add(async () => {
  const res = await fetch("/api/data");
  return res.json();
});

// Add multiple tasks at once
const results = await queue.addAll([
  () => fetchUser(1),
  () => fetchUser(2),
  () => fetchUser(3),
]);
```
Priority
Higher priority tasks run first:
```typescript
const queue = new Queue({ concurrency: 1 });

queue.add(() => lowPriorityWork());                // priority: 0 (default)
queue.add(() => urgentWork(), { priority: 10 });   // runs first
queue.add(() => mediumWork(), { priority: 5 });    // runs second
```
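To make the ordering concrete, here is a minimal, self-contained sketch of how pending tasks could be kept sorted by priority. It is not the library's implementation: `PendingTask` and `enqueue` are hypothetical names, and running equal priorities in insertion order is an assumption the text above does not confirm.

```typescript
// Hypothetical sketch, not the library's internals.
interface PendingTask {
  label: string;
  priority: number;
}

// Insert so that higher priorities come first.
// Assumption: tasks with equal priority keep their insertion order.
function enqueue(pending: PendingTask[], task: PendingTask): void {
  let i = pending.length;
  // Walk left past every entry with a strictly lower priority.
  while (i > 0 && pending[i - 1].priority < task.priority) i--;
  pending.splice(i, 0, task);
}

const pending: PendingTask[] = [];
enqueue(pending, { label: "low", priority: 0 });
enqueue(pending, { label: "urgent", priority: 10 });
enqueue(pending, { label: "medium", priority: 5 });

const order = pending.map((t) => t.label);
console.log(order); // → ["urgent", "medium", "low"]
```

A sorted array keeps the sketch short; a binary heap would likely scale better for very large queues.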
Backpressure
Limit queue size to prevent unbounded memory growth:
```typescript
const queue = new Queue({ concurrency: 2, maxSize: 100 });

try {
  await queue.add(() => work());
} catch (err) {
  // QueueFullError when queue exceeds maxSize
}
```
Pause / Resume
```typescript
const queue = new Queue({ autoStart: false }); // starts paused

queue.add(() => task1());
queue.add(() => task2());

queue.resume(); // start processing
queue.pause();  // stop starting new tasks (running tasks finish)
queue.resume(); // continue
```
Abort
Cancel tasks with AbortController:
```typescript
const controller = new AbortController();

const promise = queue.add(() => fetchData(), {
  signal: controller.signal,
});

controller.abort(); // removes from queue, rejects with TaskAbortedError
```
Timeout
Reject tasks that take too long:
```typescript
import { TaskTimeoutError } from "@corvid-agent/queue";

try {
  await queue.add(() => slowOperation(), { timeout: 5000 });
} catch (err) {
  if (err instanceof TaskTimeoutError) {
    console.log(`Timed out after ${err.timeout}ms`);
  }
}
```
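For readers curious how a per-task timeout can work in principle, the sketch below races a task against a timer. It is a standalone illustration under stated assumptions, not the library's code: `withTimeout`, `SketchTimeoutError`, and `sleep` are hypothetical names. The sketch only rejects the returned promise; it does not cancel the underlying work, and the text above does not say whether the library does.

```typescript
// Hypothetical stand-in for an error carrying the timeout value.
class SketchTimeoutError extends Error {
  readonly timeout: number;
  constructor(timeout: number) {
    super(`Task timed out after ${timeout}ms`);
    this.name = "SketchTimeoutError";
    this.timeout = timeout;
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Settle with the task's outcome, or reject once `ms` elapses, whichever is first.
function withTimeout<T>(task: () => Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new SketchTimeoutError(ms)), ms);
    task().then(
      (value) => {
        clearTimeout(timer); // task won the race: stop the timer
        resolve(value);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}

// Usage: a 200 ms task with a 50 ms budget rejects with SketchTimeoutError.
withTimeout(() => sleep(200), 50).catch((err) => {
  if (err instanceof SketchTimeoutError) {
    console.log(`Timed out after ${err.timeout}ms`);
  }
});
```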
Retry
Automatically retry failed tasks with optional backoff:
```typescript
// Constant delay between retries
await queue.add(() => flakeyApi(), {
  retries: 3,
  retryDelay: 1000,
});

// Exponential backoff: 100ms, 200ms, 400ms
await queue.add(() => flakeyApi(), {
  retries: 3,
  retryDelay: (attempt) => 100 * Math.pow(2, attempt - 1),
});
```
Timeouts and aborts are not retried; only errors thrown by the task trigger a retry.
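The retry behaviour described above can be sketched as a small loop. This is an illustration, not the library's implementation: `runWithRetries`, `RetryDelay`, and `backoff` are hypothetical names. It assumes `attempt` starts at 1 for the first retry, which is consistent with the 100ms, 200ms, 400ms comment in the example. The timeout and abort exclusions are left out to keep the sketch short.

```typescript
type RetryDelay = number | ((attempt: number) => number);

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `task`, retrying up to `retries` times after a thrown error.
// Assumption: `attempt` is 1 for the first retry, 2 for the second, and so on.
async function runWithRetries<T>(
  task: () => Promise<T>,
  retries: number,
  retryDelay: RetryDelay = 0,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await task();
    } catch (err) {
      // Out of retries: surface the last error to the caller.
      if (attempt > retries) throw err;
      const delay =
        typeof retryDelay === "function" ? retryDelay(attempt) : retryDelay;
      await sleep(delay);
    }
  }
}

// The exponential backoff function from the example, evaluated per retry.
const backoff = (attempt: number) => 100 * Math.pow(2, attempt - 1);
console.log([1, 2, 3].map(backoff)); // → [100, 200, 400]
```

With `retries: 3` the task runs at most four times: the initial run plus three retries.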
Stats
Track cumulative queue performance:
```typescript
const queue = new Queue({ concurrency: 5 });

// ... process tasks ...

console.log(queue.stats);
// { processed: 100, succeeded: 95, failed: 5, retries: 12, timedOut: 2 }

queue.resetStats(); // reset all counters
```
Events
```typescript
const queue = new Queue({ concurrency: 5 });

queue.on("active", () => console.log(`Task started. Active: ${queue.active}`));
queue.on("completed", (result) => console.log("Task done:", result));
queue.on("error", (err) => console.error("Task failed:", err));
queue.on("idle", () => console.log("All done"));
queue.on("empty", () => console.log("Pending queue empty"));

// Wait for specific states
await queue.onIdle();    // all tasks completed
await queue.onEmpty();   // pending queue empty
await queue.onDrained(); // everything processed
```
Convenience Functions
Map, filter, and iterate with concurrency control:
```typescript
import { map, each, filter } from "@corvid-agent/queue";

// Concurrent map (preserves order)
const users = await map(userIds, (id) => fetchUser(id), { concurrency: 5 });

// Concurrent side effects
await each(emails, (email) => sendNotification(email), { concurrency: 10 });

// Concurrent filter
const alive = await filter(servers, async (server) => {
  const res = await fetch(server.healthUrl);
  return res.ok;
}, { concurrency: 20 });
```
API
new Queue(options?)
| Option | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `number` | `1` | Max concurrent tasks |
| `maxSize` | `number` | `Infinity` | Max pending tasks (backpressure) |
| `autoStart` | `boolean` | `true` | Start processing on add |
queue.add(fn, options?)
| Option | Type | Default | Description |
|---|---|---|---|
| `priority` | `number` | `0` | Higher = runs first |
| `signal` | `AbortSignal` | - | Cancel the task |
| `timeout` | `number` | - | Reject after ms with `TaskTimeoutError` |
| `retries` | `number` | `0` | Retry attempts on failure |
| `retryDelay` | `number \| (attempt) => number` | `0` | Delay between retries (ms) |
Properties
| Property | Type | Description |
|---|---|---|
| `queue.size` | `number` | Pending tasks |
| `queue.active` | `number` | Running tasks |
| `queue.isPaused` | `boolean` | Whether paused |
| `queue.isIdle` | `boolean` | Nothing running or pending |
| `queue.stats` | `QueueStats` | Cumulative `{ processed, succeeded, failed, retries, timedOut }` |
Methods
| Method | Description |
|---|---|
| `add(fn, opts?)` | Add a task, returns `Promise<T>` |
| `addAll(fns, opts?)` | Add multiple tasks |
| `pause()` | Stop starting new tasks |
| `resume()` | Resume processing |
| `clear()` | Remove all pending tasks |
| `onIdle()` | Wait until idle |
| `onEmpty()` | Wait until pending is empty |
| `onDrained()` | Wait until all processed |
| `on(event, listener)` | Add event listener |
| `off(event, listener)` | Remove event listener |
| `resetStats()` | Reset all statistics to zero |
map(items, fn, options?)
Concurrent map preserving order.
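As an illustration of what "concurrent map preserving order" means, here is a minimal standalone sketch using a fixed pool of workers. It is not the library's implementation: `mapLimit` is a hypothetical name, and it takes the concurrency as a plain number rather than an options object.

```typescript
// Hypothetical sketch: run `fn` over `items` with at most `concurrency`
// calls in flight, returning results in input order.
async function mapLimit<T, R>(
  items: readonly T[],
  fn: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0; // index of the next item to claim

  // Each worker claims the next unclaimed index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      // Writing by index keeps output order independent of finish order.
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Results are stored by input index, so a slow first item does not displace faster later ones in the output. If any call rejects, the returned promise rejects with that error.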
each(items, fn, options?)
Concurrent iteration.
filter(items, fn, options?)
Concurrent filter preserving order.
Error Classes
- `QueueFullError`: thrown when `maxSize` is exceeded
- `TaskAbortedError`: thrown when a task is aborted or the queue is cleared
- `TaskTimeoutError`: thrown when a task exceeds its `timeout`
License
MIT