Source Adapters

Canonical unified data-loading surface for Alphaforge.

New code should register SourceAdapter implementations in DataContext.adapters and load data through:

  • DataContext.fetch(...)
  • DataContext.fetch_many(...)
  • DataContext.prefetch(...)

Legacy DataSource and fetch_panel(...) usage remains supported only as a compatibility boundary while older loaders migrate.

Protocol & Base

alphaforge.data.adapter

Unified SourceAdapter protocol and base mixin.

  • SourceAdapter: protocol that all data-source adapters must satisfy.
  • SourceAdapterBase: mixin providing default fetch_many and prefetch.

SourceAdapter

Bases: Protocol

Protocol for unified data-source adapters.

Each adapter represents one source (e.g. "cftc", "fred", "tiingo") that can serve one or more datasets.

Attributes

source_name : str
    Identifier for this source (e.g. "cftc", "bloomberg").
datasets : frozenset[str]
    Logical datasets this adapter can serve (e.g. {"cot.tff", "cot.disagg"}).

fetch(query: Query, *, max_staleness: Optional[timedelta] = None) -> FetchResult

Cache-aware single query fetch.

Checks local cache first; on miss, fetches from the underlying source, transforms to canonical format, persists to cache, and returns the result.

fetch_many(queries: list[Query], *, max_staleness: Optional[timedelta] = None) -> list[FetchResult]

Batch fetch for multiple queries.

Bulk adapters can optimize this (single cache lookup for all keys). Per-series adapters parallelize individual fetch calls.
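The per-series strategy mentioned above can be sketched with a thread pool. This is a minimal illustration, not the library's implementation: fetch_many_parallel, demo_fetch, and the max_workers default are hypothetical names chosen for the example, and plain strings stand in for Query and FetchResult.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_many_parallel(fetch, queries, *, max_workers=4):
    """Run one fetch call per query on a thread pool (hypothetical helper).

    Results come back in the same order as the input queries because
    Executor.map preserves input order.
    """
    if not queries:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, queries))


def demo_fetch(query):
    # Stand-in for an adapter's fetch(); a real adapter would hit cache or network.
    return f"result:{query}"


results = fetch_many_parallel(demo_fetch, ["GDP", "UNRATE", "CPIAUCSL"])
```

Threads suit this case because per-series fetches are typically I/O bound; a bulk adapter would instead resolve all keys with one lookup.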

list_entities(dataset: str) -> list[str]

Return available entity/series keys for a dataset.

prefetch(dataset: str, asof_range: tuple[date, date] | None = None) -> CacheManifest

Explicitly warm cache for a dataset.

Bulk adapters fetch the full dataset; per-series adapters may no-op. Returns a manifest describing what was cached.
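Because SourceAdapter is a Protocol, an adapter satisfies it structurally rather than by inheritance. The sketch below illustrates that idea with a reduced stand-in protocol; SourceAdapterSketch and InMemoryAdapter are hypothetical, plain strings and lists replace Query and FetchResult, and fetch_many and prefetch are omitted for brevity.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SourceAdapterSketch(Protocol):
    """Reduced stand-in for the real protocol (assumption: two attributes, two methods)."""

    source_name: str
    datasets: frozenset

    def fetch(self, query, *, max_staleness=None): ...

    def list_entities(self, dataset: str) -> list: ...


class InMemoryAdapter:
    """Does not inherit from the protocol; it only provides the same members."""

    source_name = "demo"
    datasets = frozenset({"demo.prices"})

    def __init__(self, rows):
        self._rows = rows  # mapping of series key -> list of values

    def fetch(self, query, *, max_staleness=None):
        return self._rows.get(query, [])

    def list_entities(self, dataset):
        return sorted(self._rows)


adapter = InMemoryAdapter({"AAA": [1.0, 2.0]})
conforms = isinstance(adapter, SourceAdapterSketch)  # checks member presence only
```

The runtime check only confirms that the members exist; signature compatibility is left to a static type checker.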

SourceAdapterBase

Mixin providing default implementations for SourceAdapter.

Subclasses must define:

  • source_name
  • datasets
  • fetch()
  • list_entities()

Default fetch_many iterates over queries calling fetch. Default prefetch returns an empty manifest.

fetch_many(queries: list[Query], *, max_staleness: Optional[timedelta] = None) -> list[FetchResult]

Default: iterate and call fetch for each query.

prefetch(dataset: str, asof_range: tuple[date, date] | None = None) -> CacheManifest

Default: no-op, returns empty manifest.
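The two defaults described above could look roughly like the following. This is a sketch under stated assumptions: AdapterBaseSketch, ManifestSketch and its fields, and UpperAdapter are hypothetical, and strings stand in for Query and FetchResult.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestSketch:
    """Hypothetical stand-in for CacheManifest; the real fields are not shown here."""

    dataset: str
    row_count: int = 0


class AdapterBaseSketch:
    """Mixin with the two documented defaults."""

    def fetch_many(self, queries, *, max_staleness=None):
        # Default: call fetch once per query, preserving order.
        return [self.fetch(q, max_staleness=max_staleness) for q in queries]

    def prefetch(self, dataset, asof_range=None):
        # Default: nothing is cached, so report an empty manifest.
        return ManifestSketch(dataset=dataset)


class UpperAdapter(AdapterBaseSketch):
    """Minimal subclass: defines only the four required members."""

    source_name = "upper"
    datasets = frozenset({"upper.words"})

    def fetch(self, query, *, max_staleness=None):
        return query.upper()

    def list_entities(self, dataset):
        return []


adapter = UpperAdapter()
batch = adapter.fetch_many(["a", "b"])
manifest = adapter.prefetch("upper.words")
```

A bulk adapter would override both methods; a per-series adapter can usually keep the defaults.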

Value Types

alphaforge.data.types

Unified data-layer value objects.

  • FetchResult: immutable result of a data fetch (PIT or non-PIT).
  • CacheManifest: metadata about a cache population event.
  • StaleDataWarning: warning emitted when cached data exceeds the staleness threshold.

CacheManifest dataclass

Metadata about a cache population event.

FetchResult dataclass

Immutable result of a data fetch.

Parameters

data : pd.DataFrame
    The fetched data. For PIT data the frame has columns [obs_date, asof_utc, value] (and possibly others). For non-PIT data it has [obs_date, value] (or OHLCV columns).
source : str
    Which source adapter provided the data (e.g. "fred", "cftc").
dataset : str
    Logical dataset name (e.g. "gdp", "cot.tff").
is_pit : bool
    Whether the data carries point-in-time revision history.
cached_at : datetime | None
    When this data was stored in cache. None means freshly fetched (not served from cache).

cache_age: Optional[timedelta] property

Time since this data was cached, or None if freshly fetched.
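A property with this behavior can be sketched on a frozen dataclass. FetchResultSketch is a hypothetical, reduced stand-in, and the use of timezone-aware UTC datetimes is an assumption of the example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class FetchResultSketch:
    """Reduced stand-in for FetchResult (the data frame and is_pit are omitted)."""

    source: str
    dataset: str
    cached_at: Optional[datetime] = None

    @property
    def cache_age(self) -> Optional[timedelta]:
        # None signals a fresh fetch; otherwise measure elapsed time in UTC.
        if self.cached_at is None:
            return None
        return datetime.now(timezone.utc) - self.cached_at


fresh = FetchResultSketch(source="fred", dataset="gdp")
cached = FetchResultSketch(
    source="fred",
    dataset="gdp",
    cached_at=datetime.now(timezone.utc) - timedelta(hours=2),
)
```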

as_timeseries(asof: Optional[date] = None) -> pd.Series

Extract a single obs_date-indexed value Series.

For PIT data: filters to the latest revision with asof_utc <= asof per obs_date, then returns the value column. If asof is None, returns the latest revision overall.

For non-PIT data: ignores asof and returns the value column indexed by obs_date.
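The PIT filtering rule can be illustrated with pandas. This is a sketch, not the library's code: pit_as_timeseries is a hypothetical free function, and comparing asof_utc against midnight UTC of the asof date is an assumption about how a date is matched to a timestamp.

```python
from datetime import date

import pandas as pd


def pit_as_timeseries(df, asof=None):
    """Return the latest known value per obs_date, optionally as of a date."""
    frame = df
    if asof is not None:
        # Assumption: the cutoff is midnight UTC at the start of the asof date.
        cutoff = pd.Timestamp(asof).tz_localize("UTC")
        frame = frame[frame["asof_utc"] <= cutoff]
    # Sort so the most recent revision is last within each obs_date, then keep it.
    latest = frame.sort_values(["obs_date", "asof_utc"]).groupby("obs_date").tail(1)
    return latest.set_index("obs_date")["value"].sort_index()


pit = pd.DataFrame(
    {
        "obs_date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-02-01"]),
        "asof_utc": pd.to_datetime(
            ["2024-01-31", "2024-02-29", "2024-02-29"], utc=True
        ),
        "value": [100.0, 101.5, 102.0],
    }
)

first_print = pit_as_timeseries(pit, asof=date(2024, 1, 31))  # only the first release
revised = pit_as_timeseries(pit)  # latest revision of every observation
```

Here the January observation reads 100.0 as of 2024-01-31 and 101.5 once the later revision is visible.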

StaleDataWarning

Bases: UserWarning

Emitted when cached data exceeds the caller's staleness threshold.

Cache Layer

alphaforge.data.cache_layer

Unified cache layer for PIT and non-PIT data.

Stores data in DuckDB tables with source isolation and staleness tracking. PIT data goes to cache_pit_observations, non-PIT to cache_market_observations, and freshness metadata to cache_manifest.

CacheLayer

Unified cache backed by a DuckDB connection.

Parameters

conn : duckdb.DuckDBPyConnection An open DuckDB connection (in-memory or file-backed).

get_manifest(*, dataset: str, source: str) -> Optional[CacheManifest]

Return the manifest for a dataset+source, or None.

lookup(*, series_key: str, dataset: str, source: str, is_pit: bool, max_staleness: Optional[timedelta] = None) -> Optional[tuple[pd.DataFrame, datetime]]

Look up cached data for a series.

Returns

(DataFrame, cached_at) if found, else None.

If max_staleness is timedelta(0), always returns None (forced miss). If the cached data is older than max_staleness, emits StaleDataWarning but still returns the data.
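The staleness rules above can be sketched without DuckDB by using a plain dict as the cache. lookup_sketch, StaleDataWarningSketch, and the explicit now parameter are hypothetical and exist only to make the example deterministic.

```python
import warnings
from datetime import datetime, timedelta, timezone


class StaleDataWarningSketch(UserWarning):
    """Stand-in for StaleDataWarning."""


def lookup_sketch(store, key, *, max_staleness=None, now=None):
    """Return (data, cached_at) or None, following the documented rules."""
    if max_staleness == timedelta(0):
        return None  # zero tolerance forces a miss
    entry = store.get(key)
    if entry is None:
        return None
    data, cached_at = entry
    now = now or datetime.now(timezone.utc)
    if max_staleness is not None and now - cached_at > max_staleness:
        # Stale data is still returned; the caller is only warned.
        warnings.warn(
            f"{key} cached at {cached_at.isoformat()} exceeds {max_staleness}",
            StaleDataWarningSketch,
            stacklevel=2,
        )
    return data, cached_at


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
store = {"GDP": ([1, 2, 3], datetime(2024, 1, 1, tzinfo=timezone.utc))}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    stale_hit = lookup_sketch(store, "GDP", max_staleness=timedelta(days=1), now=now)

forced_miss = lookup_sketch(store, "GDP", max_staleness=timedelta(0), now=now)
```

Callers that must never see stale data can promote the warning to an error with warnings.simplefilter("error", StaleDataWarning).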

store(df: pd.DataFrame, *, dataset: str, source: str, is_pit: bool) -> None

Persist data to the appropriate cache table and update manifest.

Discovery

alphaforge.data.sources.discover_adapters() -> dict[str, type[SourceAdapter]]

Discover installed SourceAdapter classes via entry points.

Returns a mapping of {source_name: AdapterClass} for every package that registers an alphaforge.source_adapters entry point.

Example

from alphaforge.data.sources import discover_adapters

adapters = discover_adapters()
adapters.keys()
# dict_keys(['tiingo', 'fred', 'cftc', 'dtcc'])

Built-in Adapters

Tiingo (Market OHLCV)

alphaforge.data.sources.tiingo.TiingoAdapter

Bases: SourceAdapterBase

Cache-aware adapter for Tiingo EOD market data.

Parameters

api_key : str
    Tiingo API token.
cache_conn : duckdb.DuckDBPyConnection | None
    DuckDB connection for caching. If None, no caching.
use_adjusted : bool
    Use split/dividend-adjusted prices (default True).

fetch(query: Query, *, max_staleness: Optional[timedelta] = None) -> FetchResult

Fetch OHLCV for requested tickers. Cache-aware.

list_entities(dataset: str) -> list[str]

Tiingo has no fixed entity list.

FRED (Macro PIT)

alphaforge.data.sources.fred.FREDSourceAdapter

Bases: SourceAdapterBase

Cache-aware adapter for FRED/ALFRED macro data (PIT).

Parameters

api_key : str
    FRED API key.
cache_conn : duckdb.DuckDBPyConnection | None
    DuckDB connection for caching. If None, no caching.
datasets : frozenset[str] | None
    Override the default dataset set.

fetch(query: Query, *, max_staleness: Optional[timedelta] = None) -> FetchResult

Fetch PIT macro data for requested series.

list_entities(dataset: str) -> list[str]

FRED has thousands of series, so this returns an empty list by default.

Users should know their series IDs (e.g., GDP, GDPC1, UNRATE).

CFTC (CoT Positioning)

alphaforge.data.sources.cftc.CFTCAdapter

Bases: SourceAdapterBase

Cache-aware bulk adapter for CFTC Commitments of Traders data.

Each dataset is fetched in bulk through a dataset-specific callable. The default backward-compatible constructor still accepts a single raw_fetcher for cot.tff.

fetch(query: Query, *, max_staleness: Optional[timedelta] = None) -> FetchResult

Fetch CoT data. The first call triggers a bulk fetch; subsequent calls are served from the cache.
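The bulk-then-cache behavior can be sketched as follows. BulkAdapterSketch and fake_tff_fetcher are hypothetical, an in-memory dict replaces the DuckDB cache, and the simplified fetch(dataset, entity) signature is an assumption of the example rather than the adapter's real one.

```python
class BulkAdapterSketch:
    """Fetches a whole dataset once, then answers per-entity requests from memory."""

    def __init__(self, fetchers):
        # fetchers: dataset name -> zero-argument callable returning {entity: rows}
        self._fetchers = fetchers
        self._cache = {}

    def prefetch(self, dataset):
        # Always refetch the full dataset and replace the cached copy.
        self._cache[dataset] = self._fetchers[dataset]()
        return len(self._cache[dataset])

    def fetch(self, dataset, entity):
        if dataset not in self._cache:
            self.prefetch(dataset)  # first call pays for the bulk download
        return self._cache[dataset].get(entity, [])

    def list_entities(self, dataset):
        # Only cached keys are known; empty until something has been fetched.
        return sorted(self._cache.get(dataset, {}))


calls = []


def fake_tff_fetcher():
    calls.append("cot.tff")  # record each bulk download
    return {"ES": [10, 12], "NQ": [7, 9]}


adapter = BulkAdapterSketch({"cot.tff": fake_tff_fetcher})
before = adapter.list_entities("cot.tff")
es = adapter.fetch("cot.tff", "ES")
nq = adapter.fetch("cot.tff", "NQ")
after = adapter.list_entities("cot.tff")
```

Two fetch calls result in a single invocation of the bulk callable, which is the property that makes explicit prefetch worthwhile before a large batch.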

list_entities(dataset: str) -> list[str]

List cached entity keys, or empty if no cache.

prefetch(dataset: str, asof_range: tuple[date, date] | None = None) -> CacheManifest

Bulk fetch and cache all CoT data.

DTCC (Swap Derivatives)

The preferred adapter-backed DTCC datasets are now split by product family, starting with the first two concrete families:

  • DTCCFXAdapter serves dtcc.fx for FX forwards and swaps
  • DTCCIRSAdapter serves dtcc.irs for interest rate swaps

These family adapters own DTCCPPDSource construction internally, stamp product-family-specific PIT lineage, and keep dataset routing distinct in DataContext.

from alphaforge.data.context import DataContext
from alphaforge.data.sources.dtcc import DTCCFXAdapter, DTCCIRSAdapter

ctx = DataContext.from_adapters(
    DTCCFXAdapter(list_provider=..., artifact_provider=...),
    DTCCIRSAdapter(list_provider=..., artifact_provider=...),
)

fx = ctx.load(
    "dtcc.fx",
    columns=["value"],
    entities=["dtcc.fx.dtccppd.fx.fx_forward.usd.1m.trade_count"],
)
irs = ctx.load(
    "dtcc.irs",
    columns=["value"],
    entities=["dtcc.irs.dtccppd.rates.interest_rate_swap.usd.5y.trade_count"],
)

DTCCAdapter remains available as the broader generic dtcc.ppd wrapper, and future DTCC product-family adapters should build on DTCCPPDAdapterBase so raw-loader fetch wiring, cache behavior, and PIT transform plumbing stay consistent.

alphaforge.data.sources.dtcc.DTCCFXAdapter

Bases: _FilteredDTCCPPDAdapter

Canonical DTCC adapter for FX forwards and swaps.

alphaforge.data.sources.dtcc.DTCCIRSAdapter

Bases: _FilteredDTCCPPDAdapter

Canonical DTCC adapter for interest rate swaps.

alphaforge.data.sources.dtcc.DTCCAdapter

Bases: DTCCPPDAdapterBase

Built-in canonical adapter for the generic DTCC PPD dataset.

Callers no longer need to inject a raw fetch function. By default this adapter constructs and owns a DTCCPPDSource instance internally. Tests and advanced callers may still inject a preconfigured raw source.

alphaforge.data.sources.dtcc.DTCCPPDAdapterBase

Bases: SourceAdapterBase

Shared cache-aware adapter plumbing over DTCCPPDSource.

Subclasses define the adapter-facing dataset contract and the PIT transform, while this base owns raw-loader fetch construction and cache lifecycle management.

fetch(query: Query, *, max_staleness: Optional[timedelta] = None) -> FetchResult

Fetch DTCC data through the canonical adapter contract.

list_entities(dataset: str) -> list[str]

List cached entity keys for a DTCC adapter dataset.

prefetch(dataset: str, asof_range: tuple[date, date] | None = None) -> CacheManifest

Bulk fetch and cache all rows for an adapter dataset.

PIT Compatibility Bridge

SourceAdapterPITCompat is a temporary migration bridge. It remains documented because supported downstream PIT integrations still rely on it, but it is not the long-term canonical adapter model.

alphaforge.pit.adapters.source_adapter_compat.SourceAdapterPITCompat

Bases: PITAdapter

Bridge a SourceAdapter into the PITAdapter interface.

Parameters

adapter : SourceAdapter
    The unified source adapter to wrap.
dataset : str
    The dataset key to use in Query.table (e.g. "macro.fred").
frequency : str
    Default frequency label for generated PITObservation records.

fetch_asof(series_id: str, asof_date: date, start: date | None = None, end: date | None = None, *, metadata: SeriesMetadata | None = None, **kwargs: Any) -> list[PITObservation]

Fetch PIT observations by delegating to the unified SourceAdapter.

list_vintages(series_id: str) -> list[date]

Not supported via SourceAdapter; returns an empty list.

supports_pit(series_id: str) -> bool

All series routed through this adapter are assumed PIT-capable.
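The bridge follows the classic adapter pattern: it wraps one interface and exposes another. The sketch below illustrates the shape only. PITCompatSketch, ObservationSketch, DemoAdapter, the tuple-based query, and the "M" frequency default are all hypothetical stand-ins for SourceAdapterPITCompat, PITObservation, a real SourceAdapter, Query, and the real default.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class ObservationSketch:
    """Stand-in for PITObservation (fields are assumptions)."""

    series_id: str
    obs_date: date
    value: float
    frequency: str


class PITCompatSketch:
    """Expose a fetch-style adapter through a fetch_asof-style interface."""

    def __init__(self, adapter, dataset, frequency="M"):
        self._adapter = adapter
        self._dataset = dataset
        self._frequency = frequency

    def fetch_asof(self, series_id, asof_date, start=None, end=None):
        # Delegate to the wrapped adapter, then convert rows to observations.
        rows = self._adapter.fetch((self._dataset, series_id, asof_date))
        return [
            ObservationSketch(series_id, obs_date, value, self._frequency)
            for obs_date, value in rows
            if (start is None or obs_date >= start)
            and (end is None or obs_date <= end)
        ]

    def list_vintages(self, series_id):
        return []  # vintages are not exposed through the wrapped interface

    def supports_pit(self, series_id):
        return True  # everything routed here is treated as PIT-capable


class DemoAdapter:
    def fetch(self, query):
        # Ignores the query and returns fixed (obs_date, value) rows.
        return [(date(2024, 1, 1), 1.0), (date(2024, 2, 1), 2.0)]


bridge = PITCompatSketch(DemoAdapter(), dataset="macro.fred")
observations = bridge.fetch_asof("GDP", date(2024, 3, 1), start=date(2024, 2, 1))
```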