diff --git a/Dockerfile b/Dockerfile index 6ba5fe8b..3620065c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,11 +44,12 @@ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/ui.js /app/html/rw ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/sw.js /app/html/rwp/ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/adblock/adblock.gz /app/html/rwp/adblock.gz -RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js && chmod a+r /app/html/rwp/* +RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js /app/dist/indexer.js && chmod a+r /app/html/rwp/* RUN ln -s /app/dist/main.js /usr/bin/crawl; \ ln -s /app/dist/main.js /usr/bin/qa; \ - ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile + ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile; \ + ln -s /app/dist/indexer.js /usr/bin/indexer; RUN mkdir -p /app/behaviors diff --git a/package.json b/package.json index debd1ec9..170c186f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "browsertrix-crawler", - "version": "1.8.1", + "version": "1.9.0-beta.0", "main": "browsertrix-crawler", "type": "module", "repository": "https://github.com/webrecorder/browsertrix-crawler", diff --git a/src/crawler.ts b/src/crawler.ts index d815c3fd..22f1db41 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -31,7 +31,7 @@ import { } from "./util/storage.js"; import { ScreenCaster, WSTransport } from "./util/screencaster.js"; import { Screenshots } from "./util/screenshots.js"; -import { initRedis } from "./util/redis.js"; +import { initRedisWaitForSuccess } from "./util/redis.js"; import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js"; import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js"; import { sleep, timedRun, secondsElapsed } from "./util/timing.js"; @@ -199,6 +199,7 @@ export class Crawler { | null = null; recording: boolean; + deduping = false; constructor() { const args = this.parseArgs(); @@ -335,6 +336,9 @@ export class Crawler { async initCrawlState() { const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0"; + const dedupeRedisUrl = this.params.redisDedupeUrl || redisUrl; + + this.deduping = dedupeRedisUrl !== redisUrl; if (!redisUrl.startsWith("redis://")) { logger.fatal( @@ -342,18 +346,7 @@ export class Crawler { ); } - let redis; - - while (true) { - try { - redis = await initRedis(redisUrl); - break; - } catch (e) { - //logger.fatal("Unable to connect to state store Redis: " + redisUrl); - logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state"); - await sleep(1); - } - } + const redis = await initRedisWaitForSuccess(redisUrl); logger.debug( `Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`, @@ -361,6 +354,12 @@ export class Crawler { "state", ); + let dedupeRedis = redis; + + if (redisUrl !== dedupeRedisUrl) { + dedupeRedis = await initRedisWaitForSuccess(dedupeRedisUrl); + } + logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state"); this.crawlState = new RedisCrawlState( @@ -369,6 +368,7 @@ export class Crawler { this.maxPageTime, os.hostname(), this.params.maxPageRetries, + dedupeRedis, ); if (this.params.logErrorsToRedis) { @@ -1047,7 +1047,7 @@ self.__bx_behaviors.selectMainBehavior(); const { page, cdp, data, workerid, callbacks, recorder } = opts; data.callbacks = callbacks; - const { url, seedId } = data; + const { url, seedId, depth } = data; const auth = this.seeds[seedId].authHeader(); @@ -1120,6 +1120,7 @@ 
self.__bx_behaviors.selectMainBehavior(); if (recorder) { recorder.pageSeed = seed; + recorder.pageSeedDepth = depth; } // run custom driver here, if any @@ -1293,6 +1294,7 @@ self.__bx_behaviors.selectMainBehavior(); } else { if (pageSkipped) { await this.crawlState.markExcluded(url); + this.limitHit = false; } else { const retry = await this.crawlState.markFailed(url); @@ -1654,9 +1656,7 @@ self.__bx_behaviors.selectMainBehavior(); if (this.params.generateWACZ) { this.storage = initStorage(); - if (this.storage) { - await this.crawlState.setWACZFilename(); - } + await this.crawlState.setWACZFilename(); } if (POST_CRAWL_STATES.includes(initState)) { @@ -1843,9 +1843,20 @@ self.__bx_behaviors.selectMainBehavior(); } if (this.params.generateWACZ && generateFiles) { - const uploaded = await this.generateWACZ(); + const wacz = await this.generateWACZ(); + + if (wacz) { + if (this.deduping) { + await this.crawlState.setStatus("post-crawl"); + await this.crawlState.updateDedupeSource(wacz); + + await this.crawlState.clearDupeFileRef(); + } - if (uploaded && this.uploadAndDeleteLocal) { + await this.crawlState.clearWACZFilename(); + } + + if (wacz && this.storage && this.uploadAndDeleteLocal) { logger.info( `Uploaded WACZ, deleting local data to free up space: ${this.collDir}`, ); @@ -1883,7 +1894,7 @@ self.__bx_behaviors.selectMainBehavior(); await streamFinish(logFH); } - async generateWACZ() { + async generateWACZ(): Promise<WACZ | null> { logger.info("Generating WACZ"); await this.crawlState.setStatus("generate-wacz"); @@ -1897,11 +1908,11 @@ self.__bx_behaviors.selectMainBehavior(); if (!warcFileList.length) { // if finished, just return if (isFinished || (await this.crawlState.isCrawlCanceled())) { - return; + return null; } // possibly restarted after committing, so assume done here! 
if ((await this.crawlState.numDone()) > 0) { - return; + return null; } // fail crawl otherwise logger.fatal("No WARC Files, assuming crawl failed"); @@ -1921,6 +1932,8 @@ self.__bx_behaviors.selectMainBehavior(); await this.closeLog(); + const requires = await this.crawlState.getDupeDependentSources(); + const waczOpts: WACZInitOpts = { input: warcFileList.map((x) => path.join(this.archivesDir, x)), output: waczPath, @@ -1929,6 +1942,7 @@ self.__bx_behaviors.selectMainBehavior(); warcCdxDir: this.warcCdxDir, indexesDir: this.indexesDir, softwareString: this.infoString, + requires, }; if (process.env.WACZ_SIGN_URL) { @@ -1958,13 +1972,8 @@ self.__bx_behaviors.selectMainBehavior(); const targetFilename = await this.crawlState.getWACZFilename(); await this.storage.uploadCollWACZ(wacz, targetFilename, isFinished); - - await this.crawlState.clearWACZFilename(); - - return true; } - - return false; + return wacz; } catch (e) { logger.error("Error creating WACZ", e); if (!streaming) { @@ -1973,6 +1982,8 @@ self.__bx_behaviors.selectMainBehavior(); await this.setStatusAndExit(ExitCodes.UploadFailed, "interrupted"); } } + + return null; } logMemory() { @@ -2138,7 +2149,12 @@ self.__bx_behaviors.selectMainBehavior(); // excluded in recorder if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) { data.pageSkipped = true; - logger.warn("Page Load Blocked, skipping", { msg, loadState }); + logger.warn( + "Page Load Blocked, skipping", + { msg, loadState }, + "pageStatus", + ); + throw new Error("logged"); } else { return this.pageFailed("Page Load Failed", retry, { msg, diff --git a/src/indexer.ts b/src/indexer.ts new file mode 100644 index 00000000..ac31f6fb --- /dev/null +++ b/src/indexer.ts @@ -0,0 +1,242 @@ +#!/usr/bin/env node +import yargs from "yargs"; +import { logger } from "./util/logger.js"; +import { getInfoString } from "./util/file_reader.js"; +import { openAsBlob } from "node:fs"; +import { WACZLoader } from "./util/wacz.js"; +import { ExitCodes } from "./util/constants.js"; +import { initRedisWaitForSuccess } from "./util/redis.js"; +import { AsyncIterReader } from "warcio"; +import { RedisDedupeIndex } from "./util/state.js"; +import { basename } from "node:path"; +import { sleep } from "./util/timing.js"; + +export type DedupeIndexEntry = { + name: string; + url: string; + crawlId?: string; + size?: number; + hash?: string; +}; + +export class CrawlIndexer { + constructor() {} + + initArgs() { + return yargs(process.argv) + .usage("indexer [options]") + .options({ + redisDedupeUrl: { + describe: "URL for remote redis instance to index into", + type: "string", + required: true, + }, + + sourceUrl: { + describe: "Source WACZ or Multi WACZ or Multi WACZ JSON to index", + type: "string", + required: true, + }, + + sourceCrawlId: { + describe: "If single WACZ, use this id as source id", + type: "string", + required: false, + }, + + removing: { + describe: "If set, also remove unused crawls/hashes from index", + type: "boolean", + required: false, + default: false, + }, + }) + .parseSync(); + } + + async run() { + logger.setDebugLogging(true); + + process.on("SIGINT", () => this.handleTerminate("SIGINT")); + + process.on("SIGTERM", () => this.handleTerminate("SIGTERM")); + + logger.info(await getInfoString()); + + const params = this.initArgs(); + + const redis = await initRedisWaitForSuccess(params.redisDedupeUrl); + const dedupeIndex = new RedisDedupeIndex(redis, ""); + + for await (const entry of this.iterWACZ({ + url: params.sourceUrl, + name: basename(params.sourceUrl), + crawlId: 
params.sourceCrawlId, + })) { + await dedupeIndex.queueImportSource(entry.name, JSON.stringify(entry)); + if (params.removing && entry.crawlId) { + await dedupeIndex.markNotRemoved(entry.crawlId); + } + } + + let count = 0; + let total = 0; + let res; + + while ((res = await dedupeIndex.nextQueuedImportSource())) { + const { name, entry, remaining } = res; + if (!total) { + total = remaining; + } + const { url, crawlId, size, hash } = JSON.parse( + entry, + ) as DedupeIndexEntry; + count += 1; + const loader = new WACZLoader(url); + logger.debug(`Processing WACZ ${count} of ${total}`, { waczfile: url }); + + const crawlIdReal = crawlId || params.sourceCrawlId || url; + + await dedupeIndex.addImportedSourceForDedupe(crawlIdReal, { + filename: name, + size, + hash, + }); + + for await (const file of loader.iterFiles("indexes/")) { + const filename = file.filename; + if (filename.endsWith(".cdx.gz")) { + logger.debug("Processing CDX GZ Index", { filename }); + await this.ingestCDXJ( + dedupeIndex, + loader, + filename, + crawlIdReal, + "gzip", + ); + } else if (filename.endsWith(".cdx") || filename.endsWith(".cdxj")) { + logger.debug("Processing CDX Index", { filename }); + await this.ingestCDXJ(dedupeIndex, loader, filename, crawlIdReal); + } + } + + await dedupeIndex.markImportSourceDone(name, crawlIdReal); + } + + if (params.removing) { + const removeset = await dedupeIndex.getRemoveSet(); + if (removeset.size > 0) { + await dedupeIndex.removeCrawlIds(removeset); + } + } + + logger.info("Done!"); + await sleep(30); + await dedupeIndex.markImportFinishedTS(); + process.exit(ExitCodes.Success); + } + + async ingestCDXJ( + dedupeIndex: RedisDedupeIndex, + loader: WACZLoader, + filename: string, + crawlId: string, + compression?: string, + ) { + let reader = await loader.loadFile(filename); + + if (!reader) { + logger.error("File not found, skipping!"); + return; + } + + if (compression === "gzip") { + reader = new AsyncIterReader(reader, "gzip", false); + } + + let count = 0; + + for await (const line of reader.iterLines()) { + const inx = line.indexOf(" {"); + if (inx < 0) { + logger.error("Skipping invalid CDXJ, no JSON", { line }); + continue; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let cdx: Record<string, any>; + + try { + cdx = JSON.parse(line.slice(inx)); + } catch (e) { + logger.error("Skipping invalid CDXJ, JSON invalid", { line }); + continue; + } + + const date = line.split(" ", 2)[1]; + const url = cdx.url; + const hash = cdx.digest; + + if (url.startsWith("urn:")) { + continue; + } + + // only adding originals to dedupe against, don't want to dedupe against existing revisits + if (cdx.mime === "warc/revisit") { + continue; + } + + if (url && date && hash) { + await dedupeIndex.addHashDupe(hash, url, date, crawlId, true); + } else { + logger.warn("Skipping invalid CDXJ, data missing", { + url, + date, + digest: hash, + }); + continue; + } + + count += 1; + } + + logger.debug("Processed", { count }); + } + + async *iterWACZ(entry: DedupeIndexEntry): AsyncIterable<DedupeIndexEntry> { + let { url } = entry; + let path = url; + + try { + path = new URL(url).pathname; + } catch (e) { + // ignore + } + + if (path.endsWith(".wacz")) { + yield entry; + } else if (path.endsWith(".json")) { + if (!url.startsWith("http://") && !url.startsWith("https://")) { + const blob = await openAsBlob(url); + url = URL.createObjectURL(blob); + } + + const resp = await fetch(url); + const json = await resp.json(); + + for (const entry of json.resources) { + entry.url = entry.path; + yield* 
this.iterWACZ(entry); + } + } else { + logger.warn("Unknown source", { url }, "replay"); + } + } + + handleTerminate(signame: string) { + logger.info(`Got signal ${signame}, exiting`); + process.exit(ExitCodes.SignalInterrupted); + } +} + +await new CrawlIndexer().run(); diff --git a/src/replaycrawler.ts b/src/replaycrawler.ts index 819bcf39..13160c60 100644 --- a/src/replaycrawler.ts +++ b/src/replaycrawler.ts @@ -10,9 +10,6 @@ import { PageInfoRecord, PageInfoValue, Recorder } from "./util/recorder.js"; import fsp from "fs/promises"; import path from "path"; -import { ZipRangeReader, createLoader } from "@webrecorder/wabac"; - -import { AsyncIterReader } from "warcio"; import { parseArgs } from "./util/argParser.js"; import { PNG } from "pngjs"; @@ -23,6 +20,7 @@ import { MAX_URL_LENGTH } from "./util/reqresp.js"; import { openAsBlob } from "fs"; import { WARCWriter } from "./util/warcwriter.js"; import { parseRx } from "./util/seeds.js"; +import { WACZLoader } from "./util/wacz.js"; // RWP Replay Prefix const REPLAY_PREFIX = "http://localhost:9990/replay/w/replay/"; @@ -784,38 +782,3 @@ export class ReplayCrawler extends Crawler { return null; } } - -class WACZLoader { - url: string; - zipreader: ZipRangeReader | null; - - constructor(url: string) { - this.url = url; - this.zipreader = null; - } - - async init() { - if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) { - const blob = await openAsBlob(this.url); - this.url = URL.createObjectURL(blob); - } - - const loader = await createLoader({ url: this.url }); - - this.zipreader = new ZipRangeReader(loader); - } - - async loadFile(fileInZip: string) { - const { reader } = await this.zipreader!.loadFile(fileInZip); - - if (!reader) { - return null; - } - - if (!reader.iterLines) { - return new AsyncIterReader(reader); - } - - return reader; - } -} diff --git a/src/util/argParser.ts b/src/util/argParser.ts index cd64e8fd..d47e9c13 100644 --- a/src/util/argParser.ts +++ b/src/util/argParser.ts @@ -437,6 +437,19 @@ class ArgParser { default: "redis://localhost:6379/0", }, + redisDedupeUrl: { + describe: + "If set, url for remote redis server to store the dedupe index. Otherwise, the crawl state redis instance is used", + type: "string", + }, + + minPageDedupeDepth: { + describe: + "If set >= 0, minimum depth at which duplicate pages can be skipped. -1 means never skip duplicate pages", + type: "number", + default: -1, + }, + saveState: { describe: "If the crawl state should be serialized to the crawls/ directory. 
Defaults to 'partial', only saved when crawl is interrupted", diff --git a/src/util/constants.ts b/src/util/constants.ts index 15b00bd7..24ac8e36 100644 --- a/src/util/constants.ts +++ b/src/util/constants.ts @@ -22,6 +22,9 @@ export const DETECT_SITEMAP = ""; export const EXTRACT_TEXT_TYPES = ["to-pages", "to-warc", "final-to-warc"]; +export const DUPE_ALL_HASH_KEY = "alldupes"; +export const DUPE_ALL_CRAWLS = "allcrawls"; + export enum BxFunctionBindings { BehaviorLogFunc = "__bx_log", AddLinkFunc = "__bx_addLink", diff --git a/src/util/recorder.ts b/src/util/recorder.ts index 96f95106..370b8674 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -15,6 +15,7 @@ import { removeRangeAsQuery, rewriteDASH, rewriteHLS, + tsToDate, } from "@webrecorder/wabac"; import { WARCRecord, multiValueHeader } from "warcio"; @@ -27,6 +28,7 @@ import { getProxyDispatcher } from "./proxy.js"; import { ScopedSeed } from "./seeds.js"; import EventEmitter from "events"; import { DEFAULT_MAX_RETRIES } from "./constants.js"; +import { createHash } from "crypto"; const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000; const MAX_TEXT_REWRITE_SIZE = 25_000_000; @@ -140,6 +142,8 @@ export class Recorder extends EventEmitter { pageid!: string; pageSeed?: ScopedSeed; + pageSeedDepth = 0; + minPageDedupeDepth = -1; frameIdToExecId: Map<string, number> | null; @@ -161,6 +165,8 @@ export class Recorder extends EventEmitter { this.shouldSaveStorage = !!crawler.params.saveStorage; + this.minPageDedupeDepth = crawler.params.minPageDedupeDepth; + this.writer = writer; this.fetcherQ = new PQueue({ concurrency: 1 }); @@ -817,6 +823,27 @@ export class Recorder extends EventEmitter { const rewritten = await this.rewriteResponse(reqresp, mimeType); + if ( + url === this.pageUrl && + reqresp.payload && + this.minPageDedupeDepth >= 0 && + this.pageSeedDepth >= this.minPageDedupeDepth + ) { + const hash = + "sha256:" + createHash("sha256").update(reqresp.payload).digest("hex"); + const res = await this.crawlState.getHashDupe(hash); + if (res) { + const { index, crawlId } = res; + const errorReason = "BlockedByResponse"; + await cdp.send("Fetch.failRequest", { + requestId, + errorReason, + }); + await this.crawlState.addDupeCrawlRef(crawlId, index); + return true; + } + } + + // not rewritten, and not streaming, return false to continue if (!rewritten && !streamingConsume) { if (!reqresp.payload) { @@ -1466,11 +1493,9 @@ export class Recorder extends EventEmitter { const { url } = reqresp; const { logDetails } = this; try { - let readSize = await serializer.digestRecord(); - if (serializer.httpHeadersBuff) { - readSize -= serializer.httpHeadersBuff.length; - } - reqresp.readSize = readSize; + reqresp.readSize = await serializer.digestRecord({ + returnPayloadOnlySize: true, + }); // set truncated field and recompute header buff if (reqresp.truncated) { logger.warn( @@ -1562,14 +1587,18 @@ export class Recorder extends EventEmitter { !isRedirectStatus(status) && !(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, status)) ) { - logNetwork("Skipping dupe", { url, status, ...this.logDetails }); + logNetwork("Skipping exact URL dupe in this crawl", { + url, + status, + ...this.logDetails, + }); return false; } - const responseRecord = createResponse(reqresp, pageid, iter); + let responseRecord = createResponse(reqresp, pageid, iter); + const requestRecord = createRequest(reqresp, responseRecord, pageid); - const serializer = new WARCSerializer(responseRecord, { + let serializer = new WARCSerializer(responseRecord, { gzip, maxMemSize: 
MAX_BROWSER_DEFAULT_FETCH_SIZE, }); @@ -1580,6 +1609,7 @@ ) { serializer.externalBuffer?.purge(); await this.crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url, status); + await this.crawlState.removeDupe(WRITE_DUPE_KEY, url, status); return false; } @@ -1606,7 +1636,44 @@ } } } else { - await serializer.digestRecord(); + reqresp.readSize = await serializer.digestRecord({ + returnPayloadOnlySize: true, + }); + } + + const hash = responseRecord.warcPayloadDigest || ""; + + // if (!(await this.crawlState.addIfNoDupe(WRITE_DUPE_KEY, url, hash))) { + // serializer.externalBuffer?.purge(); + // return false; + // } + + const date = responseRecord.warcDate || ""; + + const isEmpty = reqresp.readSize === 0; + + let isDupe = false; + + if (!isEmpty && url) { + const res = await this.crawlState.getHashDupe(hash); + + if (res) { + const { origUrl, origDate, crawlId, index } = res; + const date = tsToDate(origDate).toISOString(); + // always write revisit here + // duplicate URLs in same crawl filtered out separately + serializer.externalBuffer?.purge(); + ({ responseRecord, serializer } = await createRevisitForResponse( + responseRecord, + serializer, + origUrl, + date, + )); + await this.crawlState.addDupeCrawlRef(crawlId, index); + isDupe = true; + } else { + // no dupe, continue + } } let modified = false; @@ -1637,6 +1704,10 @@ this.addPageRecord(reqresp); + if (!isEmpty && !isDupe) { + await this.crawlState.addHashDupe(hash, url, date); + } + return true; } } @@ -1974,14 +2045,14 @@ function createResponse( warcHeaders["WARC-Resource-Type"] = reqresp.resourceType; } - if (!contentIter) { - contentIter = [reqresp.payload] as Iterable<Uint8Array>; - } - if (Object.keys(reqresp.extraOpts).length) { warcHeaders["WARC-JSON-Metadata"] = JSON.stringify(reqresp.extraOpts); } + if (!contentIter) { + contentIter = [reqresp.payload] as Iterable<Uint8Array>; + } + return WARCRecord.create( { url, @@ -1996,6 +2067,55 @@ ); } +// ================================================================= +const REVISIT_COPY_HEADERS = [ + "WARC-Page-ID", + "WARC-Protocol", + "WARC-Resource-Type", + "WARC-JSON-Metadata", +]; + +// ================================================================= +// revisit +async function createRevisitForResponse( + responseRecord: WARCRecord, + serializer: WARCSerializer, + refersToUrl: string, + refersToDate: string, +) { + const payloadDigestForRevisit = responseRecord.warcPayloadDigest || ""; + + const warcHeaders: Record<string, string> = {}; + + const origWarcHeaders = responseRecord.warcHeaders.headers; + + for (const header of REVISIT_COPY_HEADERS) { + if (origWarcHeaders.has(header)) { + warcHeaders[header] = origWarcHeaders.get(header)!; + } + } + + const revisitRecord = WARCRecord.create({ + url: responseRecord.warcTargetURI!, + date: responseRecord.warcDate!, + warcVersion: "WARC/1.1", + type: "revisit", + warcHeaders, + refersToUrl, + refersToDate, + }); + revisitRecord.httpHeaders = responseRecord.httpHeaders; + + serializer = new WARCSerializer(revisitRecord, { + gzip: true, + maxMemSize: MAX_BROWSER_DEFAULT_FETCH_SIZE, + }); + + await serializer.digestRecord({ payloadDigestForRevisit }); + + return { serializer, responseRecord: revisitRecord }; +} + // ================================================================= // request function createRequest( diff --git a/src/util/redis.ts b/src/util/redis.ts index 56b3bb27..325ce9ed 100644 --- 
a/src/util/redis.ts +++ b/src/util/redis.ts @@ -1,5 +1,6 @@ import { Redis } from "ioredis"; import { logger } from "./logger.js"; +import { sleep } from "./timing.js"; const error = console.error; @@ -34,6 +35,19 @@ export async function initRedis(url: string) { return redis; } +export async function initRedisWaitForSuccess(redisUrl: string, retrySecs = 1) { + while (true) { + try { + return await initRedis(redisUrl); + break; + } catch (e) { + //logger.fatal("Unable to connect to state store Redis: " + redisUrl); + logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state"); + await sleep(retrySecs); + } + } +} + export function setExitOnRedisError() { exitOnError = true; } diff --git a/src/util/state.ts b/src/util/state.ts index 9309116a..e3454195 100644 --- a/src/util/state.ts +++ b/src/util/state.ts @@ -3,10 +3,16 @@ import { v4 as uuidv4 } from "uuid"; import { logger } from "./logger.js"; -import { MAX_DEPTH, DEFAULT_MAX_RETRIES } from "./constants.js"; +import { + MAX_DEPTH, + DEFAULT_MAX_RETRIES, + DUPE_ALL_HASH_KEY, + DUPE_ALL_CRAWLS, +} from "./constants.js"; import { ScopedSeed } from "./seeds.js"; import { Frame } from "puppeteer-core"; import { interpolateFilename } from "./storage.js"; +import { WACZ } from "./wacz.js"; // ============================================================================ export enum LoadState { @@ -26,11 +32,11 @@ export enum QueueState { // ============================================================================ // treat 0 or 206 as 200 for purposes of dedup -function normalizeDedupStatus(status: number): number { +export function normalizeDedupeStatus(status: number): string { if (status === 0 || status === 206) { - return 200; + return "200"; } - return status; + return status + ""; } // ============================================================================ @@ -185,12 +191,245 @@ export type SaveState = { }; // ============================================================================ -export class RedisCrawlState { +export type DedupeEntry = { + origDate: string; + origUrl: string; + index: string; + crawlId: string; +}; + +// ============================================================================ +export type DedupeSourceEntry = { + filename: string; + size?: number; + hash?: string; +}; + +// ============================================================================ +export class RedisDedupeIndex { + dedupeRedis: Redis; + crawlId: string; + dedupeKeyIndex = -1; + dedupeCurrFilename = ""; + + sourceDone = "src:d"; + sourceQ = "src:q"; + pendingQ = "pending:q"; + sourceP = "src:p"; + pendingPrefix = "pending:q:"; + + constructor(dedupeRedis: Redis, crawlId: string) { + this.dedupeRedis = dedupeRedis; + this.crawlId = crawlId; + } + + // DEDUPE SOURCE + + async addSourceForDedupe(filename: string) { + //const count = await this.dedupeRedis.incr(`c:${key}:count`) - 1; + const count = + (await this.dedupeRedis.rpush( + `c:${this.crawlId}:wacz`, + JSON.stringify({ filename }), + )) - 1; + this.dedupeCurrFilename = filename; + this.dedupeKeyIndex = count; + } + + async updateDedupeSource(wacz: WACZ) { + if (this.dedupeKeyIndex < 0) { + return; + } + + const value: DedupeSourceEntry = { + filename: wacz.getLocalFilename() || this.dedupeCurrFilename, + hash: wacz.getHash(), + size: wacz.getSize(), + }; + + await this.dedupeRedis.lset( + `c:${this.crawlId}:wacz`, + this.dedupeKeyIndex, + JSON.stringify(value), + ); + + await this.commitDedupeDone(); + } + + // COMMIT DEDUPE TO SHARED INDEX + + async commitDedupeDone() { + for await 
(const hashes of this.dedupeRedis.hscanStream( + `h:${this.crawlId}`, + )) { + let isValue = false; + for (const hash of hashes) { + if (!isValue) { + await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, this.crawlId); + } + isValue = !isValue; + } + } + + // add to crawls list + await this.dedupeRedis.sadd(DUPE_ALL_CRAWLS, this.crawlId); + } + + // GET OR ADD INDIVIDUAL HASHES + + async getHashDupe( + hash: string, + key = DUPE_ALL_HASH_KEY, + //url: string, + ): Promise<DedupeEntry | null> { + hash = hash.split(":").at(-1)!; + + // first, check the shared key + let crawlId = await this.dedupeRedis.hget(key, hash); + if (!crawlId) { + // otherwise, try current crawl + crawlId = this.crawlId; + } + const value = await this.dedupeRedis.hget(`h:${crawlId}`, hash); + if (!value) { + return null; + } + const val = value.split(" "); + return { origUrl: val[2], origDate: val[1], index: val[0], crawlId }; + } + + async addHashDupe( + hash: string, + url: string, + date: string, + crawlId?: string, + commit = false, + ) { + date = date.replace(/[^\d]/g, ""); + hash = hash.split(":").at(-1)!; + const val = `${this.dedupeKeyIndex} ${date} ${url}`; + crawlId = crawlId || this.crawlId; + if ((await this.dedupeRedis.hsetnx(`h:${crawlId}`, hash, val)) && commit) { + await this.dedupeRedis.hsetnx(DUPE_ALL_HASH_KEY, hash, crawlId); + } + } + + // IMPORT + + async queueImportSource(id: string, data: string) { + // already handled this source + if (await this.dedupeRedis.sismember(this.sourceDone, id)) { + return; + } + await this.dedupeRedis.lpush(this.sourceQ, data); + } + + async addImportedSourceForDedupe(key: string, entry: DedupeSourceEntry) { + return ( + (await this.dedupeRedis.rpush(`c:${key}:wacz`, JSON.stringify(entry))) - 1 + ); + } + + async markImportSourceDone(id: string, crawlId: string) { + await this.dedupeRedis.sadd(this.sourceDone, id); + await this.dedupeRedis.sadd(DUPE_ALL_CRAWLS, crawlId); + } + + async nextQueuedImportSource() { + let res: string | null = await this.dedupeRedis.lmove( + this.sourceQ, + this.pendingQ, + "RIGHT", + "LEFT", + ); + // use circular pending Q to support retries + if (!res) { + const len = await this.dedupeRedis.llen(this.pendingQ); + for (let i = 0; i < len; i++) { + res = await this.dedupeRedis.lmove( + this.pendingQ, + this.pendingQ, + "RIGHT", + "LEFT", + ); + if (res) { + const { id } = JSON.parse(res); + if (await this.dedupeRedis.get(this.pendingPrefix + id)) { + res = null; + continue; + } else { + break; + } + } + } + } + + if (!res) { + return null; + } + + await this.dedupeRedis.lrem(this.pendingQ, 1, res); + const { name } = JSON.parse(res); + const remaining = (await this.dedupeRedis.llen(this.sourceQ)) + 1; + await this.dedupeRedis.setex(this.pendingPrefix + name, "1", 300); + return { name, entry: res, remaining }; + } + + async markImportFinishedTS() { + await this.dedupeRedis.set("last_update_ts", new Date().toISOString()); + } + + // REMOVE ON IMPORT + + async markNotRemoved(crawlId: string) { + await this.dedupeRedis.sadd("noremove", crawlId); + } + + async getRemoveSet() { + const removeSet = await this.dedupeRedis.sdiff(DUPE_ALL_CRAWLS, "noremove"); + await this.dedupeRedis.del("noremove"); + return new Set(removeSet); + } + + async removeCrawlIds(toRemove: Set<string>) { + for await (const hashes of this.dedupeRedis.hscanStream( + DUPE_ALL_HASH_KEY, + )) { + let isValue = false; + let key = ""; + for (const hash of hashes) { + if (!isValue) { + key = hash; + } + if (key && isValue && toRemove.has(hash)) { + await this.dedupeRedis.hdel(DUPE_ALL_HASH_KEY, key); + 
} + isValue = !isValue; + } + } + + for (const crawlId of toRemove) { + const allWACZ = await this.dedupeRedis.lrange(`c:${crawlId}:wacz`, 0, -1); + for (const waczdata of allWACZ) { + try { + const { filename } = JSON.parse(waczdata); + await this.dedupeRedis.srem(this.sourceDone, filename); + } catch (e) { + // ignore + } + } + await this.dedupeRedis.del(`h:${crawlId}`, `c:${crawlId}:wacz`); + await this.dedupeRedis.srem(DUPE_ALL_CRAWLS, crawlId); + } + } +} + +// ============================================================================ +export class RedisCrawlState extends RedisDedupeIndex { redis: Redis; maxRetries: number; uid: string; - key: string; maxPageTime: number; qkey: string; @@ -214,32 +453,33 @@ export class RedisCrawlState { maxPageTime: number, uid: string, maxRetries?: number, + dedupeRedis?: Redis, ) { + super(dedupeRedis || redis, key); this.redis = redis; this.uid = uid; - this.key = key; this.maxPageTime = maxPageTime; this.maxRetries = maxRetries ?? DEFAULT_MAX_RETRIES; - this.qkey = this.key + ":q"; - this.pkey = this.key + ":p"; - this.skey = this.key + ":s"; + this.qkey = this.crawlId + ":q"; + this.pkey = this.crawlId + ":p"; + this.skey = this.crawlId + ":s"; // done (integer) - this.dkey = this.key + ":d"; + this.dkey = this.crawlId + ":d"; // failed final, no more retry - this.fkey = this.key + ":f"; + this.fkey = this.crawlId + ":f"; // crawler errors - this.ekey = this.key + ":e"; + this.ekey = this.crawlId + ":e"; // crawler behavior script messages - this.bkey = this.key + ":b"; + this.bkey = this.crawlId + ":b"; // pages - this.pageskey = this.key + ":pages"; + this.pageskey = this.crawlId + ":pages"; - this.esKey = this.key + ":extraSeeds"; - this.esMap = this.key + ":esMap"; + this.esKey = this.crawlId + ":extraSeeds"; + this.esMap = this.crawlId + ":esMap"; - this.sitemapDoneKey = this.key + ":sitemapDone"; + this.sitemapDoneKey = this.crawlId + ":sitemapDone"; this._initLuaCommands(this.redis); } @@ -481,29 +721,29 @@ return inx; } async setFailReason(reason: string) { - await this.redis.set(`${this.key}:failReason`, reason); + await this.redis.set(`${this.crawlId}:failReason`, reason); } async setStatus(status_: string) { - await this.redis.hset(`${this.key}:status`, this.uid, status_); + await this.redis.hset(`${this.crawlId}:status`, this.uid, status_); } async getStatus(): Promise { - return (await this.redis.hget(`${this.key}:status`, this.uid)) || ""; + return (await this.redis.hget(`${this.crawlId}:status`, this.uid)) || ""; } async setWACZFilename(): Promise { const filename = process.env.STORE_FILENAME || "@ts-@id.wacz"; - this.waczFilename = interpolateFilename(filename, this.key); + this.waczFilename = interpolateFilename(filename, this.crawlId); if ( !(await this.redis.hsetnx( - `${this.key}:nextWacz`, + `${this.crawlId}:nextWacz`, this.uid, this.waczFilename, )) ) { this.waczFilename = await this.redis.hget( - `${this.key}:nextWacz`, + `${this.crawlId}:nextWacz`, this.uid, ); logger.debug( @@ -518,6 +758,7 @@ return inx; "state", ); } + await this.addSourceForDedupe(this.waczFilename!); return this.waczFilename!; } @@ -529,20 +770,20 @@ return inx; } async clearWACZFilename(): Promise { - await this.redis.hdel(`${this.key}:nextWacz`, this.uid); + await this.redis.hdel(`${this.crawlId}:nextWacz`, this.uid); this.waczFilename = null; } async setArchiveSize(size: number) { - return await this.redis.hset(`${this.key}:size`, this.uid, size); + return await this.redis.hset(`${this.crawlId}:size`, this.uid, size); } async isCrawlStopped() { 
- if ((await this.redis.get(`${this.key}:stopping`)) === "1") { + if ((await this.redis.get(`${this.crawlId}:stopping`)) === "1") { return true; } - if ((await this.redis.hget(`${this.key}:stopone`, this.uid)) === "1") { + if ((await this.redis.hget(`${this.crawlId}:stopone`, this.uid)) === "1") { return true; } @@ -550,7 +791,7 @@ return inx; } async isCrawlPaused() { - if ((await this.redis.get(`${this.key}:paused`)) === "1") { + if ((await this.redis.get(`${this.crawlId}:paused`)) === "1") { return true; } @@ -558,13 +799,13 @@ return inx; } async isCrawlCanceled() { - return (await this.redis.get(`${this.key}:canceled`)) === "1"; + return (await this.redis.get(`${this.crawlId}:canceled`)) === "1"; } // note: not currently called in crawler, but could be // crawl may be stopped by setting this elsewhere in shared redis async stopCrawl() { - await this.redis.set(`${this.key}:stopping`, "1"); + await this.redis.set(`${this.crawlId}:stopping`, "1"); } async processMessage(seeds: ScopedSeed[]) { @@ -654,7 +895,7 @@ return inx; } async incFailCount() { - const key = `${this.key}:status:failcount:${this.uid}`; + const key = `${this.crawlId}:status:failcount:${this.uid}`; const res = await this.redis.incr(key); // consider failed if 3 failed retries in 60 secs @@ -1000,21 +1241,26 @@ return inx; async addIfNoDupe(key: string, url: string, status: number) { return ( - (await this.redis.sadd(key, normalizeDedupStatus(status) + "|" + url)) === - 1 + (await this.redis.sadd( + key, + normalizeDedupeStatus(status) + "|" + url, + )) === 1 ); } async removeDupe(key: string, url: string, status: number) { - return await this.redis.srem(key, normalizeDedupStatus(status) + "|" + url); + return await this.redis.srem( + key, + normalizeDedupeStatus(status) + "|" + url, + ); } async isInUserSet(value: string) { - return (await this.redis.sismember(this.key + ":user", value)) === 1; + return (await this.redis.sismember(this.crawlId + ":user", value)) === 1; } async addToUserSet(value: string) { - return (await this.redis.sadd(this.key + ":user", value)) === 1; + return (await this.redis.sadd(this.crawlId + ":user", value)) === 1; } async logError(error: string) { @@ -1101,4 +1347,33 @@ return inx; async markSitemapDone() { await this.redis.set(this.sitemapDoneKey, "1"); } + + // DEPENDENT CRAWLS FOR DEDUPE + + async addDupeCrawlRef(crawlId: string, index: string) { + await this.redis.sadd(`${this.uid}:duperef`, crawlId + " " + index); + await this.redis.sadd(`${this.crawlId}:reqCrawls`, crawlId); + } + + async clearDupeFileRef() { + await this.redis.del(`${this.uid}:duperef`); + } + + async getDupeDependentSources() { + const dependRefs = await this.redis.smembers(`${this.uid}:duperef`); + const crawlIds = []; + for (const value of dependRefs) { + const [crawlId, index] = value.split(" "); + const source = await this.dedupeRedis.lindex( + `c:${crawlId}:wacz`, + Number(index), + ); + if (crawlId && crawlId !== this.crawlId && source) { + const entry = JSON.parse(source); + entry.crawlId = crawlId; + crawlIds.push(entry); + } + } + return crawlIds; + } } diff --git a/src/util/wacz.ts b/src/util/wacz.ts index fcf4eabc..d31e2ae2 100644 --- a/src/util/wacz.ts +++ b/src/util/wacz.ts @@ -1,5 +1,5 @@ import path, { basename } from "node:path"; -import fs from "node:fs"; +import fs, { openAsBlob } from "node:fs"; import fsp from "node:fs/promises"; import { Writable, Readable } from "node:stream"; import { pipeline } from "node:stream/promises"; @@ -16,6 +16,8 @@ import { makeZip, InputWithoutMeta } from 
"client-zip"; import { logger, formatErr } from "./logger.js"; import { streamFinish } from "./warcwriter.js"; import { getDirSize } from "./storage.js"; +import { createLoader, ZipRangeReader } from "@webrecorder/wabac"; +import { AsyncIterReader } from "warcio"; const DATAPACKAGE_JSON = "datapackage.json"; const DATAPACKAGE_DIGEST_JSON = "datapackage-digest.json"; @@ -43,6 +45,7 @@ export type WACZInitOpts = { signingToken?: string; title?: string; description?: string; + requires?: string[]; }; export type WACZResourceEntry = { @@ -59,6 +62,7 @@ export type WACZDataPackage = { software: string; title?: string; description?: string; + relation?: { requires: string[] }; }; type WACZDigest = { @@ -105,6 +109,7 @@ export class WACZ { private size = 0; private hash: string = ""; + private localFilename = ""; constructor(config: WACZInitOpts, collDir: string) { this.warcs = config.input; @@ -129,6 +134,10 @@ export class WACZ { this.datapackage.description = config.description; } + if (config.requires && config.requires.length) { + this.datapackage.relation = { requires: config.requires }; + } + this.signingUrl = config.signingUrl || null; this.signingToken = config.signingToken || null; } @@ -192,7 +201,12 @@ export class WACZ { return this.size; } + getLocalFilename() { + return this.localFilename; + } + async generateToFile(filename: string) { + this.localFilename = path.basename(filename); await pipeline(this.generate(), fs.createWriteStream(filename)); } @@ -427,3 +441,52 @@ export async function mergeCDXJ( await removeIndexFile(INDEX_CDXJ); } } + +// ============================================================================ +export class WACZLoader { + url: string; + zipreader: ZipRangeReader | null; + + constructor(url: string) { + this.url = url; + this.zipreader = null; + } + + async init() { + if (!this.url.startsWith("http://") && !this.url.startsWith("https://")) { + const blob = await openAsBlob(this.url); + this.url = URL.createObjectURL(blob); + } + + const loader = await createLoader({ url: this.url }); + + this.zipreader = new ZipRangeReader(loader); + } + + async loadFile(fileInZip: string) { + const { reader } = await this.zipreader!.loadFile(fileInZip); + + if (!reader) { + return null; + } + + if (!reader.iterLines) { + return new AsyncIterReader(reader); + } + + return reader; + } + + async *iterFiles(prefix: string) { + if (!this.zipreader) { + await this.init(); + } + const entries = await this.zipreader!.load(); + + for (const [key, value] of Object.entries(entries)) { + if (key.startsWith(prefix)) { + yield value; + } + } + } +} diff --git a/tests/dedupe-basic.test.js b/tests/dedupe-basic.test.js new file mode 100644 index 00000000..759b1c89 --- /dev/null +++ b/tests/dedupe-basic.test.js @@ -0,0 +1,169 @@ +import {exec, execSync} from "child_process"; +import fs from "fs"; +import path from "path"; +import Redis from "ioredis"; +import { WARCParser } from "warcio"; + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + + +let redisId; +let numResponses = 0; + +beforeAll(() => { + execSync("docker network create dedupe"); + + redisId = execSync("docker run --rm --network=dedupe -p 37379:6379 --name dedupe-redis -d redis"); +}); + +afterAll(async () => { + execSync(`docker kill ${redisId}`); + + await sleep(3000); + + //await Promise.allSettled([crawler1, crawler2]); + + execSync("docker network rm dedupe"); +}); + +function runCrawl(name, db="0") { + fs.rmSync(`./test-crawls/collections/${name}`, { recursive: true, force: true }); + 
+ const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe -e CRAWL_ID=${name} webrecorder/browsertrix-crawler crawl --url https://old.webrecorder.net/ --limit 4 --exclude community --collection ${name} --redisDedupeUrl redis://dedupe-redis:6379/${db} --generateWACZ`); + + return new Promise((resolve) => { + crawler.on("exit", (code) => { + resolve(code); + }); + }); +} + +function loadFirstWARC(name) { + const archiveWarcLists = fs.readdirSync( + `test-crawls/collections/${name}/archive`, + ); + + const warcName = path.join(`test-crawls/collections/${name}/archive`, archiveWarcLists[0]); + + const nodeStream = fs.createReadStream(warcName); + + const parser = new WARCParser(nodeStream); + + return parser; +} + +function loadDataPackageRelated(name) { + execSync( + `unzip test-crawls/collections/${name}/${name}.wacz -d test-crawls/collections/${name}/wacz`, + ); + + const data = fs.readFileSync( + `test-crawls/collections/${name}/wacz/datapackage.json`, + "utf8", + ); + const dataPackageJSON = JSON.parse(data); + return dataPackageJSON.relation; +} + + +test("check revisit records written on duplicate crawl", async () => { + + expect(await runCrawl("dedupe-test-orig")).toBe(0); + expect(await runCrawl("dedupe-test-dupe")).toBe(0); + + let statusCode = -1; + + let response = 0; + let revisit = 0; + + const parserOrig = loadFirstWARC("dedupe-test-orig"); + + for await (const record of parserOrig) { + if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) { + continue; + } + + if (record.warcType === "response") { + response++; + } + } + + const dupeOrig = loadFirstWARC("dedupe-test-dupe"); + + for await (const record of dupeOrig) { + if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) { + continue; + } + + if (record.warcType === "revisit") { + revisit++; + } + } + + expect(response).toBeGreaterThan(0); + + // revisits should match number of responses for non urn: + expect(response).toBe(revisit); + + numResponses = response; +}); + + +test("import index and crawl dupe", async () => { + + execSync(`docker run --rm -v $PWD/test-crawls:/crawls --network=dedupe webrecorder/browsertrix-crawler indexer --sourceUrl /crawls/collections/dedupe-test-orig/dedupe-test-orig.wacz --sourceCrawlId dedupe-test-orig --redisDedupeUrl redis://dedupe-redis:6379/1`); + + const redis = new Redis("redis://127.0.0.1:37379/1", { lazyConnect: true, retryStrategy: () => null }); + + await redis.connect({maxRetriesPerRequest: 50}); + + expect(await redis.hlen("alldupes")).toBe(numResponses); +}); + + +test("imported crawl dupe matches previous dupe count", async () => { + expect(await runCrawl("dedupe-test-dupe-2", 1)).toBe(0); + + const dupeOrig = loadFirstWARC("dedupe-test-dupe-2"); + + let revisit = 0; + + for await (const record of dupeOrig) { + if (record.warcTargetURI && record.warcTargetURI.startsWith("urn:")) { + continue; + } + + if (record.warcType === "revisit") { + revisit++; + } + } + + // matches same number of revisits as original + expect(revisit).toBe(numResponses); +}); + +test("test requires in datapackage.json of wacz deduped against previous crawl", () => { + const res1 = loadDataPackageRelated("dedupe-test-dupe"); + + expect(res1.requires.length).toBe(1); + const entry = res1.requires[0]; + expect(entry.crawlId).toBe("dedupe-test-orig"); + expect(entry.filename).toBe("dedupe-test-orig.wacz"); + expect(entry.size).toBeDefined(); + expect(entry.hash).toBeDefined(); +}); + +test("test requires in datapackage.json of wacz deduped against import 
from wacz", () => { + const res2 = loadDataPackageRelated("dedupe-test-dupe-2"); + expect(res2.requires.length).toBe(1); + const entry2 = res2.requires[0]; + expect(entry2.crawlId).toBe("dedupe-test-orig"); + expect(entry2.filename).toBe("dedupe-test-orig.wacz"); + // undefined as importing from single WACZ and not computing + expect(entry2.size).toBeUndefined(); + expect(entry2.hash).toBeUndefined(); +}); + +