Skip to content

Commit a1670b7

Browse files
Fix race condition (#785)
* Fix a race condition in the way message pairs are queued * add a test
1 parent f01703a commit a1670b7

File tree

8 files changed

+145
-71
lines changed

8 files changed

+145
-71
lines changed

Neo4j.Driver/Neo4j.Driver.Tests/Connector/SocketConnectionTests.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,24 @@ await con.EnqueueAsync(
216216
await con.EnqueueAsync(PullAllMessage.Instance, NoOpResponseHandler.Instance);
217217
pipeline.Verify(h => h.Enqueue(NoOpResponseHandler.Instance), Times.Exactly(2));
218218
}
219+
220+
[Fact]
221+
public async Task ShouldEnqueueBoth()
222+
{
223+
var pipeline = new Mock<IResponsePipeline>();
224+
var con = NewSocketConnection(pipeline: pipeline.Object);
225+
226+
var m1 = new Mock<IRequestMessage>();
227+
var h1 = new Mock<IResponseHandler>();
228+
var m2 = new Mock<IRequestMessage>();
229+
var h2 = new Mock<IResponseHandler>();
230+
231+
await con.EnqueueAsync(m1.Object, h1.Object, m2.Object, h2.Object);
232+
233+
con.Messages[0].Should().Be(m1.Object);
234+
con.Messages[1].Should().Be(m2.Object);
235+
pipeline.Verify(x => x.Enqueue(It.IsAny<IResponseHandler>()), Times.Exactly(2));
236+
}
219237
}
220238

221239
public class ResetMethod
@@ -347,4 +365,4 @@ public static TheoryData<Exception> GenerateObjectDisposedExceptions()
347365
};
348366
}
349367
}
350-
}
368+
}

Neo4j.Driver/Neo4j.Driver.Tests/Internal/Protocol/BoltProtocolTests.cs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,12 @@ public async Task ShouldUseQueryToFetchRoutingTableForBoltVersionLessThan43(int
448448

449449
mockConn.Verify(x => x.ConfigureMode(AccessMode.Read), Times.Once);
450450
mockConn.Verify(
451-
x => x.EnqueueAsync(It.IsAny<IRequestMessage>(), It.IsAny<IResponseHandler>()),
452-
Times.Exactly(2));
451+
x => x.EnqueueAsync(
452+
It.IsAny<RunWithMetadataMessage>(),
453+
It.IsAny<RunResponseHandler>(),
454+
It.IsAny<PullMessage>(),
455+
It.IsAny<PullResponseHandler>()),
456+
Times.Exactly(1));
453457

454458
mockConn.Verify(x => x.SendAsync(), Times.Once);
455459
}
@@ -761,16 +765,12 @@ public async Task ShouldSendPullMessageWhenNotReactive()
761765
Times.Once);
762766

763767
mockConn.Verify(
764-
x => x.EnqueueAsync(It.IsAny<IRequestMessage>(), It.IsAny<IResponseHandler>()),
765-
Times.Exactly(2));
766-
767-
mockConn.Verify(
768-
x => x.EnqueueAsync(It.IsAny<RunWithMetadataMessage>(), It.IsAny<RunResponseHandler>()),
769-
Times.Once);
770-
771-
mockConn.Verify(
772-
x => x.EnqueueAsync(It.IsAny<PullMessage>(), It.IsAny<PullResponseHandler>()),
773-
Times.Once);
768+
x => x.EnqueueAsync(
769+
It.IsNotNull<RunWithMetadataMessage>(),
770+
It.IsNotNull<RunResponseHandler>(),
771+
It.IsNotNull<PullMessage>(),
772+
It.IsNotNull<PullResponseHandler>()),
773+
Times.Exactly(1));
774774

775775
mockConn.Verify(x => x.SendAsync(), Times.Once);
776776
mockConn.Verify(x => x.SyncAsync(), Times.Never);
@@ -1189,15 +1189,11 @@ await protocol.RunInExplicitTransactionAsync(
11891189
Times.Once);
11901190

11911191
mockConn.Verify(
1192-
x => x.EnqueueAsync(It.IsAny<IRequestMessage>(), It.IsAny<IResponseHandler>()),
1193-
Times.Exactly(2));
1194-
1195-
mockConn.Verify(
1196-
x => x.EnqueueAsync(It.IsAny<RunWithMetadataMessage>(), It.IsAny<RunResponseHandler>()),
1197-
Times.Once);
1198-
1199-
mockConn.Verify(
1200-
x => x.EnqueueAsync(It.IsAny<PullMessage>(), It.IsAny<PullResponseHandler>()),
1192+
x => x.EnqueueAsync(
1193+
It.IsAny<RunWithMetadataMessage>(),
1194+
It.IsAny<RunResponseHandler>(),
1195+
It.IsAny<PullMessage>(),
1196+
It.IsAny<PullResponseHandler>()),
12011197
Times.Once);
12021198

12031199
mockConn.Verify(x => x.SendAsync(), Times.Once);

Neo4j.Driver/Neo4j.Driver.Tests/Internal/Protocol/BoltProtocolV3Tests.cs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,12 @@ public async Task ShouldSendRunWithMetadataMessageToGetRoutingTable()
229229

230230
mockConn.Verify(x => x.ConfigureMode(AccessMode.Read), Times.Once);
231231
mockConn.Verify(
232-
x => x.EnqueueAsync(It.IsAny<IRequestMessage>(), It.IsAny<IResponseHandler>()),
233-
Times.Exactly(2));
232+
x => x.EnqueueAsync(
233+
It.IsAny<IRequestMessage>(),
234+
It.IsAny<IResponseHandler>(),
235+
It.IsAny<IRequestMessage>(),
236+
It.IsAny<IResponseHandler>()),
237+
Times.Once);
234238

235239
mockConn.Verify(x => x.SendAsync(), Times.Once);
236240
}
@@ -391,15 +395,11 @@ public async Task ShouldSendMessages()
391395
Times.Once);
392396

393397
mockConn.Verify(
394-
x => x.EnqueueAsync(It.IsNotNull<IRequestMessage>(), It.IsNotNull<IResponseHandler>()),
395-
Times.Exactly(2));
396-
397-
mockConn.Verify(
398-
x => x.EnqueueAsync(It.IsNotNull<RunWithMetadataMessage>(), It.IsNotNull<RunResponseHandlerV3>()),
399-
Times.Once);
400-
401-
mockConn.Verify(
402-
x => x.EnqueueAsync(PullAllMessage.Instance, It.IsNotNull<PullAllResponseHandler>()),
398+
x => x.EnqueueAsync(
399+
It.IsNotNull<RunWithMetadataMessage>(),
400+
It.IsNotNull<RunResponseHandlerV3>(),
401+
PullAllMessage.Instance,
402+
It.IsNotNull<PullAllResponseHandler>()),
403403
Times.Once);
404404

405405
mockConn.Verify(x => x.SendAsync(), Times.Once);
@@ -672,15 +672,11 @@ public async Task ShouldSendMessages()
672672
Times.Once);
673673

674674
mockConn.Verify(
675-
x => x.EnqueueAsync(It.IsNotNull<IRequestMessage>(), It.IsNotNull<IResponseHandler>()),
676-
Times.Exactly(2));
677-
678-
mockConn.Verify(
679-
x => x.EnqueueAsync(It.IsNotNull<RunWithMetadataMessage>(), It.IsNotNull<RunResponseHandlerV3>()),
680-
Times.Once);
681-
682-
mockConn.Verify(
683-
x => x.EnqueueAsync(PullAllMessage.Instance, It.IsNotNull<PullAllResponseHandler>()),
675+
x => x.EnqueueAsync(
676+
It.IsNotNull<RunWithMetadataMessage>(),
677+
It.IsNotNull<RunResponseHandlerV3>(),
678+
PullAllMessage.Instance,
679+
It.IsNotNull<PullAllResponseHandler>()),
684680
Times.Once);
685681

686682
mockConn.Verify(x => x.SendAsync(), Times.Once);

Neo4j.Driver/Neo4j.Driver/Internal/Connector/DelegatedConnection.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ public async Task EnqueueAsync(IRequestMessage message, IResponseHandler handler
134134
}
135135
}
136136

137+
public async ValueTask EnqueueAsync(
138+
IRequestMessage message1,
139+
IResponseHandler handler1,
140+
IRequestMessage message2,
141+
IResponseHandler handler2)
142+
{
143+
try
144+
{
145+
await Delegate.EnqueueAsync(message1, handler1, message2, handler2).ConfigureAwait(false);
146+
}
147+
catch (Exception e)
148+
{
149+
await OnErrorAsync(e).ConfigureAwait(false);
150+
}
151+
}
152+
137153
public virtual bool IsOpen => Delegate.IsOpen;
138154

139155
public IServerInfo Server => Delegate.Server;
@@ -190,7 +206,7 @@ public ValueTask ValidateCredsAsync()
190206
return Delegate.ValidateCredsAsync();
191207
}
192208

193-
/// <inheritdoc />
209+
/// <inheritdoc/>
194210
public bool TelemetryEnabled
195211
{
196212
get => Delegate.TelemetryEnabled;
@@ -232,7 +248,10 @@ public Task BeginTransactionAsync(BeginTransactionParams beginTransactionParams)
232248
return BoltProtocol.BeginTransactionAsync(this, beginTransactionParams);
233249
}
234250

235-
public Task<IResultCursor> RunInExplicitTransactionAsync(Query query, bool reactive, long fetchSize,
251+
public Task<IResultCursor> RunInExplicitTransactionAsync(
252+
Query query,
253+
bool reactive,
254+
long fetchSize,
236255
IInternalAsyncTransaction transaction)
237256
{
238257
return BoltProtocol.RunInExplicitTransactionAsync(this, query, reactive, fetchSize, transaction);

Neo4j.Driver/Neo4j.Driver/Internal/Connector/IConnection.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ internal interface IConnection : IConnectionDetails, IConnectionRunner
6060
IAuthTokenManager AuthTokenManager { get; }
6161

6262
public SessionConfig SessionConfig { get; set; }
63+
bool TelemetryEnabled { get; set; }
6364

6465
void ConfigureMode(AccessMode? mode);
6566
void Configure(string database, AccessMode? mode);
@@ -83,6 +84,13 @@ Task InitAsync(
8384

8485
Task EnqueueAsync(IRequestMessage message, IResponseHandler handler);
8586

87+
ValueTask EnqueueAsync(
88+
IRequestMessage message1,
89+
IResponseHandler handler1,
90+
IRequestMessage message2,
91+
IResponseHandler handler2
92+
);
93+
8694
// Enqueue a reset message
8795
Task ResetAsync();
8896

@@ -100,7 +108,6 @@ Task InitAsync(
100108

101109
void SetUseUtcEncodedDateTime();
102110
ValueTask ValidateCredsAsync();
103-
bool TelemetryEnabled { get; set; }
104111
}
105112

106113
internal interface IConnectionRunner
@@ -123,8 +130,12 @@ Task<IResultCursor> RunInAutoCommitTransactionAsync(
123130

124131
Task BeginTransactionAsync(BeginTransactionParams beginParams);
125132

126-
Task<IResultCursor> RunInExplicitTransactionAsync(Query query, bool reactive, long fetchSize,
133+
Task<IResultCursor> RunInExplicitTransactionAsync(
134+
Query query,
135+
bool reactive,
136+
long fetchSize,
127137
IInternalAsyncTransaction transaction);
138+
128139
Task CommitTransactionAsync(IBookmarksTracker bookmarksTracker);
129140
Task RollbackTransactionAsync();
130141
}

Neo4j.Driver/Neo4j.Driver/Internal/Connector/SocketConnection.cs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ internal SocketConnection(
7373
ServerInfo server,
7474
IResponsePipeline responsePipeline = null,
7575
IAuthTokenManager authTokenManager = null,
76-
IBoltProtocolFactory protocolFactory = null,
76+
IBoltProtocolFactory protocolFactory = null,
7777
DriverContext context = null)
7878
{
7979
_client = socketClient ?? throw new ArgumentNullException(nameof(socketClient));
@@ -88,6 +88,7 @@ internal SocketConnection(
8888
}
8989

9090
internal IReadOnlyList<IRequestMessage> Messages => _messages.ToList();
91+
public DriverContext Context { get; }
9192

9293
public AccessMode? Mode { get; private set; }
9394

@@ -138,7 +139,11 @@ public async Task InitAsync(
138139

139140
try
140141
{
141-
await BoltProtocol.AuthenticateAsync(this, Context.Config.UserAgent, authToken, Context.Config.NotificationsConfig)
142+
await BoltProtocol.AuthenticateAsync(
143+
this,
144+
Context.Config.UserAgent,
145+
authToken,
146+
Context.Config.NotificationsConfig)
142147
.ConfigureAwait(false);
143148
}
144149
catch (Exception ex)
@@ -242,6 +247,27 @@ public async Task ReceiveOneAsync()
242247
}
243248
}
244249

250+
public async ValueTask EnqueueAsync(
251+
IRequestMessage message1,
252+
IResponseHandler handler1,
253+
IRequestMessage message2,
254+
IResponseHandler handler2)
255+
{
256+
await _sendLock.WaitAsync().ConfigureAwait(false);
257+
258+
try
259+
{
260+
_messages.Enqueue(message1);
261+
_messages.Enqueue(message2);
262+
_responsePipeline.Enqueue(handler1);
263+
_responsePipeline.Enqueue(handler2);
264+
}
265+
finally
266+
{
267+
_sendLock.Release();
268+
}
269+
}
270+
245271
public Task ResetAsync()
246272
{
247273
return BoltProtocol.ResetAsync(this);
@@ -251,7 +277,6 @@ public Task ResetAsync()
251277
public IServerInfo Server => _serverInfo;
252278

253279
public bool UtcEncodedDateTime { get; private set; }
254-
public DriverContext Context { get; }
255280
public IAuthToken AuthToken { get; private set; }
256281
public bool TelemetryEnabled { get; set; }
257282

@@ -263,7 +288,7 @@ public void UpdateId(string newConnId)
263288
newConnId);
264289

265290
_id = newConnId;
266-
291+
267292
if (_logger is PrefixLogger logger)
268293
{
269294
logger.Prefix = FormatPrefix(_id);
@@ -417,7 +442,10 @@ public Task BeginTransactionAsync(BeginTransactionParams beginParams)
417442
return BoltProtocol.BeginTransactionAsync(this, beginParams);
418443
}
419444

420-
public Task<IResultCursor> RunInExplicitTransactionAsync(Query query, bool reactive, long fetchSize,
445+
public Task<IResultCursor> RunInExplicitTransactionAsync(
446+
Query query,
447+
bool reactive,
448+
long fetchSize,
421449
IInternalAsyncTransaction transaction)
422450
{
423451
return BoltProtocol.RunInExplicitTransactionAsync(this, query, reactive, fetchSize, transaction);

0 commit comments

Comments
 (0)