Skip to content

Commit c9814ad

Browse files
Fix managed ingestion to poll for final status on queued fallback
When managed streaming ingestion falls back to queued, the immediate status is Queued/Pending. Previously this was treated as success, silently dropping data when permissions were wrong. Now polls for final status (1 min timeout, 5 sec interval) to catch permission errors and ingestion failures. Moved PollIngestionStatus to base class for reuse by both managed and queued paths.
1 parent 211dc9b commit c9814ad

File tree

2 files changed

+31
-23
lines changed

2 files changed

+31
-23
lines changed

src/Services/IKustoIngestionService.cs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,30 @@ public static KustoIngestionProperties GetKustoIngestionProperties(DataSourceFor
4747
}
4848
return kustoIngestProperties;
4949
}
50+
51+
/// <summary>
52+
/// Polls the ingestion status until a terminal state is reached or timeout occurs.
53+
/// Used by both queued ingestion and managed ingestion when it falls back to queued.
54+
/// </summary>
55+
protected static async Task<IngestionStatus> PollIngestionStatus(IKustoIngestionResult queuedIngestResult, Guid sourceId, int ingestionTimeoutMinutes, int pollIntervalSeconds, CancellationToken cancellationToken)
56+
{
57+
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
58+
cts.CancelAfter(TimeSpan.FromMinutes(ingestionTimeoutMinutes));
59+
IngestionStatus ingestionStatus = null;
60+
while (!cancellationToken.IsCancellationRequested)
61+
{
62+
ingestionStatus = queuedIngestResult.GetIngestionStatusBySourceId(sourceId);
63+
if (ingestionStatus.Status == Status.Succeeded
64+
|| ingestionStatus.Status == Status.Skipped
65+
|| ingestionStatus.Status == Status.PartiallySucceeded
66+
|| ingestionStatus.Status == Status.Failed)
67+
{
68+
break;
69+
}
70+
await Task.Delay(TimeSpan.FromSeconds(pollIntervalSeconds), cancellationToken);
71+
}
72+
return ingestionStatus;
73+
}
5074
}
5175

5276
internal class KustoManagedIngestionService : IKustoIngestionService
@@ -69,6 +93,13 @@ public override async Task<IngestionStatus> IngestData(DataSourceFormat dataForm
6993
{
7094
this._logger.LogDebug($"Ingestion status for sourceId {streamSourceOptions.SourceId} is {managedIngestionStatus.Status}");
7195
}
96+
// When managed streaming ingestion falls back to queued, the immediate status is Queued/Pending.
97+
// In this case, poll for the final status to ensure we catch permission and ingestion errors.
98+
if (managedIngestionStatus.Status == Status.Queued || managedIngestionStatus.Status == Status.Pending)
99+
{
100+
this._logger.LogDebug($"Managed ingestion fell back to queued for sourceId {streamSourceOptions.SourceId}. Polling for final status.");
101+
return await PollIngestionStatus(ingestionResult, streamSourceOptions.SourceId, 1, 5, cancellationToken);
102+
}
72103
return managedIngestionStatus;
73104
}
74105
}
@@ -123,27 +154,5 @@ public override async Task<IngestionStatus> IngestData(DataSourceFormat dataForm
123154
}
124155
return await PollIngestionStatus(ingestionResult, streamSourceOptions.SourceId, pollTimeoutMinutes, pollIntervalSeconds, cancellationToken);
125156
}
126-
127-
private static async Task<IngestionStatus> PollIngestionStatus(IKustoIngestionResult queuedIngestResult, Guid sourceId, int ingestionTimeoutMinutes, int pollIntervalSeconds, CancellationToken cancellationToken)
128-
{
129-
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
130-
cts.CancelAfter(TimeSpan.FromMinutes(ingestionTimeoutMinutes));
131-
IngestionStatus ingestionStatus = null;
132-
while (!cancellationToken.IsCancellationRequested)
133-
{
134-
ingestionStatus = queuedIngestResult.GetIngestionStatusBySourceId(sourceId);
135-
// Check if the ingestion status indicates completion
136-
if (ingestionStatus.Status == Status.Succeeded
137-
|| ingestionStatus.Status == Status.Skipped // The ingestion was skipped because it was already ingested
138-
|| ingestionStatus.Status == Status.PartiallySucceeded // Some of the records were ingested
139-
|| ingestionStatus.Status == Status.Failed)
140-
{
141-
break;
142-
}
143-
// Wait for a specified interval before polling again
144-
await Task.Delay(TimeSpan.FromSeconds(pollIntervalSeconds), cancellationToken);
145-
}
146-
return ingestionStatus;
147-
}
148157
}
149158
}

test/IntegrationTests/KustoBindingE2EIntegrationTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ public async Task KustoFunctionsE2E()
115115
Assert.True(isError, readPrivilegeException.GetBaseException().Message);
116116

117117
// Fail scenario for no ingest privileges
118-
119118
string[] testsNoPrivilegesExecute = { nameof(KustoEndToEndTestClass.OutputFailForUserWithNoReadPrivileges) };
120119
// , nameof(KustoEndToEndTestClass.OutputQueuedFailForUserWithNoReadPrivileges)
121120
foreach (string testNoPrivilegesExecute in testsNoPrivilegesExecute)

0 commit comments

Comments
 (0)