Skip to content

Commit cd31259

Browse files
authored
feat(webui): Create result metadata document in MongoDB to track search signal for Presto queries. (#1168)
1 parent e1c026f commit cd31259

File tree

9 files changed

+88
-74
lines changed

9 files changed

+88
-74
lines changed

components/webui/client/public/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"ClpStorageEngine": "clp",
3-
"ClpQueryEngine": "native",
3+
"ClpQueryEngine": "clp",
44
"MongoDbSearchResultsMetadataCollectionName": "results-metadata",
55
"SqlDbClpArchivesTableName": "clp_archives",
66
"SqlDbClpDatasetsTableName": "clp_datasets",

components/webui/client/src/config/index.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import {CLP_QUERY_ENGINES} from "../../../common/index.js";
12
import {settings} from "../settings";
23

34

@@ -9,14 +10,6 @@ enum CLP_STORAGE_ENGINES {
910
CLP_S = "clp-s",
1011
}
1112

12-
/**
13-
* Query engine options.
14-
*/
15-
enum CLP_QUERY_ENGINES {
16-
NATIVE = "native",
17-
PRESTO = "presto",
18-
}
19-
2013
const SETTINGS_STORAGE_ENGINE = settings.ClpStorageEngine as CLP_STORAGE_ENGINES;
2114
const SETTINGS_QUERY_ENGINE = settings.ClpQueryEngine as CLP_QUERY_ENGINES;
2215

components/webui/client/src/pages/SearchPage/SearchControls/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const SearchControls = () => {
3131
return (
3232
<form onSubmit={handleSubmit}>
3333
<div className={styles["searchControlsContainer"]}>
34-
{SETTINGS_QUERY_ENGINE === CLP_QUERY_ENGINES.NATIVE ?
34+
{SETTINGS_QUERY_ENGINE !== CLP_QUERY_ENGINES.PRESTO ?
3535
(
3636
<>
3737
{CLP_STORAGE_ENGINES.CLP_S === SETTINGS_STORAGE_ENGINE && <Dataset/>}

components/webui/common/index.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,33 @@ enum SEARCH_SIGNAL {
8888
RESP_QUERYING = "resp-querying",
8989
}
9090

91+
/**
92+
* Presto search-related signals.
93+
*
94+
* Note: Using type instead of enum to match `presto-client-node` type definition.
95+
*/
96+
type PRESTO_SEARCH_SIGNAL =
97+
| "WAITING_FOR_PREREQUISITES"
98+
| "QUEUED"
99+
| "WAITING_FOR_RESOURCES"
100+
| "DISPATCHING"
101+
| "PLANNING"
102+
| "STARTING"
103+
| "RUNNING"
104+
| "FINISHING"
105+
| "FINISHED"
106+
| "CANCELED"
107+
| "FAILED";
108+
109+
/**
110+
* CLP query engines.
111+
*/
112+
enum CLP_QUERY_ENGINES {
113+
CLP = "clp",
114+
CLP_S = "clp-s",
115+
PRESTO = "presto",
116+
}
117+
91118
/**
92119
* MongoDB document for search results metadata. `numTotalResults` is optional
93120
* since it is only set when the search job is completed.
@@ -98,13 +125,16 @@ interface SearchResultsMetadataDocument {
98125
// eslint-disable-next-line no-warning-comments
99126
// TODO: Replace with Nullable<string> when the `@common` directory refactoring is completed.
100127
errorMsg: string | null;
101-
lastSignal: SEARCH_SIGNAL;
128+
lastSignal: SEARCH_SIGNAL | PRESTO_SEARCH_SIGNAL;
102129
numTotalResults?: number;
130+
queryEngine: CLP_QUERY_ENGINES;
103131
}
104132
export {
133+
CLP_QUERY_ENGINES,
105134
SEARCH_SIGNAL,
106135
};
107136
export type {
137+
PRESTO_SEARCH_SIGNAL,
108138
SearchResultsMetadataDocument,
109139
ClientToServerEvents,
110140
Err,

components/webui/server/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"StreamFilesS3PathPrefix": null,
1919
"StreamFilesS3Profile": null,
2020

21-
"ClpQueryEngine": "native",
21+
"ClpQueryEngine": "clp",
2222
"PrestoHost": "localhost",
2323
"PrestoPort": 8889
2424
}

components/webui/server/src/routes/api/presto-search/index.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import {FastifyPluginAsyncTypebox} from "@fastify/type-provider-typebox";
22
import {StatusCodes} from "http-status-codes";
33

4+
import {
5+
CLP_QUERY_ENGINES,
6+
type SearchResultsMetadataDocument,
7+
} from "../../../../../common/index.js";
8+
import settings from "../../../../settings.json" with {type: "json"};
49
import {ErrorSchema} from "../../../schemas/error.js";
510
import {
611
PrestoQueryJobCreationSchema,
@@ -28,6 +33,10 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
2833
throw new Error("MongoDB database not found");
2934
}
3035

36+
const searchResultsMetadataCollection = mongoDb.collection<SearchResultsMetadataDocument>(
37+
settings.MongoDbSearchResultsMetadataCollectionName
38+
);
39+
3140
/**
3241
* Submits a search query.
3342
*/
@@ -50,6 +59,7 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
5059
let searchJobId: string;
5160

5261
try {
62+
// eslint-disable-next-line max-lines-per-function
5363
searchJobId = await new Promise<string>((resolve, reject) => {
5464
let isResolved = false;
5565
Presto.client.execute({
@@ -100,14 +110,32 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
100110
state: stats.state,
101111
}, "Presto search state updated");
102112

113+
// Insert metadata and resolve queryId on first call
103114
if (false === isResolved) {
115+
searchResultsMetadataCollection.insertOne({
116+
_id: queryId,
117+
lastSignal: stats.state,
118+
errorMsg: null,
119+
queryEngine: CLP_QUERY_ENGINES.PRESTO,
120+
}).catch((err: unknown) => {
121+
request.log.error(err, "Failed to insert Presto metadata");
122+
});
104123
isResolved = true;
105124
resolve(queryId);
125+
} else {
126+
// Update metadata on subsequent calls
127+
searchResultsMetadataCollection.updateOne(
128+
{_id: queryId},
129+
{$set: {lastSignal: stats.state}}
130+
).catch((err: unknown) => {
131+
request.log.error(err, "Failed to update Presto metadata");
132+
});
106133
}
107134
},
108135
success: () => {
109136
request.log.info("Presto search succeeded");
110137
},
138+
timeout: null,
111139
});
112140
});
113141
} catch (error) {

components/webui/server/src/routes/api/search/index.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
import {StatusCodes} from "http-status-codes";
66

77
import {
8+
CLP_QUERY_ENGINES,
89
SEARCH_SIGNAL,
910
type SearchResultsMetadataDocument,
1011
} from "../../../../../common/index.js";
@@ -18,7 +19,6 @@ import {QUERY_JOB_TYPE} from "../../../typings/query.js";
1819
import {SEARCH_MAX_NUM_RESULTS} from "./typings.js";
1920
import {
2021
createMongoIndexes,
21-
updateSearchResultsMeta,
2222
updateSearchSignalWhenJobsFinish,
2323
} from "./utils.js";
2424

@@ -44,6 +44,8 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
4444
settings.MongoDbSearchResultsMetadataCollectionName
4545
);
4646

47+
const queryEngine = settings.ClpQueryEngine as CLP_QUERY_ENGINES;
48+
4749
/**
4850
* Submits a search query and initiates the search process.
4951
*/
@@ -113,6 +115,7 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
113115
_id: searchJobId.toString(),
114116
lastSignal: SEARCH_SIGNAL.RESP_QUERYING,
115117
errorMsg: null,
118+
queryEngine: queryEngine,
116119
});
117120

118121
// Defer signal update until after response is sent
@@ -197,16 +200,18 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
197200
await QueryJobDbManager.cancelJob(searchJobId);
198201
await QueryJobDbManager.cancelJob(aggregationJobId);
199202

200-
await updateSearchResultsMeta({
201-
fields: {
202-
lastSignal: SEARCH_SIGNAL.RESP_DONE,
203-
errorMsg: "Query cancelled before it could be completed.",
203+
await searchResultsMetadataCollection.updateOne(
204+
{
205+
_id: searchJobId.toString(),
206+
lastSignal: SEARCH_SIGNAL.RESP_QUERYING,
204207
},
205-
jobId: searchJobId,
206-
lastSignal: SEARCH_SIGNAL.RESP_QUERYING,
207-
logger: request.log,
208-
searchResultsMetadataCollection: searchResultsMetadataCollection,
209-
});
208+
{
209+
$set: {
210+
lastSignal: SEARCH_SIGNAL.RESP_DONE,
211+
errorMsg: "Query cancelled before it could be completed.",
212+
},
213+
}
214+
);
210215
} catch (err: unknown) {
211216
const errMsg = "Failed to submit cancel request";
212217
request.log.error(

components/webui/server/src/routes/api/search/typings.ts

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,14 @@ import type {
77
Db,
88
} from "mongodb";
99

10-
import {
11-
SEARCH_SIGNAL,
12-
type SearchResultsMetadataDocument,
13-
} from "../../../../../common/index.js";
10+
import {type SearchResultsMetadataDocument} from "../../../../../common/index.js";
1411

1512

1613
/**
1714
* The maximum number of results to retrieve for a search.
1815
*/
1916
const SEARCH_MAX_NUM_RESULTS = 1000;
2017

21-
type UpdateSearchResultsMetaProps = {
22-
fields: Partial<SearchResultsMetadataDocument>;
23-
jobId: number;
24-
lastSignal: SEARCH_SIGNAL;
25-
logger: FastifyBaseLogger;
26-
searchResultsMetadataCollection: Collection<SearchResultsMetadataDocument>;
27-
};
28-
2918
type UpdateSearchSignalWhenJobsFinishProps = {
3019
aggregationJobId: number;
3120
logger: FastifyBaseLogger;
@@ -46,6 +35,5 @@ export {
4635
CreateMongoIndexesProps,
4736
SEARCH_MAX_NUM_RESULTS,
4837
SearchResultsMetadataDocument,
49-
UpdateSearchResultsMetaProps,
5038
UpdateSearchSignalWhenJobsFinishProps,
5139
};

components/webui/server/src/routes/api/search/utils.ts

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {SEARCH_SIGNAL} from "../../../../../common/index.js";
44
import {
55
CreateMongoIndexesProps,
66
SEARCH_MAX_NUM_RESULTS,
7-
UpdateSearchResultsMetaProps,
87
UpdateSearchSignalWhenJobsFinishProps,
98
} from "./typings.js";
109

@@ -21,36 +20,6 @@ const hasCollection = async (mongoDb: Db, collectionName: string): Promise<boole
2120
return collections.some((collection: {name: string}) => collection.name === collectionName);
2221
};
2322

24-
/**
25-
* Modifies the search results metadata for a given job ID.
26-
*
27-
* @param props
28-
* @param props.fields
29-
* @param props.jobId
30-
* @param props.lastSignal
31-
* @param props.logger
32-
* @param props.searchResultsMetadataCollection
33-
*/
34-
const updateSearchResultsMeta = async ({
35-
fields,
36-
jobId,
37-
lastSignal,
38-
logger,
39-
searchResultsMetadataCollection,
40-
}: UpdateSearchResultsMetaProps) => {
41-
const filter = {
42-
_id: jobId.toString(),
43-
lastSignal: lastSignal,
44-
};
45-
46-
const modifier = {
47-
$set: fields,
48-
};
49-
50-
logger.debug("SearchResultsMetadataCollection modifier = ", modifier);
51-
await searchResultsMetadataCollection.updateOne(filter, modifier);
52-
};
53-
5423
/**
5524
* Updates the search signal when the specified job finishes.
5625
*
@@ -103,20 +72,22 @@ const updateSearchSignalWhenJobsFinish = async ({
10372
return;
10473
}
10574

106-
await updateSearchResultsMeta({
107-
fields: {
75+
const filter = {
76+
_id: searchJobId.toString(),
77+
lastSignal: SEARCH_SIGNAL.RESP_QUERYING,
78+
};
79+
const modifier = {
80+
$set: {
10881
lastSignal: SEARCH_SIGNAL.RESP_DONE,
10982
errorMsg: errorMsg,
11083
numTotalResults: Math.min(
11184
numResultsInCollection,
11285
SEARCH_MAX_NUM_RESULTS
11386
),
11487
},
115-
jobId: searchJobId,
116-
lastSignal: SEARCH_SIGNAL.RESP_QUERYING,
117-
logger: logger,
118-
searchResultsMetadataCollection: searchResultsMetadataCollection,
119-
});
88+
};
89+
90+
await searchResultsMetadataCollection.updateOne(filter, modifier);
12091
};
12192

12293
/**
@@ -158,6 +129,5 @@ const createMongoIndexes = async ({
158129
export {
159130
createMongoIndexes,
160131
hasCollection,
161-
updateSearchResultsMeta,
162132
updateSearchSignalWhenJobsFinish,
163133
};

0 commit comments

Comments
 (0)