1+ """Utility runner that aggregates raw items and embeddings from multiple inputs.
2+
3+ This runner receives data from DFXEmbeddingsComponent outputs and aggregates them.
4+
5+ Expected input formats from DFXEmbeddingsComponent (via NATS):
6+ 1. Single text: {"text": "...", "embeddings": [...], "model": "..."}
7+ 2. Multiple texts: {"texts": [...], "embeddings": [[...], [...]], "model": "...", "count": N}
8+ 3. Nested Data: {"data": {"text": "...", "embeddings": [...], "model": "..."}}
9+
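Output format:
{
    "data": [{"text": "...", "model": "...", "embeddings": [...]}, ...],  # Raw items for downstream processing
    "embeddings": [  # Vector store format
        {"id": "...", "vector": [...], "text": "...", "payload": {...}, "metadata": {...}, "embeddings": [...]},
        ...
    ]
}
Entries of an explicit "items" list are passed through to "data" unchanged.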
15+ """
16+
17+ from __future__ import annotations
18+
19+ from typing import Any , Dict , Iterable , List
20+
21+
class ExportEmbeddingsDataRunner:
    """Callable runner that merges raw data and embeddings from multiple inputs.

    Calling an instance normalizes every input into a payload dict and returns
    ``{"data": [...], "embeddings": [...]}``: ``data`` holds the raw items and
    ``embeddings`` holds vector-store style entries built from the same payloads.
    """

    # Marker value of the "type" key identifying a merged embeddings payload.
    _PRECOMPUTED_TYPE = "PrecomputedEmbeddings"

    # Payload keys that carry vectors or format markers; they are never copied
    # into an entry's metadata.
    _NON_METADATA_KEYS = frozenset({"embeddings", "vector", "vectors", "type", "texts"})

    def __call__(self, inputs: Any, **parameters: Any) -> Dict[str, Any]:
        """Aggregate all inputs into one ``data`` list and one ``embeddings`` list.

        Args:
            inputs: Request inputs; used only when ``parameters`` carries no
                non-empty ``data_inputs``.
            **parameters: Optional ``data_inputs`` entry that takes precedence
                over ``inputs``.

        Returns:
            Dict with ``"data"`` (raw items) and ``"embeddings"`` (entries
            with id, vector, text and metadata).

        Raises:
            ValueError: If neither ``data_inputs`` nor ``inputs`` is provided.
        """
        data_inputs = self._extract_data_inputs(inputs, parameters)
        # Materialize once: the payloads are traversed by both collectors.
        payloads = list(self._iter_payloads(data_inputs))
        return {
            "data": self._collect_raw_items(payloads),
            "embeddings": self._collect_embeddings(payloads),
        }

    @staticmethod
    def _extract_data_inputs(inputs: Any, parameters: Dict[str, Any]) -> List[Any]:
        """Prefer explicit data_inputs in parameters; fall back to request inputs.

        A non-list candidate is wrapped in a single-element list.

        Raises:
            ValueError: If both sources are missing or empty.
        """
        candidates = parameters.get("data_inputs") or inputs
        if not candidates:
            raise ValueError("ExportEmbeddingsDataComponent requires 'data_inputs' to be provided.")
        return candidates if isinstance(candidates, list) else [candidates]

    @staticmethod
    def _iter_payloads(data_inputs: Iterable[Any]) -> Iterable[dict]:
        """Yield normalized payloads from input data.

        Handles various input structures including:
        - PrecomputedEmbeddings: {"type": "PrecomputedEmbeddings", "vectors": [...], "texts": [...]}
          (yielded as-is)
        - Data objects: {"data": {...}} (the inner dict is yielded)
        - Direct embedding payloads: {"text": "...", "embeddings": [...]} (yielded as-is)

        Non-dict inputs are wrapped as ``{"value": item}``.
        """
        for item in data_inputs:
            if not isinstance(item, dict):
                yield {"value": item}
                continue
            inner = item.get("data")
            is_precomputed = item.get("type") == ExportEmbeddingsDataRunner._PRECOMPUTED_TYPE
            if not is_precomputed and isinstance(inner, dict):
                # Unwrap the nested "data" dict (Data.data serialization).
                yield inner
            else:
                yield item

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return ``value`` when it is a list (or a list copy of a tuple), else ``[]``."""
        if isinstance(value, list):
            return value
        if isinstance(value, tuple):
            return list(value)
        return []

    @staticmethod
    def _legacy_nested_data(payload: dict) -> dict:
        """Return ``payload["locals"]["output"]["data"]`` if it is a dict, else ``{}``.

        Every level is type-checked so a present-but-non-dict value (for
        example ``None``) yields ``{}`` instead of raising ``AttributeError``.
        """
        node: Any = payload
        for key in ("locals", "output", "data"):
            if not isinstance(node, dict):
                return {}
            node = node.get(key)
        return node if isinstance(node, dict) else {}

    @staticmethod
    def _normalize_text(text: Any) -> str:
        """Coerce a text value (str, dict, list or anything else) to ``str``.

        - dict: the value under ``"text"`` (normalized recursively), or
          ``str(dict)`` when that key is absent
        - list: items joined with ``" | "``; dict items contribute their
          ``"title"``, else ``"text"``, else ``str(item)``
        - otherwise: ``str(text)``, or ``""`` for falsy values
        """
        if isinstance(text, dict):
            if "text" in text:
                return ExportEmbeddingsDataRunner._normalize_text(text["text"])
            return str(text)
        if isinstance(text, list):
            return " | ".join(
                str(item.get("title", item.get("text", item))) if isinstance(item, dict) else str(item)
                for item in text
            )
        return str(text) if text else ""

    @staticmethod
    def _to_entry(text: Any, vector: List[float], extra_metadata: Dict[str, Any] | None = None) -> Dict[str, Any]:
        """Create a vector-store compatible entry.

        Args:
            text: Text for the vector; normalized with ``_normalize_text``.
            vector: The embedding values.
            extra_metadata: Optional mapping merged into the metadata, minus
                the keys listed in ``_NON_METADATA_KEYS``.

        Returns:
            Dict with ``id``, ``vector``, ``text``, ``payload``, ``metadata``
            and ``embeddings`` keys.
        """
        import hashlib

        text = ExportEmbeddingsDataRunner._normalize_text(text)

        # Deterministic ID derived from the text only; MD5 serves as a content
        # fingerprint here, not as a security measure.
        # NOTE(review): entries that share the same text (including the
        # index-based "item_<i>" placeholders) therefore share the same ID.
        text_hash = hashlib.md5(text.encode()).hexdigest()[:16]

        metadata: Dict[str, Any] = {"text": text}
        if extra_metadata:
            skipped = ExportEmbeddingsDataRunner._NON_METADATA_KEYS
            metadata.update(
                (key, value) for key, value in extra_metadata.items() if key not in skipped
            )

        return {
            "id": f"emb-{text_hash}",
            "vector": vector,  # Standard field name for vector DBs
            "text": text,
            "payload": metadata,  # Qdrant uses "payload"
            # Equal content under the name other DBs use; a separate dict so
            # mutating one does not silently change the other.
            "metadata": dict(metadata),
            "embeddings": vector,  # Backward compatibility
        }

    @staticmethod
    def _paired_entries(vectors: List[Any], texts: Any, model: Any) -> List[Dict[str, Any]]:
        """Build one entry per list-typed vector, pairing it with ``texts`` by index.

        Non-list vectors are skipped but still consume their index; a vector
        without a matching text gets the placeholder text ``item_<index>``.
        """
        texts = ExportEmbeddingsDataRunner._as_list(texts)
        entries: List[Dict[str, Any]] = []
        for index, vector in enumerate(vectors):
            if not isinstance(vector, list):
                continue
            text = texts[index] if index < len(texts) else f"item_{index}"
            entries.append(ExportEmbeddingsDataRunner._to_entry(text, vector, {"model": model}))
        return entries

    @staticmethod
    def _collect_raw_items(payloads: List[dict]) -> List[Any]:
        """Collect raw data items (text + metadata) from payloads.

        Each payload is matched against the first applicable format, in order:
        PrecomputedEmbeddings, an explicit ``"items"`` list, a single
        ``"text"``, a ``"texts"`` list, then the legacy nested
        ``locals.output.data`` structure.
        """
        as_list = ExportEmbeddingsDataRunner._as_list
        merged: List[Any] = []

        for payload in payloads:
            # Handle PrecomputedEmbeddings format
            if payload.get("type") == ExportEmbeddingsDataRunner._PRECOMPUTED_TYPE:
                vectors = as_list(payload.get("vectors")) or as_list(payload.get("embeddings"))
                for index, text in enumerate(as_list(payload.get("texts"))):
                    item = {"text": text}
                    if index < len(vectors):
                        item["embeddings"] = vectors[index]
                    merged.append(item)
                continue

            # Collect from explicit "items" array
            if isinstance(payload.get("items"), list):
                merged.extend(payload["items"])
                continue

            # Collect single text entry
            if payload.get("text"):
                merged.append({
                    "text": payload.get("text"),
                    "model": payload.get("model"),
                    "embeddings": payload.get("embeddings"),
                })
                continue

            # Collect multiple texts
            texts = payload.get("texts")
            if texts and isinstance(texts, list):
                embeddings = as_list(payload.get("embeddings")) or as_list(payload.get("vectors"))
                model = payload.get("model")
                for index, text in enumerate(texts):
                    item = {"text": text, "model": model}
                    if index < len(embeddings):
                        item["embeddings"] = embeddings[index]
                    merged.append(item)
                continue

            # Check nested structure (legacy format)
            nested_data = ExportEmbeddingsDataRunner._legacy_nested_data(payload)
            if isinstance(nested_data.get("items"), list):
                merged.extend(nested_data["items"])
            elif nested_data.get("text"):
                merged.append({
                    "text": nested_data.get("text"),
                    "model": nested_data.get("model"),
                    "embeddings": nested_data.get("embeddings"),
                })

        return merged

    @staticmethod
    def _collect_embeddings(payloads: List[dict]) -> List[dict]:
        """Collect embeddings in vector-store compatible format.

        Returns list of entries with:
        - id: Deterministic hash of text content
        - vector: Embedding array (for vector DBs)
        - text: Original text
        - payload: Metadata dict (for Qdrant)
        - metadata: Same content as payload (for other DBs)
        - embeddings: Same as vector (backward compatibility)
        """
        runner = ExportEmbeddingsDataRunner
        merged: List[Dict[str, Any]] = []

        for payload in payloads:
            # Handle PrecomputedEmbeddings format; accept "embeddings" as a
            # fallback key so this stays consistent with _collect_raw_items.
            if payload.get("type") == runner._PRECOMPUTED_TYPE:
                vectors = runner._as_list(payload.get("vectors")) or runner._as_list(payload.get("embeddings"))
                merged.extend(runner._paired_entries(vectors, payload.get("texts"), payload.get("model")))
                continue

            # Get embeddings - check both "embeddings" and "vectors" keys
            embeddings = payload.get("embeddings") or payload.get("vectors")
            if isinstance(embeddings, list) and embeddings:
                first = embeddings[0]
                if isinstance(first, (int, float)):
                    # Single vector (list of numbers) - pair with single text
                    merged.append(runner._to_entry(payload.get("text", ""), embeddings, payload))
                elif isinstance(first, list):
                    # Multiple vectors (list of lists) - pair with texts array
                    merged.extend(
                        runner._paired_entries(embeddings, payload.get("texts"), payload.get("model"))
                    )

            # Also check nested structure (legacy format). This must run even
            # when the payload has no top-level embeddings, which is the case
            # for payloads that only carry the legacy structure.
            nested_data = runner._legacy_nested_data(payload)
            nested_embeddings = nested_data.get("embeddings")
            if (
                isinstance(nested_embeddings, list)
                and nested_embeddings
                and isinstance(nested_embeddings[0], (int, float))
            ):
                merged.append(runner._to_entry(nested_data.get("text", ""), nested_embeddings, nested_data))

        return merged
244+
245+
def get_component_runner():
    """Return the component's metadata strings followed by a fresh runner.

    The result is a 4-tuple: three fixed strings and a newly created
    ``ExportEmbeddingsDataRunner`` instance as the last element.
    """
    metadata = ("export_embeddings_data", "ExportEmbeddingsDataComponent", "pipeline")
    return (*metadata, ExportEmbeddingsDataRunner())
255+
256+
class ExportEmbeddingsDataComponent:
    """Placeholder class that lets Langflow's registrar import this module.

    It carries only descriptive attributes; the execution logic lives in
    ``ExportEmbeddingsDataRunner``. The backend only needs a class with a
    matching name to satisfy component discovery.
    """

    name = "ExportEmbeddingsDataComponent"
    description = (
        "Merge outputs from multiple embedding components "
        "into separate data and embedding streams."
    )
270+
271+
class DFXExportEmbeddingsDataComponent(ExportEmbeddingsDataComponent):
    """Alias of ``ExportEmbeddingsDataComponent`` under the registry-facing name.

    DroqFlow looks up ``DFXExportEmbeddingsDataComponent`` in the registry.
    This subclass overrides only ``name`` so discovery and routing succeed
    without duplicating any implementation details.
    """

    name = "DFXExportEmbeddingsDataComponent"
282+
283+
# Star-imports (``from <module> import *``) expose only the runner factory; the
# classes defined above remain reachable through explicit imports.
__all__ = ["get_component_runner"]
0 commit comments