
Commit c542c8f (parent 94f2074)

feat: add corpus normalization feature

File tree: 9 files changed, +1082 −43 lines


.gitignore

Lines changed: 3 additions & 1 deletion

@@ -1 +1,3 @@
-target
+target/
+personal_docs/
+data/

Cargo.lock

Lines changed: 3 additions & 0 deletions

Generated file; diff not rendered.

Cargo.toml

Lines changed: 6 additions & 3 deletions

@@ -5,7 +5,7 @@ edition = "2021"

 [features]
 default = []
-multi_thread = ["scraper"]
+multi_thread = []

 [dependencies]
 heapless = { version = "0.9.1", default-features = false }
@@ -25,7 +25,10 @@ reqwest = { version = "0.12", default-features = false, features = [
     "stream",
 ] }
 lol_html = "2.7.0"
-scraper = { version = "0.24.0", optional = true }
+scraper = { version = "0.24.0" }
+crc32fast = "1.4"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
 clap = { version = "4.5", features = ["derive", "env"] }
-url = "2.5"
+url = { version = "2.5", features = ["serde"] }
 clap_builder = "4.5.51"
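The new `crc32fast`, `serde`, and `serde_json` dependencies line up with the manifest checksums and JSONL output introduced in the README below. Presumably the checksums come from crc32fast's one-shot hasher; a minimal sketch of that API (whether the normalizer computes its `checksum` field exactly this way is an assumption):

```rust
/// CRC32 of a page body via crc32fast. That this matches how the
/// normalizer derives its manifest `checksum` is an assumption.
fn body_checksum(bytes: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(bytes);
    hasher.finalize()
}
```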

README.md

Lines changed: 23 additions & 1 deletion

@@ -101,12 +101,34 @@ cargo run --example wiki --features multi_thread -- --duration-secs 60
 - Metrics live in `src/runtime.rs` and can be extended if you need additional counters or telemetry sinks. Multi-thread
   runs also report `local shard enqueues` vs `remote shard links (batches)` so you can gauge partition efficiency.

+## Corpus Normalization
+
+Pass `--normalize` to stream every fetched page through the new `Normalizer` service. The pipeline writes newline-
+delimited JSON (metadata + cleaned text blocks + embedding-ready chunks) to `--normalize-jsonl` (default:
+`normalized_pages.jsonl`) and respects additional knobs:
+
+```
+cargo run --example wiki -- \
+  --normalize \
+  --normalize-jsonl data/wiki.jsonl \
+  --normalize-manifest-jsonl data/wiki_manifest.jsonl \
+  --normalize-chunk-tokens 384 \
+  --normalize-overlap-tokens 64
+```
+
+Chunk and block bounds can be tuned via `--normalize-chunk-tokens`, `--normalize-overlap-tokens`, and
+`--normalize-max-blocks`. The JSON payload includes per-block heading context, content hashes, token estimates, and
+metadata such as HTTP status, language hints, and shard ownership so downstream embedding/indexing jobs can ingest it
+directly. When `--normalize-manifest-jsonl` is set, the runtime also appends digest records (`url`, `checksum`,
+`last_seen_epoch_ms`, `changed`) so incremental pipelines can diff and skip re-embedding unchanged pages.
+
 ## LLM-Oriented Next Steps

 Fastcrawl is already a solid content harvester for downstream ML pipelines. Future work aimed at LLM/RAG workflows
 includes:

-1. **Corpus normalization** – strip boilerplate, capture metadata, and chunk pages into consistent token windows.
+- [x] **Corpus normalization** – strip boilerplate, capture metadata, and chunk pages into consistent token windows.
+
 2. **Embedding pipeline** – push cleaned chunks through an embedding model and store vectors (pgvector/Qdrant/Milvus)
    with provenance.
 3. **Incremental refresh** – schedule revisits, diff pages, and update embeddings so the knowledge base stays current.
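The manifest fields named in the README are enough to sketch an incremental consumer. The record struct below is an assumption (the authoritative shape lives in `src/normalizer.rs`, which this excerpt doesn't show, and the checksum could just as well serialize as a hex string); it only demonstrates diffing on the `changed` flag:

```rust
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Manifest digest record, reconstructed from the field names in the
/// README. Field types are assumptions of this sketch.
#[derive(Debug, Deserialize)]
struct ManifestRecord {
    url: String,
    checksum: u32,
    last_seen_epoch_ms: u64,
    changed: bool,
}

/// Collect URLs whose content changed since the last crawl, so an
/// embedding job can skip everything else.
fn urls_to_reembed(path: &str) -> std::io::Result<Vec<String>> {
    let reader = BufReader::new(File::open(path)?);
    let mut urls = Vec::new();
    for line in reader.lines() {
        // Each manifest line is one standalone JSON object (JSONL).
        if let Ok(record) = serde_json::from_str::<ManifestRecord>(&line?) {
            if record.changed {
                urls.push(record.url);
            }
        }
    }
    Ok(urls)
}
```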

src/controls.rs

Lines changed: 64 additions & 0 deletions

@@ -1,8 +1,10 @@
 //! Crawl throttle and filtering controls shared across executors.

+use crate::normalizer::NormalizationConfig;
 use clap::Parser;
 #[cfg(feature = "multi_thread")]
 use clap::ValueEnum;
+use std::path::PathBuf;
 use std::time::Duration;

 /// Tunable knobs that bound crawl behavior.
@@ -91,6 +93,34 @@ pub struct Cli {
     #[arg(long, env = "FASTCRAWL_DOMAINS", default_value = "en.wikipedia.org")]
     pub allowed_domains: String,

+    /// Enable corpus normalization pipeline and JSONL output
+    #[arg(long, env = "FASTCRAWL_NORMALIZE", default_value_t = false)]
+    pub normalize: bool,
+
+    /// Output path for normalized JSONL batches (overwrites existing file)
+    #[arg(
+        long,
+        env = "FASTCRAWL_NORMALIZE_JSONL",
+        default_value = "normalized_pages.jsonl"
+    )]
+    pub normalize_jsonl: String,
+
+    /// Optional manifest JSONL capturing per-URL digests (checksum + last seen)
+    #[arg(long, env = "FASTCRAWL_NORMALIZE_MANIFEST")]
+    pub normalize_manifest_jsonl: Option<PathBuf>,
+
+    /// Target tokens per chunk emitted by the normalizer
+    #[arg(long, env = "FASTCRAWL_NORMALIZE_TOKENS", default_value_t = 256)]
+    pub normalize_chunk_tokens: usize,
+
+    /// Token overlap between neighboring chunks
+    #[arg(long, env = "FASTCRAWL_NORMALIZE_OVERLAP", default_value_t = 48)]
+    pub normalize_overlap_tokens: usize,
+
+    /// Maximum text blocks to keep before truncating normalization
+    #[arg(long, env = "FASTCRAWL_NORMALIZE_MAX_BLOCKS", default_value_t = 8192)]
+    pub normalize_max_blocks: usize,
+
     /// Shard partitioning strategy (multi-thread feature only)
     #[cfg(feature = "multi_thread")]
     #[arg(long, env = "FASTCRAWL_PARTITION", default_value = "hash")]
@@ -133,6 +163,29 @@ impl Cli {
         Duration::from_secs(self.duration_secs)
     }

+    /// Returns normalization settings when enabled.
+    pub fn normalization_settings(&self) -> Option<NormalizationSettings> {
+        if !self.normalize {
+            return None;
+        }
+
+        let chunk_target = self.normalize_chunk_tokens.max(1);
+        let chunk_overlap = self
+            .normalize_overlap_tokens
+            .min(chunk_target.saturating_sub(1));
+        let max_blocks = self.normalize_max_blocks.max(1);
+
+        Some(NormalizationSettings {
+            output_path: PathBuf::from(&self.normalize_jsonl),
+            manifest_path: self.normalize_manifest_jsonl.clone(),
+            config: NormalizationConfig {
+                chunk_target_tokens: chunk_target,
+                chunk_overlap_tokens: chunk_overlap,
+                max_blocks,
+            },
+        })
+    }
+
     fn domains_vec(&self) -> Vec<String> {
         self.allowed_domains
             .split(',')
@@ -179,3 +232,14 @@ pub struct PartitionSettings {
     /// Whether to log channel-closed warnings for cross-shard sends.
     pub remote_channel_logs: bool,
 }
+
+/// Settings controlling corpus normalization outputs.
+#[derive(Debug, Clone)]
+pub struct NormalizationSettings {
+    /// Filesystem path that will receive newline-delimited JSON.
+    pub output_path: PathBuf,
+    /// Optional path for digest manifest records.
+    pub manifest_path: Option<PathBuf>,
+    /// Chunking/cleanup configuration applied to each page.
+    pub config: NormalizationConfig,
+}
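A usage sketch for the new accessor (the crate name `fastcrawl` is assumed from the README): `normalization_settings()` returns `None` unless `--normalize` is set, and clamps the knobs so the overlap can never reach the chunk target.

```rust
use clap::Parser;
use fastcrawl::Cli;

fn main() {
    let cli = Cli::parse();

    // `None` unless --normalize was passed; overlap is clamped to
    // chunk_target - 1, and chunk/block bounds are forced to be >= 1.
    if let Some(settings) = cli.normalization_settings() {
        println!(
            "normalizing into {} ({} tokens/chunk, {} overlap, {} max blocks)",
            settings.output_path.display(),
            settings.config.chunk_target_tokens,
            settings.config.chunk_overlap_tokens,
            settings.config.max_blocks,
        );
    }
}
```

For example, `--normalize-chunk-tokens 256 --normalize-overlap-tokens 512` yields an overlap of 255, not 512, so a chunk always advances by at least one token.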

src/html.rs

Lines changed: 25 additions & 5 deletions

@@ -7,21 +7,34 @@ use std::error::Error;
 use std::fmt;
 use std::sync::{Arc, Mutex};

+/// Result of streaming link extraction.
+pub struct LinkHarvest<T> {
+    /// Accepted links transformed by the caller.
+    pub links: Vec<T>,
+    /// Optional raw body bytes captured during streaming.
+    pub body: Option<Vec<u8>>,
+}
+
 /// Streams anchor tags from an HTTP response, transforming matching `href` values with `transform`.
 ///
 /// The `transform` closure runs for every `href`; returning `Some(T)` keeps the value, `None` skips
-/// it. Only accepted entries count against `limit`.
+/// it. Only accepted entries count against `limit`. When `capture_body` is true the full response
+/// body is buffered and returned alongside the discovered links.
 pub async fn stream_links<T, F>(
     response: Response,
     limit: usize,
+    capture_body: bool,
     transform: F,
-) -> Result<Vec<T>, HtmlStreamError>
+) -> Result<LinkHarvest<T>, HtmlStreamError>
 where
     T: Send + 'static,
     F: Fn(&str) -> Option<T> + Send + Sync + 'static,
 {
-    if limit == 0 {
-        return Ok(Vec::new());
+    if limit == 0 && !capture_body {
+        return Ok(LinkHarvest {
+            links: Vec::new(),
+            body: None,
+        });
     }

     let values: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
@@ -54,8 +67,12 @@ where
     );

     let mut stream = response.bytes_stream();
+    let mut body_buf = capture_body.then(Vec::new);
     while let Some(chunk) = stream.next().await {
         let chunk = chunk.map_err(HtmlStreamError::Http)?;
+        if let Some(buf) = body_buf.as_mut() {
+            buf.extend_from_slice(&chunk);
+        }
         rewriter.write(&chunk).map_err(HtmlStreamError::Rewrite)?;
     }
     rewriter.end().map_err(HtmlStreamError::Rewrite)?;
@@ -67,7 +84,10 @@ where
         .into_inner()
         .map_err(|_| HtmlStreamError::CollectorPoisoned)?;

-    Ok(collected)
+    Ok(LinkHarvest {
+        links: collected,
+        body: body_buf,
+    })
 }

 /// Errors surfaced while streaming HTML.
/// Errors surfaced while streaming HTML.

src/lib.rs

Lines changed: 5 additions & 0 deletions

@@ -6,9 +6,14 @@ mod bloom;
 pub mod controls;
 pub mod frontier;
 pub mod html;
+pub mod normalizer;
 pub mod runtime;

 pub use agents::{registry, AgentRegistry, CrawlTask, InlineString};
 pub use controls::{Cli, CrawlControls};
 pub use frontier::{Frontier, FrontierError, DEFAULT_FRONTIER_QUEUE, DEFAULT_FRONTIER_SEEN};
+pub use normalizer::{
+    BlockKind, FetchedPage, NormalizationConfig, NormalizationError, NormalizedChunk,
+    NormalizedPage, Normalizer, PageMetadata, SectionHeading, TextBlock,
+};
 pub use runtime::run as run_crawler;
