|
| 1 | +-- CREATE SOURCES |
| 2 | + |
-- Source over the Kafka topic 'mysql.shop.purchases'.
-- Messages are decoded as Avro using the schema registry at redpanda:8081 and
-- interpreted through the Debezium envelope.
-- NOTE(review): the topic name suggests a CDC feed of a MySQL `shop.purchases`
-- table; confirm against the connector configuration.
CREATE SOURCE purchases
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.purchases'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
| 7 | + |
-- Source over the Kafka topic 'mysql.shop.items'.
-- Messages are decoded as Avro using the schema registry at redpanda:8081 and
-- interpreted through the Debezium envelope.
-- NOTE(review): the topic name suggests a CDC feed of a MySQL `shop.items`
-- table; confirm against the connector configuration.
CREATE SOURCE items
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.items'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
| 12 | + |
-- Source over the Kafka topic 'mysql.shop.users'.
-- Messages are decoded as Avro using the schema registry at redpanda:8081 and
-- interpreted through the Debezium envelope.
-- NOTE(review): the topic name suggests a CDC feed of a MySQL `shop.users`
-- table; confirm against the connector configuration.
CREATE SOURCE users
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'mysql.shop.users'
    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://redpanda:8081'
    ENVELOPE DEBEZIUM;
| 17 | + |
-- Source over the Kafka topic 'pageviews', ingested as raw bytes.
-- No schema is applied here; pageview_stg decodes and parses the payload.
CREATE SOURCE json_pageviews
    FROM KAFKA BROKER 'redpanda:9092' TOPIC 'pageviews'
    FORMAT BYTES;
| 21 | + |
| 22 | + |
| 23 | + |
| 24 | +-- CREATE NON-MATERIALIZED VIEW TO PARSE JSON PAGEVIEWS |
| 25 | + |
-- Staging view that turns the raw bytes of json_pageviews into typed columns.
--
-- Output columns (in order): user_id, url, channel, received_at,
-- pageview_type, target_id.
CREATE VIEW pageview_stg AS
    WITH decoded AS (
        -- Interpret the raw bytes as UTF-8 text, then parse that text as JSON.
        SELECT CAST(convert_from(data, 'utf8') AS jsonb) AS payload
        FROM json_pageviews
    ),
    extracted AS (
        -- Pull the individual JSON fields out into typed columns.
        SELECT
            (payload->'user_id')::INT AS user_id,
            payload->>'url' AS url,
            payload->>'channel' AS channel,
            -- NOTE(review): downstream views multiply this by 1000 before
            -- comparing with mz_logical_timestamp(), so it is presumably
            -- epoch seconds; confirm with the producer.
            (payload->>'received_at')::double AS received_at
        FROM decoded
    )
    SELECT
        user_id,
        url,
        channel,
        received_at,
        -- First capture group: the path segment 'products' or 'profiles'.
        regexp_match(url, '/(products|profiles)/')[1] AS pageview_type,
        -- Digits that follow that path segment, cast to an integer id.
        (regexp_match(url, '/(?:products|profiles)/(\d+)')[1])::INT AS target_id
    FROM extracted;
| 45 | + |
| 46 | + |
| 47 | +-- CREATE ANALYTICAL VIEWS |
| 48 | + |
-- Totals per item over every row in purchases: revenue, number of orders
-- (rows with a non-null id) and units sold.
CREATE VIEW purchases_by_item AS
    SELECT
        item_id,
        SUM(purchase_price) AS revenue,
        COUNT(id) AS orders,
        SUM(quantity) AS items_sold
    FROM purchases
    GROUP BY item_id;
| 56 | + |
-- Number of product-page views per item. For rows whose pageview_type is
-- 'products', target_id is exposed as item_id.
CREATE VIEW pageviews_by_item AS
    SELECT
        target_id AS item_id,
        COUNT(*) AS pageviews
    FROM pageview_stg
    WHERE pageview_type = 'products'
    GROUP BY target_id;
| 64 | + |
-- Same aggregates as purchases_by_item, restricted to a sliding window.
--
-- The predicate keeps a purchase while the logical timestamp is still below
-- created_at (converted from epoch seconds to milliseconds) plus 3600000,
-- i.e. 60 * 60 * 1000 ms = one hour after the purchase was created.
CREATE VIEW purchases_by_item_last_hour AS
    SELECT
        item_id,
        SUM(purchase_price) AS revenue,
        COUNT(id) AS orders,
        SUM(quantity) AS items_sold
    FROM purchases
    WHERE
        mz_logical_timestamp() < (extract(epoch from created_at)*1000 + 3600000)::numeric
    GROUP BY item_id;
| 74 | + |
-- Product-page views per item, restricted to a sliding window.
--
-- A pageview is kept while the logical timestamp is below received_at * 1000
-- plus 3600000 (60 * 60 * 1000 ms = one hour).
CREATE VIEW pageviews_by_item_last_hour AS
    SELECT
        target_id AS item_id,
        COUNT(*) AS pageviews
    FROM pageview_stg
    WHERE
        pageview_type = 'products'
        AND mz_logical_timestamp() < (received_at*1000 + 3600000)::numeric
    GROUP BY target_id;
| 82 | + |
-- Per-item summary combining lifetime totals with last-hour activity.
--
-- The last-hour views are LEFT JOINed on purpose: they only contain items that
-- had a purchase / pageview inside the sliding window, so an inner join would
-- remove an item (together with its lifetime totals) from this summary as soon
-- as it had a quiet hour. Missing last-hour figures are reported as 0.
--
-- The lifetime views remain inner joins, so an item still needs at least one
-- purchase and one product pageview overall to be listed.
CREATE MATERIALIZED VIEW item_summary AS
    SELECT
        items.id AS item_id,
        items.name,
        items.category,
        SUM(purchases_by_item.items_sold) AS items_sold,
        SUM(purchases_by_item.orders) AS orders,
        SUM(purchases_by_item.revenue) AS revenue,
        SUM(pageviews_by_item.pageviews) AS pageviews,
        COALESCE(SUM(purchases_by_item_last_hour.orders), 0) AS last_hour_orders,
        COALESCE(SUM(pageviews_by_item_last_hour.pageviews), 0) AS last_hour_pageviews
    FROM items
    INNER JOIN purchases_by_item
        ON purchases_by_item.item_id = items.id
    INNER JOIN pageviews_by_item
        ON pageviews_by_item.item_id = items.id
    LEFT JOIN purchases_by_item_last_hour
        ON purchases_by_item_last_hour.item_id = items.id
    LEFT JOIN pageviews_by_item_last_hour
        ON pageviews_by_item_last_hour.item_id = items.id
    GROUP BY items.id, items.name, items.category;
| 100 | + |
| 101 | + |
| 102 | +-- CREATE USER-FACING ANALYTICS VIEWS |
| 103 | + |
-- Profile pageviews per profile owner, bucketed by minute, over a sliding
-- window.
--
-- A pageview is kept while the logical timestamp is below received_at * 1000
-- plus 600000 (10 * 60 * 1000 ms = ten minutes).
--
-- The grouping uses the underlying expressions rather than the output alias:
-- `user_id` is also an input column of pageview_stg, so grouping by that name
-- could be resolved to the wrong column.
CREATE MATERIALIZED VIEW profile_views_per_minute_last_10 AS
    SELECT
        target_id AS user_id,
        date_trunc('minute', to_timestamp(received_at)) AS received_at_minute,
        COUNT(*) AS pageviews
    FROM pageview_stg
    WHERE
        pageview_type = 'profiles'
        AND mz_logical_timestamp() < (received_at*1000 + 600000)::numeric
    GROUP BY
        target_id,
        date_trunc('minute', to_timestamp(received_at));
| 114 | + |
-- The ten most recent profile views for each profile owner.
--
-- Both subqueries are restricted to pageview_type = 'profiles'. pageview_stg
-- also contains product pageviews, whose target_id is an item id; without the
-- filter those rows would be reported as views of the user whose id happens to
-- equal the item id.
--
-- Shape: one row per distinct owner (grp), expanded by a LATERAL subquery that
-- picks that owner's latest 10 views ordered by received_at.
CREATE MATERIALIZED VIEW profile_views AS
    SELECT
        grp.target_id AS owner_id,
        recent.user_id AS viewer_id,
        recent.received_at AS received_at
    FROM (
        SELECT DISTINCT target_id
        FROM pageview_stg
        WHERE pageview_type = 'profiles'
    ) grp,
    LATERAL (
        SELECT user_id, received_at
        FROM pageview_stg
        WHERE pageview_type = 'profiles'
          AND target_id = grp.target_id
        ORDER BY received_at DESC
        LIMIT 10
    ) recent;
| 126 | + |
-- profile_views with the id and email of both parties attached.
-- users is joined twice: once for the profile owner, once for the viewer.
-- Both joins are inner joins, so a view whose owner_id or viewer_id has no
-- matching users row is not emitted.
CREATE MATERIALIZED VIEW profile_views_enriched AS
    SELECT
        owner.id AS owner_id,
        owner.email AS owner_email,
        viewers.id AS viewer_id,
        viewers.email AS viewer_email,
        profile_views.received_at
    FROM profile_views
    INNER JOIN users AS owner
        ON profile_views.owner_id = owner.id
    INNER JOIN users AS viewers
        ON profile_views.viewer_id = viewers.id;
0 commit comments