Skip to content

Commit d61fd76

Browse files
committed
Add MySqlBulkCopyColumnMapping. Fixes #773
1 parent 2650579 commit d61fd76

File tree

4 files changed

+189
-11
lines changed

4 files changed

+189
-11
lines changed

docs/content/api/mysql-bulk-copy.md

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
lastmod: 2020-04-04
2+
lastmod: 2020-05-02
33
date: 2019-11-11
44
menu:
55
main:
@@ -17,9 +17,6 @@ for SQL Server.
1717
Due to [security features](../troubleshooting/load-data-local-infile/) in MySQL Server, the connection string
1818
**must** have `AllowLoadLocalInfile=true` in order to use this class.
1919

20-
`MySqlBulkCopy` currently requires that the source data (`DataTable` or `IDataReader`) contain all
21-
the columns of the destination table in the same order.
22-
2320
For data that is in CSV or TSV format, use [`MySqlBulkLoader`](api/mysql-bulk-loader/) to bulk load the file.
2421

2522
**Note:** This API is a unique feature of MySqlConnector; you must [switch to MySqlConnector](../../overview/installing/)
@@ -63,6 +60,14 @@ Name of the destination table on the server. (This name shouldn't be quoted or e
6360

6461
If non-zero, this defines the number of rows to be processed before generating a notification event.
6562

63+
`public List<MySqlBulkCopyColumnMapping> ColumnMappings { get; }`
64+
65+
A collection of `MySqlBulkCopyColumnMapping` objects. If the columns being copied from the
66+
data source line up one-to-one with the columns in the destination table then populating this collection is
67+
unnecessary. Otherwise, this should be filled with a collection of `MySqlBulkCopyColumnMapping` objects
68+
specifying how source columns are to be mapped onto destination columns. If one column mapping is specified,
69+
then all must be specified.
70+
6671
### Methods
6772

6873
`public void WriteToServer(DataTable dataTable);`
@@ -101,3 +106,41 @@ Receipt of a `RowsCopied` event does not imply that any rows have been sent to t
101106

102107
The `MySqlRowsCopiedEventArgs.Abort` property can be set to `true` by the event handler to abort
103108
the copy.
109+
110+
## MySqlBulkCopyColumnMapping
111+
112+
Use `MySqlBulkCopyColumnMapping` to specify how to map columns in the source data to
113+
columns in the destination table.
114+
115+
Set `SourceOrdinal` to the index of the source column to map. Set `DestinationColumn` to
116+
either the name of a column in the destination table, or the name of a user-defined variable.
117+
If a user-defined variable, you can use `Expression` to specify a MySQL expression that sets
118+
a destination column.
119+
120+
Source columns that don't have an entry in `MySqlBulkCopy.ColumnMappings` will be ignored
121+
(unless the `ColumnMappings` collection is empty, in which case all columns will be mapped
122+
one-to-one).
123+
124+
Columns containing binary data must be mapped using an expression that uses the `UNHEX` function.
125+
126+
### Examples
127+
128+
```csharp
129+
new MySqlBulkCopyColumnMapping
130+
{
131+
SourceOrdinal = 2,
132+
DestinationColumn = "user_name",
133+
},
134+
new MySqlBulkCopyColumnMapping
135+
{
136+
SourceOrdinal = 0,
137+
DestinationColumn = "@tmp",
138+
Expression = "SET column_value = @tmp * 2",
139+
},
140+
new MySqlBulkCopyColumnMapping
141+
{
142+
SourceOrdinal = 1,
143+
DestinationColumn = "@tmp2",
144+
Expression = "SET binary_column = UNHEX(@tmp2)",
145+
},
146+
```

src/MySqlConnector/MySql.Data.MySqlClient/MySqlBulkCopy.cs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Buffers.Text;
44
using System.Collections.Generic;
55
using System.Data;
6+
using System.Linq;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -20,6 +21,7 @@ public MySqlBulkCopy(MySqlConnection connection, MySqlTransaction? transaction =
2021
{
2122
m_connection = connection ?? throw new ArgumentNullException(nameof(connection));
2223
m_transaction = transaction;
24+
ColumnMappings = new List<MySqlBulkCopyColumnMapping>();
2325
}
2426

2527
public int BulkCopyTimeout { get; set; }
@@ -44,6 +46,15 @@ public MySqlBulkCopy(MySqlConnection connection, MySqlTransaction? transaction =
4446
/// </remarks>
4547
public event MySqlRowsCopiedEventHandler? RowsCopied;
4648

49+
/// <summary>
50+
/// A collection of <see cref="MySqlBulkCopyColumnMapping"/> objects. If the columns being copied from the
51+
/// data source line up one-to-one with the columns in the destination table then populating this collection is
52+
/// unnecessary. Otherwise, this should be filled with a collection of <see cref="MySqlBulkCopyColumnMapping"/> objects
53+
/// specifying how source columns are to be mapped onto destination columns. If one column mapping is specified,
54+
/// then all must be specified.
55+
/// </summary>
56+
public List<MySqlBulkCopyColumnMapping> ColumnMappings { get; }
57+
4758
#if !NETSTANDARD1_3
4859
public void WriteToServer(DataTable dataTable)
4960
{
@@ -134,16 +145,17 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
134145
closeConnection = true;
135146
}
136147

137-
using (var cmd = new MySqlCommand("select * from " + QuoteIdentifier(tableName) + ";", m_connection, m_transaction))
138-
using (var reader = (MySqlDataReader) await cmd.ExecuteReaderAsync(CommandBehavior.SchemaOnly, ioBehavior, cancellationToken).ConfigureAwait(false))
148+
// if no user-supplied column mappings, compute them from the destination schema
149+
if (ColumnMappings.Count == 0)
139150
{
151+
using var cmd = new MySqlCommand("select * from " + QuoteIdentifier(tableName) + ";", m_connection, m_transaction);
152+
using var reader = (MySqlDataReader) await cmd.ExecuteReaderAsync(CommandBehavior.SchemaOnly, ioBehavior, cancellationToken).ConfigureAwait(false);
140153
var schema = reader.GetColumnSchema();
141154
for (var i = 0; i < schema.Count; i++)
142155
{
143156
if (schema[i].DataTypeName == "BIT")
144157
{
145-
bulkLoader.Columns.Add($"@col{i}");
146-
bulkLoader.Expressions.Add($"`{reader.GetName(i)}` = CAST(@col{i} AS UNSIGNED)");
158+
ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, $"@col{i}", $"`{reader.GetName(i)}` = CAST(@col{i} AS UNSIGNED)"));
147159
}
148160
else if (schema[i].DataTypeName == "YEAR")
149161
{
@@ -155,17 +167,37 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
155167
var type = schema[i].DataType;
156168
if (type == typeof(byte[]) || (type == typeof(Guid) && (m_connection.GuidFormat == MySqlGuidFormat.Binary16 || m_connection.GuidFormat == MySqlGuidFormat.LittleEndianBinary16 || m_connection.GuidFormat == MySqlGuidFormat.TimeSwapBinary16)))
157169
{
158-
bulkLoader.Columns.Add($"@col{i}");
159-
bulkLoader.Expressions.Add($"`{reader.GetName(i)}` = UNHEX(@col{i})");
170+
ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, $"@col{i}", $"`{reader.GetName(i)}` = UNHEX(@col{i})"));
160171
}
161172
else
162173
{
163-
bulkLoader.Columns.Add(QuoteIdentifier(reader.GetName(i)));
174+
ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, reader.GetName(i)));
164175
}
165176
}
166177
}
167178
}
168179

180+
// set columns and expressions from the column mappings
181+
for (var i = 0; i < m_valuesEnumerator!.FieldCount; i++)
182+
{
183+
var columnMapping = ColumnMappings.FirstOrDefault(x => x.SourceOrdinal == i);
184+
if (columnMapping is null)
185+
{
186+
bulkLoader.Columns.Add("@`\uE002\bignore`");
187+
}
188+
else
189+
{
190+
if (columnMapping.DestinationColumn.Length == 0)
191+
throw new InvalidOperationException("MySqlBulkCopyColumnMapping.DestinationName is not set.");
192+
if (columnMapping.DestinationColumn[0] == '@')
193+
bulkLoader.Columns.Add(columnMapping.DestinationColumn);
194+
else
195+
bulkLoader.Columns.Add(QuoteIdentifier(columnMapping.DestinationColumn));
196+
if (columnMapping.Expression is object)
197+
bulkLoader.Expressions.Add(columnMapping.Expression);
198+
}
199+
}
200+
169201
await bulkLoader.LoadAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
170202

171203
if (closeConnection)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
3+
namespace MySql.Data.MySqlClient
4+
{
5+
/// <summary>
6+
/// <see cref="MySqlBulkCopyColumnMapping"/> specifies how to map columns in the source data to
7+
/// destination columns when using <see cref="MySqlBulkCopy"/>.
8+
/// </summary>
9+
public sealed class MySqlBulkCopyColumnMapping
10+
{
11+
/// <summary>
12+
/// Initializes <see cref="MySqlBulkCopyColumnMapping"/> with the default values.
13+
/// </summary>
14+
public MySqlBulkCopyColumnMapping()
15+
{
16+
DestinationColumn = "";
17+
}
18+
19+
/// <summary>
20+
/// Initializes <see cref="MySqlBulkCopyColumnMapping"/> to the specified values.
21+
/// </summary>
22+
/// <param name="sourceOrdinal">The ordinal position of the source column.</param>
23+
/// <param name="destinationColumn">The name of the destination column.</param>
24+
/// <param name="expression">The optional expression to be used to set the destination column.</param>
25+
public MySqlBulkCopyColumnMapping(int sourceOrdinal, string destinationColumn, string? expression = null)
26+
{
27+
SourceOrdinal = sourceOrdinal;
28+
DestinationColumn = destinationColumn ?? throw new ArgumentNullException(nameof(destinationColumn));
29+
Expression = expression;
30+
}
31+
32+
/// <summary>
33+
/// The ordinal position of the source column to map from.
34+
/// </summary>
35+
public int SourceOrdinal { get; set; }
36+
37+
/// <summary>
38+
/// The name of the destination column to copy to. To use an expression, this should be the name of a unique user-defined variable.
39+
/// </summary>
40+
public string DestinationColumn { get; set; }
41+
42+
/// <summary>
43+
/// An optional expression for setting a destination column. To use an expression, the <see cref="DestinationColumn"/> should
44+
/// be set to the name of a user-defined variable and this expression should set a column using that variable.
45+
/// </summary>
46+
/// <remarks>To populate a binary column, you must set <see cref="DestinationColumn"/> to a variable name, and <see cref="Expression"/> to an
47+
/// expression that uses <code>UNHEX</code> to set the column value, e.g., <code>`destColumn` = UNHEX(@variableName)</code>.</remarks>
48+
public string? Expression { get; set; }
49+
}
50+
}

tests/SideBySide/BulkLoaderSync.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,59 @@ public void BulkCopyAbort(int notifyAfter, int rowCount, int abortAfter, int exp
784784
using (var cmd = new MySqlCommand("select count(value) from bulk_copy_abort;", connection))
785785
Assert.Equal(expectedCount, cmd.ExecuteScalar());
786786
}
787+
788+
[Fact]
789+
public void BulkCopyColumnMappings()
790+
{
791+
using var connection = new MySqlConnection(GetLocalConnectionString());
792+
connection.Open();
793+
using (var cmd = new MySqlCommand(@"drop table if exists bulk_copy_column_mapping;
794+
create table bulk_copy_column_mapping(intvalue int, `text` text);", connection))
795+
{
796+
cmd.ExecuteNonQuery();
797+
}
798+
799+
var bulkCopy = new MySqlBulkCopy(connection)
800+
{
801+
DestinationTableName = "bulk_copy_column_mapping",
802+
ColumnMappings =
803+
{
804+
new MySqlBulkCopyColumnMapping(1, "@val", "intvalue = @val + 1"),
805+
new MySqlBulkCopyColumnMapping(3, "text"),
806+
},
807+
};
808+
809+
var dataTable = new DataTable()
810+
{
811+
Columns =
812+
{
813+
new DataColumn("c1", typeof(int)),
814+
new DataColumn("c2", typeof(int)),
815+
new DataColumn("c3", typeof(string)),
816+
new DataColumn("c4", typeof(string)),
817+
},
818+
Rows =
819+
{
820+
new object[] { 1, 100, "a", "A" },
821+
new object[] { 2, 200, "bb", "BB" },
822+
new object[] { 3, 300, "ccc", "CCC" },
823+
}
824+
};
825+
826+
bulkCopy.WriteToServer(dataTable);
827+
828+
using var reader = connection.ExecuteReader(@"select * from bulk_copy_column_mapping;");
829+
Assert.True(reader.Read());
830+
Assert.Equal(101, reader.GetValue(0));
831+
Assert.Equal("A", reader.GetValue(1));
832+
Assert.True(reader.Read());
833+
Assert.Equal(201, reader.GetValue(0));
834+
Assert.Equal("BB", reader.GetValue(1));
835+
Assert.True(reader.Read());
836+
Assert.Equal(301, reader.GetValue(0));
837+
Assert.Equal("CCC", reader.GetValue(1));
838+
Assert.False(reader.Read());
839+
}
787840
#endif
788841

789842
[Fact]

0 commit comments

Comments
 (0)