---
id: synchronizing-tables
title: Synchronize multiple tables
---

When working with multiple related tables in Deephaven, you may encounter situations where tables receive updates at different rates. This guide shows how to use `SyncTableFilter` and `LeaderTableFilter` to coordinate updates across multiple tables.

## The synchronization problem

Deephaven does not provide cross-table or cross-partition transactions. The system processes each partition independently to maximize throughput. This means that a row created later in one partition may appear in your query before a row created earlier in a different partition.

This independence can cause consistency issues when you have multiple tables that contain correlated data. For example:

- A trading system might have separate tables for orders, executions, and messages that all share a common transaction ID.
- A data pipeline might split events across multiple tables, each tagged with a sequence number.
- A multi-source system might need to wait until all sources report data for a given timestamp.

Both `SyncTableFilter` and `LeaderTableFilter` solve this problem by ensuring that only coordinated rows appear in the filtered results.

## When to use each utility

Choose the synchronization utility based on your table relationships:

- **Use `SyncTableFilter`** when all tables are peers. Each table contributes equally to determining which rows to show. The filter passes through rows where all tables have matching ID values.

- **Use `LeaderTableFilter`** when one table should control synchronization. The leader table contains ID values that dictate which rows from follower tables to show. This is useful when one table acts as a coordination log or contains the authoritative sequence of events.

## Requirements

Both utilities require:

- **Add-only tables**: Tables must not modify, shift, or remove rows. If you filter an add-only source table, the result remains add-only.
- **Monotonically increasing IDs**: For each key, ID values must increase from one update to the next. An ID cannot decrease, and it cannot reappear in a later update.
- **Atomic updates**: All rows for a given ID of a given table must appear in the same update.
- **Shared keys**: Tables must have common key columns for grouping.

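The ID and atomic-update requirements above can be checked on sample data before wiring up a filter. The following is a minimal plain-Python sketch, not part of Deephaven: `check_id_requirements` is a hypothetical helper, and it assumes each update is given as a list of `(key, id)` pairs in arrival order.

```python
def check_id_requirements(updates):
    """Return a list of violations; an empty list means the updates pass.

    `updates` is a sequence of updates, each a list of (key, id) pairs.
    (Hypothetical helper for illustration; not a Deephaven API.)
    """
    last_id = {}  # highest ID seen so far for each key
    problems = []
    for n, update in enumerate(updates):
        # Collect the distinct IDs that each key carries in this update.
        ids_by_key = {}
        for key, row_id in update:
            ids_by_key.setdefault(key, set()).add(row_id)
        for key, ids in ids_by_key.items():
            previous = last_id.get(key)
            # An ID at or below the previous maximum means the ID decreased,
            # or its rows were split across more than one update.
            if previous is not None and min(ids) <= previous:
                problems.append(
                    f"update {n}: key {key!r} has ID {min(ids)} after {previous}"
                )
            last_id[key] = max(ids) if previous is None else max(previous, max(ids))
    return problems


good = [[("AAPL", 1), ("AAPL", 1), ("GOOGL", 1)], [("AAPL", 2)]]
bad = [[("AAPL", 2)], [("AAPL", 2)], [("AAPL", 1)]]
print(len(check_id_requirements(good)), len(check_id_requirements(bad)))  # 0 2
```

A check like this only covers the ID rules. Whether the source tables are add-only depends on how they are produced and has to be verified separately.
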
## `SyncTableFilter`

`SyncTableFilter` synchronizes multiple peer tables by passing through, for each key, only the rows with the most recent ID that every table has reached.

### How it works

For each key, the filter finds the highest ID in each input table and takes the minimum of those values. Only rows with that ID are passed through. When all tables advance to the next ID, the filter removes the old ID's rows and adds the new ID's rows.

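The selection rule can be sketched in plain Python. This is an illustration only, not Deephaven's implementation: `synchronized_ids` is a hypothetical helper, and it assumes the rule is to take the highest ID each table holds for a key and then pass the smallest of those values. The `(key, id)` pairs match the `Symbol` and `SeqNum` values used in this guide's example.

```python
def synchronized_ids(tables):
    """Map each key to the ID that the rule described above would pass.

    `tables` maps a table name to a list of (key, id) pairs.
    (Hypothetical helper for illustration; not a Deephaven API.)
    """
    max_per_table = {}
    for name, rows in tables.items():
        maxima = {}
        for key, row_id in rows:
            maxima[key] = max(row_id, maxima.get(key, row_id))
        max_per_table[name] = maxima
    # Only keys that appear in every table can be synchronized.
    common_keys = set.intersection(*(set(m) for m in max_per_table.values()))
    # The synchronized ID is the smallest of the per-table maxima.
    return {
        key: min(maxima[key] for maxima in max_per_table.values())
        for key in common_keys
    }


tables = {
    "prices": [("AAPL", 1), ("AAPL", 2), ("AAPL", 3), ("GOOGL", 1), ("GOOGL", 2)],
    "volumes": [("AAPL", 1), ("AAPL", 2), ("GOOGL", 1), ("GOOGL", 2)],
    "bidAsk": [("AAPL", 1), ("AAPL", 2), ("GOOGL", 1)],
}
sync = synchronized_ids(tables)
print(sync["AAPL"], sync["GOOGL"])  # 2 1
```

Taking the minimum of the per-table maxima means the slowest table for a key decides which ID is visible, which is why a table that runs ahead has its newer rows held back.
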
### Example

This example synchronizes three tables that share `Symbol` as a key and use `SeqNum` as the ID:

```python order=null
import jpy
from deephaven import new_table
from deephaven.column import string_col, long_col, double_col

# The builder is a nested Java class, so it is loaded through jpy.
SyncTableFilterBuilder = jpy.get_type(
    "io.deephaven.engine.table.impl.util.SyncTableFilter$Builder"
)

price_data = new_table(
    [
        string_col("Symbol", ["AAPL", "AAPL", "AAPL", "GOOGL", "GOOGL"]),
        long_col("SeqNum", [1, 2, 3, 1, 2]),
        double_col("Price", [150.0, 151.0, 152.0, 2800.0, 2805.0]),
    ]
)

volume_data = new_table(
    [
        string_col("Symbol", ["AAPL", "AAPL", "GOOGL", "GOOGL"]),
        long_col("SeqNum", [1, 2, 1, 2]),
        long_col("Volume", [1000000, 1100000, 500000, 520000]),
    ]
)

bid_ask_data = new_table(
    [
        string_col("Symbol", ["AAPL", "AAPL", "GOOGL"]),
        long_col("SeqNum", [1, 2, 1]),
        double_col("Bid", [149.95, 150.95, 2799.50]),
        double_col("Ask", [150.05, 151.05, 2800.50]),
    ]
)

# ID column first, then the key column(s).
builder = SyncTableFilterBuilder("SeqNum", "Symbol")
builder.addTable("prices", price_data.j_table)
builder.addTable("volumes", volume_data.j_table)
builder.addTable("bidAsk", bid_ask_data.j_table)

result = builder.build()

# Look up each synchronized table by the name it was registered under.
synced_prices = result.get("prices")
synced_volumes = result.get("volumes")
synced_bid_ask = result.get("bidAsk")
```

In this example:

- For `AAPL`, the highest `SeqNum` is 3 in `price_data` and 2 in the other two tables, so the synchronized ID is 2 and only the rows with `SeqNum` 2 appear.
- The `AAPL` row with `SeqNum` 3 in `price_data` is held back until `volume_data` and `bid_ask_data` also reach `SeqNum` 3.
- For `GOOGL`, `bid_ask_data` has only reached `SeqNum` 1, so only rows with `SeqNum` 1 appear.
- When `bid_ask_data` receives `SeqNum` 2 for `GOOGL`, the filter advances to show those rows.

### API

Create a builder with the ID column name and key column names:

```python syntax
builder = SyncTableFilterBuilder(id_column, key_column1, key_column2, ...)
```

Add each table with a unique name:

```python syntax
builder.addTable(table_name, table.j_table)
```

Build and retrieve the synchronized tables:

```python syntax
result = builder.build()
synced_table = result.get(table_name)
```

## `LeaderTableFilter`

`LeaderTableFilter` synchronizes multiple tables using a leader-follower pattern. The leader table contains ID columns that specify which rows from each follower table to show.

### How it works

The leader table contains one ID column for each follower table. When the leader table has a row with specific ID values, the filter shows the corresponding rows from each follower table that match those IDs.

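The matching step can be sketched in plain Python. This is an illustration under stated assumptions, not Deephaven's implementation: `parse_id_mapping` and `filter_follower` are hypothetical helpers, rows are plain dicts, key columns are paired by position rather than by name, and the sample leader has one row per key. How the real filter treats older leader rows for the same key is not modelled here.

```python
def parse_id_mapping(mapping):
    """Split "leaderIdCol=followerIdCol" into its two column names."""
    leader_col, follower_col = (part.strip() for part in mapping.split("="))
    return leader_col, follower_col


def filter_follower(leader_rows, leader_keys, follower_rows, id_mapping, follower_keys):
    """Keep follower rows whose key and ID are referenced by a leader row.

    (Hypothetical helper for illustration; not a Deephaven API.)
    """
    leader_id, follower_id = parse_id_mapping(id_mapping)
    # Every (key values, ID) pair that the leader refers to.
    wanted = {
        (tuple(row[k] for k in leader_keys), row[leader_id]) for row in leader_rows
    }
    return [
        row
        for row in follower_rows
        if (tuple(row[k] for k in follower_keys), row[follower_id]) in wanted
    ]


leader = [
    {"Client": "ClientA", "Session": "S1", "TradeId": 100},
    {"Client": "ClientB", "Session": "S2", "TradeId": 200},
]
trades = [
    {"Client": "ClientA", "SessionId": "S1", "Id": 100},
    {"Client": "ClientA", "SessionId": "S1", "Id": 102},
    {"Client": "ClientB", "SessionId": "S2", "Id": 200},
]
kept = filter_follower(
    leader, ["Client", "Session"], trades, "TradeId=Id", ["Client", "SessionId"]
)
print([row["Id"] for row in kept])  # [100, 200]
```

Trade 102 is dropped because no leader row refers to it, even though it shares a key with trade 100.
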
### Example

This example uses a synchronization log as the leader table:

```python order=null
import jpy
from deephaven import new_table
from deephaven.column import string_col, long_col, double_col

# The builder is a nested Java class, so it is loaded through jpy.
LeaderTableFilterBuilder = jpy.get_type(
    "io.deephaven.engine.util.LeaderTableFilter$TableBuilder"
)

# Leader: one ID column per follower table (TradeId, MessageId).
sync_log = new_table(
    [
        string_col("Client", ["ClientA", "ClientA", "ClientB"]),
        string_col("Session", ["S1", "S1", "S2"]),
        long_col("TradeId", [100, 101, 200]),
        long_col("MessageId", [1, 2, 5]),
    ]
)

trade_log = new_table(
    [
        string_col("Client", ["ClientA", "ClientA", "ClientA", "ClientB"]),
        string_col("SessionId", ["S1", "S1", "S1", "S2"]),
        long_col("Id", [100, 101, 102, 200]),
        string_col("Symbol", ["AAPL", "GOOGL", "MSFT", "TSLA"]),
        double_col("Quantity", [100.0, 50.0, 75.0, 200.0]),
    ]
)

message_log = new_table(
    [
        string_col("Client", ["ClientA", "ClientA", "ClientA", "ClientB", "ClientB"]),
        string_col("SessionId", ["S1", "S1", "S1", "S2", "S2"]),
        long_col("MsgId", [1, 2, 3, 5, 6]),
        string_col(
            "Message",
            [
                "Order placed",
                "Order filled",
                "Order confirmed",
                "Trade executed",
                "Settlement",
            ],
        ),
    ]
)

# Leader table first, then the leader's key columns.
builder = LeaderTableFilterBuilder(sync_log.j_table, "Client", "Session")
# Each follower: name, table, "leaderIdCol=followerIdCol", follower key columns.
builder.addTable("trades", trade_log.j_table, "TradeId=Id", "Client", "SessionId")
builder.addTable(
    "messages", message_log.j_table, "MessageId=MsgId", "Client", "SessionId"
)

result = builder.build()

filtered_leader = result.getLeader()
filtered_trades = result.get("trades")
filtered_messages = result.get("messages")
```

In this example:

- The `sync_log` leader table controls which trades and messages appear.
- For `ClientA/S1`, the leader shows `TradeId` 100 and 101, and `MessageId` 1 and 2.
- Even though `trade_log` has `Id` 102 and `message_log` has `MsgId` 3, they don't appear because the leader hasn't referenced them yet.
- For `ClientB/S2`, only trade 200 and message 5 appear.

### API

Create a builder with the leader table and key columns:

```python syntax
builder = LeaderTableFilterBuilder(leader_table.j_table, key_column1, key_column2, ...)
```

Add each follower table with:

- A unique name
- The table reference
- ID column mapping (format: `"leaderIdColumn=followerIdColumn"`)
- Key columns in the follower table (must match leader key columns in type)

```python syntax
builder.addTable(
    table_name,
    table.j_table,
    "leaderIdCol=followerIdCol",
    follower_key_col1,
    follower_key_col2,
    ...,
)
```

Build and retrieve the synchronized tables:

```python syntax
result = builder.build()
filtered_leader = result.getLeader()
filtered_follower = result.get(table_name)
```

### Partitioned table variant

`LeaderTableFilter.PartitionedTableBuilder` works with partitioned tables. Access it via jpy:

```python syntax
PartitionedTableBuilder = jpy.get_type(
    "io.deephaven.engine.util.LeaderTableFilter$PartitionedTableBuilder"
)
builder = PartitionedTableBuilder(leader_partitioned_table.j_partitioned_table)
builder.addTable(
    name, follower_partitioned_table.j_partitioned_table, "leaderIdCol=followerIdCol"
)
result = builder.build()
```

Requirements:

- All partitioned tables have the same number of key columns.
- Key columns have compatible types.
- Key columns are joined in order.
- Constituent tables within each partition are add-only.

## Related documentation

- [Filters](./use-filters.md)
- [Partitioned tables](./partitioned-tables.md)
- [`SyncTableFilter` Javadoc](https://deephaven.io/core/javadoc/io/deephaven/engine/table/impl/util/SyncTableFilter.html)
- [`LeaderTableFilter` Javadoc](https://deephaven.io/core/javadoc/io/deephaven/engine/util/LeaderTableFilter.html)