Skip to content

RDMP-355 SQL Merge Extraction Destination Component#2320

Merged
rdteviotdale merged 14 commits intodevelopfrom
task/simple-mssql-merge
Mar 6, 2026
Merged

RDMP-355 SQL Merge Extraction Destination Component#2320
rdteviotdale merged 14 commits intodevelopfrom
task/simple-mssql-merge

Conversation

@JFriel
Copy link
Collaborator

@JFriel JFriel commented Mar 6, 2026

Proposed Change

Create new component to simplify meging extraction data into existing database tables

Type of change

What types of changes does your code introduce? Tick all that apply.

  • Bugfix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation-Only Update
  • Other (if none of the other choices apply)

Checklist

By opening this PR, I confirm that I have:

  • Ensured that the PR branch is in sync with the target branch (i.e. it is automatically merge-able)
  • Created or updated any tests if relevant
  • Have validated this change against the Test Plan
  • Requested a review by one of the repository maintainers
  • Have written new documentation or updated existing documentation to detail any new or updated functionality and how to use it
  • Have added an entry into the changelog

var _managedConnection = tmpTbl.Database.Server.GetManagedConnection();
var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction);
_bulkcopy.Upload(toProcess);
_bulkcopy.Dispose();

Check warning

Code scanning / CodeQL

Dispose may not be called if an exception is thrown during execution Warning

Dispose missed if exception is thrown by
call to method Upload
.

Copilot Autofix

AI 12 days ago

In general, to fix this class of issue, ensure that any IDisposable object is disposed even if an exception occurs between its creation and the intended Dispose call. The idiomatic C# solution is to wrap such objects in a using statement (or using declaration in newer C#) so that Dispose is called automatically in a finally-like manner. If a using is not suitable, a try/finally block explicitly calling Dispose in finally is appropriate.

Here, the problem is _bulkcopy, created on line 263 and disposed on line 265. The single best fix with minimal functional change is to move the _bulkcopy variable into a using statement, calling Upload inside the using block and removing the explicit _bulkcopy.Dispose() call. This keeps the same logic and order of operations, while guaranteeing disposal if Upload or any subsequent code in that block throws. No new methods or external libraries are needed. The change is localized to Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs around lines 263–265.

Suggested changeset 1
Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
--- a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
+++ b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
@@ -260,9 +260,10 @@
                 $"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff",CultureInfo.InvariantCulture).Replace('.','-')}",
                 toProcess, null,true, null);
             var _managedConnection = tmpTbl.Database.Server.GetManagedConnection();
-            var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction);
-            _bulkcopy.Upload(toProcess);
-            _bulkcopy.Dispose();
+            using (var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction))
+            {
+                _bulkcopy.Upload(toProcess);
+            }
             var mergeSql = $"""
                 MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target
                 USING {tmpTbl.GetFullyQualifiedName()} AS source
EOF
@@ -260,9 +260,10 @@
$"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff",CultureInfo.InvariantCulture).Replace('.','-')}",
toProcess, null,true, null);
var _managedConnection = tmpTbl.Database.Server.GetManagedConnection();
var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction);
_bulkcopy.Upload(toProcess);
_bulkcopy.Dispose();
using (var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction))
{
_bulkcopy.Upload(toProcess);
}
var mergeSql = $"""
MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target
USING {tmpTbl.GetFullyQualifiedName()} AS source
Copilot is powered by AI and may make mistakes. Always verify output.
""":"")}
""";
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection);

Check warning

Code scanning / CodeQL

Missing Dispose call on local IDisposable Warning

Disposable 'SqlCommand' is created but not disposed.

Copilot Autofix

AI 12 days ago

In general, the correct fix is to ensure that the SqlCommand instance is disposed deterministically, typically via a using statement or using declaration that wraps its creation and use. This guarantees that any unmanaged resources associated with the command are released promptly, even if exceptions occur.

For this specific code, the simplest and least intrusive change is to wrap the creation and use of cmd in a using block. We only need to modify the block starting at line 281 in Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs. Replace the standalone creation of cmd and its use with a using (var cmd = new SqlCommand(...)) { ... } block that contains the ExecuteNonQuery, the success logging, and the optional temp table drop. This keeps the existing behavior, ensures cmd.Dispose() is always called, and does not require any new imports or changes elsewhere. _managedConnection.Dispose() should remain after the using block so that the command is disposed before the connection/transaction wrapper is disposed.

Suggested changeset 1
Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
--- a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
+++ b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
@@ -278,10 +278,12 @@
                 """:"")}
                 """;
             job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
-            var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection);
-            var rowCount = cmd.ExecuteNonQuery();
-            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
-            if(DeleteMergeTempTable)tmpTbl.Drop();
+            using (var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection))
+            {
+                var rowCount = cmd.ExecuteNonQuery();
+                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
+                if (DeleteMergeTempTable) tmpTbl.Drop();
+            }
             _managedConnection.Dispose();
         }
     }
EOF
@@ -278,10 +278,12 @@
""":"")}
""";
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection);
var rowCount = cmd.ExecuteNonQuery();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
if(DeleteMergeTempTable)tmpTbl.Drop();
using (var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection))
{
var rowCount = cmd.ExecuteNonQuery();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
if (DeleteMergeTempTable) tmpTbl.Drop();
}
_managedConnection.Dispose();
}
}
Copilot is powered by AI and may make mistakes. Always verify output.
var rowCount = cmd.ExecuteNonQuery();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
if(DeleteMergeTempTable)tmpTbl.Drop();
_managedConnection.Dispose();

Check warning

Code scanning / CodeQL

Dispose may not be called if an exception is thrown during execution Warning

Dispose missed if exception is thrown by
call to method BeginBulkInsert
.
Dispose missed if exception is thrown by
call to method Upload
.
Dispose missed if exception is thrown by
call to method GetFullyQualifiedName
.
Dispose missed if exception is thrown by
call to method GetFullyQualifiedName
.
Dispose missed if exception is thrown by
call to method Join
.
Dispose missed if exception is thrown by
call to method Select<DataColumn,String>
.
Dispose missed if exception is thrown by
call to method Join
.
Dispose missed if exception is thrown by
call to method Select<DataColumn,String>
.
Dispose missed if exception is thrown by
call to method Join
.
Dispose missed if exception is thrown by
call to method Select<DataColumn,String>
.
Dispose missed if exception is thrown by
call to method Cast
.
Dispose missed if exception is thrown by
call to method Join
.
Dispose missed if exception is thrown by
call to method Select<DataColumn,String>
.
Dispose missed if exception is thrown by
call to method Cast
.
Dispose missed if exception is thrown by
call to method OnNotify
.
Dispose missed if exception is thrown by
call to method ExecuteNonQuery
.
Dispose missed if exception is thrown by
call to method OnNotify
.
Dispose missed if exception is thrown by
call to method GetFullyQualifiedName
.
Dispose missed if exception is thrown by
call to method Drop
.

Copilot Autofix

AI 12 days ago

In general, to fix this kind of issue you ensure that every IDisposable resource is disposed even if an exception occurs between its creation and its intended Dispose call. In C#, that is normally achieved with using statements or, if using can’t be used, a try/finally that calls Dispose in the finally block. This guarantees cleanup regardless of how control leaves the block.

For this specific method, we need to guarantee disposal of _managedConnection, _bulkcopy, and also SqlCommand cmd (which currently is never disposed at all). The minimal change that preserves existing functionality is to introduce nested using blocks: one around the managed connection, one around the bulk copy, and one around the SqlCommand. All existing logic (creating the temp table, performing the bulk copy, building mergeSql, logging, executing the command, optionally dropping the temp table) can remain the same, just indented inside these using scopes. This ensures that if any of the operations flagged in the alert (e.g., BeginBulkInsert, Upload, building mergeSql, OnNotify, ExecuteNonQuery, Drop) throw, the using blocks will still dispose the relevant objects.

Concretely, in MSSqlMergeDestination.WriteRows (in Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs), replace the section starting at the creation of _managedConnection (current line 262) through the end of the method (current line 286) with code that:

  • Uses using (var _managedConnection = tmpTbl.Database.Server.GetManagedConnection()) instead of a bare var.
  • Inside that, uses using (var _bulkcopy = tmpTbl.BeginBulkInsert(...)) instead of a bare var plus explicit Dispose.
  • After building mergeSql, uses using (var cmd = new SqlCommand(...)).
  • Keeps the job.OnNotify, ExecuteNonQuery, tmpTbl.Drop(), and logic unchanged, only indented.
  • Removes the explicit _bulkcopy.Dispose(); and _managedConnection.Dispose(); calls, since using will now handle disposal automatically.

This single restructuring addresses all 19 variants: any exception from any of the flagged calls will still result in the connection, bulk copy, and command being disposed.

Suggested changeset 1
Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
--- a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
+++ b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
@@ -259,11 +259,13 @@
                 out Dictionary<string, Guesser> _dataTypeDictionary,
                 $"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff",CultureInfo.InvariantCulture).Replace('.','-')}",
                 toProcess, null,true, null);
-            var _managedConnection = tmpTbl.Database.Server.GetManagedConnection();
-            var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction);
-            _bulkcopy.Upload(toProcess);
-            _bulkcopy.Dispose();
-            var mergeSql = $"""
+            using (var _managedConnection = tmpTbl.Database.Server.GetManagedConnection())
+            {
+                using (var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction))
+                {
+                    _bulkcopy.Upload(toProcess);
+                }
+                var mergeSql = $"""
                 MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target
                 USING {tmpTbl.GetFullyQualifiedName()} AS source
                     ON {string.Join(" AND ", pkColumns.Select(pkc => $"target.{pkc.ColumnName} = source.{pkc.ColumnName}"))}
@@ -277,12 +279,14 @@
                     DELETE;
                 """:"")}
                 """;
-            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
-            var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection);
-            var rowCount = cmd.ExecuteNonQuery();
-            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
-            if(DeleteMergeTempTable)tmpTbl.Drop();
-            _managedConnection.Dispose();
+                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
+                using (var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection))
+                {
+                    var rowCount = cmd.ExecuteNonQuery();
+                    job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
+                }
+                if (DeleteMergeTempTable) tmpTbl.Drop();
+            }
         }
     }
 }
EOF
@@ -259,11 +259,13 @@
out Dictionary<string, Guesser> _dataTypeDictionary,
$"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff",CultureInfo.InvariantCulture).Replace('.','-')}",
toProcess, null,true, null);
var _managedConnection = tmpTbl.Database.Server.GetManagedConnection();
var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction);
_bulkcopy.Upload(toProcess);
_bulkcopy.Dispose();
var mergeSql = $"""
using (var _managedConnection = tmpTbl.Database.Server.GetManagedConnection())
{
using (var _bulkcopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, _managedConnection.ManagedTransaction))
{
_bulkcopy.Upload(toProcess);
}
var mergeSql = $"""
MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target
USING {tmpTbl.GetFullyQualifiedName()} AS source
ON {string.Join(" AND ", pkColumns.Select(pkc => $"target.{pkc.ColumnName} = source.{pkc.ColumnName}"))}
@@ -277,12 +279,14 @@
DELETE;
""":"")}
""";
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection);
var rowCount = cmd.ExecuteNonQuery();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
if(DeleteMergeTempTable)tmpTbl.Drop();
_managedConnection.Dispose();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"{mergeSql}"));
using (var cmd = new SqlCommand(mergeSql, (SqlConnection)_managedConnection.Connection))
{
var rowCount = cmd.ExecuteNonQuery();
job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
}
if (DeleteMergeTempTable) tmpTbl.Drop();
}
}
}
}
Copilot is powered by AI and may make mistakes. Always verify output.
Comment on lines +214 to +235
foreach (var dleListener in listeners)
{
IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
{
DefaultValue = dataLoadInfo.ID
};
try
{
destinationTable.DiscoverColumn(SpecialFieldNames.DataLoadRunID);
}
catch (Exception)
{
destinationTable.AddColumn(SpecialFieldNames.DataLoadRunID, new DatabaseTypeRequest(typeof(int)), true, 30000);

}
if (!toProcess.Columns.Contains(SpecialFieldNames.DataLoadRunID))
toProcess.Columns.Add(newColumn);
foreach (DataRow dr in toProcess.Rows)
dr[SpecialFieldNames.DataLoadRunID] = dataLoadInfo.ID;

}

Check notice

Code scanning / CodeQL

Missed opportunity to use Select Note

This foreach loop immediately
maps its iteration variable to another variable
- consider mapping the sequence explicitly using '.Select(...)'.

Copilot Autofix

AI 12 days ago

In general, to address this kind of issue you transform the foreach to iterate over a projected sequence using LINQ’s Select, so that the loop variable directly represents the derived value you actually use inside the loop. This removes the extra local mapping step and better communicates the loop’s intent.

Here, the loop currently iterates over dleListener instances and immediately maps each to its DataLoadInfo:

foreach (var dleListener in listeners)
{
    IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
    ...
}

The best fix is to change the foreach to iterate directly over IDataLoadInfo by applying .Select(l => l.DataLoadInfo) to listeners, and then remove the now-unnecessary local variable. Concretely, replace the foreach header and the first line in the loop body with:

foreach (IDataLoadInfo dataLoadInfo in listeners.Select(l => l.DataLoadInfo))
{
    DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
    {
        DefaultValue = dataLoadInfo.ID
    };
    ...
}

We already have using System.Linq; at line 24, so no new imports are needed. No other files or regions need changes; everything is contained within the shown block in MSSqlMergeDestination.cs.

Suggested changeset 1
Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
--- a/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
+++ b/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
@@ -211,9 +211,8 @@
             {
 
                 var listeners = ((ForkDataLoadEventListener)job).GetToLoggingDatabaseDataLoadEventListenersIfany();
-                foreach (var dleListener in listeners)
+                foreach (IDataLoadInfo dataLoadInfo in listeners.Select(l => l.DataLoadInfo))
                 {
-                    IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
                     DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
                     {
                         DefaultValue = dataLoadInfo.ID
EOF
@@ -211,9 +211,8 @@
{

var listeners = ((ForkDataLoadEventListener)job).GetToLoggingDatabaseDataLoadEventListenersIfany();
foreach (var dleListener in listeners)
foreach (IDataLoadInfo dataLoadInfo in listeners.Select(l => l.DataLoadInfo))
{
IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
{
DefaultValue = dataLoadInfo.ID
Copilot is powered by AI and may make mistakes. Always verify output.
@JFriel JFriel marked this pull request as ready for review March 6, 2026 12:57
@rdteviotdale rdteviotdale merged commit 13f3314 into develop Mar 6, 2026
7 checks passed
@rdteviotdale rdteviotdale deleted the task/simple-mssql-merge branch March 6, 2026 14:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants