
Error while ingesting sample data for tables with JSON columns in Apache Pinot #25721

@piby180

Description


Affected module
Does it impact the UI, backend or Ingestion Framework?
Ingestion Framework: no sample data is pushed for tables with JSON columns.

Describe the bug
We have a few tables in Apache Pinot with JSON columns. These columns store lists of dicts. We run the Auto Classification job externally on Argo; it pulls sample data for every Pinot table and ingests it into OpenMetadata. However, the job throws an exception for tables with JSON columns:

Sampler | Pinot.default.my_table | Unexpected exception processing entity Pinot.default.default.my_table: expected bytes, str found

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/sampler/processor.py", line 115, in _run
    data=sampler_interface.generate_sample_data(),
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/utils/execution_time_tracker.py", line 266, in inner
    result = func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/sampler/sampler_interface.py", line 259, in generate_sample_data
    raise err
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/sampler/sampler_interface.py", line 243, in generate_sample_data
    table_data = self.fetch_sample_data(self.columns)
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/sampler/sqlalchemy/sampler.py", line 258, in fetch_sample_data
    .all()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 2773, in all
    return self._iter().all()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1129, in all
    return self._allrows()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 401, in _allrows
    rows = self._fetchall_impl()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 1813, in _fetchall_impl
    return list(self.iterator)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/sqltypes.py", line 2684, in process
    value = string_process(value)
TypeError: expected bytes, str found
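The failing frame in `sqlalchemy/sql/sqltypes.py` runs a string-processing step (`string_process`) on each fetched value. Our working assumption, not a confirmed diagnosis, is that this step expects the driver to return raw bytes and tries to decode them, while pinotdb already returns a decoded `str`. The pure-Python sketch below only mimics that two-stage "decode bytes, then parse JSON" pattern to show why a `str` would trip it; `bytes_then_json` is hypothetical and is not SQLAlchemy's actual code.

```python
import json


def bytes_then_json(value):
    """Hypothetical stand-in for a result processor that decodes bytes, then parses JSON."""
    if value is None:
        return None
    if not isinstance(value, (bytes, bytearray)):
        # Mirrors the failure in the traceback: the decode step refuses str input.
        raise TypeError(f"expected bytes, {type(value).__name__} found")
    return json.loads(bytes(value).decode("utf-8"))


# Bytes from a driver parse fine.
print(bytes_then_json(b'[{"a": 1, "b": 2}]'))  # [{'a': 1, 'b': 2}]

# A str (what pinotdb returns for JSON columns) raises the same TypeError text.
try:
    bytes_then_json('[{"a": 1, "b": 2}]')
except TypeError as exc:
    print(exc)  # expected bytes, str found
```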

This is our Auto Classification workflow config:

source:
  type: pinotdb
  serviceName: Pinot
  serviceConnection:
    config:
      type: PinotDB
      username: ${OPENMETADATA_PIPELINES_PINOT_USERNAME}
      password: ${OPENMETADATA_PIPELINES_PINOT_PASSWORD}
      hostPort: ${OPENMETADATA_PIPELINES_PINOT_BROKER_URL}
      pinotControllerHost: ${OPENMETADATA_PIPELINES_PINOT_CONTROLLER_URL}
      connectionArguments:
        use_multistage_engine: true
  sourceConfig:
    config:
      type: AutoClassification
      storeSampleData: true
      enableAutoClassification: false
      sampleDataCount: 100
      tableFilterPattern:
        excludes: # currently all tables with json columns
          - table_1
          - table_2
          - table_3
processor:
  type: orm-profiler
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: DEBUG  # DEBUG, INFO, WARNING or ERROR
  openMetadataServerConfig:
    hostPort: ${OPENMETADATA_PIPELINES_OPENMETADATA_API_URL}
    authProvider: openmetadata
    securityConfig:
      jwtToken: ${OPENMETADATA_PIPELINES_AUTO_CLASSIFICATION_BOT_JWT_TOKEN}
    storeServiceConnection: false
ingestionPipelineFQN: argo.pinot-auto-classification
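Since the `excludes` list is our current workaround, one detail is worth checking: OpenMetadata filter patterns are regular expressions. Assuming each entry is applied with Python's `re.match` (anchored at the start of the name only; we have not verified this against the 1.11.8 source), an entry like `table_1` would also exclude a table named `table_10`. The hypothetical `is_excluded` helper below illustrates the difference an explicit `^...$` makes.

```python
import re
from typing import Iterable


def is_excluded(table_name: str, excludes: Iterable[str]) -> bool:
    """Hypothetical check: True if any exclude regex matches at the start of the name.

    Assumes re.match semantics, which anchor only the beginning of the string.
    """
    return any(re.match(pattern, table_name) for pattern in excludes)


excludes = ["table_1", "table_2", "table_3"]
print(is_excluded("table_1", excludes))         # True
print(is_excluded("table_10", excludes))        # True: only the start is anchored
print(is_excluded("table_10", [r"^table_1$"]))  # False: fully anchored pattern
```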

To Reproduce

You can quickly run Pinot locally with docker-compose:
https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker#create-docker-compose.yml-file

Ingest a table with a JSON column, fetch its data with pinotdb, and check the type returned for the JSON column.
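The type check in that last step can be scripted against any DB-API 2.0 cursor. The sketch below is hypothetical (`sample_json_column_types` is not part of pinotdb or OpenMetadata); it uses an in-memory SQLite table as a stand-in for Pinot so it runs without a cluster, and the commented lines show how we assume it would be pointed at a local Pinot broker using the `pinotdb.connect` arguments from the pinotdb README.

```python
import sqlite3
from typing import Any, List


def sample_json_column_types(cursor: Any, table: str, column: str, limit: int = 5) -> List[type]:
    """Return the Python type of each value fetched from `column` (hypothetical helper).

    Works with any DB-API 2.0 cursor. Table and column names are interpolated
    directly into the SQL, so only pass trusted identifiers.
    """
    cursor.execute(f"SELECT {column} FROM {table} LIMIT {int(limit)}")
    return [type(row[0]) for row in cursor.fetchall()]


# Against a local Pinot broker (assumed docker-compose defaults):
#   from pinotdb import connect
#   conn = connect(host="localhost", port=8099, path="/query/sql", scheme="http")
#   print(sample_json_column_types(conn.cursor(), "my_table", "json_column"))

if __name__ == "__main__":
    # SQLite stand-in: JSON stored as TEXT comes back as str, like pinotdb's JSON columns.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE my_table (json_column TEXT)")
    conn.execute("INSERT INTO my_table VALUES (?)", ('[{"a": 1, "b": 2}]',))
    print(sample_json_column_types(conn.cursor(), "my_table", "json_column"))  # [<class 'str'>]
```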

Expected behavior
Sample data for tables with JSON columns should be ingested into OpenMetadata.

Version:

  • OS: openmetadata/ingestion:1.11.8 docker image
  • Python version: 3.10
  • OpenMetadata version: 1.11.8
  • OpenMetadata Ingestion package version: openmetadata-ingestion[docker]==1.11.8

Additional context
The pinotdb client returns JSON columns as strings.

Example code:

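import json

# `data` is a pandas DataFrame built from rows fetched through the pinotdb client
cell_value = data.json_column.iloc[0]
# '[{"a": 1, "b": 2}]'

isinstance(cell_value, str)
# True

isinstance(json.loads(cell_value), list)  # the column stores a list of dicts
# True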
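Because the driver hands back JSON as text, one workaround we can imagine on the sampling side is to deserialize JSON cells in Python rather than relying on the SQLAlchemy JSON type's result processing. This is a minimal sketch under that assumption: `normalize_json_cell` is hypothetical (not an OpenMetadata or pinotdb function), and where it would hook into the sampler is left open.

```python
import json
from typing import Any


def normalize_json_cell(value: Any) -> Any:
    """Hypothetical helper: turn a JSON cell into Python objects, whatever form the driver returns."""
    if value is None:
        return None
    if isinstance(value, (bytes, bytearray)):
        # Some drivers return encoded bytes; decode before parsing.
        value = bytes(value).decode("utf-8")
    if isinstance(value, str):
        # pinotdb returns JSON columns as str, e.g. '[{"a": 1, "b": 2}]'.
        return json.loads(value)
    # Already deserialized (list, dict, ...): pass through unchanged.
    return value


print(normalize_json_cell('[{"a": 1, "b": 2}]'))  # [{'a': 1, 'b': 2}]
```

Accepting bytes, str and already-parsed values keeps the helper tolerant of drivers that behave differently from pinotdb.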

