Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
clearQueryResults,
submitQuery,
} from "../../../../api/presto-search";
import useSearchStore from "../../SearchState";
import useSearchStore, {SEARCH_STATE_DEFAULT} from "../../SearchState";
import {SEARCH_UI_STATE} from "../../SearchState/typings";


Expand Down Expand Up @@ -43,7 +43,13 @@ const handlePrestoClearResults = () => {
* @param payload
*/
const handlePrestoQuerySubmit = (payload: PrestoQueryJobCreation) => {
const {updateSearchJobId, updateSearchUiState, searchUiState} = useSearchStore.getState();
const {
updateNumSearchResultsTable,
updateNumSearchResultsMetadata,
updateSearchJobId,
updateSearchUiState,
searchUiState,
} = useSearchStore.getState();

// User should NOT be able to submit a new query while an existing query is in progress.
if (
Expand All @@ -58,6 +64,8 @@ const handlePrestoQuerySubmit = (payload: PrestoQueryJobCreation) => {

handlePrestoClearResults();

updateNumSearchResultsTable(SEARCH_STATE_DEFAULT.numSearchResultsTable);
updateNumSearchResultsMetadata(SEARCH_STATE_DEFAULT.numSearchResultsMetadata);
updateSearchUiState(SEARCH_UI_STATE.QUERY_ID_PENDING);

submitQuery(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const handleQuerySubmit = (payload: QueryJobCreation) => {

store.updateNumSearchResultsTable(SEARCH_STATE_DEFAULT.numSearchResultsTable);
store.updateNumSearchResultsTimeline(SEARCH_STATE_DEFAULT.numSearchResultsTimeline);
store.updateNumSearchResultsMetadata(SEARCH_STATE_DEFAULT.numSearchResultsMetadata);
store.updateSearchUiState(SEARCH_UI_STATE.QUERY_ID_PENDING);

submitQuery(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const useUpdateStateWithMetadata = () => {

switch (resultsMetadata.lastSignal) {
case SEARCH_SIGNAL.RESP_DONE:
case PRESTO_SEARCH_SIGNAL.FINISHED:
case PRESTO_SEARCH_SIGNAL.DONE:
updateSearchUiState(SEARCH_UI_STATE.DONE);
break;
case PRESTO_SEARCH_SIGNAL.FAILED:
Expand Down
169 changes: 169 additions & 0 deletions components/webui/common/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import {Type} from "@sinclair/typebox";


/**
* Unique ID for each active unique query. Multiple clients can subscribe to the same ID if the
* queries are identical. The ID is also used to represent the socket room, and MongoDB
* change stream.
*/
type QueryId = number;

/**
* Error response to event.
*/
interface Err {
error: string;
queryId?: QueryId;
}

/**
* Success response to event.
*/
interface Success<T> {
data: T;
}

/**
* Event response.
*/
type Response<T> = Err | Success<T>;


/**
* Events that the client can emit to the server.
*/
type ClientToServerEvents = {
"disconnect": () => void;
"collection::find::subscribe": (
requestArgs: {
collectionName: string;
query: object;
options: object;
},
callback: (res: Response<{queryId: QueryId; initialDocuments: object[]}>) => void) => void;
"collection::find::unsubscribe": (
requestArgs: {
queryId: QueryId;
}
) => Promise<void>;
};

/**
* Events that the server can emit to the client.
*/
interface ServerToClientEvents {
// eslint-disable-next-line no-warning-comments
// TODO: Consider replacing this with `collection::find::update${number}`, which will
// limit callbacks being triggered in the client to their respective query IDs.
"collection::find::update": (respArgs: {
queryId: QueryId;
data: object[];
}) => void;
}

/**
* Empty but required by Socket IO.
*/
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface InterServerEvents {
}

/**
* Collection associated with each socket connection.
*/
interface SocketData {
collectionName?: string;
}

/**
* Enum of search-related signals.
*
* This includes request and response signals for various search operations and their respective
* states.
*/
enum SEARCH_SIGNAL {
NONE = "none",

REQ_CANCELLING = "req-cancelling",
REQ_CLEARING = "req-clearing",
REQ_QUERYING = "req-querying",

RESP_DONE = "resp-done",
RESP_QUERYING = "resp-querying",
}

/**
* Presto search-related signals.
*/
enum PRESTO_SEARCH_SIGNAL {
QUERYING = "QUERYING",
FAILED = "FAILED",
DONE = "DONE"
}

/**
* CLP query engines.
*/
enum CLP_QUERY_ENGINES {
CLP = "clp",
CLP_S = "clp-s",
PRESTO = "presto",
}

/**
* MongoDB document for search results metadata. `numTotalResults` is optional
* since it is only set when the search job is completed.
*/
interface SearchResultsMetadataDocument {
_id: string;

// eslint-disable-next-line no-warning-comments
// TODO: Replace with Nullable<string> when the `@common` directory refactoring is completed.
errorMsg: string | null;
errorName: string | null;
lastSignal: SEARCH_SIGNAL | PRESTO_SEARCH_SIGNAL;
numTotalResults?: number;
queryEngine: CLP_QUERY_ENGINES;
}

/**
* Presto row wrapped in a `row` property to prevent conflicts with MongoDB's `_id` field.
*/
interface PrestoRowObject {
row: Record<string, unknown>;
}

/**
* Presto search result in MongoDB.
*/
interface PrestoSearchResult extends PrestoRowObject {
_id: string;
}

/**
* Test TypeBox schema for testing dependency.
*/
// eslint-disable-next-line no-warning-comments
// TODO: Will be removed once shared server/client route types are migrated into common.
const TestTypeBoxSchema = Type.Object({
type: Type.String(),
});

export {
CLP_QUERY_ENGINES,
PRESTO_SEARCH_SIGNAL,
SEARCH_SIGNAL,
TestTypeBoxSchema,
};
export type {
ClientToServerEvents,
Err,
InterServerEvents,
PrestoRowObject,
PrestoSearchResult,
QueryId,
Response,
SearchResultsMetadataDocument,
ServerToClientEvents,
SocketData,
};
26 changes: 15 additions & 11 deletions components/webui/server/src/routes/api/presto-search/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
},
query: queryString,
state: (_, queryId, stats) => {
// Type cast `presto-client` string literal type to our enum type.
const newState = stats.state as PRESTO_SEARCH_SIGNAL;
request.log.info({
searchJobId: queryId,
state: stats.state,
Expand All @@ -163,24 +161,30 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
_id: queryId,
errorMsg: null,
errorName: null,
lastSignal: newState,
lastSignal: PRESTO_SEARCH_SIGNAL.QUERYING,
queryEngine: CLP_QUERY_ENGINES.PRESTO,
}).catch((err: unknown) => {
request.log.error(err, "Failed to insert Presto metadata");
});
isResolved = true;
resolve(queryId);
} else {
// Update metadata on subsequent calls
searchResultsMetadataCollection.updateOne(
{_id: queryId},
{$set: {lastSignal: newState}}
).catch((err: unknown) => {
request.log.error(err, "Failed to update Presto metadata");
});
}
},
success: () => {
if (false === isResolved) {
request.log.error(
"Presto query finished before searchJobId was resolved; "
);

return;
}
searchResultsMetadataCollection.updateOne(
{_id: searchJobId},
{$set: {lastSignal: PRESTO_SEARCH_SIGNAL.DONE}}
).catch((err: unknown) => {
request.log.error(err, "Failed to update Presto metadata");
});

request.log.info("Presto search succeeded");
},
timeout: null,
Expand Down
Loading