Lesson 7
Message Queue - Produce
We use Celery for messaging and RabbitMQ as the broker. Messages are sent to an exchange named dispo. Each consumer then subscribes to this exchange and gets its own queue (pub-sub).
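The pub-sub idea described above can be pictured with a small in-memory model. This is only a sketch in plain Python, not RabbitMQ or Celery code: ToyExchange and the two consumer queues (billing_queue, mailer_queue) are hypothetical names, and the simple key match shown here is an assumption about how the queues are bound.

```python
from collections import defaultdict, deque


class ToyExchange:
    """In-memory stand-in for a broker exchange (not RabbitMQ itself)."""

    def __init__(self, name):
        self.name = name
        # routing key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, routing_key):
        """Create a fresh queue for one consumer and bind it to the exchange."""
        queue = deque()
        self.bindings[routing_key].append(queue)
        return queue

    def publish(self, routing_key, message):
        """Copy the message into every queue bound with a matching key."""
        for queue in self.bindings.get(routing_key, []):
            queue.append(message)


exchange = ToyExchange("dispo")
billing_queue = exchange.bind("booking")  # hypothetical consumer 1
mailer_queue = exchange.bind("booking")   # hypothetical consumer 2

exchange.publish("booking", {"room": "A1"})
print(list(billing_queue), list(mailer_queue))
```

Because every consumer owns its queue, one consumer reading its copy of a message does not take it away from the others.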
Add Broker
# deploy/rabbitmq.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: rabbitmq:3-management
          ports:
            - containerPort: 15672
            - containerPort: 5672
          env:
            - name: RABBITMQ_DEFAULT_USER
              value: rabbit
            - name: RABBITMQ_DEFAULT_PASS
              value: mysecretpassword
          resources:
            limits:
              memory: 256Mi
              cpu: 400m
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
    - name: management
      protocol: TCP
      port: 15672
      targetPort: 15672
    - name: rabbit
      protocol: TCP
      port: 5672
      targetPort: 5672
# Tiltfile - add
k8s_yaml('deploy/rabbitmq.yaml')
k8s_resource(
    'rabbitmq',
    port_forwards=['15672:15672', '5672:5672']
)
Watch the Tilt web console to see the deployment. Afterwards, the RabbitMQ Management Console should be available at http://localhost:15672/. The link is also included in the Tilt web console.
Add Producer
Add dependencies
poetry add celery
poetry add --group dev celery-types
Producer Task
# dispo/dispo/tasks.py
import logging
import os

from celery import Celery
from celery.signals import worker_process_init

logger = logging.getLogger(__name__)


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    # Runs once in every worker process after it has started.
    logger.debug("init celery")


# Broker URL, e.g. amqp://rabbit:mysecretpassword@localhost:5672/
amqp_url = os.environ["AMQP_URL"]
app = Celery(broker=amqp_url)
app.conf.task_default_exchange = "dispo"


@app.task(
    name="booking.created", exchange="dispo", routing_key="booking", ignore_result=True
)
def booking_created(data):
    # The producer only publishes this task; the body is never run here.
    raise NotImplementedError("you must not call me directly! Use delay!")
Add trigger
# dispo/dispo/booking.py - add before return of post method
logger.info("pushing to queue")
tasks.booking_created.delay(
    {"room": booking.room.name, "start": booking.start, "end": booking.end}
)
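The dictionary passed to delay has to be serialized before it reaches the broker, and Celery's default serializer is JSON. The sketch below assumes that booking.start and booking.end are date objects (the lesson does not say so) and shows one way to turn them into ISO 8601 strings explicitly, so the producer controls the wire format. booking_payload is a hypothetical helper, not part of the dispo code.

```python
import json
from datetime import date


def booking_payload(room_name, start, end):
    """Build a JSON-friendly message body with dates as ISO 8601 strings."""
    return {
        "room": room_name,
        "start": start.isoformat(),
        "end": end.isoformat(),
    }


payload = booking_payload("Room 1", date(2023, 2, 4), date(2023, 2, 6))
# json.dumps succeeds because the payload holds only strings.
print(json.dumps(payload))
```

With such a helper, the trigger could pass booking_payload(booking.room.name, booking.start, booking.end) to delay instead of building the dictionary inline.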
Test it
curl http://localhost:8080/health
curl http://localhost:8080/api/v1/bookings/ -H "Content-Type: application/json" \
  -d '{"room_id": 1, "start": "2023-02-04", "end": "2023-02-06"}'
You may want to update your env vars for local debugging
# dispo/.vscode/launch.json
# after this line
"args": ["dispo.main:app", "--reload"],
# add this
"env": {
    "DATABASE_URL": "postgresql://dispo:mysecretpassword@localhost/dispo",
    "AMQP_URL": "amqp://rabbit:mysecretpassword@localhost:5672/"
},
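When the producer cannot connect during local debugging, a quick look at what AMQP_URL actually contains can help. The following is a minimal sketch using only the standard library; describe_amqp_url is a hypothetical helper, and the fallback URL is simply the value from the launch configuration above.

```python
import os
from urllib.parse import urlsplit

# Default ports for the plain and TLS variants of the protocol.
DEFAULT_PORTS = {"amqp": 5672, "amqps": 5671}


def describe_amqp_url(url):
    """Split an AMQP URL into its parts, leaving the password out."""
    parts = urlsplit(url)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    return {
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port or DEFAULT_PORTS[parts.scheme],
        "path": parts.path,
    }


# Use the launch.json value when the variable is not set in this shell.
url = os.environ.get("AMQP_URL", "amqp://rabbit:mysecretpassword@localhost:5672/")
print(describe_amqp_url(url))
```

The host and port printed here should match the port forward configured for the rabbitmq resource in Tilt.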
Check RabbitMQ Management Console
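The same check can be scripted against the management plugin's HTTP API, which exposes exchanges under /api/exchanges/{vhost}/{name}. The sketch below only builds the request; it assumes the default vhost "/", the credentials from deploy/rabbitmq.yaml, and the port forward on localhost:15672. exchange_request is a hypothetical helper name.

```python
import base64
from urllib.parse import quote
from urllib.request import Request


def exchange_request(name, vhost="/", base_url="http://localhost:15672",
                     user="rabbit", password="mysecretpassword"):
    """Build a GET request for one exchange in the management HTTP API."""
    # The vhost "/" must be percent-encoded as %2F inside the path.
    url = f"{base_url}/api/exchanges/{quote(vhost, safe='')}/{quote(name, safe='')}"
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return Request(url, headers={"Authorization": f"Basic {token}"})


request = exchange_request("dispo")
print(request.full_url)

# To send it while RabbitMQ is running (assumes the exchange already exists):
#   import json
#   from urllib.request import urlopen
#   with urlopen(request) as response:
#       print(json.load(response)["type"])
```

A 404 response would suggest that the exchange has not been declared yet, for example because no booking has been posted so far.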