@@ -79,15 +79,22 @@ Run the following commands on your database via Kusto Explorer (Desktop) or Kust

Enable streaming ingestion on the table or on the entire database using one of the following commands:

+ Table level:
+
```kql
.alter table <your table name> policy streamingingestion enable
+ ```
+
+ Database level:
+
+ ```kql

.alter database <databaseName> policy streamingingestion enable
```

It can take up to two minutes for the policy to take effect.

- For more information about streaming policy, see [Streaming ingestion policy - Azure Data Explorer & Real-Time Analytics](../../../kusto//management/streaming-ingestion-policy.md)
+ For more information about streaming policy, see [Streaming ingestion policy](../../../kusto//management/streaming-ingestion-policy.md).

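If you want to confirm the policy is in place before continuing, the `.show table <your table name> policy streamingingestion` management command returns the current policy. You can run it in the same tool you used for the `.alter` commands, or programmatically; the following is a minimal Python sketch, assuming the `azure-kusto-data` package and placeholder cluster and database names that you'd replace with your own:

```python
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

cluster_uri = "<your cluster URI>"   # placeholder
database_name = "<databaseName>"     # placeholder
table_name = "MyStormEvents"         # table used later in this quickstart

kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_uri)
kusto_client = KustoClient(kcsb)

# Policy commands are management commands, so use execute_mgmt rather than execute_query.
response = kusto_client.execute_mgmt(database_name, f".show table {table_name} policy streamingingestion")
print(response.primary_results[0])
kusto_client.close()
```
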
## Create a basic client application

@@ -149,7 +156,7 @@ class Program
        Console.WriteLine("row:" + row.ToString() + "\t");
        for (int i = 0; i < result.FieldCount; i++)
        {
-             Console.WriteLine("\t" + result.GetName(i)+ " - " + result.GetValue(i) );
+             Console.WriteLine("\t" + result.GetName(i) + " - " + result.GetValue(i));
        }
        Console.WriteLine();
    }
@@ -166,27 +173,26 @@ Copy *stormevents.csv* file to the same location as your script. Since our input
Add an ingestion section to the end of `Main()` using the following lines.

```csharp
- var ingestProperties = new KustoIngestionProperties(databaseName, tableName)
- {
-     Format = DataSourceFormat.csv
- };
-
- // Ingestion section
- Console.WriteLine("Ingesting data from a file");
- ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties).Wait();
+ var ingestProperties = new KustoIngestionProperties(databaseName, tableName)
+ {
+     Format = DataSourceFormat.csv
+ };
+ // Ingestion section
+ Console.WriteLine("Ingesting data from a file");
+ ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties).Wait();
```

Let’s also query the new number of rows and the most recent row after the ingestion.
Add the following lines after the ingestion command:

```csharp
- Console.WriteLine("Number of rows in " + tableName);
- result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties());
- PrintResultAsValueList(result);
-
- Console.WriteLine("Example line from " + tableName);
- result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
- PrintResultAsValueList(result);
+ Console.WriteLine("Number of rows in " + tableName);
+ result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties());
+ PrintResultAsValueList(result);
+
+ Console.WriteLine("Example line from " + tableName);
+ result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
+ PrintResultAsValueList(result);
```

### [Python](#tab/python)
@@ -245,23 +251,23 @@ Place the *stormevents.csv* file in the same location as your script. Since our
Add an ingestion section to the end of `main()` using the following lines.

```python
- # Ingestion section
- print("Ingesting data from a file")
- ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV)
- ingest_client.ingest_from_file(file_path, ingest_properties)
+ # Ingestion section
+ print("Ingesting data from a file")
+ ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV)
+ ingest_client.ingest_from_file(file_path, ingest_properties)
```

Let’s also query the new number of rows and the most recent row after the ingestion.
Add the following lines after the ingestion command:

```python
- print("New number of rows in " + table_name)
- result = kusto_client.execute_query(database_name, table_name + " | count")
- print_result_as_value_list(result)
-
- print("Example line from " + table_name)
- result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime")
- print_result_as_value_list(result)
+ print("New number of rows in " + table_name)
+ result = kusto_client.execute_query(database_name, table_name + " | count")
+ print_result_as_value_list(result)
+
+ print("Example line from " + table_name)
+ result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime")
+ print_result_as_value_list(result)
```

Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file by replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = "<full path to stormevents.csv>"`.
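
If you prefer not to depend on the current working directory at all, a minimal sketch of an alternative is to resolve the CSV path relative to the script file itself:

```python
import os

# Build the path to stormevents.csv next to this script, so ingestion works
# regardless of the directory the script is launched from.
file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "stormevents.csv")
```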
@@ -329,29 +335,27 @@ Place the *stormevents.csv* file in the same location as your script. Since our
Add an ingestion section to the end of `main()` using the following lines.

```typescript
- const ingestProperties = new IngestionProperties({
-     database: databaseName,
-     table: tableName,
-     format: DataFormat.CSV
- });
-
- // Ingest section
- console.log("Ingesting data from a file");
- await ingestClient.ingestFromFile(".\\stormevents.csv", ingestProperties);
- ingestClient.close();
+ const ingestProperties = new IngestionProperties({
+     database: databaseName,
+     table: tableName,
+     format: DataFormat.CSV
+ });
+ // Ingest section
+ console.log("Ingesting data from a file");
+ await ingestClient.ingestFromFile(".\\stormevents.csv", ingestProperties);
+ ingestClient.close();
```

Let’s also query the new number of rows and the most recent row after the ingestion.
Add the following lines after the ingestion command:

```typescript
- console.log(`New number of rows in ${tableName}`);
- result = await kustoClient.executeQuery(databaseName, `${tableName} | count`);
- printResultAsValueList(result);
-
- console.log(`Example line from ${tableName}`);
- result = await kustoClient.executeQuery(databaseName, `${tableName} | top 1 by EndTime`);
- printResultAsValueList(result);
+ console.log(`New number of rows in ${tableName}`);
+ result = await kustoClient.executeQuery(databaseName, `${tableName} | count`);
+ printResultAsValueList(result);
+ console.log(`Example line from ${tableName}`);
+ result = await kustoClient.executeQuery(databaseName, `${tableName} | top 1 by EndTime`);
+ printResultAsValueList(result);
```

### [Java](#tab/java)
@@ -420,44 +424,43 @@ Place the *stormevents.csv* file in the same location as your script. Since our
Add an ingestion section to the end of `main()` using the following lines.

```java
- // Ingestion section
- try (
-     ManagedStreamingIngestClient ingestClient = IngestClientFactory
-         .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) {
-     System.out.println("Ingesting data from a file");
-     String filePath = "stormevents.csv";
-     IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table);
-     ingestionProperties.setDataFormat(DataFormat.CSV);
-     FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0);
-     ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties);
-
- } catch (Exception e) {
-     // TODO: handle exception
-     System.out.println("Error: " + e);
- }
+ // Ingestion section
+ try (
+     ManagedStreamingIngestClient ingestClient = IngestClientFactory
+         .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) {
+     System.out.println("Ingesting data from a file");
+     String filePath = "stormevents.csv";
+     IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table);
+     ingestionProperties.setDataFormat(DataFormat.CSV);
+     FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0);
+     ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties);
+ } catch (Exception e) {
+     // TODO: handle exception
+     System.out.println("Error: " + e);
+ }

```

Let’s also query the new number of rows and the most recent row after the ingestion.
Add the following lines after the ingestion command:

```java
- query = table + " | count";
- results = kustoClient.execute(databaseName, query);
- primaryResults = results.getPrimaryResults();
- System.out.println("\nNumber of rows in " + table + " AFTER ingestion:");
- printResultsAsValueList(primaryResults);
-
- query = table + " | top 1 by EndTime";
- results = kustoClient.execute(databaseName, query);
- primaryResults = results.getPrimaryResults();
- System.out.println("\nExample line from " + table);
- printResultsAsValueList(primaryResults);
+ query = table + " | count";
+ results = kustoClient.execute(databaseName, query);
+ primaryResults = results.getPrimaryResults();
+ System.out.println("\nNumber of rows in " + table + " AFTER ingestion:");
+ printResultsAsValueList(primaryResults);
+
+ query = table + " | top 1 by EndTime";
+ results = kustoClient.execute(databaseName, query);
+ primaryResults = results.getPrimaryResults();
+ System.out.println("\nExample line from " + table);
+ printResultsAsValueList(primaryResults);
```

---

- The first time your run the application the results are as follows:
+ The first time you run the application the results are as follows:

```plaintext
Number of rows in MyStormEvents
@@ -489,18 +492,18 @@ To ingest the stream from memory, call the `IngestFromStreamAsync()` method.
Replace the ingestion section with the following code:

```csharp
- // Ingestion section
- Console.WriteLine("Ingesting data from memory");
- var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
- byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
- using (MemoryStream stream = new MemoryStream(byteArray))
- {
-     var streamSourceOptions = new StreamSourceOptions
-     {
-         LeaveOpen = false
-     };
-     ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions).Wait();
- }
+ // Ingestion section
+ Console.WriteLine("Ingesting data from memory");
+ var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
+ byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
+ using (MemoryStream stream = new MemoryStream(byteArray))
+ {
+     var streamSourceOptions = new StreamSourceOptions
+     {
+         LeaveOpen = false
+     };
+     ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions).Wait();
+ }
```

### [Python](#tab/python)
@@ -510,14 +513,14 @@ To ingest the stream from memory, call the `ingest_from_stream()` API.
Replace the ingestion section with the following code:

```python
- # Ingestion section
- print("Ingesting data from memory")
- single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
- string_stream = io.StringIO(single_line)
- ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV)
- # when possible provide the size of the raw data
- stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
- ingest_client.ingest_from_stream(stream_descriptor, ingest_properties)
+ # Ingestion section
+ print("Ingesting data from memory")
+ single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
+ string_stream = io.StringIO(single_line)
+ ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV)
+ # when possible provide the size of the raw data
+ stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
+ ingest_client.ingest_from_stream(stream_descriptor, ingest_properties)
```

### [TypeScript](#tab/typescript)
@@ -527,11 +530,11 @@ To ingest the stream from memory, call the `ingestFromStream()` API.
Replace the ingestion section with the following code:

```typescript
- // Ingest section
- console.log('Ingesting data from memory');
- const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
- await ingestClient.ingestFromStream(Buffer.from(singleLine), ingestProperties)
- ingestClient.close();
+ // Ingest section
+ console.log('Ingesting data from memory');
+ const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
+ await ingestClient.ingestFromStream(Buffer.from(singleLine), ingestProperties)
+ ingestClient.close();
```

### [Java](#tab/java)
@@ -541,21 +544,20 @@ To ingest the stream from memory, call the `ingestFromStream()` API.
Replace the ingestion section with the following code:

```java
- String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
- ByteArrayInputStream inputStream = new ByteArrayInputStream(singleLine.getBytes(StandardCharsets.UTF_8));
- try (
-     ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory
-         .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) {
-     System.out.println("Ingesting data from a byte array");
-     IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table);
-     ingestionProperties.setDataFormat(DataFormat.CSV);
-     StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
-     ingestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
-
- } catch (Exception e) {
-     // TODO: handle exception
-     System.out.println("Error: " + e);
- }
+ String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(singleLine.getBytes(StandardCharsets.UTF_8));
+ try (
+     ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory
+         .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) {
+     System.out.println("Ingesting data from a byte array");
+     IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table);
+     ingestionProperties.setDataFormat(DataFormat.CSV);
+     StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream);
+     ingestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
+ } catch (Exception e) {
+     // TODO: handle exception
+     System.out.println("Error: " + e);
+ }
```

---