Skip to content

Commit 1ef6c6d

Browse files
committed
Fixed bug where nested json inside pandas wouldn't be ingested correctly
1 parent c2e31d3 commit 1ef6c6d

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
1+
import gzip
12
import ipaddress
23
import os
34
import tempfile
45
import time
56
import uuid
67
from abc import ABCMeta, abstractmethod
7-
from copy import copy
88
from enum import Enum
99
from io import TextIOWrapper
1010
from typing import TYPE_CHECKING, Union, IO, AnyStr, Optional, Tuple
1111
from urllib.parse import urlparse
1212

1313
from azure.kusto.data.data_format import DataFormat
1414
from azure.kusto.data.exceptions import KustoClosedError
15-
1615
from .descriptors import FileDescriptor, StreamDescriptor
1716
from .ingestion_properties import IngestionProperties
1817

19-
2018
if TYPE_CHECKING:
2119
import pandas
2220

@@ -117,12 +115,11 @@ def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: In
117115
if not isinstance(df, DataFrame):
118116
raise ValueError("Expected DataFrame instance, found {}".format(type(df)))
119117

120-
file_name = "df_{id}_{timestamp}_{uid}.csv.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4())
118+
file_name = "df_{id}_{timestamp}_{uid}.json.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4())
121119
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
122-
123-
df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False, compression="gzip")
124-
125-
ingestion_properties.format = DataFormat.CSV
120+
with gzip.open(temp_file_path, "wb") as temp_file:
121+
df.to_json(temp_file, orient="records", date_format="iso", lines=True)
122+
ingestion_properties.format = DataFormat.JSON
126123

127124
try:
128125
return self.ingest_from_file(temp_file_path, ingestion_properties)

azure-kusto-ingest/tests/test_e2e_ingest.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,15 +529,22 @@ async def test_streaming_ingest_from_dataframe(self):
529529
"xtextWithNulls",
530530
"xdynamicWithNulls",
531531
]
532+
533+
guid = uuid.uuid4()
534+
535+
dynamic_value = ["me@dummy.com", "you@dummy.com", "them@dummy.com"]
532536
rows = [
533-
[0, "00000000-0000-0000-0001-020304050607", 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, ""]
537+
[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]
534538
]
535539
df = DataFrame(data=rows, columns=fields)
536540
ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True, data_format=DataFormat.CSV)
537541
self.ingest_client.ingest_from_dataframe(df, ingestion_properties)
538542

539543
await self.assert_rows_added(1, timeout=120)
540544

545+
a = self.client.execute(self.test_db, f"{self.test_table} | where rowguid == '{guid}'")
546+
assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value
547+
541548
@pytest.mark.asyncio
542549
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
543550
ingestion_properties = IngestionProperties(

0 commit comments

Comments
 (0)