Skip to content

Commit 195697f

Browse files
authored
DRV-490: Streaming (#132)
1 parent ec25741 commit 195697f

File tree

13 files changed

+797
-29
lines changed

13 files changed

+797
-29
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ Thumbs.db
4040

4141
#dotCover
4242
*.dotCover
43+
44+
.idea/

FaunaDB.Client.Test/ClientTest.cs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using FaunaDB.Query;
55
using System;
66
using System.Linq;
7-
using System.Text;
87
using System.Threading.Tasks;
98
using System.Collections.Generic;
109
using NUnit.Framework;
@@ -20,7 +19,6 @@ namespace Test
2019
public class ClientTest : TestCase
2120
{
2221
private static Field<Value> DATA = Field.At("data");
23-
private static Field<RefV> REF_FIELD = Field.At("ref").To<RefV>();
2422
private static Field<long> TS_FIELD = Field.At("ts").To<long>();
2523
private static Field<RefV> DOCUMENT_FIELD = Field.At("document").To<RefV>();
2624
private static Field<IReadOnlyList<RefV>> REF_LIST = DATA.Collect(Field.To<RefV>());
@@ -38,9 +36,6 @@ public class ClientTest : TestCase
3836
private static RefV thorSpell1;
3937
private static RefV thorSpell2;
4038

41-
RefV GetRef(Value v) =>
42-
v.Get(REF_FIELD);
43-
4439
[OneTimeSetUp]
4540
new public void SetUp()
4641
{
@@ -2205,28 +2200,7 @@ await client.Query(CreateIndex(Obj(
22052200
);
22062201
}
22072202

2208-
private async Task<RefV> RandomCollection()
2209-
{
2210-
Value coll = await client.Query(
2211-
CreateCollection(
2212-
Obj("name", RandomStartingWith("some_coll_")))
2213-
);
2214-
2215-
return GetRef(coll);
2216-
}
2217-
2218-
private string RandomStartingWith(params string[] strs)
2219-
{
2220-
StringBuilder builder = new StringBuilder();
2221-
foreach (var str in strs)
2222-
builder.Append(str);
2223-
2224-
builder.Append(new Random().Next(0, int.MaxValue));
2225-
2226-
return builder.ToString();
2227-
}
2228-
2229-
static void AssertErrors(FaunaException ex, string code, string description)
2203+
internal static void AssertErrors(FaunaException ex, string code, string description)
22302204
{
22312205
Assert.That(ex.Errors, Has.Count.EqualTo(1));
22322206
Assert.AreEqual(code, ex.Errors[0].Code);
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using FaunaDB.Client;
5+
using FaunaDB.Errors;
6+
using FaunaDB.Types;
7+
using NUnit.Framework;
8+
9+
using static FaunaDB.Query.Language;
10+
using static Test.ClientTest;
11+
12+
namespace Test
13+
{
14+
public class StreamingTest : TestCase
15+
{
16+
[OneTimeSetUp]
17+
public new void SetUp()
18+
{
19+
SetUpAsync().Wait();
20+
}
21+
22+
async Task SetUpAsync()
23+
{
24+
await adminClient.Query(CreateCollection(Obj("name", "streams_test")));
25+
}
26+
27+
[Test]
28+
public async Task TestThatStreamFailsIfTargetDoesNotExist()
29+
{
30+
AsyncTestDelegate doc = async () => { await adminClient.Stream(Get(Ref(Collection("streams_test"), "1234"))); };
31+
32+
var ex = Assert.ThrowsAsync<NotFound>(doc);
33+
Assert.AreEqual("instance not found: Document not found.", ex.Message);
34+
AssertErrors(ex, code: "instance not found", description: "Document not found.");
35+
}
36+
37+
[Test]
38+
public async Task TestStreamFailsIfIncorrectValuePassedToStreamMethod()
39+
{
40+
AsyncTestDelegate doc = async () => { await adminClient.Stream(Collection("streams_test")); };
41+
42+
var ex = Assert.ThrowsAsync<BadRequest>(doc);
43+
Assert.AreEqual("invalid argument: Expected a Document Ref or Version, got Collection Ref.", ex.Message);
44+
AssertErrors(ex, code: "invalid argument", description: "Expected a Document Ref or Version, got Collection Ref.");
45+
}
46+
47+
[Test]
48+
public async Task TestStreamFailsIfQueryIsNotReadOnly()
49+
{
50+
AsyncTestDelegate doc = async () => { await adminClient.Stream(CreateCollection(Collection("streams_test"))); };
51+
52+
var ex = Assert.ThrowsAsync<BadRequest>(doc);
53+
Assert.AreEqual("invalid expression: Write effect in read-only query expression.", ex.Message);
54+
AssertErrors(ex, code: "invalid expression", description: "Write effect in read-only query expression.");
55+
}
56+
57+
[Test]
58+
public async Task TestStreamEventsOnDocumentReferenceWithDocumentFieldByDefault()
59+
{
60+
Value createdInstance = await adminClient.Query(
61+
Create(await RandomCollection(),
62+
Obj("credentials",
63+
Obj("password", "abcdefg"))));
64+
65+
var docRef = createdInstance.At("ref");
66+
67+
var provider = await adminClient.Stream(docRef);
68+
69+
var done = new TaskCompletionSource<object>();
70+
71+
List<Value> events = new List<Value>();
72+
73+
var monitor = new StreamingEventMonitor(
74+
value =>
75+
{
76+
events.Add(value);
77+
if (events.Count == 4)
78+
{
79+
provider.Complete();
80+
}
81+
else
82+
{
83+
provider.RequestData();
84+
}
85+
},
86+
ex => { done.SetException(ex); },
87+
() => { done.SetResult(null); }
88+
);
89+
90+
// subscribe to data provider
91+
monitor.Subscribe(provider);
92+
93+
// push 3 updates
94+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue1"))));
95+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue2"))));
96+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue3"))));
97+
98+
// blocking until we receive all the events
99+
await done.Task;
100+
101+
// clear the subscription
102+
monitor.Unsubscribe();
103+
104+
Value startEvent = events[0];
105+
Assert.AreEqual("start", startEvent.At("type").To<string>().Value);
106+
107+
Value e1 = events[1];
108+
Assert.AreEqual("version", e1.At("type").To<string>().Value);
109+
Assert.AreEqual("testValue1", e1.At("event", "document", "data", "testField").To<string>().Value);
110+
111+
Value e2 = events[2];
112+
Assert.AreEqual("version", e1.At("type").To<string>().Value);
113+
Assert.AreEqual("testValue2", e2.At("event", "document", "data", "testField").To<string>().Value);
114+
115+
Value e3 = events[3];
116+
Assert.AreEqual("version", e1.At("type").To<String>().Value);
117+
Assert.AreEqual("testValue3", e3.At("event", "document", "data", "testField").To<string>().Value);
118+
}
119+
120+
[Test]
121+
public async Task TeststreamEventsOnDocumentReferenceWithOptInFields()
122+
{
123+
Value createdInstance = await adminClient.Query(
124+
Create(await RandomCollection(),
125+
Obj("data",
126+
Obj("testField", "testValue0"))));
127+
128+
var docRef = createdInstance.At("ref");
129+
130+
var fields = new List<EventField> {
131+
EventField.ActionField,
132+
EventField.DiffField,
133+
EventField.DocumentField,
134+
EventField.PrevField
135+
};
136+
137+
var provider = await adminClient.Stream(docRef, fields);
138+
139+
var done = new TaskCompletionSource<object>();
140+
141+
List<Value> events = new List<Value>();
142+
143+
var monitor = new StreamingEventMonitor(
144+
value =>
145+
{
146+
events.Add(value);
147+
if (events.Count == 4)
148+
{
149+
provider.Complete();
150+
}
151+
else
152+
{
153+
provider.RequestData();
154+
}
155+
},
156+
ex => { done.SetException(ex); },
157+
() => { done.SetResult(null); }
158+
);
159+
160+
// subscribe to data provider
161+
monitor.Subscribe(provider);
162+
163+
// push 3 updates
164+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue1"))));
165+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue2"))));
166+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "testValue3"))));
167+
168+
// blocking until we receive all the events
169+
await done.Task;
170+
171+
// clear the subscription
172+
monitor.Unsubscribe();
173+
174+
Value startEvent = events[0];
175+
Assert.AreEqual("start", startEvent.At("type").To<string>().Value);
176+
177+
Value e1 = events[1];
178+
Assert.AreEqual("version", e1.At("type").To<string>().Value);
179+
Assert.AreEqual("update", e1.At("event", "action").To<string>().Value);
180+
Assert.AreEqual(
181+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue1")),
182+
((ObjectV)e1.At("event", "diff", "data")).Value);
183+
Assert.AreEqual(
184+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue1")),
185+
((ObjectV)e1.At("event", "document", "data")).Value);
186+
Assert.AreEqual(
187+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue0")),
188+
((ObjectV)e1.At("event", "prev", "data")).Value);
189+
190+
Value e2 = events[2];
191+
Assert.AreEqual("version", e2.At("type").To<string>().Value);
192+
Assert.AreEqual("update", e2.At("event", "action").To<string>().Value);
193+
Assert.AreEqual(
194+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue2")),
195+
((ObjectV)e2.At("event", "diff", "data")).Value);
196+
Assert.AreEqual(
197+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue2")),
198+
((ObjectV)e2.At("event", "document", "data")).Value);
199+
Assert.AreEqual(
200+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue1")),
201+
((ObjectV)e2.At("event", "prev", "data")).Value);
202+
203+
Value e3 = events[3];
204+
Assert.AreEqual("version", e3.At("type").To<string>().Value);
205+
Assert.AreEqual("update", e3.At("event", "action").To<string>().Value);
206+
Assert.AreEqual(
207+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue3")),
208+
((ObjectV)e3.At("event", "diff", "data")).Value);
209+
Assert.AreEqual(
210+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue3")),
211+
((ObjectV)e3.At("event", "document", "data")).Value);
212+
Assert.AreEqual(
213+
FaunaDB.Collections.ImmutableDictionary.Of("testField", StringV.Of("testValue2")),
214+
((ObjectV)e3.At("event", "prev", "data")).Value);
215+
}
216+
217+
[Test]
218+
public async Task TestStreamHandlesLossOfAuthorization()
219+
{
220+
await adminClient.Query(
221+
CreateCollection(Obj("name", "streamed-things-auth"))
222+
);
223+
224+
Value createdInstance = await adminClient.Query(
225+
Create(Collection("streamed-things-auth"),
226+
Obj("credentials",
227+
Obj("password", "abcdefg"))));
228+
229+
var docRef = createdInstance.At("ref");
230+
231+
// new key + client
232+
Value newKey = await adminClient.Query(CreateKey(Obj("role", "server-readonly")));
233+
FaunaClient streamingClient = adminClient.NewSessionClient(newKey.At("secret").To<string>().Value);
234+
235+
var provider = await streamingClient.Stream(docRef);
236+
237+
var done = new TaskCompletionSource<object>();
238+
239+
List<Value> events = new List<Value>();
240+
241+
var monitor = new StreamingEventMonitor(
242+
async value =>
243+
{
244+
if (events.Count == 0)
245+
{
246+
try
247+
{
248+
// update doc on `start` event
249+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "afterStart"))));
250+
251+
// delete key
252+
await adminClient.Query(Delete(newKey.At("ref").To<RefV>().Value));
253+
254+
// push an update to force auth revalidation.
255+
await adminClient.Query(Update(docRef, Obj("data", Obj("testField", "afterKeyDelete"))));
256+
}
257+
catch (Exception ex)
258+
{
259+
done.SetException(ex);
260+
}
261+
}
262+
263+
// capture element
264+
events.Add(value);
265+
266+
// ask for more elements
267+
provider.RequestData();
268+
},
269+
ex => { done.SetException(ex); },
270+
() => { done.SetResult(null); }
271+
);
272+
273+
// subscribe to data provider
274+
monitor.Subscribe(provider);
275+
276+
// wrapping an asynchronous call
277+
AsyncTestDelegate res = async () => await done.Task;
278+
279+
// blocking until we get an exception
280+
var exception = Assert.ThrowsAsync<StreamingException>(res);
281+
282+
// clear the subscription
283+
monitor.Unsubscribe();
284+
285+
// validating exception message
286+
Assert.AreEqual("permission denied: Authorization lost during stream evaluation.", exception.Message);
287+
AssertErrors(exception, code: "permission denied", description: "Authorization lost during stream evaluation.");
288+
}
289+
}
290+
}

0 commit comments

Comments
 (0)