@NeriCarcasci NeriCarcasci commented Aug 28, 2025

Summary

This PR extends the work on data upload and storage integration. It addresses the following Jira tasks:

  • RHOAIENG-27966 – Ensure testing of both PVC and Maria backends for all data operations
  • RHOAIENG-27964 – Ensure all StorageInterface functions are consistently async
  • RHOAIENG-27967 – Ensure consistent data upload logic for both MariaDB and PVC

Changes

1. Data Model & Validation

  • Introduced KServeDataType enum and K_SERVE_NUMPY_DTYPES mapping to enforce consistent dtype handling.
  • Extended KServeData with strict shape validation and datatype value validation (BOOL, unsigned ints, BYTES as JSON strings).
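
To illustrate the kind of checks described above, here is a minimal, dependency-free sketch. It is not the code from this PR: the enum is trimmed to a few members, and `flatten` and `validate_tensor` are hypothetical helper names chosen for the example. The idea is that the declared shape must account for every value, and each value must be acceptable for the declared datatype.

```python
from enum import Enum
from math import prod
from typing import Any, List


class KServeDataType(str, Enum):
    # Trimmed subset for illustration; the PR's enum lists more members.
    BOOL = "BOOL"
    UINT8 = "UINT8"
    INT64 = "INT64"
    FP32 = "FP32"
    BYTES = "BYTES"


def flatten(data: Any) -> List[Any]:
    """Recursively flatten nested lists into one flat list."""
    if isinstance(data, list):
        flat: List[Any] = []
        for item in data:
            flat.extend(flatten(item))
        return flat
    return [data]


def validate_tensor(shape: List[int], datatype: KServeDataType, data: List[Any]) -> None:
    """Raise ValueError if data does not match the declared shape or datatype."""
    flat = flatten(data)
    expected = prod(shape)
    if expected != len(flat):
        raise ValueError(f"shape {shape} implies {expected} values, got {len(flat)}")

    if datatype is KServeDataType.BOOL:
        ok = all(isinstance(v, bool) for v in flat)
    elif datatype is KServeDataType.UINT8:
        # bool is a subclass of int in Python, so exclude it explicitly.
        ok = all(isinstance(v, int) and not isinstance(v, bool) and 0 <= v <= 255 for v in flat)
    elif datatype is KServeDataType.BYTES:
        # Assumption: BYTES values arrive as JSON strings, as the summary states.
        ok = all(isinstance(v, str) for v in flat)
    else:
        ok = all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in flat)
    if not ok:
        raise ValueError(f"data does not match datatype {datatype.value}")
```

For example, `validate_tensor([2, 2], KServeDataType.UINT8, [[1, 2], [3, 4]])` passes, while a negative value or a three-element payload raises `ValueError`.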

2. Storage Interface Consistency

  • Enforced async signatures for all StorageInterface methods.
  • Unified shape handling for single-row uploads.
  • Fixed serialization/parsing inconsistencies across Maria and PVC, including BYTES support in PVC.

3. Testing

  • Added Bash test harness (scripts/test_upload_endpoint.sh) to validate upload endpoint across multiple edge cases.

Summary by Sourcery

Unify and async-ify the storage layer across PVC and MariaDB, introduce strict KServe data validation, refactor consumer reconciliation logic, and add a new /data/upload endpoint with tag validation for consistent data ingestion.

New Features:

  • Implement a /data/upload endpoint with tag validation and automatic data reconciliation.
  • Introduce KServeDataType enum and KServeData model with strict shape and datatype validation.
  • Add write_reconciled_data helper to centralize reconciled data writes for both PVC and MariaDB backends.

Bug Fixes:

  • Fix inconsistent BYTES serialization in PVC storage and enforce maximum void type length.
  • Convert all StorageInterface methods to async and update PVC and MariaDB implementations accordingly.
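
As a rough illustration of enforcing a maximum void type length, the sketch below sizes a NumPy-style void dtype string to the longest payload and fails fast when the limit is exceeded. The function name `void_dtype_for` and the 1024-byte limit are assumptions made for this example, not values confirmed by the PR.

```python
from typing import Iterable

# Assumed limit for illustration only.
MAX_VOID_TYPE_LENGTH = 1024


def void_dtype_for(payloads: Iterable[bytes], max_length: int = MAX_VOID_TYPE_LENGTH) -> str:
    """Return a dtype string such as 'V12' sized to the longest payload.

    Raises ValueError instead of allowing an oversized payload through.
    """
    longest = max((len(p) for p in payloads), default=0)
    if longest > max_length:
        raise ValueError(
            f"payload of {longest} bytes exceeds the maximum void length of {max_length}"
        )
    # Use at least one byte so the dtype string always names a fixed width.
    return f"V{max(longest, 1)}"
```

Sizing per payload avoids padding every value to the maximum, while the explicit error makes an oversized value visible at write time.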

Enhancements:

  • Refactor consumer endpoint to use a unified persist/get/delete_partial_payload API with get_global_storage_interface.
  • Consolidate reconciliation logic into a single reconcile_kserve function and remove legacy reconcile functions.
  • Enhance ModelData to perform async storage checks, dataset existence warnings, and metadata DataFrame conversion.

Build:

  • Add fastapi-utils and typing_inspect dependencies to pyproject.toml.

Tests:

  • Update storage tests for async API and cover PVC/MariaDB parity.
  • Add async contract tests to enforce coroutine signatures on storage methods.
  • Introduce comprehensive tests for the upload endpoint, including shell harness for edge cases and dtype sweeps.
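
An async contract test of the kind listed above could look roughly like the following. This is a sketch under assumptions: the `StorageInterface` here is a two-method stand-in, and `InMemoryStorage`, `BrokenStorage`, and `non_async_methods` are hypothetical names used only for the example.

```python
import inspect
from abc import ABC, abstractmethod
from typing import List, Set


class StorageInterface(ABC):
    """Simplified stand-in for the real interface."""

    @abstractmethod
    async def dataset_exists(self, dataset_name: str) -> bool: ...

    @abstractmethod
    async def delete_dataset(self, dataset_name: str) -> None: ...


class InMemoryStorage(StorageInterface):
    def __init__(self) -> None:
        self._datasets: Set[str] = set()

    async def dataset_exists(self, dataset_name: str) -> bool:
        return dataset_name in self._datasets

    async def delete_dataset(self, dataset_name: str) -> None:
        self._datasets.discard(dataset_name)


class BrokenStorage(StorageInterface):
    # Deliberately synchronous to show what the contract check catches.
    def dataset_exists(self, dataset_name: str) -> bool:
        return False

    async def delete_dataset(self, dataset_name: str) -> None:
        return None


def non_async_methods(interface: type, implementation: type) -> List[str]:
    """Names of public interface methods the implementation does not define as coroutines."""
    names = [
        name
        for name, _ in inspect.getmembers(interface, inspect.isfunction)
        if not name.startswith("_")
    ]
    return [n for n in names if not inspect.iscoroutinefunction(getattr(implementation, n))]
```

A test would then assert that `non_async_methods(StorageInterface, SomeBackend)` is empty for each backend; for `BrokenStorage` it reports `dataset_exists`.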

sourcery-ai bot commented Aug 28, 2025

Reviewer's Guide

This PR refactors KServe data models and storage interfaces to enforce async contracts and strict dtype/shape validation, unifies ModelMesh and KServe payload reconciliation in the consumer endpoints, implements a comprehensive data upload endpoint with tag validation and usage metrics, ensures feature parity between PVC and MariaDB backends, and extends the test suite with integration, async‐contract, and end-to-end upload tests.

Class diagram for new and refactored KServe data models

classDiagram
    class KServeDataType {
        <<enum>>
        BOOL
        INT8
        INT16
        INT32
        INT64
        UINT8
        UINT16
        UINT32
        UINT64
        FP16
        FP32
        FP64
        BYTES
    }
    class KServeData {
        +name: str
        +shape: List[int]
        +datatype: KServeDataType
        +parameters: Optional[Dict[str, str]]
        +data: List[Any]
        +_validate_shape()
        +validate_data_matches_type()
    }
    class KServeInferenceRequest {
        +id: Optional[str]
        +parameters: Optional[Dict[str, str]]
        +inputs: List[KServeData]
        +outputs: Optional[List[KServeData]]
    }
    class KServeInferenceResponse {
        +model_name: str
        +model_version: Optional[str]
        +id: Optional[str]
        +parameters: Optional[Dict[str, str]]
        +outputs: List[KServeData]
    }
    KServeInferenceRequest --> KServeData
    KServeInferenceResponse --> KServeData
    KServeData --> KServeDataType

Class diagram for refactored StorageInterface and implementations

classDiagram
    class StorageInterface {
        <<abstract>>
        +async dataset_exists(dataset_name: str)
        +async list_all_datasets()
        +async dataset_rows(dataset_name: str)
        +async dataset_shape(dataset_name: str)
        +async write_data(dataset_name: str, new_rows, column_names: List[str])
        +async read_data(dataset_name: str, start_row: int, n_rows: int)
        +async get_original_column_names(dataset_name: str)
        +async get_aliased_column_names(dataset_name: str)
        +async apply_name_mapping(dataset_name: str, name_mapping: Dict[str, str])
        +async delete_dataset(dataset_name: str)
        +async persist_partial_payload(payload, payload_id, is_input)
        +async get_partial_payload(payload_id, is_input, is_modelmesh)
        +async delete_partial_payload(payload_id, is_input)
    }
    class PVCStorage {
        +async dataset_exists(...)
        +async list_all_datasets(...)
        +async dataset_rows(...)
        +async dataset_shape(...)
        +async write_data(...)
        +async read_data(...)
        +async get_original_column_names(...)
        +async get_aliased_column_names(...)
        +async apply_name_mapping(...)
        +async delete_dataset(...)
        +async persist_partial_payload(...)
        +async get_partial_payload(...)
        +async delete_partial_payload(...)
    }
    class MariaDBStorage {
        +async dataset_exists(...)
        +async list_all_datasets(...)
        +async dataset_rows(...)
        +async dataset_shape(...)
        +async write_data(...)
        +async read_data(...)
        +async get_original_column_names(...)
        +async get_aliased_column_names(...)
        +async apply_name_mapping(...)
        +async delete_dataset(...)
        +async persist_partial_payload(...)
        +async get_partial_payload(...)
        +async delete_partial_payload(...)
    }
    StorageInterface <|-- PVCStorage
    StorageInterface <|-- MariaDBStorage

File-Level Changes

Change: Enhanced KServeData model with strict dtype and shape validation
Details:
  • Introduced KServeDataType enum and K_SERVE_NUMPY_DTYPES mapping
  • Added post‐validators to KServeData to enforce declared shape matches actual data and type constraints (BOOL, unsigned ints, BYTES)
  • Refactored KServeInferenceRequest and KServeInferenceResponse to use the new model
Files:
  • src/endpoints/consumer/__init__.py

Change: Unified StorageInterface async signatures and payload APIs across PVC and MariaDB
Details:
  • Converted all dataset and partial‐payload methods to async in StorageInterface
  • Replaced separate persist/get/delete methods for ModelMesh with a single persist_partial_payload/get_partial_payload/delete_partial_payload
  • Standardized byte/void dtype handling in PVCStorage and scheduled async migration in MariaDBStorage
  • Added get_global_storage_interface singleton and updated __init__.py to handle force_reload
Files:
  • src/service/data/storage/storage_interface.py
  • src/service/data/storage/pvc.py
  • src/service/data/storage/maria/maria.py
  • src/service/data/storage/__init__.py

Change: Refactored consumer endpoint reconciliation logic
Details:
  • Extracted write_reconciled_data helper to centralize writing input/output/metadata
  • Unified reconcile_kserve and reconcile_modelmesh with consistent storage calls
  • Replaced get_modelmesh_payload/persist_modelmesh_payload with get_partial_payload/persist_partial_payload
  • Switched to get_global_storage_interface in consume_inference_payload
Files:
  • src/endpoints/consumer/consumer_endpoint.py

Change: Implemented new /data/upload endpoint with tag validation
Details:
  • Added UploadPayload with optional data_tag and ground-truth flag
  • Validated tag prefix against TRUSTYAI_TAG_PREFIX and raised HTTP 400 on violation
  • Generated unique request IDs, invoked consume_cloud_event for response then request with tag
  • Returned new datapoint count with success/failure HTTP semantics
Files:
  • src/endpoints/data/data_upload.py

Change: Expanded and updated test suite for async contracts and upload flow
Details:
  • Added end-to-end upload endpoint tests for PVC and Maria, covering dtypes, multi-tensor shapes, tagging edge cases
  • Introduced pytest async‐contract test to enforce all storage methods are async
  • Updated existing reconciliation and storage tests to match new persist/get signatures
  • Included Bash harness script scripts/test_upload_endpoint.sh for manual endpoint validation
Files:
  • tests/endpoints/test_upload_endpoint_pvc.py
  • tests/endpoints/test_upload_endpoint_maria.py
  • tests/service/data/test_async_contract.py
  • tests/service/data/test_payload_reconciliation_*.py
  • tests/service/data/test_mariadb_storage.py
  • tests/service/data/test_mariadb_migration.py
  • scripts/test_upload_endpoint.sh

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

Blocking issues:

  • Detected possible formatted SQL query. Use parameterized queries instead. (6 occurrences, all in src/service/data/storage/maria/maria.py)
  • Avoiding SQL string concatenation: untrusted input concatenated with a raw SQL query can result in SQL injection. To execute a raw query safely, use a prepared statement. SQLAlchemy provides TextualSQL for using prepared statements with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option. (11 occurrences, all in src/service/data/storage/maria/maria.py)

General comments:

  • Avoid brittle error handling in the upload endpoint: rather than checking for “Could not reconcile_kserve” in the exception message, raise and catch a dedicated ReconciliationError (or similar) to cleanly separate reconciliation failures from other HTTPExceptions.
  • The module-level global storage interface (get_global_storage_interface) can leak state between tests and requests; consider using FastAPI’s dependency injection (e.g. Depends) to provide a fresh storage instance per request instead of a mutable global.
  • In pvc.py’s _write_raw_data, casting all void arrays to a fixed V1024 dtype may silently truncate larger bytes payloads; consider computing the necessary void type size per payload or failing fast with a clear error when the data exceeds the max length.
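
The first comment above can be sketched as follows. This is only an illustration of the pattern: `ReconciliationError` is the name the comment proposes, while `reconcile`, `upload`, and the status/message tuples are hypothetical stand-ins rather than the endpoint's real code.

```python
from typing import List, Tuple


class ReconciliationError(Exception):
    """Raised when request and response payloads cannot be reconciled."""


def reconcile(inputs: List[list], outputs: List[list]) -> int:
    """Toy reconciliation: row counts must match. Returns the row count."""
    if len(inputs) != len(outputs):
        raise ReconciliationError(
            f"{len(inputs)} input rows vs {len(outputs)} output rows"
        )
    return len(inputs)


def upload(inputs: List[list], outputs: List[list]) -> Tuple[int, str]:
    """Map failures to (status_code, message) by exception type, not message text."""
    try:
        n_rows = reconcile(inputs, outputs)
    except ReconciliationError as exc:
        return 400, f"Could not reconcile payloads: {exc}"
    return 200, f"{n_rows} datapoints added"
```

Because the handler matches on the exception class, rewording the error message later cannot change which branch runs.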
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Avoid brittle error handling in the upload endpoint: rather than checking for “Could not reconcile_kserve” in the exception message, raise and catch a dedicated ReconciliationError (or similar) to cleanly separate reconciliation failures from other HTTPExceptions.
- The module-level global storage interface (`get_global_storage_interface`) can leak state between tests and requests; consider using FastAPI’s dependency injection (e.g. `Depends`) to provide a fresh storage instance per request instead of a mutable global.
- In pvc.py’s `_write_raw_data`, casting all void arrays to a fixed V1024 dtype may silently truncate larger bytes payloads; consider computing the necessary void type size per payload or failing fast with a clear error when the data exceeds the max length.

## Individual Comments

### Comment 1
<location> `src/service/data/model_data.py:121` </location>
<code_context>

         return input_data, output_data, metadata

+    async def get_metadata_as_df(self):
+        _, _, metadata = await self.data(get_input=False, get_output=False)
+        metadata_cols = (await self.column_names())[2]
+        return pd.DataFrame(metadata, columns=metadata_cols)
+
+
</code_context>

<issue_to_address>
get_metadata_as_df assumes metadata and columns are always present and aligned.

Add checks to handle cases where metadata or columns are missing to prevent runtime errors.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    async def get_metadata_as_df(self):
        _, _, metadata = await self.data(get_input=False, get_output=False)
        metadata_cols = (await self.column_names())[2]
        return pd.DataFrame(metadata, columns=metadata_cols)
=======
    async def get_metadata_as_df(self):
        _, _, metadata = await self.data(get_input=False, get_output=False)
        metadata_cols = (await self.column_names())[2]
        if not metadata or not metadata_cols:
            logger.warning("Metadata or metadata columns missing; returning empty DataFrame.")
            return pd.DataFrame()
        if len(metadata) > 0 and len(metadata_cols) > 0 and all(isinstance(row, (list, tuple, np.ndarray)) for row in metadata):
            # Check if columns and data are aligned
            if all(len(row) == len(metadata_cols) for row in metadata):
                return pd.DataFrame(metadata, columns=metadata_cols)
            else:
                logger.warning("Metadata rows and columns are not aligned; returning empty DataFrame.")
                return pd.DataFrame()
        else:
            logger.warning("Metadata format is invalid; returning empty DataFrame.")
            return pd.DataFrame()
>>>>>>> REPLACE

</suggested_fix>

### Comment 2
<location> `tests/service/data/test_payload_reconciliation_pvc.py:86` </location>
<code_context>
+    async def _test_full_reconciliation(self):
</code_context>

<issue_to_address>
Full reconciliation test covers input/output persistence and cleanup.

Consider adding a test case for corrupted or invalid payloads to verify error handling.
</issue_to_address>

## Security Issues

### Issue 1
<location> `src/service/data/storage/maria/maria.py:67` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 2
<location> `src/service/data/storage/maria/maria.py:67` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 3
<location> `src/service/data/storage/maria/maria.py:123` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 4
<location> `src/service/data/storage/maria/maria.py:123` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 5
<location> `src/service/data/storage/maria/maria.py:151` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 6
<location> `src/service/data/storage/maria/maria.py:151` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 7
<location> `src/service/data/storage/maria/maria.py:282` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 8
<location> `src/service/data/storage/maria/maria.py:349` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 9
<location> `src/service/data/storage/maria/maria.py:357` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 10
<location> `src/service/data/storage/maria/maria.py:372` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 11
<location> `src/service/data/storage/maria/maria.py:382` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 12
<location> `src/service/data/storage/maria/maria.py:383` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 13
<location> `src/service/data/storage/maria/maria.py:383` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 14
<location> `src/service/data/storage/maria/maria.py:395` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 15
<location> `src/service/data/storage/maria/maria.py:395` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Issue 16
<location> `src/service/data/storage/maria/maria.py:396` </location>

<issue_to_address>
**security (python.lang.security.audit.formatted-sql-query):** Detected possible formatted SQL query. Use parameterized queries instead.

*Source: opengrep*
</issue_to_address>

### Issue 17
<location> `src/service/data/storage/maria/maria.py:396` </location>

<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

Comment on lines +121 to +124
    async def get_metadata_as_df(self):
        _, _, metadata = await self.data(get_input=False, get_output=False)
        metadata_cols = (await self.column_names())[2]
        return pd.DataFrame(metadata, columns=metadata_cols)

suggestion: get_metadata_as_df assumes metadata and columns are always present and aligned.

Add checks to handle cases where metadata or columns are missing to prevent runtime errors.

Suggested change
-    async def get_metadata_as_df(self):
-        _, _, metadata = await self.data(get_input=False, get_output=False)
-        metadata_cols = (await self.column_names())[2]
-        return pd.DataFrame(metadata, columns=metadata_cols)
+    async def get_metadata_as_df(self):
+        _, _, metadata = await self.data(get_input=False, get_output=False)
+        metadata_cols = (await self.column_names())[2]
+        if not metadata or not metadata_cols:
+            logger.warning("Metadata or metadata columns missing; returning empty DataFrame.")
+            return pd.DataFrame()
+        if len(metadata) > 0 and len(metadata_cols) > 0 and all(isinstance(row, (list, tuple, np.ndarray)) for row in metadata):
+            # Check if columns and data are aligned
+            if all(len(row) == len(metadata_cols) for row in metadata):
+                return pd.DataFrame(metadata, columns=metadata_cols)
+            else:
+                logger.warning("Metadata rows and columns are not aligned; returning empty DataFrame.")
+                return pd.DataFrame()
+        else:
+            logger.warning("Metadata format is invalid; returning empty DataFrame.")
+            return pd.DataFrame()
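
One caveat with the suggestion above: if metadata is a NumPy array with more than one element, `not metadata` raises ValueError because the truth value of such an array is ambiguous. A possible alternative is to rely on `len()` only. The helper below is a dependency-free sketch; `metadata_is_aligned` is a hypothetical name, and whether metadata can actually be an array here is an assumption.

```python
from typing import Optional, Sequence


def metadata_is_aligned(rows: Optional[Sequence], columns: Optional[Sequence]) -> bool:
    """True when rows and columns are non-empty and every row has len(columns) entries.

    Uses len() rather than truthiness, so it also works for array-like inputs.
    """
    if rows is None or columns is None:
        return False
    if len(rows) == 0 or len(columns) == 0:
        return False
    return all(
        hasattr(row, "__len__")
        and not isinstance(row, (str, bytes))  # a string is not a row of cells
        and len(row) == len(columns)
        for row in rows
    )
```

The method could then build the DataFrame only when this check passes and log a warning otherwise.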

output_payload = await self.storage.get_partial_payload(
    self.request_id, is_input=False, is_modelmesh=True
)
self.assertIsNone(output_payload)

suggestion (testing): Full reconciliation test covers input/output persistence and cleanup.

Consider adding a test case for corrupted or invalid payloads to verify error handling.


with self.connection_manager as (conn, cursor):
    cursor.execute(f"CREATE TABLE IF NOT EXISTS `{self.dataset_reference_table}` (table_idx BIGINT AUTO_INCREMENT, dataset_name varchar(255), metadata JSON, n_rows BIGINT, PRIMARY KEY (table_idx))")
    cursor.execute(f"CREATE TABLE IF NOT EXISTS `{self.partial_payload_table}` (payload_id varchar(255), is_input BOOLEAN, payload_data LONGBLOB)")

security (python.lang.security.audit.formatted-sql-query): Detected possible formatted SQL query. Use parameterized queries instead.

Source: opengrep


with self.connection_manager as (conn, cursor):
    cursor.execute(f"CREATE TABLE IF NOT EXISTS `{self.dataset_reference_table}` (table_idx BIGINT AUTO_INCREMENT, dataset_name varchar(255), metadata JSON, n_rows BIGINT, PRIMARY KEY (table_idx))")
    cursor.execute(f"CREATE TABLE IF NOT EXISTS `{self.partial_payload_table}` (payload_id varchar(255), is_input BOOLEAN, payload_data LONGBLOB)")

security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

"""
def _list_all_datasets_sync(self):
    with self.connection_manager as (conn, cursor):
        cursor.execute(f"SELECT dataset_name FROM `{self.dataset_reference_table}`")

security (python.lang.security.audit.formatted-sql-query): Detected possible formatted SQL query. Use parameterized queries instead.

Source: opengrep
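
A note on the findings above: SQL placeholders bind values, not identifiers, so a table name cannot be passed as a query parameter. One common approach is to validate identifiers against a strict pattern and bind everything else. The sketch below uses the standard-library sqlite3 module as a stand-in for MariaDB; `safe_identifier` and `dataset_exists` are hypothetical helpers, and the placeholder style (`?` here) depends on the driver in use.

```python
import re
import sqlite3

# Conservative allow-list: letters, digits, underscore; must not start with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,63}")


def safe_identifier(name: str) -> str:
    """Return name unchanged if it is a plain identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def dataset_exists(conn: sqlite3.Connection, table: str, dataset_name: str) -> bool:
    """Check for a dataset row; the table name is validated, the value is bound."""
    table = safe_identifier(table)
    cursor = conn.execute(
        f'SELECT 1 FROM "{table}" WHERE dataset_name = ?',
        (dataset_name,),
    )
    return cursor.fetchone() is not None


def demo_connection() -> sqlite3.Connection:
    """In-memory database with one dataset row, for trying the helper out."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dataset_reference (dataset_name TEXT)")
    conn.execute("INSERT INTO dataset_reference VALUES (?)", ("model_a",))
    return conn
```

When table names come only from trusted configuration, scanner findings on such statements may be false positives, but validating the identifier once makes that assumption explicit.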

from src.service.data.modelmesh_parser import PartialPayload
from src.service.data.storage.maria.maria import MariaDBStorage
from src.service.data.storage.pvc import PVCStorage
from tests.service.data.test_payload_reconciliation_pvc import TestPayloadReconciliation

issue (code-quality): Don't import test modules. (dont-import-test-modules)

Explanation: Don't import test modules.

Tests should be self-contained and should not depend on each other.

If a helper function is used by multiple tests,
define it in a helper module
instead of importing one test from another.

try:
    np.array(flat, dtype=np_dtype)
except (ValueError, TypeError) as e:
    raise ValueError(f"Data cannot be cast to {self.datatype}: {e}")

suggestion (code-quality): Explicitly raise from a previous error (raise-from-previous-error)

Suggested change
-    raise ValueError(f"Data cannot be cast to {self.datatype}: {e}")
+    raise ValueError(f"Data cannot be cast to {self.datatype}: {e}") from e
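
For context on why `from e` matters: explicit chaining stores the original exception in `__cause__`, so the traceback shows both errors and marks the first as the direct cause. A small standalone sketch, where `to_int` is a hypothetical function and not code from this PR:

```python
def to_int(value: str) -> int:
    """Convert value to int, re-raising with context while keeping the original cause."""
    try:
        return int(value)
    except (ValueError, TypeError) as e:
        # "from e" records e as the direct cause of the new exception.
        raise ValueError(f"Data cannot be cast to INT64: {e}") from e


def cause_of_failure(value: str):
    """Return the chained original exception for a bad value, or None on success."""
    try:
        to_int(value)
    except ValueError as err:
        return err.__cause__
    return None
```

Here `cause_of_failure("abc")` returns the original `ValueError` raised by `int`, which would otherwise be reachable only through the implicit `__context__`.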

async def upload_data(payload: ModelInferJointPayload):
    """Upload a batch of model data to TrustyAI."""
async def upload(payload: UploadPayload) -> Dict[str, str]:
    """Upload model data"""

issue (code-quality): We've found these issues:

    else:
        row = [i * 2 + j + output_offset for j in range(n_output_cols)]
        output_data.append(row)
payload = {

issue (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

for row_idx in range(n_rows):
    row = [row_idx * 2 + col_idx for col_idx in range(n_output_cols)]
    output_data.append(row)
payload = {

issue (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)


3 participants