Skip to content

Commit d70bbd6

Browse files
authored
Allows for providing credentials to ESDB (#141)
- Callers to `EventStoreConnectionManager` and `EventStoreConnectionWrapper` can provide credentials for the ESDB connection. - Adds missing XML documentation to EventStoreConnectionWrapper - Adds missing XML documentation to EventStoreConnectionManager
1 parent 0bf8022 commit d70bbd6

File tree

4 files changed

+65
-32
lines changed

4 files changed

+65
-32
lines changed

src/ReactiveDomain.Persistence/EventStore/EventStoreConnectionManager.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,23 @@ public class EventStoreConnectionManager
1414
private readonly ILogger _log = LogManager.GetLogger(nameof(EventStoreConnectionManager));
1515
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
1616

17+
/// <summary>
18+
/// Gets the connection to ESDB as an <see cref="IStreamStoreConnection"/>.
19+
/// </summary>
1720
public IStreamStoreConnection Connection => ESConnection;
18-
public EventStoreConnectionWrapper ESConnection { get; private set; }
21+
/// <summary>
22+
/// Gets the connection to ESDB.
23+
/// </summary>
24+
public EventStoreConnectionWrapper ESConnection { get; }
1925

2026
/// <summary>
2127
/// Setup EventStore based on the config settings provided by the caller
2228
/// </summary>
2329
/// <param name="config"><see cref="EsdbConfig"/> defined by the caller.</param>
24-
public EventStoreConnectionManager(EsdbConfig config)
30+
/// <param name="credentials">User credentials to use for the connection.</param>
31+
public EventStoreConnectionManager(EsdbConfig config, UserCredentials credentials = null)
2532
{
26-
ESConnection = new EventStoreConnectionWrapper(EventStoreConnection.Create(config.ConnectionString));
33+
ESConnection = new EventStoreConnectionWrapper(EventStoreConnection.Create(config.ConnectionString), credentials);
2734

2835
//TODO: The connection settings to keep retrying in the EventStore code circumvents this loop of 8 tries never returning from the Connect call.
2936
Connection.Connect();

src/ReactiveDomain.Persistence/EventStore/EventStoreConnectionWrapper.cs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,37 @@
77

88
namespace ReactiveDomain.EventStore
99
{
10+
/// <summary>
11+
/// A wrapper for EventStore Database (ESDB) connections.
12+
/// </summary>
1013
public class EventStoreConnectionWrapper : IStreamStoreConnection
1114
{
15+
/// <summary>
16+
/// The connection to the ESDB instance.
17+
/// </summary>
1218
public readonly ES.IEventStoreConnection EsConnection;
19+
private readonly UserCredentials _credentials;
1320
private bool _disposed;
1421
private const int WriteBatchSize = 500;
1522

16-
public EventStoreConnectionWrapper(ES.IEventStoreConnection eventStoreConnection)
23+
/// <summary>
24+
/// Creates a wrapper around an ESDB connection.
25+
/// </summary>
26+
/// <param name="eventStoreConnection">A connection to an EventStoreDB instance.</param>
27+
/// <param name="credentials">The optional credentials to use when connecting.</param>
28+
public EventStoreConnectionWrapper(ES.IEventStoreConnection eventStoreConnection, UserCredentials credentials = null)
1729
{
1830
Ensure.NotNull(eventStoreConnection, nameof(eventStoreConnection));
1931
EsConnection = eventStoreConnection;
32+
_credentials = credentials;
2033
}
2134

35+
/// <inheritdoc cref="IStreamStoreConnection"/>
2236
public string ConnectionName => EsConnection.ConnectionName;
2337
private bool _connected;
38+
39+
/// <inheritdoc cref="IStreamStoreConnection"/>
40+
/// <exception cref="CannotEstablishConnectionException">Thrown if a connection cannot be established to the ESDB.</exception>
2441
public void Connect()
2542
{
2643
if(_connected){ return;}
@@ -36,12 +53,13 @@ public void Connect()
3653
}
3754
}
3855

56+
/// <inheritdoc cref="IStreamStoreConnection"/>
3957
public void Close()
4058
{
4159
EsConnection.Close();
4260
}
4361

44-
62+
/// <inheritdoc cref="IStreamStoreConnection"/>
4563
public WriteResult AppendToStream(
4664
string stream,
4765
long expectedVersion,
@@ -52,7 +70,7 @@ public WriteResult AppendToStream(
5270
{
5371
if (events.Length < WriteBatchSize)
5472
{
55-
return EsConnection.AppendToStreamAsync(stream, expectedVersion, events.ToESEventData(), credentials.ToESCredentials()).Result.ToWriteResult();
73+
return EsConnection.AppendToStreamAsync(stream, expectedVersion, events.ToESEventData(), (credentials ?? _credentials)?.ToESCredentials()).Result.ToWriteResult();
5674
}
5775

5876
var transaction = EsConnection.StartTransactionAsync(stream, expectedVersion).Result;
@@ -77,13 +95,14 @@ public WriteResult AppendToStream(
7795

7896
}
7997

98+
/// <inheritdoc cref="IStreamStoreConnection"/>
8099
public StreamEventsSlice ReadStreamForward(
81100
string stream,
82101
long start,
83102
long count,
84103
UserCredentials credentials = null)
85104
{
86-
var slice = EsConnection.ReadStreamEventsForwardAsync(stream, start, (int)count, true, credentials.ToESCredentials()).Result;
105+
var slice = EsConnection.ReadStreamEventsForwardAsync(stream, start, (int)count, true, (credentials ?? _credentials)?.ToESCredentials()).Result;
87106
switch (slice.Status)
88107
{
89108
case ES.SliceReadStatus.Success:
@@ -97,13 +116,14 @@ public StreamEventsSlice ReadStreamForward(
97116
}
98117
}
99118

119+
/// <inheritdoc cref="IStreamStoreConnection"/>
100120
public StreamEventsSlice ReadStreamBackward(
101121
string stream,
102122
long start,
103123
long count,
104124
UserCredentials credentials = null)
105125
{
106-
var slice = EsConnection.ReadStreamEventsBackwardAsync(stream, start, (int)count, true, credentials.ToESCredentials()).Result;
126+
var slice = EsConnection.ReadStreamEventsBackwardAsync(stream, start, (int)count, true, (credentials ?? _credentials)?.ToESCredentials()).Result;
107127
switch (slice.Status)
108128
{
109129
case ES.SliceReadStatus.Success:
@@ -117,18 +137,19 @@ public StreamEventsSlice ReadStreamBackward(
117137
}
118138
}
119139

140+
/// <inheritdoc cref="IStreamStoreConnection"/>
120141
public IDisposable SubscribeToStream(
121142
string stream,
122143
Action<RecordedEvent> eventAppeared,
123144
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
124-
UserCredentials userCredentials = null)
145+
UserCredentials credentials = null)
125146
{
126147
var sub = EsConnection.SubscribeToStreamAsync(
127148
stream,
128149
true,
129150
async (_, evt) => { eventAppeared(evt.Event.ToRecordedEvent(evt.OriginalEvent.EventNumber)); await Task.FromResult(Unit.Default); },
130151
(_, reason, ex) => subscriptionDropped?.Invoke((SubscriptionDropReason)(int)reason, ex),
131-
userCredentials?.ToESCredentials()).Result;
152+
(credentials ?? _credentials)?.ToESCredentials()).Result;
132153
return new Disposer(() =>
133154
{
134155
sub?.Unsubscribe();
@@ -137,14 +158,15 @@ public IDisposable SubscribeToStream(
137158
});
138159
}
139160

161+
/// <inheritdoc cref="IStreamStoreConnection"/>
140162
public IDisposable SubscribeToStreamFrom(
141163
string stream,
142164
long? lastCheckpoint,
143165
CatchUpSubscriptionSettings settings,
144166
Action<RecordedEvent> eventAppeared,
145167
Action<Unit> liveProcessingStarted = null,
146168
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
147-
UserCredentials userCredentials = null)
169+
UserCredentials credentials = null)
148170
{
149171
var sub = EsConnection.SubscribeToStreamFrom(
150172
stream,
@@ -153,7 +175,7 @@ public IDisposable SubscribeToStreamFrom(
153175
async (_, evt) => { eventAppeared(evt.Event.ToRecordedEvent(evt.OriginalEvent.EventNumber)); await Task.FromResult(Unit.Default); },
154176
_ => liveProcessingStarted?.Invoke(Unit.Default),
155177
(_, reason, ex) => subscriptionDropped?.Invoke((SubscriptionDropReason)(int)reason, ex),
156-
userCredentials?.ToESCredentials());
178+
(credentials ?? _credentials)?.ToESCredentials());
157179

158180
return new Disposer(() =>
159181
{
@@ -162,10 +184,11 @@ public IDisposable SubscribeToStreamFrom(
162184
});
163185
}
164186

187+
/// <inheritdoc cref="IStreamStoreConnection"/>
165188
public IDisposable SubscribeToAll(
166189
Action<RecordedEvent> eventAppeared,
167190
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
168-
UserCredentials userCredentials = null,
191+
UserCredentials credentials = null,
169192
bool resolveLinkTos = true)
170193
{
171194
var sub = EsConnection.SubscribeToAllAsync(
@@ -176,7 +199,7 @@ public IDisposable SubscribeToAll(
176199
await Task.FromResult(Unit.Default);
177200
},
178201
(_, reason, ex) => subscriptionDropped?.Invoke((SubscriptionDropReason)(int)reason, ex),
179-
userCredentials?.ToESCredentials()).Result;
202+
(credentials ?? _credentials)?.ToESCredentials()).Result;
180203
return new Disposer(() =>
181204
{
182205
sub?.Unsubscribe();
@@ -185,13 +208,14 @@ public IDisposable SubscribeToAll(
185208
});
186209
}
187210

211+
/// <inheritdoc cref="IStreamStoreConnection"/>
188212
public IDisposable SubscribeToAllFrom(
189213
Position from,
190214
Action<RecordedEvent> eventAppeared,
191215
CatchUpSubscriptionSettings settings = null,
192216
Action liveProcessingStarted = null,
193217
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
194-
UserCredentials userCredentials = null,
218+
UserCredentials credentials = null,
195219
bool resolveLinkTos = true)
196220
{
197221
var sub = EsConnection.SubscribeToAllFrom(
@@ -204,7 +228,7 @@ public IDisposable SubscribeToAllFrom(
204228
},
205229
__ => { liveProcessingStarted?.Invoke(); },
206230
(_, reason, ex) => subscriptionDropped?.Invoke((SubscriptionDropReason)(int)reason, ex),
207-
userCredentials?.ToESCredentials());
231+
(credentials ?? _credentials)?.ToESCredentials());
208232
return new Disposer(() =>
209233
{
210234
sub.Stop(TimeSpan.FromMilliseconds(250));
@@ -213,11 +237,13 @@ public IDisposable SubscribeToAllFrom(
213237
}
214238

215239

240+
/// <inheritdoc cref="IStreamStoreConnection"/>
216241
public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
217-
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, credentials.ToESCredentials()).Wait();
242+
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, (credentials ?? _credentials).ToESCredentials()).Wait();
218243

244+
/// <inheritdoc cref="IStreamStoreConnection"/>
219245
public void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
220-
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, true, credentials.ToESCredentials()).Wait();
246+
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, true, (credentials ?? _credentials).ToESCredentials()).Wait();
221247

222248
public void Dispose()
223249
{
@@ -320,7 +346,7 @@ public static ReadDirection ToReadDirection(this ES.ReadDirection readDirection)
320346
case ES.ReadDirection.Backward:
321347
return ReadDirection.Backward;
322348
default:
323-
throw new ArgumentOutOfRangeException(nameof(readDirection), "Unknown ReadDirection returned from Eventstore");
349+
throw new ArgumentOutOfRangeException(nameof(readDirection), "Unknown ReadDirection returned from EventStore");
324350
}
325351
}
326352

src/ReactiveDomain.Persistence/IStreamStoreConnection.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ public interface IStreamStoreConnection : IDisposable
6868
/// <param name="stream">The stream to subscribe to.</param>
6969
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
7070
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
71-
/// <param name="userCredentials">User credentials to use for the operation.</param>
71+
/// <param name="credentials">User credentials to use for the operation.</param>
7272
/// <returns>An IDisposable that can be used to dispose the subscription.</returns>
7373
IDisposable SubscribeToStream(
7474
string stream,
7575
Action<RecordedEvent> eventAppeared,
7676
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
77-
UserCredentials userCredentials = null);
77+
UserCredentials credentials = null);
7878

7979
/// <summary>
8080
/// Subscribes to a single event stream. Existing events from
@@ -103,7 +103,7 @@ IDisposable SubscribeToStream(
103103
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
104104
/// <param name="liveProcessingStarted">An action invoked when the subscription switches to newly-pushed events.</param>
105105
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
106-
/// <param name="userCredentials">User credentials to use for the operation.</param>
106+
/// <param name="credentials">User credentials to use for the operation.</param>
107107
/// <param name="settings">The <see cref="T:ReactiveDomain.CatchUpSubscriptionSettings" /> for the subscription.</param>
108108
/// <returns>An IDisposable that can be used to close the subscription.</returns>
109109
IDisposable SubscribeToStreamFrom(
@@ -113,7 +113,7 @@ IDisposable SubscribeToStreamFrom(
113113
Action<RecordedEvent> eventAppeared,
114114
Action<Unit> liveProcessingStarted = null,
115115
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
116-
UserCredentials userCredentials = null);
116+
UserCredentials credentials = null);
117117

118118
/// <summary>
119119
/// Asynchronously subscribes to all events. New
@@ -122,13 +122,13 @@ IDisposable SubscribeToStreamFrom(
122122
/// </summary>
123123
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
124124
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
125-
/// <param name="userCredentials">User credentials to use for the operation.</param>
125+
/// <param name="credentials">User credentials to use for the operation.</param>
126126
/// <param name="resolveLinkTos">If true, resolve link events.</param>
127127
/// <returns>An IDisposable that can be used to close the subscription.</returns>
128128
IDisposable SubscribeToAll(
129129
Action<RecordedEvent> eventAppeared,
130130
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
131-
UserCredentials userCredentials = null,
131+
UserCredentials credentials = null,
132132
bool resolveLinkTos = true);
133133

134134
IDisposable SubscribeToAllFrom(
@@ -137,7 +137,7 @@ IDisposable SubscribeToAllFrom(
137137
CatchUpSubscriptionSettings settings = null,
138138
Action liveProcessingStarted = null,
139139
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
140-
UserCredentials userCredentials = null,
140+
UserCredentials credentials = null,
141141
bool resolveLinkTos = true);
142142

143143
void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null);

src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public IDisposable SubscribeToStream(
324324
string stream,
325325
Action<RecordedEvent> eventAppeared,
326326
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
327-
UserCredentials userCredentials = null)
327+
UserCredentials credentials = null)
328328
{
329329
long currentPos = 0;
330330
lock (_store) {
@@ -340,7 +340,7 @@ public IDisposable SubscribeToStream(
340340
eventAppeared,
341341
null, //live processing started
342342
subscriptionDropped,
343-
userCredentials);
343+
credentials);
344344
}
345345
public IDisposable SubscribeToStreamFrom(
346346
string stream,
@@ -349,7 +349,7 @@ public IDisposable SubscribeToStreamFrom(
349349
Action<RecordedEvent> eventAppeared,
350350
Action<Unit> liveProcessingStarted = null,
351351
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
352-
UserCredentials userCredentials = null)
352+
UserCredentials credentials = null)
353353
{
354354

355355
var start = (lastCheckpoint ?? -1) + 1;
@@ -386,7 +386,7 @@ public IDisposable SubscribeToStreamFrom(
386386

387387
public IDisposable SubscribeToAllFrom(Position @from, Action<RecordedEvent> eventAppeared, CatchUpSubscriptionSettings settings = null,
388388
Action liveProcessingStarted = null, Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
389-
UserCredentials userCredentials = null, bool resolveLinkTos = true)
389+
UserCredentials credentials = null, bool resolveLinkTos = true)
390390
{
391391
var current = (int)@from.CommitPosition;
392392
var currentEvents = new RecordedEvent[] { };
@@ -417,7 +417,7 @@ public IDisposable SubscribeToAllFrom(Position @from, Action<RecordedEvent> even
417417
public IDisposable SubscribeToAll(
418418
Action<RecordedEvent> eventAppeared,
419419
Action<SubscriptionDropReason, Exception> subscriptionDropped = null,
420-
UserCredentials userCredentials = null,
420+
UserCredentials credentials = null,
421421
bool resolveLinkTos = true)
422422
{
423423
return SubscribeToAllFrom(
@@ -426,7 +426,7 @@ public IDisposable SubscribeToAll(
426426
null,
427427
null,
428428
subscriptionDropped,
429-
userCredentials);
429+
credentials);
430430
}
431431

432432
public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)

0 commit comments

Comments
 (0)