9
9
using System . Threading . Tasks ;
10
10
using MySql . Data . Types ;
11
11
using MySqlConnector . Core ;
12
+ using MySqlConnector . Logging ;
12
13
using MySqlConnector . Protocol ;
13
14
using MySqlConnector . Protocol . Serialization ;
14
15
using MySqlConnector . Utilities ;
@@ -150,17 +151,18 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
150
151
closeConnection = true ;
151
152
}
152
153
153
- // if no user-supplied column mappings, compute them from the destination schema
154
- if ( ColumnMappings . Count == 0 )
154
+ // merge column mappings with the destination schema
155
+ var columnMappings = new List < MySqlBulkCopyColumnMapping > ( ColumnMappings ) ;
156
+ var addDefaultMappings = columnMappings . Count == 0 ;
157
+ using ( var cmd = new MySqlCommand ( "select * from " + tableName + ";" , m_connection , m_transaction ) )
158
+ using ( var reader = ( MySqlDataReader ) await cmd . ExecuteReaderAsync ( CommandBehavior . SchemaOnly , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) )
155
159
{
156
- using var cmd = new MySqlCommand ( "select * from " + tableName + ";" , m_connection , m_transaction ) ;
157
- using var reader = ( MySqlDataReader ) await cmd . ExecuteReaderAsync ( CommandBehavior . SchemaOnly , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
158
160
var schema = reader . GetColumnSchema ( ) ;
159
161
for ( var i = 0 ; i < schema . Count ; i ++ )
160
162
{
161
163
if ( schema [ i ] . DataTypeName == "BIT" )
162
164
{
163
- ColumnMappings . Add ( new MySqlBulkCopyColumnMapping ( i , $ "@col { i } ", $ "` { reader . GetName ( i ) } ` = CAST(@col { i } AS UNSIGNED)") ) ;
165
+ AddColumnMapping ( columnMappings , addDefaultMappings , i , reader . GetName ( i ) , $ "@` \uE002 \b col { i } ` ", $ "%COL% = CAST(%VAR% AS UNSIGNED)") ;
164
166
}
165
167
else if ( schema [ i ] . DataTypeName == "YEAR" )
166
168
{
@@ -172,11 +174,11 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
172
174
var type = schema [ i ] . DataType ;
173
175
if ( type == typeof ( byte [ ] ) || ( type == typeof ( Guid ) && ( m_connection . GuidFormat == MySqlGuidFormat . Binary16 || m_connection . GuidFormat == MySqlGuidFormat . LittleEndianBinary16 || m_connection . GuidFormat == MySqlGuidFormat . TimeSwapBinary16 ) ) )
174
176
{
175
- ColumnMappings . Add ( new MySqlBulkCopyColumnMapping ( i , $ "@col { i } ", $ "` { reader . GetName ( i ) } ` = UNHEX(@col { i } )" ) ) ;
177
+ AddColumnMapping ( columnMappings , addDefaultMappings , i , reader . GetName ( i ) , $ "@` \uE002 \b col { i } ` ", $ "%COL% = UNHEX(%VAR%)" ) ;
176
178
}
177
- else
179
+ else if ( addDefaultMappings )
178
180
{
179
- ColumnMappings . Add ( new MySqlBulkCopyColumnMapping ( i , reader . GetName ( i ) ) ) ;
181
+ columnMappings . Add ( new MySqlBulkCopyColumnMapping ( i , reader . GetName ( i ) ) ) ;
180
182
}
181
183
}
182
184
}
@@ -185,9 +187,10 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
185
187
// set columns and expressions from the column mappings
186
188
for ( var i = 0 ; i < m_valuesEnumerator ! . FieldCount ; i ++ )
187
189
{
188
- var columnMapping = ColumnMappings . FirstOrDefault ( x => x . SourceOrdinal == i ) ;
190
+ var columnMapping = columnMappings . FirstOrDefault ( x => x . SourceOrdinal == i ) ;
189
191
if ( columnMapping is null )
190
192
{
193
+ Log . Info ( "Ignoring column with SourceOrdinal {0}" , i ) ;
191
194
bulkLoader . Columns . Add ( "@`\uE002 \b ignore`" ) ;
192
195
}
193
196
else
@@ -213,6 +216,28 @@ private async ValueTask WriteToServerAsync(IOBehavior ioBehavior, CancellationTo
213
216
#endif
214
217
215
218
static string QuoteIdentifier ( string identifier ) => "`" + identifier . Replace ( "`" , "``" ) + "`" ;
219
+
220
+ static void AddColumnMapping ( List < MySqlBulkCopyColumnMapping > columnMappings , bool addDefaultMappings , int destinationOrdinal , string destinationColumn , string variableName , string expression )
221
+ {
222
+ expression = expression . Replace ( "%COL%" , "`" + destinationColumn + "`" ) . Replace ( "%VAR%" , variableName ) ;
223
+ var columnMapping = columnMappings . FirstOrDefault ( x => destinationColumn . Equals ( x . DestinationColumn , StringComparison . OrdinalIgnoreCase ) ) ;
224
+ if ( columnMapping is object )
225
+ {
226
+ if ( columnMapping . Expression is object )
227
+ {
228
+ Log . Warn ( "Column mapping for SourceOrdinal {0}, DestinationColumn {1} already has Expression {2}" , columnMapping . SourceOrdinal , columnMapping . DestinationColumn , columnMapping . Expression ) ;
229
+ }
230
+ else
231
+ {
232
+ columnMappings . Remove ( columnMapping ) ;
233
+ columnMappings . Add ( new MySqlBulkCopyColumnMapping ( columnMapping . SourceOrdinal , variableName , expression ) ) ;
234
+ }
235
+ }
236
+ else if ( addDefaultMappings )
237
+ {
238
+ columnMappings . Add ( new MySqlBulkCopyColumnMapping ( destinationOrdinal , variableName , expression ) ) ;
239
+ }
240
+ }
216
241
}
217
242
218
243
internal async Task SendDataReaderAsync ( IOBehavior ioBehavior , CancellationToken cancellationToken )
@@ -515,6 +540,7 @@ static bool WriteBytes(ReadOnlySpan<byte> value, Span<byte> output, out int byte
515
540
516
541
private static ReadOnlySpan < byte > EscapedNull => new byte [ ] { 0x5C , 0x4E } ;
517
542
private static readonly char [ ] s_specialCharacters = new char [ ] { '\t ' , '\\ ' , '\n ' } ;
543
+ private static readonly IMySqlConnectorLogger Log = MySqlConnectorLogManager . CreateLogger ( nameof ( MySqlBulkCopy ) ) ;
518
544
519
545
readonly MySqlConnection m_connection ;
520
546
readonly MySqlTransaction ? m_transaction ;
0 commit comments