A modular, extensible CDC platform for streaming changes from operational databases into multiple sink destinations with exactly-once delivery semantics.
Overview
CDC Platform owns the full pipeline from source database to sink destination. It provisions transport resources, deploys source connectors, manages consumer groups and offset lifecycle, monitors schemas, and routes events to configurable sinks — webhooks, PostgreSQL replicas, and Apache Iceberg lakehouse tables.
Supported source databases: PostgreSQL (logical replication), MySQL (binlog), MongoDB (change streams), and SQL Server (CDC tables). Each is configured with a single source_type field; the platform handles connector deployment, topic naming, and snapshot behavior automatically.
The platform uses a transport-agnostic architecture — the core pipeline is decoupled from any specific event transport through protocol-based abstractions. Three transports are supported: Kafka (default, including MSK IAM and GCP Managed Kafka auth), Google Pub/Sub, and Amazon Kinesis. Non-Kafka transports use a direct WAL reader instead of Debezium. Events are delivered with exactly-once guarantees through min-watermark offset commits and idempotent writes. See docs/transports.md for the full transport comparison, configuration, and migration guide.
┌─────────────────── CDC Platform ──────────────────────────────┐
│ │
│ ┌─────────────┐ ┌──────────────────────────────┐ │
│ │ Provisioner │──▸│ EventSource (transport) │ │
│ │ (topics + │ │ ┌────────────────────────┐ │ │
│ │ connector) │ │ │ Kafka / Pub/Sub / PG │ │ │
│ └─────────────┘ │ └────────────┬───────────┘ │ │
│ └───────────────┼──────────────┘ │
│ SourceEvent stream │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ Queue(p0) Queue(p1) Queue(p2) │
│ │ │ │ │
│ Worker(p0) Worker(p1) Worker(p2)──▸ sinks │
│ │
│ SourceMonitor (schema + lag) table maintenance │
│ ErrorRouter (DLQ) │
└───────────────────────────────────────────────────────────────┘
▲ │
source DB sink destinations:
Postgres · MySQL Webhook, Postgres, Iceberg
MongoDB · SQL Server
The source database and sink destinations are the only components outside the platform boundary; everything in between (transport resources, source connectors, consumer groups and offsets, schema monitoring, event routing) is platform-managed. The transport layer is abstracted behind protocols (EventSource, Provisioner, ErrorRouter, SourceMonitor), allowing the core pipeline to work with any event transport.
Features
- Multi-transport support — protocol-based abstractions (EventSource, Provisioner, ErrorRouter, SourceMonitor) decouple the pipeline from any specific event transport. Three transports are supported: Kafka (default), Google Pub/Sub, and Amazon Kinesis.
- Multi-sink fan-out — each CDC event is dispatched concurrently to all enabled sinks
- Backpressure — bounded per-partition queues prevent unbounded memory growth when sinks are slow
- Parallel consumption — per-partition async workers process independently within a single asyncio process
- Exactly-once delivery — min-watermark offset commits ensure no data loss; idempotent upserts handle replay
- Schema evolution monitoring — polls Schema Registry for version changes, optionally halts on incompatible changes
- Dead Letter Queue — per-sink failures are routed to an error destination with full diagnostic headers
- Avro serialization — schema evolution managed by Confluent Schema Registry (Kafka transport)
- Managed Kafka auth — AWS MSK (IAM), GCP Managed Kafka (OAuth), SASL PLAIN/SCRAM with config-level auth fields
- Direct WAL reader — for Pub/Sub and Kinesis, replaces Debezium with a native PostgreSQL logical replication reader
- Retry with backoff — configurable exponential backoff with jitter on all sink writes
- Defaults-based config — built-in defaults with per-pipeline overrides, validated by Pydantic
- Lakehouse maintenance — background compaction and snapshot expiry for Iceberg tables
- Time travel & rollback — CLI commands for point-in-time queries and snapshot rollback
- CLI tooling — validate configs, deploy connectors, check health, debug-consume, and run pipelines
- Observability — structured logging (structlog), health probes, consumer lag metrics per partition
High-Throughput Mode
The platform supports config-driven high-throughput tuning — batch polling, parallel deserialization, interval-based commits, and offloaded Iceberg writes. All new config fields default to values that preserve current behavior; operators opt into high-throughput mode by setting config values.
See docs/scaling.md for the full scaling guide, tuning knobs, capacity planning, and a pre-built high-throughput profile.
Quick start:
```bash
cdc run pipeline.yaml --platform-config src/cdc_platform/config/defaults/platform-high-throughput.yaml
```
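The profile simply overrides the tuning fields documented in the tables below. As a rough, hand-written illustration (example values, not the contents of the shipped profile), equivalent overrides in a platform config would look like:

```yaml
# Illustrative high-throughput overrides; tune to your workload.
kafka:
  poll_batch_size: 500           # fetch messages in batches instead of one per poll
  deser_pool_size: 4             # deserialize Avro in a thread pool
  commit_interval_seconds: 5.0   # periodic async commits instead of per-event sync commits
dlq:
  flush_interval_seconds: 5.0    # non-blocking DLQ produce with periodic flush
```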
Deployment
For production Kubernetes deployments, see docs/deployment.md — covers the platform Helm chart, pipeline Docker images, health probes, configuration, and a production checklist.
Quick Docker example:
```bash
docker run --rm \
  -v ./pipeline.yaml:/config/pipeline.yaml \
  -v ./platform.yaml:/config/platform.yaml \
  -e CDC_SOURCE_PASSWORD=secret \
  -p 8080:8080 \
  ghcr.io/baselyne-systems/cdc-platform:latest
```
Quick Start
Prerequisites
- Python 3.12+
- uv (recommended) or pip
- Docker and Docker Compose
1. Start the infrastructure
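The stack is defined in docker/docker-compose.yml (see the Docker stack section below). Assuming that layout, starting it looks like:

```bash
docker compose -f docker/docker-compose.yml up -d   # or: make up
```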
This starts the base stack: PostgreSQL (logical replication pre-configured), Kafka (KRaft mode), Schema Registry, Debezium Connect (all 4 source connectors installed), and Kafka UI.
| Service | Port | URL | Compose file |
|---|---|---|---|
| PostgreSQL | 5432 | localhost:5432 | base |
| Kafka | 9092 | localhost:9092 | base |
| Schema Registry | 8081 | http://localhost:8081 | base |
| Kafka Connect | 8083 | http://localhost:8083 | base |
| Kafka UI | 8080 | http://localhost:8080 | base |
| MongoDB | 27017 | localhost:27017 | docker/docker-compose.mongodb.yml |
| SQL Server | 1433 | localhost:1433 | docker/docker-compose.sqlserver.yml |
To start with MongoDB or SQL Server:
```bash
# MongoDB (single-node replica set, CDC pre-seeded)
docker compose -f docker/docker-compose.yml -f docker/docker-compose.mongodb.yml up -d

# SQL Server (Developer edition, CDC enabled on cdc_demo)
docker compose -f docker/docker-compose.yml -f docker/docker-compose.sqlserver.yml up -d
```
2. Install the platform
```bash
uv sync                    # core dependencies
uv sync --extra postgres   # + PostgreSQL sink
uv sync --extra iceberg    # + Iceberg sink
uv sync --extra gcp        # + Google Pub/Sub transport
uv sync --extra aws        # + Amazon Kinesis transport
uv sync --extra wal        # + Direct WAL reader (required for Pub/Sub + Kinesis)
uv sync --extra dev        # + dev/test tools
```
Or with pip:
```bash
pip install -e ".[postgres,iceberg,dev]"
pip install -e ".[gcp,wal]"   # For Pub/Sub transport
pip install -e ".[aws,wal]"   # For Kinesis transport
```
3. Check platform health
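Based on the CLI reference below, the health check is:

```bash
cdc health   # add --platform-config <file> for a non-default platform config
```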
4. Validate and run a pipeline
```bash
cdc validate examples/demo-config.yaml
cdc run examples/demo-config.yaml
```
Configuration
Configuration is split into two files that enforce a strict boundary between platform infrastructure and pipeline definitions.
Pipeline config (pipeline.yaml)
This is the only file users need to write. It defines what to capture and where to send it:
```yaml
pipeline_id: demo
topic_prefix: cdc

source:
  database: cdc_demo
  password: cdc_password
  tables:
    - public.customers
    - public.orders

sinks:
  - sink_id: webhook-notifications
    sink_type: webhook
    enabled: true
    webhook:
      url: http://localhost:8000/cdc-events
      method: POST
      headers:
        X-Source: cdc-platform
      timeout_seconds: 10

  - sink_id: analytics-db
    sink_type: postgres
    enabled: false
    postgres:
      host: localhost
      port: 5433
      database: analytics
      target_table: public.cdc_events
      batch_size: 50
      upsert: true

  - sink_id: iceberg-lake
    sink_type: iceberg
    enabled: false
    iceberg:
      catalog_uri: "sqlite:////tmp/cdc_catalog.db"
      warehouse: "file:///tmp/cdc_warehouse"
      table_name: customers_cdc
      write_mode: append
      batch_size: 500
      auto_create_table: true
```
Pipeline configs are validated strictly — including kafka, connector, or dlq keys in a pipeline YAML will raise a validation error. These belong in the platform config.
Platform config (platform.yaml)
Configures the platform's managed infrastructure: transport mode, Kafka, Debezium Connect, Schema Registry, DLQ behavior, and pipeline tuning. All fields have sensible defaults, so this file is optional for local development.
```yaml
transport_mode: kafka   # Event transport: kafka | pubsub | kinesis

kafka:
  bootstrap_servers: kafka-prod:9092
  schema_registry_url: http://registry-prod:8081

connector:
  connect_url: http://connect-prod:8083
```
Only specify fields that differ from the defaults. See examples/platform.yaml for Kafka, examples/platform-pubsub.yaml for Pub/Sub, examples/platform-kinesis.yaml for Kinesis, and docs/transports.md for the full reference.
Environment variables
Secrets and environment-specific values use ${VAR} or ${VAR:-default} syntax in YAML files:
```yaml
# In pipeline.yaml
source:
  database: ${CDC_SOURCE_DATABASE:-cdc_demo}
  password: ${CDC_SOURCE_PASSWORD:-cdc_password}

# In platform.yaml
kafka:
  bootstrap_servers: ${CDC_KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
```
Source configuration (external)
The source database that CDC captures changes from. The platform connects to it via Debezium.
| Field | Default | Description |
|---|---|---|
| source_type | postgres | Source type: postgres \| mysql \| mongodb \| sqlserver |
| host | localhost | Database host |
| port | 5432 | Database port (postgres: 5432, mysql: 3306, mongodb: 27017, sqlserver: 1433) |
| database | (required) | Database name |
| username | cdc_user | Database user |
| password | cdc_password | Database password |
| tables | [] | Tables/collections to capture. Format: schema.table (postgres/mysql/sqlserver) or db.collection (mongodb) |
| snapshot_mode | initial | Debezium snapshot mode: initial \| never \| when_needed \| no_data |
| slot_name | cdc_slot | PostgreSQL only — replication slot name; must be unique per pipeline |
| publication_name | cdc_publication | PostgreSQL only — publication name |
| mysql_server_id | 1 | MySQL only — server ID; must be unique across the replication topology |
| replica_set_name | None | MongoDB only — replica set name (e.g. rs0); omit for Atlas/SRV URIs |
| auth_source | admin | MongoDB only — authentication database |
Platform internals
These configure the platform's managed infrastructure. They live in platform.yaml (not in pipeline configs). The defaults are production-ready for most deployments.
Transport — Selects the event transport backend. The platform dispatches to transport-specific implementations via a factory based on this setting.
| Field | Default | Description |
|---|---|---|
| transport_mode | kafka | Event transport backend: kafka, pubsub, or kinesis. |
When transport_mode is kafka, the kafka and connector sections are required. When pubsub, the pubsub and wal_reader sections are required. When kinesis, the kinesis and wal_reader sections are required. See docs/transports.md for full configuration reference.
Kafka (kafka.*) — Required when transport_mode: kafka. The platform provisions topics, manages consumer groups, commits offsets, and monitors schemas through Kafka and the Schema Registry.
| Field | Default | Description |
|---|---|---|
| kafka.bootstrap_servers | localhost:9092 | Kafka broker(s) |
| kafka.schema_registry_url | http://localhost:8081 | Confluent Schema Registry URL |
| kafka.group_id | cdc-platform | Consumer group ID |
| kafka.auto_offset_reset | earliest | Where to start consuming when no committed offset exists |
| kafka.enable_idempotence | true | Idempotent producer for DLQ writes |
| kafka.acks | all | Producer acknowledgment level |
| kafka.topic_num_partitions | 1 | Default partition count for auto-created topics |
| kafka.topic_replication_factor | 1 | Default replication factor for auto-created topics |
| kafka.poll_batch_size | 1 | Messages fetched per poll call. 1 = single-poll (current). Higher = batch poll. |
| kafka.deser_pool_size | 1 | Thread pool for parallel Avro deserialization. 1 = no pool. |
| kafka.commit_interval_seconds | 0.0 | 0.0 = per-event sync commit. >0 = periodic async commit at this interval. |
Debezium connector (connector.*) — Required when transport_mode: kafka. The platform deploys and manages the Debezium source connector via the Kafka Connect REST API.
| Field | Default | Description |
|---|---|---|
| connector.connect_url | http://localhost:8083 | Kafka Connect REST endpoint |
| connector.timeout_seconds | 30.0 | HTTP timeout for Connect API calls |
| connector.retry_max_attempts | 5 | Retries on connector deployment |
| connector.retry_wait_seconds | 2.0 | Wait between retries |
Pipeline tuning — Controls backpressure, parallelism, schema monitoring, and lag reporting.
| Field | Default | Description |
|---|---|---|
| max_buffered_messages | 1000 | Max messages queued per partition before backpressure kicks in. Total memory ≈ this × num_partitions × avg_msg_size. |
| schema_monitor_interval_seconds | 30.0 | How often to poll Schema Registry for version changes. Lower = faster detection, higher registry load. |
| lag_monitor_interval_seconds | 15.0 | How often to query and log consumer lag. Creates a short-lived Kafka admin connection on each poll. |
| stop_on_incompatible_schema | false | When true, halt the pipeline on backward-incompatible schema changes (field removal, type narrowing). When false, log and continue. |
| health_port | 8080 | TCP port for the HTTP health server (/healthz, /readyz). |
| health_enabled | true | Enable the health HTTP server for Kubernetes liveness/readiness probes. |
Dead Letter Queue (dlq.*) — Platform-managed error routing for sink write failures.
| Field | Default | Description |
|---|---|---|
| dlq.enabled | true | Enable DLQ routing |
| dlq.topic_suffix | dlq | Suffix appended to source topic for DLQ topic name |
| dlq.max_retries | 3 | Max retries before routing to DLQ |
| dlq.include_headers | true | Include diagnostic headers on DLQ messages |
| dlq.flush_interval_seconds | 0.0 | 0.0 = sync flush per DLQ message. >0 = non-blocking produce with periodic flush. |
Sink destinations (external)
Sinks are the external systems the platform writes to. Each sink is independently configured, retried, and monitored — one sink's failure doesn't affect others.
Webhook — HTTP POST/PUT/PATCH to any endpoint. Unbuffered (one request per event). Payload format: {key, value, metadata: {topic, partition, offset}}.
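As an illustration (field values are made up), a single change event for public.customers would be posted roughly as:

```json
{
  "key": {"id": 42},
  "value": {"id": 42, "name": "Ada Lovelace", "email": "ada@example.com"},
  "metadata": {
    "topic": "cdc.public.customers",
    "partition": 0,
    "offset": 1337
  }
}
```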
PostgreSQL — Batched inserts to a target table. Columns: event_key, event_value, source_topic, source_partition, source_offset. Set upsert: true for idempotent writes via ON CONFLICT (requires a unique constraint on source_topic, source_partition, source_offset).
Apache Iceberg — Batched appends or upserts to an Iceberg table. CDC metadata is stored in _cdc_topic, _cdc_partition, _cdc_offset columns. Supports auto table creation, partitioning, S3/local warehouses, and background table maintenance (compaction + snapshot expiry). See the maintenance nested config below and docs/lakehouse.md for full documentation.
Iceberg table maintenance (iceberg.maintenance.*) — Background service for compaction and snapshot expiry. Disabled by default.
| Field | Default | Description |
|---|---|---|
| maintenance.enabled | false | Enable background maintenance |
| maintenance.expire_snapshots_interval_seconds | 3600.0 | Snapshot expiry poll interval |
| maintenance.expire_snapshots_older_than_seconds | 86400.0 | Expire snapshots older than this |
| maintenance.compaction_interval_seconds | 7200.0 | Compaction poll interval |
| maintenance.compaction_file_threshold | 10 | Min files before compaction triggers |
| maintenance.compaction_max_rows_per_batch | 500000 | Safety limit per compaction pass |
| iceberg.flush_interval_seconds | 0.0 | 0.0 = flush only at batch_size. >0 = periodic flush for partial batches. |
| iceberg.write_executor_threads | 0 | 0 = writes on event loop. >0 = writes offloaded to thread pool. |
Retry configuration
All sinks share a retry config (configured in platform.yaml as a global default):
| Field | Default | Description |
|---|---|---|
| max_attempts | 5 | Maximum retry attempts |
| initial_wait_seconds | 1.0 | Initial backoff delay |
| max_wait_seconds | 60.0 | Maximum backoff delay |
| multiplier | 2.0 | Exponential multiplier |
| jitter | true | Add randomized jitter |
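As a sketch, and assuming the block is keyed retry at the top level of platform.yaml (check examples/platform.yaml for the exact key), a tightened policy would look like:

```yaml
retry:                        # assumed key name
  max_attempts: 3
  initial_wait_seconds: 0.5
  max_wait_seconds: 30.0
  multiplier: 2.0
  jitter: true
```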
CLI
```
cdc validate <pipeline.yaml>                                Validate pipeline config (shows transport mode)
cdc validate <pipeline.yaml> --platform-config prod.yaml    Validate with custom platform
cdc deploy <pipeline.yaml>                                  Provision transport resources (topics + connector)
cdc deploy <pipeline.yaml> --platform-config prod.yaml      Deploy with custom platform
cdc undeploy <pipeline.yaml>                                Teardown transport resources (remove connector)
cdc undeploy <pipeline.yaml> --platform-config prod.yaml    Undeploy with custom platform
cdc health                                                  Health with defaults (transport-aware)
cdc health --platform-config prod.yaml                      Health from platform config
cdc consume <pipeline.yaml>                                 Debug consumer via configured transport
cdc consume <pipeline.yaml> --platform-config prod.yaml     Consume with custom platform
cdc run <pipeline.yaml>                                     Run with default platform
cdc run <pipeline.yaml> --platform-config prod.yaml         Run with custom platform
cdc lakehouse snapshots <pipeline.yaml>                     List Iceberg table snapshots
cdc lakehouse query <pipeline.yaml> --snapshot-id 123       Time-travel query at snapshot
cdc lakehouse rollback <pipeline.yaml> --snapshot-id 123    Rollback to snapshot (with --yes)
```
Architecture
Platform boundary
The platform manages everything between the source database and the sink destinations. The config separation reinforces this boundary: the platform config governs everything inside the boundary (transport, DLQ, monitoring, tuning), while the pipeline config governs the external connections (source database, sink destinations).
The core pipeline operates through four transport-agnostic protocols:
- EventSource — Consumes events from the transport and delivers SourceEvent objects to the pipeline. Each event carries key, value, topic, partition, offset, and a raw reference to the original transport message.
- Provisioner — Creates transport resources (topics, subscriptions, connectors) before the pipeline starts consuming.
- ErrorRouter — Routes failed events to a dead-letter destination with diagnostic metadata.
- SourceMonitor — Background monitoring for schema changes and consumer lag.
A factory (sources/factory.py) dispatches on transport_mode to create the appropriate implementation. Three transports are supported:
- Kafka (default) — Topics are provisioned automatically. Debezium captures WAL/binlog changes and publishes Avro to Kafka. The platform owns the consumer group, offset lifecycle, DLQ topics, and schema monitoring. Supports MSK IAM, GCP Managed Kafka OAuth, and SASL PLAIN/SCRAM auth.
- Google Pub/Sub — Topics and subscriptions are provisioned automatically. A direct WAL reader replaces Debezium, reading PostgreSQL logical replication and publishing JSON to Pub/Sub. Virtual partitions via consistent hashing preserve per-partition ordering.
- Amazon Kinesis — Streams are provisioned automatically. A direct WAL reader publishes JSON to Kinesis. Per-shard reader tasks map shards to partitions. Sequence numbers are checkpointed to DynamoDB.
The source database and sink destinations are the only things outside the platform boundary. The source is read-only (captured via Debezium or direct WAL reader). Sinks are write-only (the platform pushes events to them).
Pipeline lifecycle
1. Transport provisioning — The Provisioner creates transport resources. For Kafka: auto-creates topics based on source tables and topic prefix (e.g. cdc.public.customers, cdc.public.customers.dlq) and registers the appropriate Debezium connector (PostgreSQL, MySQL, MongoDB, or SQL Server) via the Kafka Connect REST API.
2. Error router initialization — The ErrorRouter (DLQ handler) is created for routing failed events.
3. Sink initialization — All enabled sinks are started (connections opened, clients initialized).
4. Event source creation — The EventSource is created for the configured transport mode.
5. Source monitor started — The SourceMonitor begins background polling for schema changes and consumer lag.
6. Health server started — HTTP health endpoints (/healthz, /readyz) become available for Kubernetes probes.
7. Consume loop — The EventSource polls the transport, converts native messages to SourceEvent objects, and enqueues to per-partition bounded queues.
8. Parallel dispatch — Each partition has its own async worker that drains its queue and dispatches to all sinks concurrently.
9. Offset management — Offsets are committed at the min-watermark across all sinks via EventSource.commit_offsets().
10. Graceful shutdown — On SIGINT/SIGTERM, the health server stops (readiness probe fails, traffic drains), the source monitor is stopped, partition workers are cancelled, all sinks are flushed and stopped, and a final offset commit is issued.
Exactly-once delivery
The platform uses min-watermark offset commits combined with idempotent writes to achieve exactly-once delivery semantics:
1. Kafka offsets are not committed per-message. Instead, each sink tracks the highest offset it has durably flushed (flushed_offsets).
2. After each dispatch, the runner computes the minimum flushed offset across all sinks for each partition. Only this watermark is committed to Kafka.
3. On crash recovery, Kafka re-delivers from the last committed offset. Idempotent sinks (PostgreSQL with upsert: true, Iceberg in upsert mode) handle duplicates transparently.
This means a slow sink (e.g. Iceberg with batch_size=1000) won't cause data loss for a fast sink (e.g. webhook) — the committed offset is always safe for all sinks.
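A minimal sketch of the watermark computation (illustrative, not the actual runner code):

```python
def min_watermarks(sinks) -> dict[tuple[str, int], int]:
    """Per (topic, partition), the smallest offset that every sink has durably flushed.

    Each sink exposes flushed_offsets: {(topic, partition): offset}. Committing the
    minimum across sinks means a crash can only replay events that some sink had not
    yet flushed; idempotent sinks absorb those duplicates. (Simplification: partitions
    absent from a sink's map are ignored here.)
    """
    watermarks: dict[tuple[str, int], int] = {}
    for sink in sinks:
        for tp, offset in sink.flushed_offsets.items():
            watermarks[tp] = min(watermarks.get(tp, offset), offset)
    return watermarks  # the real commit call uses Kafka's next-offset convention
```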
Backpressure
The pipeline uses bounded asyncio.Queues per partition to prevent unbounded memory growth.
EventSource poll loop Per-partition workers
│ │
├─ poll() ──▸ SourceEvent │
│ │ │
│ ▼ │
│ await queue.put(event) ──▸ [Queue maxsize=N] ──▸ worker drains ──▸ sinks
│ │ │
│ blocks if full │
│ (backpressure) │
How it works: When sinks are slow, queues fill up. await queue.put() blocks, which blocks the event source's poll loop, which causes the transport to stop fetching new messages. Once sinks catch up and workers drain the queue, the source resumes polling.
The queue size per partition is set to max_buffered_messages (default: 1000). This is per-partition, not shared — each (topic, partition) pair gets its own queue with that maxsize.
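In asyncio terms the mechanism is just a bounded queue shared between the poll loop and a per-partition worker. A self-contained sketch (names are illustrative, not the real runner API):

```python
import asyncio

async def poll_loop(source, queues: dict[tuple[str, int], asyncio.Queue]) -> None:
    """Enqueue events; suspends (backpressure) when a partition's queue is full."""
    async for event in source:                      # illustrative: yields SourceEvent objects
        await queues[(event.topic, event.partition)].put(event)

async def partition_worker(queue: asyncio.Queue, sinks) -> None:
    """Drain one partition's queue and fan out each event to all sinks concurrently."""
    while True:
        event = await queue.get()
        await asyncio.gather(*(sink.write(event) for sink in sinks))
        queue.task_done()
```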
Tuning considerations:
- Too small (e.g. 10) — Frequent backpressure pauses, low throughput. The consumer spends more time blocked than polling.
- Too large (e.g. 100,000) — Defeats the purpose. Memory grows proportionally to max_buffered_messages × num_partitions × avg_message_size.
- Kafka session timeout — If backpressure blocks the poll loop longer than session.timeout.ms (default 45s), the broker will consider the consumer dead and trigger a rebalance. Size the queue and sink throughput so the worst-case drain time stays well under this timeout.
Parallel Consumption
The pipeline runs as a single-process asyncio application. Each assigned partition gets its own async worker task:
┌─ Queue(p0) ──▸ Worker(p0) ──▸ dispatch ──▸ sinks
EventSource poll ──▸ enqueue ──▸ ├─ Queue(p1) ──▸ Worker(p1) ──▸ dispatch ──▸ sinks
└─ Queue(p2) ──▸ Worker(p2) ──▸ dispatch ──▸ sinks
The event source poll loop only enqueues SourceEvent objects to the correct partition's queue; dispatch happens in parallel per partition. A slow Iceberg flush on partition 0 does not block partition 1.
Rebalance handling: When the transport triggers a rebalance (new consumer joins, existing one leaves), the pipeline uses on_assign/on_revoke callbacks passed to EventSource.start():
- on_assign — Creates a new bounded queue and async worker task for each newly assigned partition (sketched below).
- on_revoke — Cancels the worker task and discards the queue for each revoked partition. Queued but uncommitted messages are safe to discard because their offsets were never committed — the new partition owner will re-consume them from the last committed offset.
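A sketch of the rebalance callbacks (reusing the illustrative partition_worker above; not the real implementation):

```python
import asyncio

def on_assign(partitions, queues, workers, sinks, maxsize: int = 1000) -> None:
    """Create a bounded queue plus a worker task for each newly assigned partition."""
    for tp in partitions:                                   # tp = (topic, partition)
        queue = asyncio.Queue(maxsize=maxsize)
        queues[tp] = queue
        workers[tp] = asyncio.create_task(partition_worker(queue, sinks))

def on_revoke(partitions, queues, workers) -> None:
    """Tear down workers for revoked partitions; uncommitted events are re-consumed by the new owner."""
    for tp in partitions:
        task = workers.pop(tp, None)
        if task is not None:
            task.cancel()
        queues.pop(tp, None)
```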
Why single-process asyncio (not multiprocessing)? CDC workloads are I/O-bound (network calls to sinks, S3 flushes, HTTP webhooks). Asyncio provides concurrency without the overhead of inter-process communication or GIL workarounds. For CPU-bound transformations, individual sinks can use loop.run_in_executor() to offload to a thread pool.
Schema Evolution
A SchemaMonitor background task polls the Confluent Schema Registry for subject version changes. For each topic, both the key and value subjects are checked ({topic}-key and {topic}-value).
Detection flow:
1. On startup, the monitor records the current version for each subject as a baseline — no alerts are raised.
2. On each poll interval (schema_monitor_interval_seconds, default 30s), the monitor fetches /subjects/{subject}/versions/latest from the registry.
3. If the version has changed since the last observation:
   - A structured log event is emitted: schema.version_changed with subject, previous_version, new_version, schema_id, and compatible fields.
   - If stop_on_incompatible_schema=True, the monitor posts the previous schema to the registry's /compatibility/subjects/{subject}/versions/latest endpoint to check if the old schema is still backward-compatible with the new one. If not (field removed, type narrowed, etc.), it logs schema.incompatible_change_detected at ERROR level and calls the pipeline's stop() method.
   - If stop_on_incompatible_schema=False (default), the version change is logged at INFO level and the pipeline continues.
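The version check itself is a single Schema Registry REST call per subject. A sketch using httpx (an assumption; the platform's own registry client lives in streaming/registry.py):

```python
import httpx

async def latest_version(registry_url: str, subject: str) -> tuple[int, int]:
    """Return (version, schema_id) for the latest schema registered under a subject."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{registry_url}/subjects/{subject}/versions/latest")
        resp.raise_for_status()
        body = resp.json()
        return body["version"], body["id"]
```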
How schema changes interact with sinks:
| Sink | Behavior on schema change |
|---|---|
| Iceberg | Auto-evolves the table schema (PyIceberg handles column additions natively) |
| PostgreSQL | Column mismatch causes a write error → event routes to DLQ |
| Webhook | Passes through the new payload shape transparently |
The schema monitor provides early warning so operators can act before DLQ volume spikes. With stop_on_incompatible_schema=True, the pipeline halts preemptively on breaking changes rather than flooding the DLQ.
Registry errors (network failures, 5xx responses) are logged at DEBUG level and silently skipped — the monitor retries on the next interval. The pipeline is never stopped due to a registry connectivity issue.
Observability
Consumer lag monitoring — A LagMonitor runs as a periodic async task (interval: lag_monitor_interval_seconds, default 15s). It queries Kafka for the committed offset and high watermark of each partition and logs a structured event:
```
event: consumer.lag
total_lag: 1247
partitions:
  - topic: cdc.public.customers, partition: 0, lag: 823, offset: 4177
  - topic: cdc.public.customers, partition: 1, lag: 424, offset: 5576
```
Lag data is also included in the health endpoint response under the consumer_lag key.
HTTP health server — When health_enabled: true (default), the pipeline starts a lightweight async HTTP server on health_port (default 8080) for Kubernetes probes:
| Endpoint | Purpose | K8s Probe |
|---|---|---|
| GET /healthz | Liveness — always returns 200 if the process is running | livenessProbe |
| GET /readyz | Readiness — delegates to Pipeline.health(); returns 200 if healthy, 503 if any component reports an error | readinessProbe |
Health endpoint — Pipeline.health() returns:
```json
{
  "pipeline_id": "demo",
  "source": {"connector": {"state": "RUNNING"}, "tasks": [{"state": "RUNNING"}]},
  "sinks": [
    {"sink_id": "wh1", "status": "running"},
    {"sink_id": "iceberg-lake", "status": "running"}
  ],
  "consumer_lag": [
    {"topic": "cdc.public.customers", "partition": 0, "lag": 823, "offset": 4177}
  ]
}
```

Structured log events reference:
| Event | Level | When |
|---|---|---|
| pipeline.partitions_assigned | INFO | Kafka rebalance assigns partitions |
| pipeline.partitions_revoked | INFO | Kafka rebalance revokes partitions |
| schema.version_changed | INFO/WARN | Schema Registry version changed (WARN if incompatible) |
| schema.incompatible_change_detected | ERROR | Backward-incompatible schema change detected |
| consumer.lag | INFO | Periodic lag report |
| pipeline.sink_write_error | ERROR | Sink write failed (routed to DLQ) |
Dead Letter Queue
When a sink write fails (after retries), the ErrorRouter routes the event to a dead-letter destination. For the Kafka transport, this is a DLQ topic ({source_topic}.dlq) with diagnostic headers:
- dlq.source_topic, dlq.source_partition, dlq.source_offset
- dlq.error_type, dlq.error_message, dlq.stacktrace
- dlq.timestamp, dlq.sink_id
Other sinks are not affected by one sink's failure.
Topic naming
Topic format varies by source type to match Debezium's connector conventions:
| Source type | CDC topic format | Example |
|---|---|---|
| postgres | {prefix}.{schema}.{table} | cdc.public.customers |
| mysql | {prefix}.{db}.{table} | cdc.mydb.customers |
| mongodb | {prefix}.{db}.{collection} | cdc.mydb.events |
| sqlserver | {prefix}.{database}.{schema}.{table} | cdc.prod_db.dbo.customers |
DLQ topic: {cdc_topic}.{dlq_suffix} (default suffix: dlq), e.g. cdc.public.customers.dlq.
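The rule is mechanical enough to express as a tiny helper (illustrative sketch, not the actual streaming/topics.py code):

```python
def cdc_topic(prefix: str, *name_parts: str) -> str:
    """Join the topic prefix with the source-specific name parts.

    cdc_topic("cdc", "public", "customers")          -> "cdc.public.customers"
    cdc_topic("cdc", "prod_db", "dbo", "customers")  -> "cdc.prod_db.dbo.customers"
    """
    return ".".join((prefix, *name_parts))

def dlq_topic(topic: str, suffix: str = "dlq") -> str:
    return f"{topic}.{suffix}"   # e.g. "cdc.public.customers.dlq"
```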
Project Structure
src/cdc_platform/
├── cli.py # Typer CLI (validate, deploy, health, consume, run)
├── config/
│ ├── models.py # Pydantic configuration models (incl. TransportMode)
│ ├── loader.py # YAML + env var config loader
│ ├── defaults.py # Default config loading and merging
│ └── defaults/
│ ├── platform.yaml # Default platform infrastructure config
│ └── pipeline.yaml # Default pipeline config (source defaults)
├── sources/
│ ├── base.py # SourceEvent dataclass + EventSource protocol
│ ├── provisioner.py # Provisioner protocol
│ ├── error_router.py # ErrorRouter protocol
│ ├── monitor.py # SourceMonitor protocol
│ ├── factory.py # Factory functions (transport_mode → implementations)
│ ├── debezium/
│ │ ├── client.py # Async Kafka Connect REST client
│ │ └── config.py # Debezium connector config builder
│ ├── kafka/
│ │ ├── source.py # KafkaEventSource (wraps CDCConsumer)
│ │ ├── provisioner.py # KafkaProvisioner (topics + Debezium registration)
│ │ └── monitor.py # KafkaSourceMonitor (SchemaMonitor + LagMonitor)
│ ├── pubsub/ # Google Pub/Sub transport
│ │ ├── source.py # PubSubEventSource (streaming pull + virtual partitions)
│ │ ├── provisioner.py # PubSubProvisioner (topics + subscriptions + PG slot)
│ │ ├── error_router.py # PubSubErrorRouter (DLQ topic)
│ │ ├── monitor.py # PubSubSourceMonitor (subscription backlog)
│ │ ├── publisher.py # PubSubWalPublisher (WalPublisher impl)
│ │ └── naming.py # Topic/subscription naming conventions
│ ├── kinesis/ # Amazon Kinesis transport
│ │ ├── source.py # KinesisEventSource (per-shard GetRecords)
│ │ ├── provisioner.py # KinesisProvisioner (streams + DynamoDB + PG slot)
│ │ ├── error_router.py # KinesisErrorRouter (DLQ stream)
│ │ ├── monitor.py # KinesisSourceMonitor (MillisBehindLatest)
│ │ ├── publisher.py # KinesisWalPublisher (WalPublisher impl)
│ │ ├── checkpoint.py # DynamoDB checkpoint store
│ │ └── naming.py # Stream naming conventions
│ └── wal/ # Direct WAL reader (non-Kafka transports)
│ ├── reader.py # Async logical replication stream consumer
│ ├── decoder.py # pgoutput binary protocol decoder
│ ├── publisher.py # WalPublisher protocol
│ └── slot_manager.py # Replication slot + publication lifecycle
├── streaming/
│ ├── auth.py # Kafka auth config builder (MSK IAM, GCP OAuth, SASL)
│ ├── consumer.py # Avro-deserializing Kafka consumer
│ ├── producer.py # Idempotent Kafka producer
│ ├── dlq.py # Dead Letter Queue handler (satisfies ErrorRouter)
│ ├── topics.py # Topic naming and admin utilities
│ ├── registry.py # Schema Registry integration
│ └── schema_monitor.py # Schema Registry version change monitor
├── lakehouse/
│ ├── maintenance.py # Background compaction + snapshot expiry
│ └── time_travel.py # Snapshot listing, time-travel scan, rollback
├── sinks/
│ ├── base.py # SinkConnector protocol
│ ├── factory.py # Sink factory (type -> class registry)
│ ├── webhook.py # HTTP webhook sink
│ ├── postgres.py # PostgreSQL batched sink
│ └── iceberg.py # Apache Iceberg lakehouse sink
├── pipeline/
│ └── runner.py # Pipeline orchestrator (transport-agnostic, uses protocols)
└── observability/
├── health.py # Component health probes (transport-aware)
├── http_health.py # Async HTTP health server for K8s probes (/healthz, /readyz)
└── metrics.py # Consumer lag metrics + LagMonitor periodic reporter
Dockerfile # Pipeline worker image (multi-stage, non-root)
Dockerfile.connect # Kafka Connect + all four Debezium connector plugins
docker/
├── docker-compose.yml # Base local stack (Kafka, Schema Registry, Connect, PostgreSQL)
├── docker-compose.mongodb.yml # Overlay: adds MongoDB replica set + seed data
├── docker-compose.sqlserver.yml # Overlay: adds SQL Server + CDC-enabled cdc_demo database
├── connect/
│ └── Dockerfile # Custom Kafka Connect image with Confluent Avro converter JARs
├── postgres/
│ └── init.sql # Demo schema (customers + orders) with logical replication
├── mongodb/
│ └── setup.sh # Replica set init + seed products/orders collections
└── sqlserver/
└── init.sql # sys.sp_cdc_enable_db/table + seed customers table
examples/
├── platform.yaml # Production platform config override (Kafka)
├── platform-msk.yaml # AWS MSK with IAM auth
├── platform-gcp-kafka.yaml # GCP Managed Kafka with OAuth
├── platform-pubsub.yaml # Google Pub/Sub transport
├── platform-kinesis.yaml # Amazon Kinesis transport
├── demo-config.yaml # PostgreSQL demo pipeline (webhook + postgres + iceberg sinks)
├── mongodb-pipeline.yaml # MongoDB CDC pipeline example
└── sqlserver-pipeline.yaml # SQL Server CDC pipeline example
helm/cdc-platform/ # Platform Helm chart (Kafka, Schema Registry, Kafka Connect)
├── Chart.yaml
├── values.yaml
└── templates/
tests/
├── unit/ # Unit tests (no Docker required)
├── integration/ # Integration tests (requires Docker stack)
└── benchmark/ # Throughput + backpressure benchmarks
Development
Setup
```bash
git clone <repo-url>
cd cdc-platform
uv sync --extra dev
```
Running tests
```bash
make test-unit          # Unit tests only (no Docker required)
make test-integration   # Integration tests (manages Docker lifecycle automatically)
make bench              # Benchmark tests (manages Docker lifecycle automatically)
```
Unit tests run without any external services — all dependencies are mocked.
Integration tests manage the full Docker Compose stack automatically. The test session fixture starts all services (docker compose up -d), waits for health checks, registers a Debezium connector, runs the tests, and tears everything down (docker compose down -v). You do not need to run make up first — doing so would cause port conflicts.
The integration suite exercises the real Avro serialization path end-to-end: Debezium captures WAL changes, serializes with AvroConverter via the Confluent Schema Registry, and the tests consume with AvroDeserializer. This ensures the same code path used in production is tested.
Prerequisites for integration tests:
- Docker and Docker Compose installed and running
- No services already bound to ports 5432, 9092, 8081, 8083, or 8080
- First run may be slow while the custom Connect image builds (downloads ~12 Confluent Avro converter JARs)
Benchmarks
The platform includes a dedicated benchmark suite to measure throughput, latency, backpressure, and scaling. See docs/benchmarks.md for full documentation.
Lakehouse Features
The Iceberg sink supports table maintenance and time-travel operations. See docs/lakehouse.md for full documentation.
Linting and formatting
```bash
make lint   # ruff check + mypy
make fmt    # ruff format + autofix
```
Docker stack
```bash
make up      # Start all services (builds custom Connect image on first run)
make down    # Stop and remove containers
make clean   # Full cleanup (volumes + orphans)
```
The Connect service uses a custom Docker image (docker/connect/Dockerfile) that extends quay.io/debezium/connect:2.7 with Confluent Avro converter JARs. The stock Debezium image only ships JSON converters, but the platform uses Avro serialization with Schema Registry. The image is built automatically by docker compose up via the build: ./connect directive.
Adding a new sink
1. Create src/cdc_platform/sinks/my_sink.py implementing the SinkConnector protocol
2. Implement sink_id, flushed_offsets, start(), write(), flush(), stop(), health() (a minimal skeleton is sketched after this list)
3. Track _flushed_offsets — update after durable writes, not on buffering
4. Register the sink type in sinks/factory.py
5. Add the corresponding config model to config/models.py
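A minimal skeleton, assuming the SinkConnector protocol matches the members listed above (the authoritative definition is sinks/base.py; the write() signature and import path here are assumptions):

```python
from typing import Any

from cdc_platform.sources.base import SourceEvent   # assumed import path

class MySink:
    """Sketch of a sink satisfying the SinkConnector protocol described above."""

    def __init__(self, sink_id: str) -> None:
        self.sink_id = sink_id
        self._flushed_offsets: dict[tuple[str, int], int] = {}
        self._buffer: list[SourceEvent] = []

    @property
    def flushed_offsets(self) -> dict[tuple[str, int], int]:
        return self._flushed_offsets

    async def start(self) -> None:
        ...  # open connections / clients here

    async def write(self, event: SourceEvent) -> None:
        self._buffer.append(event)   # buffered only; not yet durable

    async def flush(self) -> None:
        # ... durably write self._buffer to the destination here ...
        for ev in self._buffer:      # advance the watermark only after the durable write
            self._flushed_offsets[(ev.topic, ev.partition)] = ev.offset
        self._buffer.clear()

    async def stop(self) -> None:
        await self.flush()

    async def health(self) -> dict[str, Any]:
        return {"sink_id": self.sink_id, "status": "running"}
```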
Adding a new transport
The platform's transport abstraction makes it possible to add new event sources without modifying the core pipeline. See sources/pubsub/ and sources/kinesis/ for reference implementations. Four protocols must be implemented:
1. Add a TransportMode variant — Add the new mode to the TransportMode enum in config/models.py (e.g. PUBSUB = "pubsub"). Add any transport-specific config models and update the PlatformConfig model validator to enforce requirements for the new mode.
2. Implement EventSource (sources/base.py) — Create sources/<transport>/source.py. The start() method receives a handler callback and must call await handler(event) for each SourceEvent. Implement commit_offsets(), stop(), and health(). For non-partitioned transports, use partition=0.
3. Implement Provisioner (sources/provisioner.py) — Create sources/<transport>/provisioner.py. The provision() method creates any resources the transport needs (subscriptions, topics, connectors). The teardown() method removes them.
4. Implement ErrorRouter (sources/error_router.py) — If the transport has its own dead-letter mechanism, create an implementation. Otherwise, the existing DLQHandler can be reused if the transport writes to Kafka, or return None from the factory to disable error routing.
5. Implement SourceMonitor (sources/monitor.py) — Create sources/<transport>/monitor.py if the transport supports schema monitoring or lag tracking. Return None from the factory if not applicable.
6. Register in the factory — Add dispatch branches for the new TransportMode in each function in sources/factory.py.
7. Add tests — Protocol conformance tests (isinstance checks), unit tests for each implementation, and factory dispatch tests.
The core pipeline (pipeline/runner.py), sinks, CLI, and all existing tests remain unchanged — they only interact with the protocols. The SourceEvent dataclass is the universal contract between transport and pipeline:
```python
from dataclasses import dataclass
from typing import Any

@dataclass
class SourceEvent:
    key: dict[str, Any] | None
    value: dict[str, Any] | None
    topic: str          # logical channel name
    partition: int      # shard (0 for non-partitioned transports)
    offset: int         # position (transport-specific)
    raw: Any = None     # original transport message for ack/commit/DLQ
```
License
Apache 2.0 — see LICENSE.