Skip to content

Commit be7a637

Browse files
authored
Merge pull request #254 from datalust/dev
2022.1.x Release
2 parents 153fbe6 + 4d3de24 commit be7a637

File tree

1 file changed

+21
-6
lines changed

1 file changed

+21
-6
lines changed

src/SeqCli/Ingestion/LogShipper.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public static async Task<int> ShipEvents(
4343
if (connection == null) throw new ArgumentNullException(nameof(connection));
4444
if (reader == null) throw new ArgumentNullException(nameof(reader));
4545

46-
var batch = await ReadBatchAsync(reader, filter, batchSize, invalidDataHandling);
46+
const int maxEmptyBatchWaitMS = 2000;
47+
var batch = await ReadBatchAsync(reader, filter, batchSize, invalidDataHandling, maxEmptyBatchWaitMS);
4748
var retries = 0;
4849
while (true)
4950
{
@@ -81,7 +82,7 @@ public static async Task<int> ShipEvents(
8182
if (batch.IsLast)
8283
break;
8384

84-
batch = await ReadBatchAsync(reader, filter, batchSize, invalidDataHandling);
85+
batch = await ReadBatchAsync(reader, filter, batchSize, invalidDataHandling, maxEmptyBatchWaitMS);
8586
}
8687

8788
return 0;
@@ -91,10 +92,16 @@ static async Task<BatchResult> ReadBatchAsync(
9192
ILogEventReader reader,
9293
Func<LogEvent, bool> filter,
9394
int count,
94-
InvalidDataHandling invalidDataHandling)
95+
InvalidDataHandling invalidDataHandling,
96+
int maxWaitMS)
9597
{
9698
var batch = new List<LogEvent>();
9799
var isLast = false;
100+
101+
// Avoid consuming stacks of CPU unnecessarily when there's no work to do. We do eventually yield
102+
// an empty batch, because level switching relies on this.
103+
var totalWaitMS = 0;
104+
const int idleWaitMS = 5;
98105
do
99106
{
100107
try
@@ -105,8 +112,16 @@ static async Task<BatchResult> ReadBatchAsync(
105112
isLast = rr.IsAtEnd;
106113
var evt = rr.LogEvent;
107114
if (evt == null)
108-
break;
109-
115+
{
116+
if (isLast || batch.Count != 0 || totalWaitMS > maxWaitMS)
117+
break;
118+
119+
// Nothing to to ship; wait to try to fill a batch.
120+
await Task.Delay(idleWaitMS);
121+
totalWaitMS += idleWaitMS;
122+
continue;
123+
}
124+
110125
if (filter == null || filter(evt))
111126
{
112127
batch.Add(evt);
@@ -163,7 +178,7 @@ static async Task<bool> SendBatchAsync(
163178
{
164179
try
165180
{
166-
var error = JsonConvert.DeserializeObject<dynamic>(resultJson);
181+
var error = JsonConvert.DeserializeObject<dynamic>(resultJson)!;
167182

168183
Log.Error("Failed with status code {StatusCode}: {ErrorMessage}",
169184
result.StatusCode,

0 commit comments

Comments
 (0)