Commit 5130bf3

feat: add docker-compose.yml; embed chunks in pgvector store
1 parent fede124 commit 5130bf3

7 files changed (+221 / -148 lines)

README.md

Lines changed: 21 additions & 2 deletions

@@ -175,18 +175,37 @@ false, so re-embedding only happens when the crawler observed fresh content.
 ## pgvector Store
 
 Ship the embeddings into Postgres with the bundled `fastcrawl-pgvector` binary. It ingests the JSONL produced above and
-upserts into a `vector` table (creating the `vector` extension/table automatically unless disabled):
+upserts into a `vector` table (creating the `vector` extension/table automatically unless disabled). The repo now ships
+a `docker-compose.yml` that launches a local Postgres instance with the `pgvector` extension preinstalled:
+
+```sh
+docker compose up -d pgvector
+```
+
+Once the container is healthy, point `DATABASE_URL` at it and run the loader:
+
+```fish
+set -gx DATABASE_URL postgres://postgres:postgres@localhost:5432/fastcrawl
+```
 
 ```sh
 export DATABASE_URL=postgres://postgres:postgres@localhost:5432/fastcrawl
+```
+
+```sh
+docker compose up -d
+
 cargo run --bin pgvector_store -- \
 --input data/wiki_embeddings.jsonl \
 --schema public \
 --table wiki_chunks \
 --batch-size 256 \
---upsert
+--upsert \
+--database-url=postgresql://postgres:postgres@localhost:5432
 ```
 
+Stop the container with `docker compose down` (pass `-v` to remove the persisted volume if you want a clean slate).
+
 Columns created by default:
 
 - `url TEXT`, `chunk_id BIGINT` primary key for provenance.
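
As a quick sanity check once the loader finishes, a minimal sketch along these lines (not part of this commit) can confirm rows actually landed. It assumes `tokio-postgres` as the client crate, the default `public.wiki_chunks` target from the README, and the `DATABASE_URL` shown above; the `embedding` column name mentioned in the comment is an assumption, since the column list is truncated in this hunk.

```rust
// Sanity-check sketch (not part of this commit): counts rows written by the
// pgvector loader and assumes tokio-postgres as the client crate plus the
// README defaults (schema `public`, table `wiki_chunks`).
use anyhow::Result;
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<()> {
    let url = std::env::var("DATABASE_URL")?;
    let (client, connection) = tokio_postgres::connect(&url, NoTls).await?;
    // The connection object drives the socket; run it in the background.
    tokio::spawn(async move {
        if let Err(err) = connection.await {
            eprintln!("connection error: {err}");
        }
    });

    let count: i64 = client
        .query_one("SELECT COUNT(*) FROM public.wiki_chunks", &[])
        .await?
        .get(0);
    println!("public.wiki_chunks holds {count} rows");

    // An actual similarity search would order by a pgvector distance operator,
    // e.g. `ORDER BY embedding <-> $1 LIMIT 5`; the column name `embedding` is
    // an assumption, not shown in this hunk.
    Ok(())
}
```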

docker-compose.yml

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+version: "3.9"
+
+services:
+  pgvector:
+    image: pgvector/pgvector:0.8.1-pg18-trixie
+    container_name: fastcrawl-pgvector
+    restart: unless-stopped
+    environment:
+      POSTGRES_DB: fastcrawl
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+    ports:
+      - "5432:5432"
+    volumes:
+      - pgvector-data:/var/lib/postgresql
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "postgres"]
+      interval: 5s
+      timeout: 3s
+      retries: 5
+
+volumes:
+  pgvector-data:
+    driver: local
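
Compose marks the service healthy once `pg_isready` succeeds, but the loader itself simply tries to connect when invoked. A small readiness probe along these lines (not part of this commit; it assumes `tokio-postgres` as the client crate and reuses the credentials from the compose file above) can verify both connectivity and that the image really ships the `vector` extension before running the loader.

```rust
// Readiness-probe sketch (not part of this commit): retries until the
// compose-managed Postgres accepts connections, then checks that the
// `vector` extension is available. Credentials mirror docker-compose.yml.
use std::time::Duration;

use anyhow::{bail, Result};
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<()> {
    let url = "postgres://postgres:postgres@localhost:5432/fastcrawl";
    for attempt in 1..=10 {
        match tokio_postgres::connect(url, NoTls).await {
            Ok((client, connection)) => {
                // Drive the connection in the background.
                tokio::spawn(connection);
                let row = client
                    .query_opt(
                        "SELECT default_version FROM pg_available_extensions WHERE name = 'vector'",
                        &[],
                    )
                    .await?;
                match row {
                    Some(row) => {
                        let version: String = row.get(0);
                        println!("pgvector {version} is available; safe to run the loader");
                        return Ok(());
                    }
                    None => bail!("Postgres is up but the vector extension is missing"),
                }
            }
            Err(err) => {
                eprintln!("attempt {attempt}: not ready yet ({err})");
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    }
    bail!("Postgres never became ready");
}
```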

src/bin/embedder.rs

Lines changed: 1 addition & 146 deletions

@@ -8,12 +8,8 @@ use std::time::Duration;
 use anyhow::{anyhow, Context, Result};
 use clap::Parser;
 use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender, TryRecvError};
+use fastcrawl::embedder::openai::OpenAiEmbedder;
 use fastcrawl::{EmbeddedChunkRecord, ManifestRecord, NormalizedPage};
-use reqwest::blocking::Client;
-use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
-use reqwest::StatusCode;
-use serde::Deserialize;
-use serde::Serialize;
 
 #[derive(Parser, Debug)]
 #[command(
@@ -456,144 +452,3 @@ struct EmbeddingBatchResult {
 }
 
 type EmbeddingResult = Result<EmbeddingBatchResult>;
-
-#[derive(Clone)]
-struct OpenAiEmbedder {
-    client: Client,
-    endpoint: String,
-    model: String,
-    dimensions: Option<usize>,
-    max_retries: usize,
-    batch_size: usize,
-}
-
-impl OpenAiEmbedder {
-    fn new(
-        api_key: String,
-        base_url: String,
-        model: String,
-        dimensions: Option<usize>,
-        timeout: Duration,
-        max_retries: usize,
-        batch_size: usize,
-    ) -> Result<Self> {
-        anyhow::ensure!(!api_key.trim().is_empty(), "missing OpenAI API key");
-        anyhow::ensure!(!model.trim().is_empty(), "missing OpenAI model name");
-        let mut headers = HeaderMap::new();
-        let auth = format!("Bearer {}", api_key.trim());
-        headers.insert(
-            AUTHORIZATION,
-            HeaderValue::from_str(&auth).context("invalid OpenAI API key")?,
-        );
-        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
-        let client = Client::builder()
-            .timeout(timeout)
-            .default_headers(headers)
-            .build()
-            .context("failed to build OpenAI HTTP client")?;
-        let endpoint = format!("{}/embeddings", base_url.trim_end_matches('/'));
-        Ok(Self {
-            client,
-            endpoint,
-            model,
-            dimensions,
-            max_retries,
-            batch_size,
-        })
-    }
-
-    fn embed_batch(&self, inputs: &[&str]) -> Result<Vec<Vec<f32>>> {
-        if inputs.is_empty() {
-            return Ok(Vec::new());
-        }
-        anyhow::ensure!(
-            inputs.len() <= self.batch_size,
-            "batch of {} exceeds configured max {}",
-            inputs.len(),
-            self.batch_size
-        );
-
-        let mut attempt = 0usize;
-        loop {
-            let request = EmbeddingRequest {
-                model: &self.model,
-                input: inputs,
-                dimensions: self.dimensions,
-            };
-            let response = self.client.post(&self.endpoint).json(&request).send();
-            match response {
-                Ok(resp) => {
-                    let status = resp.status();
-                    if status.is_success() {
-                        let mut parsed: EmbeddingResponse = resp
-                            .json()
-                            .context("failed to parse OpenAI embedding response")?;
-                        parsed.data.sort_by_key(|entry| entry.index);
-                        anyhow::ensure!(
-                            parsed.data.len() == inputs.len(),
-                            "OpenAI returned {} embeddings for {} inputs",
-                            parsed.data.len(),
-                            inputs.len()
-                        );
-                        return Ok(parsed
-                            .data
-                            .into_iter()
-                            .map(|entry| entry.embedding)
-                            .collect());
-                    }
-
-                    let body = resp
-                        .text()
-                        .unwrap_or_else(|_| "<body unavailable>".to_string());
-                    if self.should_retry(status) && attempt + 1 < self.max_retries {
-                        attempt += 1;
-                        thread::sleep(self.retry_backoff(attempt));
-                        continue;
-                    }
-                    anyhow::bail!("OpenAI embeddings request failed ({}): {}", status, body);
-                }
-                Err(err) => {
-                    if self.is_retryable_error(&err) && attempt + 1 < self.max_retries {
-                        attempt += 1;
-                        thread::sleep(self.retry_backoff(attempt));
-                        continue;
-                    }
-                    return Err(err.into());
-                }
-            }
-        }
-    }
-
-    fn should_retry(&self, status: StatusCode) -> bool {
-        status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()
-    }
-
-    fn is_retryable_error(&self, err: &reqwest::Error) -> bool {
-        err.is_timeout() || err.is_connect() || err.is_body() || err.is_request() || err.is_decode()
-    }
-
-    fn retry_backoff(&self, attempt: usize) -> Duration {
-        let capped = attempt.min(5) as u32;
-        Duration::from_millis(500 * (1 << capped))
-    }
-}
-
-#[derive(Serialize)]
-struct EmbeddingRequest<'a> {
-    model: &'a str,
-    #[serde(borrow)]
-    input: &'a [&'a str],
-    #[serde(skip_serializing_if = "Option::is_none")]
-    dimensions: Option<usize>,
-}
-
-#[derive(Debug, Deserialize)]
-struct EmbeddingResponse {
-    data: Vec<EmbeddingData>,
-}
-
-#[derive(Debug, Deserialize)]
-struct EmbeddingData {
-    embedding: Vec<f32>,
-    index: usize,
-}
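
The removed block presumably moved into `src/embedder/openai.rs` (that file is not shown in this excerpt; `src/embedder/mod.rs` below only declares the module). Assuming the relocated type keeps the constructor and `embed_batch` signatures deleted here and exposes them as `pub`, driving it from another pipeline would look roughly like the sketch below; the model name and tuning values are placeholders.

```rust
// Usage sketch (not part of this commit): drives the relocated embedder from
// another binary. Assumes `OpenAiEmbedder::new` and `embed_batch` keep the
// signatures removed from embedder.rs above and are now public.
use std::time::Duration;

use anyhow::Result;
use fastcrawl::embedder::openai::OpenAiEmbedder;

fn main() -> Result<()> {
    let embedder = OpenAiEmbedder::new(
        std::env::var("OPENAI_API_KEY")?,        // bearer token for the API
        "https://api.openai.com/v1".to_string(), // base URL; "/embeddings" is appended
        "text-embedding-3-small".to_string(),    // placeholder model name
        None,                                    // optional dimensions override
        Duration::from_secs(30),                 // per-request timeout
        3,                                       // retries on 429s and 5xx responses
        64,                                      // max inputs per request batch
    )?;

    // embed_batch takes string slices and returns one vector per input,
    // already sorted back into input order.
    let vectors = embedder.embed_batch(&["hello pgvector", "goodbye reqwest"])?;
    for (text, vector) in ["hello pgvector", "goodbye reqwest"].iter().zip(&vectors) {
        println!("{text}: {} dimensions", vector.len());
    }
    Ok(())
}
```

Because the original client was built on `reqwest::blocking`, the sketch stays synchronous; an async caller would need to wrap the calls in something like `spawn_blocking`.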

src/bin/pgvector_store.rs

Lines changed: 11 additions & 0 deletions

@@ -69,6 +69,7 @@ async fn main() -> Result<()> {
 
     let mut batch = Vec::with_capacity(batch_size);
     let Some(first_record) = next_record(&mut lines)? else {
+        println!("No embeddings to insert; nothing to do.");
         return Ok(());
     };
     anyhow::ensure!(
@@ -84,18 +85,28 @@ async fn main() -> Result<()> {
     }
 
     batch.push(first_record);
+    let mut total_inserted = 0usize;
     while let Some(record) = next_record(&mut lines)? {
         batch.push(record);
         if batch.len() >= batch_size {
             insert_batch(&mut client, &table, &batch, cli.upsert).await?;
+            total_inserted += batch.len();
             batch.clear();
         }
     }
 
     if !batch.is_empty() {
         insert_batch(&mut client, &table, &batch, cli.upsert).await?;
+        total_inserted += batch.len();
     }
 
+    println!(
+        "Successfully inserted {} record{} into {}.",
+        total_inserted,
+        if total_inserted == 1 { "" } else { "s" },
+        table.qualified()
+    );
+
     Ok(())
 }
 

src/embedder/mod.rs

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+//! Embedding client implementations shared by embedding pipelines.
+
+pub mod openai;
