Skip to content

Commit dd29418

Browse files
committed
Slight refactorings
1 parent ce2d9ed commit dd29418

File tree

4 files changed

+50
-17
lines changed

4 files changed

+50
-17
lines changed

packages/fetchcraft-admin/src/fetchcraft/admin/ingestion/pipeline_factory.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from fetchcraft.connector.filesystem import FilesystemConnector
1717
from fetchcraft.index import BaseIndex, IndexFactory, VectorIndex
18-
from fetchcraft.ingestion.interfaces import QueueBackend
18+
from fetchcraft.ingestion.interfaces import QueueBackend, Source
1919
from fetchcraft.ingestion.models import IngestionJob
2020
from fetchcraft.ingestion.pipeline import TrackedIngestionPipeline
2121
from fetchcraft.ingestion.repository import JobRepository, DocumentRepository, TaskRepository
@@ -158,7 +158,12 @@ def index_id(self) -> str:
158158
return self._index_id
159159

160160
@abstractmethod
161-
def configure_pipeline(self, pipeline: TrackedIngestionPipeline) -> None:
161+
async def create_source(self, documents_path: Path) -> Source:
162+
"""Create a source for the pipeline."""
163+
pass
164+
165+
@abstractmethod
166+
async def configure_pipeline(self, pipeline: TrackedIngestionPipeline) -> None:
162167
"""
163168
Configure the pipeline with transformations and sinks.
164169
@@ -219,12 +224,12 @@ async def create_pipeline(
219224
self._index_id = index_id
220225

221226
full_source_path = self._document_root / job.source_path
222-
connector = FilesystemConnector(
223-
path=full_source_path,
224-
filter=None
225-
)
227+
# connector = FilesystemConnector(
228+
# path=full_source_path,
229+
# filter=None
230+
# )
226231

227-
directories = await connector.list_directories()
232+
# directories = await connector.list_directories()
228233

229234
# Create base pipeline with system dependencies
230235
pipeline = TrackedIngestionPipeline(
@@ -235,19 +240,21 @@ async def create_pipeline(
235240
task_repo=self.task_repo,
236241
num_workers=self.num_workers,
237242
callback_base_url=self.callback_base_url,
238-
context={"directories": directories}
243+
context={}
244+
# context={"directories": directories}
239245
)
240246

241247
# Configure source if requested (skip for recovery scenarios)
242248
if include_source:
243-
source = ConnectorSource(
244-
connector=connector,
245-
document_root=self._document_root,
246-
)
249+
# source = ConnectorSource(
250+
# connector=connector,
251+
# document_root=self._document_root,
252+
# )
253+
source = await self.create_source(full_source_path)
247254
pipeline.source(source)
248255

249256
# Call user's configure_pipeline to add transformations and sinks
250-
self.configure_pipeline(pipeline)
257+
await self.configure_pipeline(pipeline)
251258

252259
# Clear job-specific dependencies
253260
self._parser_map = None

packages/fetchcraft-admin/src/fetchcraft/admin/main.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
FetchcraftIngestionPipelineFactory,
2323
IngestionConfig, DefaultIndexFactory,
2424
)
25+
from fetchcraft.connector import Connector
26+
from fetchcraft.connector.filesystem import FilesystemConnector
2527
from fetchcraft.document_store import MongoDBDocumentStore
2628
from fetchcraft.embeddings import OpenAIEmbeddings
29+
from fetchcraft.ingestion import Source, ConnectorSource
2730
from fetchcraft.ingestion.pipeline import TrackedIngestionPipeline
2831
from fetchcraft.ingestion.transformations import (
2932
AsyncParsingTransformation,
@@ -51,6 +54,8 @@ class DefaultPipelineFactory(FetchcraftIngestionPipelineFactory):
5154
index_factory: DefaultIndexFactory
5255
chunker: HierarchicalNodeParser
5356
parser_map: dict[str, DocumentParser]
57+
directories: list[str] = []
58+
connector: Connector
5459

5560
def __init__(self, **kwargs):
5661
super().__init__(**kwargs)
@@ -65,13 +70,24 @@ def __init__(self, **kwargs):
6570
- Chunking transformation
6671
- Vector index sink
6772
"""
68-
69-
def configure_pipeline(self, pipeline: TrackedIngestionPipeline) -> None:
73+
74+
async def create_source(self, documents_path: Path) -> Source:
75+
"""Create a source for the pipeline."""
76+
return ConnectorSource(
77+
connector=self.connector,
78+
document_root=self._document_root,
79+
)
80+
81+
async def configure_pipeline(self, pipeline: TrackedIngestionPipeline) -> None:
7082
"""Configure the pipeline with default transformations and sinks."""
83+
if len(self.directories) == 0:
84+
self.directories = await self.connector.list_directories()
85+
7186
pipeline.add_transformation(AsyncParsingTransformation(parser_map=self.parser_map))
7287
pipeline.add_transformation(ExtractKeywords())
7388
pipeline.add_transformation(ChunkingTransformation(chunker=self.chunker))
7489
pipeline.add_sink(VectorIndexSink(index_factory=self.index_factory))
90+
pipeline.context({"directories": self.directories})
7591

7692

7793

@@ -121,6 +137,11 @@ def get_ingestion_dependencies(settings: IngestionConfig):
121137
# Build callback URL for docling async parsing
122138
callback_url = f"{settings.callback_base_url}/api/tasks/callback"
123139

140+
connector = FilesystemConnector(
141+
path=settings.documents_path,
142+
filter=None
143+
)
144+
124145
parser_map = {
125146
"default": TextFileParser(),
126147
"application/pdf": RemoteDoclingParser(
@@ -132,6 +153,7 @@ def get_ingestion_dependencies(settings: IngestionConfig):
132153
return {
133154
"index_factory": index_factory,
134155
"chunker": chunker,
156+
"connector": connector,
135157
"parser_map": parser_map,
136158
}
137159

packages/fetchcraft-core/src/fetchcraft/ingestion/pipeline.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,11 @@ def __init__(
254254
)
255255

256256
# ========== Builder API ==========
257-
257+
258+
def context(self, items: dict):
259+
self._context.update(items)
260+
return self
261+
258262
def source(self, src: Source) -> "TrackedIngestionPipeline":
259263
"""
260264
Set the pipeline source.

packages/fetchcraft-core/src/fetchcraft/ingestion/sinks/vector_index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def write(self, record: DocumentRecord, context: Optional[dict] = None) ->
5555
# Get document and chunks
5656
doc = DocumentNode.model_validate(record.metadata["document"])
5757
chunk_dicts = record.metadata.get("chunks", [])
58-
vector_index = self.vector_index or self.index_factory.create_index(VectorIndex, index_id=self.index_id)
58+
vector_index = self.vector_index or self.index_factory.create_index(VectorIndex, index_id=self.index_id, context=context)
5959

6060
# Add new chunks
6161
if chunk_dicts:

0 commit comments

Comments
 (0)