You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: docs/joins.md
+58-3Lines changed: 58 additions & 3 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -1,6 +1,6 @@
1
1
# Joins
2
2
## Join as-of
3
-
3
+
> _New in [3.15.0](https://github.com/quixio/quix-streams/releases/tag/v3.15.0)_
4
4
5
5
Use `StreamingDataFrame.join_asof()` to join two topics into a new stream where each left record
6
6
is merged with the right record with the same key whose timestamp is less than or equal to the left timestamp.
@@ -135,12 +135,67 @@ Adjust `grace_ms` based on the expected time gap between the left and the right
135
135
- As-of join preserves headers only for the left dataframe.
136
136
If you need headers of the right side records, consider adding them to the value.
137
137
138
-
## Message ordering between partitions
139
-
Joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode
138
+
###Message ordering between partitions
139
+
Streaming joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode
140
140
when the join is used.
141
141
142
142
In this mode, it buffers messages per partition in order to process them in the timestamp order between different topics.
143
143
Timestamp alignment is effective only for the partitions **with the same numbers**: partition zero is aligned with other zero partitions, but not with partition one.
144
144
145
145
Note that message ordering works only when the messages are consumed from the topics.
146
146
If you change timestamps of the record during processing, they will be processed in the original order.
147
+
148
+
## Lookup join
149
+
150
+
> _New in [3.16.0](https://github.com/quixio/quix-streams/releases/tag/v3.16.0)_
151
+
152
+
!!! warning
153
+
This is an experimental feature; the API may change in future releases.
154
+
155
+
156
+
`StreamingDataFrame.lookup_join()` is a special type of join that allows you to enrich records in a streaming dataframe with the data from external systems.
157
+
158
+
You can use it to enriching streaming data with configuration or reference data from an external source, like a database.
159
+
160
+
### Example
161
+
162
+
To perform a lookup join, you need:
163
+
164
+
1. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseLookup](api-reference/dataframe.md#baselookup) to query the external source and cache the results when necessary.
165
+
2. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseField](api-reference/dataframe.md#basefield) to define how the data is extracted from the result.
166
+
3. To pass the lookup and the fields to the `StreamingDataFrame.lookup_join`.
167
+
168
+
169
+
See [SQLiteLookup](api-reference/dataframe.md#sqlitelookup) and [SQLiteLookupField](api-reference/dataframe.md#sqlitelookupfield) for the reference implementation.
170
+
171
+
Here is an example of lookup join with a SQLite database:
172
+
173
+
```python
174
+
from quixstreams import Application
175
+
from quixstreams.dataframe.joins.lookups import SQLiteLookup, SQLiteLookupField
176
+
177
+
app = Application(...)
178
+
179
+
# An implementation of BaseLookup for SQLite
180
+
lookup = SQLiteLookup(path="db.db")
181
+
182
+
sdf = app.dataframe(app.topic("input"))
183
+
184
+
sdf = sdf.lookup_join(
185
+
lookup,
186
+
on="column", # A column in StreamingDataFrame to join on
- For each record in the dataframe, a user-defined lookup strategy (a subclass of `BaseLookup`) is called with a mapping of field names to field definitions (subclasses of `BaseField`).
200
+
- The lookup strategy fetches or computes enrichment data based on the provided key and fields, and updates the record in-place.
201
+
- The enrichment can come from external sources such as configuration topics, databases, or in-memory data.
0 commit comments