Skip to content

Commit 2683c3f

Browse files
kevinhartmanc-w
authored andcommitted
Remove mat views, use counter tables, and fix doubling counting (#31)
Resolves CatalystCode/project-fortis-pipeline#194
1 parent 0037982 commit 2683c3f

File tree

18 files changed

+538
-1391
lines changed

18 files changed

+538
-1391
lines changed

.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ FEATURE_SERVICE_PORT=9090
88
CASSANDRA_SEED_DATA_URL=https://raw.githubusercontent.com/CatalystCode/project-fortis/master/project-fortis-pipeline/localdeploy/seed-data/seed-data-twitter.tar.gz
99
AD_CLIENT_ID=bf1ceec7-cfc7-49c5-9ed3-c6390b87dda5
1010
USERS=scicoria@microsoft.com,erisch@microsoft.com
11-
ADMINS=clewolff@microsoft.com,stmarker@microsoft.com,naros@microsoft.com
11+
ADMINS=clewolff@microsoft.com,stmarker@microsoft.com,naros@microsoft.com,keha@microsoft.com

project-fortis-pipeline/ops/storage-ddls/cassandra-setup.cql

Lines changed: 113 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -4,72 +4,79 @@ CREATE KEYSPACE fortis WITH replication = {
44
'replication_factor': 3
55
};
66

7-
// *****************************************************************************
8-
// * Down migration
9-
// *****************************************************************************
10-
11-
DROP MATERIALIZED VIEW IF EXISTS fortis.eventbatches;
12-
DROP MATERIALIZED VIEW IF EXISTS fortis.popularsources;
13-
DROP MATERIALIZED VIEW IF EXISTS fortis.populartopics;
14-
DROP MATERIALIZED VIEW IF EXISTS fortis.eventplacesbysource;
7+
-- *****************************************************************************
8+
-- * Down migration
9+
-- *****************************************************************************
1510

1611
DROP TABLE IF EXISTS fortis.conjunctivetopics;
1712
DROP TABLE IF EXISTS fortis.computedtiles;
1813
DROP TABLE IF EXISTS fortis.heatmap;
14+
DROP TABLE IF EXISTS fortis.popularsources;
15+
DROP TABLE IF EXISTS fortis.populartopics;
1916
DROP TABLE IF EXISTS fortis.popularplaces;
20-
DROP TABLE IF EXISTS fortis.eventplaces;
17+
DROP TABLE IF EXISTS fortis.eventplacesbysource;
2118
DROP TABLE IF EXISTS fortis.events;
2219
DROP TABLE IF EXISTS fortis.computedtrends;
20+
DROP TABLE IF EXISTS fortis.eventplaces;
21+
DROP TABLE IF EXISTS fortis.eventsbypipeline;
22+
2323

24-
// *****************************************************************************
25-
// * Data tables
26-
// *****************************************************************************
24+
-- *****************************************************************************
25+
-- * Data tables
26+
-- *****************************************************************************
27+
28+
-- *****************************************************************************
29+
-- * Partition key: (pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype)
30+
-- *
31+
-- * Spark partitions TileRow data once according to the above partition key and
32+
-- * performs writes to these tables without shuffling.
33+
-- *****************************************************************************
2734

2835
CREATE TABLE fortis.computedtiles (
29-
perioddate timestamp,
30-
periodtype text,
3136
pipelinekey text,
32-
tilez int,
33-
tileid text,
34-
externalsourceid text,
35-
mentioncount bigint,
36-
avgsentimentnumerator bigint,
3737
conjunctiontopic1 text,
3838
conjunctiontopic2 text,
3939
conjunctiontopic3 text,
40-
PRIMARY KEY ((periodtype, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, pipelinekey, externalsourceid, tileid), perioddate)
40+
tilez int,
41+
tileid text,
42+
externalsourceid text,
43+
periodtype text,
44+
perioddate timestamp,
45+
mentioncount counter,
46+
avgsentimentnumerator counter,
47+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype), perioddate)
4148
);
4249

4350
CREATE TABLE fortis.heatmap (
44-
perioddate timestamp,
45-
periodtype text,
4651
pipelinekey text,
52+
conjunctiontopic1 text,
53+
conjunctiontopic2 text,
54+
conjunctiontopic3 text,
4755
tilez int,
4856
tileid text,
49-
heatmaptileid text,
5057
externalsourceid text,
58+
periodtype text,
59+
perioddate timestamp,
60+
heatmaptileid text,
5161
mentioncount counter,
5262
avgsentimentnumerator counter,
53-
conjunctiontopic1 text,
54-
conjunctiontopic2 text,
55-
conjunctiontopic3 text,
56-
PRIMARY KEY ((periodtype, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, pipelinekey, externalsourceid, tileid), perioddate, heatmaptileid)
63+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype), perioddate, heatmaptileid)
5764
);
5865

5966
CREATE TABLE fortis.popularplaces (
60-
perioddate timestamp,
61-
periodtype text,
6267
pipelinekey text,
63-
externalsourceid text,
64-
placeid text,
65-
tilez int,
66-
tileid text,
6768
conjunctiontopic1 text,
6869
conjunctiontopic2 text,
6970
conjunctiontopic3 text,
71+
tilez int,
72+
tileid text,
73+
externalsourceid text,
74+
periodtype text,
75+
perioddate timestamp,
76+
placeid text,
7077
mentioncount counter,
7178
avgsentimentnumerator counter,
72-
PRIMARY KEY ((periodtype, pipelinekey, externalsourceid, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid), perioddate, placeid)
79+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype), perioddate, placeid)
7380
);
7481

7582
CREATE TABLE fortis.conjunctivetopics (
@@ -86,20 +93,20 @@ CREATE TABLE fortis.conjunctivetopics (
8693
);
8794

8895
CREATE TABLE fortis.eventplaces(
89-
eventid text,
96+
pipelinekey text,
9097
conjunctiontopic1 text,
9198
conjunctiontopic2 text,
9299
conjunctiontopic3 text,
93-
tileid text,
94100
tilez int,
101+
tileid text,
102+
externalsourceid text, -- TODO: should this be part of the Primary Key?
103+
eventtime timestamp,
104+
eventid text,
105+
placeid text,
95106
centroidlat double,
96107
centroidlon double,
97-
placeid text,
98108
insertiontime timestamp,
99-
eventtime timestamp,
100-
pipelinekey text,
101-
externalsourceid text,
102-
PRIMARY KEY ((conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, pipelinekey, tilez, tileid), eventtime, eventid, placeid)
109+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid), eventtime, eventid, placeid)
103110
) WITH CLUSTERING ORDER BY (eventtime DESC);
104111

105112
CREATE TABLE fortis.events(
@@ -120,83 +127,68 @@ CREATE TABLE fortis.events(
120127
PRIMARY KEY (eventid)
121128
);
122129

123-
// *****************************************************************************
124-
// * Views
125-
// *****************************************************************************
126-
127-
CREATE MATERIALIZED VIEW fortis.populartopics
128-
AS SELECT periodtype, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, externalsourceid, pipelinekey, tilez,
129-
tileid, perioddate, mentioncount, avgsentimentnumerator
130-
FROM fortis.computedtiles
131-
WHERE periodtype IS NOT NULL
132-
AND conjunctiontopic1 IS NOT NULL
133-
AND conjunctiontopic2 = ''
134-
AND conjunctiontopic3 = ''
135-
AND avgsentimentnumerator IS NOT NULL
136-
AND mentioncount IS NOT NULL
137-
AND externalsourceid IS NOT NULL
138-
AND pipelinekey IS NOT NULL
139-
AND tilez IS NOT NULL
140-
AND tileid IS NOT NULL
141-
AND perioddate IS NOT NULL
142-
PRIMARY KEY ((periodtype, externalsourceid, tilez, pipelinekey, tileid, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3), perioddate, mentioncount)
143-
WITH CLUSTERING ORDER BY (perioddate DESC, mentioncount DESC);
144-
145-
CREATE MATERIALIZED VIEW fortis.eventplacesbysource
146-
AS SELECT eventid, pipelinekey, eventtime, centroidlat, centroidlon, placeid, conjunctiontopic1,
147-
conjunctiontopic2, conjunctiontopic3, externalsourceid, tileid, tilez
148-
FROM fortis.eventplaces
149-
WHERE eventid IS NOT NULL
150-
AND pipelinekey IS NOT NULL
151-
AND eventtime IS NOT NULL
152-
AND tileid IS NOT NULL
153-
AND tilez IS NOT NULL
154-
AND placeid IS NOT NULL
155-
AND externalsourceid IS NOT NULL
156-
AND conjunctiontopic1 IS NOT NULL
157-
AND conjunctiontopic2 IS NOT NULL
158-
AND conjunctiontopic3 IS NOT NULL
159-
AND centroidlat IS NOT NULL
160-
AND centroidlon IS NOT NULL
161-
PRIMARY KEY ((conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, pipelinekey, tilez, externalsourceid, tileid), eventtime, placeid, eventid);
162-
163-
// Allows for linking the batchid to saveToCassandra spark call so we can filter out dupes from the original rdd.
164-
CREATE MATERIALIZED VIEW fortis.eventbatches
165-
AS SELECT batchid, eventid
166-
FROM fortis.events
167-
WHERE batchid IS NOT NULL
168-
AND eventid IS NOT NULL
169-
PRIMARY KEY (batchid, eventid);
170-
171-
CREATE MATERIALIZED VIEW fortis.popularsources
172-
AS SELECT periodtype, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, externalsourceid, pipelinekey,
173-
tileid, perioddate, mentioncount, avgsentimentnumerator
174-
FROM fortis.computedtiles
175-
WHERE periodtype IS NOT NULL
176-
AND conjunctiontopic1 IS NOT NULL
177-
AND conjunctiontopic2 IS NOT NULL
178-
AND conjunctiontopic3 IS NOT NULL
179-
AND avgsentimentnumerator IS NOT NULL
180-
AND mentioncount IS NOT NULL
181-
AND externalsourceid IS NOT NULL
182-
AND pipelinekey IS NOT NULL
183-
AND tilez IS NOT NULL
184-
AND tileid IS NOT NULL
185-
AND perioddate IS NOT NULL
186-
PRIMARY KEY ((periodtype, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, pipelinekey, tileid), perioddate, mentioncount, externalsourceid)
187-
WITH CLUSTERING ORDER BY (perioddate DESC, mentioncount DESC);
188-
189-
CREATE MATERIALIZED VIEW fortis.eventsbypipeline
190-
AS SELECT eventid, pipelinekey, eventtime,conjunctiontopic3,conjunctiontopic2,tilez,tileid,placeid,conjunctiontopic1
191-
FROM fortis.eventplaces
192-
WHERE eventid IS NOT NULL
193-
AND pipelinekey IS NOT NULL
194-
AND eventtime IS NOT NULL
195-
AND conjunctiontopic1 IS NOT NULL
196-
AND conjunctiontopic2 = ''
197-
AND conjunctiontopic3 = ''
198-
AND tilez = 15
199-
AND tileid IS NOT NULL
200-
AND placeid IS NOT NULL
201-
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez), eventtime, eventid, tileid, placeid)
202-
WITH CLUSTERING ORDER BY (eventtime DESC);
130+
CREATE TABLE fortis.populartopics(
131+
pipelinekey text,
132+
conjunctiontopic1 text,
133+
conjunctiontopic2 text, -- always set to ''
134+
conjunctiontopic3 text, -- always set to ''
135+
tilez int,
136+
tileid text,
137+
externalsourceid text,
138+
periodtype text,
139+
perioddate timestamp,
140+
mentioncount counter,
141+
avgsentimentnumerator counter,
142+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype), perioddate)
143+
) WITH CLUSTERING ORDER BY (perioddate DESC);
144+
145+
CREATE TABLE fortis.eventplacesbysource(
146+
pipelinekey text,
147+
conjunctiontopic1 text,
148+
conjunctiontopic2 text,
149+
conjunctiontopic3 text,
150+
tilez int,
151+
tileid text,
152+
externalsourceid text,
153+
periodtype text, -- always set to 'day'
154+
eventtime timestamp,
155+
eventid text,
156+
placeid text,
157+
centroidlat double,
158+
centroidlon double,
159+
insertiontime timestamp,
160+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, externalsourceid, periodtype), eventtime, eventid, placeid)
161+
) WITH CLUSTERING ORDER BY (eventtime DESC);
162+
163+
CREATE TABLE fortis.popularsources (
164+
pipelinekey text,
165+
conjunctiontopic1 text,
166+
conjunctiontopic2 text,
167+
conjunctiontopic3 text,
168+
tilez int,
169+
tileid text,
170+
periodtype text,
171+
perioddate timestamp,
172+
externalsourceid text,
173+
mentioncount counter,
174+
avgsentimentnumerator counter,
175+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez, tileid, periodtype), perioddate, externalsourceid)
176+
) WITH CLUSTERING ORDER BY (perioddate DESC);
177+
178+
CREATE TABLE fortis.eventsbypipeline(
179+
pipelinekey text,
180+
conjunctiontopic1 text,
181+
conjunctiontopic2 text,
182+
conjunctiontopic3 text,
183+
tilez int,
184+
tileid text,
185+
externalsourceid text,
186+
eventtime timestamp,
187+
eventid text,
188+
placeid text,
189+
centroidlat double,
190+
centroidlon double,
191+
insertiontime timestamp,
192+
PRIMARY KEY ((pipelinekey, conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, tilez), eventtime, eventid, tileid, placeid)
193+
) WITH CLUSTERING ORDER BY (eventtime DESC);
194+

project-fortis-pipeline/ops/storage-ddls/settings-setup.cql

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ CREATE KEYSPACE settings WITH replication = {
44
'replication_factor': 3
55
};
66

7-
// *****************************************************************************
8-
// * Down migration
9-
// *****************************************************************************
7+
-- *****************************************************************************
8+
-- * Down migration
9+
-- *****************************************************************************
1010

1111
DROP TABLE IF EXISTS settings.users;
1212
DROP TABLE IF EXISTS settings.watchlist;
@@ -15,9 +15,9 @@ DROP TABLE IF EXISTS settings.sitesettings;
1515
DROP TABLE IF EXISTS settings.streams;
1616
DROP TABLE IF EXISTS settings.trustedsources;
1717

18-
// *****************************************************************************
19-
// * Configuration tables
20-
// *****************************************************************************
18+
-- *****************************************************************************
19+
-- * Configuration tables
20+
-- *****************************************************************************
2121

2222
CREATE TABLE settings.users(
2323
identifier text,

project-fortis-services/src/resolvers/Messages/queries.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ function byBbox(args, res) { // eslint-disable-line no-unused-vars
130130

131131
if (args.externalsourceid) {
132132
tagsParams.push(args.externalsourceid);
133+
tagsParams.push('day');
133134
tableName = 'eventplacesbysource';
134135
}
135136

@@ -144,7 +145,7 @@ function byBbox(args, res) { // eslint-disable-line no-unused-vars
144145
AND tileid IN ?
145146
AND tilez = ?
146147
AND pipelinekey IN ?
147-
${args.externalsourceid ? ' AND externalsourceid = ?' : ''}
148+
${args.externalsourceid ? ' AND externalsourceid = ? AND periodtype = ?' : ''}
148149
`.trim();
149150

150151
cassandraConnector.executeQueryWithPageState(tagsQuery, tagsParams, args.pageState, parseLimit(args.limit))
@@ -299,4 +300,4 @@ module.exports = {
299300
event: requiresRole(trackEvent(event, 'messageForEvent'), 'user'),
300301
translate: requiresRole(trackEvent(translate, 'translate'), 'user'),
301302
translateWords: requiresRole(trackEvent(translateWords, 'translateWords'), 'user')
302-
};
303+
};

0 commit comments

Comments
 (0)