# Redpanda + Materialize Demo

This is a variation of the [standard ecommerce demo](../ecommerce), illustrating how it looks to switch from Kafka to Redpanda.

![Shop demo infra with redpanda](demo.png)

**NOTE:** For context on what is happening in the demo, and initial setup instructions, see the [README](https://github.com/MaterializeInc/ecommerce-demo#readme).

## Running Redpanda + Materialize Stack

You'll need to have [docker and docker-compose installed](https://materialize.com/docs/third-party/docker) before getting started.

1. Clone this repo and navigate to the directory by running:

    ```shell
    git clone https://github.com/MaterializeInc/demos.git
    cd demos/ecommerce-redpanda
    ```

2. Bring up the Docker Compose containers in the background.

    ```shell
    docker-compose up -d
    ```

    **This may take several minutes to complete the first time you run it.** If all goes well, you'll have each service running in its own container, with Debezium configured to ship changes from MySQL into Redpanda.

3. Confirm that everything is running as expected:

    ```shell
    docker-compose ps
    ```

4. Log in to MySQL to confirm that tables are created and seeded:

    ```shell
    docker-compose exec mysql bash -c 'mysql -umysqluser -pmysqlpw shop'

    SHOW TABLES;

    SELECT * FROM purchases LIMIT 1;
    ```

5. Exec into the `redpanda` container to look around using Redpanda's amazing `rpk` CLI.

    ```shell
    docker-compose exec redpanda /bin/bash

    rpk debug info

    rpk topic list

    rpk topic create dd_flagged_profiles

    rpk topic consume pageviews
    ```

    You should see a live feed of JSON-formatted pageview Kafka messages:

    ```json
    {
        "key": "3290",
        "message": "{\"user_id\": 3290, \"url\": \"/products/257\", \"channel\": \"social\", \"received_at\": 1634651213}",
        "partition": 0,
        "offset": 21529,
        "size": 89,
        "timestamp": "2021-10-19T13:46:53.15Z"
    }
    ```

6. Launch the Materialize CLI.

    ```shell
    docker-compose run cli
    ```

    _(This is just a shortcut to a docker container with postgres-client pre-installed. If you already have psql, you could run `psql -U materialize -h localhost -p 6875 materialize`.)_

7. Now that you're in the Materialize CLI, define all of the tables in `mysql.shop` as Kafka sources:

    ```sql
    CREATE SOURCE purchases
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.purchases'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;

    CREATE SOURCE items
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.items'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;

    CREATE SOURCE users
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.users'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;

    CREATE SOURCE json_pageviews
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
    FORMAT BYTES;
    ```

    Because the first three sources are pulling message schema data from the registry, Materialize knows the column types to use for each attribute. The last source is a JSON-formatted source for the pageviews.
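
    You can see the columns Materialize derived from the registry schema with `SHOW COLUMNS` (shown here for `purchases`; the exact column list depends on the upstream table definition):

    ```sql
    SHOW COLUMNS FROM purchases;
    ```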

    Now if you run `SHOW SOURCES;` in the CLI, you should see the four sources we created:

    ```
    materialize=> SHOW SOURCES;
        name
    ----------------
    items
    json_pageviews
    purchases
    users
    (4 rows)

    materialize=>
    ```

8. Next we will create a non-materialized view. You can almost think of this as a reusable template to be used in other materialized views. The nested subqueries convert each raw message from `json_pageviews` from bytes to UTF-8 text to `jsonb`, since the source was created with `FORMAT BYTES`.

    ```sql
    CREATE VIEW pageview_stg AS
        SELECT
            *,
            regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
            (regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id
        FROM (
            SELECT
                (data->'user_id')::INT AS user_id,
                data->>'url' AS url,
                data->>'channel' AS channel,
                (data->>'received_at')::double AS received_at
            FROM (
                SELECT CAST(data AS jsonb) AS data
                FROM (
                    SELECT convert_from(data, 'utf8') AS data
                    FROM json_pageviews
                )
            )
        );
    ```
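
    The two `regexp_match` calls pull the page type and the numeric target ID out of the URL. As a quick sanity check, you can run the extraction as a constant query (no source data involved; the sample URL here is made up):

    ```sql
    SELECT
        regexp_match('/products/257', '/(products|profiles)/')[1] AS pageview_type,
        (regexp_match('/products/257', '/(?:products|profiles)/(\d+)')[1])::INT AS target_id;
    ```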

9. **Analytical Views:** Let's create a couple of analytical views to get a feel for how it works.

    Start simple with a materialized view that aggregates purchase stats by item:

    ```sql
    CREATE MATERIALIZED VIEW purchases_by_item AS
        SELECT
            item_id,
            SUM(purchase_price) AS revenue,
            COUNT(id) AS orders,
            SUM(quantity) AS items_sold
        FROM purchases GROUP BY 1;
    ```

    and something similar that uses our non-materialized `pageview_stg` view to quickly aggregate pageviews by item:

    ```sql
    CREATE MATERIALIZED VIEW pageviews_by_item AS
        SELECT
            target_id AS item_id,
            COUNT(*) AS pageviews
        FROM pageview_stg
        WHERE pageview_type = 'products'
        GROUP BY 1;
    ```

    and now let's show how you can combine and stack views by creating a single view that brings everything together:

    ```sql
    CREATE MATERIALIZED VIEW item_summary AS
        SELECT
            items.name,
            items.category,
            SUM(purchases_by_item.items_sold) AS items_sold,
            SUM(purchases_by_item.orders) AS orders,
            SUM(purchases_by_item.revenue) AS revenue,
            SUM(pageviews_by_item.pageviews) AS pageviews,
            SUM(purchases_by_item.orders) / SUM(pageviews_by_item.pageviews)::FLOAT AS conversion_rate
        FROM items
        JOIN purchases_by_item ON purchases_by_item.item_id = items.id
        JOIN pageviews_by_item ON pageviews_by_item.item_id = items.id
        GROUP BY 1, 2;
    ```

    We can check that it's working by querying the view:

    ```sql
    SELECT * FROM item_summary ORDER BY pageviews DESC LIMIT 5;
    ```

    Or we can even check that it's incrementally updating by exiting the Materialize CLI and running a watch command on that query:

    ```shell
    watch -n1 "psql -c 'SELECT * FROM item_summary ORDER BY pageviews DESC LIMIT 5;' -U materialize -h localhost -p 6875"
    ```

10. **Views for User-Facing Data:**

    Redpanda will often be used in building rich data-intensive applications. Let's try creating a view meant to power something like the "Who has viewed your profile" feature on LinkedIn: user views of other user profiles.

    ```sql
    CREATE MATERIALIZED VIEW profile_views_per_minute_last_10 AS
        SELECT
            target_id AS user_id,
            date_trunc('minute', to_timestamp(received_at)) AS received_at_minute,
            COUNT(*) AS pageviews
        FROM pageview_stg
        WHERE
            pageview_type = 'profiles' AND
            mz_logical_timestamp() < (received_at*1000 + 600000)::numeric
        GROUP BY 1, 2;
    ```

    The `mz_logical_timestamp()` filter makes this a rolling window: pageviews older than ten minutes (600000 ms) drop out of the view automatically. We can check it with:

    ```sql
    SELECT * FROM profile_views_per_minute_last_10 WHERE user_id = 10;
    ```

    and confirm that this is the data we could use to populate a "profile views" graph for user `10`.

    Next let's use a `LATERAL` join to get the last ten users to have viewed each profile:

    ```sql
    CREATE MATERIALIZED VIEW profile_views AS
        SELECT
            target_id AS owner_id,
            user_id AS viewer_id,
            received_at AS received_at
        FROM (SELECT DISTINCT target_id FROM pageview_stg) grp,
        LATERAL (
            SELECT user_id, received_at FROM pageview_stg
            WHERE target_id = grp.target_id
            ORDER BY received_at DESC LIMIT 10
        );
    ```

    and then enrich it with user details by joining against the `users` source twice:

    ```sql
    CREATE MATERIALIZED VIEW profile_views_enriched AS
        SELECT
            owner.id AS owner_id,
            owner.email AS owner_email,
            viewers.id AS viewer_id,
            viewers.email AS viewer_email,
            profile_views.received_at
        FROM profile_views
        JOIN users owner ON profile_views.owner_id = owner.id
        JOIN users viewers ON profile_views.viewer_id = viewers.id;
    ```

    We can test this by checking on profile views for a specific user:

    ```sql
    SELECT * FROM profile_views_enriched WHERE owner_id=25 ORDER BY received_at DESC;
    ```

11. **Demand-driven query:** Since Redpanda has such a nice HTTP interface, it makes it easier to extend without writing lots of glue code and services. Here's an example where we use pandaproxy to do a "demand-driven query".

    Add a message to the `dd_flagged_profiles` topic using curl and pandaproxy:

    ```shell
    curl -s \
        -X POST \
        "http://localhost:8082/topics/dd_flagged_profiles" \
        -H "Content-Type: application/vnd.kafka.json.v2+json" \
        -d '{
            "records":[{
                "key":"0",
                "value":"25",
                "partition":0
            }]
        }'
    ```

    Now let's materialize that data and join the flagged profile ID to a much larger dataset. (The value arrives as the JSON-encoded string `"25"`, so we `btrim` the quotes off before casting it to an integer.)

    ```sql
    CREATE MATERIALIZED SOURCE dd_flagged_profiles
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'dd_flagged_profiles'
    FORMAT TEXT
    ENVELOPE UPSERT;

    CREATE MATERIALIZED VIEW dd_flagged_profile_view AS
        SELECT pageview_stg.*
        FROM dd_flagged_profiles
        JOIN pageview_stg ON user_id = btrim(text, '"')::INT;
    ```

    This pattern is useful for scenarios where materializing all the data (without filtering down to certain profiles) puts too much of a memory demand on the system.
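
    To confirm the join picked up the flagged profile, query the new view (the `LIMIT` is just to keep the output small):

    ```sql
    SELECT * FROM dd_flagged_profile_view LIMIT 5;
    ```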

12. Sink data back out to Redpanda:

    Let's create a view that flags "high-value" users that have spent more than $10k total.

    ```sql
    CREATE MATERIALIZED VIEW high_value_users AS
        SELECT
            users.id,
            users.email,
            SUM(purchase_price * quantity)::int AS lifetime_value,
            COUNT(*) AS purchases
        FROM users
        JOIN purchases ON purchases.user_id = users.id
        GROUP BY 1, 2
        HAVING SUM(purchase_price * quantity) > 10000;
    ```

    and then a sink to stream updates to this view back out to Redpanda:

    ```sql
    CREATE SINK high_value_users_sink
        FROM high_value_users
        INTO KAFKA BROKER 'redpanda:9092' TOPIC 'high-value-users-sink'
        WITH (reuse_topic=true, consistency_topic='high-value-users-sink-consistency')
        FORMAT AVRO USING
        CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081';
    ```

    This is a bit more complex because it is an `exactly-once` sink. This means that across Materialize restarts, it will never output the same update more than once.
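
    You can confirm the sink was created from the Materialize CLI:

    ```sql
    SHOW SINKS;
    ```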

    We won't be able to preview the results with `rpk` because it's Avro-formatted. But we can actually stream it BACK into Materialize to confirm the format!

    ```sql
    CREATE MATERIALIZED SOURCE hvu_test
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'high-value-users-sink'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081';

    SELECT * FROM hvu_test LIMIT 2;
    ```

## Conclusion

You now have Materialize maintaining real-time materialized views over a changefeed from a database and pageview events from Redpanda. You have complex multi-layer views doing JOINs and aggregations to distill the raw data into a form that's useful for downstream applications. In Metabase, you have the ability to create dashboards and reports using the real-time data.

You have a lot of infrastructure running in Docker containers; don't forget to run `docker-compose down` to shut everything down!
