Skip to content

Commit 118745b

Browse files
committed
Parallelize queries with too large IN clauses
Large IN clauses are causing queries for pages with many keywords to time out. Reference reading: https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/
1 parent 602e4cc commit 118745b

File tree

6 files changed

+151
-115
lines changed

6 files changed

+151
-115
lines changed

project-fortis-services/config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ module.exports = {
2020
},
2121
cassandra: {
2222
fetchSize: process.env.FETCH_SIZE || 1000,
23+
maxConcurrentQueries: process.env.MAX_CONCURRENT_QUERIES || 50,
2324
maxOperationsPerBatch: process.env.MAX_OPERATIONS_PER_BATCH || 10,
2425
maxConcurrentBatches: process.env.MAX_CONCURRENT_BATCHES || 50,
2526
coreConnectionsPerHostLocal: process.env.CORE_CONNECTIONS_PER_HOST_LOCAL || 3,

project-fortis-services/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"azure-event-hubs": "0.0.8",
3939
"azure-sb": "^0.10.4",
4040
"azure-storage": "^1.3.2",
41+
"bluebird": "^3.5.1",
4142
"body-parser": "^1.15.0",
4243
"cassandra-driver": "^3.2.2",
4344
"cors": "^2.8.4",

project-fortis-services/src/clients/cassandra/CassandraConnector.js

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
'use strict';
22

33
const Promise = require('promise');
4+
const PromiseMap = require('bluebird').map;
45
const cassandra = require('cassandra-driver');
6+
const flatten = require('lodash/flatten');
57
const { distance, consistencies } = cassandra.types;
68
const asyncEachLimit = require('async/eachLimit');
79
const chunk = require('lodash/chunk');
810
const { trackDependency, trackException } = require('../appinsights/AppInsightsClient');
911
const loggingClient = require('../appinsights/LoggingClient');
1012

1113
const {
12-
fetchSize, maxOperationsPerBatch, maxConcurrentBatches,
14+
fetchSize, maxOperationsPerBatch, maxConcurrentBatches, maxConcurrentQueries,
1315
coreConnectionsPerHostLocal, coreConnectionsPerHostRemote,
1416
cassandraHost, cassandraPort,
1517
cassandraPassword, cassandraUsername
@@ -136,6 +138,28 @@ function executeQuery(query, params, options) {
136138
});
137139
}
138140

141+
/**
 * Runs several Cassandra queries concurrently and merges their rows into a
 * single flat array.
 *
 * Each entry of `queries` is executed through `executeQuery` (wrapped with
 * `trackDependency` under the 'Cassandra' / 'executeQueries' names). At most
 * `maxConcurrentQueries` queries are in flight at any one time.
 *
 * @param {Array<{query: string, params: Array<*>}>} queries - statements to run, each with its positional parameters
 * @param {{fetchSize: int, consistency: int}} [options] - passed unchanged to every `executeQuery` call
 * @returns {Promise.<object[]>} rows of all queries, concatenated in the order of `queries`
 */
function executeQueries(queries, options) {
  const runSingleQuery = trackDependency(executeQuery, 'Cassandra', 'executeQueries');

  // bluebird's map resolves with the mapped values in input order, so the
  // per-query row arrays can be returned directly instead of being written
  // into a shared, pre-sized accumulator by index.
  const runQuery = ({ query, params }) => runSingleQuery(query, params, options);

  return PromiseMap(queries, runQuery, { concurrency: maxConcurrentQueries })
    .then(rowsPerQuery => flatten(rowsPerQuery));
}
162+
139163
/**
140164
* @param {string} query
141165
* @param {string[]} params
@@ -199,6 +223,7 @@ function intialize() {
199223

200224
module.exports = {
201225
status,
226+
executeQueries,
202227
initialize: trackDependency(intialize, 'Cassandra', 'initialize'),
203228
executeBatchMutations: trackDependency(executeBatchMutations, 'Cassandra', 'executeBatchMutations'),
204229
executeQueryWithPageState: trackDependency(executeQueryWithPageState, 'Cassandra', 'executeQueryWithPageState'),

project-fortis-services/src/resolvers/Edges/queries.js

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -155,34 +155,36 @@ function timeSeries(args, res) { // eslint-disable-line no-unused-vars
155155
return reject(`No tiles found for bounding box ${args.bbox.join(',')} and zoom ${args.zoomLevel}`);
156156
}
157157

158-
const query = `
159-
SELECT conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, perioddate, mentioncount, avgsentimentnumerator, tileid
160-
FROM fortis.computedtiles
161-
WHERE periodtype = ?
162-
AND conjunctiontopic1 IN ?
163-
AND conjunctiontopic2 = ?
164-
AND conjunctiontopic3 = ?
165-
AND pipelinekey IN ?
166-
AND externalsourceid = ?
167-
AND tilez = ?
168-
AND perioddate <= ?
169-
AND perioddate >= ?
170-
AND tileid IN ?
171-
`.trim();
172-
173-
const params = [
174-
args.periodType,
175-
maintopics,
176-
...fromTopicListToConjunctionTopics(conjunctivetopics, MaxConjunctiveTopicsAllowed),
177-
args.pipelinekeys,
178-
args.externalsourceid,
179-
args.zoomLevel,
180-
args.toDate,
181-
args.fromDate,
182-
tiles
183-
];
184-
185-
return cassandraConnector.executeQuery(query, params)
158+
const queries = maintopics.map(maintopic => ({
159+
query: `
160+
SELECT conjunctiontopic1, conjunctiontopic2, conjunctiontopic3, perioddate, mentioncount, avgsentimentnumerator, tileid
161+
FROM fortis.computedtiles
162+
WHERE periodtype = ?
163+
AND conjunctiontopic1 = ?
164+
AND conjunctiontopic2 = ?
165+
AND conjunctiontopic3 = ?
166+
AND pipelinekey IN ?
167+
AND externalsourceid = ?
168+
AND tilez = ?
169+
AND perioddate <= ?
170+
AND perioddate >= ?
171+
AND tileid IN ?
172+
`.trim(),
173+
174+
params: [
175+
args.periodType,
176+
maintopic,
177+
...fromTopicListToConjunctionTopics(conjunctivetopics, MaxConjunctiveTopicsAllowed),
178+
args.pipelinekeys,
179+
args.externalsourceid,
180+
args.zoomLevel,
181+
args.toDate,
182+
args.fromDate,
183+
tiles
184+
]
185+
}))
186+
187+
return cassandraConnector.executeQueries(queries)
186188
.then(rows => {
187189
const labels = Array.from(makeSet(rows, row => row.conjunctiontopic1.toLowerCase())).map(row => ({ name: row.toLowerCase() }));
188190
const tiles = Array.from(makeSet(rows, row => row.tileid)).map(row => row);
@@ -240,36 +242,38 @@ function topTerms(args, res) { // eslint-disable-line no-unused-vars
240242
return reject(`No tiles found for bounding box ${args.bbox.join(',')} and zoom ${args.zoomLevel}`);
241243
}
242244

243-
const query = `
244-
SELECT mentioncount, conjunctiontopic1, avgsentimentnumerator
245-
FROM fortis.populartopics
246-
WHERE periodtype = ?
247-
AND pipelinekey IN ?
248-
AND externalsourceid = ?
249-
AND tilez = ?
250-
AND perioddate <= ?
251-
AND perioddate >= ?
252-
AND tileid IN ?
253-
AND conjunctiontopic1 IN ?
254-
AND conjunctiontopic2 = ''
255-
AND conjunctiontopic3 = ''
256-
LIMIT ?
257-
`.trim();
258-
259-
//todo: figure out why node driver timezone conversion is filtering out a majority of records
260-
const params = [
261-
args.periodType,
262-
args.pipelinekeys,
263-
args.externalsourceid,
264-
args.zoomLevel,
265-
args.toDate,
266-
args.fromDate,
267-
tiles,
268-
terms.edges.map(item => item.name),
269-
MaxFetchedRows
270-
];
271-
272-
return cassandraConnector.executeQuery(query, params, { fetchSize })
245+
const queries = terms.edges.map(item => ({
246+
query: `
247+
SELECT mentioncount, conjunctiontopic1, avgsentimentnumerator
248+
FROM fortis.populartopics
249+
WHERE periodtype = ?
250+
AND pipelinekey IN ?
251+
AND externalsourceid = ?
252+
AND tilez = ?
253+
AND perioddate <= ?
254+
AND perioddate >= ?
255+
AND tileid IN ?
256+
AND conjunctiontopic1 = ?
257+
AND conjunctiontopic2 = ''
258+
AND conjunctiontopic3 = ''
259+
LIMIT ?
260+
`.trim(),
261+
262+
//todo: figure out why node driver timezone conversion is filtering out a majority of records
263+
params: [
264+
args.periodType,
265+
args.pipelinekeys,
266+
args.externalsourceid,
267+
args.zoomLevel,
268+
args.toDate,
269+
args.fromDate,
270+
tiles,
271+
item.name,
272+
MaxFetchedRows
273+
],
274+
}));
275+
276+
return cassandraConnector.executeQueries(queries, { fetchSize })
273277
.then(rows =>
274278
resolve({
275279
edges: aggregateBy(rows, row => `${row.conjunctiontopic1}`, row => ({

project-fortis-services/src/resolvers/Messages/queries.js

Lines changed: 50 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -75,33 +75,36 @@ function fetchPlacesById(feature) { // eslint-disable-line no-unused-vars
7575

7676
function queryEventsTable(eventIdResponse, args) {
7777
return new Promise((resolve, reject) => {
78+
if (!eventIdResponse.rows.length) {
79+
return resolve({ type: 'FeatureCollection', features: [] });
80+
}
81+
7882
const eventIds = makeSet(eventIdResponse.rows, row => row.eventid);
79-
let eventsQuery = `
80-
SELECT *
81-
FROM fortis.events
82-
WHERE eventid IN ?
83-
`.trim();
8483

85-
let eventsParams = [
86-
limitForInClause(eventIds)
87-
];
84+
const eventsQueries = Array.from(eventIds).map(eventId => ({
85+
query: `
86+
SELECT *
87+
FROM fortis.events
88+
WHERE eventid = ?
89+
`.trim(),
8890

89-
if (eventIdResponse.rows.length) {
90-
cassandraConnector.executeQuery(eventsQuery, eventsParams)
91-
.then(rows => {
92-
const sortedEvents = rows.sort((a, b)=>moment.utc(b.eventtime.getTime()).diff(moment.utc(a.eventtime.getTime())));
93-
94-
resolve({
95-
type: 'FeatureCollection',
96-
features: sortedEvents.map(eventToFeature),
97-
bbox: args.bbox,
98-
pageState: eventIdResponse.pageState
99-
});
100-
})
101-
.catch(reject);
102-
} else {
103-
resolve({ type: 'FeatureCollection', features: [] });
104-
}
91+
params: [
92+
eventId
93+
]
94+
}));
95+
96+
cassandraConnector.executeQueries(eventsQueries)
97+
.then(rows => {
98+
const sortedEvents = rows.sort((a, b)=>moment.utc(b.eventtime.getTime()).diff(moment.utc(a.eventtime.getTime())));
99+
100+
resolve({
101+
type: 'FeatureCollection',
102+
features: sortedEvents.map(eventToFeature),
103+
bbox: args.bbox,
104+
pageState: eventIdResponse.pageState
105+
});
106+
})
107+
.catch(reject);
105108
});
106109
}
107110

@@ -155,27 +158,29 @@ function byEdges(args, res) { // eslint-disable-line no-unused-vars
155158
return new Promise((resolve, reject) => {
156159
if (!args || !args.filteredEdges || !args.filteredEdges.length) return reject('No edges by which to filter specified');
157160

158-
const tagsQuery = `
159-
SELECT eventid
160-
FROM fortis.eventtopics
161-
WHERE topic IN ?
162-
AND pipelinekey = ?
163-
AND externalsourceid = ?
164-
AND eventtime <= ?
165-
AND eventtime >= ?
166-
LIMIT ?
167-
`.trim();
168-
169-
const tagsParams = [
170-
limitForInClause(toConjunctionTopics(args.mainTerm, args.filteredEdges).filter(topic => !!topic)),
171-
toPipelineKey(args.sourceFilter),
172-
'all',
173-
args.toDate,
174-
args.fromDate,
175-
parseLimit(args.limit)
176-
];
177-
178-
cassandraConnector.executeQuery(tagsQuery, tagsParams)
161+
const tagsQueries = toConjunctionTopics(args.mainTerm, args.filteredEdges).filter(topic => !!topic).map(topic => ({
162+
query: `
163+
SELECT eventid
164+
FROM fortis.eventtopics
165+
WHERE topic = ?
166+
AND pipelinekey = ?
167+
AND externalsourceid = ?
168+
AND eventtime <= ?
169+
AND eventtime >= ?
170+
LIMIT ?
171+
`.trim(),
172+
173+
params: [
174+
topic,
175+
toPipelineKey(args.sourceFilter),
176+
'all',
177+
args.toDate,
178+
args.fromDate,
179+
parseLimit(args.limit)
180+
]
181+
}));
182+
183+
cassandraConnector.executeQueries(tagsQueries)
179184
.then(rows => {
180185
return queryEventsTable(rows, args);
181186
})

project-fortis-services/src/resolvers/Settings/mutations.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -479,19 +479,19 @@ function removeBlacklist(args, res) { // eslint-disable-line no-unused-vars
479479
const termFilters = args && args.input && args.input.filters;
480480
if (!termFilters || !termFilters.length) return reject('No blacklists to remove specified.');
481481

482-
const termIds = termFilters.map(termFilter => termFilter.id);
483-
484-
const query = `
485-
DELETE
486-
FROM settings.blacklist
487-
WHERE id IN ?
488-
`;
482+
const queries = termFilters.map(termFilter => ({
483+
query: `
484+
DELETE
485+
FROM settings.blacklist
486+
WHERE id = ?
487+
`,
489488

490-
const params = [
491-
limitForInClause(termIds)
492-
];
489+
params: [
490+
termFilter.id
491+
]
492+
}));
493493

494-
cassandraConnector.executeQuery(query, params)
494+
cassandraConnector.executeQueries(queries)
495495
.then(() => {
496496
streamingController.notifyBlacklistUpdate();
497497
})

0 commit comments

Comments
 (0)