Build a custom collector

Build a custom ODD collector or adapter against the odd-collector-sdk.

Audience: Python developers extending ODD when an existing adapter doesn't fit. Most source systems are already covered by the bundled collectors — start at the Integrations hub before you reach for the SDK.

The odd-collector-sdk Python library is what powers every pull collector in the odd-collectors monorepo (odd-collector generic, odd-collector-aws, odd-collector-azure, odd-collector-gcp) and the standalone odd-collector-profiler. It handles the parts that every collector shares — config loading, adapter discovery, scheduling, the Platform Ingress API client, signal-based shutdown — so authoring a new adapter is mostly about writing the source-specific extract-and-map step.

This guide walks through building a custom collector or a custom adapter against the SDK. It covers the plugin / config pattern, the adapter contract (sync, async, and async-generator variants), wiring up the entry point, packaging, and the seams where ODDRN generation and runtime configuration plug in.

When to author a custom adapter or collector

Before writing code, confirm that the existing collectors don't already cover your case:

  • A new database / BI / ML / streaming source not yet in odd-collector → add a new adapter to the generic collector and contribute it back upstream.

  • An AWS / Azure / GCP managed service not yet in the cloud collectors → add an adapter to odd-collector-aws, odd-collector-azure, or odd-collector-gcp.

  • A push-strategy integration (the source already runs your code — Airflow, dbt, Spark, Great Expectations, custom CI/CD) → the source-embedded pattern is implemented separately in repos like odd-airflow-2, odd-dbt, odd-spark-adapter, odd-great-expectations. The SDK described here targets pull collectors; push integrations follow the host system's plugin or listener API rather than this SDK.

  • A standalone collector container with no overlap with the bundled set (proprietary SaaS, internal data system, an isolated research source) → build a brand-new collector against the SDK using the layout below.

  • Just want a one-off ad-hoc push → consider odd-cli before authoring code.

Adding a single pull adapter (the first two cases above) is by far the most common path: every adapter inside odd-collector started life as a Plugin subclass plus an Adapter class plus a Generator subclass.

SDK packages and versions

  • Python: the SDK declares python = "^3.9" in odd-collector-sdk/pyproject.toml, so any interpreter from 3.9 up to (but excluding) 4.0 satisfies the constraint.

  • Pydantic: the SDK pins pydantic = "^2.7.1". The Plugin base class uses Pydantic v2's BaseSettings from the separate pydantic-settings package — be careful not to import BaseSettings from pydantic itself (Pydantic v2 moved it).

  • Scheduler: APScheduler v3 (apscheduler = "^3.8.1").

  • Async transport: aiohttp = "^3.8.1".

  • Models: odd-models = "^2.0.47" (the Python Pydantic model package generated from the ODD Specification). DataEntity, DataEntityList, and DataSource come from here.

  • ODDRN generator: oddrn-generator = "^0.1.101". Per-source Generator subclasses live in this package (PostgresqlGenerator, SnowflakeGenerator, KafkaGenerator, FeastGenerator, …).

Install the SDK into your project with Poetry, for example with poetry add odd-collector-sdk.

Anatomy of a collector

A pull collector is a long-running process that does five things on a schedule:

  1. Load configuration from collector_config.yaml (and the environment), validated against a Pydantic model.

  2. Discover adapters by importing the package referenced by each plugin's type literal and instantiating the Adapter class found in that package.

  3. Register data sources with the Platform via POST /ingestion/datasources once at startup.

  4. Run each adapter on the schedule via APScheduler, or once if no schedule is configured.

  5. Send the resulting DataEntityList to the Platform via POST /ingestion/entities, chunked to keep individual requests bounded.

The SDK provides:

| Component | Purpose |
| --- | --- |
| Collector | Lifecycle entry point: config loading, adapter discovery, register, schedule, shutdown |
| CollectorConfig | Pydantic model for the top-level YAML schema (token, platform URL, plugins list, runtime knobs) |
| Plugin | Base class for adapter configuration objects (Pydantic BaseSettings, extra="allow") |
| PluginFactory | Dict[str, Type[Plugin]] mapping type literal → Plugin subclass; the discriminator |
| AbstractAdapter, BaseAdapter, AsyncAbstractAdapter | Three contracts an adapter can implement; SDK dispatches automatically |
| Filter | Reusable include / exclude regex filter for ingestion scoping |
| PlatformApi | Async client for /ingestion/datasources and /ingestion/entities |
| BaseSecretsBackend | Optional pluggable secrets-backend hook for sourcing config from external stores |

Project layout

A custom collector named my_collector follows this canonical layout — the same shape every monorepo collector uses:

Two non-negotiable rules:

  • The directory name under adapters/ must equal the plugin's type literal. The SDK's load_adapters imports {root_package}.{plugins_package}.{plugin.type} and looks for package.adapter.Adapter inside it. A mismatch between the YAML's type: field and the directory name will surface as a missing-module ImportError at startup.

  • Each adapter package must expose a class named exactly Adapter in adapter.py. The class can extend any of the three adapter contracts below — the SDK detects which one by inspection.
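
The two rules can be tried out with a small stand-alone sketch. This is not the SDK's load_adapters; it only mimics the import path described above with importlib, against a throwaway package named demo_collector (a hypothetical name) written to a temporary directory. The helper names adapter_module_path and load_adapter_class are also made up for illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def adapter_module_path(root_package: str, plugin_type: str,
                        plugins_package: str = "adapters") -> str:
    """Dotted path of the module that must expose the Adapter class."""
    return f"{root_package}.{plugins_package}.{plugin_type}.adapter"


def load_adapter_class(root_package: str, plugin_type: str,
                       plugins_package: str = "adapters") -> type:
    """Import the adapter module and return its class named exactly 'Adapter'."""
    module = importlib.import_module(
        adapter_module_path(root_package, plugin_type, plugins_package)
    )
    return getattr(module, "Adapter")


# Build a throwaway package: demo_collector/adapters/hello_world/adapter.py
tmp = Path(tempfile.mkdtemp())
pkg = tmp / "demo_collector" / "adapters" / "hello_world"
pkg.mkdir(parents=True)
for folder in (tmp / "demo_collector", tmp / "demo_collector" / "adapters", pkg):
    (folder / "__init__.py").write_text("")
(pkg / "adapter.py").write_text("class Adapter:\n    pass\n")

sys.path.insert(0, str(tmp))
importlib.invalidate_caches()

# The type literal matches the directory name, so the import succeeds.
adapter_cls = load_adapter_class("demo_collector", "hello_world")

# A type literal that does not match the directory name fails at import time.
try:
    load_adapter_class("demo_collector", "helloworld")
    mismatch_error = None
except ImportError as exc:
    mismatch_error = exc
```

In this sketch the mismatch surfaces as ModuleNotFoundError, which is a subclass of ImportError.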

Define the plugin (config schema)

A plugin is a Pydantic model that mirrors the YAML shape an operator writes into collector_config.yaml for one configured instance of an adapter. Every plugin extends Plugin from the SDK:

The base Plugin provides three fields every plugin inherits — name (required, operator-chosen, unique per collector), description (optional metadata), and namespace (optional metadata). The base also sets extra="allow" on the underlying pydantic_settings.BaseSettings, so adapter-specific fields are accepted without further declaration. Use pydantic.SecretStr for secrets so they are masked in repr output.

PLUGIN_FACTORY is the discriminator the SDK uses to map an entry's type: value to the right Pydantic class. The directory name under adapters/ must equal the type literal, and the type literal must equal a key in PLUGIN_FACTORY. Three names, one string.
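
The discriminator lookup can be illustrated without the SDK installed. In the sketch below, Plugin is a plain dataclass stand-in for the SDK's Pydantic-based Plugin (so it does not validate types or accept undeclared extra fields), and HelloWorldPlugin, its greeting field, and build_plugin are hypothetical names chosen for the example.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type


@dataclass
class Plugin:
    """Stand-in for the SDK's Plugin base; the real one is a Pydantic settings model."""
    type: str
    name: str
    description: Optional[str] = None
    namespace: Optional[str] = None


@dataclass
class HelloWorldPlugin(Plugin):
    greeting: str = "hello"  # hypothetical adapter-specific field


# The key here, the YAML type: value, and the adapters/ directory name
# must all be the same string.
PLUGIN_FACTORY: Dict[str, Type[Plugin]] = {"hello_world": HelloWorldPlugin}


def build_plugin(entry: Dict[str, Any]) -> Plugin:
    """Map one plugins[*] entry (already parsed from YAML) to its plugin class."""
    plugin_type = entry.get("type")
    try:
        plugin_cls = PLUGIN_FACTORY[plugin_type]
    except KeyError:
        known = sorted(PLUGIN_FACTORY)
        raise ValueError(f"Unknown plugin type {plugin_type!r}; known: {known}") from None
    return plugin_cls(**entry)


plugin = build_plugin({"type": "hello_world", "name": "demo"})
```

An entry whose type is not a key of PLUGIN_FACTORY is rejected before any adapter code is imported, which is why the three names have to agree.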

For a small inheritance example modelled on the bundled collectors, see AwsPlugin and its subclasses — every AWS adapter inherits a common-auth base; specific adapters declare only the source-specific fields. The same pattern — a private base class plus per-source subclasses — works in any collector.

Implement the adapter

An adapter does two things: returns the data source's ODDRN and produces a DataEntityList. The SDK supports three implementation contracts and dispatches automatically based on the shape of get_data_entity_list:

| Base class | When to use | get_data_entity_list signature |
| --- | --- | --- |
| AbstractAdapter | Synchronous source (typical SQL adapter using psycopg2 or a similar blocking driver) | def get_data_entity_list(self) -> DataEntityList |
| AsyncAbstractAdapter | Async source (aiohttp, aiomysql, asyncpg, …) | async def get_data_entity_list(self) -> DataEntityList |
| BaseAdapter (concrete subclass of AbstractAdapter) | Synchronous source where you already have an oddrn_generator.Generator subclass | Inherits get_data_source_oddrn from the generator; you implement create_generator() and get_data_entity_list() |

The SDK's create_job inspects get_data_entity_list and instantiates one of:

  • SyncJob — adapter returns a DataEntityList directly; SDK iterates over .items in chunks of chunk_size (default 250).

  • AsyncJob — adapter awaits a DataEntityList; SDK chunks per-call.

  • AsyncGeneratorJob — adapter is an async def ... yield generator yielding multiple DataEntityList objects (or a flat iterable thereof); useful for very large catalogs where building the full list in memory is not affordable.

You don't pick the job type explicitly — define get_data_entity_list in whichever shape fits your source's I/O model and the SDK does the rest.
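
Dispatch by inspection can be sketched with the standard inspect module. The function below is an illustration of the idea only, not the SDK's create_job; pick_job_kind and the three toy adapter classes are hypothetical, and the returned strings simply reuse the job names listed above.

```python
import inspect


def pick_job_kind(adapter) -> str:
    """Choose a job type from the shape of adapter.get_data_entity_list."""
    method = adapter.get_data_entity_list
    if inspect.isasyncgenfunction(method):
        return "AsyncGeneratorJob"  # async def with yield
    if inspect.iscoroutinefunction(method):
        return "AsyncJob"  # async def returning a value
    return "SyncJob"  # plain def


class SyncAdapter:
    def get_data_entity_list(self):
        return []


class AsyncAdapter:
    async def get_data_entity_list(self):
        return []


class StreamingAdapter:
    async def get_data_entity_list(self):
        yield []


kinds = [pick_job_kind(cls()) for cls in (SyncAdapter, AsyncAdapter, StreamingAdapter)]
```

Both inspect helpers look through bound methods, so passing an adapter instance works the same as passing the class.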

AbstractAdapter — minimum contract

The SDK injects the matching Plugin instance as the constructor argument — so config is already validated against your Pydantic model. Always store it as self.config; if you don't, the SDK's load_adapters will fall back to setting adapter.config = plugin after construction so the scheduler can read adapter.config.name for log labels.
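
The fallback can be pictured with a short sketch. It assumes the fallback applies only when the adapter did not set the attribute itself; instantiate, StoresConfig, and ForgetsConfig are hypothetical names, and a SimpleNamespace stands in for a validated Plugin instance.

```python
from types import SimpleNamespace


class StoresConfig:
    def __init__(self, config):
        self.config = config  # recommended: keep the plugin on self.config


class ForgetsConfig:
    def __init__(self, config):
        self.greeting = "hello"  # the plugin is not stored anywhere


def instantiate(adapter_cls, plugin):
    """Construct an adapter and make sure adapter.config is readable afterwards."""
    adapter = adapter_cls(plugin)
    if not hasattr(adapter, "config"):  # assumption: only fill in when missing
        adapter.config = plugin
    return adapter


plugin = SimpleNamespace(type="hello_world", name="demo")
labels = [instantiate(cls, plugin).config.name for cls in (StoresConfig, ForgetsConfig)]
```

Either way the scheduler can read adapter.config.name, but storing the plugin yourself keeps it available to the rest of your adapter code.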

For a real working example of this shape, the Feast adapter is concise and self-contained — it constructs a FeatureStore, holds a FeastGenerator, and returns a sync DataEntityList derived from self.__feature_store.list_feature_views().

AsyncAbstractAdapter — async I/O
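
A minimal sketch of the async shape, using a stand-in base class so it runs without the SDK installed. AsyncAdapterContract is hypothetical (it assumes get_data_source_oddrn stays synchronous while get_data_entity_list is awaited), plain strings stand in for DataEntity objects, and asyncio.sleep(0) marks where a non-blocking driver call would go.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List


class AsyncAdapterContract(ABC):
    """Stand-in for the async contract; not the SDK's AsyncAbstractAdapter."""

    @abstractmethod
    def get_data_source_oddrn(self) -> str: ...

    @abstractmethod
    async def get_data_entity_list(self) -> List[str]: ...


class Adapter(AsyncAdapterContract):
    def __init__(self, config: Dict) -> None:
        self.config = config

    def get_data_source_oddrn(self) -> str:
        return f"//hello/host/{self.config['host']}"

    async def _describe_table(self, table: str) -> str:
        await asyncio.sleep(0)  # placeholder for an awaitable driver or HTTP call
        return f"{self.get_data_source_oddrn()}/tables/{table}"

    async def get_data_entity_list(self) -> List[str]:
        # Fetch all tables concurrently; gather keeps the input order.
        tasks = (self._describe_table(t) for t in self.config["tables"])
        return list(await asyncio.gather(*tasks))


adapter = Adapter({"host": "hello_world", "tables": ["orders", "users"]})
entities = asyncio.run(adapter.get_data_entity_list())
```

The point of this variant is that slow per-object calls overlap instead of running one after another, which a blocking driver cannot do inside a single adapter run.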

BaseAdapter — generator-driven shape

BaseAdapter is a concrete AbstractAdapter subclass that bundles the common pattern of "I have an oddrn_generator.Generator and I want it to drive get_data_source_oddrn":

BaseAdapter saves the boilerplate of declaring __init__ and get_data_source_oddrn yourself; pick it whenever the generator-on-self pattern fits.

Async generator — for very large catalogs

If your source's catalog is large enough that materialising it as a single DataEntityList is a memory concern, declare get_data_entity_list as an async generator and yield batches:

The SDK's AsyncGeneratorJob further sub-chunks each yielded list to chunk_size items per request, so you can yield large pages and still respect the request-size limit.
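
The interaction between yielded pages and chunk_size can be sketched as follows. This is an illustration of the behaviour described above, not the AsyncGeneratorJob source: chunked and ingest are hypothetical helpers, strings stand in for entities, and the page size of 600 is arbitrary.

```python
import asyncio
from typing import AsyncIterator, Iterator, List


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Split a list into consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def get_data_entity_list() -> AsyncIterator[List[str]]:
    """Stand-in adapter method: yields the catalog one page at a time."""
    for page in range(3):
        yield [f"entity-{page}-{i}" for i in range(600)]


async def ingest(chunk_size: int = 250) -> List[int]:
    """Consume the pages and record the size of each request that would be sent."""
    request_sizes = []
    async for page in get_data_entity_list():
        for chunk in chunked(page, chunk_size):
            request_sizes.append(len(chunk))  # a real job would POST the chunk here
    return request_sizes


request_sizes = asyncio.run(ingest())
```

Only one page of 600 items is held in memory at a time, and no request carries more than chunk_size items.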

Wire up the entry point

__main__.py instantiates the SDK's Collector and hands control to its run() method, which sets up signal handlers (SIGHUP, SIGTERM, SIGINT), registers data sources, and either schedules the polling loop or runs once and exits depending on whether default_pulling_interval is set:

Then run it with poetry run python -m my_collector.

The Collector constructor signature is Collector(config_path, root_package, plugin_factory, plugins_package="adapters"). Override plugins_package only if your project nests adapters under a non-default subpackage. run(loop=None) accepts an existing event loop — supply one when integrating into a host process; omit it for standalone collector containers.

End-to-end skeleton

A minimal-but-complete custom collector that does nothing useful but starts cleanly:
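
One way to get such a skeleton onto disk is a small scaffold script. Treat the file contents as a sketch under stated assumptions: the SDK import paths follow the module list in "Where to look in the SDK", the Collector arguments follow the constructor signature given above, while the odd_models.models import path, the DataEntityList keyword arguments, the domain/plugin.py location, and the localhost platform URL are assumptions to verify against your installed versions.

```python
import tempfile
from pathlib import Path
from textwrap import dedent

PLUGIN_PY = dedent('''\
    from typing import Literal

    from odd_collector_sdk.domain.plugin import Plugin


    class HelloWorldPlugin(Plugin):
        type: Literal["hello_world"]


    # Key, type literal, and adapters/ directory name are the same string.
    PLUGIN_FACTORY = {"hello_world": HelloWorldPlugin}
''')

ADAPTER_PY = dedent('''\
    from odd_collector_sdk.domain.adapter import AbstractAdapter
    from odd_models.models import DataEntityList  # import path is an assumption


    class Adapter(AbstractAdapter):
        def __init__(self, config) -> None:
            self.config = config

        def get_data_source_oddrn(self) -> str:
            return "//hello/host/hello_world"

        def get_data_entity_list(self) -> DataEntityList:
            # Replace the empty list with real source extraction.
            return DataEntityList(
                data_source_oddrn=self.get_data_source_oddrn(), items=[]
            )
''')

MAIN_PY = dedent('''\
    from odd_collector_sdk.collector import Collector

    from my_collector.domain.plugin import PLUGIN_FACTORY

    Collector(
        config_path="collector_config.yaml",
        root_package="my_collector",
        plugin_factory=PLUGIN_FACTORY,
    ).run()
''')

CONFIG_YAML = dedent('''\
    platform_host_url: http://localhost:8080
    token: <COLLECTOR_TOKEN>
    default_pulling_interval: 10
    plugins:
      - type: hello_world
        name: hello_world_demo
''')

FILES = {
    "collector_config.yaml": CONFIG_YAML,
    "my_collector/__init__.py": "",
    "my_collector/__main__.py": MAIN_PY,
    "my_collector/domain/__init__.py": "",
    "my_collector/domain/plugin.py": PLUGIN_PY,
    "my_collector/adapters/__init__.py": "",
    "my_collector/adapters/hello_world/__init__.py": "",
    "my_collector/adapters/hello_world/adapter.py": ADAPTER_PY,
}


def scaffold(root: Path) -> None:
    """Write the skeleton files under root, creating directories as needed."""
    for relative, content in FILES.items():
        target = root / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)


project_root = Path(tempfile.mkdtemp())
scaffold(project_root)
```

The script itself needs only the standard library; the generated package needs odd-collector-sdk installed before it can start.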

Run it with poetry run python -m my_collector.

The collector starts, registers a //hello/host/hello_world data source against the platform, sends an empty DataEntityList every 10 minutes, and exits cleanly on SIGINT. Replace the get_data_entity_list body with real source extraction and you have a working adapter.

Generate ODDRNs

Every entity the adapter emits — the data source itself, datasets, columns, jobs, transformers — needs an ODDRN. ODDRNs are how the platform recognises the same entity across ingests, across collectors, and over time; getting them right is what makes cross-system lineage possible.

Use the oddrn-generator Python package — it ships per-source Generator subclasses (PostgresqlGenerator, SnowflakeGenerator, KafkaGenerator, FeastGenerator, MysqlGenerator, MssqlGenerator, …). Pick the one that matches your source category, or subclass Generator for a brand-new source family. Set the host / database / namespace once at adapter construction and let the generator stitch path components onto each entity ODDRN.
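
The "set the base once, stitch paths per entity" idea can be shown with a toy class. PathStitcher is a hypothetical stand-in and not the oddrn-generator API; the hello prefix comes from the skeleton's data source ODDRN, and the databases and tables segment names are illustrative.

```python
from typing import Tuple


class PathStitcher:
    """Toy illustration of prefix-plus-path ODDRN building; not oddrn-generator."""

    def __init__(self, prefix: str, host: str) -> None:
        # Fixed once per adapter instance.
        self._base = f"//{prefix}/host/{host}"

    def data_source_oddrn(self) -> str:
        return self._base

    def entity_oddrn(self, *path: Tuple[str, str]) -> str:
        """Append (segment kind, name) pairs to the data source ODDRN."""
        parts = [f"{kind}/{name}" for kind, name in path]
        return "/".join([self._base, *parts])


stitcher = PathStitcher(prefix="hello", host="hello_world")
source_oddrn = stitcher.data_source_oddrn()
table_oddrn = stitcher.entity_oddrn(("databases", "shop"), ("tables", "orders"))
```

Because the result depends only on the host and path components, two collectors configured with the same host and identifiers produce the same string for the same entity.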

The Java equivalent (oddrn-generator-java) exists for JVM-side push adapters. See ODDRN for the format, examples, and consumer requirements (the same ODDRN must identify the same entity across producers — coordinate hostnames and identifiers across your deployment).

Runtime configuration

The full reference for the top-level collector_config.yaml shape — platform_host_url, token, default_pulling_interval, plugins, plus connection_timeout_seconds, chunk_size, misfire_grace_time, max_instances, verify_ssl — lives once at Build and run ODD Collectors → Full configuration reference. That page is the operator-side companion to this developer guide; treat it as the runtime-config source of truth.

Custom collectors share that exact schema — your adapter only adds per-plugin fields under each plugins[*] entry. Setting default_pulling_interval to a positive integer (in minutes) makes the collector poll on that cadence; leaving it unset makes the collector run all adapters once and exit (useful for one-shot ingestion in CI / cron).

For sourcing config values from an external secret store instead of inline YAML, see Collector secrets backend. The SDK ships an AWSSystemsManagerParameterStore provider and a BaseSecretsBackend abstract class for additional providers.

Package and deploy

The bundled monorepo collectors package as Docker images. The same Dockerfile shape works for a custom collector:

The SDK's CollectorConfigLoader (instantiated and called from Collector.__init__ at collector.py:61 as CollectorConfigLoader(config_path, plugin_factory).load()) reads the path passed to Collector(config_path=...), falling back to $CONFIG_PATH, then to ./collector_config.yaml. Mount the operator-supplied YAML at the path your __main__.py resolves to. Token and other secrets are best supplied via environment variables resolved by pyaml-env (the SDK's YAML loader) rather than baked into the image.
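
The lookup order for the config file can be sketched as a small function. resolve_config_path is a hypothetical helper written for illustration, not part of CollectorConfigLoader; it only encodes the precedence described above (explicit argument, then $CONFIG_PATH, then ./collector_config.yaml).

```python
import os
from pathlib import Path
from typing import Mapping, Optional, Union


def resolve_config_path(
    config_path: Optional[Union[str, Path]] = None,
    env: Mapping[str, str] = os.environ,
) -> Path:
    """Return the config file location using the documented precedence."""
    if config_path:
        return Path(config_path)  # 1. explicit argument wins
    from_env = env.get("CONFIG_PATH")
    if from_env:
        return Path(from_env)  # 2. then the environment variable
    return Path("collector_config.yaml")  # 3. then the working directory


explicit = resolve_config_path("/etc/odd/config.yaml", env={"CONFIG_PATH": "/ignored.yaml"})
from_env = resolve_config_path(env={"CONFIG_PATH": "/config/collector_config.yaml"})
default = resolve_config_path(env={})
```

In a container this means the mounted YAML can be located either by passing the path from __main__.py or by setting CONFIG_PATH in the pod or compose spec.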

For Kubernetes deployments, follow the same pattern as the bundled collectors' Helm charts at opendatadiscovery/charts — a single Deployment running one container per collector, with collector_config.yaml as a ConfigMap or Secret volume.

Testing locally

Run a one-shot pull against a local Platform to smoke-check the adapter loads and the data source registers:

  1. In the platform UI, create a Collector entity (Management → Collectors) and copy the issued token.

  2. Set token: <COLLECTOR_TOKEN> in collector_config.yaml and remove default_pulling_interval (so the collector runs once and exits — much faster feedback than a 10-minute schedule loop).

  3. Run poetry run python -m my_collector and watch the log for the DataSource registration POST and the per-adapter [name] collecting metadata started. / [name] metadata collected in … lines.

  4. Open the platform UI's Catalog page and confirm the data source appears.

When something doesn't work, the SDK logs are the first place to look — loguru writes structured per-adapter lines (the format is set in logger.py). Most failures fall into three buckets:

  • LoadConfigError at startup — a YAML field doesn't match the Pydantic model. The exception body names the offending field path.

  • ImportError looking for package.adapter.Adapter — the directory under adapters/ doesn't match the plugin's type literal, or adapter.py doesn't expose a class named exactly Adapter.

  • PlatformApiError on register_datasource / ingest_data — token, platform URL, or TLS verification (verify_ssl) is wrong; double-check platform_host_url and that the token belongs to a Collector entity that's registered in the platform.

Where to look in the SDK

When the doc above doesn't cover a specific question, read the source — it is small enough to navigate directly:

  • odd-collector-sdk/odd_collector_sdk/collector.py — the Collector class and its lifecycle (run, start_polling, register_data_sources, one_time_run).

  • odd-collector-sdk/odd_collector_sdk/domain/adapter.py — the three adapter contracts.

  • odd-collector-sdk/odd_collector_sdk/domain/plugin.py — the Plugin base and the Config alias.

  • odd-collector-sdk/odd_collector_sdk/domain/collector_config.py: CollectorConfig (the runtime Pydantic model). The module also defines a load_config helper, but it is test-only (called only from tests/test_module_importer.py), so do not use it for runtime config loading.

  • odd-collector-sdk/odd_collector_sdk/domain/collector_config_loader.py: CollectorConfigLoader (the runtime config loader). Collector.__init__ instantiates it as CollectorConfigLoader(config_path, plugin_factory).load(); it integrates with the optional secrets backend, merges priority-ordered settings and plugins, and returns a validated CollectorConfig.

  • odd-collector-sdk/odd_collector_sdk/load_adapter.py — adapter package discovery and instantiation.

  • odd-collector-sdk/odd_collector_sdk/job.py: SyncJob / AsyncJob / AsyncGeneratorJob and the create_job dispatch.

  • odd-collector-sdk/odd_collector_sdk/api/datasource_api.py — the Ingress API client.

  • odd-collector-sdk/odd_collector_sdk/secrets/ — the BaseSecretsBackend and the AWS SSM provider.

For working examples of each adapter shape, the bundled collectors in odd-collectors are the best teachers — every type literal you see on the generic collector page maps to a real adapter under odd-collector/odd_collector/adapters/{type}/adapter.py that you can read end-to-end.

Contribute back

If your custom adapter targets a source that other operators are likely to use, contribute it to the appropriate bundled collector instead of maintaining a fork. The contribution flow follows the standard ODD process — fork, branch, PR — see How to contribute. New adapters generally go into:

  • odd-collector — generic data sources (databases, BI, streams, MLOps).

  • odd-collector-aws / odd-collector-azure / odd-collector-gcp — cloud-native sources, when there's a clear cloud affinity.

  • A standalone push-adapter repo (odd-dbt, odd-airflow-2, odd-spark-adapter, …) — for source-runtime-embedded integrations.

Custom collectors that don't fit the bundled pattern can also live as separate community-maintained repos that depend on odd-collector-sdk directly; see GitHub organization overview for the existing repo set and the role of each.
