Skip to content

Commit 18c502c

Browse files
committed
[B] Adjust ingestion message polling timestamps + end logic
1 parent 185dfaa commit 18c502c

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

client/src/backend/containers/ingestion/Ingest.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export default function IngestContainer({ sectionIngest, refresh }) {
5353

5454
if (!errors) {
5555
setLog("Ready to ingest...");
56+
setAction(null);
5657
} else {
5758
setErrorNotification(errors);
5859
}

client/src/backend/containers/ingestion/useFetchIngestionMessages.js

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ export default function useFetchIngestionMessages(id, setLog, setAction) {
77
const [loading, setLoading] = useState(false);
88
const logIds = useRef([]);
99

10+
const startTime = useRef(null);
11+
1012
const endPolling = useCallback(() => {
1113
if (intervalRef.current) {
1214
clearInterval(intervalRef.current);
@@ -24,6 +26,16 @@ export default function useFetchIngestionMessages(id, setLog, setAction) {
2426
[setLog]
2527
);
2628

29+
const checkForEnd = useCallback(
30+
message => {
31+
if (message[1]?.includes("Complete")) {
32+
endPolling();
33+
setAction("end");
34+
}
35+
},
36+
[endPolling, setAction]
37+
);
38+
2739
const processMessage = useCallback(
2840
message => {
2941
if (!message.id || !message.payload) return;
@@ -32,17 +44,11 @@ export default function useFetchIngestionMessages(id, setLog, setAction) {
3244
logIds.current = [...logIds.current, message.id];
3345

3446
if (message.kind === "log") {
35-
return appendToLog(message.payload);
36-
}
37-
38-
if (message.kind === "message") {
39-
if (message.payload === "END_ACTION") {
40-
endPolling();
41-
return setAction("end");
42-
}
47+
appendToLog(message.payload);
48+
checkForEnd(message.payload);
4349
}
4450
},
45-
[endPolling, appendToLog, setAction, logIds]
51+
[appendToLog, logIds, checkForEnd]
4652
);
4753

4854
const fetchMessages = useApiCallback(ingestionsAPI.messages, {
@@ -51,12 +57,16 @@ export default function useFetchIngestionMessages(id, setLog, setAction) {
5157

5258
const startPolling = useCallback(() => {
5359
if (intervalRef.current) return;
60+
5461
setLoading(true);
62+
63+
const now = new Date();
64+
now.setSeconds(now.getSeconds() - 5);
65+
startTime.current = now.toISOString();
66+
5567
intervalRef.current = setInterval(async () => {
5668
try {
57-
const time = new Date();
58-
time.setSeconds(time.getSeconds() - 5);
59-
const { data, errors } = await fetchMessages(id, time.toISOString());
69+
const { data, errors } = await fetchMessages(id, startTime.current);
6070

6171
if (!errors) {
6272
data.forEach(m => processMessage(m.attributes));

0 commit comments

Comments
 (0)