Skip to content

Commit d79a36b

Browse files
authored
[Azure.Monitor.Ingestion] Add tests and update Readme (Azure#34021)
1 parent a27a4d8 commit d79a36b

File tree

5 files changed

+309
-23
lines changed

5 files changed

+309
-23
lines changed

sdk/monitor/Azure.Monitor.Ingestion/README.md

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ We guarantee that all client instance methods are thread-safe and independent of
8888

8989
- [Register the client with dependency injection](#register-the-client-with-dependency-injection)
9090
- [Upload custom logs](#upload-custom-logs)
91+
- [Upload custom logs as IEnumerable](#upload-custom-logs-ienumerable)
92+
- [Upload custom logs as IEnumerable with EventHandler](#upload-custom-logs-ienumerable-eventhandler)
9193
- [Verify logs](#verify-logs)
9294

9395
You can familiarize yourself with different APIs using [samples](https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/monitor/Azure.Monitor.Ingestion/samples).
@@ -98,7 +100,7 @@ To register `LogsIngestionClient` with the dependency injection (DI) container,
98100

99101
### Upload custom logs
100102

101-
You can upload logs using either the `LogsIngestionClient.Upload` or the `LogsIngestionClient.UploadAsync` method. Note the data ingestion [limits](https://learn.microsoft.com/azure/azure-monitor/service-limits#custom-logs).
103+
You can upload logs using either the `LogsIngestionClient.Upload` or the `LogsIngestionClient.UploadAsync` method. Note the data ingestion [limits](https://learn.microsoft.com/azure/azure-monitor/service-limits#custom-logs). This method has an optional parameter: string contentEncoding. This refers to the encoding of the RequestContent that is being passed in. If you are passing in content that is already manipulated, set the contentEncoding parameter. For example if your content is gzipped, set contentEncoding to be "gzip". The default behavior if this parameter is not set is to `gzip` all input.
102104

103105
```C# Snippet:UploadCustomLogsAsync
104106
var endpoint = new Uri("<data_collection_endpoint>");
@@ -147,6 +149,81 @@ Response response = await client.UploadAsync(
147149
RequestContent.Create(data)).ConfigureAwait(false);
148150
```
149151

152+
### Upload custom logs as IEnumerable
153+
154+
You can also upload logs using either the `LogsIngestionClient.Upload` or the `LogsIngestionClient.UploadAsync` method in which logs are passed in a generic `IEnumerable` type along with an optional `LogsUploadOptions` parameter. The `LogsUploadOptions` parameter includes a serializer, concurrency, and an EventHandler.
155+
156+
```C# Snippet:UploadLogDataIEnumerableAsync
157+
var endpoint = new Uri("<data_collection_endpoint_uri>");
158+
var ruleId = "<data_collection_rule_id>";
159+
var streamName = "<stream_name>";
160+
161+
var credential = new DefaultAzureCredential();
162+
LogsIngestionClient client = new(endpoint, credential);
163+
164+
DateTimeOffset currentTime = DateTimeOffset.UtcNow;
165+
166+
var entries = new List<Object>();
167+
for (int i = 0; i < 100; i++)
168+
{
169+
entries.Add(
170+
new {
171+
Time = currentTime,
172+
Computer = "Computer" + i.ToString(),
173+
AdditionalContext = i
174+
}
175+
);
176+
}
177+
178+
// Upload our logs
179+
Response response = await client.UploadAsync(ruleId, streamName, entries).ConfigureAwait(false);
180+
```
181+
182+
### Upload custom logs as IEnumerable with EventHandler
183+
184+
You can upload logs using either the `LogsIngestionClient.Upload` or the `LogsIngestionClient.UploadAsync` method. In these two methods, logs are passed in a generic `IEnumerable` type. Additionally, there's an `LogsUploadOptions`-typed parameter in which a serializer, concurrency, and EventHandler can be set. The default serializer is set to `System.Text.Json`, but you can pass in the serializer you would like used. The `MaxConcurrency` property sets the number of threads that will be used in the `UploadAsync` method. The default value is 5, and this parameter is unused in the `Upload` method. The EventHandler is used for error handling. It gives the user the option to abort the upload if a batch fails and access the failed logs and corresponding exception. Without the EventHandler, if an upload fails, an `AggregateException` will be thrown.
185+
186+
```C# Snippet:LogDataIEnumerableEventHandlerAsync
187+
var endpoint = new Uri("<data_collection_endpoint_uri>");
188+
var ruleId = "<data_collection_rule_id>";
189+
var streamName = "<stream_name>";
190+
191+
var credential = new DefaultAzureCredential();
192+
LogsIngestionClient client = new(endpoint, credential);
193+
194+
DateTimeOffset currentTime = DateTimeOffset.UtcNow;
195+
196+
var entries = new List<Object>();
197+
for (int i = 0; i < 100; i++)
198+
{
199+
entries.Add(
200+
new {
201+
Time = currentTime,
202+
Computer = "Computer" + i.ToString(),
203+
AdditionalContext = i
204+
}
205+
);
206+
}
207+
// Set concurrency and EventHandler in LogsUploadOptions
208+
LogsUploadOptions options = new LogsUploadOptions();
209+
options.MaxConcurrency = 10;
210+
options.UploadFailed += Options_UploadFailed;
211+
212+
// Upload our logs
213+
Response response = await client.UploadAsync(ruleId, streamName, entries, options).ConfigureAwait(false);
214+
215+
Task Options_UploadFailed(UploadFailedEventArgs e)
216+
{
217+
// Throw exception from EventHandler to stop Upload if there is a failure
218+
IReadOnlyList<object> failedLogs = e.FailedLogs;
219+
// 413 status is RequestTooLarge - don't throw here because other batches can successfully upload
220+
if ((e.Exception is RequestFailedException) && (((RequestFailedException)e.Exception).Status != 413))
221+
throw e.Exception;
222+
else
223+
return Task.CompletedTask;
224+
}
225+
```
226+
150227
### Verify logs
151228

152229
You can verify that your data has been uploaded correctly by using the [Azure Monitor Query](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/monitor/Azure.Monitor.Query/README.md#install-the-package) library. Run the [Upload custom logs](#upload-custom-logs) sample first before verifying the logs.

sdk/monitor/Azure.Monitor.Ingestion/src/LogsIngestionClient.cs

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ public virtual Response Upload<T>(string ruleId, string streamName, IEnumerable<
219219
break;
220220
try
221221
{
222+
// Cancel all future Uploads if user triggers CancellationToken
223+
cancellationToken.ThrowIfCancellationRequested();
222224
// Because we are uploading in sequence, wait for each batch to upload before starting the next batch
223225
response = UploadBatchListSyncOrAsync(
224226
batch,
@@ -232,18 +234,23 @@ public virtual Response Upload<T>(string ruleId, string streamName, IEnumerable<
232234
// if there is no Handler on options, throw exception otherwise raise Handler
233235
if (!options.HasHandler)
234236
{
237+
// throw exception here that is caught in catch and we increment LogsFailed
235238
throw new RequestFailedException(response);
236239
}
237240
else
238241
{
242+
logsFailed += batch.Logs.Count;
239243
var eventArgs = new UploadFailedEventArgs(batch.Logs, new RequestFailedException(response), isRunningSynchronously: true, ClientDiagnostics, cancellationToken);
240244
#pragma warning disable AZC0106 // Non-public asynchronous method needs 'async' parameter.
241245
// sync/async parameter in eventArgs
242-
var ex = options.OnUploadFailedAsync(eventArgs).EnsureCompleted();
246+
var userThrownException = options.OnUploadFailedAsync(eventArgs).EnsureCompleted();
243247
#pragma warning restore AZC0106 // Non-public asynchronous method needs 'async' parameter.
244-
shouldAbort = ex != null;
245-
if (shouldAbort)
246-
AddException(ref exceptions, ex);
248+
// if exception is thrown stop processing future batches
249+
if (userThrownException != null)
250+
{
251+
shouldAbort = true;
252+
AddException(ref exceptions, userThrownException);
253+
}
247254
}
248255
}
249256
}
@@ -259,13 +266,24 @@ public virtual Response Upload<T>(string ruleId, string streamName, IEnumerable<
259266
}
260267
else
261268
{
269+
logsFailed += batch.Logs.Count;
262270
var eventArgs = new UploadFailedEventArgs(batch.Logs, new RequestFailedException(response), isRunningSynchronously: true, ClientDiagnostics, cancellationToken);
263271
#pragma warning disable AZC0106 // Non-public asynchronous method needs 'async' parameter.
264272
var exceptionOnUpload = options.OnUploadFailedAsync(eventArgs).EnsureCompleted();
265273
#pragma warning restore AZC0106 // Non-public asynchronous method needs 'async' parameter.
266-
shouldAbort = exceptionOnUpload != null;
267-
if (shouldAbort)
274+
// if exception is thrown stop processing future batches
275+
if (exceptionOnUpload != null)
276+
{
277+
shouldAbort = true;
268278
AddException(ref exceptions, exceptionOnUpload);
279+
}
280+
}
281+
282+
// Cancel all future Uploads if user triggers CancellationToken
283+
if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested)
284+
{
285+
shouldAbort = true;
286+
AddException(ref exceptions, ex);
269287
}
270288
}
271289
}
@@ -332,6 +350,8 @@ public virtual async Task<Response> UploadAsync<T>(string ruleId, string streamN
332350
break;
333351
try
334352
{
353+
// Cancel all future Uploads if user triggers CancellationToken
354+
cancellationToken.ThrowIfCancellationRequested();
335355
// Start staging the next batch (but don't await the Task!)
336356
Task<Response> task = UploadBatchListSyncOrAsync(
337357
batch,
@@ -363,11 +383,16 @@ public virtual async Task<Response> UploadAsync<T>(string ruleId, string streamN
363383
}
364384
else
365385
{
366-
Exception exceptionEventHandler = await ProcessCompletedTaskEventHandlerAsync(runningTask, batch.Logs, options, cancellationToken).ConfigureAwait(false);
367-
shouldAbort = exceptionEventHandler != null;
368-
if (shouldAbort)
369-
AddException(ref exceptions, exceptionEventHandler);
386+
var processCompletedTask = await ProcessCompletedTaskEventHandlerAsync(runningTask, batch.Logs, options, cancellationToken).ConfigureAwait(false);
387+
logsFailed += processCompletedTask.FailedLogsCount;
388+
// if exception is thrown stop processing future batches
389+
if (processCompletedTask.Exception != null)
390+
{
391+
shouldAbort = true;
392+
AddException(ref exceptions, processCompletedTask.Exception);
393+
}
370394
}
395+
371396
// Remove completed task from task list
372397
runningTasks.RemoveAt(i);
373398
i--;
@@ -377,9 +402,15 @@ public virtual async Task<Response> UploadAsync<T>(string ruleId, string streamN
377402
// Wait for all the remaining blocks to finish uploading
378403
await Task.WhenAll(runningTasks.Select(_ => _.CurrentTask)).ConfigureAwait(false);
379404
}
380-
catch (Exception)
405+
catch (Exception ex)
381406
{
382407
// We do not want to log exceptions here as we will loop through all the tasks later
408+
// Cancel all future Uploads if user triggers CancellationToken
409+
if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested)
410+
{
411+
shouldAbort = true;
412+
AddException(ref exceptions, ex);
413+
}
383414
}
384415
}
385416

@@ -393,10 +424,14 @@ public virtual async Task<Response> UploadAsync<T>(string ruleId, string streamN
393424
}
394425
else
395426
{
396-
Exception exceptionEventHandler = await ProcessCompletedTaskEventHandlerAsync(task.CurrentTask, task.Logs, options, cancellationToken).ConfigureAwait(false);
397-
shouldAbort = exceptionEventHandler != null;
398-
if (shouldAbort)
399-
AddException(ref exceptions, exceptionEventHandler);
427+
var processTaskResult = await ProcessCompletedTaskEventHandlerAsync(task.CurrentTask, task.Logs, options, cancellationToken).ConfigureAwait(false);
428+
logsFailed += processTaskResult.FailedLogsCount;
429+
// if exception is thrown stop processing future batches
430+
if (processTaskResult.Exception != null)
431+
{
432+
shouldAbort = true;
433+
AddException(ref exceptions, processTaskResult.Exception);
434+
}
400435
}
401436
}
402437
if (exceptions?.Count > 0)
@@ -431,22 +466,24 @@ private static void ProcessCompletedTask((Task<Response> CurrentTask, List<objec
431466
}
432467
}
433468

434-
internal async Task<Exception> ProcessCompletedTaskEventHandlerAsync(Task<Response> completedTask, List<object> logs, LogsUploadOptions options, CancellationToken cancellationToken)
469+
internal async Task<(Exception Exception, int FailedLogsCount)> ProcessCompletedTaskEventHandlerAsync(Task<Response> completedTask, List<object> logs, LogsUploadOptions options, CancellationToken cancellationToken)
435470
{
436471
UploadFailedEventArgs eventArgs;
437472
if (completedTask.Exception != null)
438473
{
439474
eventArgs = new UploadFailedEventArgs(logs, completedTask.Exception, isRunningSynchronously: false, ClientDiagnostics, cancellationToken);
440-
return await options.OnUploadFailedAsync(eventArgs).ConfigureAwait(false);
475+
var exception = await options.OnUploadFailedAsync(eventArgs).ConfigureAwait(false);
476+
return (exception, logs.Count);
441477
}
442478
else if (completedTask.Result.Status != 204)
443479
{
444480
eventArgs = new UploadFailedEventArgs(logs, new RequestFailedException(completedTask.Result), isRunningSynchronously: false, ClientDiagnostics, cancellationToken);
445-
return await options.OnUploadFailedAsync(eventArgs).ConfigureAwait(false);
481+
var exception = await options.OnUploadFailedAsync(eventArgs).ConfigureAwait(false);
482+
return (exception, logs.Count);
446483
}
447484
else
448485
{
449-
return null;
486+
return (null, 0);
450487
}
451488
}
452489

sdk/monitor/Azure.Monitor.Ingestion/tests/ErrorTest.cs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Linq;
67
using System.Threading;
78
using System.Threading.Tasks;
89
using Azure.Core;
@@ -131,7 +132,6 @@ public async Task OneFailureWithEventHandler()
131132
var cts = new CancellationTokenSource();
132133
bool isTriggered = false;
133134
options.UploadFailed += Options_UploadFailed;
134-
//await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options, cts.Token).ConfigureAwait(false);
135135
await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options).ConfigureAwait(false);
136136
Assert.IsTrue(isTriggered);
137137
Task Options_UploadFailed(UploadFailedEventArgs e)
@@ -183,5 +183,87 @@ Task Options_UploadFailed(UploadFailedEventArgs e)
183183
return Task.CompletedTask;
184184
}
185185
}
186+
187+
[Test]
188+
public void TwoFailuresWithEventHandlerCancellationToken()
189+
{
190+
LogsIngestionClient client = CreateClient();
191+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
192+
LogsIngestionClient.Compression = "gzip";
193+
var entries = GenerateEntries(800, Recording.Now.DateTime);
194+
entries.Add(new object[] {
195+
new {
196+
Time = Recording.Now.DateTime,
197+
Computer = "Computer" + new string('*', Mb),
198+
AdditionalContext = 1
199+
}
200+
});
201+
entries.Add(new object[] {
202+
new {
203+
Time = Recording.Now.DateTime,
204+
Computer = "Computer" + new string('!', Mb),
205+
AdditionalContext = 1
206+
}
207+
});
208+
entries.Add(new object[] {
209+
new {
210+
Time = Recording.Now.DateTime,
211+
Computer = "Computer" + new string(';', Mb),
212+
AdditionalContext = 1
213+
}
214+
});
215+
216+
// Make the request
217+
LogsUploadOptions options = new LogsUploadOptions();
218+
options.MaxConcurrency = 2;
219+
bool isTriggered = false;
220+
var cts = new CancellationTokenSource();
221+
options.UploadFailed += Options_UploadFailed;
222+
AggregateException exceptions = Assert.ThrowsAsync<AggregateException>(async () => { await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options, cts.Token).ConfigureAwait(false); });
223+
Assert.IsTrue(isTriggered);
224+
Assert.IsTrue(cts.IsCancellationRequested);
225+
// check if OperationCanceledException is in the Exception list
226+
// may not be first one in async case
227+
Assert.IsTrue(exceptions.InnerExceptions.Any(exception => exception is OperationCanceledException));
228+
Task Options_UploadFailed(UploadFailedEventArgs e)
229+
{
230+
cts.Cancel();
231+
isTriggered = true;
232+
Assert.IsInstanceOf<RequestFailedException>(e.Exception);
233+
Assert.AreEqual("ContentLengthLimitExceeded", ((RequestFailedException)(e.Exception)).ErrorCode);
234+
Assert.IsNull(((RequestFailedException)(e.Exception)).InnerException);
235+
Assert.AreEqual(413, ((RequestFailedException)(e.Exception)).Status);
236+
return Task.CompletedTask;
237+
}
238+
}
239+
240+
[Test]
241+
public void OneFailureWithEventHandlerThrowException()
242+
{
243+
LogsIngestionClient client = CreateClient();
244+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
245+
LogsIngestionClient.Compression = "gzip";
246+
var entries = GenerateEntries(800, Recording.Now.DateTime);
247+
entries.Add(new object[] {
248+
new {
249+
Time = Recording.Now.DateTime,
250+
Computer = "Computer" + new string('*', Mb),
251+
AdditionalContext = 1
252+
}
253+
});
254+
255+
// Make the request
256+
LogsUploadOptions options = new LogsUploadOptions();
257+
options.UploadFailed += Options_UploadFailed;
258+
var exceptions = Assert.ThrowsAsync<AggregateException>(async () => { await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options).ConfigureAwait(false); });
259+
Task Options_UploadFailed(UploadFailedEventArgs e)
260+
{
261+
Assert.IsInstanceOf<RequestFailedException>(e.Exception);
262+
Assert.AreEqual("ContentLengthLimitExceeded", ((RequestFailedException)(e.Exception)).ErrorCode);
263+
Assert.IsNull(((RequestFailedException)(e.Exception)).InnerException);
264+
Assert.AreEqual(413, ((RequestFailedException)(e.Exception)).Status);
265+
throw e.Exception;
266+
}
267+
}
186268
}
187269
}

0 commit comments

Comments
 (0)