Skip to content

Commit 85d3d41

Browse files
author
Andy Hattemer
committed
update
1 parent c999a83 commit 85d3d41

File tree

15 files changed

+378
-142
lines changed

15 files changed

+378
-142
lines changed

README.md

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
This is a quick proof-of-concept of how [Cube](https://cube.dev/) integrates with [Materialize]()
66

7-
- **Materialize** is a streaming database that
7+
- **Materialize** is a streaming database that
88
- **Cube** is a "headless BI" service that connects to databases or data warehouses and handles Data Modeling, Access Control, Caching and APIs
99

1010
## Starting Up
@@ -24,24 +24,16 @@ You'll need to have [docker and docker-compose installed](https://materialize.co
2424
docker-compose up -d
2525
```
2626

27-
**This may take several minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers.
27+
**This may take several minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers, with Debezium configured to ship changes from MySQL into Redpanda.
2828

29-
3. Create the initial schema of sources and materialized views in Materialize
29+
3. Confirm that everything is running as expected:
3030

3131
```shell session
32-
psql -h localhost -p 6875 -U materialize -f materialize/create.sql
32+
docker-compose ps
3333
```
3434

35-
4. Open http://localhost:4000/ in your browser and connect Cube.js to materialize:
36-
![image](https://user-images.githubusercontent.com/11527560/168123759-e63489d8-12d9-47c0-bd8b-493632370075.png)
37-
For the credentials, use:
38-
| Field | Value |
39-
|----------|----------------|
40-
| host | `materialized` |
41-
| port | `6875` |
42-
| database | `materialize` |
43-
| username | `materialize` |
44-
| password | _leave blank_ |
45-
46-
47-
5. Three example models have been pre-created in the `cube/schema` directory, which is mounted as a volume so the Cube docker image recognizes and loads them.
35+
4. Initialize the Materialize Schema
36+
37+
```shell session
38+
psql -h localhost -p 6875 -U materialize -f materialize/create.sql
39+
```

compose.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ services:
55
- 4000:4000
66
environment:
77
- CUBEJS_DEV_MODE=true
8+
- CUBEJS_DB_TYPE=materialize
9+
- CUBEJS_DB_HOST=materialized
10+
- CUBEJS_DB_PORT=6875
11+
- CUBEJS_DB_NAME=materialize
12+
- CUBEJS_DB_USER=materialize
13+
- CUBEJS_API_SECRET=SECRET
814
volumes:
915
- ${PWD}/cube:/cube/conf
1016
materialized:

cube/schema/.ItemSummary.js.swp

-1 KB
Binary file not shown.

cube/schema/ItemSummary.js

Lines changed: 0 additions & 60 deletions
This file was deleted.

cube/schema/ProfileViewsEnriched.js

Lines changed: 0 additions & 33 deletions
This file was deleted.

cube/schema/ProfileViewsPerMinuteLast10.js

Lines changed: 0 additions & 32 deletions
This file was deleted.

cube/schema/Vendors.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
cube(`Vendors`, {
2+
refreshKey: {
3+
every: `1 second`
4+
},
5+
6+
sql: `SELECT * FROM public.agg_vendors_minute`,
7+
measures: {
8+
count: {
9+
type: `count`,
10+
drillMembers: [CUBE.name]
11+
},
12+
totalRevenue: {
13+
type: `sum`,
14+
sql: `revenue`,
15+
format: `currency`
16+
},
17+
totalOrders: {
18+
type: `sum`,
19+
sql: `orders`
20+
},
21+
totalPageviews: {
22+
type: `sum`,
23+
sql: `pageviews`
24+
},
25+
totalItemsSold: {
26+
type: `sum`,
27+
sql: `items_sold`
28+
},
29+
conversionRate: {
30+
type: `number`,
31+
sql: `SUM(orders)/SUM(pageviews)+1`,
32+
format: `percent`
33+
},
34+
},
35+
dimensions: {
36+
id: {
37+
sql: `vendor_id`,
38+
type: `number`
39+
},
40+
name: {
41+
sql: `vendor_name`,
42+
type: `string`
43+
},
44+
receivedAtMinute: {
45+
sql: `m`,
46+
type: `time`
47+
}
48+
},
49+
dataSource: `default`,
50+
});

loadgen/loadgen/Dockerfile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM python:3.9.9-bullseye
2+
3+
WORKDIR /workdir
4+
5+
COPY requirements.txt .
6+
RUN pip install --no-cache-dir -r requirements.txt
7+
8+
COPY . .
9+
10+
ENTRYPOINT ["python", "generate_load.py"]

loadgen/loadgen/generate_load.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import json
2+
import math
3+
import os
4+
import random
5+
import time
6+
7+
import barnum
8+
from kafka import KafkaProducer
9+
from mysql.connector import Error, connect
10+
11+
# CONFIG
12+
vendorSeedCount = 100
13+
userSeedCount = 10000
14+
itemSeedCount = 1000
15+
purchaseGenCount = 5000000
16+
purchaseGenEveryMS = 100
17+
pageviewMultiplier = 75 # Translates to 75x purchases, currently 750/sec or 65M/day
18+
itemInventoryMin = 1000
19+
itemInventoryMax = 5000
20+
itemPriceMin = 5
21+
itemPriceMax = 500
22+
mysqlHost = "mysql"
23+
mysqlPort = "3306"
24+
mysqlUser = "mysqluser"
25+
mysqlPass = "mysqlpw"
26+
kafkaHostPort = os.getenv("KAFKA_ADDR", "redpanda:9092")
27+
kafkaTopic = "pageviews"
28+
debeziumHostPort = "debezium:8083"
29+
channels = ["organic search", "paid search", "referral", "social", "display"]
30+
categories = ["widgets", "gadgets", "doodads", "clearance"]
31+
32+
# INSERT TEMPLATES
33+
vendor_insert = "INSERT INTO shop.vendors (name) VALUES ( %s )"
34+
item_insert = "INSERT INTO shop.items (vendor_id, name, category, price, inventory) VALUES ( %s, %s, %s, %s, %s )"
35+
user_insert = "INSERT INTO shop.users (email, is_vip) VALUES ( %s, %s )"
36+
purchase_insert = "INSERT INTO shop.purchases (user_id, item_id, quantity, purchase_price) VALUES ( %s, %s, %s, %s )"
37+
38+
# Initialize Kafka
39+
producer = KafkaProducer(
40+
bootstrap_servers=[kafkaHostPort],
41+
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
42+
)
43+
44+
def generatePageview(viewer_id, target_id, page_type):
45+
return {
46+
"user_id": viewer_id,
47+
"url": f"/{page_type}/{target_id}",
48+
"channel": random.choice(channels),
49+
"received_at": int(time.time()),
50+
}
51+
52+
53+
try:
54+
with connect(
55+
host=mysqlHost,
56+
user=mysqlUser,
57+
password=mysqlPass,
58+
) as connection:
59+
with connection.cursor() as cursor:
60+
print("Seeding data...")
61+
print(" - Vendors")
62+
cursor.executemany(
63+
vendor_insert,
64+
[
65+
(
66+
[barnum.create_company_name()]
67+
)
68+
for i in range(vendorSeedCount)
69+
],
70+
)
71+
print(" - Items")
72+
cursor.executemany(
73+
item_insert,
74+
[
75+
(
76+
random.randint(1, vendorSeedCount),
77+
barnum.create_nouns(),
78+
random.choice(categories),
79+
random.randint(itemPriceMin * 100, itemPriceMax * 100) / 100,
80+
random.randint(itemInventoryMin, itemInventoryMax),
81+
)
82+
for i in range(itemSeedCount)
83+
],
84+
)
85+
cursor.executemany(
86+
user_insert,
87+
[
88+
(barnum.create_email(), (random.randint(0, 10) > 8))
89+
for i in range(userSeedCount)
90+
],
91+
)
92+
connection.commit()
93+
94+
print("Getting item ID and PRICEs...")
95+
cursor.execute("SELECT id, price FROM shop.items")
96+
item_prices = [(row[0], row[1]) for row in cursor]
97+
98+
print("Preparing to loop + seed kafka pageviews and purchases")
99+
for i in range(purchaseGenCount):
100+
# Get a user and item to purchase
101+
purchase_item = random.choice(item_prices)
102+
purchase_user = random.randint(0, userSeedCount - 1)
103+
purchase_quantity = random.randint(1, 5)
104+
105+
# Write purchaser pageview
106+
producer.send(
107+
kafkaTopic,
108+
key=str(purchase_user).encode("ascii"),
109+
value=generatePageview(purchase_user, purchase_item[0], "products"),
110+
)
111+
112+
# Write random pageviews to products or profiles
113+
pageviewOscillator = int(
114+
pageviewMultiplier + (math.sin(time.time() / 1000) * 50)
115+
)
116+
for i in range(pageviewOscillator):
117+
rand_user = random.randint(0, userSeedCount)
118+
rand_page_type = random.choice(["products", "profiles"])
119+
target_id_max_range = (
120+
itemSeedCount if rand_page_type == "products" else userSeedCount
121+
)
122+
producer.send(
123+
kafkaTopic,
124+
key=str(rand_user).encode("ascii"),
125+
value=generatePageview(
126+
rand_user,
127+
random.randint(0, target_id_max_range),
128+
rand_page_type,
129+
),
130+
)
131+
132+
# Write purchase row
133+
cursor.execute(
134+
purchase_insert,
135+
(
136+
purchase_user,
137+
purchase_item[0],
138+
purchase_quantity,
139+
purchase_item[1] * purchase_quantity,
140+
),
141+
)
142+
connection.commit()
143+
144+
# Pause
145+
time.sleep(purchaseGenEveryMS / 1000)
146+
147+
connection.close()
148+
149+
except Error as e:
150+
print(e)

0 commit comments

Comments
 (0)