Skip to content

Commit dcb498e

Browse files
shestakovgparkhomenko
authored andcommitted
OSS-890: Driver needs to detect and throw an error upon loss of networking (#167)
1 parent 181e1a2 commit dcb498e

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

FaunaDB.Client/Client/FaunaClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public async Task<StreamingEventHandler> Stream(Expr data = null, IReadOnlyList<
163163

164164
RaiseForStatusCode(responseHttp);
165165

166-
return new StreamingEventHandler(responseHttp.ResponseContent);
166+
return new StreamingEventHandler(responseHttp.ResponseContent, async () => await Query(dataString));
167167
}
168168

169169
internal struct ErrorsWrapper

FaunaDB.Client/Client/StreamingEventHandler.cs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,61 @@
22
using System.Collections.Generic;
33
using System.IO;
44
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57
using FaunaDB.Errors;
68
using FaunaDB.Types;
79

810
namespace FaunaDB.Client
911
{
1012
public class StreamingEventHandler : IObservable<Value>, IDisposable
1113
{
14+
private const int TIME_OUT_IN_MILLIS = 10000;
1215
private readonly List<IObserver<Value>> observers;
1316
private StreamReader streamReader;
17+
private CancellationTokenSource cancelTokenSource;
18+
private Task checkConnectionTask;
1419

1520
private static Field<string> CODE = Field.At("event", "code").To<string>();
1621
private static Field<string> DESCRIPTION = Field.At("event", "description").To<string>();
1722
private static Field<string> TYPE = Field.At("type").To<string>();
1823

19-
public StreamingEventHandler(Stream dataSource)
24+
public StreamingEventHandler(Stream dataSource, Func<Task> checkConnection = null)
2025
{
2126
dataSource.AssertNotNull(nameof(dataSource));
2227
observers = new List<IObserver<Value>>();
2328
streamReader = new StreamReader(dataSource);
29+
if (checkConnection != null)
30+
{
31+
cancelTokenSource = new CancellationTokenSource();
32+
CancellationToken token = cancelTokenSource.Token;
33+
34+
checkConnectionTask = Task.Factory.StartNew(async () =>
35+
{
36+
while (true)
37+
{
38+
if (token.IsCancellationRequested)
39+
{
40+
return;
41+
}
42+
43+
try
44+
{
45+
await checkConnection?.Invoke();
46+
}
47+
catch (Exception ex)
48+
{
49+
foreach (var observer in observers.ToList())
50+
{
51+
observer.OnError(ex);
52+
}
53+
}
54+
55+
Task.Delay(TIME_OUT_IN_MILLIS, token).Wait();
56+
}
57+
}
58+
);
59+
}
2460
}
2561

2662
public IDisposable Subscribe(IObserver<Value> observer)
@@ -61,6 +97,15 @@ public async void RequestData()
6197

6298
public void Complete()
6399
{
100+
if (cancelTokenSource != null)
101+
{
102+
cancelTokenSource.Cancel();
103+
if (!checkConnectionTask.IsCanceled)
104+
{
105+
checkConnectionTask.Wait();
106+
}
107+
}
108+
64109
foreach (var observer in observers.ToList())
65110
{
66111
observer.OnCompleted();
@@ -70,6 +115,10 @@ public void Complete()
70115
public void Dispose()
71116
{
72117
streamReader.Dispose();
118+
if (cancelTokenSource != null)
119+
{
120+
cancelTokenSource.Cancel();
121+
}
73122
}
74123

75124
private FaunaException ConstructStreamingException(Exception ex)

0 commit comments

Comments
 (0)