CloudQuery

Creating a New Source Integration in Python

This guide walks through building a source integration in Python using the CloudQuery SDK. We reference the Python Integration Template and the Typeform integration as examples.

Prerequisites:

  • Python 3.11+ installed (Python tutorial)
  • uv package manager (recommended) or pip
  • CloudQuery CLI installed

Get Started

Clone the Python Integration Template as a starting point:

git clone https://github.com/cloudquery/python-plugin-template.git cq-source-<name>
cd cq-source-<name>

Install dependencies using uv (recommended):

Or using pip in a virtual environment:

pip3 install -r requirements.txt

If starting from scratch, install the SDK directly:

uv add cloudquery-plugin-sdk
# or: pip3 install cloudquery-plugin-sdk

How It All Connects

When a user runs cloudquery sync, here’s what happens with your Python integration:

  1. The CLI starts your integration (or connects to it over gRPC if you’re running it locally)
  2. Your main.py creates the plugin and starts the gRPC server
  3. The CLI sends the user’s spec configuration to your plugin’s init method
  4. init parses the spec, validates it, creates an authenticated API client, and sets up a scheduler
  5. The CLI calls sync, which runs each table’s resolver through the scheduler
  6. Each resolver is a generator that yields results. The SDK handles writing them to the destination.

Your main implementation work is in three places: the init method (parsing configuration and creating the API client), the Table classes (defining what data your integration exposes), and the Resolvers (fetching data from the API).

Project Structure

Here’s the typical structure based on the template and Typeform integration:

cq-source-<name>/
├── main.py                   # Entry point
├── pyproject.toml             # Dependencies (SDK >= 0.1.47)
├── Dockerfile
├── plugin/
│   ├── __init__.py            # Exports the Plugin class
│   ├── plugin.py              # Plugin class definition
│   ├── client/
│   │   └── client.py          # Spec dataclass + Client wrapper
│   └── tables/
│       ├── __init__.py        # Exports Table classes
│       └── items.py           # Table + Resolver per file
└── <api_name>/
    └── client.py              # Raw API client (HTTP calls, auth)

Here’s what each component does:

  • main.py: the entry point. Creates your plugin and starts the gRPC server. You rarely need to modify this.
  • plugin/plugin.py: the Plugin class is the central coordinator. It implements init (parsing configuration), get_tables (listing available tables), and sync (running resolvers through the scheduler). This is where your integration’s setup logic lives.
  • plugin/client/: the Spec dataclass defines what settings your integration accepts (API keys, endpoints, concurrency). The Client wrapper stores the authenticated API client and any shared state that resolvers need.
  • plugin/tables/: one file per table. Each file contains a Table class (defining the table name and columns) and a Resolver class (the generator that fetches data from the API). The resolver is the heart of each table.
  • <api_name>/client.py: your raw API client code. This is where you make HTTP calls, handle auth headers, parse responses, and manage pagination. Keeping this separate from CloudQuery-specific code means you can test it independently.

Entry Point

The main.py creates and serves the plugin. This is boilerplate that you rarely need to change:

import sys
 
from cloudquery.sdk import serve
from plugin import ExamplePlugin
 
def main():
    p = ExamplePlugin()
    serve.PluginCommand(p).run(sys.argv[1:])
 
if __name__ == "__main__":
    main()

Plugin Class

The Plugin class is the central piece of your integration. It extends plugin.Plugin and implements four key methods that the SDK calls at different stages of a sync:

  • set_logger: called first, gives you a logger for debugging
  • init: called with the user’s spec JSON. This is where you parse configuration, validate it, create your API client, and set up the scheduler
  • get_tables: returns the list of tables your integration supports (filtered by the user’s configuration)
  • sync: runs the actual sync by feeding tables and resolvers to the scheduler
from cloudquery.sdk import plugin
 
PLUGIN_NAME = "my-integration"
PLUGIN_VERSION = "0.1.0"
TEAM_NAME = "my-team"
PLUGIN_KIND = "source"
 
class MyPlugin(plugin.Plugin):
    def __init__(self) -> None:
        super().__init__(
            PLUGIN_NAME,
            PLUGIN_VERSION,
            plugin.plugin.Options(team=TEAM_NAME, kind=PLUGIN_KIND),
        )
 
    def set_logger(self, logger):
        self._logger = logger
 
    def init(self, spec, no_connection=False):
        # Parse JSON spec, create API client and scheduler
        ...
 
    def get_tables(self, options):
        # Return filtered list of tables
        ...
 
    def sync(self, options):
        # Build resolver list, run scheduler, yield SyncMessages
        ...

Define a Table

Unlike Go (where tables are functions returning a struct), in Python a table is a class that extends Table. You define the table name, columns, and a resolver property. Columns use PyArrow types for the underlying type system. This ensures consistent types across all CloudQuery SDKs regardless of language.

import pyarrow as pa
from cloudquery.sdk.schema import Column, Table
 
class Items(Table):
    def __init__(self) -> None:
        super().__init__(
            name="example_items",
            title="Example Items",
            columns=[
                Column("id", pa.uint64(), primary_key=True),
                Column("name", pa.string()),
                Column("created_at", pa.timestamp(unit="s")),
                Column("active", pa.bool_()),
            ],
        )
 
    @property
    def resolver(self):
        return ItemResolver(table=self)

Key details:

  • Column types are PyArrow types: pa.string(), pa.uint64(), pa.timestamp(), pa.bool_(), pa.date64(), pa.float64()
  • For JSON columns, use JSONType() from cloudquery.sdk.types
  • Set primary_key=True on primary key columns
  • For parent-child relations, pass relations=[ChildTable()]
  • For incremental tables, set is_incremental=True

Write a Resolver

The resolver is where the actual API fetching happens. In Python, resolvers work differently from Go. Instead of sending items to a channel, you write a generator that yields results one at a time. This is the Python-native way to stream data lazily without loading everything into memory at once.

A resolver is a class extending TableResolver that implements the resolve method:

from typing import Any, Generator
 
from cloudquery.sdk.scheduler import TableResolver
from cloudquery.sdk.schema.resource import Resource
 
class ItemResolver(TableResolver):
    def __init__(self, table) -> None:
        super().__init__(table=table)
 
    def resolve(self, client, parent_resource: Resource) -> Generator[Any, None, None]:
        for item in client.api_client.list_items():
            yield item  # yield a dict matching the table columns

The resolve method receives two arguments:

  • client: your Client wrapper, which gives access to the API client and any shared state you set up in the plugin’s init method.
  • parent_resource: for top-level tables this is None. For child tables (e.g. fetching issues for a specific project), this contains the parent row so you can access parent fields via parent_resource.item["field_name"].

Each yield produces one row in the destination table. The yielded value should be a dict whose keys match the column names defined in your Table class. The SDK handles mapping the dict values to the correct columns and types.

Important: resolve must be a generator (using yield), not a regular function that returns a list. The SDK relies on the generator pattern to stream results incrementally. If you collect all items into a list and return them, the SDK won’t receive them correctly.

For parent-child table relationships, expose child resolvers via a child_resolvers property on your resolver class: [table.resolver for table in self._table.relations].

Configuration & Authentication

The SDK passes the user’s spec block as a JSON string to your plugin’s init method. Define a @dataclass for the spec and parse it:

from dataclasses import dataclass, field
import json
 
@dataclass
class Spec:
    access_token: str = field(default="")
    base_url: str = field(default="https://api.example.com")
    concurrency: int = field(default=100)
 
    def validate(self):
        if not self.access_token:
            raise ValueError("access_token is required")

Then in your plugin’s init method, deserialize and validate:

def init(self, spec, no_connection=False):
    parsed = json.loads(spec)
    self._spec = Spec(**parsed)
    self._spec.validate()
 
    # Create authenticated API client
    self._api_client = MyApiClient(
        access_token=self._spec.access_token,
        base_url=self._spec.base_url,
    )
 
    # Create scheduler
    self._scheduler = Scheduler(concurrency=self._spec.concurrency)

Users configure authentication in their YAML file. The CLI automatically resolves environment variable references:

spec:
  access_token: "${MY_API_TOKEN}"
  base_url: "https://api.example.com"

Common Pitfalls

Avoid these common mistakes when building Python integrations:

  • resolve must be a generator. Use yield to produce results. Don’t return a list. The SDK expects a generator for streaming.
  • Yield results as you go. Don’t collect all pages into a list then yield them at the end. Yield each item or page immediately to keep memory usage low.
  • Use @dataclass for the Spec, not Pydantic. The SDK examples and template use standard dataclasses.
  • Return errors, don’t swallow them. If an API call fails, let the exception propagate so the SDK can log it and surface it to the user.
  • Use the bundled PyArrow. Don’t install a separate version of pyarrow. Use the one that comes with cloudquery-plugin-sdk for compatibility.

Test Locally

Start the integration as a gRPC server:

uv run main serve
# or: python main.py serve

Then sync using registry: grpc. You can also build and run as a Docker container. See the Typeform Dockerfile. See Testing Locally for configuration examples and Running Locally for full details.

Publishing

Visit Publishing an Integration to the Hub for release instructions. Python integrations can also be distributed as Docker images.

Real-World Examples

Next Steps

Once your integration is working locally:

  1. Publish to the Hub: make your integration available to others
  2. Add tests: see the test patterns in Typeform for reference
  3. Add incremental tables: set is_incremental=True on tables and use the state client for cursor management
  4. Try OpenAPI generation: if your API has an OpenAPI spec, see the Square integration for auto-generating tables

Resources