Commit 4930839

- track uncommitted crawl ids in separate key
- cleanup uncommitted crawl id keys when crawl is canceled
- simplify final exit checks for operations on final crawler exit

1 parent: 7047462

3 files changed (+61 −12 lines)

src/crawler.ts (24 additions, 7 deletions)
```diff
@@ -644,6 +644,7 @@ export class Crawler {
     if (!finished) {
       if (canceled) {
         status = "canceled";
+        await this.cleanupOnCancel();
       } else if (stopped) {
         status = "done";
         logger.info("Crawl gracefully stopped on request");
@@ -1612,7 +1613,17 @@ self.__bx_behaviors.selectMainBehavior();
 
   async checkCanceled() {
     if (this.crawlState && (await this.crawlState.isCrawlCanceled())) {
+      await this.cleanupOnCancel();
       await this.setStatusAndExit(ExitCodes.Success, "canceled");
+      return true;
+    }
+
+    return false;
+  }
+
+  async cleanupOnCancel() {
+    if (this.deduping) {
+      await this.crawlState.clearUncommitted();
     }
   }
 
@@ -1659,8 +1670,7 @@ self.__bx_behaviors.selectMainBehavior();
       return false;
     }
 
-    if (await this.crawlState.isCrawlCanceled()) {
-      await this.setStatusAndExit(ExitCodes.Success, "canceled");
+    if (await this.checkCanceled()) {
       return false;
     }
 
@@ -1728,6 +1738,9 @@ self.__bx_behaviors.selectMainBehavior();
         await this.crawlState.addSourceWACZForDedupe(filename);
       }
     }
+    if (this.deduping) {
+      await this.crawlState.addUncommited();
+    }
 
     if (POST_CRAWL_STATES.includes(initState)) {
       logger.info("crawl already finished, running post-crawl tasks", {
@@ -1905,8 +1918,7 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     const generateFiles =
-      !this.params.dryRun &&
-      (!this.interruptReason || this.finalExit || this.uploadAndDeleteLocal);
+      !this.params.dryRun && (this.finalExit || this.uploadAndDeleteLocal);
 
     if (
       (this.params.generateCDX || this.params.generateWACZ) &&
@@ -1949,13 +1961,18 @@ self.__bx_behaviors.selectMainBehavior();
       }
     }
 
-    if (this.finalExit && generateFiles && this.deduping) {
+    // from here, actions that should happen on final crawler exit (not temp interrupt)
+    if (!this.finalExit) {
+      return;
+    }
+
+    if (this.deduping) {
       // commit crawl data to main index
       logger.info("Committing dedupe index");
       await this.crawlState.commitDedupeDone();
     }
 
-    if (this.finalExit && generateFiles && this.params.saveProfile) {
+    if (this.params.saveProfile && generateFiles) {
       const resource = await this.browser.saveProfile(
         this.params.saveProfile,
         this.storage,
@@ -1966,7 +1983,7 @@ self.__bx_behaviors.selectMainBehavior();
       }
     }
 
-    if (this.params.waitOnDone && (!this.interruptReason || this.finalExit)) {
+    if (this.params.waitOnDone) {
       this.done = true;
       logger.info("All done, waiting for signal...");
       await this.crawlState.setStatus("done");
```
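
Taken together, the crawler.ts changes arrange the dedupe bookkeeping around three moments: crawl start (register as uncommitted), cancellation (clear the uncommitted keys), and final exit (commit to the main index). The sketch below shows that ordering; `DedupeState` and `CrawlLifecycle` are invented stand-ins for illustration, and only the method names `addUncommited`, `clearUncommitted`, `commitDedupeDone`, and `isCrawlCanceled` come from the diff itself.

```ts
// Illustrative sketch only: DedupeState and CrawlLifecycle are invented here
// to show the ordering; the real logic lives in Crawler / RedisDedupeIndex.
interface DedupeState {
  addUncommited(): Promise<void>; // register this crawl as in-flight
  clearUncommitted(): Promise<void>; // drop this crawl's dedupe keys
  commitDedupeDone(): Promise<void>; // merge crawl data into the main index
  isCrawlCanceled(): Promise<boolean>;
}

class CrawlLifecycle {
  constructor(
    private state: DedupeState,
    private deduping: boolean,
  ) {}

  // On startup, a deduping crawl registers itself as uncommitted, so a later
  // cancellation (or an external sweep) can find and remove its keys.
  async start(): Promise<void> {
    if (this.deduping) {
      await this.state.addUncommited();
    }
  }

  // checkCanceled() now owns the cleanup, so every cancellation path clears
  // the uncommitted keys before the process exits.
  async checkCanceled(): Promise<boolean> {
    if (await this.state.isCrawlCanceled()) {
      if (this.deduping) {
        await this.state.clearUncommitted();
      }
      return true;
    }
    return false;
  }

  // Post-crawl work that must only run on final exit is gated by one early
  // return instead of repeating `this.finalExit &&` in each condition.
  async postCrawl(finalExit: boolean): Promise<void> {
    if (!finalExit) {
      return; // temporary interrupt: keep the uncommitted marker for restart
    }
    if (this.deduping) {
      await this.state.commitDedupeDone(); // commit crawl data to main index
    }
  }
}
```

Gating all final-exit work behind a single early return is what lets the `saveProfile` and `waitOnDone` conditions in the diff drop their repeated `this.finalExit` and `this.interruptReason` clauses.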

src/util/constants.ts (1 addition, 0 deletions)
```diff
@@ -25,6 +25,7 @@ export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];
 export const DUPE_ALL_HASH_KEY = "alldupes";
 export const DUPE_ALL_CRAWLS = "allcrawls";
 export const DUPE_ALL_COUNTS = "allcounts";
+export const DUPE_UNCOMMITTED = "uncommittedcrawls";
 
 export enum BxFunctionBindings {
   BehaviorLogFunc = "__bx_log",
```
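
For orientation, the new constant sits alongside the other dedupe-index key names. The role annotations below are inferred from how state.ts uses each key, not taken from any documentation:

```ts
// Dedupe-index Redis key names (roles inferred from usage in state.ts):
export const DUPE_ALL_HASH_KEY = "alldupes"; // per-hash dedupe entries
export const DUPE_ALL_CRAWLS = "allcrawls"; // set of committed crawl ids
export const DUPE_ALL_COUNTS = "allcounts"; // per-crawl dedupe counts
export const DUPE_UNCOMMITTED = "uncommittedcrawls"; // set of in-flight crawl ids (added by this commit)
```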

src/util/state.ts (36 additions, 5 deletions)
```diff
@@ -10,6 +10,7 @@ import {
   DUPE_ALL_HASH_KEY,
   DUPE_ALL_CRAWLS,
   DUPE_ALL_COUNTS,
+  DUPE_UNCOMMITTED,
 } from "./constants.js";
 import { ScopedSeed } from "./seeds.js";
 import { Frame } from "puppeteer-core";
@@ -316,12 +317,38 @@ export class RedisDedupeIndex {
     }
 
     // add to crawls list
+    await this.dedupeRedis.srem(DUPE_UNCOMMITTED, crawlId);
     await this.dedupeRedis.sadd(DUPE_ALL_CRAWLS, crawlId);
 
     // add counts
     await this.addCrawlCounts(crawlId);
   }
 
+  // ADD UNCOMITTED CRAWL
+  async addUncommited() {
+    if (this.crawlId) {
+      await this.dedupeRedis.sadd(DUPE_UNCOMMITTED, this.crawlId);
+    }
+  }
+
+  // CLEAR UNCOMMITTED
+  async clearUncommitted() {
+    if (this.crawlId) {
+      await this.dedupeRedis.srem(DUPE_UNCOMMITTED, this.crawlId);
+      await this.deleteCrawlDedupeKeys(this.crawlId);
+    }
+  }
+
+  async clearAllUncommitted() {
+    while (true) {
+      const crawlId = await this.dedupeRedis.spop(DUPE_UNCOMMITTED);
+      if (!crawlId) {
+        break;
+      }
+      await this.deleteCrawlDedupeKeys(crawlId);
+    }
+  }
+
   // GET OR ADD INDIVIDUAL HASHES
 
   async getHashDupe(hash: string): Promise<DedupeEntry | null> {
@@ -675,15 +702,19 @@ export class RedisDedupeIndex {
          // ignore
        }
      }
-      await this.dedupeRedis.del(
-        `h:${crawlId}`,
-        `c:${crawlId}:wacz`,
-        `h:${crawlId}:counts`,
-      );
+      await this.deleteCrawlDedupeKeys(crawlId);
     }
 
     await this.dedupeRedis.del(TO_REMOVE_CRAWLS);
   }
+
+  private async deleteCrawlDedupeKeys(crawlId: string) {
+    await this.dedupeRedis.del(
+      `h:${crawlId}`,
+      `c:${crawlId}:wacz`,
+      `h:${crawlId}:counts`,
+    );
+  }
 }
 
 // ============================================================================
```
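
Under the hood this is a plain Redis set keyed by `DUPE_UNCOMMITTED`. Here is a self-contained sketch of the same pattern using ioredis directly; the crawl ids, the `demo` wrapper, the local Redis URL, and the standalone `deleteCrawlDedupeKeys` helper are illustrative, with only the key names and Redis commands taken from the diff:

```ts
import Redis from "ioredis";

const DUPE_UNCOMMITTED = "uncommittedcrawls"; // matches constants.ts
const DUPE_ALL_CRAWLS = "allcrawls";

// Illustrative helper mirroring the private deleteCrawlDedupeKeys() above.
async function deleteCrawlDedupeKeys(redis: Redis, crawlId: string) {
  await redis.del(`h:${crawlId}`, `c:${crawlId}:wacz`, `h:${crawlId}:counts`);
}

async function demo() {
  const redis = new Redis("redis://localhost:6379/0");

  // Crawls start: each registers itself as uncommitted (addUncommited).
  await redis.sadd(DUPE_UNCOMMITTED, "crawl-a");
  await redis.sadd(DUPE_UNCOMMITTED, "crawl-b");

  // crawl-a commits: move it from the uncommitted set to the committed set.
  await redis.srem(DUPE_UNCOMMITTED, "crawl-a");
  await redis.sadd(DUPE_ALL_CRAWLS, "crawl-a");

  // crawl-b is canceled: drop its membership and its dedupe keys
  // (clearUncommitted).
  await redis.srem(DUPE_UNCOMMITTED, "crawl-b");
  await deleteCrawlDedupeKeys(redis, "crawl-b");

  // Sweep any leftovers (clearAllUncommitted): SPOP removes one member at a
  // time and returns null once the set is empty.
  for (;;) {
    const crawlId = await redis.spop(DUPE_UNCOMMITTED);
    if (!crawlId) {
      break;
    }
    await deleteCrawlDedupeKeys(redis, crawlId);
  }

  redis.disconnect();
}

demo().catch(console.error);
```

The `SPOP`-based sweep mirrors `clearAllUncommitted()`: because `SPOP` removes the member as it returns it, each orphaned crawl id is claimed exactly once even if two cleanup passes run concurrently.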
