Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit 4c8e9f7

Browse files
committed
Add maxBatchSize parameter to IntervalBatcher
Fixes #58
1 parent 7dc6860 commit 4c8e9f7

File tree

4 files changed

+60
-5
lines changed

4 files changed

+60
-5
lines changed

src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ namespace InfluxDB.Collector.Configuration
44
{
55
public abstract class CollectorBatchConfiguration
66
{
7-
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
7+
public abstract CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize = null);
88
}
99
}

src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@ class PipelinedCollectorBatchConfiguration : CollectorBatchConfiguration
88
{
99
readonly CollectorConfiguration _configuration;
1010
TimeSpan? _interval;
11+
int? _maxBatchSize;
1112

1213
public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration)
1314
{
1415
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
1516
_configuration = configuration;
1617
}
1718

18-
public override CollectorConfiguration AtInterval(TimeSpan interval)
19+
public override CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize = null)
1920
{
2021
_interval = interval;
22+
_maxBatchSize = maxBatchSize;
2123
return _configuration;
2224
}
2325

@@ -29,7 +31,7 @@ public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
2931
return parent;
3032
}
3133

32-
var batcher = new IntervalBatcher(_interval.Value, parent);
34+
var batcher = new IntervalBatcher(_interval.Value, _maxBatchSize, parent);
3335
dispose = batcher.Dispose;
3436
return batcher;
3537
}

src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading.Tasks;
45
using InfluxDB.Collector.Diagnostics;
56
using InfluxDB.Collector.Platform;
7+
using InfluxDB.Collector.Util;
68

79
namespace InfluxDB.Collector.Pipeline.Batch
810
{
@@ -12,17 +14,19 @@ class IntervalBatcher : IPointEmitter, IDisposable
1214
Queue<PointData> _queue = new Queue<PointData>();
1315

1416
readonly TimeSpan _interval;
17+
readonly int? _maxBatchSize;
1518
readonly IPointEmitter _parent;
1619

1720
readonly object _stateLock = new object();
1821
readonly PortableTimer _timer;
1922
bool _unloading;
2023
bool _started;
2124

22-
public IntervalBatcher(TimeSpan interval, IPointEmitter parent)
25+
public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent)
2326
{
2427
_parent = parent;
2528
_interval = interval;
29+
_maxBatchSize = maxBatchSize;
2630
_timer = new PortableTimer(cancel => OnTick());
2731
}
2832

@@ -60,7 +64,17 @@ Task OnTick()
6064
_queue = new Queue<PointData>();
6165
}
6266

63-
_parent.Emit(batch.ToArray());
67+
if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value)
68+
{
69+
_parent.Emit(batch.ToArray());
70+
}
71+
else
72+
{
73+
foreach (var chunk in batch.Batch(_maxBatchSize.Value))
74+
{
75+
_parent.Emit(chunk.ToArray());
76+
}
77+
}
6478
}
6579
catch (Exception ex)
6680
{
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
4+
namespace InfluxDB.Collector.Util
5+
{
6+
internal static class EnumerableExtensions
7+
{
8+
// from https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs
9+
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
10+
{
11+
TSource[] bucket = null;
12+
var count = 0;
13+
14+
foreach (var item in source)
15+
{
16+
if (bucket == null)
17+
{
18+
bucket = new TSource[size];
19+
}
20+
21+
bucket[count++] = item;
22+
if (count != size)
23+
{
24+
continue;
25+
}
26+
27+
yield return bucket;
28+
29+
bucket = null;
30+
count = 0;
31+
}
32+
33+
if (bucket != null && count > 0)
34+
{
35+
yield return bucket.Take(count);
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)