Distributed timed job queue, backed by redis.
Features
- Support redis cluster.
- Support one or more timed-queue instances in a redis instance. Each timed-queue instance is segregated by the prefix option.
- Support one or more job queues in a timed-queue instance.
- Support one or more timed-queue clients for a timed-queue instance.
Demo
const TimedQueue = require('timed-queue')
const timedQueue = new TimedQueue({prefix: 'TQ1', interval: 1000 * 60})

// connect to redis cluster.
timedQueue
  .connect([7000, 7001, 7002])
  .on('error', function (error) {
    console.error(error)
  })

// create 'event' job queue in timed-queue instance
const eventQueue = timedQueue.queue('event')

// add 'job' listener
eventQueue.on('job', function (jobObj) {
  // ... just do some thing
  // ACK the job
  eventQueue.ackjob(jobObj.job)()
})

// add job to queue
eventQueue.addjob(eventObj.id, new Date(eventObj.startDate).getTime() - 10 * 60 * 1000)(function (err, res) {
  console.log(err, res)
})
Installation
npm install timed-queue
Job
Job Class:
function Job (queue, job, timing, active, retryCount) {
  this.queue = queue
  this.job = job
  this.timing = timing
  this.active = active
  this.retryCount = retryCount
}
- this.queue: {String} Queue name
- this.job: {String} The job's name
- this.timing: {Number} The time in milliseconds when the job should be activated
- this.active: {Number} The actual time in milliseconds when the job was activated
- this.retryCount: {Number} A job that has been activated but has not been ACKed within the retry time will be activated again. retryCount is the number of times the job has been re-activated.
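The fields above can be combined to see how late a job fired and whether it has been retried. The following is a minimal sketch; `describeJob` is a hypothetical helper that is not part of timed-queue, and it assumes `active` stays at `0` until the job is first activated (as the `show` example later in this document suggests).

```javascript
// Hypothetical helper, not part of timed-queue: summarize a Job-shaped object.
// Assumption: `active` is 0 until the job has been activated at least once.
function describeJob (job) {
  const activated = job.active > 0
  return {
    job: job.job,
    activated: activated,
    // Gap between the scheduled time and the actual activation, in milliseconds.
    delay: activated ? job.active - job.timing : null,
    retried: job.retryCount > 0
  }
}

const info = describeJob({
  queue: 'event',
  job: '52b3b5f49c2238313600015d',
  timing: 1441552050409,
  active: 1441552050909,
  retryCount: 1
})
console.log(info.delay, info.retried)
```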
API
const TimedQueue = require('timed-queue')
new TimedQueue([options]) => timedQueue object
Return a timedQueue client. It is an EventEmitter instance.
- options.prefix: {String} Redis key's prefix, or namespace. Defaults to "TIMEDQ"
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to 64
- options.interval: {Number} Interval time for scanning. Defaults to 1000 * 60 ms
- options.retry: {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Defaults to interval / 2 ms
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Defaults to interval * 5 ms
- options.accuracy: {Number} Scanning accuracy. Defaults to interval / 5
- options.autoScan: {Boolean} The flag to enable or disable automatic scanning. Defaults to true. Set it to false if automatic scanning is not desired.
const timedQueue = new TimedQueue()
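To make the documented defaults concrete, here is a minimal sketch of how they relate to `interval`. `resolveOptions` is a hypothetical helper written for illustration, not the library's own code, and it assumes the derived defaults follow the configured `interval` rather than the built-in one.

```javascript
// Hypothetical helper that mirrors the documented defaults; not timed-queue's code.
// Assumption: retry, expire and accuracy are derived from the configured interval.
function resolveOptions (options) {
  options = options || {}
  const interval = options.interval || 1000 * 60
  return {
    prefix: options.prefix || 'TIMEDQ',
    count: options.count || 64,
    interval: interval,
    retry: options.retry || interval / 2,
    expire: options.expire || interval * 5,
    accuracy: options.accuracy || interval / 5,
    autoScan: options.autoScan !== false
  }
}

const resolved = resolveOptions({ prefix: 'TQ1', interval: 1000 * 10 })
console.log(resolved)
```

With a 10 second interval, this sketch yields a 5 second retry window, a 50 second expiration and a 2 second accuracy.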
TimedQueue Events
- timedQueue.on('connect', function () {})
- timedQueue.on('error', function (error) {})
- timedQueue.on('close', function () {})
- timedQueue.on('scanStart', function (queuesLength) {})
- timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
TimedQueue.prototype.connect([host, options]) => this
TimedQueue.prototype.connect(redisClient) => this
Connect to redis. The arguments are the same as those of thunk-redis's createClient. Alternatively, pass an existing thunk-redis client.
TimedQueue.prototype.scan() => this
Start scanning. It automatically starts after connect method is called unless autoScan is set to false.
TimedQueue.prototype.stop() => this
Stop scanning.
TimedQueue.prototype.close() => this
Close the timedQueue. It closes redis client of the timedQueue accordingly.
TimedQueue.prototype.regulateFreq(factor) => this
It is used to regulate the automatic scanning frequency.
TimedQueue.prototype.destroyQueue(queue[, options]) => this
Remove the queue. It deletes all data in the queue from redis.
TimedQueue.prototype.queue(queue[, options]) => Queue instance
Return the Queue instance if one exists. Otherwise, create a Queue instance and return it. A Queue instance is an EventEmitter instance.
- queue: {String} The queue's name
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to timedQueue's count
- options.retry: {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Defaults to timedQueue's retry
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Defaults to timedQueue's expire
- options.accuracy: {Number} Scanning accuracy. Defaults to timedQueue's accuracy
const eventQueue = timedQueue.queue('event', {retry: 1000, expire: 5000})
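The fallback from queue options to the timedQueue's options can be pictured as follows. This is only an illustrative sketch: `inheritOptions` and the `parentOptions` values are hypothetical and do not come from the library.

```javascript
// Hypothetical helper: fill missing per-queue options from the parent's values.
function inheritOptions (parent, options) {
  options = options || {}
  const result = {}
  for (const key of ['count', 'retry', 'expire', 'accuracy']) {
    // Use the queue's own value when given, otherwise the timedQueue's value.
    result[key] = options[key] != null ? options[key] : parent[key]
  }
  return result
}

// Example parent values, chosen to match the documented defaults for a 60 s interval.
const parentOptions = { count: 64, retry: 30000, expire: 300000, accuracy: 12000 }
const queueOptions = inheritOptions(parentOptions, { retry: 1000, expire: 5000 })
console.log(queueOptions)
```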
Queue Events
- queue.on('job', function (job) {})
If the queue has no 'job' listener, the queue is not scanned.
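One possible way to structure a 'job' listener is to ACK only after the work succeeds, so that a failed job stays un-ACKed and is activated again after the retry time. The sketch below is hedged accordingly: `FakeQueue` is a stand-in so the code runs without redis, and `listen` is a hypothetical wrapper. Only the 'job' event and the `ackjob(job)` thunk shape are taken from this document.

```javascript
const EventEmitter = require('events')

// Stand-in for a Queue instance so the sketch runs without redis.
class FakeQueue extends EventEmitter {
  constructor () {
    super()
    this.acked = []
  }

  // Same call shape as the documented ackjob: returns a thunk function.
  ackjob (job) {
    return (callback) => {
      this.acked.push(job)
      if (callback) callback(null, 1)
    }
  }
}

// Hypothetical wrapper: run `worker` and ACK only when it does not throw.
function listen (queue, worker) {
  queue.on('job', function (jobObj) {
    try {
      worker(jobObj)
    } catch (err) {
      // Leave the job un-ACKed so that it can be activated again later.
      return
    }
    queue.ackjob(jobObj.job)()
  })
}

const queue = new FakeQueue()
const handled = []
listen(queue, function (jobObj) {
  if (jobObj.job === 'bad') throw new Error('cannot process ' + jobObj.job)
  handled.push(jobObj.job)
})

queue.emit('job', { queue: 'event', job: 'good', timing: 0, active: 0, retryCount: 0 })
queue.emit('job', { queue: 'event', job: 'bad', timing: 0, active: 0, retryCount: 0 })
console.log(queue.acked)
```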
Queue.prototype.init([options]) => this
- options.count: {Number} The maximum job count for the queue's getjobs method. Defaults to timedQueue's count
- options.retry: {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Defaults to timedQueue's retry
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Defaults to timedQueue's expire
- options.accuracy: {Number} Scanning accuracy. Defaults to timedQueue's accuracy
Queue.prototype.addjob(job, timing[, job, timing, ...]) => thunk function
Queue.prototype.addjob([job, timing, job, timing, ...]) => thunk function
Add one or more jobs to the queue. It can be used to update the job's timing.
- job: {String} The job's name
- timing: {Number} The time in milliseconds when the job should be activated. It should be greater than Date.now()
eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
  console.log(err, res) // null, 1
})
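When several jobs are scheduled at once, the array form takes a flat list of alternating job names and timings. A minimal sketch of building that list is shown below. `toAddjobArgs` and the event objects are hypothetical, and the lead time of 10 minutes follows the demo at the top of this document.

```javascript
// Hypothetical helper: turn events into the flat [job, timing, job, timing, ...] form.
function toAddjobArgs (events, leadMs, now) {
  const args = []
  for (const event of events) {
    const timing = new Date(event.startDate).getTime() - leadMs
    // The timing should be greater than the current time, so skip jobs already due.
    if (timing > now) args.push(event.id, timing)
  }
  return args
}

// A fixed "now" keeps the example deterministic; real code would pass Date.now().
const now = Date.UTC(2015, 8, 6, 12, 0, 0)
const args = toAddjobArgs([
  { id: 'a', startDate: '2015-09-06T13:00:00Z' },
  { id: 'b', startDate: '2015-09-06T12:05:00Z' }
], 10 * 60 * 1000, now)
console.log(args)
```

The resulting array could then be passed as `eventQueue.addjob(args)(callback)`. Event 'b' is skipped here because its timing would already be in the past.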
Queue.prototype.show(job) => thunk function
Show the job info.
job: {String} job
eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res)
  // {
  //   queue: 'event',
  //   job: '52b3b5f49c2238313600015d',
  //   timing: 1441552050409,
  //   active: 0,
  //   retryCount: 0
  // }
})
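The queue methods return thunk functions, that is, functions that take a single `(err, res)` callback. For code that prefers promises, a thunk can be wrapped as in the sketch below. `thunkToPromise` is a hypothetical helper, and `fakeShow` is a stand-in with the same call shape as `show` so the example runs without redis.

```javascript
// Hypothetical helper: wrap a thunk function (callback => void) in a Promise.
function thunkToPromise (thunk) {
  return new Promise(function (resolve, reject) {
    thunk(function (err, res) {
      if (err) reject(err)
      else resolve(res)
    })
  })
}

// Stand-in for eventQueue.show so the sketch runs without redis.
function fakeShow (job) {
  return function (callback) {
    callback(null, { queue: 'event', job: job, timing: 1441552050409, active: 0, retryCount: 0 })
  }
}

const pending = thunkToPromise(fakeShow('52b3b5f49c2238313600015d'))
pending.then(function (res) {
  console.log(res.job, res.timing)
})
```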
Queue.prototype.deljob(job[, job, ...]) => thunk function
Queue.prototype.deljob([job, job, ...]) => thunk function
Delete one or more jobs.
job: {String} job
eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res) // null, 1
})
Queue.prototype.getjobs([scanActive]) => thunk function
It is called by Queue.prototype.scan. It should not be called explicitly unless you know what you are doing.
Queue.prototype.ackjob(job[, job, ...]) => thunk function
Queue.prototype.ackjob([job, job, ...]) => thunk function
ACK one or more jobs.
job: {String} job
eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res) // null, 1
})
Queue.prototype.scan() => thunk function
It is called by TimedQueue.prototype.scan. It should not be called explicitly unless you know what you are doing.
Queue.prototype.len() => thunk function
Return the queue's length.
eventQueue.len()(function (err, res) {
  console.log(err, res) // null, 3
})
Queue.prototype.showActive() => thunk function
Return the activated jobs in the queue.
eventQueue.showActive()(function (err, res) {
  console.log(err, res) // null, [jobs...]
})
