diff --git a/data-explorer/ingest-data-streaming.md b/data-explorer/ingest-data-streaming.md index 5f46a8ce8e..90d74120e7 100644 --- a/data-explorer/ingest-data-streaming.md +++ b/data-explorer/ingest-data-streaming.md @@ -1,9 +1,9 @@ --- -title: Configure streaming ingestion on your Azure Data Explorer cluster +title: Configure Streaming Ingestion on Your Azure Data Explorer Cluster description: Learn how to configure your Azure Data Explorer cluster and start loading data with streaming ingestion. ms.reviewer: alexefro ms.topic: how-to -ms.date: 06/10/2025 +ms.date: 07/30/2025 --- # Configure streaming ingestion on your Azure Data Explorer cluster @@ -244,6 +244,9 @@ class Program Create your application for ingesting data to your cluster using your preferred language. +> [!NOTE] +> For queued ingestion, see the instructions to [Create an app to get data using queued ingestion](/kusto/api/get-started/app-queued-ingestion?view=azure-data-explorer&preserve-view=true) + ### [C#](#tab/csharp) ```csharp diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 000cc2a603..b00f929fa9 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -5,6 +5,8 @@ ms.reviewer: yogilad ms.topic: how-to ms.date: 02/03/2025 monikerRange: "azure-data-explorer" +zone_pivot_groups: ingest-api + # customer intent: To learn about creating an app to ingest data using Kusto’s managed streaming ingestion client. @@ -19,30 +21,48 @@ Streaming Ingestion allows writing data to Kusto with near-real-time latencies. In this article, you’ll learn how to ingest data to Kusto using the managed streaming ingestion client. You'll ingest a data stream in the form of a file or in-memory stream. > [!NOTE] -> Streaming ingestion is a high velocity ingestion protocol. Streaming Ingestion isn't the same as `IngestFromStream`. -> `IngestFromStream` is an API that takes in a memory stream and sends it for ingestion. `IngestFromStream` is available for all ingestion client implementations including queued and streaming ingestion. +> Streaming ingestion is a high-velocity ingestion protocol. Ingesting with a *Managed Streaming Ingestion* or *Streaming Ingestion* client isn't the same as ingesting with a *Stream Source*. +> +> The type of client refers to the _way_ data is ingested - When ingesting with a *Managed Streaming Ingestion* or *Streaming Ingestion* client, data is sent to Kusto using the streaming ingestion protocol - it uses a *Streaming Service* to allow for low latency ingestion. +> +> Ingesting from a *Stream Source* refers to how the data is stored. For example, in C# a *Stream Source* can be created from a `MemoryStream` object. This is as opposed to a *File Source* which is created from a file on disk. +> +> The ingestion method depends on the client used: with queued ingestion, the data from the source is first uploaded to blob storage and then queued for ingestion; with streaming ingestion, the data is sent directly to Kusto in the body of a streaming HTTP request. + +> [!IMPORTANT] +> +> The Ingest API now has two versions: V1 and V2. The V1 API is the original API, while the V2 API is a reimagined version that simplifies the ingest API while offering more customization. 
+>
+> Ingest Version 2 is in **preview** and is available in the following languages: C#
+>
+> Also note that the Query V2 API isn't related to the Ingest V2 API.
 
 ## Streaming and Managed Streaming
 
-Kusto SDKs provide two flavors of Streaming Ingestion Clients, `StreamingIngestionClient` and `ManagedStreamingIngestionClient` where Managed Streaming has built-in retry and failover logic.
+Kusto SDKs provide two flavors of streaming ingestion clients: a *Streaming Ingestion Client* and a *Managed Streaming Ingestion Client*, where Managed Streaming has built-in retry and failover logic.
 
-When ingesting with the `ManagedStreamingIngestionClient` API, failures and retries are handled automatically as follows:
+> [!NOTE]
+> This article shows how to use *Managed Streaming Ingestion*. To use plain *Streaming Ingestion* instead of *Managed Streaming*, change the instantiated client type to a *Streaming Ingestion Client*.
+
+When ingesting with a *Managed Streaming Ingestion* client, failures and retries are handled automatically as follows:
 
 + Streaming requests that fail due to server-side size limitations are moved to queued ingestion.
-+ Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression.
-+ Transient failure, for example throttling, are retried three times, then moved to queued ingestion.
++ Data that's estimated to be larger than the streaming limit is automatically sent to queued ingestion.
+  + The size of the streaming limit depends on the format and compression of the data.
+  + It's possible to change the limit by setting the *Size Factor* in the *Managed Streaming Ingest Policy*, passed during initialization.
++ Transient failures, for example throttling, are retried three times, then moved to queued ingestion.
 + Permanent failures aren't retried.
 
 > [!NOTE]
-> If the streaming ingestion fails and the data is moved to queued ingestion, some delay is expected before the data is visible in the table.
+> If the streaming ingestion fails and the data is moved to queued ingestion, the data takes longer to be ingested because it's batched and queued first. You can control this latency via the [batching policy](/kusto/management/batching-policy).
 
 ## Limitations
 
 Data Streaming has some limitations compared to queuing data for ingestion.
 
+
 + Tags can’t be set on data.
-+ Mapping can only be provided using [`ingestionMappingReference`](/kusto/management/mappings?view=azure-data-explorer#mapping-with-ingestionmappingreference). Inline mapping isn't supported.
++ Mapping can only be provided using [`ingestionMappingReference`](/kusto/management/mappings?view=azure-data-explorer#mapping-with-ingestionmappingreference&preserve-view=true). Inline mapping isn't supported.
 + The payload sent in the request can’t exceed 10 MB, regardless of format or compression.
-+ The `ignoreFirstRecord` property isn't supported for managed streaming ingestion, so ingested data must not contain a header row.
++ The `ignoreFirstRecord` property isn't supported for streaming ingestion, so ingested data must not contain a header row.
 
 For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations).
 
@@ -50,7 +70,7 @@ For more information, see [Streaming Limitations](/azure/data-explorer/ingest-da
 
 + Fabric or an Azure Data Explorer cluster where you have database User or higher rights. Provision a free cluster at .
-+ [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer) to use the Kusto client library. ++ [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer&preserve-view=true) to use the Kusto client library. ## Before you begin @@ -61,7 +81,7 @@ Before creating the app, the following steps are required. Each step is detailed 1. Enable the streaming ingestion policy on the table. 1. Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-managed-streaming-ingestion/stormevents.csv) sample data file containing 1,000 storm event records. -## Configure streaming ingestion +### Configure streaming ingestion To configure streaming ingestion, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). It can take several minutes for the configuration to take effect. If you're using Fabric or a free cluster, streaming ingestion is automatically enabled. @@ -102,6 +122,7 @@ Create a basic client application which connects to the Kusto Help cluster. Enter the cluster query and ingest URI and database name in the relevant variables. The app uses two clients: one for querying and one for ingestion. Each client brings up a browser window to authenticate the user. +:::zone pivot="latest" ### [C#](#tab/c-sharp) The code sample includes a service function `PrintResultAsValueList()` for printing query results. @@ -114,57 +135,53 @@ dotnet add package Microsoft.Azure.Kusto.ingest ``` ```C# -using System; +using System.Data; + using Kusto.Data; +using Kusto.Data.Common; using Kusto.Data.Net.Client; using Kusto.Ingest; -using Kusto.Data.Common; -using Microsoft.Identity.Client; -using System.Data; -using System.Text; +using Kusto.Ingest.Common; + + +using Azure.Identity; -class Program +namespace BatchIngest; + +class BatchIngest { - static void Main(string[] args) + static async Task Main() { - var tableName = "MyStormEvents"; - var clusterUrl = ""; - var ingestionUrl = ""; - var databaseName = ""; - - var clusterKcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication(); - var ingestionKcsb = new KustoConnectionStringBuilder(ingestionUrl).WithAadUserPromptAuthentication(); - - using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) - using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) - { - Console.WriteLine("Number of rows in " + tableName); - var queryProvider = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); - var result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties()); - - PrintResultAsValueList(result); + var tokenCredential = new InteractiveBrowserCredential(); + var clusterUri = ""; // e.g., "https://..kusto.windows.net" + var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential); + using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); + var database = ""; + var table = "MyStormEvents"; + var query = table + " | count"; + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) + { + Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); + PrintResultsAsValueList(response); } } - - static void PrintResultAsValueList(IDataReader result) + static void PrintResultsAsValueList(IDataReader 
response)
     {
-        var row=0;
-        while (result.Read())
-        {
-            row ++;
-            Console.WriteLine("row:" + row.ToString() + "\t");
-            for (int i = 0; i < result.FieldCount; i++)
+        while (response.Read())
+        {
+            for (var i = 0; i < response.FieldCount; i++)
             {
-                Console.WriteLine("\t" + result.GetName(i) + " - " + result.GetValue(i));
+                object val = response.GetValue(i);
+                string value = val.ToString() ?? "None";
+                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
             }
-            Console.WriteLine();
         }
     }
 }
 ```
 
-## Stream a file for ingestion
+### Stream a file for ingestion
 
 Use the `IngestFromStorageAsync` method to ingest the *stormevents.csv* file.
 
@@ -173,13 +190,14 @@ Copy *stormevents.csv* file to the same location as your script. Since our input
 Add an ingestion section with the following lines at the end of `Main()`.
 
 ```csharp
+using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
 var ingestProperties = new KustoIngestionProperties(database, table)
 {
   Format = DataSourceFormat.csv
 };
 
 //Ingestion section
 Console.WriteLine("Ingesting data from a file");
-ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties).Wait();
+await ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties);
 ```
 
 Let’s also query the new number of rows and the most recent row after the ingestion.
@@ -187,11 +205,11 @@ Add the following lines after the ingestion command:
 
 ```csharp
 Console.WriteLine("Number of rows in " + table);
-result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties());
+var result = await kustoClient.ExecuteQueryAsync(database, table + " | count", new ClientRequestProperties());
 PrintResultsAsValueList(result);
 
 Console.WriteLine("Example line from " + table);
-result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
+result = await kustoClient.ExecuteQueryAsync(database, table + " | top 1 by EndTime", new ClientRequestProperties());
 PrintResultsAsValueList(result);
 ```
 
@@ -243,9 +261,9 @@ def main():
 main()
 ```
 
-## Stream a file for ingestion
+### Stream a file for ingestion
 
-Use the `ingest_from_file()` API to ingest the *stormevents.csv* file.
+Ingest the *stormevents.csv* file.
 
 Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `DataFormat.CSV` in the ingestion properties.
 Add an ingestion section with the following lines at the end of `main()`.
 
@@ -326,7 +344,7 @@ main().catch((err) => {
 ```
 
-## Stream a file for ingestion
+### Stream a file for ingestion
 
 Use the `ingestFromFile()` API to ingest the *stormevents.csv* file.
 
@@ -416,7 +434,7 @@ public class BatchIngestion {
 }
 ```
 
-## Stream a file for ingestion
+### Stream a file for ingestion
 
 Use the `ingestFromFile()` method to ingest the *stormevents.csv* file. Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `ingestionProperties.setDataFormat(DataFormat.CSV)` in the ingestion properties.
 
@@ -459,6 +477,109 @@ printResultsAsValueList(primaryResults);
 ```
 
 ---
+:::zone-end
+:::zone pivot="preview"
+
+### [C#](#tab/c-sharp)
+
+The code sample includes a service function `PrintResultsAsValueList()` for printing query results.
+
+Add the Kusto libraries using the following commands:
+
+```powershell
+dotnet add package Microsoft.Azure.Kusto.Data
+dotnet add package Microsoft.Azure.Kusto.Ingest.V2
+```
+
+```C#
+using System.Data;
+
+using Kusto.Data;
+using Kusto.Data.Common;
+using Kusto.Data.Net.Client;
+using Kusto.Ingest.V2;
+using Kusto.Ingest.Common;
+
+using Azure.Identity;
+
+namespace BatchIngest;
+
+class BatchIngest
+{
+    static async Task Main()
+    {
+        var tokenCredential = new InteractiveBrowserCredential();
+        var clusterUri = ""; // e.g., "https://..kusto.windows.net"
+        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
+        var database = "";
+        var table = "MyStormEvents";
+        var query = table + " | count";
+        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
+        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
+        {
+            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
+            PrintResultsAsValueList(response);
+        }
+    }
+
+    static void PrintResultsAsValueList(IDataReader response)
+    {
+        while (response.Read())
+        {
+            for (var i = 0; i < response.FieldCount; i++)
+            {
+                object val = response.GetValue(i);
+                string value = val.ToString() ?? "None";
+                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
+            }
+        }
+    }
+}
+```
+
+### Stream a file for ingestion
+
+Use the `IngestAsync` method to ingest the *stormevents.csv* file.
+
+Copy the *stormevents.csv* file to the same location as your script. Since our input is a CSV file, use `DataSourceFormat.csv` as the format in the `FileSource`.
+
+Add an ingestion section with the following lines at the end of `Main()`.
+
+```csharp
+using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
+var fileSource = new FileSource(".\\stormevents.csv", DataSourceFormat.csv);
+await ingestClient.IngestAsync(fileSource, database, table);
+```
+
+Let’s also query the new number of rows and the most recent row after the ingestion.
+Add the following lines after the ingestion command:
+
+```csharp
+Console.WriteLine("Number of rows in " + table);
+var result = await kustoClient.ExecuteQueryAsync(database, table + " | count", null);
+PrintResultsAsValueList(result);
+
+Console.WriteLine("Example line from " + table);
+result = await kustoClient.ExecuteQueryAsync(database, table + " | top 1 by EndTime", null);
+PrintResultsAsValueList(result);
+```
+
+### [Python](#tab/python)
+
+Not applicable
+
+### [TypeScript](#tab/typescript)
+
+Not applicable
+
+### [Java](#tab/java)
+
+Not applicable
+
+
+---
+
+:::zone-end
 
 The first time you run the application, the results are as follows:
 
@@ -485,6 +606,7 @@ row 1 :
 
 To ingest data from memory, create a stream containing the data for ingestion.
 
+:::zone pivot="latest"
 ### [C#](#tab/c-sharp)
 
 To ingest the stream from memory, call the `IngestFromStreamAsync()` method.
Replace the ingestion section with the following code:
 
 ```csharp
-// Ingestion section
-Console.WriteLine("Ingesting data from memory");
-var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
-byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
-using (MemoryStream stream = new MemoryStream(byteArray))
-  {
+    // Ingestion section
+    Console.WriteLine("Ingesting data from memory");
+    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
+    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
+    using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
+    using var stream = new MemoryStream(byteArray);
+
     var streamSourceOptions = new StreamSourceOptions { LeaveOpen = false };
-  ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions).Wait();
-  }
+
+    await ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions);
 ```
 
 ### [Python](#tab/python)
@@ -559,8 +682,39 @@ try (
   System.out.println("Error: " + e);
 }
 ```
+---
+
+:::zone-end
+:::zone pivot="preview"
+
+### [C#](#tab/c-sharp)
+
+```csharp
+    // Ingestion section
+    Console.WriteLine("Ingesting data from memory");
+    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
+    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
+    using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
+    using var stream = new MemoryStream(byteArray);
+    var streamSource = new StreamSource(stream, DataSourceCompressionType.None, DataSourceFormat.csv);
+    await ingestClient.IngestAsync(streamSource, database, table);
+```
+
+### [Python](#tab/python)
+
+Not applicable
+
+### [TypeScript](#tab/typescript)
+
+Not applicable
+
+### [Java](#tab/java)
+
+Not applicable
 
 ---
+:::zone-end
 
 The results are as follows:
 
diff --git a/data-explorer/kusto/api/get-started/app-queued-ingestion.md b/data-explorer/kusto/api/get-started/app-queued-ingestion.md
index 501afae2e0..750e762726 100644
--- a/data-explorer/kusto/api/get-started/app-queued-ingestion.md
+++ b/data-explorer/kusto/api/get-started/app-queued-ingestion.md
@@ -3,14 +3,20 @@
 title: Create an app to get data using queued ingestion
 description: Learn how to create an app to get data using queued ingestion of the Kusto client libraries.
 ms.reviewer: yogilad
 ms.topic: how-to
-ms.date: 08/11/2024
+ms.date: 08/24/2025
 monikerRange: "azure-data-explorer"
+zone_pivot_groups: ingest-api
+
+
 #customer intent: To learn about creating an app to get data using queued ingestion.
 ---
+
 # Create an app to get data using queued ingestion
 
 > [!INCLUDE [applies](../../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../../includes/applies-to-version/azure-data-explorer.md)]
 
+
+
 Kusto is capable of handling mass data intake by optimizing and batching ingested data via its batching manager. The batching manager aggregates ingested data before it reaches its target table, allowing for more efficient processing and improved performance. Batching is typically done in bulks of 1 GB of raw data, 1,000 individual files, or by a default timeout of 5 minutes.
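 
 These defaults are expressed in a table's effective batching policy, which you can inspect with a management command. A minimal sketch, using the *MyStormEvents* table that this article creates in the prerequisites:
 
 ```kusto
 .show table MyStormEvents policy ingestionbatching
 ```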
Batching policies can be updated at the database and table levels, commonly to lower the batching time and reduce latency. For more information about ingestion batching, see [IngestionBatching policy](../../management/batching-policy.md) and [Change table level ingestion batching policy programmatically](app-management-commands.md#change-the-table-level-ingestion-batching-policy). > [!NOTE] @@ -24,6 +30,12 @@ In this article, you learn how to: > - [Queue in-memory data for ingestion and query the results](#queue-in-memory-data-for-ingestion-and-query-the-results) > - [Queue a blob for ingestion and query the results](#queue-a-blob-for-ingestion-and-query-the-results) +> [!IMPORTANT] +> +> The Ingest API now has two versions: V1 and V2. The V1 API is the original API, while the V2 API is a reimagined version that simplifies the ingest API while offering more customization. +> +> Ingest Version 2 is in **preview** and is available in the following languages: C# + ## Prerequisites - [Set up your development environment](app-set-up.md) to use the Kusto client library. @@ -32,12 +44,12 @@ In this article, you learn how to: - Use one of the following methods to create the *MyStormEvents* table and, as only a small amount of data is being ingested, set its ingestion batching policy timeout to 10 seconds: - ### [Run an app](#tab/app) + ### [Run an app](#tab/app) 1. Create a target table named *MyStormEvents* in your database by running the first app in [management commands](app-management-commands.md#run-a-management-command-and-process-the-results). - 1. Set the ingestion batching policy timeout to 10 seconds by running the second app in [management commands](app-management-commands.md#change-the-table-level-ingestion-batching-policy). Before running the app, change the timeout value to `00:00:10`. + 2. Set the ingestion batching policy timeout to 10 seconds by running the second app in [management commands](app-management-commands.md#change-the-table-level-ingestion-batching-policy). Before running the app, change the timeout value to `00:00:10`. - ### [Create with command](#tab/command) + ### [Create with command](#tab/command) 1. In your query environment, create a target table named *MyStormEvents* in your database by running the following query: @@ -52,23 +64,22 @@ In this article, you learn how to: StormSummary: dynamic) ``` - 1. Set the ingestion batching policy timeout to 10 seconds by running the following query: + 2. Set the ingestion batching policy timeout to 10 seconds by running the following query: ```kusto .alter-merge table MyStormEvents policy ingestionbatching '{ "MaximumBatchingTimeSpan":"00:00:10" }' ``` - --- - - > [!NOTE] - > It may take a few minutes for the new batching policy settings to propagate to the batching manager. + > [!NOTE] + > It may take a few minutes for the new batching policy settings to propagate to the batching manager. - Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-basic-ingestion/stormevents.csv) sample data file. The file contains 1,000 storm event records. -> [!NOTE] -> -> The following examples assume a trivial match between the columns of the ingested data and the schema of the target table. -> If the ingested data doesn't trivially match the table schema, you must use an ingestion mapping to align the columns of the data with the table schema. 
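+
+   For illustration, a CSV ingestion mapping for *MyStormEvents* could be created as follows and then referenced by name at ingestion time. This is a hypothetical, partial mapping shown only as an example; the sample file already matches the table schema, so no mapping is needed in this walkthrough:
+
+   ```kusto
+   .create table MyStormEvents ingestion csv mapping "MyStormEvents_CSV_Mapping"
+   '[{"Name":"StartTime","Ordinal":"0"},{"Name":"EndTime","Ordinal":"1"},{"Name":"State","Ordinal":"2"}]'
+   ```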
+ > [!NOTE] + > + > The following examples assume a trivial match between the columns of the ingested data and the schema of the target table. + > + > If the ingested data doesn't trivially match the table schema, you must use an ingestion mapping to align the columns of the data with the table schema. ## Queue a file for ingestion and query the results @@ -81,129 +92,130 @@ Add the following code: 1. Create a client app that connects to your cluster and prints the number of rows in the *MyStormEvents* table. You'll use this count as a baseline for comparison with the number of rows after each method of ingestion. Replace the `` and `` placeholders with your cluster URI and database name respectively. - ### [C\#](#tab/csharp) - + #### [C\#](#tab/csharp) + ```csharp + using System.Data; + using Kusto.Data; + using Kusto.Data.Common; using Kusto.Data.Net.Client; - - namespace BatchIngest { - class BatchIngest { - static void Main(string[] args) { - string clusterUri = ""; - var clusterKcsb = new KustoConnectionStringBuilder(clusterUri) - .WithAadUserPromptAuthentication(); - - using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) { - string database = ""; - string table = "MyStormEvents"; - - string query = table + " | count"; - using (var response = kustoClient.ExecuteQuery(database, query, null)) { - Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); - PrintResultsAsValueList(response); - } - } + + using Azure.Identity; + + namespace BatchIngest; + + class BatchIngest + { + static async Task Main() + { + var tokenCredential = new InteractiveBrowserCredential(); + var clusterUri = ""; // e.g., "https://..kusto.windows.net" + var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential); + + using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); + + var database = ""; + var table = "MyStormEvents"; + + var query = table + " | count"; + + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) + { + Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); + PrintResultsAsValueList(response); } - - static void PrintResultsAsValueList(IDataReader response) { - string value; - while (response.Read()) { - for (int i = 0; i < response.FieldCount; i++) { - value = ""; - if (response.GetDataTypeName(i) == "Int32") - value = response.GetInt32(i).ToString(); - else if (response.GetDataTypeName(i) == "Int64") - value = response.GetInt64(i).ToString(); - else if (response.GetDataTypeName(i) == "DateTime") - value = response.GetDateTime(i).ToString(); - else if (response.GetDataTypeName(i) == "Object") - value = response.GetValue(i).ToString() ?? "{}"; - else - value = response.GetString(i); - - Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None"); + } + + static void PrintResultsAsValueList(IDataReader response) + { + while (response.Read()) + { + for (var i = 0; i < response.FieldCount; i++) + { + object val = response.GetValue(i); + string value = val.ToString() ?? 
"None"; + Console.WriteLine("\t{0} - {1}", response.GetName(i), value); } - } + } } } ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + ```python from azure.identity import InteractiveBrowserCredential from azure.kusto.data import KustoClient, KustoConnectionStringBuilder - + def main(): credentials = InteractiveBrowserCredential() cluster_uri = "" cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials) - + with KustoClient(cluster_kcsb) as kusto_client: database = "" table = "MyStormEvents" - + query = table + " | count" response = kusto_client.execute_query(database, query) print("\nNumber of rows in " + table + " BEFORE ingestion:") print_result_as_value_list(response) - + def print_result_as_value_list(response): cols = (col.column_name for col in response.primary_results[0].columns) - + for row in response.primary_results[0]: for col in cols: print("\t", col, "-", row[col]) - + if __name__ == "__main__": main() ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript - import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data"; + import { Client, KustoConnectionStringBuilder } from "azure-kusto-data"; import { InteractiveBrowserCredential } from "@azure/identity"; - + async function main() { const credentials = new InteractiveBrowserCredential(); const clusterUri = ""; - const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials); - + const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials); + const kustoClient = new Client(clusterKcsb); - + const database = ""; const table = "MyStormEvents"; - + const query = table + " | count"; let response = await kustoClient.execute(database, query); console.log("\nNumber of rows in " + table + " BEFORE ingestion:"); printResultsAsValueList(response); } - + function printResultsAsValueList(response) { let cols = response.primaryResults[0].columns; - + for (row of response.primaryResults[0].rows()) { for (col of cols) console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None") } } - + main(); ``` - + [!INCLUDE [node-vs-browser-auth](../../includes/node-vs-browser-auth.md)] - - - - - ### [Java](#tab/java) - + + + + #### [Java](#tab/java) + > [!NOTE] > The Java SDK doesn't currently support both clients sharing the same user prompt authenticator, resulting in a user prompt for each client. 
- + ```java import com.microsoft.azure.kusto.data.Client; import com.microsoft.azure.kusto.data.ClientFactory; @@ -211,16 +223,16 @@ Add the following code: import com.microsoft.azure.kusto.data.KustoResultSetTable; import com.microsoft.azure.kusto.data.KustoResultColumn; import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; - + public class BatchIngestion { public static void main(String[] args) throws Exception { String clusterUri = ""; ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri); - + try (Client kustoClient = ClientFactory.createClient(clusterKcsb)) { String database = ""; String table = "MyStormEvents"; - + String query = table + " | count"; KustoOperationResult results = kustoClient.execute(database, query); KustoResultSetTable primaryResults = results.getPrimaryResults(); @@ -228,7 +240,7 @@ Add the following code: printResultsAsValueList(primaryResults); } } - + public static void printResultsAsValueList(KustoResultSetTable results) { while (results.next()) { KustoResultColumn[] columns = results.getColumns(); @@ -240,105 +252,130 @@ Add the following code: } ``` - --- - 1. Create a connection string builder object that defines the data ingestion URI, where possible, using the sharing the same authentication credentials as the cluster URI. Replace the `` placeholder with data ingestion URI. - ### [C\#](#tab/csharp) - + ::: zone pivot="latest" + + #### [C\#](#tab/csharp) + ```csharp - using Kusto.Data.Common; - using Kusto.Ingest; - using System.Data; - - string ingestUri = ""; - var ingestKcsb = new KustoConnectionStringBuilder(ingestUri) - .WithAadUserPromptAuthentication(); + using Kusto.Ingest; // Add this import + + // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + ```python from azure.kusto.data import DataFormat from azure.kusto.ingest import QueuedIngestClient, IngestionProperties - + ingest_uri = "" ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials) ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest"; - + const ingestUri = ""; const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials); ``` - - - - ### [Java](#tab/java) - + + + + #### [Java](#tab/java) + ```java import com.microsoft.azure.kusto.ingest.IngestClientFactory; import com.microsoft.azure.kusto.ingest.IngestionProperties; import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat; import com.microsoft.azure.kusto.ingest.QueuedIngestClient; - + String ingestUri = ""; ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri); ``` - - --- - -1. Ingest the *stormevent.csv* file by adding it to the batch queue. You use the following objects and properties: - - - **QueuedIngestClient** to create the ingest client. - - **IngestionProperties** to set the ingestion properties. - - **DataFormat** to specify the file format as *CSV*. - - **ignore_first_record** to specify whether the first row in CSV and similar file types is ignored, using the following logic: - - **True**: The first row is ignored. Use this option to drop the header row from tabular textual data. - - **False**: The first row is ingested as a regular row. 
- - [!INCLUDE [ingestion-size-limit](../../../includes/cross-repo/ingestion-size-limit.md)] - - ### [C\#](#tab/csharp) - + + ::: zone-end + ::: zone pivot="preview" + + #### [C\#](#tab/csharp) + ```csharp - using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) { - string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); - - Console.WriteLine("\nIngesting data from file: \n\t " + filePath); - var ingestProps = new KustoIngestionProperties(database, table) { - Format = DataSourceFormat.csv, - AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "True" }} - }; - _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result; - } + using Kusto.Ingest.V2; // Add this import + + // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + + Not applicable + + #### [TypeScript](#tab/typescript) + + Not applicable + + + + #### [Java](#tab/java) + + Not applicable + + ::: zone-end + +1. Ingest the *stormevent.csv* file by adding it to the batch queue. + + ::: zone pivot="latest" + + You use the following objects and properties: + + - `QueuedIngestClient` to create the ingest client. + - `IngestionProperties` to set the ingestion properties. + - `DataFormat` to specify the file format as CSV. + - `ignore_first_record` to specify whether the first row in CSV and similar file types is ignored, using the following logic: + - `True`: The first row is ignored. Use this option to drop the header row from tabular textual data. + - `False`: The first row is ingested as a regular row. + + [!INCLUDE [ingestion-size-limit](../../../includes/cross-repo/ingestion-size-limit.md)] + + #### [C\#](#tab/csharp) + + ```csharp + using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); + + Console.WriteLine("\nIngesting data from file: \n\t " + filePath); + var ingestProps = new KustoIngestionProperties(database, table) { + Format = DataSourceFormat.csv, + AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "True" }} + }; + await ingestClient.IngestFromStorageAsync(filePath, ingestProps); + ``` + + #### [Python](#tab/python) + ```python import os - + with QueuedIngestClient(ingest_kcsb) as ingest_client: - file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv") - print("\nIngesting data from file: \n\t " + file_path) - - ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True) - ingest_client.ingest_from_file(file_path, ingest_props) + file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv") + print("\nIngesting data from file: \n\t " + file_path) + + ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True) + ingest_client.ingest_from_file(file_path, ingest_props) ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript import path from 'path'; - + const ingestClient = new IngestClient(ingestKcsb); const filePath = path.join(__dirname, "stormevents.csv"); console.log("\nIngesting data from file: \n\t " + filePath); - + const ingestProps = new IngestionProperties({ database: database, table: table, @@ -347,17 +384,16 @@ Add the following code: }); await ingestClient.ingestFromFile(filePath, ingestProps); ``` - - - - ### [Java](#tab/java) - + + + #### 
[Java](#tab/java) + ```java import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; - + try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) { FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0); - + System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString()); IngestionProperties ingestProps = new IngestionProperties(database, table); ingestProps.setDataFormat(DataFormat.CSV); @@ -365,168 +401,216 @@ Add the following code: ingestClient.ingestFromFile(fileSourceInfo, ingestProps); } ``` + + ::: zone-end + ::: zone pivot="preview" - --- + You use the following objects and properties: -1. Query the number of rows in the table after ingesting the file, and show the last row ingested. + - `QueuedIngestClientBuilder` to create the ingest client. + - `IngestProperties` is optional in most cases, but here is used to set `IgnoreFirstRecord`. + - `DataFormat` to specify the file format as `DataSourceFormat.csv`. + - `IgnoreFirstRecord` to specify whether the first row in CSV and similar file types is ignored, using the following logic: + - `True`: The first row is ignored. Use this option to drop the header row from tabular textual data. + - `False`: The first row is ingested as a regular row. - > [!NOTE] - > To allow time for the ingestion to complete, wait 30 seconds before querying the table. For C\# wait 60 seconds to allow time for adding the file to the ingestion queue asynchronously. + [!INCLUDE [ingestion-size-limit](../../../includes/cross-repo/ingestion-size-limit.md)] - ### [C\#](#tab/csharp) + #### [C\#](#tab/csharp) ```csharp - Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ..."); - Thread.Sleep(TimeSpan.FromSeconds(60)); + using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build(); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); + + var fileSource = new FileSource(filePath, DataSourceFormat.csv); + var props = new IngestProperties() { IgnoreFirstRecord = true }; + + Console.WriteLine("\nIngesting data from file: \n\t " + filePath); + + await ingestClient.IngestAsync(fileSource, database, table, props); + ``` + + #### [Python](#tab/python) - using (var response = kustoClient.ExecuteQuery(database, query, null)) { + Not applicable + + #### [TypeScript](#tab/typescript) + + Not applicable + + + + #### [Java](#tab/java) + + Not applicable + + ::: zone-end + +1. Query the number of rows in the table after ingesting the file, and show the last row ingested. + + > [!NOTE] + > To allow time for the ingestion to complete, wait 30 seconds before querying the table. For C\# wait 60 seconds to allow time for adding the file to the ingestion queue asynchronously. 
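
    If the row count doesn't change even after the wait, you can check whether the ingestion was reported as failed before troubleshooting further. A minimal sketch to run in your query environment (`Table` and `FailedOn` are columns in the command's output):

    ```kusto
    .show ingestion failures
    | where Table == "MyStormEvents"
    | top 3 by FailedOn
    ```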
+ + #### [C\#](#tab/csharp) + + ```csharp + Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ..."); + await Task.Delay(TimeSpan.FromSeconds(60)); + + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:"); PrintResultsAsValueList(response); } - + query = table + " | top 1 by ingestion_time()"; - using (var response = kustoClient.ExecuteQuery(database, query, null)) { + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { Console.WriteLine("\nLast ingested row:"); PrintResultsAsValueList(response); } ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + ```python # Add this to the imports at the top of the file import time - + # Add this to the main method print("\nWaiting 30 seconds for ingestion to complete ...") time.sleep(30) - + response = kusto_client.execute_query(database, query) print("\nNumber of rows in " + table + " AFTER ingesting the file:") print_result_as_value_list(response) - + query = table + " | top 1 by ingestion_time()" response = kusto_client.execute_query(database, query) print("\nLast ingested row:") print_result_as_value_list(response) ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript console.log("\nWaiting 30 seconds for ingestion to complete ..."); await sleep(30000); - + response = await kustoClient.execute(database, query); console.log("\nNumber of rows in " + table + " AFTER ingesting the file:"); printResultsAsValueList(response); - + query = table + " | top 1 by ingestion_time()" response = await kustoClient.execute(database, query); console.log("\nLast ingested row:"); printResultsAsValueList(response); - + // Add the sleep function after the main method function sleep(time) { return new Promise(resolve => setTimeout(resolve, time)); } ``` - - - - ### [Java](#tab/java) - + + + + #### [Java](#tab/java) + ```java - System.out.println("\nWaiting 30 seconds for ingestion to complete ..."); - Thread.sleep(30000); - - response = kustoClient.execute(database, query); - primaryResults = response.getPrimaryResults(); - System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:"); - printResultsAsValueList(primaryResults); - - query = table + " | top 1 by ingestion_time()"; - response = kustoClient.execute(database, query); - primaryResults = response.getPrimaryResults(); - System.out.println("\nLast ingested row:"); - printResultsAsValueList(primaryResults); + System.out.println("\nWaiting 30 seconds for ingestion to complete ..."); + Thread.sleep(30000); + + response = kustoClient.execute(database, query); + primaryResults = response.getPrimaryResults(); + System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:"); + printResultsAsValueList(primaryResults); + + query = table + " | top 1 by ingestion_time()"; + response = kustoClient.execute(database, query); + primaryResults = response.getPrimaryResults(); + System.out.println("\nLast ingested row:"); + printResultsAsValueList(primaryResults); ``` - + --- The complete code should look like this: -### [C\#](#tab/csharp) +::: zone pivot="latest" + +#### [C\#](#tab/csharp) ```csharp +using System.Data; + using Kusto.Data; -using Kusto.Data.Net.Client; using Kusto.Data.Common; +using Kusto.Data.Net.Client; using Kusto.Ingest; -using System.Data; -namespace BatchIngest { - class BatchIngest { - static void Main(string[] args) { - string clusterUri = ""; - var 
clusterKcsb = new KustoConnectionStringBuilder(clusterUri) - .WithAadUserPromptAuthentication(); - string ingestUri = ""; - var ingestKcsb = new KustoConnectionStringBuilder(ingestUri) - .WithAadUserPromptAuthentication(); - - - using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) - using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) { - string database = ""; - string table = "MyStormEvents"; - string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); - - string query = table + " | count"; - using (var response = kustoClient.ExecuteQuery(database, query, null)) { - Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); - PrintResultsAsValueList(response); +namespace BatchIngest; + +class BatchIngest +{ + static async Task Main() + { + var clusterUri = ""; // e.g., "https://..kusto.windows.net" + var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadUserPromptAuthentication(); + + using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); + + var database = ""; + var table = "MyStormEvents"; + + var query = table + " | count"; + + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) + { + Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); + PrintResultsAsValueList(response); } - + + using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); + Console.WriteLine("\nIngesting data from file: \n\t " + filePath); var ingestProps = new KustoIngestionProperties(database, table) { - Format = DataSourceFormat.csv, - AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "True" }} + Format = DataSourceFormat.csv, + AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "True" }} }; - _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result; - + await ingestClient.IngestFromStorageAsync(filePath, ingestProps); + Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ..."); - Thread.Sleep(TimeSpan.FromSeconds(60)); + await Task.Delay(TimeSpan.FromSeconds(60)); - using (var response = kustoClient.ExecuteQuery(database, query, null)) { - Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:"); - PrintResultsAsValueList(response); + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { + Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:"); + PrintResultsAsValueList(response); } query = table + " | top 1 by ingestion_time()"; - using (var response = kustoClient.ExecuteQuery(database, query, null)) - { - Console.WriteLine("\nLast ingested row:"); - PrintResultsAsValueList(response); + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { + Console.WriteLine("\nLast ingested row:"); + PrintResultsAsValueList(response); } - } } - - static void PrintResultsAsValueList(IDataReader response) { - while (response.Read()) { - for (int i = 0; i < response.FieldCount; i++) { - if (response.GetDataTypeName(i) == "Int64") - Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i)); - else - Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? 
"None" : response.GetString(i)); + + static void PrintResultsAsValueList(IDataReader response) + { + while (response.Read()) + { + for (var i = 0; i < response.FieldCount; i++) + { + object val = response.GetValue(i); + string value = val.ToString() ?? "None"; + Console.WriteLine("\t{0} - {1}", response.GetName(i), value); + } } - } } - } } ``` -### [Python](#tab/python) +#### [Python](#tab/python) ```python import os @@ -581,7 +665,7 @@ if __name__ == "__main__": main() ``` -### [TypeScript](#tab/typescript) +#### [TypeScript](#tab/typescript) ```typescript import path from 'path'; @@ -645,9 +729,7 @@ function printResultsAsValueList(response) { main(); ``` - - -### [Java](#tab/java) +#### [Java](#tab/java) ```java import com.microsoft.azure.kusto.data.Client; @@ -716,24 +798,120 @@ public class BatchIngestion { --- -## Run your app +::: zone-end + +::: zone pivot="preview" + +#### [C\#](#tab/csharp) + + ```csharp +using System.Data; +using Azure.Identity; +using Kusto.Data; +using Kusto.Data.Common; +using Kusto.Data.Net.Client; +using Kusto.Ingest.V2; + +namespace BatchIngest; + +class BatchIngest +{ + static async Task Main() + { + var tokenCredential = new InteractiveBrowserCredential(); + var clusterUri = ""; // e.g., "https://..kusto.windows.net" + var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential); + + using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); + + var database = ""; + var table = "MyStormEvents"; + + var query = table + " | count"; + + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) + { + Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:"); + PrintResultsAsValueList(response); + } + + using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build(); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv"); + + var fileSource = new FileSource(filePath, DataSourceFormat.csv); + var props = new IngestProperties() { IgnoreFirstRecord = true }; + + Console.WriteLine("\nIngesting data from file: \n\t " + filePath); + + await ingestClient.IngestAsync(fileSource, database, table, props); + + Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ..."); + await Task.Delay(TimeSpan.FromSeconds(60)); + + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { + Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:"); + PrintResultsAsValueList(response); + } + + query = table + " | top 1 by ingestion_time()"; + using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) { + Console.WriteLine("\nLast ingested row:"); + PrintResultsAsValueList(response); + } + } + + static void PrintResultsAsValueList(IDataReader response) + { + while (response.Read()) + { + for (var i = 0; i < response.FieldCount; i++) + { + object val = response.GetValue(i); + string value = val.ToString() ?? 
"None"; + Console.WriteLine("\t{0} - {1}", response.GetName(i), value); + } + } + } +} + ``` + +#### [Python](#tab/python) + +Not applicable + +#### [TypeScript](#tab/typescript) + +Not applicable + + + +#### [Java](#tab/java) + +Not applicable + +--- + +::: zone-end + +### Run your app In a command shell, use the following command to run your app: -### [C\#](#tab/csharp) +#### [C\#](#tab/csharp) ```bash # Change directory to the folder that contains the management commands project dotnet run . ``` -### [Python](#tab/python) +#### [Python](#tab/python) ```bash python basic_ingestion.py ``` -### [TypeScript](#tab/typescript) +#### [TypeScript](#tab/typescript) In a Node.js environment: @@ -750,9 +928,9 @@ npm run dev > [!NOTE] > In a browser environment, open the [developer tools console](/microsoft-edge/devtools-guide-chromium/console/) to see the output. - + -### [Java](#tab/java) +#### [Java](#tab/java) ```bash mvn install exec:java -Dexec.mainClass=".BatchIngestion" @@ -792,26 +970,26 @@ For example, you can modify the app replacing the *ingest from file* code, as fo 1. Add the stream descriptor package to the imports at the top of the file. - ### [C\#](#tab/csharp) + #### [C\#](#tab/csharp) - No additional packages are required. + No additional packages are required. - ### [Python](#tab/python) + #### [Python](#tab/python) ```python import io from azure.kusto.ingest import StreamDescriptor ``` - ### [TypeScript](#tab/typescript) + #### [TypeScript](#tab/typescript) ```typescript import { Readable } from "stream"; ``` - + - ### [Java](#tab/java) + #### [Java](#tab/java) ```java import java.io.ByteArrayInputStream; @@ -820,25 +998,23 @@ For example, you can modify the app replacing the *ingest from file* code, as fo import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; ``` - --- - 1. Add an in-memory string with the data to ingest. - ### [C\#](#tab/csharp) + #### [C\#](#tab/csharp) ```csharp string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine)); ``` - ### [Python](#tab/python) + #### [Python](#tab/python) ```python single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' string_stream = io.StringIO(single_line) ``` - ### [TypeScript](#tab/typescript) + #### [TypeScript](#tab/typescript) ```typescript const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'; @@ -847,9 +1023,9 @@ For example, you can modify the app replacing the *ingest from file* code, as fo stringStream.push(null); ``` - + - ### [Java](#tab/java) + #### [Java](#tab/java) ```java String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; @@ -857,111 +1033,156 @@ For example, you can modify the app replacing the *ingest from file* code, as fo StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream); ``` - --- - 1. Set the ingestion properties to not ignore the first record as the in-memory string doesn't have a header row. 
- ### [C\#](#tab/csharp) + ::: zone pivot="latest" + #### [C\#](#tab/csharp) + ```csharp ingestProps.AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "False" }}; ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + ```python ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False) ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript ingestProps.ignoreFirstRecord = false; ``` - - - - ### [Java](#tab/java) - + + + + #### [Java](#tab/java) + ```java ingestProps.setIgnoreFirstRecord(false); ``` - - --- - + + ::: zone-end + + ::: zone pivot="preview" + + #### [C\#](#tab/csharp) + + ```csharp + // Remove the IngestionProperties object `props` + ``` + + #### [Python](#tab/python) + + Not applicable + + #### [TypeScript](#tab/typescript) + + Not applicable + + + + #### [Java](#tab/java) + + Not applicable + + ::: zone-end + 1. Ingest the in-memory data by adding it to the batch queue. Where possible, provide the size of the raw data. +1. + ::: zone pivot="latest" - ### [C\#](#tab/csharp) - + #### [C\#](#tab/csharp) + ```csharp - _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result; + _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}); ``` - - ### [Python](#tab/python) - + + #### [Python](#tab/python) + ```python stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) ingest_client.ingest_from_stream(stream_descriptor, ingest_props) ``` - - ### [TypeScript](#tab/typescript) - + + #### [TypeScript](#tab/typescript) + ```typescript stringStream.size = singleLine.length; await ingestClient.ingestFromStream(stringStream, ingestProps); ``` - - - - ### [Java](#tab/java) - + + + + #### [Java](#tab/java) + ```java ingestClient.ingestFromStream(streamSourceInfo, ingestProps); ``` + ::: zone-end + ::: zone pivot="preview" - --- + #### [C\#](#tab/csharp) + + ```csharp + var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv); + + await ingestClient.IngestAsync(streamSource, database, table); + ``` + + #### [Python](#tab/python) + + Not applicable + + #### [TypeScript](#tab/typescript) + + Not applicable + + + + #### [Java](#tab/java) + + Not applicable + + ::: zone-end An outline of the updated code should look like this: -### [C\#](#tab/csharp) +::: zone pivot="latest" + +#### [C\#](#tab/csharp) ```csharp +using System.Data; +using Azure.Identity; using Kusto.Data; -using Kusto.Data.Net.Client; using Kusto.Data.Common; +using Kusto.Data.Net.Client; using Kusto.Ingest; -using System.Data; - -namespace BatchIngest { - class BatchIngest { - static void Main(string[] args) { - ... - string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; - var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine)); - - using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) - using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) { - string database = ""; - string table = "MyStormEvents"; +namespace BatchIngest; + +class BatchIngest +{ + static async Task Main() + { + string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; + var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine)); ... 
- - Console.WriteLine("\nIngesting data from memory:"); - ingestProps.AdditionalProperties = new Dictionary() {{ "ignoreFirstRecord", "False" }}; - _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result; - + + _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}); ... - } } - - static void PrintResultsAsValueList(IDataReader response) { - ... + + static void PrintResultsAsValueList(IDataReader response) + { + ... } - } } ``` -### [Python](#tab/python) +#### [Python](#tab/python) ```python import io @@ -996,7 +1217,7 @@ if __name__ == "__main__": main() ``` -### [TypeScript](#tab/typescript) +#### [TypeScript](#tab/typescript) ```typescript import path from 'path'; @@ -1038,9 +1259,9 @@ function printResultsAsValueList(response) { main(); ``` - + -### [Java](#tab/java) +#### [Java](#tab/java) ```java import com.microsoft.azure.kusto.data.Client; @@ -1088,6 +1309,62 @@ public class BatchIngestion { --- +::: zone-end + +::: zone pivot="preview" + +#### [C\#](#tab/csharp) + +```csharp +using System.Data; +using Azure.Identity; +using Kusto.Data; +using Kusto.Data.Common; +using Kusto.Data.Net.Client; +using Kusto.Ingest.V2; + +namespace BatchIngest; + +class BatchIngest +{ + static async Task Main() + { + string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; + var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine)); + ... + + var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv); + + await ingestClient.IngestAsync(streamSource, database, table); + + ... + } + + static void PrintResultsAsValueList(IDataReader response) + { + ... + } +} +``` + +#### [Python](#tab/python) + +Not applicable + +#### [TypeScript](#tab/typescript) + +Not applicable + + + +#### [Java](#tab/java) + +Not applicable + +--- + +::: zone-end + When you run the app, you should see a result similar to the following. Notice that after the ingestion, the number of rows in the table increased by one. ```bash @@ -1121,79 +1398,103 @@ For example, you can modify the app replacing the *ingest from memory* code with 1. Add the blob descriptor package to the imports at the top of the file. - ### [C\#](#tab/csharp) + #### [C\#](#tab/csharp) - No additional packages are required. + No additional packages are required. - ### [Python](#tab/python) + #### [Python](#tab/python) ```python from azure.kusto.ingest import BlobDescriptor ``` - ### [TypeScript](#tab/typescript) + #### [TypeScript](#tab/typescript) ```typescript No additional packages are required. ``` - + - ### [Java](#tab/java) + #### [Java](#tab/java) ```java import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; ``` - --- - 1. Create a blob descriptor using the blob URI, set the ingestion properties, and then ingest data from the blob. Replace the `` placeholder with the blob URI. 
-    ### [C\#](#tab/csharp)
-
+    ::: zone pivot="latest"
+
+    #### [C\#](#tab/csharp)
+
     ```csharp
     string blobUri = "<blob URI>";
-
+
     ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
     _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
     ```
-
-    ### [Python](#tab/python)
-
+
+    #### [Python](#tab/python)
+
     ```python
     blob_uri = "<blob URI>"
-
+
     ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
     blob_descriptor = BlobDescriptor(blob_uri)
     ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
     ```
-
-    ### [TypeScript](#tab/typescript)
-
+
+    #### [TypeScript](#tab/typescript)
+
     ```typescript
     const blobUri = "<blob URI>";
-
+
     ingestProps.ignoreFirstRecord = true;
     await ingestClient.ingestFromBlob(blobUri, ingestProps);
     ```
-
-
-
-    ### [Java](#tab/java)
-
+
+
+
+    #### [Java](#tab/java)
+
     ```java
     String blobUri = "<blob URI>";
-
+
     ingestProps.setIgnoreFirstRecord(true);
     BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
     ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
     ```
-
-    ---
+
+    ::: zone-end
+
+    ::: zone pivot="preview"
+
+    #### [C\#](#tab/csharp)
+
+    ```csharp
+    var blobSource = new BlobSource("<blob URI>", DataSourceFormat.csv);
+
+    await ingestClient.IngestAsync(blobSource, database, table);
+    ```
+
+    #### [Python](#tab/python)
+
+    Not applicable
+
+    #### [TypeScript](#tab/typescript)
+
+    Not applicable
+
+
+
+    #### [Java](#tab/java)
+
+    Not applicable
+
+    ::: zone-end

 An outline of the updated code should look like this:

+::: zone pivot="latest"
+
+#### [C\#](#tab/csharp)

 ```csharp
 ...
         Console.WriteLine("\nIngesting data from a blob:");
         ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
-        _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
-
+        await ingestClient.IngestFromStorageAsync(blobUri, ingestProps);
+
         ...
-      } }
-
-    static void PrintResultsAsValueList(IDataReader response) {
-      ...
+
+    static void PrintResultsAsValueList(IDataReader response)
+    {
+        ...
     }
-  } }
 ```

-### [Python](#tab/python)
+#### [Python](#tab/python)

 ```python
 import time
@@ -1264,7 +1559,7 @@ if __name__ == "__main__":
     main()
 ```

-### [TypeScript](#tab/typescript)
+#### [TypeScript](#tab/typescript)

 ```typescript
 import path from 'path';
@@ -1302,9 +1597,9 @@ function printResultsAsValueList(response) {
 main();
 ```

-
+

-### [Java](#tab/java)
+#### [Java](#tab/java)

 ```java
 import com.microsoft.azure.kusto.data.Client;
@@ -1348,6 +1643,62 @@ public class BatchIngestion {

 ---

+::: zone-end
+
+::: zone pivot="preview"
+
+#### [C\#](#tab/csharp)
+
+```csharp
+using System.Data;
+using Azure.Identity;
+using Kusto.Data;
+using Kusto.Data.Common;
+using Kusto.Data.Net.Client;
+using Kusto.Ingest.V2;
+
+namespace BatchIngest;
+
+class BatchIngest
+{
+    static async Task Main()
+    {
+        string blobUri = "<blob URI>";
+        ...
+
+        Console.WriteLine("\nIngesting data from a blob:");
+        var blobSource = new BlobSource(blobUri, DataSourceFormat.csv);
+
+        await ingestClient.IngestAsync(blobSource, database, table);
+
+        ...
+    }
+
+    static void PrintResultsAsValueList(IDataReader response)
+    {
+        ...
+    }
+}
+```
+
+#### [Python](#tab/python)
+
+Not applicable
+
+#### [TypeScript](#tab/typescript)
+
+Not applicable
+
+
+
+#### [Java](#tab/java)
+
+Not applicable
+
+---
+
+::: zone-end
+
 When you run the app, you should see a result similar to the following. Notice that after the ingestion, the number of rows in the table increased by 1,000.

 ```bash
diff --git a/data-explorer/kusto/api/netfx/kusto-ingest-migrate.md b/data-explorer/kusto/api/netfx/kusto-ingest-migrate.md
new file mode 100644
index 0000000000..8973c1d623
--- /dev/null
+++ b/data-explorer/kusto/api/netfx/kusto-ingest-migrate.md
@@ -0,0 +1,371 @@
+---
+title: Kusto Ingest - Migrate V1 to V2
+description: Guide to migrating Kusto ingestion from Ingest V1 to Ingest V2. Covers client builders, sources, uploaders, ingestion properties, and status tracking with examples.
+ms.topic: concept-article
+ms.date: 08/27/2025
+ms.reviewer: yogilad
+---
+
+# Migrating from Ingest V1 to Ingest V2
+
+This guide provides code migration steps from the Ingest V1 SDK to the Ingest V2 SDK. The guide walks through key differences and improvements and includes detailed code examples and best practice recommendations.
+
+## Creating the Ingest client
+
+### Ingest V1: Factory methods
+
+In Ingest V1, clients are created using static factory methods, built from a connection string.
+
+**Example (V1):**
+
+```csharp
+var kcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication();
+var client = KustoIngestFactory.CreateQueuedIngestClient(kcsb);
+```
+
+### Ingest V2: Builder pattern
+
+In Ingest V2, clients are created using builder classes. This pattern allows for more flexible and readable configuration, supports method chaining, and makes it easier to add new options in the future.
+
+Connection strings are no longer used to create clients. Instead, you provide a `Uri` for the cluster and an authentication provider.
+
+An authentication provider can be any implementation of `Azure.Identity.TokenCredential`, Kusto's `IKustoTokenCredentialsProvider`, or a delegate function of the format `Func<Task<string>>`.
+
+**Example (V2):**
+
+```csharp
+using Azure.Identity;
+using Kusto.Ingest.V2;
+
+var auth = new InteractiveBrowserCredential();
+var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
+    .WithAuthentication(auth)
+    .Build();
+```
+
+**Other V2 clients (similar to V1):**
+
+- `StreamingIngestClientBuilder`
+- `ManagedStreamingIngestClientBuilder`
+
+> [!NOTE]
+> In both V1 and V2, you can pass the cluster URL to a client without the "ingest-" prefix. The client converts the URL to the correct format automatically.
+
+## Managed Streaming ingestion
+
+The Managed Streaming Ingest client is similar to the one in V1: it tries to stream the data first, and if it fails after a few retries, it falls back to queued ingestion.
+
+In V2, `ManagedStreamingPolicy` becomes the `IManagedStreamingPolicy` interface, which provides methods for finer control of managed streaming.
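+
+For illustration, a minimal sketch of creating a managed streaming client; it assumes `ManagedStreamingIngestClientBuilder` exposes the same `Create`/`WithAuthentication`/`Build` chain shown above for the queued client:
+
+```csharp
+using Azure.Identity;
+using Kusto.Ingest.V2;
+
+// Assumed builder chain, mirroring QueuedIngestClientBuilder above.
+var auth = new InteractiveBrowserCredential();
+var managedClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUrl))
+    .WithAuthentication(auth)
+    .Build();
+```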
+
+## Sources
+
+### Ingest V1: Multiple methods for different sources
+
+V1 exposes a different method for each source type, such as `IngestFromStorageAsync` and `IngestFromStreamAsync`. Format and compression are provided in `ingestionProperties`.
+
+**Example (V1):**
+
+```csharp
+var ingestionProperties = new KustoIngestionProperties(database, table) { Format = DataSourceFormat.csv };
+await client.IngestFromStorageAsync(filePath, ingestionProperties);
+await client.IngestFromStreamAsync(stream, ingestionProperties);
+```
+
+### Ingest V2: Unified source abstractions
+
+V2 introduces source classes that encapsulate all relevant information, including format and compression. There's one method to ingest a single source: the `IngestAsync` method. The client and source types determine the ingestion behavior.
+
+**Source types:**
+
+- `FileSource` (local files)
+- `StreamSource` (.NET streams)
+- `BlobSource` (cloud storage)
+- `DataReaderSource` (.NET data readers)
+
+The "database" and "table" properties are now parameters of the `IngestAsync` method, rather than properties of the ingestion properties. This means that for most cases, you don't need to create `IngestProperties`.
+
+**Example (V2):**
+
+```csharp
+var source = new FileSource(filePath, DataSourceFormat.csv);
+await client.IngestAsync(source, database, table);
+```
+
+## Uploaders
+
+> [!IMPORTANT]
+>
+> * The V1 Ingest SDK operated almost entirely against Azure Storage and Azure Queue, and calls to Kusto were fairly limited in quantity.
+> * The V2 Ingest SDK replaces the Azure Queue operations with REST calls to Kusto. This means the ingest client is more sensitive to Kusto maintenance windows and request rates. Be mindful of this, and incorporate retries and throttling backoffs in your applications to match your scale and volume of ingestion.
+
+The V1 upload mechanism had several limitations:
+
+- It was done implicitly, without any control by the SDK user.
+- It always used Kusto's internal storage:
+  - It had hidden costs for the user.
+  - It couldn't be monitored or scaled by the user.
+  - Private networks caused issues.
+- It always used the same strategy to upload the data.
+
+In V2, uploaders are introduced to provide more flexibility and control over the upload process:
+
+- `UserContainersUploader` - Uploads data to a list of user-defined Azure Blob Storage containers.
+- `ManagedUploader` - Uploads data to Kusto's internal storage, similar to V1.
+
+You can also implement your own uploader by implementing the `IUploader` interface.
+
+> [!NOTE]
+> Uploaders are only relevant for queued ingestion, done from `QueuedIngestClient` or `ManagedStreamingIngestClient`.
+>
+> Streaming ingestion sends the data directly in the HTTP request, so it doesn't use uploaders.
+
+By default, the clients create a `ManagedUploader` instance, but you can specify a different uploader using the `WithUploader` method in the builder.
+
+**Example (V2):**
+
+```csharp
+var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
+    .WithAuthentication(auth)
+    .Build();
+
+// Equivalent to:
+var uploader = ManagedUploaderBuilder.Create()
+    .WithAuthentication(auth)
+    .Build();
+var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
+    .WithAuthentication(auth)
+    .WithUploader(uploader)
+    .Build();
+
+// Or, with user-provided upload containers:
+var uploader = UserContainersUploaderBuilder.Create()
+    .AddContainer("<container URI with SAS>")
+    .AddContainer("<container URI>", tokenCredential)
+    .Build();
+```
+
+### Manual uploads
+
+You can use the uploader to convert local and data reader sources to `BlobSource`, and follow by ingesting them with the V2 client.
+
+This allows for greater control of retry and failure behaviors, building ingestion pipelines where upload and ingestion are handled in parallel, and leveraging the Multi-Ingestion API provided by V2 (see advanced topics below).
+
+```csharp
+var source = new FileSource(filePath, DataSourceFormat.csv);
+BlobSource blobSource = await uploader.UploadAsync(source);
+```
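+
+Continuing the sketch above, the uploaded blob can then be handed to the queued client; `database` and `table` stand in for your own target:
+
+```csharp
+// Ingest the blob produced by the manual upload.
+var operation = await client.IngestAsync(blobSource, database, table);
+```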
") + .AddContainer("", tokenCredential) + .Build(); +``` + +### Manual uploads + +You can use the uploader to convert local and data reader sources to `BlobSource`, and follow by ingesting them with the V2 client. + +This allows for greater control of retry and failure behaviors, building ingestion pipelines where upload and ingestion are handled in parallel, and leveraging the Multi-Ingestion API provided by V2 (see advanced topics below). + +```csharp +var source = new FileSource(filePath, DataSourceFormat.csv); +BlobSource blobSource = await uploader.UploadAsync(source); +``` + +Or even multiple sources: + +```csharp +BlobSource source1 = new FileSource(filePath1, DataSourceFormat.csv); +BlobSource source2 = new StreamSource(stream, DataSourceFormat.json); +(IEnumerable successes, IEnumerable failures) = await uploader.UploadAsync(new[] { source1, source2 }); +``` + +## Ingestion properties + +As mentioned before, in V2, these properties are no longer part of the ingestion properties class: + +- `database` and `table` are now parameters of the `IngestAsync` method. +- `format` and `compression` are now part of the source classes. + +The new class is named `IngestProperties`, and it contains properties that are relevant to the ingestion process, some examples: + +- `EnableTracking` - Whether to enable tracking for the ingestion. +- `MappingReference` - The name of the mapping to use for the ingestion. +- `SkipBatching` - Whether to skip batching and ingest the data immediately (equivalent to `FlushImmediately` in V1). + Not recommended for most use cases. + +**Example (V2):** + +```csharp +var source = new FileSource(filePath, DataSourceFormat.csv); +var properties = new IngestProperties +{ + EnableTracking = true, + MappingReference = "MyMapping" +}; +await client.IngestAsync(source, database, table, properties); +``` + +## Status tracking + +In V1, status tracking was done using the `ReportLevel` and `ReportMethod` properties in the ingestion properties. + +Tracking was reimplemented in V2 and is now simpler. + +### Queued Ingestion + +When data is queued for ingestion (via a `QueuedIngestClient` or `ManagedStreamingIngestClient`), the ingestion is asynchronous, and the result isn't immediate. + +The method returns an `IngestionOperation` object: + +```csharp +var source = new FileSource(filePath, DataSourceFormat.csv); +var operation = await client.IngestAsync(source, database, table); +Assert.IsTrue(operation.IngestionMethod == IngestionMethod.Queued); +``` + +If `IngestAsync` returns successfully, the data was queued for ingestion, but it can still fail later in the ingestion pipeline. +If you need to track the status of the ingestion, enable tracking by setting `EnableTracking` to true in the `IngestProperties`: + +```csharp +var properties = new IngestProperties { EnableTracking = true }; +var operation = await client.IngestAsync(source, database, table, properties); +``` + +Then, `operation` becomes a handle to the ingestion operation, and you can use it to track the status of the ingestion using the client: + +```csharp +var summary = await client.GetOperationSummaryAsync(operation); +// `summary.Status` can be `Succeeded`, `Failed`, `InProgress`, or `Cancelled`. +``` + +You may query the status again until it's no longer `InProgress`. 
+
+To get more details about the ingestion, use the `GetIngestionDetailsAsync` method:
+
+```csharp
+var details = await client.GetIngestionDetailsAsync(operation);
+var blob = details.IngestResults.Single();
+var status = blob.Status;    // Succeeded, Failed, etc.
+var info = blob.Details;     // Additional details about the ingestion
+var error = blob.Exception;  // If the ingestion failed, this contains the exception details
+```
+
+> [!IMPORTANT]
+>
+> - Each call to `GetOperationSummaryAsync` or `GetIngestionDetailsAsync` makes an HTTP request to the Kusto service.
+> - Too frequent calls may lead to throttling or performance issues.
> - Consider waiting a few seconds between calls, or using a backoff strategy.
+
+### Streaming Ingestion
+
+For streaming ingestion, the result of the ingestion is immediate.
+If the method returns successfully, the data was ingested successfully.
+
+Still, the interfaces of the methods are the same, and `GetOperationSummaryAsync` and `GetIngestionDetailsAsync` return the expected results.
+
+### Managed Streaming Ingestion
+
+Managed streaming ingestion can resolve to either queued or streaming ingestion.
+Either way, if tracking is enabled, you can use the same methods to track the status of the ingestion operation.
+
+### Serializing Ingestion Operations
+
+After running an operation with tracking enabled, you may not want to track it immediately.
+
+In V2, you can serialize and deserialize ingestion operations using the `ToJsonString` and `FromJsonString` methods.
+
+This allows you to store the operation in a database or file, and later retrieve it to continue monitoring the ingestion status. You need to use a client that matches the address and type of the client that created the operation, and has tracking enabled.
+
+```csharp
+var serialized = operation.ToJsonString();
+var deserializedOperation = IngestionOperation.FromJsonString(serialized, client);
+var summary = await client.GetOperationSummaryAsync(deserializedOperation);
+```
+
+## Advanced topics
+
+### Multi-Ingestion
+
+`QueuedIngestClient` in V2 implements the `IMultiIngest` interface, which allows you to ingest multiple sources in a single call.
+
+The number of sources possible per call is limited. You can get the limit via the method `int GetMaxSourcesPerMultiIngest()` on `IMultiIngest`, and batch your sources accordingly, as sketched below.
+
+Currently, only a list of `BlobSource` is supported. You might need to use an uploader to convert your local files or streams to `BlobSource` before using this method.
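+
+A hedged sketch of respecting that limit; it assumes a `blobSources` collection you've already uploaded and a `client` built as shown earlier, and uses .NET's `Enumerable.Chunk`:
+
+```csharp
+// Split the blobs into batches no larger than the per-call limit.
+int maxSources = client.GetMaxSourcesPerMultiIngest();
+foreach (var batch in blobSources.Chunk(maxSources))
+{
+    await client.IngestAsync(batch, database, table, new IngestProperties { EnableTracking = true });
+}
+```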
+
+**Example (V2):**
+
+```csharp
+var uploader = ManagedUploaderBuilder.Create()
+    .WithAuthentication(auth)
+    .Build();
+var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
+    .WithUploader(uploader)
+    .WithAuthentication(auth)
+    .Build();
+var source1 = new FileSource(filePath1, DataSourceFormat.csv);
+var source2 = new FileSource(filePath2, DataSourceFormat.csv);
+var (successes, failures) = await uploader.UploadManyAsync(new[] { source1, source2 });
+
+foreach (var blob in failures)
+{
+    Console.WriteLine($"Failed to upload {blob.SourceId}: {blob.Exception?.Message}");
+}
+
+var operation = await client.IngestAsync(successes, database, table,
+    new IngestProperties { EnableTracking = true });
+```
+
+When tracking the operation, the status contains the number of successes and failures:
+
+```csharp
+var summary = await client.GetOperationSummaryAsync(operation);
+Assert.IsTrue(summary.Successes > 0);
+Assert.IsTrue(summary.Failures > 0);
+var status = summary.Status; // Succeeded, Failed, or Cancelled; can also be InProgress in multi-ingestion
+```
+
+And you can get the details of each source:
+
+```csharp
+var details = await client.GetIngestionDetailsAsync(operation);
+foreach (var blob in details.IngestResults)
+{
+    if (blob.Status == IngestStatus.Succeeded)
+    {
+        Console.WriteLine($"Blob {blob.BlobName} was ingested successfully.");
+    }
+    else if (blob.Status == IngestStatus.Failed)
+    {
+        Console.WriteLine($"Blob {blob.BlobName} failed to ingest: {blob.Exception?.Message}");
+    }
+}
+```
+
+### DataReaderSource
+
+In V2, you can use `DataReaderSource` to ingest data from a .NET `IDataReader` implementation. Unlike other sources, data readers can be partially ingested, meaning they can ingest some of the data or be ingested in batches.
+
+When using `IngestAsync` with a `DataReaderSource`, its internal `MaxBytesPerFragment` and `MaxRecordsPerFragment` properties are used to determine how much data to ingest.
+
+Any data beyond that remains in the reader for the next ingestion call. You can check whether the reader has more data to ingest through the `HasDataRemaining` property of the `DataReaderSource`.
+
+**Example (V2):**
+
+```csharp
+var dataReader = GetMyDataReader();
+var source = new DataReaderSource(dataReader, maxBytesPerFragment: 1024);
+await client.IngestAsync(source, database, table); // Ingests up to 1024 bytes of data
+if (source.HasDataRemaining)
+{
+    // There is more data to ingest; you can call IngestAsync again
+}
+```
+
+If you want to ingest multiple batches of data readers, you can use an uploader:
+
+```csharp
+var uploader = ManagedUploaderBuilder.Create()
+    .WithAuthentication(auth)
+    .Build();
+var props = new IngestProperties { EnableTracking = true };
+var (successes, failures) = await uploader.UploadManyAsync(
+    dataReaderSource,
+    maxFragmentsToCreate: 10, // defaults to the maximum number of blobs you can ingest in a single operation
+    props);
+
+// dataReaderSource.HasDataRemaining can still be true if `maxFragmentsToCreate` was reached before all data was uploaded.
+if (dataReaderSource.HasDataRemaining)
+{
+    Console.WriteLine("There is more data to ingest.");
+}
+
+await client.IngestAsync(successes, database, table, props);
+```
+
+## Related content
+
+* [Create an app to get data using queued ingestion](../get-started/app-queued-ingestion.md)
+* [Stream data for ingestion](../get-started/app-managed-streaming-ingest.md)
diff --git a/data-explorer/kusto/toc.yml b/data-explorer/kusto/toc.yml
index f0480de946..fff4fe87b2 100644
--- a/data-explorer/kusto/toc.yml
+++ b/data-explorer/kusto/toc.yml
@@ -98,8 +98,11 @@ items:
       href: api/get-started/app-management-commands.md
     - name: Queue data for ingestion
       href: api/get-started/app-queued-ingestion.md
-    - name: Managed streaming ingestion
+    - name: Stream data for ingestion
      href: api/get-started/app-managed-streaming-ingest.md
+    - name: Migrating to Kusto Ingest v2
+      href: api/netfx/kusto-ingest-migrate.md
+      displayName: kusto.ingest, SDK
     - name: Connection strings
       items:
      - name: Connection strings overview
diff --git a/data-explorer/kusto/zone-pivot-groups.yml b/data-explorer/kusto/zone-pivot-groups.yml
new file mode 100644
index 0000000000..160d4cff1b
--- /dev/null
+++ b/data-explorer/kusto/zone-pivot-groups.yml
@@ -0,0 +1,11 @@
+# YamlMime:ZonePivotGroups
+# GitHub repository = 'MicrosoftDocs/dataexplorer-docs-pr'
+groups:
+- id: ingest-api
+  title: Ingest API version
+  prompt: Select the Ingest API version
+  pivots:
+  - id: latest
+    title: Ingest v1
+  - id: preview
+    title: Ingest v2 (preview)
\ No newline at end of file
diff --git a/data-explorer/zone-pivot-groups.yml b/data-explorer/zone-pivot-groups.yml
index 6927f88fe5..ede2d926e9 100644
--- a/data-explorer/zone-pivot-groups.yml
+++ b/data-explorer/zone-pivot-groups.yml
@@ -11,6 +11,14 @@ groups:
     title: Azure Monitor
   - id: fabric
     title: Microsoft Fabric
+- id: ingest-api
+  title: Ingest API version
+  prompt: Select the Ingest API version
+  pivots:
+  - id: latest
+    title: Ingest v1
+  - id: preview
+    title: Ingest v2 (preview)
 - id: kql-flavors-adx-fabric
   title: Kusto Query Language flavors
   prompt: Select your Azure service