|
| 1 | +--- |
| 2 | +title: Custom Sources |
| 3 | +description: Learn how to create custom sources in CocoIndex to read data from any system including APIs, databases, file systems, and external services. Build source specs and connectors with list and get_value methods for flexible data ingestion operations. |
| 4 | +toc_max_heading_level: 4 |
| 5 | +--- |
| 6 | + |
| 7 | +A custom source allows you to read data from any system you want, such as APIs, databases, file systems, cloud storage, or other external services. |
| 8 | +You can stream data incrementally and provide ordinal information for efficient updates and change tracking. |
| 9 | + |
| 10 | +## Overview |
| 11 | + |
| 12 | +Custom sources are defined by two components: |
| 13 | + |
| 14 | +* A **source spec** that configures the behavior and connection parameters for the source. |
| 15 | +* A **source connector** that handles the actual data reading operations. It provides the following required methods: |
| 16 | + * `create()`: Create a connector instance from the source spec. |
| 17 | + * `list()`: List all available data items. Return keys. |
| 18 | + * `get_value()`: Get the full content for a specific data item by given key. |
| 19 | + |
| 20 | +## Source Spec |
| 21 | + |
| 22 | +The source spec defines the configuration parameters for your custom source. When you use this source in a flow (typically by calling [`add_source()`](/docs/core/flow_def#import-from-source)), you instantiate this source spec with specific parameter values. |
| 23 | + |
| 24 | +A source spec is defined as a class that inherits from `cocoindex.op.SourceSpec`. |
| 25 | + |
| 26 | +```python |
| 27 | +class CustomSource(cocoindex.op.SourceSpec): |
| 28 | + """ |
| 29 | + Documentation for the source. |
| 30 | + """ |
| 31 | + param1: str |
| 32 | + param2: int | None = None |
| 33 | + ... |
| 34 | +``` |
| 35 | + |
| 36 | +Notes: |
| 37 | + |
| 38 | +* All fields of the spec must have a type serializable / deserializable by the `json` module. |
| 39 | +* All subclasses of `SourceSpec` can be instantiated similar to a dataclass, i.e. `ClassName(param1=value1, param2=value2, ...)`. |
| 40 | + |
| 41 | +## Source Connector |
| 42 | + |
| 43 | +A source connector handles the actual data reading operations for your custom source. It defines how data should be accessed from your source system. |
| 44 | + |
| 45 | +Source connectors implement methods to list available data and retrieve specific data items. |
| 46 | + |
| 47 | +A source connector is defined as a class decorated by `@cocoindex.op.source_connector(spec_cls=CustomSource, key_type=KeyType, value_type=ValueType)`. |
| 48 | +This is a full-featured source connector definition: |
| 49 | + |
| 50 | +```python |
| 51 | +@cocoindex.op.source_connector( |
| 52 | + spec_cls=CustomSource, |
| 53 | + key_type=DataKeyType, |
| 54 | + value_type=DataValueType |
| 55 | +) |
| 56 | +class CustomSourceConnector: |
| 57 | + @staticmethod |
| 58 | + async def create(spec: CustomSource) -> "CustomSourceConnector": |
| 59 | + """Required. Create a connector instance from the spec.""" |
| 60 | + ... |
| 61 | + |
| 62 | + async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]: |
| 63 | + """Required. List all available data items. `options` is optional.""" |
| 64 | + ... |
| 65 | + |
| 66 | + async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]: |
| 67 | + """Required. Get the full content for a specific data item. `options` is optional.""" |
| 68 | + ... |
| 69 | + |
| 70 | + def provides_ordinal(self) -> bool: |
| 71 | + """Optional. Return True if this source provides ordinal information.""" |
| 72 | + return False |
| 73 | +``` |
| 74 | + |
| 75 | +Your source connector class doesn't have to have everything above: |
| 76 | + |
| 77 | +* `options` arguments are optional. It provides additional hints to tell if the engine needs a certain property currently. |
| 78 | + You don't have to take this argument and always provide available properties. |
| 79 | +* `provides_ordinal()` is optional. It's a hint to tell if the engine needs ordinal information. |
| 80 | +* `create()`, `list()` and `get_value()` methods can be async or sync. |
| 81 | + |
| 82 | +The following data types are involved in the method definitions above: `CustomSource`, `DataKeyType`, `DataValueType`, `PartialSourceRow`, `PartialSourceRowData`. They should be replaced with the actual types in your implementation. We will explain each of them below. |
| 83 | + |
| 84 | +### Data Access Methods |
| 85 | + |
| 86 | +Data access methods handle the actual reading operations - discovering available data and retrieving specific content. |
| 87 | + |
| 88 | +#### `async? def create(spec)` (Required) |
| 89 | + |
| 90 | +This static method creates a connector instance from the source spec. It should return a connector that can be used to access data from the source. |
| 91 | + |
| 92 | +```python |
| 93 | +@staticmethod |
| 94 | +async def create(spec: CustomSource) -> "CustomSourceConnector": |
| 95 | + """ |
| 96 | + Create a connector instance. This is where you initialize connections, |
| 97 | + validate configuration, authenticate, etc. |
| 98 | + """ |
| 99 | + # Initialize connections, authenticate, etc. |
| 100 | + return CustomSourceConnector(spec, ...) |
| 101 | +``` |
| 102 | + |
| 103 | +#### `async? def list(options?)` (Required) |
| 104 | + |
| 105 | +This method enumerates all available data items from the source. It should yield `PartialSourceRow` objects containing: |
| 106 | + |
| 107 | +* `key`: A unique identifier for the data item |
| 108 | +* `data`: A `PartialSourceRowData` object with metadata (typically just `ordinal` information) |
| 109 | + |
| 110 | +The optional `options` parameter provides hints about what information is needed: |
| 111 | + |
| 112 | +```python |
| 113 | +async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]: |
| 114 | + """ |
| 115 | + List all available data items. This method is used by CocoIndex to |
| 116 | + discover what data is available and track changes. |
| 117 | + """ |
| 118 | + # Enumerate data items from your source |
| 119 | + for item_metadata in await self._fetch_item_list(): |
| 120 | + data = PartialSourceRowData[DataValueType]() |
| 121 | + |
| 122 | + # Include ordinal if requested and available. |
| 123 | + # Must provide if `provides_ordinal()` returns True and `include_ordinal` is True. |
| 124 | + if options.include_ordinal and item_metadata.timestamp: |
| 125 | + data.ordinal = item_metadata.timestamp |
| 126 | + |
| 127 | + # Optionally include full value if it's cheap to fetch and requested |
| 128 | + if options.include_value and self._can_fetch_cheaply(item_metadata): |
| 129 | + data.value = await self._fetch_item_content(item_metadata.id) |
| 130 | + |
| 131 | + yield PartialSourceRow( |
| 132 | + key=DataKeyType(id=item_metadata.id), |
| 133 | + data=data |
| 134 | + ) |
| 135 | +``` |
| 136 | + |
| 137 | +`options` is a hint. You don't have to take this argument. |
| 138 | +Without it, provide all fields as long as they're available. |
| 139 | + |
| 140 | +#### `async? def get_value(key, options?)` (Required) |
| 141 | + |
| 142 | +This method retrieves the full content for a specific data item given its key. It should return a `PartialSourceRowData` object containing the actual data. |
| 143 | + |
| 144 | +The optional `options` parameter provides hints about what information is needed: |
| 145 | + |
| 146 | +```python |
| 147 | +async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]: |
| 148 | + """ |
| 149 | + Get the full content for a specific data item. CocoIndex calls this |
| 150 | + when it needs the actual data content for processing. |
| 151 | + """ |
| 152 | + # Fetch the full content for the given key |
| 153 | + content = await self._fetch_item_content(key.id) |
| 154 | + if content is None: |
| 155 | + return PartialSourceRowData( |
| 156 | + value=NON_EXISTENCE, |
| 157 | + ordinal=NO_ORDINAL, |
| 158 | + content_version_fp=None |
| 159 | + ) |
| 160 | + |
| 161 | + data = PartialSourceRowData( |
| 162 | + value=DataValueType( |
| 163 | + title=content.title, |
| 164 | + content=content.body, |
| 165 | + author=content.author |
| 166 | + ) |
| 167 | + ) |
| 168 | + |
| 169 | + # Include ordinal if requested and available |
| 170 | + if options.include_ordinal and content.timestamp: |
| 171 | + data.ordinal = content.timestamp |
| 172 | + |
| 173 | + # Include content version fingerprint if requested and easily available |
| 174 | + if options.include_content_version_fp and content.etag: |
| 175 | + data.content_version_fp = content.etag.encode() |
| 176 | + |
| 177 | + return data |
| 178 | +``` |
| 179 | + |
| 180 | +`options` is a hint. You don't have to take this argument. |
| 181 | +Without it, provide all fields as long as they're available. |
| 182 | + |
| 183 | +#### `def provides_ordinal()` (Optional) |
| 184 | + |
| 185 | +Returns `True` if this source provides ordinal information (such as timestamps) that can be used for efficient change detection. If `True`, CocoIndex can use this information to optimize updates. |
| 186 | + |
| 187 | +```python |
| 188 | +def provides_ordinal(self) -> bool: |
| 189 | + """ |
| 190 | + Return True if this source provides ordinal information. |
| 191 | + """ |
| 192 | + return True # If your source provides timestamps or version numbers |
| 193 | +``` |
| 194 | + |
| 195 | +### Data Types |
| 196 | + |
| 197 | +#### SourceReadOptions |
| 198 | + |
| 199 | +The `SourceReadOptions` class provides hints to source connectors about what information is needed. This allows connectors to optimize their data fetching: |
| 200 | + |
| 201 | +```python |
| 202 | +@dataclasses.dataclass |
| 203 | +class SourceReadOptions: |
| 204 | + """Options for reading source data.""" |
| 205 | + include_ordinal: bool = False |
| 206 | + include_content_version_fp: bool = False |
| 207 | + include_value: bool = False |
| 208 | +``` |
| 209 | + |
| 210 | +**Fields:** |
| 211 | + |
| 212 | +* `include_ordinal`: Whether to include ordinal information (timestamps, version numbers). Required in `list()` when `provides_ordinal()` returns `True` and this is `True`. |
| 213 | +* `include_content_version_fp`: Whether to include content version fingerprints for change detection. Always optional. |
| 214 | +* `include_value`: Whether to include full values. Required in `get_value()` when this is `True`. Optional in `list()` - useful when fetching values is cheap. |
| 215 | + |
| 216 | +#### PartialSourceRow |
| 217 | + |
| 218 | +Represents a single data item from the source: |
| 219 | + |
| 220 | +```python |
| 221 | +@dataclasses.dataclass |
| 222 | +class PartialSourceRow(Generic[K, V]): |
| 223 | + key: K # Unique identifier for the data item |
| 224 | + data: PartialSourceRowData[V] # Metadata and optionally the value |
| 225 | +``` |
| 226 | + |
| 227 | +#### PartialSourceRowData |
| 228 | + |
| 229 | +Contains the data and metadata for a source item: |
| 230 | + |
| 231 | +```python |
| 232 | +@dataclasses.dataclass |
| 233 | +class PartialSourceRowData(Generic[V]): |
| 234 | + value: V | Literal["NON_EXISTENCE"] | None = None # The actual data content |
| 235 | + ordinal: int | Literal["NO_ORDINAL"] | None = None # Timestamp or version number |
| 236 | + content_version_fp: bytes | None = None # Content fingerprint for change detection |
| 237 | +``` |
| 238 | + |
| 239 | +**Fields:** |
| 240 | + |
| 241 | +* `value`: The actual data content, or `cocoindex.op.NON_EXISTENCE` if the item doesn't exist, or `None` if not included. |
| 242 | +* `ordinal`: Timestamp or version number for change tracking, or `cocoindex.op.NO_ORDINAL` if not available, or `None` if not included. An ordinal needs to be monotonically increasing. |
| 243 | +* `content_version_fp`: Content fingerprint (hash, ETag, etc.) for efficient change detection, or `None` if not available. |
| 244 | + |
| 245 | +#### Key Type (`DataKeyType`) and Value Type (`DataValueType`) for Data |
| 246 | + |
| 247 | +The data type of custom source is a [*KTable*](/docs/core/data_types#ktable), so the key type and value type also follows the same requirements as key and value of a *KTable*. Specifically: |
| 248 | + |
| 249 | +* The key type uniquely identifies each data item in your source. It can be: |
| 250 | + * A `NamedTuple` or frozen dataclass with one or multiple fields. |
| 251 | + |
| 252 | + ```python |
| 253 | + class DataKeyType(NamedTuple): |
| 254 | + id: str |
| 255 | + category: str | None = None |
| 256 | + ``` |
| 257 | + |
| 258 | + * When there's a single key part, you can also use a simple type like `str` or `int` for single-field keys. |
| 259 | + The key field name will be `cocoindex.KEY_FIELD_NAME` for this case. |
| 260 | + |
| 261 | +* The value type represents the actual data content. |
| 262 | + It's typically a dataclass containing all the value fields the source exposes. |
| 263 | + |
| 264 | + ```python |
| 265 | + @dataclasses.dataclass |
| 266 | + class DataValueType: |
| 267 | + title: str |
| 268 | + content: str |
| 269 | + author: str | None |
| 270 | + created_at: datetime | None |
| 271 | + ``` |
| 272 | + |
| 273 | +## Example |
| 274 | + |
| 275 | +See the [custom_source_hn](https://github.com/cocoindex-io/cocoindex/blob/main/examples/custom_source_hn/main.py) example for a complete end-to-end implementation. |
0 commit comments