---
title: Live Updates
description: "Keep your indexes up-to-date with live updates in CocoIndex."
---

# Live Updates

CocoIndex is designed to keep your indexes synchronized with your data sources. It does this through **live updates**: CocoIndex detects changes in your sources and updates your indexes accordingly, so your search results and data analysis reflect the latest state of your data.

## How Live Updates Work

Live updates in CocoIndex can be triggered in two main ways:

1. **Refresh Interval:** You can configure a `refresh_interval` for any data source. CocoIndex then periodically rescans the source for new, updated, or deleted data. This is a simple and effective way to keep your index fresh, especially for sources that have no built-in change notification system.

2. **Change Capture Mechanisms:** Some data sources offer more efficient ways to track changes. For example:
   * **Amazon S3:** You can configure an SQS queue to receive notifications whenever an object is added, modified, or deleted in your S3 bucket. CocoIndex listens to this queue and starts an update as soon as a notification arrives, without waiting for the next rescan.
   * **Google Drive:** The Google Drive source can be configured to poll for recent changes, which is more efficient than a full refresh.
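
To make the push-based trigger concrete, here is a minimal, library-independent sketch. A consumer drains pending change notifications from a queue and coalesces repeated events for the same object, so each object is reprocessed at most once per batch. Python's `queue.Queue` stands in for a message queue such as SQS. The `ChangeEvent` type and the `drain_changes` function are hypothetical illustrations. They are not CocoIndex or AWS APIs.

```python
import queue
from dataclasses import dataclass


@dataclass(frozen=True)
class ChangeEvent:
    # Hypothetical notification shape: which object changed and how.
    key: str
    kind: str  # "upsert" or "delete"


def drain_changes(events: "queue.Queue[ChangeEvent]") -> dict[str, str]:
    """Drain all pending notifications without blocking.

    Repeated events for the same key are coalesced: only the most recent
    kind of change per key is kept.
    """
    latest: dict[str, str] = {}
    while True:
        try:
            event = events.get_nowait()
        except queue.Empty:
            return latest
        latest[event.key] = event.kind


q: "queue.Queue[ChangeEvent]" = queue.Queue()
for e in [
    ChangeEvent("a.md", "upsert"),
    ChangeEvent("b.md", "upsert"),
    ChangeEvent("a.md", "upsert"),  # duplicate notification for a.md
    ChangeEvent("b.md", "delete"),
]:
    q.put(e)

print(drain_changes(q))  # {'a.md': 'upsert', 'b.md': 'delete'}
```

Coalescing is one reasonable design choice for bursty notification streams. A refresh interval sidesteps the issue entirely, because each rescan sees only the current state.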

When a change is detected, CocoIndex performs an **incremental update**: it re-processes only the data affected by the change instead of re-indexing your entire dataset. This keeps each update fast and efficient.
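
The core idea behind an incremental update can be sketched in a few lines:

- Remember a fingerprint (here, a SHA-256 hash) of each source row's content.
- On each pass, reprocess only rows whose fingerprint is new or has changed.
- Drop outputs for rows that disappeared.

This is a simplified, hypothetical model for illustration only. `IncrementalIndex` is not part of CocoIndex's API, and CocoIndex's own change tracking is more involved.

```python
import hashlib
from typing import Callable


def fingerprint(content: str) -> str:
    # Content hash used to decide whether a row needs reprocessing.
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


class IncrementalIndex:
    """Toy index that reprocesses only rows whose content changed."""

    def __init__(self, transform: Callable[[str], str]) -> None:
        self.transform = transform
        self.fingerprints: dict[str, str] = {}
        self.outputs: dict[str, str] = {}

    def sync(self, snapshot: dict[str, str]) -> int:
        """Bring the index in line with `snapshot`; return rows reprocessed."""
        processed = 0
        for key, content in snapshot.items():
            fp = fingerprint(content)
            if self.fingerprints.get(key) != fp:  # new or modified row
                self.outputs[key] = self.transform(content)
                self.fingerprints[key] = fp
                processed += 1
        for key in set(self.fingerprints) - set(snapshot):  # deleted rows
            del self.fingerprints[key]
            del self.outputs[key]
        return processed


index = IncrementalIndex(transform=str.upper)
print(index.sync({"a.md": "alpha", "b.md": "beta"}))   # 2: first full pass
print(index.sync({"a.md": "alpha", "b.md": "BETA!"}))  # 1: only b.md changed
print(index.sync({"a.md": "alpha"}))                   # 0: b.md dropped, nothing reprocessed
print(index.outputs)                                   # {'a.md': 'ALPHA'}
```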

Here's an example of how to set up a source with a `refresh_interval`:

```python
import datetime

import cocoindex

@cocoindex.flow_def(name="LiveUpdateExample")
def live_update_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Source: local files in the 'data' directory
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="data"),
        # refresh_interval takes a standard datetime.timedelta
        refresh_interval=datetime.timedelta(seconds=5),
    )
    # ... transformations, collectors, and exports go here
```

By setting `refresh_interval` to 5 seconds, we're telling CocoIndex to check the `data` directory for changes every 5 seconds.
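
Conceptually, a refresh interval turns into a loop like the one below. It checks the source, then waits for the interval before checking again, until something asks the loop to stop. This is a hypothetical standard-library sketch, not CocoIndex's scheduler, and `poll_until_stopped` is an illustrative name. Waiting on a `threading.Event` with a timeout, instead of calling `time.sleep`, lets a stop request take effect immediately rather than after the current interval has elapsed.

```python
import datetime
import threading
from typing import Callable


def poll_until_stopped(
    check_for_changes: Callable[[], None],
    refresh_interval: datetime.timedelta,
    stop: threading.Event,
) -> int:
    """Call `check_for_changes` every `refresh_interval` until `stop` is set.

    Returns the number of checks performed.
    """
    checks = 0
    while not stop.is_set():
        check_for_changes()
        checks += 1
        # Event.wait() returns as soon as stop.set() is called,
        # or after the timeout if nobody asks to stop.
        stop.wait(refresh_interval.total_seconds())
    return checks


stop = threading.Event()
calls: list[int] = []


def check() -> None:
    calls.append(len(calls))
    if len(calls) == 3:
        stop.set()  # simulate a shutdown request after the third check


print(poll_until_stopped(check, datetime.timedelta(milliseconds=10), stop))  # 3
```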

## Implementing Live Updates

You can enable live updates using either the CocoIndex CLI or the Python library.

### Using the CLI

To start a live update process from the command line, run the `update` command with the `-L` (or `--live`) flag:

```bash
cocoindex update -L your_flow_definition_file.py
```

This starts a long-running process that keeps monitoring your data sources and updates your indexes as changes are detected. You can stop the process by pressing `Ctrl+C`.

### Using the Python Library

For more control over the live update process, use the `FlowLiveUpdater` class in your Python code. This is particularly useful when CocoIndex runs as part of a larger application.

`FlowLiveUpdater` can be used as a context manager: it starts the updater when you enter the `with` block and stops it when you exit. Its `wait()` method blocks until the updater stops, for example when it is aborted with `Ctrl+C`.

Here's how you can use `FlowLiveUpdater` to start and manage a live update process:

```python
import cocoindex

# `live_update_flow` is the flow defined with @cocoindex.flow_def above.
# Entering the `with` block starts the updater; leaving it stops the updater.
with cocoindex.FlowLiveUpdater(
    live_update_flow, cocoindex.FlowLiveUpdaterOptions(print_stats=True)
) as updater:
    print("Live updater started. Press Ctrl+C to stop.")
    # The updater runs in the background; wait() blocks until it stops.
    updater.wait()

print("Live updater stopped.")
```
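
The lifecycle described above is the standard Python context-manager pattern wrapped around a background worker:

- Start on entering the `with` block.
- Stop on leaving it.
- `wait()` blocks until the background work ends.

The toy class below mimics that shape using only the standard library, so the mechanics are visible. `ToyLiveUpdater` and its `run_once` callback are hypothetical. They say nothing about how `FlowLiveUpdater` is implemented internally.

```python
import threading
from typing import Callable, Optional


class ToyLiveUpdater:
    """Runs `run_once` repeatedly in a background thread until done or aborted."""

    def __init__(self, run_once: Callable[[], bool], interval_s: float = 0.01) -> None:
        # run_once returns False when there is nothing left to watch.
        self._run_once = run_once
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self) -> None:
        while not self._stop.is_set():
            if not self._run_once():
                return  # no more work: the updater finishes on its own
            self._stop.wait(self._interval_s)

    def abort(self) -> None:
        self._stop.set()

    def wait(self) -> None:
        # Blocks until the background loop exits (finished or aborted).
        if self._thread is not None:
            self._thread.join()

    def __enter__(self) -> "ToyLiveUpdater":
        self.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the block stops the worker and waits for it to exit.
        self.abort()
        self.wait()


rounds: list[int] = []


def run_once() -> bool:
    rounds.append(len(rounds))
    return len(rounds) < 3  # pretend the source runs out of work after three rounds


with ToyLiveUpdater(run_once) as updater:
    updater.wait()

print(len(rounds))  # 3
```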

#### Getting Status Updates

You can also get status updates from `FlowLiveUpdater` to monitor the update process. The `next_status_updates()` method blocks until there is a new status update. The returned value lists the sources updated since the previous call (`updated_sources`) and the sources that are still active (`active_sources`).

```python
import cocoindex

updater = cocoindex.FlowLiveUpdater(live_update_flow)
updater.start()

while True:
    # Blocks until there is a new status update.
    updates = updater.next_status_updates()

    # Handle updated sources first, so updates reported in the
    # final status (the one with no active sources) are not skipped.
    for source_name in updates.updated_sources:
        print(f"Source '{source_name}' has been updated.")

    # Stop looping once no source is active anymore.
    if not updates.active_sources:
        print("All sources have finished processing.")
        break

updater.wait()
```

This lets your application react to updates, for example by notifying users or triggering downstream processes.
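
The model below shows why the loop above handles `updated_sources` before checking `active_sources`. It is a self-contained blocking status channel built on `threading.Condition`:

- The producer publishes which sources were updated and which remain active.
- The consumer dispatches each update to a per-source handler, as an application might to notify users or start downstream jobs.

`StatusChannel`, `StatusUpdates`, and the handler mapping are hypothetical stand-ins, not CocoIndex types.

```python
import threading
from dataclasses import dataclass
from typing import Callable


@dataclass
class StatusUpdates:
    # Which sources are still running, and which produced new results
    # since the previous message.
    active_sources: list[str]
    updated_sources: list[str]


class StatusChannel:
    """Blocking channel: next_status_updates() waits until something is published."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._pending: list[StatusUpdates] = []

    def publish(self, updates: StatusUpdates) -> None:
        with self._cond:
            self._pending.append(updates)
            self._cond.notify()

    def next_status_updates(self) -> StatusUpdates:
        with self._cond:
            self._cond.wait_for(lambda: bool(self._pending))
            return self._pending.pop(0)


def consume(channel: StatusChannel, handlers: dict[str, Callable[[], None]]) -> None:
    while True:
        updates = channel.next_status_updates()
        # Dispatch first: the final message can report updates *and* shutdown.
        for source_name in updates.updated_sources:
            handlers[source_name]()
        if not updates.active_sources:
            break


channel = StatusChannel()
log: list[str] = []
handlers = {"documents": lambda: log.append("reindexed documents")}

consumer = threading.Thread(target=consume, args=(channel, handlers))
consumer.start()
channel.publish(StatusUpdates(active_sources=["documents"], updated_sources=["documents"]))
channel.publish(StatusUpdates(active_sources=[], updated_sources=["documents"]))
consumer.join()
print(log)  # ['reindexed documents', 'reindexed documents']
```

If the consumer checked `active_sources` before dispatching, the second message's update would be dropped and `log` would hold only one entry.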

## Example

Let's walk through setting up a complete live update flow. For the full, runnable code, see the [live updates example](https://github.com/cocoindex-io/cocoindex/tree/main/examples/live_updates) in the CocoIndex repository.

### 1. Setting up the Source

The first step is to define a source and configure a `refresh_interval`. In this example, we use a `LocalFile` source to monitor a directory named `data`, collect one row per file, and export the rows to Postgres.

```python
import datetime

import cocoindex

@cocoindex.flow_def(name="LiveUpdateExample")
def live_update_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Source: local files in the 'data' directory, rescanned every 5 seconds
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="data"),
        refresh_interval=datetime.timedelta(seconds=5),
    )

    # Collector: one row per file, with its name and content
    collector = data_scope.add_collector()
    with data_scope["documents"].row() as doc:
        collector.collect(filename=doc["filename"], content=doc["content"])

    # Target: a Postgres table keyed by filename
    collector.export(
        "documents_index",
        cocoindex.targets.Postgres(),
        primary_key_fields=["filename"],
    )
```

The 5-second `refresh_interval` makes CocoIndex rescan the `data` directory every 5 seconds. Because the export uses `filename` as its primary key, a changed file updates its existing row in `documents_index` instead of adding a duplicate.
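
The role of `primary_key_fields` can be modeled with a dictionary standing in for the `documents_index` table. Each incoming row is upserted under its key, and a deleted source file removes the row with the same key. Because writes are keyed, replaying the same batch of changes leaves the table unchanged. This is a hypothetical in-memory sketch of keyed-export semantics, not how CocoIndex talks to Postgres. `apply_changes` is an illustrative name.

```python
from typing import Optional

Row = dict[str, str]


def apply_changes(table: dict[str, Row], changes: list[tuple[str, Optional[Row]]]) -> None:
    """Apply (primary_key, row) changes; a row of None means the source was deleted."""
    for key, row in changes:
        if row is None:
            table.pop(key, None)  # delete is a no-op if the row is already gone
        else:
            table[key] = row  # upsert: insert a new key or replace the existing row


table: dict[str, Row] = {}
changes: list[tuple[str, Optional[Row]]] = [
    ("a.md", {"filename": "a.md", "content": "v1"}),
    ("a.md", {"filename": "a.md", "content": "v2"}),  # edit replaces, never duplicates
    ("b.md", {"filename": "b.md", "content": "hello"}),
    ("b.md", None),                                    # file deleted
]
apply_changes(table, changes)
apply_changes(table, changes)  # replaying the batch is idempotent
print(table)  # {'a.md': {'filename': 'a.md', 'content': 'v2'}}
```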

### 2. Running the Live Updater

Once the flow is defined, use `FlowLiveUpdater` to start the live update process.

```python
import cocoindex

# Assumes `live_update_flow` from step 1 is defined in this same module.

def main():
    # Initialize CocoIndex
    cocoindex.init()

    # Set up the flow's backends (e.g., the Postgres target table)
    live_update_flow.setup(report_to_stdout=True)

    # Start the live updater and block until it stops
    with cocoindex.FlowLiveUpdater(
        live_update_flow, cocoindex.FlowLiveUpdaterOptions(print_stats=True)
    ) as updater:
        print("Live updater started. Watching for changes in the 'data' directory.")
        updater.wait()

if __name__ == "__main__":
    main()
```

`FlowLiveUpdater` runs in the background, and the `updater.wait()` call blocks until the process is stopped.

## Conclusion

Live updates keep your CocoIndex indexes fresh. By combining refresh intervals with source-specific change capture mechanisms, you can build responsive applications that stay in sync with your data.

For more details on `FlowLiveUpdater` and other live update options, see the [Run a Flow documentation](https://cocoindex.io/docs/core/flow_methods#live-update).