22using System . Collections . Generic ;
33using System . Data ;
44using System . Linq ;
5- using System . Transactions ;
5+
66using Inforigami . Regalo . Core ;
77using Inforigami . Regalo . EventSourcing ;
88using Inforigami . Regalo . Interfaces ;
9- using Microsoft . Data . SqlClient ;
9+
1010using Newtonsoft . Json ;
11- using IsolationLevel = System . Transactions . IsolationLevel ;
1211
1312namespace Inforigami . Regalo . SqlServer
1413{
1514 public class SqlServerEventStore : IEventStore , IDisposable
1615 {
17- private readonly string _connectionString ;
16+ private readonly Func < ISqlSession > _sqlSessionFactory ;
1817 private readonly ILogger _logger ;
1918
2019 public SqlServerEventStore ( string connectionString , ILogger logger )
20+ : this ( ( ) => new TransientSqlSession ( connectionString ) , logger )
2121 {
22- if ( connectionString == null ) throw new ArgumentNullException ( "connectionString" ) ;
23- if ( logger == null ) throw new ArgumentNullException ( nameof ( logger ) ) ;
22+ }
2423
25- _connectionString = connectionString ;
26- _logger = logger ;
24+ public SqlServerEventStore ( Func < ISqlSession > sqlSessionFactory , ILogger logger )
25+ {
26+ _sqlSessionFactory = sqlSessionFactory ?? throw new ArgumentNullException ( "sqlSessionFactory" ) ;
27+ _logger = logger ?? throw new ArgumentNullException ( "logger" ) ;
2728 }
2829
2930 public void Save < T > ( string eventStreamId , int expectedVersion , IEnumerable < IEvent > newEvents )
3031 {
3132 if ( newEvents == null ) throw new ArgumentNullException ( "newEvents" ) ;
32-
33- using ( var transaction = GetTransaction ( ) )
34- using ( var connection = GetConnection ( ) )
35- {
36- connection . Open ( ) ;
3733
34+ using ( var session = _sqlSessionFactory . Invoke ( ) )
35+ {
3836 if ( expectedVersion == EntityVersion . New )
3937 {
40- InsertEventStreamRow ( eventStreamId , newEvents , connection ) ;
38+ InsertEventStreamRow ( eventStreamId , newEvents , session ) ;
4139 }
4240 else
4341 {
44- UpdateEventStreamRow ( eventStreamId , expectedVersion , newEvents , connection ) ;
42+ UpdateEventStreamRow ( eventStreamId , expectedVersion , newEvents , session ) ;
4543 }
4644
47- InsertEvents ( eventStreamId , newEvents , connection ) ;
45+ InsertEvents ( eventStreamId , newEvents , session ) ;
4846
49- transaction . Complete ( ) ;
47+ session . Complete ( ) ;
5048 }
5149 }
5250
@@ -66,27 +64,25 @@ public EventStream<T> Load<T>(string eventStreamId, int version)
6664
6765 _logger . Debug ( this , "Loading " + typeof ( T ) + " version " + EntityVersion . GetName ( version ) + " from stream " + eventStreamId ) ;
6866
69- using ( var transaction = GetTransaction ( ) )
70- using ( var connection = GetConnection ( ) )
67+ using ( var session = _sqlSessionFactory . Invoke ( ) )
7168 {
72- connection . Open ( ) ;
73-
74- var command = connection . CreateCommand ( ) ;
69+ var command = session . CreateCommand ( ) ;
7570 command . CommandType = CommandType . Text ;
7671 command . CommandText = @"select * from EventStreamEvent where EventStreamId = @eventStreamId and Version <= @Version order by Version;" ;
7772
78- var eventStreamIdParameter = command . Parameters . Add ( "@EventStreamId" , SqlDbType . NVarChar , 1024 ) ;
79- var versionParameter = command . Parameters . Add ( "@Version" , SqlDbType . Int ) ;
73+ var eventStreamIdParameter = command . AddParameter ( "@EventStreamId" , DbType . String , 1024 ) ;
74+ var versionParameter = command . AddParameter ( "@Version" , DbType . Int32 ) ;
8075
8176 eventStreamIdParameter . Value = eventStreamId ;
8277 versionParameter . Value = version == EntityVersion . Latest ? int . MaxValue : version ;
8378
84- var reader = command . ExecuteReader ( CommandBehavior . SequentialAccess ) ;
85-
8679 var events = new List < IEvent > ( ) ;
87- while ( reader . Read ( ) )
80+ using ( var reader = command . ExecuteReader ( CommandBehavior . SequentialAccess ) )
8881 {
89- events . Add ( ( IEvent ) JsonConvert . DeserializeObject ( reader . GetString ( 2 ) , GetJsonSerialisationSettings ( ) ) ) ;
82+ while ( reader . Read ( ) )
83+ {
84+ events . Add ( ( IEvent ) JsonConvert . DeserializeObject ( reader . GetString ( 2 ) , GetJsonSerialisationSettings ( ) ) ) ;
85+ }
9086 }
9187
9288 if ( events . Count == 0 )
@@ -104,7 +100,7 @@ public EventStream<T> Load<T>(string eventStreamId, int version)
104100 throw exception ;
105101 }
106102
107- transaction . Complete ( ) ;
103+ session . Complete ( ) ;
108104
109105 return result ;
110106 }
@@ -117,15 +113,14 @@ public void Delete(string eventStreamId, int version)
117113
118114 public void Delete < T > ( string eventStreamId , int version )
119115 {
120- using ( var transaction = GetTransaction ( ) )
121- using ( var connection = GetConnection ( ) )
116+ using ( var session = _sqlSessionFactory . Invoke ( ) )
122117 {
123- connection . Open ( ) ;
118+ session . Connection . Open ( ) ;
124119
125- DeleteEvents ( eventStreamId , connection ) ;
126- DeleteEventStreamRow ( eventStreamId , version , connection ) ;
120+ DeleteEvents ( eventStreamId , session ) ;
121+ DeleteEventStreamRow ( eventStreamId , version , session ) ;
127122
128- transaction . Complete ( ) ;
123+ session . Complete ( ) ;
129124 }
130125 }
131126
@@ -138,36 +133,31 @@ public void Dispose()
138133 {
139134 }
140135
141- private static TransactionScope GetTransaction ( )
142- {
143- return new TransactionScope ( TransactionScopeOption . Required , new TransactionOptions { IsolationLevel = IsolationLevel . ReadCommitted } ) ;
144- }
145-
146- private void DeleteEvents ( string eventStreamId , SqlConnection connection )
136+ private void DeleteEvents ( string eventStreamId , ISqlSession session )
147137 {
148- var eventCommand = connection . CreateCommand ( ) ;
138+ var eventCommand = session . CreateCommand ( ) ;
149139
150140 eventCommand . CommandType = CommandType . Text ;
151141 eventCommand . CommandText = @"delete from EventStreamEvent where EventStreamId = @EventStreamId;" ;
152142
153- var eventStreamIdParameter = eventCommand . Parameters . Add ( "@EventStreamId" , SqlDbType . NVarChar , 1024 ) ;
143+ var eventStreamIdParameter = eventCommand . AddParameter ( "@EventStreamId" , DbType . String , 1024 ) ;
154144 eventStreamIdParameter . Value = eventStreamId ;
155145
156146 eventCommand . ExecuteNonQuery ( ) ;
157147 }
158148
159- private void DeleteEventStreamRow ( string eventStreamId , int version , SqlConnection connection )
149+ private void DeleteEventStreamRow ( string eventStreamId , int version , ISqlSession session )
160150 {
161- var eventCommand = connection . CreateCommand ( ) ;
151+ var eventCommand = session . CreateCommand ( ) ;
162152
163153 eventCommand . CommandType = CommandType . Text ;
164154 eventCommand . CommandText = @"delete from EventStream where Id = @EventStreamId and [Version] = @Version;" ;
165155
166- var eventStreamIdParameter = eventCommand . Parameters . Add ( "@EventStreamId" , SqlDbType . NVarChar , 1024 ) ;
167- var versionParameter = eventCommand . Parameters . Add ( "@Version" , SqlDbType . Int ) ;
156+ var eventStreamIdParameter = eventCommand . AddParameter ( "@EventStreamId" , DbType . String , 1024 ) ;
157+ var versionParameter = eventCommand . AddParameter ( "@Version" , DbType . Int32 ) ;
168158
169159 eventStreamIdParameter . Value = eventStreamId ;
170- versionParameter . Value = version ;
160+ versionParameter . Value = version ;
171161
172162 var rowsDeleted = eventCommand . ExecuteNonQuery ( ) ;
173163
@@ -180,16 +170,16 @@ private void DeleteEventStreamRow(string eventStreamId, int version, SqlConnecti
180170 }
181171 }
182172
183- private void InsertEvents ( string eventStreamId , IEnumerable < IEvent > newEvents , SqlConnection connection )
173+ private void InsertEvents ( string eventStreamId , IEnumerable < IEvent > newEvents , ISqlSession session )
184174 {
185- var eventCommand = connection . CreateCommand ( ) ;
175+ var eventCommand = session . CreateCommand ( ) ;
186176
187177 eventCommand . CommandType = CommandType . Text ;
188178 eventCommand . CommandText = @"insert into EventStreamEvent (EventStreamId, [Version], Data) values (@EventStreamId, @Version, @Data);" ;
189179
190- var eventStreamIdParameter = eventCommand . Parameters . Add ( "@EventStreamId" , SqlDbType . NVarChar , 1024 ) ;
191- var versionParameter = eventCommand . Parameters . Add ( "@Version" , SqlDbType . Int ) ;
192- var dataParameter = eventCommand . Parameters . Add ( "@Data" , SqlDbType . NVarChar , - 1 ) ;
180+ var eventStreamIdParameter = eventCommand . AddParameter ( "@EventStreamId" , DbType . String , 1024 ) ;
181+ var versionParameter = eventCommand . AddParameter ( "@Version" , DbType . Int32 ) ;
182+ var dataParameter = eventCommand . AddParameter ( "@Data" , DbType . String , - 1 ) ;
193183
194184 eventCommand . Prepare ( ) ;
195185
@@ -203,16 +193,16 @@ private void InsertEvents(string eventStreamId, IEnumerable<IEvent> newEvents, S
203193 }
204194 }
205195
206- private static void UpdateEventStreamRow ( string eventStreamId , int expectedVersion , IEnumerable < IEvent > newEvents , SqlConnection connection )
196+ private static void UpdateEventStreamRow ( string eventStreamId , int expectedVersion , IEnumerable < IEvent > newEvents , ISqlSession session )
207197 {
208- var eventStreamCommand = connection . CreateCommand ( ) ;
198+ var eventStreamCommand = session . CreateCommand ( ) ;
209199
210200 eventStreamCommand . CommandType = CommandType . Text ;
211201 eventStreamCommand . CommandText = @"update EventStream set Version = @Version where Id = @Id and Version = @ExpectedVersion;" ;
212202
213- eventStreamCommand . Parameters . AddWithValue ( "@Id" , eventStreamId ) ;
214- eventStreamCommand . Parameters . AddWithValue ( "@Version" , newEvents . Last ( ) . Version ) ;
215- eventStreamCommand . Parameters . AddWithValue ( "@ExpectedVersion" , expectedVersion ) ;
203+ eventStreamCommand . AddParameterWithValue ( "@Id" , eventStreamId ) ;
204+ eventStreamCommand . AddParameterWithValue ( "@Version" , newEvents . Last ( ) . Version ) ;
205+ eventStreamCommand . AddParameterWithValue ( "@ExpectedVersion" , expectedVersion ) ;
216206
217207 int rowsUpdated = eventStreamCommand . ExecuteNonQuery ( ) ;
218208
@@ -222,29 +212,24 @@ private static void UpdateEventStreamRow(string eventStreamId, int expectedVersi
222212 }
223213 }
224214
225- private static void InsertEventStreamRow ( string eventStreamId , IEnumerable < IEvent > newEvents , SqlConnection connection )
215+ private static void InsertEventStreamRow ( string eventStreamId , IEnumerable < IEvent > newEvents , ISqlSession session )
226216 {
227217 if ( newEvents == null || ! newEvents . Any ( ) )
228218 {
229219 return ;
230220 }
231221
232- var eventStreamCommand = connection . CreateCommand ( ) ;
222+ var eventStreamCommand = session . CreateCommand ( ) ;
233223
234224 eventStreamCommand . CommandType = CommandType . Text ;
235225 eventStreamCommand . CommandText = @"insert into EventStream (Id, [Version]) values (@Id, @Version);" ;
236226
237- eventStreamCommand . Parameters . AddWithValue ( "@Id" , eventStreamId ) ;
238- eventStreamCommand . Parameters . AddWithValue ( "@Version" , newEvents . Last ( ) . Version ) ;
227+ eventStreamCommand . AddParameterWithValue ( "@Id" , eventStreamId ) ;
228+ eventStreamCommand . AddParameterWithValue ( "@Version" , newEvents . Last ( ) . Version ) ;
239229
240230 eventStreamCommand . ExecuteNonQuery ( ) ;
241231 }
242232
243- private SqlConnection GetConnection ( )
244- {
245- return new SqlConnection ( _connectionString ) ;
246- }
247-
248233 private string GetJson ( IEvent evt )
249234 {
250235 var json = JsonConvert . SerializeObject ( evt , GetJsonSerialisationSettings ( ) ) ;
@@ -256,4 +241,4 @@ private JsonSerializerSettings GetJsonSerialisationSettings()
256241 return new JsonSerializerSettings { Formatting = Formatting . Indented , TypeNameHandling = TypeNameHandling . All } ;
257242 }
258243 }
259- }
244+ }
0 commit comments