diff --git a/docs/docs/core/basics.md b/docs/docs/core/basics.md index f2fd89a10..c30a47b04 100644 --- a/docs/docs/core/basics.md +++ b/docs/docs/core/basics.md @@ -1,6 +1,6 @@ --- title: Basics -description: CocoIndex Basics +description: "CocoIndex basic concepts: indexing flow, data, operations, data updates, etc." --- # CocoIndex Basics @@ -9,7 +9,7 @@ An **index** is a collection of data stored in a way that is easy for retrieval. CocoIndex is an ETL framework for building indexes from specified data sources, a.k.a. indexing. It also offers utilities for users to retrieve data from the indexes. -## Indexing Flow +## Indexing flow An indexing flow extracts data from specified data sources, applies specified transformations, and puts the transformed data into specified storage for later retrieval. @@ -36,7 +36,7 @@ An **operation** in an indexing flow defines a step in the flow. An operation is * **Action**, which defines the behavior of the operation, e.g. *import*, *transform*, *for each*, *collect* and *export*. See [Flow Definition](flow_def) for more details on each action. -* Some actions (i.e. "import", "transform" and "export") require an **Operation Spec**, which describes the specific behavior of the operation, e.g. a source to import from, a function describing the transformation behavior, a storage to export to as an index. +* Some actions (i.e. "import", "transform" and "export") require an **Operation Spec**, which describes the specific behavior of the operation, e.g. a source to import from, a function describing the transformation behavior, a target storage to export to (as an index). * Each operation spec has an **operation type**, e.g. `LocalFile` (data source), `SplitRecursively` (function), `SentenceTransformerEmbed` (function), `Postgres` (storage). * CocoIndex framework maintains a set of supported operation types. Users can also implement their own. 
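
To make the actions concrete, here is an illustrative sketch of a flow wiring them together, using the operation types named above. This sketch is not part of the original examples: spec parameters are elided with `...`, and module paths like `cocoindex.sources.LocalFile` are assumptions; see [Flow Definition](flow_def) for the actual API.

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder, data_scope):
    # "import" action, with a LocalFile source spec.
    data_scope["documents"] = flow_builder.add_source(cocoindex.sources.LocalFile(...))
    demo_collector = data_scope.add_collector()
    # "for each" action: apply operations to every row of "documents".
    with data_scope["documents"].row() as document:
        # "transform" action, with a SplitRecursively function spec.
        document["chunks"] = document["content"].transform(cocoindex.functions.SplitRecursively(...))
        # "collect" action: gather values to be exported.
        demo_collector.collect(...)
    # "export" action, with a Postgres storage spec.
    demo_collector.export("demo_index", cocoindex.storages.Postgres(...), primary_key_fields=[...])
```
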
@@ -60,31 +60,40 @@ This shows schema and example data for the indexing flow: ![Data Example](data_example.svg) -### Life Cycle of an Indexing Flow +### Life cycle of an indexing flow -An indexing flow, once set up, maintains a long-lived relationship between source data and indexes. This means: +An indexing flow, once set up, maintains a long-lived relationship between the source data and data in the target storage. This means: + +1. The target storage created by the flow remains available for querying at any time + +2. As source data changes (new data added, existing data updated or deleted), data in the target storage is updated to reflect those changes, + at a certain pace, according to the update mode: + + * **One time update**: Once triggered, CocoIndex updates the target data to reflect the version of source data up to the current moment. + * **Live update**: CocoIndex continuously watches the source data and updates the target data accordingly. + + See more details in the [build / update target data](flow_methods#build--update-target-data) section. + +3. CocoIndex intelligently manages these updates by: + * Determining which parts of the target data need to be recomputed + * Reusing existing computations where possible + * Only reprocessing the minimum necessary data -1. The indexes created by the flow remain available for querying at any time -2. When source data changes, the indexes are automatically updated to reflect those changes -3. 
CocoIndex intelligently manages these updates by: - - Determining which parts of the index need to be recomputed - - Reusing existing computations where possible - - Only reprocessing the minimum necessary data You can think of an indexing flow as similar to formulas in a spreadsheet: -- In a spreadsheet, you define formulas that transform input cells into output cells -- When input values change, the spreadsheet automatically recalculates affected outputs -- You focus on defining the transformation logic, not managing updates +* In a spreadsheet, you define formulas that transform input cells into output cells +* When input values change, the spreadsheet recalculates affected outputs +* You focus on defining the transformation logic, not managing updates CocoIndex works the same way, but with more powerful capabilities: -- Instead of flat tables, CocoIndex models data in nested data structures, making it more natural to model complex data -- Instead of simple cell-level formulas, you have operations like "for each" to apply the same formula across rows without repeating yourself +* Instead of flat tables, CocoIndex models data in nested data structures, making it more natural to model complex data +* Instead of simple cell-level formulas, you have operations like "for each" to apply the same formula across rows without repeating yourself -This means when writing your flow operations, you can treat source data as if it were static - focusing purely on defining the transformation logic. CocoIndex takes care of maintaining the dynamic relationship between sources and indexes behind the scenes. +This means when writing your flow operations, you can treat source data as if it were static, focusing purely on defining the transformation logic. CocoIndex takes care of maintaining the dynamic relationship between sources and target data behind the scenes. 
-### Internal Storage +### Internal storage As an indexing flow is long-lived, it needs to store intermediate data to keep track of its state. CocoIndex uses internal storage for this purpose. @@ -94,9 +103,9 @@ See [Initialization](initialization) for configuring its location, and `cocoinde ## Retrieval -There are two ways to retrieve data from indexes built by an indexing flow: +There are two ways to retrieve data from target storage built by an indexing flow: -* Query the underlying index storage directly for maximum flexibility. -* Use CocoIndex *query handlers* for a more convenient experience with built-in tooling support (e.g. CocoInsight) to understand query performance against the index. +* Query the underlying target storage directly for maximum flexibility. +* Use CocoIndex *query handlers* for a more convenient experience with built-in tooling support (e.g. CocoInsight) to understand query performance against the target data. -Query handlers are tied to specific indexing flows. They accept query inputs, transform them by defined operations, and retrieve matching data from the index storage that was created by the flow. \ No newline at end of file +Query handlers are tied to specific indexing flows. They accept query inputs, transform them by defined operations, and retrieve matching data from the target storage that was created by the flow. \ No newline at end of file diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index dbf173ea6..562099a2c 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -1,6 +1,6 @@ --- title: Flow Definition -description: CocoIndex Flow Definition +description: Define a CocoIndex flow by specifying sources, transformations and storages, and connecting their input/output data.
--- import Tabs from '@theme/Tabs'; @@ -8,7 +8,8 @@ import TabItem from '@theme/TabItem'; # CocoIndex Flow Definition -In CocoIndex, to define an indexing flow, you provide a function to construct the flow, by adding operations and connecting them with fields. +In CocoIndex, to define an indexing flow, you provide a function that imports data from sources, transforms it, and puts it into target storage (sinks). +You connect the inputs/outputs of these operations with fields of data scopes. ## Entry Point @@ -43,7 +44,7 @@ demo_flow = cocoindex.flow.add_flow_def("DemoFlow", demo_flow_def) ``` In both cases, `demo_flow` will be an object of class `cocoindex.Flow`. -See [Flow Methods](/docs/core/flow_methods) for more details on it. +See [Flow Running](/docs/core/flow_methods) for more details on it. @@ -52,7 +53,7 @@ See [Flow Methods](/docs/core/flow_methods) for more details on it. The `FlowBuilder` object is the starting point to construct a flow. -### Import From Source +### Import from source `FlowBuilder` provides an `add_source()` method to import data from external sources. A *source spec* needs to be provided for any import operation, to describe the source and parameters related to the source. @@ -64,7 +65,7 @@ Import must happen at the top level, and the field created by import must be in ```python @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...)) + data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...)) ...... ``` @@ -74,9 +75,42 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco `add_source()` returns a `DataSlice`. Once external data sources are imported, you can further transform them using methods exposed by these data objects, as discussed in the following sections. We'll describe different data objects in the next few sections. 
-Note that the actual value of data is not available at the time when we define the flow: it's only available at runtime. + +:::note + +The actual value of data is not available at the time when we define the flow: it's only available at runtime. In a flow definition, you can use a data representation as input for operations, but not access the actual value. +::: + +#### Refresh interval + +You can provide a `refresh_interval` argument. +When present, in the [live update mode](/docs/core/flow_methods#live-update), the data source will be refreshed at the specified interval. + + + + +The `refresh_interval` argument is of type `datetime.timedelta`. For example, this refreshes the data source every 1 minute: + +```python +import datetime + +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + data_scope["documents"] = flow_builder.add_source( + DemoSourceSpec(...), refresh_interval=datetime.timedelta(minutes=1)) + ...... +``` + + + + +:::info + +In live update mode, for each refresh, CocoIndex will traverse the data source to figure out the changes, +and only perform transformations on changed source keys. + +::: + ## Data Scope A **data scope** represents data for a certain unit, e.g. the top level scope (involving all data for a flow), for a document, or for a chunk. @@ -84,7 +118,13 @@ A data scope has a bunch of fields and collectors, and users can add new fields ### Get or Add a Field -Get or add a field of a data scope (which is a data slice). Note that you cannot override an existing field. +You can get or add a field of a data scope (each field is a data slice). + +:::note + +You cannot override an existing field. + +::: @@ -95,20 +135,20 @@ Getting and setting a field of a data scope is done by the `[]` operator with a @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - # Add "documents" to the top-level data scope. 
- data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...)) + # Add "documents" to the top-level data scope. + data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...)) - # Each row of "documents" is a child scope. - with data_scope["documents"].row() as document: + # Each row of "documents" is a child scope. + with data_scope["documents"].row() as document: - # Get "content" from the document scope, transform, and add "summary" to scope. - document["summary"] = field1_row["content"].transform(DemoFunctionSpec(...)) + # Get "content" from the document scope, transform, and add "summary" to scope. + document["summary"] = document["content"].transform(DemoFunctionSpec(...)) ``` -### Add a Collector +### Add a collector See [Data Collector](#data-collector) below for more details. @@ -132,17 +172,17 @@ Other arguments can be passed in as positional arguments or keyword arguments, a ```python @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - ... - data_scope["field2"] = data_scope["field1"].transform( - DemoFunctionSpec(...), - arg1, arg2, ..., key0=kwarg0, key1=kwarg1, ...) - ... + ... + data_scope["field2"] = data_scope["field1"].transform( + DemoFunctionSpec(...), + arg1, arg2, ..., key0=kwarg0, key1=kwarg1, ...) + ... ``` -### For Each Row +### For each row If the data slice has `Table` type, you can call the `row()` method to obtain a child scope representing each row, to apply operations on each row. @@ -161,7 +201,7 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco -### Get a Sub Field +### Get a sub field If the data slice has `Struct` type, you can obtain a data slice on a specific sub field of it, similar to getting a field of a data scope. @@ -192,14 +232,14 @@ For example, ```python @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - ... 
- demo_collector = data_scope.add_collector() - with data_scope["documents"].row() as document: ... - demo_collector.collect(id=cocoindex.GeneratedField.UUID, - filename=document["filename"], - summary=document["summary"]) - ... + demo_collector = data_scope.add_collector() + with data_scope["documents"].row() as document: + ... + demo_collector.collect(id=cocoindex.GeneratedField.UUID, + filename=document["filename"], + summary=document["summary"]) + ... ``` @@ -228,13 +268,13 @@ Export must happen at the top level of a flow, i.e. not within any child scopes ```python @cocoindex.flow_def(name="DemoFlow") def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): - ... - demo_collector = data_scope.add_collector() - ... - demo_collector.export( - "demo_storage", DemoStorageSpec(...), - primary_key_fields=["field1"], - vector_index=[("field2", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) + ... + demo_collector = data_scope.add_collector() + ... + demo_collector.export( + "demo_storage", DemoStorageSpec(...), + primary_key_fields=["field1"], + vector_index=[("field2", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) ``` diff --git a/docs/docs/core/flow_methods.mdx b/docs/docs/core/flow_methods.mdx index 727a21dbd..1d3b41e7b 100644 --- a/docs/docs/core/flow_methods.mdx +++ b/docs/docs/core/flow_methods.mdx @@ -1,40 +1,188 @@ --- -title: Flow Methods -description: Methods of a CocoIndex Flow +title: Flow Running +description: Run a CocoIndex Flow, including build / update data in the target storage and evaluate the flow without changing the target storage. --- import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -# Methods of a CocoIndex Flow +# Running a CocoIndex Flow -After a flow is defined as discussed in [Flow Definition](/docs/core/flow_def), you get a Flow object and can call its methods to operate with it. 
+After a flow is defined as discussed in [Flow Definition](/docs/core/flow_def), you can start to transform data with it. -## update +This can be achieved in two ways: -The `update()` method will update the index defined by the flow. +* Use APIs provided by the library. + You have a `cocoindex.Flow` object after defining the flow in your code, and you can interact with it later. -Once the function returns, the indice is fresh up to the moment when the function is called. +* Use [CocoIndex CLI](/docs/core/cli). + +We'll focus on the first way in this document. +The following sections assume you have a `demo_flow`: + + + + +```python +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + ... +``` + +It creates a `demo_flow` object of type `cocoindex.Flow`. + + + + + +## Build / update target data + +The major goal of a flow is to perform the transformations on source data and build / update data in the target storage (the index). +This action has two flavors: + +* **One time update.** + It builds/updates the target data based on source data up to the current moment. + Once the target data is at least as fresh as the source data at the moment the update starts, it's done. + It fits situations where you need access to fresh target data at certain points in time. + +* **Live update.** + It continuously watches the source data and updates the target data accordingly. + It's long-running and only stops when being aborted explicitly. + It fits situations where you need continuous access to fresh target data. + +:::info + +For both flavors, CocoIndex performs updates incrementally: +it only performs computations and storage mutations for source data that has changed, or when the flow itself has changed. +This achieves the best efficiency. 
+ +::: + + +### One time update + +:::tip + +CLI equivalent: `cocoindex update` + +::: + +The `update()` async method creates/updates data in the target storage. + +Once the function returns, the target data is fresh up to the moment when the function is called. + + + + +```python +stats = await demo_flow.update() +print(stats) +``` + + + + +### Live update + +:::tip + +CLI equivalent: `cocoindex update -L` + +::: + +Live update is available for *eligible* data sources, including: + +* Data sources configured with a [refresh interval](flow_def#refresh-interval). +* Data sources that provide a **change stream**. + +To perform live update, you need to create a `cocoindex.FlowLiveUpdater` object using the `cocoindex.Flow` object. +It takes an optional `cocoindex.FlowLiveUpdaterOptions` option, with the following fields: + +* `live_mode` (type: `bool`, default: `True`): + Whether to perform live update for eligible data sources. + +* `print_stats` (type: `bool`, default: `False`): Whether to print stats during update. + +For data sources ineligible for live update, or when `live_mode` is `False`, +the `FlowLiveUpdater` only performs a one-time update (similar to the `update()` method above), +under a unified interface. +The following creates a `cocoindex.FlowLiveUpdater` object, with an optional `cocoindex.FlowLiveUpdaterOptions`: + ```python -flow.update() +my_updater = cocoindex.FlowLiveUpdater( + demo_flow, cocoindex.FlowLiveUpdaterOptions(print_stats=True)) ``` -## evaluate_and_dump +A `FlowLiveUpdater` object supports the following methods: + +* `abort()`: Abort the updater. +* `wait()` (async): Wait for the updater to finish. It only unblocks in one of the following cases: + * The updater was aborted. + * A one time update is done, and live update is not enabled: + either `live_mode` is `False`, or all data sources are ineligible for live updates. +* `update_stats()`: Returns the stats of the updater. 
+ + + +```python +my_updater = cocoindex.FlowLiveUpdater(demo_flow) + +# Perform your own logic (e.g. a query loop). +... + +# Print the update stats. +print(my_updater.update_stats()) +# Abort the updater. +my_updater.abort() +# Wait for the updater to finish. +await my_updater.wait() +``` + +The Python SDK also allows you to use the updater as a context manager. +It will abort and wait for the updater to finish automatically when the context is exited. +The following code is equivalent to the code above: + +```python +async with cocoindex.FlowLiveUpdater(demo_flow) as my_updater: + # Perform your own logic (e.g. a query loop). + ... + print(my_updater.update_stats()) +``` + +Within a synchronous function, remove `async` before `with`, like this: + +```python +with cocoindex.FlowLiveUpdater(demo_flow) as my_updater: + ... +``` + + + + +## Evaluate the flow + +:::tip + +CLI equivalent: `cocoindex evaluate` + +::: -The `evaluate_and_dump()` method evaluates the flow and dump flow outputs to files. +CocoIndex allows you to run the transformations defined by the flow without updating the target storage. +The `evaluate_and_dump()` method supports this by dumping flow outputs to files. It takes an `EvaluateAndDumpOptions` dataclass to configure it, with the following fields: * `output_dir` (type: `str`, required): The directory to dump the result to. * `use_cache` (type: `bool`, default: `True`): Use already-cached intermediate data if available. - Note that we only reuse existing cached data without updating the cache even if it's turned on. + Note that we only read existing cached data without updating the cache, even if it's turned on. 
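
As a minimal sketch of calling it (assuming the `demo_flow` object from the sections above, that `EvaluateAndDumpOptions` is exposed at the package root, and an arbitrary output directory name):

```python
# Run the flow's transformations and dump outputs to files,
# without touching the target storage.
demo_flow.evaluate_and_dump(
    cocoindex.EvaluateAndDumpOptions(output_dir="eval_demo_output", use_cache=True))
```

Since it doesn't update the target storage, this is handy for inspecting transformation results before running an actual update.
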
diff --git a/docs/docs/ops/sources.md b/docs/docs/ops/sources.md index 0a775be08..54f4ea60b 100644 --- a/docs/docs/ops/sources.md +++ b/docs/docs/ops/sources.md @@ -59,6 +59,21 @@ The spec takes the following fields: * `service_account_credential_path` (type: `str`, required): full path to the service account credential file in JSON format. * `root_folder_ids` (type: `list[str]`, required): a list of Google Drive folder IDs to import files from. * `binary` (type: `bool`, optional): whether to read files as binary (instead of text). +* `recent_changes_poll_interval` (type: `datetime.timedelta`, optional): when set, this source provides a *change stream* by periodically polling Google Drive for recently modified files. + + :::info + + Since each poll only retrieves metadata for files modified recently (since the previous poll), + it's typically cheaper than a full refresh triggered by the [refresh interval](../core/flow_def#refresh-interval), especially when the folder contains a large number of files. + So you can usually set it to a smaller value than the `refresh_interval`. + + On the other hand, it only detects changes for files that still exist. + If a file is deleted (or the current account no longer has access to it), this change will not be detected by the change stream. + + So even when a source is configured with a change stream, it's still recommended to set a `refresh_interval`, with a larger value. + This way, most changes are covered by the change stream (with low latency), and the remaining changes (files no longer existing or accessible) are still covered by the refresh (with a higher latency). + + ::: ### Schema
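
Putting these notes together, here is a hedged sketch of a Google Drive source combining a change stream with a larger fallback `refresh_interval`. The spec class path `cocoindex.sources.GoogleDrive` is an assumption, and the credential path and folder IDs are placeholders:

```python
import datetime

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.GoogleDrive(
            service_account_credential_path="...",
            root_folder_ids=["..."],
            # Change stream: cheap periodic poll for recently modified files (low latency).
            recent_changes_poll_interval=datetime.timedelta(seconds=10)),
        # Larger fallback refresh: also covers deletions and lost access (higher latency).
        refresh_interval=datetime.timedelta(minutes=10))
    ...
```
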