feat(RHOAIENG-21050) Endpoints: /data/download and /data/upload #24
Conversation
Reviewer's Guide
This PR delivers complete implementations for the /data/upload and /data/download endpoints.

Entity Relationship Diagram for Stored Model Data Components

erDiagram
MODEL {
string model_id PK "e.g., gaussian-credit-model"
}
MODEL_INPUT_DATA {
string model_id FK
string execution_id "Correlates with metadata"
array input_features "Stored as NumPy array"
array feature_names
}
MODEL_OUTPUT_DATA {
string model_id FK
string execution_id "Correlates with metadata"
array output_values "Stored as NumPy array"
array output_names
}
MODEL_METADATA {
string model_id FK
string execution_id PK "Unique ID for an inference transaction"
datetime timestamp "Timestamp of the transaction"
string tag "User-defined tag"
}
GROUND_TRUTH_DATA {
string model_id FK
string execution_id FK "References MODEL_METADATA.execution_id"
array ground_truth_values "Stored as NumPy array"
array ground_truth_names
}
MODEL ||--|{ MODEL_INPUT_DATA : "has inputs stored as"
MODEL ||--|{ MODEL_OUTPUT_DATA : "has outputs stored as"
MODEL ||--|{ MODEL_METADATA : "has metadata stored as"
MODEL_METADATA }o--|| MODEL_INPUT_DATA : "corresponds to"
MODEL_METADATA }o--|| MODEL_OUTPUT_DATA : "corresponds to"
MODEL_METADATA ||--o{ GROUND_TRUTH_DATA : "can have associated"
Class Diagram for Data Transfer Objects (DTOs)

classDiagram
class UploadPayload {
<<DTO (src/endpoints/data/data_upload.py)>>
+model_name: str
+data_tag: Optional[str]
+is_ground_truth: bool
+request: Dict[str, Any]
+response: Dict[str, Any]
}
class RowMatcher {
<<DTO (src/service/utils/download.py)>>
+columnName: str
+operation: str
+values: List[Any]
}
class DataRequestPayload {
<<DTO (src/service/utils/download.py)>>
+modelId: str
+matchAny: Optional[List[RowMatcher]]
+matchAll: Optional[List[RowMatcher]]
+matchNone: Optional[List[RowMatcher]]
}
DataRequestPayload --> "*" RowMatcher : uses
class DataResponsePayload {
<<DTO (src/service/utils/download.py)>>
+dataCSV: str
}
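As a point of reference, a minimal Pydantic sketch of the upload DTO described above; the field names follow the class diagram, while the defaults and comments are assumptions:

```python
from typing import Any, Dict, Optional
from pydantic import BaseModel


class UploadPayload(BaseModel):
    model_name: str
    data_tag: Optional[str] = None  # user-defined tag (optional default assumed)
    is_ground_truth: bool = False   # default assumed
    request: Dict[str, Any]         # inference inputs as a raw dict (see review feedback below)
    response: Dict[str, Any]        # inference outputs as a raw dict
```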
Hey @m-misiura - I've reviewed your changes - here's some feedback:
- Consider refactoring the large `upload` handler into smaller service functions or helpers to separate ground-truth and standard flows for better readability and maintainability.
- Replace the raw `Dict[str, Any]` fields for inputs/outputs in `UploadPayload` with dedicated Pydantic models to leverage automatic validation of tensor schemas (shape, datatype, execution IDs).
- Avoid mutable default lists in `DataRequestPayload` (e.g. `matchAny: Optional[List[RowMatcher]] = []`) by using `Field(default_factory=list)` or `Optional[...] = None` to prevent shared state across requests (see the sketch after this list).
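A minimal sketch of the suggested fix for the mutable defaults, assuming the RowMatcher and DataRequestPayload fields shown in the class diagram above:

```python
from typing import Any, List
from pydantic import BaseModel, Field


class RowMatcher(BaseModel):
    columnName: str
    operation: str
    values: List[Any]


class DataRequestPayload(BaseModel):
    modelId: str
    # Field(default_factory=list) builds a fresh list per request instead of
    # sharing one mutable default across instances.
    matchAny: List[RowMatcher] = Field(default_factory=list)
    matchAll: List[RowMatcher] = Field(default_factory=list)
    matchNone: List[RowMatcher] = Field(default_factory=list)
```

Alternatively, declaring the fields as `Optional[...] = None` and treating None as "no matchers" avoids the default list entirely.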
Here's what I looked at during the review
- 🟡 General issues: 3 issues found
- 🟢 Security: all looks good
- 🟡 Testing: 2 issues found
- 🟢 Documentation: all looks good
| MODEL_ID = "example1" | ||
|
|
||
|
|
||
| def generate_payload(n_rows, n_input_cols, n_output_cols, datatype, tag, input_offset=0, output_offset=0): |
suggestion (testing): Consider testing scenarios where execution IDs are provided for non-ground-truth uploads.
Please add a test that uploads non-ground-truth data with explicit execution IDs and verifies these IDs are stored in the metadata, ensuring user-supplied IDs are handled correctly.
Suggested implementation:
MODEL_ID = "example1"
def generate_payload(n_rows, n_input_cols, n_output_cols, datatype, tag, input_offset=0, output_offset=0):
"""Generate a test payload with specific dimensions and data types."""
model_name = f"{MODEL_ID}_{uuid.uuid4().hex[:8]}"
input_data = []
for i in range(n_rows):
if n_input_cols == 1:
input_data.append(i + input_offset)
else:
row = [i + j + input_offset for j in range(n_input_cols)]
input_data.append(row)
output_data = []
for i in range(n_rows):
if n_output_cols == 1: for i in range(n_rows):
if n_output_cols == 1:
def test_upload_non_ground_truth_with_explicit_execution_ids():
"""Test uploading non-ground-truth data with explicit execution IDs and verify they are stored in metadata."""
import uuid
n_rows = 3
n_input_cols = 2
n_output_cols = 1
datatype = "float"
tag = "test-non-gt-execid"
execution_ids = [f"execid-{uuid.uuid4().hex[:8]}" for _ in range(n_rows)]
payload = {
"model": f"{MODEL_ID}_{uuid.uuid4().hex[:8]}",
"inputs": [[i, i+1] for i in range(n_rows)],
"outputs": [[i * 2.0] for i in range(n_rows)],
"datatype": datatype,
"tag": tag,
"ground_truth": False,
"execution_ids": execution_ids,
}
response = client.post("/upload", json=payload)
assert response.status_code == 200, f"Unexpected status code: {response.status_code}, {response.text}"
data = response.json()
assert "metadata" in data, "No metadata in response"
meta = data["metadata"]
assert "execution_ids" in meta, "No execution_ids in metadata"
assert meta["execution_ids"] == execution_ids, f"Execution IDs in metadata do not match: {meta['execution_ids']} vs {execution_ids}"
| MODEL_ID = "example1" | ||
|
|
||
|
|
||
| def test_download_data(): |
suggestion (testing): Add test case for requesting data from a non-existent model ID.
Please add a test to ensure that requesting a download for a non-existent modelId returns a 404 status and the correct error message, as expected from load_model_dataframe.
Suggested implementation:
# Test constants
MODEL_ID = "example1"
NONEXISTENT_MODEL_ID = "nonexistent_model"


def test_download_data():
    """equivalent of Java downloadData() test"""
    dataframe = DataframeGenerators.generate_random_dataframe(1000)
    mock_storage.save_dataframe(dataframe, MODEL_ID)
    payload = {
        "modelId": MODEL_ID,
        "matchAll": [
            {"columnName": "gender", "operation": "EQUALS", "values": [0]},
            {"columnName": "race", "operation": "EQUALS", "values": [0]},
            {"columnName": "income", "operation": "EQUALS", "values": [0]},
        ],
        "matchAny": [
            # ... rest of the test ...


def test_download_data_nonexistent_model(client):
    """Test that requesting data for a non-existent model ID returns 404 and the correct error message."""
    payload = {
        "modelId": NONEXISTENT_MODEL_ID,
        "matchAll": [],
        "matchAny": [],
    }
    response = client.post("/download", json=payload)
    assert response.status_code == 404
    assert "not found" in response.json()["error"].lower()

- If your test client fixture is not named client, adjust the function argument accordingly.
- If the error message from load_model_dataframe is more specific, update the assertion to match the exact message.
- Ensure that the /download endpoint and error response structure match your actual API.
| """equivalent of Java downloadTextDataInternalColumnIndex() test""" | ||
| dataframe = DataframeGenerators.generate_random_text_dataframe(1000) | ||
| mock_storage.save_dataframe(dataframe, MODEL_ID) | ||
| expected_rows = dataframe.iloc[0:10].copy() |
suggestion (code-quality): Replace a[0:x] with a[:x] and a[x:len(a)] with a[x:] (remove-redundant-slice-index)
-    expected_rows = dataframe.iloc[0:10].copy()
+    expected_rows = dataframe.iloc[:10].copy()
| model_name = f"{MODEL_ID}_{uuid.uuid4().hex[:8]}" | ||
| input_tensors = [] | ||
| for col_idx in range(n_input_cols): | ||
| tensor_data = [] |
issue (code-quality): We've found these issues:
- Convert for loop into list comprehension [×2] (list-comprehension); see the sketch below
- Inline variable that is immediately returned (inline-immediately-returned-variable)
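A sketch of the list-comprehension rewrite for the column loop shown above; build_column_tensor is a hypothetical stand-in for whatever the current loop body builds per column:

```python
# Hypothetical helper standing in for the current per-column loop body.
def build_column_tensor(col_idx, n_rows, datatype):
    data = [col_idx + row for row in range(n_rows)]
    return {"name": f"input_{col_idx}", "shape": [n_rows], "datatype": datatype, "data": data}


# The explicit for-loop with repeated .append() collapses into one comprehension.
input_tensors = [build_column_tensor(col_idx, n_rows, "INT64") for col_idx in range(n_input_cols)]
```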
    if isinstance(id_val, np.ndarray):
        ids.append(str(id_val))
    else:
        ids.append(str(id_val))
suggestion (code-quality): Hoist repeated code outside conditional statement (hoist-statement-from-if)
-    if isinstance(id_val, np.ndarray):
-        ids.append(str(id_val))
-    else:
-        ids.append(str(id_val))
+    ids.append(str(id_val))
    payload1 = generate_payload(n_payload1, 10, 1, "INT64", tag1)
    payload1["model_name"] = model_name
    post_test(payload1, 200, [f"{n_payload1} datapoints"])
    payload2 = generate_payload(n_payload2, 10, 1, "INT64", tag2)
    payload2["model_name"] = model_name
    post_test(payload2, 200, [f"{n_payload2} datapoints"])
issue (code-quality): Extract duplicate code into function (extract-duplicate-method)
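One possible extraction for the duplicated upload-and-assert steps above; the helper name is illustrative and reuses generate_payload and post_test from the test module:

```python
def upload_and_check(n_rows, tag, model_name):
    # Build a payload, point it at the shared model, and assert the upload response.
    payload = generate_payload(n_rows, 10, 1, "INT64", tag)
    payload["model_name"] = model_name
    post_test(payload, 200, [f"{n_rows} datapoints"])


# Usage in the test body:
# upload_and_check(n_payload1, tag1, model_name)
# upload_and_check(n_payload2, tag2, model_name)
```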
Codecov Report
Attention: Patch coverage is
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files

@@            Coverage Diff             @@
##             main      #24       +/-   ##
===========================================
+ Coverage   48.13%   62.58%   +14.45%
===========================================
  Files          15       26       +11
  Lines        1498     2371      +873
===========================================
+ Hits          721     1484      +763
- Misses        777      887      +110

☔ View full report in Codecov by Sentry.
Force-pushed from 8a0eb28 to fa2a88e.
src/service/utils/download.py (outdated)
    if len(metadata_data) > 0 and isinstance(metadata_data[0], bytes):
        deserialized_metadata = []
        for row in metadata_data:
            deserialized_row = pickle.loads(row)
Unsure why this is necessary; is the metadata returned from storage.read_data ever serialized? If so, that's a bug in the storage code.
In addition to what @RobGeada said, even if it's necessary, we shouldn't be using raw pickle deserialisation. If some re-design of this part is needed, I'm fine with leaving it to another PR so we minimise conflicts with other PRs, but to be addressed before the final version.
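If the metadata rows do need their own serialisation, one direction is JSON instead of pickle; a minimal sketch, with illustrative function names and the assumption that each row is plain data (strings, numbers, lists):

```python
import json

import numpy as np


def serialize_metadata_rows(rows):
    # Store each metadata row as a JSON string rather than a pickle blob.
    return np.array([json.dumps(row) for row in rows])


def deserialize_metadata_rows(raw_rows):
    # json.loads only reconstructs plain data types, unlike pickle.loads,
    # which can instantiate arbitrary objects during deserialisation.
    return [json.loads(row) for row in raw_rows]
```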
src/service/utils/upload.py (outdated)
    return row_data

def process_tensors(
Can we re-use tensor parsing logic from the KServe inference parser?
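If direct reuse is awkward, a rough sketch of parsing KServe v2-style tensors; this illustrates the protocol's tensor shape (name/shape/datatype/flat data), not the existing parser's API:

```python
import numpy as np

# Mapping from KServe v2 datatype strings to NumPy dtypes (subset shown).
V2_DTYPES = {"BOOL": np.bool_, "INT32": np.int32, "INT64": np.int64,
             "FP32": np.float32, "FP64": np.float64}


def parse_v2_tensor(tensor):
    # A v2 tensor carries "name", "shape", "datatype" and a flat "data" list;
    # rebuild the array by casting the flat data and restoring the declared shape.
    dtype = V2_DTYPES.get(tensor["datatype"], object)
    return np.asarray(tensor["data"], dtype=dtype).reshape(tensor["shape"])
```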
src/endpoints/data/data_download.py (outdated)
        matching_dfs.append(matched_df)
    # Union all results
    if matching_dfs:
        df = pd.concat(matching_dfs, ignore_index=True).drop_duplicates()
Perhaps for a separate PR, but we should revisit this. I'm concerned this might not scale well for large numbers of matchers and large DFs. concat, drop_duplicates (and even the filtering, in some situations) involve DF copying.
thanks -- very good food for thought; I tried to refactor it to avoid some of these operations
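For reference, one way to avoid the concat/drop_duplicates copies, as a sketch: reduce each matcher to a boolean mask over the same dataframe and filter once (the mask-building step is assumed to exist elsewhere):

```python
import numpy as np
import pandas as pd


def filter_match_any(df: pd.DataFrame, masks: list) -> pd.DataFrame:
    # masks: one boolean Series per matcher, all aligned to df's index.
    # OR-ing the masks and indexing once avoids per-matcher dataframe copies.
    if not masks:
        return df
    combined = np.logical_or.reduce(masks)
    return df[combined]
```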
src/endpoints/data/data_upload.py (outdated)
        ]
        for eid in exec_ids
    ]
    metadata = np.array(metadata_rows, dtype="<U100")
I would move the character limit to a constant with an explanatory comment (perhaps make it configurable in a separate PR, if it makes sense)
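A small sketch of that suggestion; the constant name is illustrative and metadata_rows comes from the surrounding upload code:

```python
import numpy as np

# Maximum width, in characters, of each metadata string field (execution ID,
# timestamp, tag) in the fixed-width NumPy array; mirrors the "<U100" above.
METADATA_MAX_CHARS = 100

metadata = np.array(metadata_rows, dtype=f"<U{METADATA_MAX_CHARS}")
```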
src/endpoints/data/data_upload.py (outdated)
-    # TODO: Implement
-    return {"status": "success", "message": "Data uploaded successfully"}
+    # Get fresh storage interface for each request
+    storage = get_storage_interface()
I'm unsure if we should get a storage instance per request, or once globally for the lifespan of the service.
An advantage of a global interface is that some interfaces are already thread-safe (e.g. PVC). This could be a singleton either at the module level or as a FastAPI app state. wdyt?
Very good question! I assumed (perhaps incorrectly) that upload operations via the endpoint would be infrequent and would benefit from fault isolation. Creating a storage interface per request would ensure that each upload operation is independent, so if one fails it doesn't affect the others.
I do not have a strong opinion on what is best here and happy to follow your recommendation :)
Yeah - we probably want to use the global storage_interface. You can access it from model_data.storage_interface, but we should expose it via a function.
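A hedged sketch of that module-level accessor; the module location is assumed, and get_storage_interface() stands in for the existing factory used by the endpoint:

```python
# e.g. in src/service/data/model_data.py (location assumed)
_storage_interface = None


def get_global_storage_interface():
    """Return one shared storage interface for the lifetime of the service."""
    global _storage_interface
    if _storage_interface is None:
        _storage_interface = get_storage_interface()  # existing factory (assumed import)
    return _storage_interface
```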
    for i, arr in enumerate(input_arrays[1:], 1):
        if arr.shape[0] != first_dim:
            errors.append(
                f"Input tensor '{input_names[i]}' has first dimension {arr.shape[0]}, "
                f"which doesn't match the first dimension {first_dim} of '{input_names[0]}'"
            )
    if errors:
        return ". ".join(errors) + "."
    return None
sourcery-ai, why not the simpler
errors = [
    f"Input tensor '{input_names[i]}' has first dimension {arr.shape[0]}, which doesn't match the first dimension {first_dim} of '{input_names[0]}'"
    for i, arr in enumerate(input_arrays[1:], 1)
    if arr.shape[0] != first_dim
]
return ". ".join(errors) + "." if errors else None

Force-pushed from 5ac8b6b to 9023359.
Force-pushed from 89f3ff9 to 34e127b.
duplicate of #47
What does this PR do?
Added an initial implementation of the /data/download and /data/upload endpoints.
Quick example of data upload / download
To upload / download data, you can start a server e.g.:
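For instance, a minimal sketch assuming the FastAPI app is importable as src.main:app and listens on port 8080 (both are assumptions, not the project's documented invocation):

```python
# Start the service locally; module path and port are assumptions.
import uvicorn

uvicorn.run("src.main:app", host="127.0.0.1", port=8080)
```

Once the server is running, data can be uploaded and downloaded with, for example:

```python
import requests

BASE = "http://127.0.0.1:8080"

# Upload: top-level fields follow the UploadPayload DTO above; the KServe-style
# tensor layout inside request/response is an assumption.
upload_payload = {
    "model_name": "example-model",
    "data_tag": "TRAINING",
    "is_ground_truth": False,
    "request": {"inputs": [{"name": "input", "shape": [2, 3], "datatype": "FP64",
                            "data": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]}]},
    "response": {"outputs": [{"name": "output", "shape": [2, 1], "datatype": "FP64",
                              "data": [0.1, 0.9]}]},
}
print(requests.post(f"{BASE}/data/upload", json=upload_payload).json())

# Download: fields follow the DataRequestPayload DTO above.
download_payload = {
    "modelId": "example-model",
    "matchAll": [{"columnName": "input", "operation": "EQUALS", "values": [1.0]}],
}
print(requests.post(f"{BASE}/data/download", json=download_payload).json())
```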
Tests
You can run the accompanying tests, using e.g.
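For example, the programmatic equivalent of a pytest run (the test selection is an assumption about the repository layout):

```python
import pytest

# Run the endpoint tests verbosely; adjust the path/keyword to the actual test files.
pytest.main(["tests", "-v", "-k", "upload or download"])
```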
Summary by Sourcery
Implement complete data ingestion and retrieval endpoints for models, backed by a unified storage interface with payload validation, ground truth handling, and filtering logic.