Draft
Changes from 15 commits
Commits
24 commits
8691625
dedup work:
ikreymer Aug 30, 2025
cf2d766
args: add separate --dedupIndexUrl to support separate redis for dedup
ikreymer Sep 17, 2025
c5f84fe
add indexer entrypoint:
ikreymer Sep 18, 2025
35d43a5
keep skipping dupe URLs as before
ikreymer Sep 18, 2025
0ab43db
warc writing:
ikreymer Sep 18, 2025
ab4b19f
rename --dedupStoreUrl -> redisDedupUrl
ikreymer Sep 18, 2025
b24b2f2
update to latest warcio (2.4.7) to fix issues when returning payload o…
ikreymer Sep 18, 2025
1213ed0
bump to 2.4.7
ikreymer Sep 18, 2025
e059949
tests: add dedup-basic.test for simple dedup, ensure number of revisi…
ikreymer Sep 18, 2025
03bbf69
deps update
ikreymer Sep 20, 2025
fc3f9b4
Merge branch 'main' into hash-based-dedup
ikreymer Sep 20, 2025
aa7b8a1
dedup indexing: strip hash prefix from digest, as cdx does not have it
ikreymer Sep 23, 2025
3428b16
use dedup redis to queue up wacz files that need to be updated
ikreymer Sep 23, 2025
6ca191b
dedup post requests and non-404s as well!
ikreymer Sep 25, 2025
79c9327
Merge branch 'main' into hash-based-dedup
tw4l Oct 1, 2025
a39eea0
Merge branch 'main' into hash-based-dedup
ikreymer Oct 14, 2025
3397eb1
Merge branch 'hash-based-dedup' of github.com:webrecorder/browsertrix…
ikreymer Oct 14, 2025
b9db2ef
- track source index for each hash, so entry becomes '<source index> …
ikreymer Oct 18, 2025
f506113
update to new data model:
ikreymer Oct 24, 2025
48f781c
cleanup, keep compatibility with redis 6 still
ikreymer Oct 24, 2025
99a49d5
always return wacz, store wacz depends only for current wacz
ikreymer Oct 24, 2025
6f00a2e
rename 'dedup' -> 'dedupe' for consistency
ikreymer Oct 25, 2025
532fbe3
indexer optimize: commit only if added
ikreymer Oct 25, 2025
01930ee
add removing option to also remove unused crawls if doing a full sync…
ikreymer Oct 25, 2025
5 changes: 3 additions & 2 deletions Dockerfile
@@ -44,11 +44,12 @@ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/ui.js /app/html/rw
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/sw.js /app/html/rwp/
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/adblock/adblock.gz /app/html/rwp/adblock.gz

RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js && chmod a+r /app/html/rwp/*
RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js /app/dist/indexer.js && chmod a+r /app/html/rwp/*

RUN ln -s /app/dist/main.js /usr/bin/crawl; \
ln -s /app/dist/main.js /usr/bin/qa; \
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile; \
ln -s /app/dist/indexer.js /usr/bin/indexer;

RUN mkdir -p /app/behaviors

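Note: the new indexer symlink exposes the dedupe indexer entrypoint (added as src/indexer.ts below) alongside crawl and qa. A hypothetical invocation inside the container, with placeholder WACZ path and Redis URL (flags per src/indexer.ts):

indexer --sourceUrl /crawls/collections/my-coll/my-coll.wacz --redisDedupUrl redis://dedup-redis:6379/0
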
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "browsertrix-crawler",
"version": "1.8.0",
"version": "1.9.0-beta.0",
"main": "browsertrix-crawler",
"type": "module",
"repository": "https://github.com/webrecorder/browsertrix-crawler",
34 changes: 19 additions & 15 deletions src/crawler.ts
@@ -31,7 +31,7 @@ import {
} from "./util/storage.js";
import { ScreenCaster, WSTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { initRedis } from "./util/redis.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
@@ -337,32 +337,28 @@ export class Crawler {

async initCrawlState() {
const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
const dedupRedisUrl = this.params.redisDedupUrl || redisUrl;

if (!redisUrl.startsWith("redis://")) {
logger.fatal(
"stateStoreUrl must start with redis:// -- Only redis-based store currently supported",
);
}

let redis;

while (true) {
try {
redis = await initRedis(redisUrl);
break;
} catch (e) {
//logger.fatal("Unable to connect to state store Redis: " + redisUrl);
logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
await sleep(1);
}
}
const redis = await initRedisWaitForSuccess(redisUrl);

logger.debug(
`Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,
{},
"state",
);

let dedupRedis = redis;

if (redisUrl !== dedupRedisUrl) {
dedupRedis = await initRedisWaitForSuccess(dedupRedisUrl);
}

logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");

this.crawlState = new RedisCrawlState(
@@ -371,6 +367,7 @@
this.maxPageTime,
os.hostname(),
this.params.maxPageRetries,
dedupRedis,
);

if (this.params.logErrorsToRedis) {
@@ -1046,7 +1043,7 @@ self.__bx_behaviors.selectMainBehavior();
const { page, cdp, data, workerid, callbacks, recorder } = opts;
data.callbacks = callbacks;

const { url, seedId } = data;
const { url, seedId, depth } = data;

const auth = this.seeds[seedId].authHeader();

@@ -1119,6 +1116,7 @@

if (recorder) {
recorder.pageSeed = seed;
recorder.pageSeedDepth = depth;
}

// run custom driver here, if any
@@ -1292,6 +1290,7 @@ self.__bx_behaviors.selectMainBehavior();
} else {
if (pageSkipped) {
await this.crawlState.markExcluded(url);
this.limitHit = false;
} else {
const retry = await this.crawlState.markFailed(url);

@@ -2137,7 +2136,12 @@ self.__bx_behaviors.selectMainBehavior();
// excluded in recorder
if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) {
data.pageSkipped = true;
logger.warn("Page Load Blocked, skipping", { msg, loadState });
logger.warn(
"Page Load Blocked, skipping",
{ msg, loadState },
"pageStatus",
);
throw new Error("logged");
} else {
return this.pageFailed("Page Load Failed", retry, {
msg,
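
Note: the inlined retry loop removed above is now factored into initRedisWaitForSuccess in util/redis.js. A minimal sketch of what such a helper could look like, reusing the removed loop's logic; the actual implementation may differ:

import { initRedis } from "./redis.js";
import { logger } from "./logger.js";
import { sleep } from "./timing.js";

// Retry initRedis until a connection succeeds, warning and sleeping between attempts.
export async function initRedisWaitForSuccess(redisUrl: string) {
  while (true) {
    try {
      return await initRedis(redisUrl);
    } catch (e) {
      logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
      await sleep(1);
    }
  }
}
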
181 changes: 181 additions & 0 deletions src/indexer.ts
@@ -0,0 +1,181 @@
#!/usr/bin/env node

import yargs from "yargs";
import { logger } from "./util/logger.js";
import { getInfoString } from "./util/file_reader.js";
import { openAsBlob } from "node:fs";
import { WACZLoader } from "./util/wacz.js";
import { ExitCodes } from "./util/constants.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { AsyncIterReader } from "warcio";
import { RedisDedupIndex } from "./util/state.js";

export class CrawlIndexer {
constructor() {}

initArgs() {
return yargs(process.argv)
.usage("indexer [options]")
.options({
redisDedupUrl: {
describe: "URL for remote redis instance to index into",
type: "string",
required: true,
},

sourceUrl: {
describe: "Source WACZ or Multi WACZ or Multi WACZ JSON to index",
type: "string",
required: true,
},
})
.parseSync();
}

async run() {
logger.setDebugLogging(true);

process.on("SIGINT", () => this.handleTerminate("SIGINT"));

process.on("SIGTERM", () => this.handleTerminate("SIGTERM"));

logger.info(await getInfoString());

const params = this.initArgs();

const redis = await initRedisWaitForSuccess(params.redisDedupUrl);
const dedupIndex = new RedisDedupIndex(redis);

for await (const [name, waczfile] of this.iterWACZ(params.sourceUrl)) {
await dedupIndex.addHashSource(name, waczfile);
}

let count = 0;
let res;

while ((res = await dedupIndex.nextQueuedHashSource())) {
const { id, url, total } = res;
count += 1;
const loader = new WACZLoader(url);
logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url });
for await (const file of loader.iterFiles("indexes/")) {
const filename = file.filename;
if (filename.endsWith(".cdx.gz")) {
logger.debug("Processing CDX GZ Index", { filename });
await this.ingestCDXJ(dedupIndex, loader, filename, "gzip");
} else if (filename.endsWith(".cdx") || filename.endsWith(".cdxj")) {
logger.debug("Processing CDX Index", { filename });
await this.ingestCDXJ(dedupIndex, loader, filename);
}
}
await dedupIndex.addDoneSource(id);
}

logger.info("Done!");
await dedupIndex.markDoneImport();
process.exit(ExitCodes.Success);
}

async ingestCDXJ(
dedupIndex: RedisDedupIndex,
loader: WACZLoader,
filename: string,
compression?: string,
) {
let reader = await loader.loadFile(filename);

if (!reader) {
logger.error("File not found, skipping!");
return;
}

if (compression === "gzip") {
reader = new AsyncIterReader(reader, "gzip", false);
}

let count = 0;

for await (const line of reader.iterLines()) {
const inx = line.indexOf(" {");
if (inx < 0) {
logger.error("Skipping invalid CDXJ, no JSON", { line });
continue;
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let cdx: Record<string, any>;

try {
cdx = JSON.parse(line.slice(inx));
} catch (e) {
logger.error("Skipping invalid CDXJ, JSON invalid", { line });
continue;
}

const date = line.split(" ", 2)[1];
const url = cdx.url;
const hash = cdx.digest;

if (url.startsWith("urn:")) {
continue;
}

// only adding originals to dedup against, don't want to dedup against existing revisits
if (cdx.mime === "warc/revisit") {
continue;
}

if (url && date && hash) {
await dedupIndex.addHashDupe(hash, url, date);
} else {
logger.warn("Skipping invalid CDXJ, data missing", {
url,
date,
digest: hash,
});
continue;
}

count += 1;
}

logger.debug("Processed", { count });
}

async *iterWACZ(url: string, name?: string): AsyncIterable<[string, string]> {
let path: string = url;

try {
path = new URL(url).pathname;
} catch (e) {
// ignore
}

if (path.endsWith(".wacz")) {
yield [name || url, url];
} else if (path.endsWith(".json")) {
if (!url.startsWith("http://") && !url.startsWith("https://")) {
const blob = await openAsBlob(url);
url = URL.createObjectURL(blob);
}

const resp = await fetch(url);
const json = await resp.json();

for (const entry of json.resources) {
if (entry.path) {
yield* this.iterWACZ(entry.path, entry.name);
}
}
} else {
logger.warn("Unknown source", { url }, "replay");
}
}

handleTerminate(signame: string) {
logger.info(`Got signal ${signame}, exiting`);
process.exit(ExitCodes.SignalInterrupted);
}
}

await new CrawlIndexer().run();
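
Note: ingestCDXJ above expects CDXJ lines of the form "urlkey timestamp {json}": the timestamp is read from the second space-delimited field, the JSON block starts at the first " {", and an entry is indexed only if url, date, and digest are all present, while urn: records and warc/revisit entries are skipped. A hypothetical line it would index (all values are placeholders):

com,example)/page 20250918120000 {"url": "https://example.com/page", "mime": "text/html", "status": "200", "digest": "EXAMPLEDIGEST"}
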
39 changes: 1 addition & 38 deletions src/replaycrawler.ts
@@ -10,9 +10,6 @@ import { PageInfoRecord, PageInfoValue, Recorder } from "./util/recorder.js";
import fsp from "fs/promises";
import path from "path";

import { ZipRangeReader, createLoader } from "@webrecorder/wabac";

import { AsyncIterReader } from "warcio";
import { parseArgs } from "./util/argParser.js";

import { PNG } from "pngjs";
@@ -23,6 +20,7 @@ import { MAX_URL_LENGTH } from "./util/reqresp.js";
import { openAsBlob } from "fs";
import { WARCWriter } from "./util/warcwriter.js";
import { parseRx } from "./util/seeds.js";
import { WACZLoader } from "./util/wacz.js";

// RWP Replay Prefix
const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/";
@@ -784,38 +782,3 @@ export class ReplayCrawler extends Crawler {
return null;
}
}

class WACZLoader {
url: string;
zipreader: ZipRangeReader | null;

constructor(url: string) {
this.url = url;
this.zipreader = null;
}

async init() {
if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) {
const blob = await openAsBlob(this.url);
this.url = URL.createObjectURL(blob);
}

const loader = await createLoader({ url: this.url });

this.zipreader = new ZipRangeReader(loader);
}

async loadFile(fileInZip: string) {
const { reader } = await this.zipreader!.loadFile(fileInZip);

if (!reader) {
return null;
}

if (!reader.iterLines) {
return new AsyncIterReader(reader);
}

return reader;
}
}
13 changes: 13 additions & 0 deletions src/util/argParser.ts
@@ -437,6 +437,19 @@ class ArgParser {
default: "redis://localhost:6379/0",
},

redisDedupUrl: {
describe:
"If set, url for remote redis server to store state. Otherwise, using local redis instance",
type: "string",
},

minPageDedupDepth: {
describe:
"If set >= 0, minimum depth at which duplicate pages can be skipped. -1 means never skip duplicate pages",
type: "number",
default: -1,
},

saveState: {
describe:
"If the crawl state should be serialized to the crawls/ directory. Defaults to 'partial', only saved when crawl is interrupted",
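
Note: a hypothetical crawl invocation exercising the new options (seed and Redis URLs are placeholders). With --minPageDedupDepth 1, duplicate pages at depth 1 or deeper can be skipped, while depth-0 seed pages are always captured; with the default of -1, duplicate pages are never skipped:

crawl --url https://example.com/ --redisDedupUrl redis://dedup-redis:6379/0 --minPageDedupDepth 1
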
2 changes: 2 additions & 0 deletions src/util/constants.ts
@@ -22,6 +22,8 @@ export const DETECT_SITEMAP = "<detect>";

export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"];

export const HASH_DUPE_KEY = "dupe";

export enum BxFunctionBindings {
BehaviorLogFunc = "__bx_log",
AddLinkFunc = "__bx_addLink",