
Commit 7dfaf91

✨ Multi-modal data sdk
1 parent 7be849a commit 7dfaf91

File tree

3 files changed: +688 -0 lines changed


sdk/nexent/multi_modal/__init__.py

Whitespace-only changes.
Lines changed: 383 additions & 0 deletions
@@ -0,0 +1,383 @@
import functools
import inspect
import logging
from io import BytesIO
from typing import Any, Callable, List, Optional, Tuple
import requests

from .utils import (
    is_url,
    generate_object_name,
    detect_content_type_from_bytes,
    guess_extension_from_content_type,
)

logger = logging.getLogger("multi_modal")


# Global storage client instance
_storage_client: Optional[Any] = None


def set_storage_client(storage_client: Any) -> None:
    """
    Initialize the global storage client for load_object and save_object decorators

    This should be called once at application startup to set up the storage client
    that will be used by all decorators.

    Args:
        storage_client: Storage client instance with get_file_stream() and upload_fileobj() methods
    """
    global _storage_client
    _storage_client = storage_client


def get_storage_client() -> Optional[Any]:
    """
    Get the global storage client instance

    Returns:
        Storage client instance or None if not initialized
    """
    return _storage_client

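
# A minimal sketch of a compatible storage client, for illustration only.
# The class below is hypothetical (not part of this commit): the decorators
# duck-type the client, so any object exposing get_file_stream() and
# upload_fileobj() with the (success, payload) return convention used in
# this module should work.
class InMemoryStorageClient:
    def __init__(self):
        self._objects = {}  # (bucket, object_name) -> bytes

    def get_file_stream(self, object_name, bucket):
        # (True, readable stream) on hit, (False, error message) on miss
        data = self._objects.get((bucket, object_name))
        if data is None:
            return False, f"not found: /{bucket}/{object_name}"
        return True, BytesIO(data)

    def upload_fileobj(self, file_obj, object_name, bucket):
        # Store the payload and return (True, URL in /bucket/key path format)
        self._objects[(bucket, object_name)] = file_obj.read()
        return True, f"/{bucket}/{object_name}"

# Called once at application startup, e.g.:
#   set_storage_client(InMemoryStorageClient())
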

def parse_s3_url(s3_url: str) -> Tuple[str, str]:
    """
    Parse S3 URL to extract bucket and object_name

    Supports formats:
    - s3://bucket/key
    - /bucket/key (MinIO path format)

    Args:
        s3_url: S3 URL string

    Returns:
        Tuple[bucket, object_name]

    Raises:
        ValueError: If URL format is not recognized
    """
    if not s3_url:
        raise ValueError("S3 URL cannot be empty")

    # s3:// protocol
    if s3_url.startswith('s3://'):
        parts = s3_url.replace('s3://', '').split('/', 1)
        if len(parts) == 2:
            bucket, object_name = parts
            if not bucket or not object_name:
                raise ValueError(f"Invalid s3:// URL format: {s3_url}")
            return bucket, object_name
        else:
            raise ValueError(f"Invalid s3:// URL format: {s3_url}")

    # MinIO path format: /bucket/key
    elif s3_url.startswith('/'):
        parts = s3_url.lstrip('/').split('/', 1)
        if len(parts) == 2:
            bucket, object_name = parts
            return bucket, object_name
        else:
            raise ValueError(f"Invalid path format: {s3_url}")

    else:
        raise ValueError(f"Unrecognized S3 URL format: {s3_url[:50]}...")

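
# Illustration (hypothetical paths): both accepted shapes parse to the same pair.
#   parse_s3_url("s3://assets/images/cat.png")  ->  ("assets", "images/cat.png")
#   parse_s3_url("/assets/images/cat.png")      ->  ("assets", "images/cat.png")
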

def download_file_from_url(url: str, timeout: int = 30) -> Optional[bytes]:
    """
    Download file content from S3 URL or HTTP/HTTPS URL as bytes

    Uses the global storage client for S3 URLs. Call set_storage_client() to initialize.

    Args:
        url: S3 URL (s3://bucket/key or /bucket/key) or HTTP/HTTPS URL
        timeout: Request timeout in seconds (for HTTP URLs)

    Returns:
        File content as bytes, or None if the download failed (all errors,
        including an uninitialized storage client for S3 URLs, are caught
        and logged)
    """
    if not url:
        return None

    try:
        # Handle HTTP/HTTPS URLs - direct download
        if url.startswith(('http://', 'https://')):
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.content

        # Handle S3 URLs - use global storage_client
        elif url.startswith('s3://') or url.startswith('/'):
            if _storage_client is None:
                raise ValueError(
                    f"Storage client required for S3 URL, but not initialized. "
                    f"Call set_storage_client() first. URL: {url[:50]}..."
                )

            bucket, object_name = parse_s3_url(url)

            # Get file stream from storage client
            if hasattr(_storage_client, 'get_file_stream'):
                success, stream = _storage_client.get_file_stream(object_name, bucket)
                if not success:
                    raise ValueError(f"Failed to get file stream from storage: {stream}")

                # Read stream content to bytes
                try:
                    bytes_data = stream.read()
                    if hasattr(stream, 'close'):
                        stream.close()
                    return bytes_data
                except Exception as e:
                    raise ValueError(f"Failed to read stream content: {e}")
            else:
                raise ValueError("Storage client does not have get_file_stream method")

        else:
            raise ValueError(f"Unsupported URL format: {url[:50]}...")

    except Exception as e:
        logger.error(f"Failed to download file from URL: {e}")
        return None

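
# Usage sketch (URLs are hypothetical). HTTP(S) URLs are fetched with
# requests; s3:// and /bucket/key URLs require set_storage_client() first.
#   report = download_file_from_url("https://example.com/report.pdf")
#   image = download_file_from_url("s3://multi-modal/inputs/cat.png")
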

def _upload_bytes_to_minio(
    bytes_data: bytes,
    object_name: Optional[str] = None,
    bucket: str = "multi-modal",
    content_type: str = "application/octet-stream"
) -> str:
    """
    Upload bytes to MinIO and return file URL

    Uses the global storage client. Call set_storage_client() to initialize.

    Args:
        bytes_data: File content as bytes
        object_name: Object name; if not specified, will be auto-generated
        bucket: Bucket name (defaults to "multi-modal")
        content_type: MIME type for file extension guessing

    Returns:
        File URL (path format: /bucket/object_name)

    Raises:
        ValueError: If upload fails or storage_client is not initialized
    """
    if _storage_client is None:
        raise ValueError(
            "Storage client is required for uploading to MinIO. "
            "Call set_storage_client() first."
        )

    if not hasattr(_storage_client, 'upload_fileobj'):
        raise ValueError("Storage client must have upload_fileobj method")

    # Generate object name if not provided
    if object_name is None:
        file_ext = guess_extension_from_content_type(content_type)
        object_name = generate_object_name(file_ext)

    # Create BytesIO object from bytes
    file_obj = BytesIO(bytes_data)

    # Upload to MinIO
    success, result = _storage_client.upload_fileobj(file_obj, object_name, bucket)

    if not success:
        raise ValueError(f"Failed to upload file to MinIO: {result}")

    return result

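
# Direct-call sketch (illustrative values): with no object_name, a name is
# auto-generated and its extension guessed from content_type by the helpers
# imported from .utils.
#   url = _upload_bytes_to_minio(b"hello", content_type="text/plain")
#   # e.g. "/multi-modal/<generated-name>.txt", depending on those helpers
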

def load_object(
    input_names: List[str],
    input_data_transformer: Optional[List[Callable[[bytes], Any]]] = None,
):
    """
    Decorator: Automatically download files from S3/HTTP URLs and convert to specified format

    Behavior:
    - Input should be S3 URL (s3://bucket/key or /bucket/key) or HTTP/HTTPS URL
    - Download file and convert to binary stream (bytes)
    - If input_data_transformer is provided, use custom function to convert bytes to corresponding object
    - If input_data_transformer is not provided, pass bytes directly to function

    Uses the global storage client for S3 URLs. Call set_storage_client() to initialize.

    Args:
        input_names: List of input parameter names to transform
        input_data_transformer: Optional list of custom transformer functions.
            Each function should accept bytes (binary stream) and return transformed object.
            If provided, bytes will be passed to transformer to convert to custom format.
    """

    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Get function signature and bound arguments
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()

            # Iterate through all parameters that need transformation
            for i, param_name in enumerate(input_names):
                if param_name in bound_args.arguments:
                    original_data = bound_args.arguments[param_name]

                    # Skip if data is None
                    if original_data is None:
                        continue

                    # Check if it's a URL - download and convert to bytes
                    if isinstance(original_data, str) and is_url(original_data):
                        try:
                            # Download file as bytes (binary stream)
                            bytes_data = download_file_from_url(original_data)

                            if bytes_data is None:
                                raise ValueError(f"Failed to download file from URL: {original_data}")

                            # If custom transformer is provided, use it to convert bytes to object
                            if input_data_transformer and i < len(input_data_transformer):
                                transformer_func = input_data_transformer[i]
                                try:
                                    transformed_data = transformer_func(bytes_data)
                                    logger.info(f"Downloaded {param_name} from URL and transformed using {transformer_func.__name__}")
                                except Exception as e:
                                    logger.error(f"Error transforming {param_name} with {transformer_func.__name__}: {e}")
                                    raise
                            else:
                                # No transformer: pass bytes directly
                                transformed_data = bytes_data
                                logger.info(f"Downloaded {param_name} from URL as bytes (binary stream)")
                        except Exception as e:
                            logger.error(f"Error downloading {param_name} from URL: {e}")
                            raise
                    else:
                        # Not a URL: raise an error, since load_object is specifically for URLs
                        raise ValueError(
                            f"Parameter '{param_name}' is not a URL. "
                            f"load_object decorator expects S3 or HTTP/HTTPS URLs. "
                            f"Got: {type(original_data).__name__}"
                        )

                    # Update parameter value with transformed result
                    bound_args.arguments[param_name] = transformed_data

            # Call original function with transformed parameters
            return func(*bound_args.args, **bound_args.kwargs)

        return wrapper

    return decorator

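
# Usage sketch for load_object. Pillow and the URL are assumptions, not part
# of this commit; any callable taking bytes and returning an object can serve
# as a transformer. Requires set_storage_client() to have been called.
def _example_image_size():
    from PIL import Image  # assumed dependency, used only for this sketch

    @load_object(
        input_names=["image"],
        input_data_transformer=[lambda b: Image.open(BytesIO(b))],
    )
    def image_size(image):
        # By the time the body runs, `image` is a decoded PIL.Image,
        # not a URL or raw bytes.
        return image.size

    return image_size("s3://multi-modal/inputs/cat.png")
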

def save_object(
    output_names: List[str],
    output_transformers: Optional[List[Callable[[Any], bytes]]] = None,
    bucket: str = "multi-modal"
):
    """
    Decorator: Automatically upload function return values to MinIO and return file URLs

    Behavior:
    - Function return values are expected to be bytes by default, which will be uploaded directly to MinIO
    - If output_transformers is provided, use custom functions to convert return values to bytes first
    - Upload bytes to MinIO and return file URLs instead of original return values

    Uses the global storage client. Call set_storage_client() to initialize.

    Args:
        output_names: List of output result names (for logging/debugging)
        output_transformers: Optional list of custom transformer functions.
            Each function should accept the original return value and return bytes.
            If provided, will be used to convert return values to bytes before uploading.
        bucket: Bucket name (defaults to "multi-modal")
    """

    def decorator(func: Callable) -> Callable:

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Call original function
            results = func(*args, **kwargs)

            # Normalize results to tuple
            if not isinstance(results, tuple):
                results_tuple = (results,)
            else:
                results_tuple = results

            if len(results_tuple) != len(output_names):
                raise ValueError(
                    f"Function returned {len(results_tuple)} values, "
                    f"but expected {len(output_names)} outputs"
                )

            # Process each return value
            uploaded_urls = []
            for i, (result, name) in enumerate(zip(results_tuple, output_names)):
                bytes_data = None

                # If transformer is provided, use it to convert to bytes
                if output_transformers and i < len(output_transformers):
                    transformer_func = output_transformers[i]
                    try:
                        bytes_data = transformer_func(result)
                        if not isinstance(bytes_data, bytes):
                            raise ValueError(
                                f"Transformer {transformer_func.__name__} for {name} must return bytes, "
                                f"got {type(bytes_data).__name__}"
                            )
                        logger.info(f"Transformed {name} using {transformer_func.__name__} to bytes")
                    except Exception as e:
                        logger.error(f"Error transforming {name} with {transformer_func.__name__}: {e}")
                        raise
                else:
                    # No transformer: assume result is already bytes
                    if not isinstance(result, bytes):
                        raise ValueError(
                            f"Return value for {name} must be bytes when no transformer is provided, "
                            f"got {type(result).__name__}"
                        )
                    bytes_data = result
                    logger.info(f"Using {name} as bytes directly")

                # Detect content type from binary data (magic bytes)
                content_type = detect_content_type_from_bytes(bytes_data)
                logger.info(f"Detected content type for {name}: {content_type}")

                # Upload to MinIO
                try:
                    file_url = _upload_bytes_to_minio(
                        bytes_data,
                        object_name=None,  # Auto-generate object name
                        content_type=content_type,
                        bucket=bucket
                    )
                    logger.info(f"Uploaded {name} to MinIO: {file_url}")
                    uploaded_urls.append(file_url)
                except Exception as e:
                    logger.error(f"Error uploading {name} to MinIO: {e}")
                    raise

            # Return a single URL for one output, otherwise a tuple of URLs
            if len(uploaded_urls) == 1:
                return uploaded_urls[0]
            return tuple(uploaded_urls)

        return wrapper

    return decorator

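
# End-to-end sketch (hypothetical names and URL): stacking both decorators
# gives a function that accepts a URL and returns a URL, while its body only
# ever sees bytes. Requires set_storage_client() to have been called.
def _example_round_trip():
    @save_object(output_names=["copy"])
    @load_object(input_names=["source"])
    def duplicate(source):
        # `source` arrives as downloaded bytes; returning bytes triggers an
        # upload, and the caller receives the new /bucket/key URL instead.
        return source

    return duplicate("s3://multi-modal/inputs/cat.png")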
