Skip to content

Commit e719815

Browse files
committed
Extracted SqlBulkBatchWriter from MSSqlServerSink.
1 parent 253eecb commit e719815

File tree

8 files changed

+135
-74
lines changed

8 files changed

+135
-74
lines changed

src/Serilog.Sinks.MSSqlServer/GlobalSuppressions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
[assembly: SuppressMessage("Globalization", "CA1303:Do not pass literals as localized parameters", Justification = "Supplying string literals and not using resources is accepted within this project.", Scope = "namespaceanddescendants", Target = "Serilog.Sinks.MSSqlServer")]
99
[assembly: SuppressMessage("Security", "CA2100:Review SQL queries for security vulnerabilities", Justification = "Too hard to change. Accepted for now.", Scope = "namespaceanddescendants", Target = "Serilog.Sinks.MSSqlServer")]
1010
[assembly: SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Too hard to change. Accepted for now.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.Platform.SqlTableCreator.CreateTable(System.String,System.String,System.Data.DataTable,Serilog.Sinks.MSSqlServer.ColumnOptions)")]
11-
[assembly: SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Too hard to change. Accepted for now.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.MSSqlServerSink.EmitBatchAsync(System.Collections.Generic.IEnumerable{Serilog.Events.LogEvent})~System.Threading.Tasks.Task")]
11+
[assembly: SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Too hard to change. Accepted for now.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Platform.SqlBulkBatchWriter.WriteBatch(System.Collections.Generic.IEnumerable{Serilog.Events.LogEvent},System.Data.DataTable)~System.Threading.Tasks.Task")]
1212
[assembly: SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Too hard to change. Accepted for now.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.Output.StandardColumnDataGenerator.ConvertPropertiesToXmlStructure(System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.String,Serilog.Events.LogEventPropertyValue}})~System.String")]
1313
[assembly: SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Too hard to change. Accepted for now.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.Output.PropertiesColumnDataGenerator.TryChangeType(System.Object,System.Type,System.Object@)~System.Boolean")]
1414
[assembly: SuppressMessage("Design", "CA1062:Validate arguments of public methods", Justification = "Serilog core guarantees to call Emit() with non-null logEvent parameter.", Scope = "member", Target = "~M:Serilog.Sinks.MSSqlServer.MSSqlServerAuditSink.Emit(Serilog.Events.LogEvent)")]

src/Serilog.Sinks.MSSqlServer/Sinks/MSSqlServer/Dependencies/SinkDependencies.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ internal class SinkDependencies
1010
public ISqlConnectionFactory SqlConnectionFactory { get; set; }
1111
public ISqlTableCreator SqlTableCreator { get; set; }
1212
public ILogEventDataGenerator LogEventDataGenerator { get; set; }
13+
public ISqlBulkBatchWriter SqlBulkBatchWriter { get; set; }
1314
}
1415
}

src/Serilog.Sinks.MSSqlServer/Sinks/MSSqlServer/Dependencies/SinkDependenciesFactory.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ internal static SinkDependencies Create(
1616
ColumnOptions columnOptions,
1717
ITextFormatter logEventFormatter)
1818
{
19+
columnOptions = columnOptions ?? new ColumnOptions();
20+
columnOptions.FinalizeConfigurationForSinkConstructor();
21+
1922
var sinkDependencies = new SinkDependencies
2023
{
2124
DataTableCreator = new DataTableCreator(),
@@ -31,6 +34,8 @@ internal static SinkDependencies Create(
3134
};
3235
sinkDependencies.SqlTableCreator = new SqlTableCreator(
3336
new SqlCreateTableWriter(), sinkDependencies.SqlConnectionFactory);
37+
sinkDependencies.SqlBulkBatchWriter = new SqlBulkBatchWriter(sinkOptions.TableName, sinkOptions.SchemaName,
38+
columnOptions.DisableTriggers, sinkDependencies.SqlConnectionFactory, sinkDependencies.LogEventDataGenerator);
3439

3540
return sinkDependencies;
3641
}

src/Serilog.Sinks.MSSqlServer/Sinks/MSSqlServer/MSSqlServerAuditSink.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,16 @@ internal MSSqlServerAuditSink(
105105
{
106106
throw new ArgumentNullException(nameof(sinkDependencies));
107107
}
108-
_sqlConnectionFactory = sinkDependencies?.SqlConnectionFactory ?? throw new InvalidOperationException($"{nameof(SqlConnectionFactory)} is not initialized");
109-
_logEventDataGenerator = sinkDependencies?.LogEventDataGenerator ?? throw new InvalidOperationException($"{nameof(LogEventDataGenerator)} is not initialized");
108+
_sqlConnectionFactory = sinkDependencies?.SqlConnectionFactory ?? throw new InvalidOperationException($"SqlConnectionFactory is not initialized!");
109+
_logEventDataGenerator = sinkDependencies?.LogEventDataGenerator ?? throw new InvalidOperationException($"LogEventDataGenerator is not initialized!");
110110

111111
if (_sinkOptions.AutoCreateSqlTable)
112112
{
113+
if (sinkDependencies?.DataTableCreator == null)
114+
{
115+
throw new InvalidOperationException($"DataTableCreator is not initialized!");
116+
}
117+
113118
using (var eventTable = sinkDependencies.DataTableCreator.CreateDataTable(_sinkOptions.TableName, columnOptions))
114119
{
115120
sinkDependencies.SqlTableCreator.CreateTable(_sinkOptions.SchemaName, _sinkOptions.TableName, eventTable, columnOptions);
@@ -183,7 +188,7 @@ public void Emit(LogEvent logEvent)
183188
/// <param name="disposing">True to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
184189
protected virtual void Dispose(bool disposing)
185190
{
186-
// This class needn't to be IDisposable anymore. This is just here for backwards compatibility.
191+
// This class needn't to dispose anything. This is just here for sink interface compatibility.
187192
}
188193

189194
/// <summary>

src/Serilog.Sinks.MSSqlServer/Sinks/MSSqlServer/MSSqlServerSink.cs

Lines changed: 13 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,9 @@
1515
using System;
1616
using System.Collections.Generic;
1717
using System.Data;
18-
using System.Data.SqlClient;
19-
using System.Globalization;
20-
using System.Linq;
2118
using System.Threading.Tasks;
22-
using Serilog.Debugging;
2319
using Serilog.Events;
2420
using Serilog.Formatting;
25-
using Serilog.Sinks.MSSqlServer.Output;
2621
using Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Dependencies;
2722
using Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Options;
2823
using Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Platform;
@@ -37,8 +32,7 @@ public class MSSqlServerSink : PeriodicBatchingSink
3732
{
3833
private readonly SinkOptions _sinkOptions;
3934
private readonly ColumnOptions _columnOptions;
40-
private readonly ISqlConnectionFactory _sqlConnectionFactory;
41-
private readonly ILogEventDataGenerator _logEventDataGenerator;
35+
private readonly ISqlBulkBatchWriter _sqlBulkBatchWriter;
4236
private readonly DataTable _eventTable;
4337

4438
/// <summary>
@@ -127,13 +121,21 @@ internal MSSqlServerSink(
127121
{
128122
throw new ArgumentNullException(nameof(sinkDependencies));
129123
}
130-
_sqlConnectionFactory = sinkDependencies?.SqlConnectionFactory ?? throw new InvalidOperationException($"{nameof(SqlConnectionFactory)} is not initialized");
131-
_logEventDataGenerator = sinkDependencies?.LogEventDataGenerator ?? throw new InvalidOperationException($"{nameof(LogEventDataGenerator)} is not initialized");
132124

125+
_sqlBulkBatchWriter = sinkDependencies?.SqlBulkBatchWriter ?? throw new InvalidOperationException($"SqlBulkBatchWriter is not initialized!");
126+
127+
if (sinkDependencies?.DataTableCreator == null)
128+
{
129+
throw new InvalidOperationException($"DataTableCreator is not initialized!");
130+
}
133131
_eventTable = sinkDependencies.DataTableCreator.CreateDataTable(sinkOptions.TableName, columnOptions);
134132

135133
if (_sinkOptions.AutoCreateSqlTable)
136134
{
135+
if (sinkDependencies?.SqlBulkBatchWriter == null)
136+
{
137+
throw new InvalidOperationException($"SqlTableCreator is not initialized!");
138+
}
137139
sinkDependencies.SqlTableCreator.CreateTable(sinkOptions.SchemaName, sinkOptions.TableName, _eventTable, _columnOptions);
138140
}
139141
}
@@ -147,43 +149,8 @@ internal MSSqlServerSink(
147149
/// ,
148150
/// not both.
149151
/// </remarks>
150-
protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
151-
{
152-
// Copy the events to the data table
153-
FillDataTable(events);
154-
155-
try
156-
{
157-
using (var cn = _sqlConnectionFactory.Create())
158-
{
159-
await cn.OpenAsync().ConfigureAwait(false);
160-
using (var copy = _columnOptions.DisableTriggers
161-
? new SqlBulkCopy(cn)
162-
: new SqlBulkCopy(cn, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers, null)
163-
)
164-
{
165-
copy.DestinationTableName = string.Format(CultureInfo.InvariantCulture, "[{0}].[{1}]", _sinkOptions.SchemaName, _sinkOptions.TableName);
166-
foreach (var column in _eventTable.Columns)
167-
{
168-
var columnName = ((DataColumn)column).ColumnName;
169-
var mapping = new SqlBulkCopyColumnMapping(columnName, columnName);
170-
copy.ColumnMappings.Add(mapping);
171-
}
172-
173-
await copy.WriteToServerAsync(_eventTable).ConfigureAwait(false);
174-
}
175-
}
176-
}
177-
catch (Exception ex)
178-
{
179-
SelfLog.WriteLine("Unable to write {0} log events to the database due to following error: {1}", events.Count(), ex.Message);
180-
}
181-
finally
182-
{
183-
// Processed the items, clear for the next run
184-
_eventTable.Clear();
185-
}
186-
}
152+
protected override Task EmitBatchAsync(IEnumerable<LogEvent> events) =>
153+
_sqlBulkBatchWriter.WriteBatch(events, _eventTable);
187154

188155
/// <summary>
189156
/// Disposes the connection
@@ -197,23 +164,5 @@ protected override void Dispose(bool disposing)
197164
_eventTable.Dispose();
198165
}
199166
}
200-
201-
private void FillDataTable(IEnumerable<LogEvent> events)
202-
{
203-
// Add the new rows to the collection.
204-
foreach (var logEvent in events)
205-
{
206-
var row = _eventTable.NewRow();
207-
208-
foreach (var field in _logEventDataGenerator.GetColumnsAndValues(logEvent))
209-
{
210-
row[field.Key] = field.Value;
211-
}
212-
213-
_eventTable.Rows.Add(row);
214-
}
215-
216-
_eventTable.AcceptChanges();
217-
}
218167
}
219168
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Collections.Generic;
2+
using System.Data;
3+
using System.Threading.Tasks;
4+
using Serilog.Events;
5+
6+
namespace Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Platform
7+
{
8+
internal interface ISqlBulkBatchWriter
9+
{
10+
Task WriteBatch(IEnumerable<LogEvent> events, DataTable dataTable);
11+
}
12+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Data;
4+
using System.Data.SqlClient;
5+
using System.Globalization;
6+
using System.Linq;
7+
using System.Threading.Tasks;
8+
using Serilog.Debugging;
9+
using Serilog.Events;
10+
using Serilog.Sinks.MSSqlServer.Output;
11+
12+
namespace Serilog.Sinks.MSSqlServer.Sinks.MSSqlServer.Platform
13+
{
14+
internal class SqlBulkBatchWriter : ISqlBulkBatchWriter
15+
{
16+
private readonly string _tableName;
17+
private readonly string _schemaName;
18+
private readonly bool _disableTriggers;
19+
private readonly ISqlConnectionFactory _sqlConnectionFactory;
20+
private readonly ILogEventDataGenerator _logEventDataGenerator;
21+
22+
public SqlBulkBatchWriter(
23+
string tableName,
24+
string schemaName,
25+
bool disableTriggers,
26+
ISqlConnectionFactory sqlConnectionFactory,
27+
ILogEventDataGenerator logEventDataGenerator)
28+
{
29+
_tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
30+
_schemaName = schemaName ?? throw new ArgumentNullException(nameof(schemaName));
31+
_disableTriggers = disableTriggers;
32+
_sqlConnectionFactory = sqlConnectionFactory ?? throw new ArgumentNullException(nameof(sqlConnectionFactory));
33+
_logEventDataGenerator = logEventDataGenerator ?? throw new ArgumentNullException(nameof(logEventDataGenerator));
34+
}
35+
36+
public async Task WriteBatch(IEnumerable<LogEvent> events, DataTable dataTable)
37+
{
38+
// Copy the events to the data table
39+
FillDataTable(events, dataTable);
40+
41+
try
42+
{
43+
using (var cn = _sqlConnectionFactory.Create())
44+
{
45+
await cn.OpenAsync().ConfigureAwait(false);
46+
using (var copy = _disableTriggers
47+
? new SqlBulkCopy(cn)
48+
: new SqlBulkCopy(cn, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers, null)
49+
)
50+
{
51+
copy.DestinationTableName = string.Format(CultureInfo.InvariantCulture, "[{0}].[{1}]", _schemaName, _tableName);
52+
foreach (var column in dataTable.Columns)
53+
{
54+
var columnName = ((DataColumn)column).ColumnName;
55+
var mapping = new SqlBulkCopyColumnMapping(columnName, columnName);
56+
copy.ColumnMappings.Add(mapping);
57+
}
58+
59+
await copy.WriteToServerAsync(dataTable).ConfigureAwait(false);
60+
}
61+
}
62+
}
63+
catch (Exception ex)
64+
{
65+
SelfLog.WriteLine("Unable to write {0} log events to the database due to following error: {1}", events.Count(), ex.Message);
66+
}
67+
finally
68+
{
69+
// Processed the items, clear for the next run
70+
dataTable.Clear();
71+
}
72+
}
73+
74+
private void FillDataTable(IEnumerable<LogEvent> events, DataTable dataTable)
75+
{
76+
// Add the new rows to the collection.
77+
foreach (var logEvent in events)
78+
{
79+
var row = dataTable.NewRow();
80+
81+
foreach (var field in _logEventDataGenerator.GetColumnsAndValues(logEvent))
82+
{
83+
row[field.Key] = field.Value;
84+
}
85+
86+
dataTable.Rows.Add(row);
87+
}
88+
89+
dataTable.AcceptChanges();
90+
}
91+
}
92+
}

test/Serilog.Sinks.MSSqlServer.Tests/Sinks/MSSqlServer/MSSqlServerSinkTests.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ public class MSSqlServerSinkTests : IDisposable
1616
{
1717
private readonly SinkDependencies _sinkDependencies;
1818
private readonly Mock<IDataTableCreator> _dataTableCreatorMock;
19-
private readonly Mock<ISqlConnectionFactory> _sqlConnectionFactoryMock;
2019
private readonly Mock<ISqlTableCreator> _sqlTableCreatorMock;
21-
private readonly Mock<ILogEventDataGenerator> _logEventDataGeneratorMock;
20+
private readonly Mock<ISqlBulkBatchWriter> _sqlBulkBatchWriter;
2221
private readonly string _tableName = "tableName";
2322
private readonly string _schemaName = "schemaName";
2423
private readonly DataTable _dataTable;
@@ -32,15 +31,13 @@ public MSSqlServerSinkTests()
3231
_dataTableCreatorMock.Setup(d => d.CreateDataTable(It.IsAny<string>(), It.IsAny<Serilog.Sinks.MSSqlServer.ColumnOptions>()))
3332
.Returns(_dataTable);
3433

35-
_sqlConnectionFactoryMock = new Mock<ISqlConnectionFactory>();
3634
_sqlTableCreatorMock = new Mock<ISqlTableCreator>();
37-
_logEventDataGeneratorMock = new Mock<ILogEventDataGenerator>();
35+
_sqlBulkBatchWriter = new Mock<ISqlBulkBatchWriter>();
3836
_sinkDependencies = new SinkDependencies
3937
{
4038
DataTableCreator = _dataTableCreatorMock.Object,
41-
SqlConnectionFactory = _sqlConnectionFactoryMock.Object,
4239
SqlTableCreator = _sqlTableCreatorMock.Object,
43-
LogEventDataGenerator = _logEventDataGeneratorMock.Object
40+
SqlBulkBatchWriter = _sqlBulkBatchWriter.Object
4441
};
4542
}
4643

0 commit comments

Comments
 (0)