Skip to content

Commit 3ca2f4b

Browse files
authored
Fixed bug where nested json inside pandas wouldn't be ingested correctly (#568)
* Fixed bug where nested json inside pandas wouldn't be ingested correctly * Update changelog * Format * Fixed test * Fixed test * New logic * New logic * New logic * New logic * New logic * Update
1 parent c2e31d3 commit 3ca2f4b

File tree

4 files changed

+43
-15
lines changed

4 files changed

+43
-15
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Changed
1011
- [BREAKING] Dropped support for Python 3.7, as it has been EOL for over a year.
1112
- [BREAKING] Aligned the Connection String Builder keywords with the rest of the SDKs.
1213
This means that some keywords were removed, and they will no longer be parsed as part of the Connection String.
@@ -17,7 +18,12 @@ The following keywords have been removed:
1718
- `interactive_login`
1819
- `az_cli`
1920

21+
### Fixed
22+
- Fixed a bug where nested JSON values inside pandas DataFrames were not ingested correctly.
23+
2024
## [4.6.3] - 2025-01-08
25+
26+
### Fixed
2127
- Explicitly export members in `__init__.py` via `__all__`
2228

2329
## [4.6.2] - 2025-01-07

azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
1+
import gzip
12
import ipaddress
23
import os
34
import tempfile
45
import time
56
import uuid
67
from abc import ABCMeta, abstractmethod
7-
from copy import copy
88
from enum import Enum
99
from io import TextIOWrapper
1010
from typing import TYPE_CHECKING, Union, IO, AnyStr, Optional, Tuple
1111
from urllib.parse import urlparse
1212

1313
from azure.kusto.data.data_format import DataFormat
1414
from azure.kusto.data.exceptions import KustoClosedError
15-
1615
from .descriptors import FileDescriptor, StreamDescriptor
1716
from .ingestion_properties import IngestionProperties
1817

19-
2018
if TYPE_CHECKING:
2119
import pandas
2220

@@ -101,12 +99,15 @@ def set_proxy(self, proxy_url: str):
10199
if self._is_closed:
102100
raise KustoClosedError()
103101

104-
def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties) -> IngestionResult:
102+
def ingest_from_dataframe(
103+
self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None
104+
) -> IngestionResult:
105105
"""Enqueue an ingest command from a pandas DataFrame.
106106
To learn more about ingestion methods go to:
107107
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
108108
:param pandas.DataFrame df: input dataframe to ingest.
109109
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
110+
:param DataFormat data_format: Format to convert the dataframe to. Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, the format is inferred from the ingestion mapping; if it cannot be inferred, it defaults to JSON.
110111
"""
111112

112113
if self._is_closed:
@@ -117,12 +118,28 @@ def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: In
117118
if not isinstance(df, DataFrame):
118119
raise ValueError("Expected DataFrame instance, found {}".format(type(df)))
119120

120-
file_name = "df_{id}_{timestamp}_{uid}.csv.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4())
121-
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
121+
is_json = True
122122

123-
df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False, compression="gzip")
123+
# Use CSV when the data format is explicitly set to CSV, or when no format is given and the ingestion mapping type is CSV; otherwise default to JSON
124+
if not data_format:
125+
if ingestion_properties is not None and (ingestion_properties.ingestion_mapping_type == DataFormat.CSV):
126+
is_json = False
127+
elif data_format == DataFormat.CSV:
128+
is_json = False
129+
elif data_format == DataFormat.JSON:
130+
is_json = True
131+
else:
132+
raise ValueError("Unsupported format: {}. Supported formats are: CSV, JSON, None".format(data_format))
124133

125-
ingestion_properties.format = DataFormat.CSV
134+
file_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext="json" if is_json else "csv")
135+
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
136+
with gzip.open(temp_file_path, "wb") as temp_file:
137+
if is_json:
138+
df.to_json(temp_file, orient="records", date_format="iso", lines=True)
139+
ingestion_properties.format = DataFormat.JSON
140+
else:
141+
df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False)
142+
ingestion_properties.format = DataFormat.CSV
126143

127144
try:
128145
return self.ingest_from_file(temp_file_path, ingestion_properties)

azure-kusto-ingest/tests/test_e2e_ingest.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,15 +529,20 @@ async def test_streaming_ingest_from_dataframe(self):
529529
"xtextWithNulls",
530530
"xdynamicWithNulls",
531531
]
532-
rows = [
533-
[0, "00000000-0000-0000-0001-020304050607", 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, ""]
534-
]
532+
533+
guid = uuid.uuid4()
534+
535+
536+
rows = [[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]]
535537
df = DataFrame(data=rows, columns=fields)
536-
ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True, data_format=DataFormat.CSV)
538+
ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True)
537539
self.ingest_client.ingest_from_dataframe(df, ingestion_properties)
538540

539541
await self.assert_rows_added(1, timeout=120)
540542

543+
a = self.client.execute(self.test_db, f"{self.test_table} | where rowguid == '{guid}'")
544+
assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value
545+
541546
@pytest.mark.asyncio
542547
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
543548
ingestion_properties = IngestionProperties(

azure-kusto-ingest/tests/test_kusto_ingest_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ def test_simple_ingest_from_dataframe(self, mock_pid, mock_time, mock_uuid, mock
507507
)
508508

509509
ingest_client = ingest_client_class("https://ingest-somecluster.kusto.windows.net")
510-
ingestion_properties = IngestionProperties(database="database", table="table", data_format=DataFormat.CSV)
510+
ingestion_properties = IngestionProperties(database="database", table="table")
511511

512512
from pandas import DataFrame
513513

@@ -518,11 +518,11 @@ def test_simple_ingest_from_dataframe(self, mock_pid, mock_time, mock_uuid, mock
518518
result = ingest_client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties)
519519
assert result.status == IngestionStatus.QUEUED
520520

521-
expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.csv.gz?".format(
521+
expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.json.gz?".format(
522522
id(df)
523523
)
524524

525-
assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url)
525+
assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url, format="json")
526526

527527
@responses.activate
528528
@patch("azure.kusto.ingest.managed_streaming_ingest_client.ManagedStreamingIngestClient.MAX_STREAMING_SIZE_IN_BYTES", new=0)

0 commit comments

Comments
 (0)