6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Changed
- [BREAKING] Dropped support for Python 3.7, as it has been EOL for over a year.
- [BREAKING] Aligned the Connection String Builder keywords with the rest of the SDKs.
This means that some keywords were removed, and they will no longer be parsed as part of the Connection String.
@@ -17,7 +18,12 @@ The following keywords have been removed:
- `interactive_login`
- `az_cli`

### Fixed
- Fixed issues with ingestion of nested pandas dataframes.

## [4.6.3] - 2025-01-08

### Fixed
- Explicitly export members in `__init__.py` via `__all__`

## [4.6.2] - 2025-01-07
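To illustrate the [BREAKING] Connection String Builder change above, here is a hedged sketch of the migration it implies. The removed keyword names `az_cli` and `interactive_login` come from the changelog; the exact connection-string spelling shown in the comment and the choice of builder factory methods are assumptions, not text from this PR.

```python
from azure.kusto.data import KustoConnectionStringBuilder

cluster = "https://mycluster.kusto.windows.net"  # placeholder cluster URL

# Before: authentication toggles such as az_cli / interactive_login could be passed
# as connection-string keywords and were parsed by the builder (exact spelling assumed):
# kcsb = KustoConnectionStringBuilder("{};AZ CLI=True".format(cluster))

# After: use the dedicated factory methods instead of connection-string keywords.
kcsb_cli = KustoConnectionStringBuilder.with_az_cli_authentication(cluster)
kcsb_interactive = KustoConnectionStringBuilder.with_interactive_login(cluster)
```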
33 changes: 25 additions & 8 deletions azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py
@@ -1,22 +1,20 @@
import gzip
import ipaddress
import os
import tempfile
import time
import uuid
from abc import ABCMeta, abstractmethod
from copy import copy
from enum import Enum
from io import TextIOWrapper
from typing import TYPE_CHECKING, Union, IO, AnyStr, Optional, Tuple
from urllib.parse import urlparse

from azure.kusto.data.data_format import DataFormat
from azure.kusto.data.exceptions import KustoClosedError

from .descriptors import FileDescriptor, StreamDescriptor
from .ingestion_properties import IngestionProperties


if TYPE_CHECKING:
import pandas

@@ -101,12 +99,15 @@ def set_proxy(self, proxy_url: str):
if self._is_closed:
raise KustoClosedError()

def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties) -> IngestionResult:
def ingest_from_dataframe(
self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None
) -> IngestionResult:
"""Enqueue an ingest command from local files.
To learn more about ingestion methods go to:
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
:param pandas.DataFrame df: input dataframe to ingest.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
:param DataFormat data_format: Format to convert the dataframe to. Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, the format is inferred from the ingestion mapping; if no mapping is set, it defaults to JSON.
"""

if self._is_closed:
@@ -117,12 +118,28 @@ def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: In
if not isinstance(df, DataFrame):
raise ValueError("Expected DataFrame instance, found {}".format(type(df)))

file_name = "df_{id}_{timestamp}_{uid}.csv.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4())
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
is_json = True

df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False, compression="gzip")
# If a CSV ingestion mapping is given, or the data format is explicitly set to CSV, ingest as CSV; otherwise default to JSON
if not data_format:
if ingestion_properties is not None and (ingestion_properties.ingestion_mapping_type == DataFormat.CSV):
is_json = False
Contributor: You should check explicitly for JSON mapping and throw if it is not CSV, JSON, or None.
https://learn.microsoft.com/en-us/kusto/management/mappings?view=microsoft-fabric#supported-mapping-types
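A minimal sketch of the stricter check suggested here, assuming the mapping type is exposed as a `DataFormat` value the way this diff compares it (`ingestion_properties.ingestion_mapping_type == DataFormat.CSV`); the helper name is hypothetical and not part of the PR.

```python
from typing import Optional

from azure.kusto.data.data_format import DataFormat


def resolve_dataframe_format(ingestion_mapping_type: Optional[DataFormat]) -> DataFormat:
    # Resolve the serialization format from the ingestion mapping type and reject
    # mapping types that ingest_from_dataframe cannot produce.
    if ingestion_mapping_type in (None, DataFormat.JSON):
        return DataFormat.JSON  # default, matching the PR's behavior
    if ingestion_mapping_type == DataFormat.CSV:
        return DataFormat.CSV
    raise ValueError(
        "Unsupported ingestion mapping type for dataframe ingestion: {}. "
        "Supported types are CSV, JSON, or no mapping.".format(ingestion_mapping_type)
    )
```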

elif data_format == DataFormat.CSV:
is_json = False
elif data_format == DataFormat.JSON:
is_json = True
else:
raise ValueError("Unsupported format: {}. Supported formats are: CSV, JSON, None".format(data_format))

ingestion_properties.format = DataFormat.CSV
file_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext="json" if is_json else "csv")
temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
with gzip.open(temp_file_path, "wb") as temp_file:
if is_json:
df.to_json(temp_file, orient="records", date_format="iso", lines=True)
ingestion_properties.format = DataFormat.JSON
else:
df.to_csv(temp_file, index=False, encoding="utf-8", header=False)
ingestion_properties.format = DataFormat.CSV

try:
return self.ingest_from_file(temp_file_path, ingestion_properties)
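For reference, a minimal usage sketch of the new `data_format` parameter introduced above. The ingest URI, database, and table names are placeholders, and `QueuedIngestClient` is just one of the ingest clients that inherit this method.

```python
import pandas as pd

from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient

# Placeholder connection details -- replace with a real ingest URI, database, and table.
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication("https://ingest-mycluster.kusto.windows.net")
client = QueuedIngestClient(kcsb)

df = pd.DataFrame({"name": ["alice", "bob"], "count": [1, 2]})
props = IngestionProperties(database="mydb", table="mytable")

# Default: no data_format and no CSV mapping, so the dataframe is written as gzipped JSON lines.
client.ingest_from_dataframe(df, props)

# Explicit CSV keeps the previous behavior of serializing the dataframe to gzipped CSV.
client.ingest_from_dataframe(df, props, data_format=DataFormat.CSV)
```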
13 changes: 9 additions & 4 deletions azure-kusto-ingest/tests/test_e2e_ingest.py
@@ -529,15 +529,20 @@ async def test_streaming_ingest_from_dataframe(self):
"xtextWithNulls",
"xdynamicWithNulls",
Contributor: nit: the column name is not correct anymore (not sure if the table is created ad-hoc or not)

]
rows = [
[0, "00000000-0000-0000-0001-020304050607", 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, ""]
]

guid = uuid.uuid4()

dynamic_value = ["[email protected]", "[email protected]", "[email protected]"]
rows = [[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]]
df = DataFrame(data=rows, columns=fields)
ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True, data_format=DataFormat.CSV)
ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True)
self.ingest_client.ingest_from_dataframe(df, ingestion_properties)

await self.assert_rows_added(1, timeout=120)

a = self.client.execute(self.test_db, f"{self.test_table} | where rowguid == '{guid}'")
assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value

@pytest.mark.asyncio
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
ingestion_properties = IngestionProperties(
6 changes: 3 additions & 3 deletions azure-kusto-ingest/tests/test_kusto_ingest_client.py
@@ -507,7 +507,7 @@ def test_simple_ingest_from_dataframe(self, mock_pid, mock_time, mock_uuid, mock
)

ingest_client = ingest_client_class("https://ingest-somecluster.kusto.windows.net")
ingestion_properties = IngestionProperties(database="database", table="table", data_format=DataFormat.CSV)
ingestion_properties = IngestionProperties(database="database", table="table")

from pandas import DataFrame

@@ -518,11 +518,11 @@ def test_simple_ingest_from_dataframe(self, mock_pid, mock_time, mock_uuid, mock
result = ingest_client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties)
assert result.status == IngestionStatus.QUEUED

expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.csv.gz?".format(
expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.json.gz?".format(
id(df)
)

assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url)
assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url, format="json")

@responses.activate
@patch("azure.kusto.ingest.managed_streaming_ingest_client.ManagedStreamingIngestClient.MAX_STREAMING_SIZE_IN_BYTES", new=0)
Expand Down
Loading