Skip to content

Commit c911d15

Browse files
author
Andy Hattemer
committed
fix dupes
1 parent 85d3d41 commit c911d15

File tree

12 files changed

+54
-414
lines changed

12 files changed

+54
-414
lines changed

demo.png

-130 KB
Binary file not shown.

loadgen/generate_load.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
from mysql.connector import Error, connect
1010

1111
# CONFIG
12+
vendorSeedCount = 100
1213
userSeedCount = 10000
1314
itemSeedCount = 1000
14-
purchaseGenCount = 500000
15+
purchaseGenCount = 5000000
1516
purchaseGenEveryMS = 100
1617
pageviewMultiplier = 75 # Translates to 75x purchases, currently 750/sec or 65M/day
1718
itemInventoryMin = 1000
@@ -22,25 +23,24 @@
2223
mysqlPort = "3306"
2324
mysqlUser = "mysqluser"
2425
mysqlPass = "mysqlpw"
25-
kafkaHostPort = os.getenv("KAFKA_ADDR", "kafka:9092")
26+
kafkaHostPort = os.getenv("KAFKA_ADDR", "redpanda:9092")
2627
kafkaTopic = "pageviews"
2728
debeziumHostPort = "debezium:8083"
2829
channels = ["organic search", "paid search", "referral", "social", "display"]
2930
categories = ["widgets", "gadgets", "doodads", "clearance"]
3031

3132
# INSERT TEMPLATES
32-
item_insert = "INSERT INTO shop.items (name, category, price, inventory) VALUES ( %s, %s, %s, %s )"
33+
vendor_insert = "INSERT INTO shop.vendors (name) VALUES ( %s )"
34+
item_insert = "INSERT INTO shop.items (vendor_id, name, category, price, inventory) VALUES ( %s, %s, %s, %s, %s )"
3335
user_insert = "INSERT INTO shop.users (email, is_vip) VALUES ( %s, %s )"
3436
purchase_insert = "INSERT INTO shop.purchases (user_id, item_id, quantity, purchase_price) VALUES ( %s, %s, %s, %s )"
3537

36-
3738
# Initialize Kafka
3839
producer = KafkaProducer(
3940
bootstrap_servers=[kafkaHostPort],
4041
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
4142
)
4243

43-
4444
def generatePageview(viewer_id, target_id, page_type):
4545
return {
4646
"user_id": viewer_id,
@@ -58,10 +58,22 @@ def generatePageview(viewer_id, target_id, page_type):
5858
) as connection:
5959
with connection.cursor() as cursor:
6060
print("Seeding data...")
61+
print(" - Vendors")
62+
cursor.executemany(
63+
vendor_insert,
64+
[
65+
(
66+
[barnum.create_company_name()]
67+
)
68+
for i in range(vendorSeedCount)
69+
],
70+
)
71+
print(" - Items")
6172
cursor.executemany(
6273
item_insert,
6374
[
6475
(
76+
random.randint(1, vendorSeedCount),
6577
barnum.create_nouns(),
6678
random.choice(categories),
6779
random.randint(itemPriceMin * 100, itemPriceMax * 100) / 100,

loadgen/loadgen/Dockerfile

Lines changed: 0 additions & 10 deletions
This file was deleted.

loadgen/loadgen/generate_load.py

Lines changed: 0 additions & 150 deletions
This file was deleted.

loadgen/loadgen/requirements.txt

Lines changed: 0 additions & 4 deletions
This file was deleted.

materialize/create.sql

Lines changed: 27 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
-- CREATE SOURCES
22

3+
CREATE SOURCE json_pageviews
4+
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
5+
FORMAT BYTES;
6+
37
CREATE SOURCE purchases
48
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.purchases'
59
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
@@ -15,19 +19,19 @@ FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.users'
1519
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
1620
ENVELOPE DEBEZIUM;
1721

18-
CREATE SOURCE json_pageviews
19-
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
20-
FORMAT BYTES;
21-
22-
22+
CREATE SOURCE vendors
23+
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.vendors'
24+
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
25+
ENVELOPE DEBEZIUM;
2326

2427
-- CREATE NON-MATERIALIZED VIEW TO PARSE JSON PAGEVIEWS
2528

2629
CREATE VIEW pageview_stg AS
2730
SELECT
2831
*,
2932
regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
30-
(regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id
33+
(regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id,
34+
to_timestamp(received_at) as received_at_ts
3135
FROM (
3236
SELECT
3337
(data->'user_id')::INT AS user_id,
@@ -43,94 +47,22 @@ CREATE VIEW pageview_stg AS
4347
)
4448
);
4549

50+
-- CREATE AN ANALYTICAL AGGREGATION FOR VENDORS
4651

47-
-- CREATE ANALYTICAL VIEWS
48-
49-
CREATE VIEW purchases_by_item AS
50-
SELECT
51-
item_id,
52-
SUM(purchase_price) as revenue,
53-
COUNT(id) AS orders,
54-
SUM(quantity) AS items_sold
55-
FROM purchases GROUP BY 1;
56-
57-
CREATE VIEW pageviews_by_item AS
58-
SELECT
59-
target_id as item_id,
60-
COUNT(*) AS pageviews
61-
FROM pageview_stg
62-
WHERE pageview_type = 'products'
63-
GROUP BY 1;
64-
65-
CREATE VIEW purchases_by_item_last_hour AS
66-
SELECT
67-
item_id,
68-
SUM(purchase_price) as revenue,
69-
COUNT(id) AS orders,
70-
SUM(quantity) AS items_sold
71-
FROM purchases
72-
WHERE mz_logical_timestamp() < (extract(epoch from created_at)*1000 + 3600000)::numeric
73-
GROUP BY 1;
74-
75-
CREATE VIEW pageviews_by_item_last_hour AS
76-
SELECT
77-
target_id as item_id,
78-
COUNT(*) AS pageviews
79-
FROM pageview_stg
80-
WHERE pageview_type = 'products' AND mz_logical_timestamp() < (received_at*1000 + 3600000)::numeric
81-
GROUP BY 1;
82-
83-
CREATE MATERIALIZED VIEW item_summary AS
84-
SELECT
85-
items.id as item_id,
86-
items.name,
87-
items.category,
88-
SUM(purchases_by_item.items_sold) as items_sold,
89-
SUM(purchases_by_item.orders) as orders,
90-
SUM(purchases_by_item.revenue) as revenue,
91-
SUM(pageviews_by_item.pageviews) as pageviews,
92-
SUM(purchases_by_item_last_hour.orders) as last_hour_orders,
93-
SUM(pageviews_by_item_last_hour.pageviews) as last_hour_pageviews
94-
FROM items
95-
JOIN purchases_by_item ON purchases_by_item.item_id = items.id
96-
JOIN pageviews_by_item ON pageviews_by_item.item_id = items.id
97-
JOIN purchases_by_item_last_hour ON purchases_by_item_last_hour.item_id = items.id
98-
JOIN pageviews_by_item_last_hour ON pageviews_by_item_last_hour.item_id = items.id
99-
GROUP BY 1, 2, 3;
100-
101-
102-
-- CREATE USER-FACING ANALYTICS VIEWS
103-
104-
CREATE MATERIALIZED VIEW profile_views_per_minute_last_10 AS
105-
SELECT
106-
target_id as user_id,
107-
date_trunc('minute', to_timestamp(received_at)) as received_at_minute,
108-
COUNT(*) as pageviews
109-
FROM pageview_stg
110-
WHERE
111-
pageview_type = 'profiles' AND
112-
mz_logical_timestamp() < (received_at*1000 + 600000)::numeric
113-
GROUP BY 1, 2;
114-
115-
CREATE MATERIALIZED VIEW profile_views AS
116-
SELECT
117-
target_id AS owner_id,
118-
user_id AS viewer_id,
119-
received_at AS received_at
120-
FROM (SELECT DISTINCT target_id FROM pageview_stg) grp,
121-
LATERAL (
122-
SELECT user_id, received_at FROM pageview_stg
123-
WHERE target_id = grp.target_id
124-
ORDER BY received_at DESC LIMIT 10
125-
);
126-
127-
CREATE MATERIALIZED VIEW profile_views_enriched AS
52+
CREATE MATERIALIZED VIEW agg_vendors_minute AS
12853
SELECT
129-
owner.id as owner_id,
130-
owner.email as owner_email,
131-
viewers.id as viewer_id,
132-
viewers.email as viewer_email,
133-
profile_views.received_at
134-
FROM profile_views
135-
JOIN users owner ON profile_views.owner_id = owner.id
136-
JOIN users viewers ON profile_views.viewer_id = viewers.id;
54+
vendors.id as vendor_id,
55+
vendors.name as vendor_name,
56+
minute_series.m,
57+
SUM(purchases.quantity) as items_sold,
58+
COUNT(purchases.id) as orders,
59+
SUM(purchases.purchase_price) as revenue,
60+
COUNT(pageview_stg.url) as pageviews
61+
FROM vendors
62+
JOIN items ON items.vendor_id = vendors.id
63+
JOIN (
64+
SELECT generate_series('2022-05-19 00:00:00', '2022-05-20 00:00:00', '1 MINUTE') as m
65+
) minute_series ON true
66+
LEFT JOIN purchases ON purchases.item_id = items.id AND date_trunc('minute', purchases.created_at) = minute_series.m
67+
LEFT JOIN pageview_stg ON pageview_stg.target_id = items.id AND pageview_stg.pageview_type = 'products' AND date_trunc('minute', pageview_stg.received_at_ts) = minute_series.m
68+
GROUP BY 1, 2, 3;

0 commit comments

Comments
 (0)