Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions docs/docs/custom_ops/custom_sources.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
---
title: Custom Sources
description: Learn how to create custom sources in CocoIndex to read data from any system including APIs, databases, file systems, and external services. Build source specs and connectors with list and get_value methods for flexible data ingestion operations.
toc_max_heading_level: 4
---

A custom source allows you to read data from any system you want, such as APIs, databases, file systems, cloud storage, or other external services.
You can stream data incrementally and provide ordinal information for efficient updates and change tracking.

## Overview

Custom sources are defined by two components:

* A **source spec** that configures the behavior and connection parameters for the source.
* A **source connector** that handles the actual data reading operations. It provides the following required methods:
* `create()`: Create a connector instance from the source spec.
* `list()`: List all available data items. Return keys.
* `get_value()`: Get the full content for a specific data item by given key.

## Source Spec

The source spec defines the configuration parameters for your custom source. When you use this source in a flow (typically by calling [`add_source()`](/docs/core/flow_def#import-from-source)), you instantiate this source spec with specific parameter values.

A source spec is defined as a class that inherits from `cocoindex.op.SourceSpec`.

```python
class CustomSource(cocoindex.op.SourceSpec):
"""
Documentation for the source.
"""
param1: str
param2: int | None = None
...
```

Notes:

* All fields of the spec must have a type serializable / deserializable by the `json` module.
* All subclasses of `SourceSpec` can be instantiated similar to a dataclass, i.e. `ClassName(param1=value1, param2=value2, ...)`.

## Source Connector

A source connector handles the actual data reading operations for your custom source. It defines how data should be accessed from your source system.

Source connectors implement methods to list available data and retrieve specific data items.

A source connector is defined as a class decorated by `@cocoindex.op.source_connector(spec_cls=CustomSource, key_type=KeyType, value_type=ValueType)`.
This is a full-featured source connector definition:

```python
@cocoindex.op.source_connector(
spec_cls=CustomSource,
key_type=DataKeyType,
value_type=DataValueType
)
class CustomSourceConnector:
@staticmethod
async def create(spec: CustomSource) -> "CustomSourceConnector":
"""Required. Create a connector instance from the spec."""
...

async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
"""Required. List all available data items. `options` is optional."""
...

async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
"""Required. Get the full content for a specific data item. `options` is optional."""
...

def provides_ordinal(self) -> bool:
"""Optional. Return True if this source provides ordinal information."""
return False
```

Your source connector class doesn't have to have everything above:

* `options` arguments are optional. It provides additional hints to tell if the engine needs a certain property currently.
You don't have to take this argument and always provide available properties.
* `provides_ordinal()` is optional. It's a hint to tell if the engine needs ordinal information.
* `create()`, `list()` and `get_value()` methods can be async or sync.

The following data types are involved in the method definitions above: `CustomSource`, `DataKeyType`, `DataValueType`, `PartialSourceRow`, `PartialSourceRowData`. They should be replaced with the actual types in your implementation. We will explain each of them below.

### Data Access Methods

Data access methods handle the actual reading operations - discovering available data and retrieving specific content.

#### `async? def create(spec)` (Required)

This static method creates a connector instance from the source spec. It should return a connector that can be used to access data from the source.

```python
@staticmethod
async def create(spec: CustomSource) -> "CustomSourceConnector":
"""
Create a connector instance. This is where you initialize connections,
validate configuration, authenticate, etc.
"""
# Initialize connections, authenticate, etc.
return CustomSourceConnector(spec, ...)
```

#### `async? def list(options?)` (Required)

This method enumerates all available data items from the source. It should yield `PartialSourceRow` objects containing:

* `key`: A unique identifier for the data item
* `data`: A `PartialSourceRowData` object with metadata (typically just `ordinal` information)

The optional `options` parameter provides hints about what information is needed:

```python
async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
"""
List all available data items. This method is used by CocoIndex to
discover what data is available and track changes.
"""
# Enumerate data items from your source
for item_metadata in await self._fetch_item_list():
data = PartialSourceRowData[DataValueType]()

# Include ordinal if requested and available.
# Must provide if `provides_ordinal()` returns True and `include_ordinal` is True.
if options.include_ordinal and item_metadata.timestamp:
data.ordinal = item_metadata.timestamp

# Optionally include full value if it's cheap to fetch and requested
if options.include_value and self._can_fetch_cheaply(item_metadata):
data.value = await self._fetch_item_content(item_metadata.id)

yield PartialSourceRow(
key=DataKeyType(id=item_metadata.id),
data=data
)
```

`options` is a hint. You don't have to take this argument.
Without it, provide all fields as long as they're available.

#### `async? def get_value(key, options?)` (Required)

This method retrieves the full content for a specific data item given its key. It should return a `PartialSourceRowData` object containing the actual data.

The optional `options` parameter provides hints about what information is needed:

```python
async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
"""
Get the full content for a specific data item. CocoIndex calls this
when it needs the actual data content for processing.
"""
# Fetch the full content for the given key
content = await self._fetch_item_content(key.id)
if content is None:
return PartialSourceRowData(
value=NON_EXISTENCE,
ordinal=NO_ORDINAL,
content_version_fp=None
)

data = PartialSourceRowData(
value=DataValueType(
title=content.title,
content=content.body,
author=content.author
)
)

# Include ordinal if requested and available
if options.include_ordinal and content.timestamp:
data.ordinal = content.timestamp

# Include content version fingerprint if requested and easily available
if options.include_content_version_fp and content.etag:
data.content_version_fp = content.etag.encode()

return data
```

`options` is a hint. You don't have to take this argument.
Without it, provide all fields as long as they're available.

#### `def provides_ordinal()` (Optional)

Returns `True` if this source provides ordinal information (such as timestamps) that can be used for efficient change detection. If `True`, CocoIndex can use this information to optimize updates.

```python
def provides_ordinal(self) -> bool:
"""
Return True if this source provides ordinal information.
"""
return True # If your source provides timestamps or version numbers
```

### Data Types

#### SourceReadOptions

The `SourceReadOptions` class provides hints to source connectors about what information is needed. This allows connectors to optimize their data fetching:

```python
@dataclasses.dataclass
class SourceReadOptions:
"""Options for reading source data."""
include_ordinal: bool = False
include_content_version_fp: bool = False
include_value: bool = False
```

**Fields:**

* `include_ordinal`: Whether to include ordinal information (timestamps, version numbers). Required in `list()` when `provides_ordinal()` returns `True` and this is `True`.
* `include_content_version_fp`: Whether to include content version fingerprints for change detection. Always optional.
* `include_value`: Whether to include full values. Required in `get_value()` when this is `True`. Optional in `list()` - useful when fetching values is cheap.

#### PartialSourceRow

Represents a single data item from the source:

```python
@dataclasses.dataclass
class PartialSourceRow(Generic[K, V]):
key: K # Unique identifier for the data item
data: PartialSourceRowData[V] # Metadata and optionally the value
```

#### PartialSourceRowData

Contains the data and metadata for a source item:

```python
@dataclasses.dataclass
class PartialSourceRowData(Generic[V]):
value: V | Literal["NON_EXISTENCE"] | None = None # The actual data content
ordinal: int | Literal["NO_ORDINAL"] | None = None # Timestamp or version number
content_version_fp: bytes | None = None # Content fingerprint for change detection
```

**Fields:**

* `value`: The actual data content, or `cocoindex.op.NON_EXISTENCE` if the item doesn't exist, or `None` if not included.
* `ordinal`: Timestamp or version number for change tracking, or `cocoindex.op.NO_ORDINAL` if not available, or `None` if not included. An ordinal needs to be monotonically increasing.
* `content_version_fp`: Content fingerprint (hash, ETag, etc.) for efficient change detection, or `None` if not available.

#### Key Type (`DataKeyType`) and Value Type (`DataValueType`) for Data

The data type of custom source is a [*KTable*](/docs/core/data_types#ktable), so the key type and value type also follows the same requirements as key and value of a *KTable*. Specifically:

* The key type uniquely identifies each data item in your source. It can be:
* A `NamedTuple` or frozen dataclass with one or multiple fields.

```python
class DataKeyType(NamedTuple):
id: str
category: str | None = None
```

* When there's a single key part, you can also use a simple type like `str` or `int` for single-field keys.
The key field name will be `cocoindex.KEY_FIELD_NAME` for this case.

* The value type represents the actual data content.
It's typically a dataclass containing all the value fields the source exposes.

```python
@dataclasses.dataclass
class DataValueType:
title: str
content: str
author: str | None
created_at: datetime | None
```

## Example

See the [custom_source_hn](https://github.com/cocoindex-io/cocoindex/blob/main/examples/custom_source_hn/main.py) example for a complete end-to-end implementation.
1 change: 1 addition & 0 deletions docs/sidebars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const sidebars: SidebarsConfig = {
label: 'Custom Operations',
collapsed: false,
items: [
'custom_ops/custom_sources',
'custom_ops/custom_functions',
'custom_ops/custom_targets',
],
Expand Down
21 changes: 21 additions & 0 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,27 @@ def _inner(fn: Callable[..., Any]) -> Callable[..., Any]:

@dataclasses.dataclass
class SourceReadOptions:
"""
The options for reading a source row.
This is argument for both `list()` and `get_value()` methods.
Note that in most cases (unless spelled out otherwise below) it's not a mandatory requirement, but more like a hint to say it's useful under the current context.

- include_ordinal: Whether to include the ordinal of the source row.
When provides_ordinal() returns True, you must provide `ordinal` in `list()` when `include_ordinal` is True.
It's optional for other cases. It's helpful to skip unnecessary reprocessing early, and avoid output from older version of input over-writing the latest one when there's concurrency (especially multiple processes) and source updates frequently.

- include_content_version_fp: Whether to include the content version fingerprint of the source row.
It's always optional even if this is True.
It's helpful to skip unnecessary reprocessing early.
You should only consider providing it if you can directly get it without computing the hash on the content.

- include_value: Whether to include the value of the source row.
You must provide it in `get_value()` when `include_value` is True.
It's optional for `list()`.
Consider providing it when it's significantly cheaper then calling another `get_value()` for each row.
It will save costs of individual `get_value()` calls.
"""

include_ordinal: bool = False
include_content_version_fp: bool = False
include_value: bool = False
Expand Down
Loading