CloudQuery
Creating a New Source Integration in Python
This guide walks through building a source integration in Python using the CloudQuery SDK. We reference the Python Integration Template and the Typeform integration as examples.
Prerequisites:
- Python 3.11+ installed (Python tutorial)
- uv package manager (recommended) or pip
- CloudQuery CLI installed
Get Started
Clone the Python Integration Template as a starting point:
```bash
git clone https://github.com/cloudquery/python-plugin-template.git cq-source-<name>
cd cq-source-<name>
```

Install dependencies using uv (recommended):

```bash
uv sync
```

Or using pip in a virtual environment:

```bash
pip3 install -r requirements.txt
```

If starting from scratch, install the SDK directly:

```bash
uv add cloudquery-plugin-sdk
# or: pip3 install cloudquery-plugin-sdk
```

How It All Connects
When a user runs `cloudquery sync`, here's what happens with your Python integration:
- The CLI starts your integration (or connects to it over gRPC if you're running it locally)
- Your `main.py` creates the plugin and starts the gRPC server
- The CLI sends the user's `spec` configuration to your plugin's `init` method
- `init` parses the spec, validates it, creates an authenticated API client, and sets up a scheduler
- The CLI calls `sync`, which runs each table's resolver through the scheduler
- Each resolver is a generator that yields results; the SDK handles writing them to the destination
Your main implementation work is in three places: the init method (parsing configuration and creating the API client), the Table classes (defining what data your integration exposes), and the Resolvers (fetching data from the API).
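As a mental model, that flow can be sketched without the SDK. Everything below is illustrative, not real SDK API; it only mirrors the shape of `init`, `get_tables`, and `sync`:

```python
import json

# Illustrative sketch of the sync flow; none of these classes are the
# real CloudQuery SDK APIs -- they only mirror the overall shape.

class ItemsTable:
    name = "example_items"

    def resolver(self, api_client):
        # A resolver is a generator that yields one row (dict) at a time.
        for item in api_client.list_items():
            yield item


class FakeApiClient:
    def __init__(self, token):
        self.token = token

    def list_items(self):
        yield {"id": 1, "name": "first"}
        yield {"id": 2, "name": "second"}


class MiniPlugin:
    def init(self, spec_json):
        spec = json.loads(spec_json)  # the CLI sends the spec as JSON
        self.api_client = FakeApiClient(spec["access_token"])

    def get_tables(self):
        return [ItemsTable()]

    def sync(self):
        # The real scheduler runs resolvers concurrently; here we just
        # iterate and yield every row each resolver produces.
        for table in self.get_tables():
            for row in table.resolver(self.api_client):
                yield table.name, row


plugin = MiniPlugin()
plugin.init('{"access_token": "secret"}')
rows = list(plugin.sync())
```

The real SDK adds gRPC transport, concurrency, and type handling around this skeleton, but the data path is the same: spec in, generators out.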
Project Structure
Here’s the typical structure based on the template and Typeform integration:
```text
cq-source-<name>/
├── main.py               # Entry point
├── pyproject.toml        # Dependencies (SDK >= 0.1.47)
├── Dockerfile
├── plugin/
│   ├── __init__.py       # Exports the Plugin class
│   ├── plugin.py         # Plugin class definition
│   ├── client/
│   │   └── client.py     # Spec dataclass + Client wrapper
│   └── tables/
│       ├── __init__.py   # Exports Table classes
│       └── items.py      # Table + Resolver per file
└── <api_name>/
    └── client.py         # Raw API client (HTTP calls, auth)
```

Here's what each component does:
- `main.py`: the entry point. Creates your plugin and starts the gRPC server. You rarely need to modify this.
- `plugin/plugin.py`: the Plugin class is the central coordinator. It implements `init` (parsing configuration), `get_tables` (listing available tables), and `sync` (running resolvers through the scheduler). This is where your integration's setup logic lives.
- `plugin/client/`: the Spec dataclass defines what settings your integration accepts (API keys, endpoints, concurrency). The Client wrapper stores the authenticated API client and any shared state that resolvers need.
- `plugin/tables/`: one file per table. Each file contains a Table class (defining the table name and columns) and a Resolver class (the generator that fetches data from the API). The resolver is the heart of each table.
- `<api_name>/client.py`: your raw API client code. This is where you make HTTP calls, handle auth headers, parse responses, and manage pagination. Keeping this separate from CloudQuery-specific code means you can test it independently.
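As a sketch of what `<api_name>/client.py` might look like, here is a cursor-paginated client with the HTTP transport injected so the pagination logic can be tested offline. The endpoint path, the `next_cursor` field, and bearer-token auth are assumptions about a hypothetical API, not CloudQuery requirements:

```python
from typing import Any, Callable, Dict, Generator, Optional


class MyApiClient:
    """Hypothetical raw API client: bearer auth + cursor pagination.

    fetch_page(url, headers, cursor) is injected so the pagination loop
    can be exercised without real HTTP; a real client would wrap
    requests or urllib here.
    """

    def __init__(
        self,
        access_token: str,
        base_url: str,
        fetch_page: Callable[[str, Dict[str, str], Optional[str]], Dict[str, Any]],
    ) -> None:
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {access_token}"}
        self._fetch_page = fetch_page

    def list_items(self) -> Generator[Dict[str, Any], None, None]:
        cursor: Optional[str] = None
        while True:
            page = self._fetch_page(f"{self._base_url}/items", self._headers, cursor)
            yield from page["items"]          # stream items as they arrive
            cursor = page.get("next_cursor")  # assumed pagination field
            if not cursor:
                break


# Exercise the pagination loop with two fake pages.
def fake_fetch(url, headers, cursor):
    if cursor is None:
        return {"items": [{"id": 1}], "next_cursor": "p2"}
    return {"items": [{"id": 2}], "next_cursor": None}


items = list(MyApiClient("token", "https://api.example.com/", fake_fetch).list_items())
```

Because `list_items` is itself a generator, a resolver can loop over it and yield rows one at a time without ever holding the full result set in memory.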
Entry Point
The main.py creates and serves the plugin. This is boilerplate that you rarely need to change:
```python
import sys

from cloudquery.sdk import serve

from plugin import ExamplePlugin


def main():
    p = ExamplePlugin()
    serve.PluginCommand(p).run(sys.argv[1:])


if __name__ == "__main__":
    main()
```

Plugin Class
The Plugin class is the central piece of your integration. It extends plugin.Plugin and implements four key methods that the SDK calls at different stages of a sync:
- `set_logger`: called first, gives you a logger for debugging
- `init`: called with the user's spec JSON. This is where you parse configuration, validate it, create your API client, and set up the scheduler
- `get_tables`: returns the list of tables your integration supports (filtered by the user's configuration)
- `sync`: runs the actual sync by feeding tables and resolvers to the scheduler
```python
from cloudquery.sdk import plugin

PLUGIN_NAME = "my-integration"
PLUGIN_VERSION = "0.1.0"
TEAM_NAME = "my-team"
PLUGIN_KIND = "source"


class MyPlugin(plugin.Plugin):
    def __init__(self) -> None:
        super().__init__(
            PLUGIN_NAME,
            PLUGIN_VERSION,
            plugin.plugin.Options(team=TEAM_NAME, kind=PLUGIN_KIND),
        )

    def set_logger(self, logger):
        self._logger = logger

    def init(self, spec, no_connection=False):
        # Parse JSON spec, create API client and scheduler
        ...

    def get_tables(self, options):
        # Return filtered list of tables
        ...

    def sync(self, options):
        # Build resolver list, run scheduler, yield SyncMessages
        ...
```

Define a Table
Unlike Go (where tables are functions returning a struct), in Python a table is a class that extends Table. You define the table name, columns, and a resolver property. Columns use PyArrow types for the underlying type system. This ensures consistent types across all CloudQuery SDKs regardless of language.
```python
import pyarrow as pa

from cloudquery.sdk.schema import Column, Table


class Items(Table):
    def __init__(self) -> None:
        super().__init__(
            name="example_items",
            title="Example Items",
            columns=[
                Column("id", pa.uint64(), primary_key=True),
                Column("name", pa.string()),
                Column("created_at", pa.timestamp(unit="s")),
                Column("active", pa.bool_()),
            ],
        )

    @property
    def resolver(self):
        return ItemResolver(table=self)
```

Key details:
- Column types are PyArrow types: `pa.string()`, `pa.uint64()`, `pa.timestamp()`, `pa.bool_()`, `pa.date64()`, `pa.float64()`
- For JSON columns, use `JSONType()` from `cloudquery.sdk.types`
- Set `primary_key=True` on primary key columns
- For parent-child relations, pass `relations=[ChildTable()]`
- For incremental tables, set `is_incremental=True`
Write a Resolver
The resolver is where the actual API fetching happens. In Python, resolvers work differently from Go. Instead of sending items to a channel, you write a generator that yields results one at a time. This is the Python-native way to stream data lazily without loading everything into memory at once.
A resolver is a class extending TableResolver that implements the resolve method:
```python
from typing import Any, Generator

from cloudquery.sdk.scheduler import TableResolver
from cloudquery.sdk.schema.resource import Resource


class ItemResolver(TableResolver):
    def __init__(self, table) -> None:
        super().__init__(table=table)

    def resolve(self, client, parent_resource: Resource) -> Generator[Any, None, None]:
        for item in client.api_client.list_items():
            yield item  # yield a dict matching the table columns
```

The resolve method receives two arguments:
- `client`: your Client wrapper, which gives access to the API client and any shared state you set up in the plugin's `init` method.
- `parent_resource`: for top-level tables this is `None`. For child tables (e.g. fetching issues for a specific project), this contains the parent row, so you can access parent fields via `parent_resource.item["field_name"]`.
Each yield produces one row in the destination table. The yielded value should be a dict whose keys match the column names defined in your Table class. The SDK handles mapping the dict values to the correct columns and types.
Important: `resolve` must be a generator (using `yield`), not a regular function that returns a list. The SDK relies on the generator pattern to stream results incrementally. If you collect all items into a list and return them, the SDK won't receive them correctly.
For parent-child table relationships, expose child resolvers via a `child_resolvers` property on your resolver class: `[table.resolver for table in self._table.relations]`.
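To make the parent-child pattern concrete without the SDK, here is a simplified sketch: the parent resolver yields project rows, and the child resolver reads its parent's row to fetch that project's issues. All names and data here are illustrative; in the real SDK the parent row arrives via `parent_resource.item`:

```python
# Simplified parent/child resolver sketch; the real SDK passes the
# parent row via parent_resource.item, mirrored here with a plain dict.

PROJECTS = [{"id": "p1"}, {"id": "p2"}]
ISSUES = {"p1": [{"id": "i1"}], "p2": [{"id": "i2"}, {"id": "i3"}]}


def project_resolver():
    # Top-level resolver: no parent, just yield each project row.
    for project in PROJECTS:
        yield project


def issue_resolver(parent_row):
    # Child resolver: use the parent row to scope the fetch.
    for issue in ISSUES[parent_row["id"]]:
        yield {**issue, "project_id": parent_row["id"]}


# The scheduler runs each child resolver once per parent row.
issues = [
    issue
    for project in project_resolver()
    for issue in issue_resolver(project)
]
```

The key point is the fan-out: the child resolver runs once per parent row, and each run sees exactly one parent's fields.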
Configuration & Authentication
The SDK passes the user's `spec` block as a JSON string to your plugin's `init` method. Define a `@dataclass` for the spec and parse it:
```python
import json
from dataclasses import dataclass, field


@dataclass
class Spec:
    access_token: str = field(default="")
    base_url: str = field(default="https://api.example.com")
    concurrency: int = field(default=100)

    def validate(self):
        if not self.access_token:
            raise ValueError("access_token is required")
```

Then in your plugin's `init` method, deserialize and validate:
```python
def init(self, spec, no_connection=False):
    parsed = json.loads(spec)
    self._spec = Spec(**parsed)
    self._spec.validate()

    # Create authenticated API client
    self._api_client = MyApiClient(
        access_token=self._spec.access_token,
        base_url=self._spec.base_url,
    )

    # Create scheduler
    self._scheduler = Scheduler(concurrency=self._spec.concurrency)
```

Users configure authentication in their YAML file. The CLI automatically resolves environment variable references:
```yaml
spec:
  access_token: "${MY_API_TOKEN}"
  base_url: "https://api.example.com"
```

Common Pitfalls
Avoid these common mistakes when building Python integrations:
- `resolve` must be a generator. Use `yield` to produce results. Don't return a list. The SDK expects a generator for streaming.
- Yield results as you go. Don't collect all pages into a list and then yield them at the end. Yield each item or page immediately to keep memory usage low.
- Use `@dataclass` for the Spec, not Pydantic. The SDK examples and template use standard dataclasses.
- Let errors propagate, don't swallow them. If an API call fails, let the exception propagate so the SDK can log it and surface it to the user.
- Use the bundled PyArrow. Don't install a separate version of `pyarrow`. Use the one that comes with `cloudquery-plugin-sdk` for compatibility.
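The first two pitfalls can be made concrete: with a generator, the consumer sees rows as soon as the first page arrives, and later pages are only fetched on demand. A small self-contained sketch (the page counter is just instrumentation to show the laziness):

```python
pages_fetched = 0


def fetch_page(n):
    # Stand-in for one paginated API call.
    global pages_fetched
    pages_fetched += 1
    return [{"id": n * 10 + i} for i in range(2)]


def resolve_streaming():
    # Good: yield each page's items immediately instead of
    # accumulating all pages into a list first.
    for n in range(3):
        yield from fetch_page(n)


gen = resolve_streaming()
first = next(gen)                 # only page 0 has been fetched so far
fetched_after_first = pages_fetched
rest = list(gen)                  # remaining pages fetched lazily
```

A list-returning version would call `fetch_page` three times before the consumer saw a single row, holding every page in memory at once.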
Test Locally
Start the integration as a gRPC server:
```bash
uv run main serve
# or: python main.py serve
```

Then sync using `registry: grpc`. You can also build and run as a Docker container. See the Typeform Dockerfile. See Testing Locally for configuration examples and Running Locally for full details.
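For reference, a local sync config pointing at the gRPC server might look like the sketch below. The port, table list, and destination name are placeholders; check Testing Locally for the exact shape your setup needs:

```yaml
kind: source
spec:
  name: my-integration
  registry: grpc
  path: localhost:7777       # default serve address; adjust if you changed it
  tables: ["*"]
  destinations: ["sqlite"]   # any configured destination works here
  spec:
    access_token: "${MY_API_TOKEN}"
```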
Publishing
Visit Publishing an Integration to the Hub for release instructions. Python integrations can also be distributed as Docker images.
Real-World Examples
- Python Integration Template: starter template for new integrations
- xkcd Example: a starter integration built from the template
- Typeform: single-endpoint REST API integration
- Square: OpenAPI-generated tables using `cloudquery.sdk.transformers.openapi`
Next Steps
Once your integration is working locally:
- Publish to the Hub: make your integration available to others
- Add tests: see the test patterns in Typeform for reference
- Add incremental tables: set `is_incremental=True` on tables and use the state client for cursor management
- Try OpenAPI generation: if your API has an OpenAPI spec, see the Square integration for auto-generating tables
Resources
- CloudQuery Community
- How to Write a Source Integration in Python (Video. May reference older SDK patterns; use this guide for current code.)
- Python SDK Source Code
- CloudQuery SDK on PyPI