Skip to content

Commit 7eb87bc

Browse files
zivillianwiredbarb
authored andcommitted
add CancellationToken (#13)
* add CancellationToken
1 parent fd75fcc commit 7eb87bc

File tree

4 files changed

+48
-15
lines changed

4 files changed

+48
-15
lines changed

Graphite/GraphiteClient.cs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Net.Http;
55
using System.Net.Sockets;
66
using System.Text;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using ahd.Graphite.Base;
910
using ahd.Graphite.Exceptions;
@@ -81,10 +82,11 @@ public GraphiteClient()
8182
/// </summary>
8283
/// <param name="series">metric path</param>
8384
/// <param name="value">metric value</param>
85+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
8486
/// <returns></returns>
85-
public Task SendAsync(string series, double value)
87+
public Task SendAsync(string series, double value, CancellationToken cancellationToken = default(CancellationToken))
8688
{
87-
return SendAsync(series, value, DateTime.Now);
89+
return SendAsync(series, value, DateTime.Now, cancellationToken);
8890
}
8991

9092
/// <summary>
@@ -93,10 +95,11 @@ public Task SendAsync(string series, double value)
9395
/// <param name="series">metric path</param>
9496
/// <param name="value">metric value</param>
9597
/// <param name="timestamp">metric timestamp</param>
98+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
9699
/// <returns></returns>
97-
public Task SendAsync(string series, double value, DateTime timestamp)
100+
public Task SendAsync(string series, double value, DateTime timestamp, CancellationToken cancellationToken = default(CancellationToken))
98101
{
99-
return SendAsync(new Datapoint(series, value, timestamp));
102+
return SendAsync(new []{new Datapoint(series, value, timestamp)}, cancellationToken);
100103
}
101104

102105
/// <summary>
@@ -105,35 +108,59 @@ public Task SendAsync(string series, double value, DateTime timestamp)
105108
/// <param name="datapoints"></param>
106109
/// <returns></returns>
107110
public Task SendAsync(params Datapoint[] datapoints)
111+
{
112+
return SendAsync(datapoints, CancellationToken.None);
113+
}
114+
115+
/// <summary>
116+
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
117+
/// </summary>
118+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
119+
/// <param name="datapoints"></param>
120+
/// <returns></returns>
121+
public Task SendAsync(CancellationToken cancellationToken, params Datapoint[] datapoints)
122+
{
123+
return SendAsync(datapoints, cancellationToken);
124+
}
125+
126+
/// <summary>
127+
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
128+
/// </summary>
129+
/// <param name="datapoints"></param>
130+
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
131+
/// <returns></returns>
132+
public Task SendAsync(Datapoint[] datapoints, CancellationToken cancellationToken)
108133
{
109134
ICollection<Datapoint> points = datapoints;
110-
return SendAsync(points);
135+
return SendAsync(points, cancellationToken);
111136
}
112137

113138
/// <summary>
114139
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
115140
/// </summary>
116141
/// <param name="datapoints"></param>
142+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
117143
/// <returns></returns>
118-
public async Task SendAsync(ICollection<Datapoint> datapoints)
144+
public async Task SendAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
119145
{
120146
if (datapoints == null || datapoints.Count == 0) throw new ArgumentNullException(nameof(datapoints));
121147
var batches = GetBatches(datapoints);
122148
foreach (var batch in batches)
123149
{
124-
await SendInternalAsync(batch).ConfigureAwait(false);
150+
await SendInternalAsync(batch, cancellationToken).ConfigureAwait(false);
125151
}
126152
}
127153

128-
private async Task SendInternalAsync(ICollection<Datapoint> datapoints)
154+
private async Task SendInternalAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken)
129155
{
130156
using (var client = new TcpClient(AddressFamily.InterNetworkV6))
131157
{
132158
client.Client.DualMode = true;
133159
await client.ConnectAsync(Host, Formatter.Port).ConfigureAwait(false);
160+
cancellationToken.ThrowIfCancellationRequested();
134161
using (var stream = client.GetStream())
135162
{
136-
await Formatter.WriteAsync(stream, datapoints).ConfigureAwait(false);
163+
await Formatter.WriteAsync(stream, datapoints, cancellationToken).ConfigureAwait(false);
137164
}
138165
}
139166
}

Graphite/IGraphiteFormatter.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace ahd.Graphite
@@ -15,7 +16,8 @@ public interface IGraphiteFormatter
1516
/// </summary>
1617
/// <param name="stream">target stream to write the data to</param>
1718
/// <param name="datapoints">list of datapoints to send</param>
18-
Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints);
19+
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
20+
Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken));
1921

2022
/// <summary>
2123
/// Format the datapoints and write to the target stream

Graphite/PickleGraphiteFormatter.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.IO;
44
using System.Linq;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Razorvine.Pickle;
78

@@ -33,7 +34,7 @@ public PickleGraphiteFormatter()
3334
}
3435

3536
/// <inheritdoc/>
36-
public async Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints)
37+
public async Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
3738
{
3839
using (var pickler = new Pickler())
3940
{
@@ -45,10 +46,10 @@ public async Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints)
4546
var sizeBytes = BitConverter.GetBytes(size);
4647
if (BitConverter.IsLittleEndian)
4748
Array.Reverse(sizeBytes);
48-
await stream.WriteAsync(sizeBytes, 0, sizeBytes.Length).ConfigureAwait(false);
49+
await stream.WriteAsync(sizeBytes, 0, sizeBytes.Length, cancellationToken).ConfigureAwait(false);
4950

50-
await stream.WriteAsync(pickled, 0, pickled.Length).ConfigureAwait(false);
51-
await stream.FlushAsync().ConfigureAwait(false);
51+
await stream.WriteAsync(pickled, 0, pickled.Length, cancellationToken).ConfigureAwait(false);
52+
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
5253
}
5354
}
5455

Graphite/PlaintextGraphiteFormatter.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Collections.Generic;
22
using System.Globalization;
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace ahd.Graphite
@@ -31,14 +32,16 @@ public PlaintextGraphiteFormatter(ushort port):this()
3132
public ushort Port { get; }
3233

3334
/// <inheritdoc/>
34-
public async Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints)
35+
public async Task WriteAsync(Stream stream, ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
3536
{
3637
using (var writer = new StreamWriter(stream) {NewLine = "\n"})
3738
{
3839
foreach (var datapoint in datapoints)
3940
{
41+
cancellationToken.ThrowIfCancellationRequested();
4042
await writer.WriteLineAsync($"{datapoint.Series} {datapoint.Value.ToString(CultureInfo.InvariantCulture)} {datapoint.UnixTimestamp}").ConfigureAwait(false);
4143
}
44+
cancellationToken.ThrowIfCancellationRequested();
4245
await writer.FlushAsync().ConfigureAwait(false);
4346
}
4447
}

0 commit comments

Comments
 (0)