@@ -23,9 +23,8 @@ utilizing timestreams.

This post is the first in a series introducing Kaskada, an open-source event processing engine designed around the
timestream abstraction. The full series will include (1) this introduction to the timestream abstraction, (2) [how Kaskada
builds an expressive temporal query language on the timestream abstraction][timestreams_part2], (3) how the timestream data
model enables Kaskada to efficiently execute temporal queries, and (4) how timestreams allow Kaskada to execute
incrementally over event streams.
builds an expressive temporal query language on the timestream abstraction][timestreams_part2], and (3) how the timestream data
model enables Kaskada to efficiently execute temporal queries.

## Rising Abstractions in Stream Processing

@@ -39,16 +39,13 @@ The corresponding query is shown below written in two equivalent ways.
The first emphasizes the sum being applied to the purchases while the second emphasizes the chain of operations we have composed – “take the purchases then apply the sum”.
In the remainder of this post we’ll use the latter since it better matches the way we tend to think about processing timestreams.

```fenl
sum(Purchases.amount)

# OR

Purchases.amount
| sum()
```{.python}
Purchases.col("amount").sum()
# TODO: FRAZ - should these include '.to_pandas()' or '.preview()'/other?
```
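
To make the semantics of a cumulative aggregation concrete, here is a minimal plain-Python sketch. It does not use Kaskada; the event data and the `running_sum` helper are hypothetical, and the sketch only illustrates the idea that a sum over a timestream yields a new value for an entity each time an event arrives for it.

```python
from collections import defaultdict

# Hypothetical purchase events, already ordered by time: (time, user, amount).
purchases = [
    (1, "ben", 5),
    (2, "davor", 10),
    (3, "ben", 7),
    (4, "davor", 3),
]


def running_sum(events):
    """Return one (time, entity, sum_so_far) point per input event."""
    totals = defaultdict(int)  # entity -> sum of all amounts seen so far
    points = []
    for time, entity, amount in events:
        totals[entity] += amount
        points.append((time, entity, totals[entity]))
    return points


result = running_sum(purchases)
```

Each user's total only changes when that user makes a purchase, which mirrors how the aggregated timestream stays grouped by entity.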

Writing simple temporal queries with timestreams was as easy as SQL.
Writing simple temporal queries with timestreams is as easy as SQL.
Processing events in order is an intuitive way of operating over time.
Of course, aggregating over all events is only one way we may wish to aggregate things.
In the next example, we’ll see how to extend this query using temporal windows to focus on recent events.
@@ -64,9 +61,10 @@ And this intuition is exactly how these kinds of temporal windows work with time
The temporal query is shown below.
It clearly indicates the intent we expressed above – take the purchases and aggregate them since the beginning of each month.

```fenl
Purchases.amount
| sum(window=since(monthly()))
```{.python}
import kaskada as kd

Purchases.col("amount").sum(window=kd.windows.Since.monthly())
```
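
The windowed sum can be sketched in plain Python as well. This is an illustration under simplifying assumptions, not Kaskada's implementation: the data and the `sum_since_month_start` helper are hypothetical, and the sketch only emits a point at each purchase rather than also at each month boundary.

```python
from datetime import date

# Hypothetical purchases for one user, ordered by time: (day, user, amount).
purchases = [
    (date(2023, 1, 5), "ben", 5),
    (date(2023, 1, 20), "ben", 7),
    (date(2023, 2, 3), "ben", 4),
    (date(2023, 2, 9), "ben", 1),
]


def sum_since_month_start(events):
    """Running sum per entity that resets whenever a new month begins."""
    state = {}  # entity -> ((year, month), running total within that month)
    points = []
    for day, entity, amount in events:
        month = (day.year, day.month)
        last_month, total = state.get(entity, (month, 0))
        if month != last_month:
            total = 0  # a new window started at the beginning of this month
        total += amount
        state[entity] = (month, total)
        points.append((day, entity, total))
    return points


monthly = sum_since_month_start(purchases)
```

The only difference from an unwindowed running sum is the reset step, which is what the `since` window contributes.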

Since time is inherently part of every timestream, every aggregation is able to operate within temporal windows.
@@ -88,9 +86,11 @@ The first thing we’ll do is compute the number of page views since the last pu
In the previous example, we windowed since the start of the month.
But there is nothing special about the timestream defining the start of a month – we can window with any other timestream.

```fenl
PageViews
| count(window=since(is_valid(Purchases)))
```{.python}
import kaskada as kd

valid_purchase = Purchases.is_not_null()
PageViews.count(window=kd.windows.Since(valid_purchase))
```
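
A data-defined window works the same way as a calendar window, except that the reset is triggered by another stream of events. The plain-Python sketch below is illustrative only: the merged event list and the `views_since_purchase` helper are hypothetical, and it assumes the count observed at a purchase still includes the page views leading up to it, with the reset taking effect afterwards.

```python
# Hypothetical merged events for a single user, ordered by time: (time, kind).
events = [
    (1, "view"),
    (2, "view"),
    (3, "purchase"),
    (4, "view"),
    (5, "purchase"),
    (6, "view"),
    (7, "view"),
    (8, "view"),
]


def views_since_purchase(events):
    """Count page views, resetting the count after each purchase."""
    count = 0
    points = []
    for time, kind in events:
        if kind == "view":
            count += 1
        points.append((time, kind, count))
        if kind == "purchase":
            count = 0  # the next window starts after this purchase
    return points


counts = views_since_purchase(events)
```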

![timestream showing data-defined windows][data_windows_1]
@@ -102,11 +102,12 @@ Since every timestream is ordered by time and grouped by entity, every timestrea
The previous step gave us the page views since the last purchase.
But it was a continuous timestream that increased at each page view until the next purchase.
We’re after a discrete timestream with a single value at the time of each purchase representing the page views since the previous purchase.
To do this, we use the [`when`]({% fenl_catalog when %}) operation which allows observing – and interpolating if needed – a timestream at specific points in time.
// TODO: Link to new python docs
To do this, we use the [`filter`]({% fenl_catalog when %}) operation which allows observing – and interpolating if needed – a timestream at specific points in time.

![Observing a timestream at specific points in time using when][data_windows_2]

The `when` operation can be used anywhere in a temporal query and allows for filtering points which are present in the output – or passed to later aggregations.
The `filter` operation can be used anywhere in a temporal query and allows for filtering points which are present in the output – or passed to later aggregations.

# Nested Aggregation
With the number of page views between purchases computed, we are now able to compute the average of this value.
@@ -119,11 +120,11 @@ The complete query is shown below.
We see the steps match the logical steps we talked through above.
Even though the logic was reasonably complex, the query is relatively straightforward and captures our idea for what we want to compute – hard questions are possible.

```fenl
PageViews
| count(window=since(is_valid(Purchases)))
| when(is_valid(Purchases))
| mean()
```{.python}
import kaskada as kd

valid_purchase = Purchases.is_not_null()
PageViews.count(window=kd.windows.Since(valid_purchase)).filter(valid_purchase).mean()
```
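
The three steps of this query (count within a window, observe at purchases, then average) can be traced in a plain-Python sketch. It is an illustration rather than a description of how Kaskada executes the query; the events and the `mean_views_between_purchases` helper are hypothetical.

```python
# Hypothetical merged events for a single user, ordered by time: (time, kind).
events = [
    (1, "view"),
    (2, "view"),
    (3, "purchase"),
    (4, "view"),
    (5, "purchase"),
    (6, "view"),
]


def mean_views_between_purchases(events):
    """Emit, at each purchase, the mean page-view count per purchase so far."""
    count = 0       # page views since the previous purchase
    observed = []   # the count observed at each purchase (the "filter" step)
    points = []
    for time, kind in events:
        if kind == "view":
            count += 1
        elif kind == "purchase":
            observed.append(count)
            points.append((time, sum(observed) / len(observed)))
            count = 0  # start the next window
    return points


averages = mean_views_between_purchases(events)
```

The trailing page view at time 6 does not affect the result, because the value is only observed when a purchase occurs.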

This kind of query can be generalized to analyze a variety of patterns in the page view activity.
@@ -166,12 +167,11 @@ This uses the [`lookup`]({% fenl_catalog lookup %}) operation.
## Putting it together
Putting it all together we use the lookup with a [`min`]({% fenl_catalog min %}) aggregation to determine the minimum average rating of items purchased by each user.

```fenl
Reviews.score
| with_key(Reviews.item)
| mean()
| lookup(Purchases.item)
| min()
```{.python}
review_item = Reviews.col("item")
purchase_item = Purchases.col("item")

Reviews.col("score").with_key(review_item).mean().lookup(purchase_item).min()
```

This pattern of re-grouping to a different entity, performing an aggregation, and looking up the value (or shifting back to the original entities) is common in data-processing tasks.
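
The regroup, aggregate, and lookup pattern can be sketched in plain Python. This is a hypothetical illustration, not Kaskada code: the event layout and the `min_average_rating` helper are assumptions, and the sketch skips purchases of items that have no reviews yet.

```python
from collections import defaultdict

# Hypothetical events ordered by time: (time, kind, payload).
events = [
    (1, "review", {"item": "book", "score": 5}),
    (2, "review", {"item": "pen", "score": 2}),
    (3, "purchase", {"user": "ben", "item": "book"}),
    (4, "review", {"item": "pen", "score": 4}),
    (5, "purchase", {"user": "ben", "item": "pen"}),
]


def min_average_rating(events):
    """At each purchase, emit the user's minimum looked-up item rating so far."""
    score_sum = defaultdict(float)  # item -> sum of review scores (regrouped by item)
    score_count = defaultdict(int)  # item -> number of reviews
    lowest = {}                     # user -> minimum average rating seen so far
    points = []
    for time, kind, data in events:
        if kind == "review":
            score_sum[data["item"]] += data["score"]
            score_count[data["item"]] += 1
        elif kind == "purchase":
            item, user = data["item"], data["user"]
            if score_count[item] == 0:
                continue  # no reviews yet, so there is nothing to look up
            average = score_sum[item] / score_count[item]  # the "lookup" step
            lowest[user] = min(lowest.get(user, average), average)
            points.append((time, user, lowest[user]))
    return points


ratings = min_average_rating(events)
```

Because events are processed in time order, each lookup sees the item's average rating as of the purchase time rather than its final value.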