Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ jobs:
# Create virtual environment
uv venv
source .venv/bin/activate
# Install dependencies without editable package (workaround for hatchling issue)
uv pip install nats-py aiohttp
uv pip install pytest pytest-asyncio black ruff mypy
# Install all project dependencies
uv pip install -e ".[dev]"
# Set PYTHONPATH for imports
echo "PYTHONPATH=src" >> $GITHUB_ENV
echo "VIRTUAL_ENV=$PWD/.venv" >> $GITHUB_ENV
Expand Down
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ RUN uv pip compile pyproject.toml -o requirements.txt && \
uv pip install --system -r requirements.txt && \
uv pip install --system .

# Copy node configuration, models, and startup script
# Copy node configuration and startup script
COPY node.json ./
COPY models/ ./models/
COPY scripts/ ./scripts/
COPY start-local.sh ./

# Create models directory (models are downloaded at runtime if not present)
RUN mkdir -p ./models

# Create non-root user and make script executable
RUN useradd -m -u 1000 nodeuser && \
chown -R nodeuser:nodeuser /app && \
Expand Down
1 change: 0 additions & 1 deletion dfx/components/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@
from dfx.components.minilm_embeddings import MiniLMEmbeddingsComponent

__all__ = ["MiniLMEmbeddingsComponent", "ExportEmbeddingsDataComponent"]

9 changes: 4 additions & 5 deletions dfx/components/all-MiniLM-L6-V2/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

import argparse
import json
from pathlib import Path
from typing import Iterable, List, Tuple
import subprocess
from collections.abc import Iterable
from pathlib import Path

try:
from sentence_transformers import SentenceTransformer # type: ignore[import-not-found]
Expand Down Expand Up @@ -56,7 +56,7 @@ def load_model(model_path: Path = MODEL_PATH) -> SentenceTransformer:
return SentenceTransformer(str(model_path))


def run(texts: Iterable[str]) -> List[List[float]]:
def run(texts: Iterable[str]) -> list[list[float]]:
"""Generate embeddings for the provided texts."""
texts_list = [text for text in texts if text.strip()]
if not texts_list:
Expand Down Expand Up @@ -94,7 +94,7 @@ def main() -> None:
main()


def get_component_runner() -> Tuple[str, str, str, SentenceTransformer]:
def get_component_runner() -> tuple[str, str, str, SentenceTransformer]:
"""
Return metadata + runner for this component.

Expand All @@ -103,4 +103,3 @@ def get_component_runner() -> Tuple[str, str, str, SentenceTransformer]:
"""
model = load_model()
return ("embeddings", HF_MODEL_ID, "sentence_transformer", model)

102 changes: 52 additions & 50 deletions dfx/components/export_embeddings_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

from __future__ import annotations

from typing import Any, Dict, Iterable, List
from collections.abc import Iterable
from typing import Any


class ExportEmbeddingsDataRunner:
"""Callable runner that merges raw data and embeddings from multiple inputs."""

def __call__(self, inputs: Any, **parameters: Any) -> Dict[str, Any]:
def __call__(self, inputs: Any, **parameters: Any) -> dict[str, Any]:
data_inputs = self._extract_data_inputs(inputs, parameters)
payloads = list(self._iter_payloads(data_inputs))

Expand All @@ -35,7 +36,7 @@ def __call__(self, inputs: Any, **parameters: Any) -> Dict[str, Any]:
}

@staticmethod
def _extract_data_inputs(inputs: Any, parameters: Dict[str, Any]) -> List[Any]:
def _extract_data_inputs(inputs: Any, parameters: dict[str, Any]) -> list[Any]:
"""Prefer explicit data_inputs in parameters; fall back to request inputs."""
if "data_inputs" in parameters and parameters["data_inputs"]:
candidates = parameters["data_inputs"]
Expand All @@ -51,7 +52,7 @@ def _extract_data_inputs(inputs: Any, parameters: Dict[str, Any]) -> List[Any]:
@staticmethod
def _iter_payloads(data_inputs: Iterable[Any]) -> Iterable[dict]:
"""Yield normalized payloads from input data.

Handles various input structures including:
- PrecomputedEmbeddings: {"type": "PrecomputedEmbeddings", "vectors": [...], "texts": [...]}
- Data objects: {"data": {...}}
Expand All @@ -77,10 +78,10 @@ def _iter_payloads(data_inputs: Iterable[Any]) -> Iterable[dict]:
yield {"value": item}

@staticmethod
def _collect_raw_items(payloads: List[dict]) -> List[Any]:
def _collect_raw_items(payloads: list[dict]) -> list[Any]:
"""Collect raw data items (text + metadata) from payloads."""
merged: List[Any] = []
merged: list[Any] = []

for payload in payloads:
# Handle PrecomputedEmbeddings format
if payload.get("type") == "PrecomputedEmbeddings":
Expand All @@ -92,21 +93,23 @@ def _collect_raw_items(payloads: List[dict]) -> List[Any]:
item["embeddings"] = vectors[i]
merged.append(item)
continue

# Collect from explicit "items" array
if isinstance(payload.get("items"), list):
merged.extend(payload["items"])
continue

# Collect single text entry
if payload.get("text"):
merged.append({
"text": payload.get("text"),
"model": payload.get("model"),
"embeddings": payload.get("embeddings"),
})
merged.append(
{
"text": payload.get("text"),
"model": payload.get("model"),
"embeddings": payload.get("embeddings"),
}
)
continue

# Collect multiple texts
if payload.get("texts") and isinstance(payload["texts"], list):
texts = payload["texts"]
Expand All @@ -120,27 +123,25 @@ def _collect_raw_items(payloads: List[dict]) -> List[Any]:
continue

# Check nested structure (legacy format)
nested_data = (
payload.get("locals", {})
.get("output", {})
.get("data", {})
)
nested_data = payload.get("locals", {}).get("output", {}).get("data", {})
if isinstance(nested_data, dict):
if isinstance(nested_data.get("items"), list):
merged.extend(nested_data["items"])
elif nested_data.get("text"):
merged.append({
"text": nested_data.get("text"),
"model": nested_data.get("model"),
"embeddings": nested_data.get("embeddings"),
})

merged.append(
{
"text": nested_data.get("text"),
"model": nested_data.get("model"),
"embeddings": nested_data.get("embeddings"),
}
)

return merged

@staticmethod
def _collect_embeddings(payloads: List[dict]) -> List[dict]:
def _collect_embeddings(payloads: list[dict]) -> list[dict]:
"""Collect embeddings in vector-store compatible format.

Returns list of entries with:
- id: Deterministic hash of text content
- vector: Embedding array (for vector DBs)
Expand All @@ -149,35 +150,40 @@ def _collect_embeddings(payloads: List[dict]) -> List[dict]:
- metadata: Same as payload (for other DBs)
"""
import hashlib

merged: List[dict[str, Any]] = []

def to_entry(text: str, vector: List[float], extra_metadata: dict | None = None) -> dict[str, Any]:
merged: list[dict[str, Any]] = []

def to_entry(
text: str, vector: list[float], extra_metadata: dict | None = None
) -> dict[str, Any]:
"""Create a vector-store compatible entry."""
# Normalize text
if isinstance(text, dict):
text = text.get("text", str(text))
elif isinstance(text, list):
# Join list items
text = " | ".join(
item.get("title", item.get("text", str(item)))
if isinstance(item, dict) else str(item)
(
item.get("title", item.get("text", str(item)))
if isinstance(item, dict)
else str(item)
)
for item in text
)
else:
text = str(text) if text else ""

# Generate deterministic ID from text content
text_hash = hashlib.md5(text.encode()).hexdigest()[:16]
entry_id = f"emb-{text_hash}"

# Build metadata
metadata = {"text": text}
if extra_metadata:
for key, value in extra_metadata.items():
if key not in {"embeddings", "vector", "vectors", "type", "texts"}:
metadata[key] = value

return {
"id": entry_id,
"vector": vector, # Standard field name for vector DBs
Expand All @@ -192,32 +198,32 @@ def to_entry(text: str, vector: List[float], extra_metadata: dict | None = None)
if payload.get("type") == "PrecomputedEmbeddings":
vectors = payload.get("vectors", [])
texts = payload.get("texts", [])

for i, vector in enumerate(vectors):
if not isinstance(vector, list):
continue
text = texts[i] if i < len(texts) else f"item_{i}"
entry = to_entry(text, vector, {"model": payload.get("model")})
merged.append(entry)
continue

# Get embeddings - check both "embeddings" and "vectors" keys
embeddings = payload.get("embeddings") or payload.get("vectors")

if not embeddings or not isinstance(embeddings, list):
continue

# Check if it's a single vector (list of floats) or multiple vectors (list of lists)
if embeddings and isinstance(embeddings[0], (int, float)):
if embeddings and isinstance(embeddings[0], int | float):
# Single vector - pair with single text
text = payload.get("text", "")
entry = to_entry(text, embeddings, payload)
merged.append(entry)

elif embeddings and isinstance(embeddings[0], list):
# Multiple vectors - pair with texts array
texts = payload.get("texts", [])

for i, vector in enumerate(embeddings):
if not isinstance(vector, list):
continue
Expand All @@ -227,15 +233,11 @@ def to_entry(text: str, vector: List[float], extra_metadata: dict | None = None)
merged.append(entry)

# Also check nested structure (legacy format)
nested_data = (
payload.get("locals", {})
.get("output", {})
.get("data", {})
)
nested_data = payload.get("locals", {}).get("output", {}).get("data", {})
if isinstance(nested_data, dict):
nested_embeddings = nested_data.get("embeddings")
if isinstance(nested_embeddings, list) and nested_embeddings:
if isinstance(nested_embeddings[0], (int, float)):
if isinstance(nested_embeddings[0], int | float):
text = nested_data.get("text", "")
entry = to_entry(text, nested_embeddings, nested_data)
merged.append(entry)
Expand Down Expand Up @@ -281,4 +283,4 @@ class DFXExportEmbeddingsDataComponent(ExportEmbeddingsDataComponent):
name = "DFXExportEmbeddingsDataComponent"


__all__ = ["get_component_runner"]
__all__ = ["get_component_runner"]
Loading