---
title: "Introducing Timestreams"
subtitle: "Part 2: Declarative, Temporal Queries"
image: /_static/images/blog/introducing-timestreams/part-2_banner.png
author: Ben Chambers and Therapon Skoteiniotis
date: 2023-May-25
draft: true
---
Understanding time-based patterns in your data can unlock valuable insights, but expressing temporal queries can often be a challenging task.
Imagine being able to effortlessly analyze your users' behavior over time, to perform precise temporal joins, or to examine patterns of activity between different events, all while being intuitive and naturally handling time.
This is where the concept of timestreams, a high-level abstraction for working with temporal data, comes to your aid.
In this blog post, we are diving deep into the world of timestreams.
We'll demonstrate how they make expressing temporal queries on events, and importantly, between events, not just easy but also intuitive.
We previously [introduced the timestream][timestreams_part1], a high-level abstraction for working with temporal data.
Timestreams organize event-based data by time and entity, making it possible to reason about the context around events.
Expressing temporal queries using timestreams has several benefits:
- **Intuitive**: Since timestreams are ordered by time, it is natural for queries to operate in order as well. As time progresses, additional events – input – occur and are reflected in the output of the query. This way of thinking about computations – as progressing through time – is intuitive because it matches the way we observe events.
- **Declarative**: Temporal operations – like windowing and shifting – are clearly declared when working with timestreams, since time is part of the abstraction.
- **Composable**: Every operation takes timestreams and produces timestreams, meaning that operations can be chained as necessary to produce the intended results.
In the next sections, we are going to dissect four real-life examples demonstrating the immense benefits of timestreams. We'll start with a simple query for an aggregation and progressively tackle more intricate temporal windows, data-dependent windows, and temporally correct joins. By the end, you'll be left with a deep understanding of how timestreams make writing simple temporal queries as straightforward as SQL and how they empower us to take on the more challenging questions.
# Total Spend: Cumulative Aggregation
Timestreams support everything you can do in SQL, intuitively extended to operate over time.
Before looking at some new capabilities for sophisticated temporal queries, let’s look at something simple – an aggregation.
Writing simple queries is easy: in fact, since timestreams are ordered by time and grouped by entity, they can be even easier than in SQL!
Consider the question, _how much did each user spend_?
When thinking about this over events, it is natural to process the purchases in order, updating the amount each user has spent over time.
The result is a _cumulative_ sum producing a _continuous_ timestream.
![Timestream visualization of how much each user spent][aggregation]
The corresponding query is shown below written in two equivalent ways.
The first emphasizes the sum being applied to the purchases while the second emphasizes the chain of operations we have composed – “take the purchases then apply the sum”.
In the remainder of this post we’ll use the latter since it better matches the way we tend to think about processing timestreams.
```{.python}
Purchases.col("amount").sum()
# TODO: FRAZ - should these include '.to_pandas()' or '.preview()'/other?
```
Writing simple temporal queries with timestreams is as easy as SQL.
Processing events in order is an intuitive way of operating over time.
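For intuition, the cumulative semantics can be sketched in plain Python. This is an illustrative, library-free sketch – the event tuples and the `cumulative_spend` helper are made up for this post, not the Kaskada API:

```{.python}
def cumulative_spend(events):
    """Process (time, user, amount) purchases in time order,
    yielding the running sum per user after each event."""
    totals = {}
    out = []
    for time, user, amount in sorted(events, key=lambda e: e[0]):
        totals[user] = totals.get(user, 0) + amount
        out.append((time, user, totals[user]))
    return out

purchases = [(1, "ada", 10), (2, "ben", 5), (3, "ada", 7)]
print(cumulative_spend(purchases))
# [(1, 'ada', 10), (2, 'ben', 5), (3, 'ada', 17)]
```

Each output point is the user's total spend _as of_ that event – exactly the continuous timestream pictured above.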
Of course, aggregating over all events is only one way we may wish to aggregate things.
In the next example, we’ll see how to extend this query using temporal windows to focus on recent events.
# Monthly Spend: Temporal Windowing
When thinking about temporal queries, it’s very natural to ask questions about the recent past – year-to-date or the last 30 days.
The intuition of processing events in order suggests answering the question “_how much has each user spent this month_” should just require resetting the value at the start of each month.
And this intuition is exactly how these kinds of temporal windows work with timestreams.
![Timestream visualization of how much each user spent this month][windowed]
The temporal query is shown below.
It clearly indicates the intent we expressed above – take the purchases and aggregate them since the beginning of each month.
```{.python}
import kaskada as kd
Purchases.col("amount").sum(window=kd.windows.Since.monthly())
```
Since time is inherently part of every timestream, every aggregation is able to operate within temporal windows.
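The reset-at-each-boundary intuition can also be sketched in plain Python. Again, the event shape and `monthly_spend` helper are illustrative only, not the Kaskada API:

```{.python}
def monthly_spend(events):
    """events: time-ordered (month, user, amount) purchases.
    Returns running sums that reset when the month changes for a user."""
    state = {}  # user -> (current month, total so far this month)
    out = []
    for month, user, amount in events:
        prev_month, total = state.get(user, (None, 0))
        if month != prev_month:
            total = 0  # window boundary: reset at the start of each month
        total += amount
        state[user] = (month, total)
        out.append((month, user, total))
    return out

purchases = [("jan", "ada", 10), ("jan", "ada", 5), ("feb", "ada", 3)]
print(monthly_spend(purchases))
# [('jan', 'ada', 10), ('jan', 'ada', 15), ('feb', 'ada', 3)]
```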
In the next example we’ll see how easy it is to work with more complicated queries, including aggregations with more sophisticated windows.
# Page Views Between Purchases
Not all windows are defined in terms of time.
It is often useful to use events to determine the windows used for an aggregation.
Additionally, while all the examples so far have operated on a single type of event – `Purchases` – examining the patterns of activity between different events is critical for identifying cause and effect relationships.
In this example we’ll take advantage of timestreams to declaratively express queries using data-defined windows and multiple types of events.
We’ll also filter an intermediate timestream to specific points to control the values used in later steps while composing the query.
The question we’ll answer is “_what is the average number of page-views between each purchase for each user_?”.
We’ll first compute the page views since the last purchase, observe them at the time of each purchase, and then take the average.
## Data-Defined Windowing
The first thing we’ll do is compute the number of page views since the last purchase.
In the previous example, we windowed since the start of the month.
But there is nothing special about the timestream defining the start of a month – we can window with any other timestream.
```{.python}
import kaskada as kd
valid_purchase = Purchases.is_not_null()
PageViews.count(kd.windows.Since(valid_purchase))
```
![timestream showing data-defined windows][data_windows_1]
In addition to data-defined windowing, we see how to work with multiple types of events.
Since every timestream is ordered by time and grouped by entity, every timestream can be lined up by time and joined on entity – _automatically_.
## Observing at specific times
The previous step gave us the page views since the last purchase.
But it was a continuous timestream that increased at each page view until the next purchase.
We’re after a discrete timestream with a single value at the time of each purchase representing the page views since the previous purchase.
// TODO: Link to new python docs
To do this, we use the [`filter`]({% fenl_catalog when %}) operation which allows observing – and interpolating if needed – a timestream at specific points in time.
![Observing a timestream at specific points in time using when][data_windows_2]
The `filter` operation can be used anywhere in a temporal query and controls which points are present in the output – or passed to later aggregations.
# Nested Aggregation
With the number of page views between purchases computed, we are now able to compute the average of this value.
All we need to do is use the [](`kaskada.Timestream.mean`) aggregation.
![Applying an outer aggregation to compute the average of observed page views][data_windows_3]
# Putting it together
The complete query is shown below.
We see the steps match the logical steps we talked through above.
Even though the logic was reasonably complex, the query is relatively straightforward and captures what we want to compute – even hard questions are expressible.
```{.python}
import kaskada as kd
valid_purchase = Purchases.is_not_null()
PageViews.count(kd.windows.Since(valid_purchase)).filter(valid_purchase).mean()
```
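As a rough, library-free sketch of what this query computes – counting views in a window delimited by purchases, observing the count at each purchase, then averaging – the following plain Python mirrors the semantics (the event shape and helper name are illustrative, not the Kaskada API):

```{.python}
def avg_views_between_purchases(events):
    """events: time-ordered (kind, user) with kind in {"view", "purchase"}.
    Returns user -> average number of views between consecutive purchases."""
    since = {}     # user -> views since that user's last purchase
    observed = {}  # user -> counts observed at each purchase
    for kind, user in events:
        if kind == "view":
            since[user] = since.get(user, 0) + 1
        else:  # purchase: observe the windowed count, then reset the window
            observed.setdefault(user, []).append(since.get(user, 0))
            since[user] = 0
    return {u: sum(c) / len(c) for u, c in observed.items()}

events = [("view", "ada"), ("view", "ada"), ("purchase", "ada"),
          ("view", "ada"), ("purchase", "ada")]
print(avg_views_between_purchases(events))
# {'ada': 1.5}
```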
This kind of query can be generalized to analyze a variety of patterns in the page view activity.
Maybe we only want to look at the page views for the most-frequently viewed item rather than all items, believing that the user is becoming more focused on that item.
Maybe we want to window since purchases of the same item, rather than any purchase.
This query showed some ways timestreams enable complex temporal queries to be expressed:
1. Ordering allows windows to be defined by their delimiters – when they start and end – rather than having to compute a “window ID” from each value for grouping.
1. Ordering also allows multiple timestreams to be used within the same expression – in this case `PageViews` and `Purchases`.
1. Continuity allows values to be interpolated at arbitrary times and filtered using the `filter` operation.
1. Composability allows the result of any operation to be used with later operations to express temporal questions. This allows complex questions to be expressed as a sequence of simple operations.
These capabilities allow identifying cause-and-effect patterns.
While it may be that a purchase _now_ leads to further purchases _later_, other events often have a stronger relationship – for instance, running out of tape and buying more, or scheduling a camping trip and stocking up.
Being able to look at activity (`PageViews`) within a window defined by other events (`Purchases`) is important for understanding the relationship between those events.
# Minimum Review Score
We’ve already seen how timestreams allow working with multiple types of events associated with the same entity.
But it’s often necessary to work with multiple entities as well.
For instance, using information about the entire population to normalize values for each user.
Our final example will show how to work with multiple entities and perform a temporal join.
The final question we’ll answer is _“what is the minimum average product review (score) at the time of each purchase?”_.
To do this, we’ll first work with reviews associated with each product to compute the average score, and then we’ll join each purchase with the corresponding average review.
## Changing entities
To start, we want to compute the average product review (score) for each item.
Since the reviews are currently grouped by user, we will need to re-group them by item, using the [`with_key`]({% fenl_catalog with_key %}) operation.
Once we’ve done that, we can use the `mean` aggregation we’ve already seen.
![timestreams computing the per-item average score][temporal_join_1]
## Lookup between entities
For each purchase (grouped by user) we want to look up the average review score of the corresponding item.
This uses the [`lookup`]({% fenl_catalog lookup %}) operation.
![timestreams using lookup to temporally join with the item score][temporal_join_2]
## Putting it together
Putting it all together, we use the lookup with a [`min`]({% fenl_catalog min %}) aggregation to determine the minimum average rating of items purchased by each user.
```{.python}
review_item = Reviews.col("item")
purchase_item = Purchases.col("item")
Reviews.col("score").with_key(review_item).mean().lookup(purchase_item).min()
```
This pattern of re-grouping to a different entity, performing an aggregation, and looking up the value (or shifting back to the original entities) is common in data-processing tasks.
In this case, the resulting value was looked up and used directly.
In other cases it is useful for normalization – such as relating each user’s value to the average values in their city.
Ordering and grouping allow timestreams to clearly express operations between different entities.
The result of a lookup is from the point-in-time at which the lookup is performed.
This provides a temporally correct “as-of” join.
Performing a join _at the correct time_ is critical for computing training examples from the past that are comparable to feature values used when applying the model.
Similarly, it ensures that any dashboard, visualization, or analysis performed on the results of the query are actually correct as if they were looking at the values in the past, rather than using information that wasn’t available at that time.
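The as-of semantics can be sketched in plain Python: each purchase sees the item's most recent average score _as of_ the purchase time, never a later value. The event shapes and the `asof_lookup` helper are illustrative only, not the Kaskada API:

```{.python}
def asof_lookup(scores, purchases):
    """scores: time-ordered (time, item, avg_score);
    purchases: time-ordered (time, user, item).
    Returns (time, user, score-as-of-that-time) for each purchase."""
    # Merge both streams in time order; score updates sort before
    # purchases at equal times (kind 0 < kind 1).
    events = sorted(
        [(t, 0, item, s) for t, item, s in scores] +
        [(t, 1, item, user) for t, user, item in purchases],
        key=lambda e: (e[0], e[1]))
    latest = {}  # item -> most recent average score seen so far
    out = []
    for t, kind, item, payload in events:
        if kind == 0:
            latest[item] = payload      # score update
        else:
            out.append((t, payload, latest.get(item)))  # as-of lookup
    return out

scores = [(1, "tape", 4.0), (5, "tape", 2.0)]
purchases = [(3, "ada", "tape"), (6, "ada", "tape")]
print(asof_lookup(scores, purchases))
# [(3, 'ada', 4.0), (6, 'ada', 2.0)]
```

Note the first purchase at time 3 sees the score 4.0 from time 1, not the later 2.0 – information that wasn't available yet never leaks backward.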
# Conclusion
In this post, we've demonstrated the power of timestreams as a high-level abstraction for handling temporal data.
Through intuitive, declarative, and composable operations, we showed how timestreams enable efficient expression of temporal queries on events and between events.
With examples ranging from simple aggregations to sophisticated queries like data-dependent windows and temporally correct joins, we illustrated how timestream operations can be chained to produce intended results.
The potency of timestreams lies in their ability to easily express simple temporal questions and intuitively extend to complex temporal queries.
From total spend to minimum review score, we walked through four illustrative examples that highlight the capabilities of timestreams in temporal querying.
We explored cumulative aggregation, temporal windowing, and observed how data-defined windowing offers the ability to express complex temporal questions.
We also showed how timestreams facilitate multi-entity handling and temporal joins.
These examples show that with timestreams, you have a powerful tool to identify cause-and-effect patterns and compute training examples from the past that are comparable to the values used when applying a model.
In the next post in this series, we'll delve further into the dynamics of temporal queries on timestreams.
We'll explore how these queries are efficiently executed by taking advantage of the properties of timestreams.
We encourage you to [get started][getting_started] writing your own temporal queries today, and [join our Slack channel]({{< var slack-join-url >}}) for more discussions and insights on timestreams and other data processing topics.
Don't miss out on this opportunity to be a part of our growing data community.
Join us now and let's grow together!
[timestreams_part1]: 2023-05-09-introducing-timestreams-part-1.html
[getting_started]: /guide/quickstart.html
[aggregation]: /_static/images/blog/introducing-timestreams/aggregation.svg "timestreams showing purchases and sum of purchases"
[windowed]: /_static/images/blog/introducing-timestreams/windowed.svg "timestreams showing purchases and sum of purchases since start of the month"
[data_windows_1]: /_static/images/blog/introducing-timestreams/data_windows_1.svg "timestreams showing count of page views since last purchase"
[data_windows_2]: /_static/images/blog/introducing-timestreams/data_windows_2.svg "timestreams showing count of page views since last purchase observed at each purchase"
[data_windows_3]: /_static/images/blog/introducing-timestreams/data_windows_3.svg "timestreams showing average of the page view count between purchases"
[temporal_join_1]: /_static/images/blog/introducing-timestreams/temporal_join_1.svg "timestreams computing the average item review score"
[temporal_join_2]: /_static/images/blog/introducing-timestreams/temporal_join_2.svg "timestreams looking up the average review score for each purchase"