Celery is an asynchronous task/job queue based on distributed
message passing. celery-shoot allows to queue tasks from Node.js.
If you are new to Celery check out http://celeryproject.org/
Differences with node-celery
-
This library is now based on amqp-coffee, instead of node-amqp.
-
EventEmitterbased code has been removed; only pure callbacks are available. -
Support for the Redis Backend has been removed.
- I will accept pull-requests if you would like to re-add support.
-
Primary Queue / Exchange declaration has been removed. This means if you start up
celery-shooton a fresh RabbitMQ vhost, you'll get an error.To get around this, just start a
celery workeron that vhost first.- Why? If you declared your Queues/Exchanges from node, you need to mirror
the celery settings exactly. If you don't, you need to stop both node &
celery, delete the queues, restart both services (with the correct settings,
or with the
celery workerfirst). This was a big trap, that often came up deploying to production so we're better off without it!
- Why? If you declared your Queues/Exchanges from node, you need to mirror
the celery settings exactly. If you don't, you need to stop both node &
celery, delete the queues, restart both services (with the correct settings,
or with the
Usage
Simple example, included as examples/hello-world.js:
var celery = require('celery-shoot'), client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){ assert(err == null); var task = client.createTask('tasks.echo'); task.invoke(["Hello Wolrd"], function(err, result){ console.log(err, result); }) });
ETA
The ETA (estimated time of arrival) lets you set a specific date and time that is the earliest time at which your task will be executed:
var celery = require('celery-shoot'), client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){ assert(err == null); var task = client.createTask('tasks.send_email', { eta: 60 * 60 * 1000 // execute in an hour from invocation }, { ignoreResult: true // ignore results }); task.invoke([], { to: 'to@example.com', title: 'sample email' }) });
Expiration
The expires argument defines an optional expiry time, a specific date and time using Date:
var celery = require('celery-shoot'), client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', function(err){ assert(err == null); var task = client.createTask('tasks.sleep', { eta: 60 * 60 * 1000 // expire in an hour }); task.invoke([2 * 60 * 60], function(err, res){ console.log(err, res); }) });
Routing
The simplest way to route tasks to different queues is using options.routes:
var celery = require('celery-shoot'), client = celery.connectWithUri('amqp://guest:guest@localhost:5672//', { routes: { 'tasks.send_mail': { 'queue': 'mail' } } }, function(err){ assert(err == null); var task = client.createTask('tasks.send_email'); task.invoke([], { to: 'to@example.com', title: 'sample email' }); var task2 = client.createTask('tasks.calculate_rating'); task2.invoke([], { item: 1345 }); });
You can also configure custom routers, similar to http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
var myRouter = function(task, args, kwargs){ if(task === 'myapp.tasks.compress_video'){ return { 'exchange': 'video', 'routingKey': 'video.compress' } } return null; } Client({ routes: [myRouter] });