Skip to content

Commit e9ebdce

Browse files
authored
Merge pull request #241 from zhenlineo/1.5-receive-run-reply
Changed the impl for waiting for keys to arrive
2 parents 16c606e + 7728991 commit e9ebdce

File tree

9 files changed

+125
-55
lines changed

9 files changed

+125
-55
lines changed

Neo4j.Driver/Neo4j.Driver.Tests/Result/ResultBuilderTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ public class CollectFieldsMethod
143143
public void ShouldPassDefaultKeysToResultIfNoKeySet()
144144
{
145145
var builder = new ResultBuilder(null, () => { }, null, null);
146+
builder.DoneSuccess();
147+
146148
var result = builder.PreBuild();
147149

148150
result.Keys.Should().BeEmpty();
@@ -153,6 +155,7 @@ public void ShouldDoNothingWhenMetaIsNull()
153155
{
154156
var builder = new ResultBuilder(null, () => { }, null, null);
155157
builder.CollectFields(null);
158+
builder.DoneSuccess();
156159

157160
var result = builder.PreBuild();
158161
result.Keys.Should().BeEmpty();
@@ -167,6 +170,7 @@ public void ShouldDoNothingWhenMetaDoesNotContainFields()
167170
{"something", "here" }
168171
};
169172
builder.CollectFields(meta);
173+
builder.DoneSuccess();
170174

171175
var result = builder.PreBuild();
172176
result.Keys.Should().BeEmpty();
@@ -180,6 +184,7 @@ public void ShouldCollectKeys()
180184

181185
var builder = new ResultBuilder(null, () => { }, null, null);
182186
builder.CollectFields(meta);
187+
builder.DoneSuccess();
183188
var result = builder.PreBuild();
184189

185190
result.Keys.Should().ContainInOrder("fieldKey1", "fieldKey2", "fieldKey3");

Neo4j.Driver/Neo4j.Driver.Tests/Result/ResultCursorBuilderTests.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private static Task AssertGetExpectResults(IStatementResultCursor cursor, int nu
5858
public class CollectRecordMethod
5959
{
6060
[Fact]
61-
public void ShouldStreamResults()
61+
public async void ShouldStreamResults()
6262
{
6363
var builder = GenerateBuilder();
6464
var i = 0;
@@ -67,6 +67,7 @@ public void ShouldStreamResults()
6767
if (i++ >= 3)
6868
{
6969
builder.CollectSummary(null);
70+
builder.DoneSuccess();
7071
}
7172
else
7273
{
@@ -75,32 +76,32 @@ public void ShouldStreamResults()
7576

7677
return Task.CompletedTask;
7778
});
78-
var result = builder.PreBuild();
79+
var result = await builder.PreBuildAsync();
7980

8081
var t = AssertGetExpectResults(result, 3, new List<object> {124, 125, 126});
8182

8283
t.Wait();
8384
}
8485

8586
[Fact]
86-
public void ShouldReturnNoResultsWhenNoneRecieved()
87+
public async void ShouldReturnNoResultsWhenNoneRecieved()
8788
{
8889
var builder = GenerateBuilder();
8990
builder.SetReceiveOneFunc(() =>
9091
{
9192
builder.CollectSummary(null);
92-
93+
builder.DoneSuccess();
9394
return Task.CompletedTask;
9495
});
95-
var result = builder.PreBuild();
96+
var result = await builder.PreBuildAsync();
9697

9798
var t = AssertGetExpectResults(result, 0);
9899

99100
t.Wait();
100101
}
101102

102103
[Fact]
103-
public void ShouldReturnQueuedResultsWithExspectedValue()
104+
public async void ShouldReturnQueuedResultsWithExspectedValue()
104105
{
105106
var builder = GenerateBuilder();
106107
List<object> recordValues = new List<object>
@@ -115,15 +116,16 @@ public void ShouldReturnQueuedResultsWithExspectedValue()
115116
builder.CollectRecord(new[] { recordValues[i] });
116117
}
117118
builder.CollectSummary(null);
119+
builder.DoneSuccess();
118120

119-
var result = builder.PreBuild();
121+
var result = await builder.PreBuildAsync();
120122

121123
var task = AssertGetExpectResults(result, recordValues.Count, recordValues);
122124
task.Wait();
123125
}
124126

125127
[Fact]
126-
public void ShouldStopStreamingWhenResultIsInvalid()
128+
public async void ShouldStopStreamingWhenResultIsInvalid()
127129
{
128130
var builder = GenerateBuilder();
129131
var i = 0;
@@ -140,7 +142,7 @@ public void ShouldStopStreamingWhenResultIsInvalid()
140142

141143
return Task.CompletedTask;
142144
});
143-
var result = builder.PreBuild();
145+
var result = await builder.PreBuildAsync();
144146

145147
var t = AssertGetExpectResults(result, 3, new List<object> { 124, 125, 126 });
146148
t.Wait();

Neo4j.Driver/Neo4j.Driver.Tests/SessionTests.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ public async void ShouldSendOnRun()
7575
{
7676
var mockConn = new Mock<IConnection>();
7777
mockConn.Setup(x => x.IsOpen).Returns(true);
78+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
79+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
80+
(s, d, c, b) =>
81+
{
82+
c?.DoneSuccess();
83+
});
7884
var session = NewSession(mockConn.Object);
7985
await session.RunAsync("lalalal");
8086

@@ -87,6 +93,12 @@ public async void ResultBuilderShouldObtainServerInfoFromConnection()
8793
{
8894
var mockConn = new Mock<IConnection>();
8995
mockConn.Setup(x => x.IsOpen).Returns(true);
96+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
97+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
98+
(s, d, c, b) =>
99+
{
100+
c?.DoneSuccess();
101+
});
90102
var session = NewSession(mockConn.Object);
91103
await session.RunAsync("lalalal");
92104

@@ -284,6 +296,13 @@ public async void ShouldBeAbleToUseSessionAgainWhenTransactionIsClosed()
284296
{
285297
var mockConn = new Mock<IConnection>();
286298
mockConn.Setup(x => x.IsOpen).Returns(true);
299+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
300+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
301+
(s, d, c, b) =>
302+
{
303+
c?.DoneSuccess();
304+
});
305+
287306
var session = NewSession(mockConn.Object);
288307
var tx = await session.BeginTransactionAsync();
289308
await tx.RollbackAsync();
@@ -295,6 +314,12 @@ public async void ShouldBeAbleToUseSessionAgainWhenTransactionIsClosed()
295314
public async void ShouldClosePreviousRunConnectionWhenRunMoreStatements()
296315
{
297316
var mockConn = new Mock<IConnection>();
317+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
318+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
319+
(s, d, c, b) =>
320+
{
321+
c?.DoneSuccess();
322+
});
298323
var session = NewSession(mockConn.Object);
299324
await session.RunAsync("lalal");
300325

@@ -307,6 +332,12 @@ public async void ShouldClosePreviousRunConnectionWhenRunMoreTransactions()
307332
{
308333
var mockConn = new Mock<IConnection>();
309334
mockConn.Setup(x => x.IsOpen).Returns(false);
335+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
336+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
337+
(s, d, c, b) =>
338+
{
339+
c?.DoneSuccess();
340+
});
310341
var session = NewSession(mockConn.Object);
311342
await session.RunAsync("lala");
312343

@@ -320,6 +351,12 @@ public async void ShouldDisposeConnectionOnRunIfBeginTxFailed()
320351
// Given
321352
var mockConn = new Mock<IConnection>();
322353
mockConn.Setup(x => x.IsOpen).Returns(true);
354+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
355+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
356+
(s, d, c, b) =>
357+
{
358+
c?.DoneSuccess();
359+
});
323360
mockConn.Setup(x => x.Run("BEGIN", null, null, true))
324361
.Throws(new IOException("Triggered an error when beginTx"));
325362
var session = NewSession(mockConn.Object);
@@ -458,6 +495,12 @@ public async void ShouldDisposeConnectinOnDispose()
458495
{
459496
var mockConn = new Mock<IConnection>();
460497
mockConn.Setup(x => x.IsOpen).Returns(true);
498+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
499+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
500+
(s, d, c, b) =>
501+
{
502+
c?.DoneSuccess();
503+
});
461504
var session = NewSession(mockConn.Object);
462505
await session.RunAsync("lalal");
463506
await session.CloseAsync();
@@ -472,6 +515,12 @@ public async void ShouldAllowDisposeAfterCloseAsync()
472515
// Given
473516
var mockConn = new Mock<IConnection>();
474517
mockConn.Setup(x => x.IsOpen).Returns(true);
518+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
519+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
520+
(s, d, c, b) =>
521+
{
522+
c?.DoneSuccess();
523+
});
475524
var session = NewSession(mockConn.Object);
476525
await session.RunAsync("lalal");
477526

@@ -492,6 +541,12 @@ public async void ShouldAllowCloseAsyncAfterDispose()
492541
// Given
493542
var mockConn = new Mock<IConnection>();
494543
mockConn.Setup(x => x.IsOpen).Returns(true);
544+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
545+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
546+
(s, d, c, b) =>
547+
{
548+
c?.DoneSuccess();
549+
});
495550
var session = NewSession(mockConn.Object);
496551
await session.RunAsync("lalal");
497552

Neo4j.Driver/Neo4j.Driver.Tests/TransactionTests.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ public class RunAsyncMethod
192192
public async void ShouldRunPullAllSyncRun()
193193
{
194194
var mockConn = new Mock<IConnection>();
195+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
196+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
197+
(s, d, c, b) =>
198+
{
199+
c?.DoneSuccess();
200+
});
195201
var tx = new Transaction(mockConn.Object);
196202

197203
await tx.RunAsync("lalala");
@@ -225,6 +231,12 @@ public async void ShouldThrowExceptionIfPreviousTxFailed()
225231
public async void ShouldThrowExceptionIfFailedToRunAndFetchResult()
226232
{
227233
var mockConn = new Mock<IConnection>();
234+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
235+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
236+
(s, d, c, b) =>
237+
{
238+
c?.DoneSuccess();
239+
});
228240
var tx = new Transaction(mockConn.Object);
229241

230242
mockConn.Setup(x => x.Run(It.IsAny<string>(), new Dictionary<string, object>(), It.IsAny<ResultCursorBuilder>(), true))
@@ -238,6 +250,13 @@ public async void ShouldThrowExceptionIfFailedToRunAndFetchResult()
238250
public async void ResultBuilderShouldObtainServerInfoFromConnection()
239251
{
240252
var mockConn = new Mock<IConnection>();
253+
mockConn.Setup(x => x.Run(It.IsAny<string>(), It.IsAny<IDictionary<string, object>>(),
254+
It.IsAny<IMessageResponseCollector>(), It.IsAny<bool>())).Callback<string, IDictionary<string, object>, IMessageResponseCollector, bool>(
255+
(s, d, c, b) =>
256+
{
257+
c?.DoneSuccess();
258+
});
259+
241260
var tx = new Transaction(mockConn.Object);
242261

243262
await tx.RunAsync("lalala");

Neo4j.Driver/Neo4j.Driver/Internal/Result/ResultBuilder.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,16 @@ public ResultBuilder(string statement, IDictionary<string, object> parameters,
4343

4444
public StatementResult PreBuild()
4545
{
46-
return new StatementResult(() => Keys, new RecordSet(NextRecord), Summary);
46+
return new StatementResult(GetKeys, new RecordSet(NextRecord), Summary);
47+
}
48+
49+
private List<string> GetKeys()
50+
{
51+
while (!StatementProcessed)
52+
{
53+
_receiveOneAction();
54+
}
55+
return Keys;
4756
}
4857

4958
/// <summary>
@@ -92,11 +101,6 @@ internal void SetReceiveOneAction(Action receiveOneAction)
92101
};
93102
}
94103

95-
protected override void EnsureStatementProcessed()
96-
{
97-
_receiveOneAction();
98-
}
99-
100104
protected override void EnqueueRecord(Record record)
101105
{
102106
_records.Enqueue(record);
@@ -107,4 +111,4 @@ protected override void NoMoreRecords()
107111
_hasMoreRecords = false;
108112
}
109113
}
110-
}
114+
}

Neo4j.Driver/Neo4j.Driver/Internal/Result/ResultBuilderBase.cs

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,36 +22,23 @@ namespace Neo4j.Driver.Internal.Result
2222
{
2323
internal abstract class ResultBuilderBase : IMessageResponseCollector
2424
{
25-
private bool _statementProcessed = false;
26-
protected List<string> _keys = new List<string>();
25+
protected bool StatementProcessed { get; set; } = false;
26+
protected List<string> Keys { get; } = new List<string>();
2727
protected SummaryCollector SummaryCollector { get; }
2828

2929
protected ResultBuilderBase(Statement statement, IServerInfo server)
3030
{
3131
SummaryCollector = new SummaryCollector(statement, server);
3232
}
3333

34-
protected List<string> Keys
35-
{
36-
get
37-
{
38-
if (!_statementProcessed)
39-
{
40-
EnsureStatementProcessed();
41-
}
42-
43-
return _keys;
44-
}
45-
}
46-
4734
public void CollectFields(IDictionary<string, object> meta)
4835
{
4936
if (meta == null)
5037
{
5138
return;
5239
}
5340

54-
CollectKeys(meta, "fields", _keys);
41+
CollectKeys(meta, "fields", Keys);
5542
SummaryCollector.CollectWithFields(meta);
5643
}
5744

@@ -63,7 +50,7 @@ public void CollectBookmark(IDictionary<string, object> meta)
6350

6451
public void CollectRecord(object[] fields)
6552
{
66-
var record = new Record(_keys, fields);
53+
var record = new Record(Keys, fields);
6754
EnqueueRecord(record);
6855
}
6956

@@ -80,22 +67,21 @@ public void CollectSummary(IDictionary<string, object> meta)
8067
public void DoneSuccess()
8168
{
8269
// do nothing
83-
_statementProcessed = true;
70+
StatementProcessed = true;
8471
}
8572

8673
public void DoneFailure()
8774
{
8875
NoMoreRecords();// an error received, so the result is broken
89-
_statementProcessed = true;
76+
StatementProcessed = true;
9077
}
9178

9279
public void DoneIgnored()
9380
{
9481
NoMoreRecords();// the result is ignored
95-
_statementProcessed = true;
82+
StatementProcessed = true;
9683
}
9784

98-
protected abstract void EnsureStatementProcessed();
9985
protected abstract void NoMoreRecords();
10086
protected abstract void EnqueueRecord(Record record);
10187

@@ -107,4 +93,4 @@ private static void CollectKeys(IDictionary<string, object> meta, string name, L
10793
}
10894
}
10995
}
110-
}
96+
}

0 commit comments

Comments
 (0)