From b47e85fd0bd9c5ce617344971d0c83d1ec6c86cb Mon Sep 17 00:00:00 2001 From: Kirill Rakhman Date: Tue, 8 May 2018 13:28:28 +0200 Subject: [PATCH 1/3] Implement Aggregation of Increment and Time measurements Fixes #9 --- .../CollectorConfiguration.cs | 7 +- .../CollectorAggregateConfiguration.cs | 14 ++ ...ipelinedCollectorAggregateConfiguration.cs | 53 +++++ .../Aggregate/AggregateGroupingKey.cs | 92 +++++++++ .../Aggregate/AggregatePointEmitter.cs | 93 +++++++++ .../Pipeline/Aggregate/MeasurementKind.cs | 7 + .../Pipeline/Batch/IntervalBatcher.cs | 102 ++-------- .../Pipeline/Common/IntervalEmitterBase.cs | 100 +++++++++ .../Collector/AggregationTests.cs | 189 ++++++++++++++++++ 9 files changed, 566 insertions(+), 91 deletions(-) create mode 100644 src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs create mode 100644 src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs create mode 100644 src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs create mode 100644 src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs create mode 100644 src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs create mode 100644 src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs create mode 100644 test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs diff --git a/src/InfluxDB.Collector/CollectorConfiguration.cs b/src/InfluxDB.Collector/CollectorConfiguration.cs index d507229..5e86553 100644 --- a/src/InfluxDB.Collector/CollectorConfiguration.cs +++ b/src/InfluxDB.Collector/CollectorConfiguration.cs @@ -10,6 +10,7 @@ public class CollectorConfiguration readonly PipelinedCollectorTagConfiguration _tag; readonly PipelinedCollectorEmitConfiguration _emitter; readonly PipelinedCollectorBatchConfiguration _batcher; + readonly PipelinedCollectorAggregateConfiguration _aggregator; public CollectorConfiguration() : this(null) @@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null) _tag = new PipelinedCollectorTagConfiguration(this); _emitter = new PipelinedCollectorEmitConfiguration(this); _batcher = new PipelinedCollectorBatchConfiguration(this); + _aggregator = new PipelinedCollectorAggregateConfiguration(this); } public CollectorTagConfiguration Tag => _tag; @@ -30,6 +32,8 @@ internal CollectorConfiguration(IPointEmitter parent = null) public CollectorBatchConfiguration Batch => _batcher; + public CollectorAggregateConfiguration Aggregate => _aggregator; + public MetricsCollector CreateCollector() { Action disposeEmitter; @@ -38,6 +42,7 @@ public MetricsCollector CreateCollector() var emitter = _parent; emitter = _emitter.CreateEmitter(emitter, out disposeEmitter); emitter = _batcher.CreateEmitter(emitter, out disposeBatcher); + emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter); return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () => { @@ -46,4 +51,4 @@ public MetricsCollector CreateCollector() }); } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs b/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs new file mode 100644 index 0000000..9b19717 --- /dev/null +++ b/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; + +namespace InfluxDB.Collector.Configuration +{ + public abstract class CollectorAggregateConfiguration + { + public abstract CollectorConfiguration AtInterval(TimeSpan interval); + + public abstract CollectorConfiguration SumIncrements(); + + public abstract CollectorConfiguration AggregateTimes(Func, double> func); + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs b/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs new file mode 100644 index 0000000..4f927a6 --- /dev/null +++ b/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using InfluxDB.Collector.Pipeline; +using InfluxDB.Collector.Pipeline.Aggregate; + +namespace InfluxDB.Collector.Configuration +{ + class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration + { + private readonly CollectorConfiguration _configuration; + + bool _sumIncrements; + Func, double> _timeAggregation; + TimeSpan? _interval; + + public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration) + { + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + _configuration = configuration; + } + + public override CollectorConfiguration AtInterval(TimeSpan interval) + { + _interval = interval; + return _configuration; + } + + public override CollectorConfiguration SumIncrements() + { + _sumIncrements = true; + return _configuration; + } + + public override CollectorConfiguration AggregateTimes(Func, double> func) + { + _timeAggregation = func; + return _configuration; + } + + public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose) + { + if (_interval == null) + { + dispose = null; + return parent; + } + + var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent); + dispose = aggregator.Dispose; + return aggregator; + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs new file mode 100644 index 0000000..59bac94 --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Generic; + +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + struct GroupingKey : IEquatable + { + private static readonly Dictionary EmptyDict = new Dictionary(); + + public long Bucket { get; } + + public MeasurementKind Kind { get; } + + public string Measurement { get; } + + public Dictionary Tags { get; } + + public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary tags) + { + Bucket = bucket; + Kind = kind; + Measurement = measurement; + Tags = tags ?? EmptyDict; + } + + public bool Equals(GroupingKey other) + { + return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + return obj is GroupingKey key && Equals(key); + } + + public override int GetHashCode() + { + unchecked + { + int hashCode = Bucket.GetHashCode(); + hashCode = (hashCode * 397) ^ (int) Kind; + hashCode = (hashCode * 397) ^ Measurement.GetHashCode(); + hashCode = (hashCode * 397) ^ TagsHashCode(); + return hashCode; + } + } + + int TagsHashCode() + { + unchecked + { + int hashCode = 1; + foreach (var kvp in Tags) + { + hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode(); + } + + return hashCode; + } + } + + static bool DictionaryEquals(Dictionary dict, Dictionary dict2) + { + if (dict.Count != dict2.Count) + { + return false; + } + + foreach (var kvp in dict) + { + if (dict2.TryGetValue(kvp.Key, out string value)) + { + if (value != kvp.Value) + { + return false; + } + } + else + { + return false; + } + } + + return true; + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs new file mode 100644 index 0000000..a2badcb --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using InfluxDB.Collector.Pipeline.Common; + +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + class AggregatePointEmitter : IntervalEmitterBase + { + readonly bool _sumIncrements; + readonly Func, double> _timesAggregation; + readonly IPointEmitter _parent; + + public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func, double> timesAggregation, IPointEmitter parent) + : base(timeSpan) + { + _sumIncrements = sumIncrements; + _timesAggregation = timesAggregation; + _parent = parent; + } + + protected override void HandleBatch(IReadOnlyCollection batch) + { + var grouped = batch.GroupBy(x => new GroupingKey( + x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0, + DetermineKind(x), + x.Measurement, + x.Tags + )); + + var aggregated = grouped.SelectMany(Aggregate).ToArray(); + + _parent.Emit(aggregated); + } + + IEnumerable Aggregate(IGrouping group) + { + GroupingKey key = group.Key; + MeasurementKind kind = key.Kind; + + if (kind == MeasurementKind.Increment && _sumIncrements) + { + long sum = group.Sum(x => (long) x.Fields["count"]); + return new[] + { + new PointData( + key.Measurement, + new Dictionary { { "count", sum } }, + key.Tags, + AverageTime(key)) + }; + } + + if (kind == MeasurementKind.Time && _timesAggregation != null) + { + long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks)); + return new[] + { + new PointData( + key.Measurement, + new Dictionary { { "value", new TimeSpan(ticks) } }, + key.Tags, + AverageTime(key)) + }; + } + + return group; + } + + private DateTime AverageTime(GroupingKey key) + { + return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc); + } + + static MeasurementKind DetermineKind(PointData x) + { + if (x.Fields.Count != 1) return MeasurementKind.Other; + + if (x.Fields.TryGetValue("count", out var count) && count is long) + { + return MeasurementKind.Increment; + } + else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan) + { + return MeasurementKind.Time; + } + else + { + return MeasurementKind.Other; + } + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs new file mode 100644 index 0000000..89e4e94 --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs @@ -0,0 +1,7 @@ +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + public enum MeasurementKind + { + Other = 0, Increment, Time + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index 521ca6f..4ca56a5 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,114 +1,36 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; -using InfluxDB.Collector.Diagnostics; -using InfluxDB.Collector.Platform; +using InfluxDB.Collector.Pipeline.Common; using InfluxDB.Collector.Util; namespace InfluxDB.Collector.Pipeline.Batch { - class IntervalBatcher : IPointEmitter, IDisposable + class IntervalBatcher : IntervalEmitterBase { - readonly object _queueLock = new object(); - Queue _queue = new Queue(); - - readonly TimeSpan _interval; - readonly int? _maxBatchSize; readonly IPointEmitter _parent; - readonly object _stateLock = new object(); - readonly PortableTimer _timer; - bool _unloading; - bool _started; + readonly int? _maxBatchSize; - public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) + public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) : base(interval) { - _parent = parent; - _interval = interval; _maxBatchSize = maxBatchSize; - _timer = new PortableTimer(cancel => OnTick()); + _parent = parent; } - void CloseAndFlush() + protected override void HandleBatch(IReadOnlyCollection batch) { - lock (_stateLock) + if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) { - if (!_started || _unloading) - return; - - _unloading = true; + _parent.Emit(batch.ToArray()); } - - _timer.Dispose(); - - OnTick(); - } - - public void Dispose() - { - CloseAndFlush(); - } - - Task OnTick() - { - try + else { - Queue batch; - lock (_queueLock) + foreach (var chunk in batch.Batch(_maxBatchSize.Value)) { - if (_queue.Count == 0) - return Task.Delay(0); - - batch = _queue; - _queue = new Queue(); + _parent.Emit(chunk.ToArray()); } - - if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) - { - _parent.Emit(batch.ToArray()); - } - else - { - foreach (var chunk in batch.Batch(_maxBatchSize.Value)) - { - _parent.Emit(chunk.ToArray()); - } - } - } - catch (Exception ex) - { - CollectorLog.ReportError("Failed to emit metrics batch", ex); - } - finally - { - lock (_stateLock) - { - if (!_unloading) - _timer.Start(_interval); - } - } - - return Task.Delay(0); - } - - public void Emit(PointData[] points) - { - lock (_stateLock) - { - if (_unloading) return; - if (!_started) - { - _started = true; - _timer.Start(TimeSpan.Zero); - } - } - - lock (_queueLock) - { - foreach(var point in points) - _queue.Enqueue(point); } } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs new file mode 100644 index 0000000..b5a2e5b --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using InfluxDB.Collector.Diagnostics; +using InfluxDB.Collector.Platform; + +namespace InfluxDB.Collector.Pipeline.Common +{ + internal abstract class IntervalEmitterBase : IPointEmitter, IDisposable + { + readonly object _queueLock = new object(); + Queue _queue = new Queue(); + + protected readonly TimeSpan _interval; + + readonly object _stateLock = new object(); + readonly PortableTimer _timer; + bool _unloading; + bool _started; + + protected IntervalEmitterBase(TimeSpan interval) + { + _interval = interval; + _timer = new PortableTimer(cancel => OnTick()); + } + + private void CloseAndFlush() + { + lock (_stateLock) + { + if (!_started || _unloading) + return; + + _unloading = true; + } + + _timer.Dispose(); + + OnTick(); + } + + public void Dispose() + { + CloseAndFlush(); + } + + protected Task OnTick() + { + try + { + Queue batch; + lock (_queueLock) + { + if (_queue.Count == 0) + return Task.Delay(0); + + batch = _queue; + _queue = new Queue(); + } + + HandleBatch(batch); + } + catch (Exception ex) + { + CollectorLog.ReportError("Failed to emit metrics batch", ex); + } + finally + { + lock (_stateLock) + { + if (!_unloading) + _timer.Start(_interval); + } + } + + return Task.Delay(0); + } + + public void Emit(PointData[] points) + { + lock (_stateLock) + { + if (_unloading) return; + if (!_started) + { + _started = true; + _timer.Start(TimeSpan.Zero); + } + } + + lock (_queueLock) + { + foreach (var point in points) + _queue.Enqueue(point); + } + } + + protected abstract void HandleBatch(IReadOnlyCollection batch); + } +} \ No newline at end of file diff --git a/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs new file mode 100644 index 0000000..9f95d5b --- /dev/null +++ b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs @@ -0,0 +1,189 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using InfluxDB.Collector; +using InfluxDB.Collector.Pipeline; +using Xunit; + +namespace InfluxDB.LineProtocol.Tests.Collector +{ + public class AggregationTests + { + [Fact] + public async Task IncrementsCanBeSummed() + { + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => list.AddRange(pts)) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(1, list.Count); + Assert.Equal(6L, list[0].Fields["count"]); + Assert.InRange(list[0].UtcTimestamp.Value.TimeOfDay, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task TimesCanBeAveraged() + { + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(400)) + .Aggregate.AggregateTimes(Enumerable.Average) + .WriteTo.Emitter(pts => list.AddRange(pts)) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(1) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(2) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(3) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(1, list.Count); + Assert.Equal(TimeSpan.FromSeconds(2), (TimeSpan) list[0].Fields["value"]); + Assert.InRange(list[0].UtcTimestamp.Value.TimeOfDay, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task DifferentTagsArentAggregated() + { + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => list.AddRange(pts)) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "b" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "c" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(3, list.Count); + } + + [Fact] + public async Task DifferentMeasurementsArentAggregated() + { + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => list.AddRange(pts)) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("bar", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("baz", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(3, list.Count); + } + + [Fact] + public async Task DifferentTimeSpansArentAggregated() + { + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => list.AddRange(pts)) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 700)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 800)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(2, list.Count); + Assert.True(list.Any(x => (long) x.Fields["count"] == 1)); + Assert.True(list.Any(x => (long) x.Fields["count"] == 5)); + } + } +} \ No newline at end of file From f708be2fc3a67f7edafed89a54375b7ff6d754f9 Mon Sep 17 00:00:00 2001 From: Kirill Rakhman Date: Wed, 9 May 2018 11:27:08 +0200 Subject: [PATCH 2/3] aggregation: always treat points inside timer interval as one bucket --- .../Aggregate/AggregatePointEmitter.cs | 64 +++++++++++----- .../Collector/AggregationTests.cs | 75 +++++++++++++++++-- 2 files changed, 114 insertions(+), 25 deletions(-) diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs index a2badcb..7f6c9c1 100644 --- a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs @@ -21,8 +21,11 @@ public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func batch) { + DateTime now = DateTime.UtcNow; + DateTime bucketThreshold = now - _interval; + var grouped = batch.GroupBy(x => new GroupingKey( - x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0, + DetermineBucket(x.UtcTimestamp, bucketThreshold, now), DetermineKind(x), x.Measurement, x.Tags @@ -33,6 +36,45 @@ protected override void HandleBatch(IReadOnlyCollection batch) _parent.Emit(aggregated); } + private long DetermineBucket(DateTime? timestamp, DateTime bucketThreshold, DateTime now) + { + if (!timestamp.HasValue) + { + return 0; + } + + DateTime value = timestamp.Value; + + if (value >= bucketThreshold && value <= now) + { + // point was in timer interval + return bucketThreshold.Ticks; + } + else + { + // point was before or after timer interval, round it to multiple of interval + return (value.Ticks / _interval.Ticks) * _interval.Ticks; + } + } + + static MeasurementKind DetermineKind(PointData x) + { + if (x.Fields.Count != 1) return MeasurementKind.Other; + + if (x.Fields.TryGetValue("count", out var count) && count is long) + { + return MeasurementKind.Increment; + } + else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan) + { + return MeasurementKind.Time; + } + else + { + return MeasurementKind.Other; + } + } + IEnumerable Aggregate(IGrouping group) { GroupingKey key = group.Key; @@ -69,25 +111,7 @@ IEnumerable Aggregate(IGrouping group) private DateTime AverageTime(GroupingKey key) { - return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc); - } - - static MeasurementKind DetermineKind(PointData x) - { - if (x.Fields.Count != 1) return MeasurementKind.Other; - - if (x.Fields.TryGetValue("count", out var count) && count is long) - { - return MeasurementKind.Increment; - } - else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan) - { - return MeasurementKind.Time; - } - else - { - return MeasurementKind.Other; - } + return new DateTime(key.Bucket + _interval.Ticks / 2, DateTimeKind.Utc); } } } \ No newline at end of file diff --git a/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs index 9f95d5b..ab7fc12 100644 --- a/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs +++ b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs @@ -10,15 +10,60 @@ namespace InfluxDB.LineProtocol.Tests.Collector { public class AggregationTests { + [Fact] + public async Task PointsAreCorrectlyGrouped() + { + var written = new TaskCompletionSource(); + var list = new List(); + + var start = DateTime.UtcNow; + + var collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + ); + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + TimeSpan.FromMilliseconds(200) + ); + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + TimeSpan.FromMilliseconds(400) + ); + + await written.Task; + + Assert.Equal(1, list.Count); + Assert.Equal(3L, list[0].Fields["count"]); + } + [Fact] public async Task IncrementsCanBeSummed() { + var written = new TaskCompletionSource(); var list = new List(); IPointEmitter collector = new CollectorConfiguration() .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) .Aggregate.SumIncrements() - .WriteTo.Emitter(pts => list.AddRange(pts)) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) .CreateCollector(); collector.Emit(new[] @@ -49,12 +94,17 @@ public async Task IncrementsCanBeSummed() [Fact] public async Task TimesCanBeAveraged() { + var written = new TaskCompletionSource(); var list = new List(); IPointEmitter collector = new CollectorConfiguration() .Aggregate.AtInterval(TimeSpan.FromMilliseconds(400)) .Aggregate.AggregateTimes(Enumerable.Average) - .WriteTo.Emitter(pts => list.AddRange(pts)) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) .CreateCollector(); collector.Emit(new[] @@ -85,12 +135,17 @@ public async Task TimesCanBeAveraged() [Fact] public async Task DifferentTagsArentAggregated() { + var written = new TaskCompletionSource(); var list = new List(); IPointEmitter collector = new CollectorConfiguration() .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) .Aggregate.SumIncrements() - .WriteTo.Emitter(pts => list.AddRange(pts)) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) .CreateCollector(); collector.Emit(new[] @@ -119,12 +174,17 @@ public async Task DifferentTagsArentAggregated() [Fact] public async Task DifferentMeasurementsArentAggregated() { + var written = new TaskCompletionSource(); var list = new List(); IPointEmitter collector = new CollectorConfiguration() .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) .Aggregate.SumIncrements() - .WriteTo.Emitter(pts => list.AddRange(pts)) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) .CreateCollector(); collector.Emit(new[] @@ -153,12 +213,17 @@ public async Task DifferentMeasurementsArentAggregated() [Fact] public async Task DifferentTimeSpansArentAggregated() { + var written = new TaskCompletionSource(); var list = new List(); IPointEmitter collector = new CollectorConfiguration() .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) .Aggregate.SumIncrements() - .WriteTo.Emitter(pts => list.AddRange(pts)) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) .CreateCollector(); collector.Emit(new[] From 4de1a980d934ef7b08f4fdf46517ba30e8a4ce3a Mon Sep 17 00:00:00 2001 From: Kirill Rakhman Date: Tue, 8 May 2018 14:50:04 +0200 Subject: [PATCH 3/3] Optimize allocations on emission of single point Fixes #60 (cherry picked from commit f9d678f) --- .../Configuration/AggregateEmitter.cs | 17 ++++++++- src/InfluxDB.Collector/MetricsCollector.cs | 20 ++++++++-- .../Pipeline/Common/IntervalEmitterBase.cs | 37 +++++++++++++++---- .../Pipeline/Emit/HttpLineProtocolEmitter.cs | 18 ++++++++- .../Pipeline/ISinglePointEmitter.cs | 7 ++++ .../Pipeline/NullMetricsCollector.cs | 4 ++ .../Pipeline/PipelinedMetricsCollector.cs | 19 ++++++++-- 7 files changed, 105 insertions(+), 17 deletions(-) create mode 100644 src/InfluxDB.Collector/Pipeline/ISinglePointEmitter.cs diff --git a/src/InfluxDB.Collector/Configuration/AggregateEmitter.cs b/src/InfluxDB.Collector/Configuration/AggregateEmitter.cs index c92b59b..dfd798d 100644 --- a/src/InfluxDB.Collector/Configuration/AggregateEmitter.cs +++ b/src/InfluxDB.Collector/Configuration/AggregateEmitter.cs @@ -4,7 +4,7 @@ namespace InfluxDB.Collector.Configuration { - class AggregateEmitter : IPointEmitter + class AggregateEmitter : IPointEmitter, ISinglePointEmitter { readonly List _emitters; @@ -19,5 +19,20 @@ public void Emit(PointData[] points) foreach (var emitter in _emitters) emitter.Emit(points); } + + public void Emit(PointData point) + { + foreach (var emitter in _emitters) + { + if (emitter is ISinglePointEmitter singlePointEmitter) + { + singlePointEmitter.Emit(point); + } + else + { + emitter.Emit(new[] { point }); + } + } + } } } \ No newline at end of file diff --git a/src/InfluxDB.Collector/MetricsCollector.cs b/src/InfluxDB.Collector/MetricsCollector.cs index 36f2c2e..bc29078 100644 --- a/src/InfluxDB.Collector/MetricsCollector.cs +++ b/src/InfluxDB.Collector/MetricsCollector.cs @@ -5,7 +5,7 @@ namespace InfluxDB.Collector { - public abstract class MetricsCollector : IPointEmitter, IDisposable + public abstract class MetricsCollector : IPointEmitter, ISinglePointEmitter, IDisposable { readonly Util.ITimestampSource _timestampSource = new Util.PseudoHighResTimestampSource(); @@ -34,14 +34,16 @@ public void Dispose() Dispose(true); } - protected virtual void Dispose(bool disposing) { } + protected virtual void Dispose(bool disposing) + { + } public void Write(string measurement, IReadOnlyDictionary fields, IReadOnlyDictionary tags = null, DateTime? timestamp = null) { try { var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow()); - Emit(new[] { point }); + Emit(point); } catch (Exception ex) { @@ -54,6 +56,16 @@ void IPointEmitter.Emit(PointData[] points) Emit(points); } + void ISinglePointEmitter.Emit(PointData point) + { + Emit(point); + } + protected abstract void Emit(PointData[] points); + + protected virtual void Emit(PointData point) + { + Emit(new[] { point }); + } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs index b5a2e5b..995c497 100644 --- a/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs +++ b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs @@ -6,7 +6,7 @@ namespace InfluxDB.Collector.Pipeline.Common { - internal abstract class IntervalEmitterBase : IPointEmitter, IDisposable + internal abstract class IntervalEmitterBase : IPointEmitter, ISinglePointEmitter, IDisposable { readonly object _queueLock = new object(); Queue _queue = new Queue(); @@ -77,10 +77,37 @@ protected Task OnTick() } public void Emit(PointData[] points) + { + if (!CheckState()) + { + return; + } + + lock (_queueLock) + { + foreach (var point in points) + _queue.Enqueue(point); + } + } + + public void Emit(PointData point) + { + if (!CheckState()) + { + return; + } + + lock (_queueLock) + { + _queue.Enqueue(point); + } + } + + private bool CheckState() { lock (_stateLock) { - if (_unloading) return; + if (_unloading) return false; if (!_started) { _started = true; @@ -88,11 +115,7 @@ public void Emit(PointData[] points) } } - lock (_queueLock) - { - foreach (var point in points) - _queue.Enqueue(point); - } + return true; } protected abstract void HandleBatch(IReadOnlyCollection batch); diff --git a/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs b/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs index 529cb42..78aeb92 100644 --- a/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs +++ b/src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs @@ -5,7 +5,7 @@ namespace InfluxDB.Collector.Pipeline.Emit { - class HttpLineProtocolEmitter : IDisposable, IPointEmitter + class HttpLineProtocolEmitter : IDisposable, IPointEmitter, ISinglePointEmitter { readonly ILineProtocolClient _client; @@ -29,9 +29,23 @@ public void Emit(PointData[] points) payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp)); } + SendPayload(payload); + } + + public void Emit(PointData point) + { + var payload = new LineProtocolPayload(); + + payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp)); + + SendPayload(payload); + } + + private void SendPayload(LineProtocolPayload payload) + { var influxResult = _client.WriteAsync(payload).Result; if (!influxResult.Success) CollectorLog.ReportError(influxResult.ErrorMessage, null); } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/ISinglePointEmitter.cs b/src/InfluxDB.Collector/Pipeline/ISinglePointEmitter.cs new file mode 100644 index 0000000..078372b --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/ISinglePointEmitter.cs @@ -0,0 +1,7 @@ +namespace InfluxDB.Collector.Pipeline +{ + interface ISinglePointEmitter + { + void Emit(PointData point); + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs b/src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs index 39ceade..34e0a20 100644 --- a/src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs +++ b/src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs @@ -5,5 +5,9 @@ class NullMetricsCollector : MetricsCollector protected override void Emit(PointData[] points) { } + + protected override void Emit(PointData point) + { + } } } diff --git a/src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs b/src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs index e9d69d2..6ee606b 100644 --- a/src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs +++ b/src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs @@ -1,5 +1,4 @@ - -using System; +using System; namespace InfluxDB.Collector.Pipeline { @@ -24,10 +23,24 @@ protected override void Emit(PointData[] points) _emitter.Emit(points); } + protected override void Emit(PointData point) + { + _enricher.Enrich(point); + + if (_emitter is ISinglePointEmitter singlePointEmitter) + { + singlePointEmitter.Emit(point); + } + else + { + _emitter.Emit(new[] { point }); + } + } + protected override void Dispose(bool disposing) { if (disposing) _dispose(); } } -} +} \ No newline at end of file