2
2
using System . Collections . Generic ;
3
3
using System . Data ;
4
4
using System . Diagnostics . CodeAnalysis ;
5
+ using System . Globalization ;
5
6
using System . IO ;
6
7
using System . Text ;
7
8
using System . Threading ;
8
9
using System . Threading . Tasks ;
9
10
using MySqlConnector . Protocol . Serialization ;
11
+ using MySqlConnector . Utilities ;
10
12
11
13
namespace MySql . Data . MySqlClient
12
14
{
13
15
public sealed class MySqlBulkLoader
14
16
{
15
- private const string defaultFieldTerminator = "\t " ;
16
- private const string defaultLineTerminator = "\n " ;
17
- private const char defaultEscapeCharacter = '\\ ' ;
18
-
19
- private static readonly object s_lock = new object ( ) ;
20
- private static readonly Dictionary < string , object > s_sources = new Dictionary < string , object > ( ) ;
21
-
22
17
public string ? CharacterSet { get ; set ; }
23
18
public List < string > Columns { get ; }
24
19
public MySqlBulkLoaderConflictOption ConflictOption { get ; set ; }
@@ -58,90 +53,10 @@ public MySqlBulkLoader(MySqlConnection connection)
58
53
{
59
54
Connection = connection ;
60
55
Local = true ;
61
- FieldTerminator = defaultFieldTerminator ;
62
- LineTerminator = defaultLineTerminator ;
63
- FieldQuotationCharacter = '\0 ' ;
64
- ConflictOption = MySqlBulkLoaderConflictOption . None ;
65
56
Columns = new List < string > ( ) ;
66
57
Expressions = new List < string > ( ) ;
67
58
}
68
59
69
- private string BuildSqlCommand ( )
70
- {
71
- StringBuilder sqlCommandMain = new StringBuilder ( "LOAD DATA " ) ;
72
- if ( Priority == MySqlBulkLoaderPriority . Low )
73
- sqlCommandMain . Append ( "LOW_PRIORITY " ) ;
74
- else if ( Priority == MySqlBulkLoaderPriority . Concurrent )
75
- sqlCommandMain . Append ( "CONCURRENT " ) ;
76
-
77
- if ( Local )
78
- sqlCommandMain . Append ( "LOCAL " ) ;
79
-
80
- sqlCommandMain . Append ( "INFILE " ) ;
81
-
82
- if ( System . IO . Path . DirectorySeparatorChar != '\\ ' )
83
- sqlCommandMain . AppendFormat ( "'{0}' " , FileName ! ) ;
84
- else
85
- sqlCommandMain . AppendFormat ( "'{0}' " , FileName ! . Replace ( "\\ " , "\\ \\ " ) ) ;
86
-
87
- if ( ConflictOption == MySqlBulkLoaderConflictOption . Ignore )
88
- sqlCommandMain . Append ( "IGNORE " ) ;
89
- else if ( ConflictOption == MySqlBulkLoaderConflictOption . Replace )
90
- sqlCommandMain . Append ( "REPLACE " ) ;
91
-
92
- sqlCommandMain . AppendFormat ( "INTO TABLE {0} " , TableName ) ;
93
-
94
- if ( CharacterSet is object )
95
- sqlCommandMain . AppendFormat ( "CHARACTER SET {0} " , CharacterSet ) ;
96
-
97
- StringBuilder sqlCommandFragment = new StringBuilder ( ) ;
98
- if ( FieldTerminator != defaultFieldTerminator )
99
- sqlCommandFragment . AppendFormat ( "TERMINATED BY \' {0}\' " , FieldTerminator ) ;
100
-
101
- if ( FieldQuotationCharacter != 0 )
102
- sqlCommandFragment . AppendFormat ( "{0} ENCLOSED BY \' {1}\' " , ( FieldQuotationOptional ? "OPTIONALLY" : "" ) , FieldQuotationCharacter ) ;
103
-
104
- if ( EscapeCharacter != defaultEscapeCharacter && EscapeCharacter != 0 )
105
- sqlCommandFragment . AppendFormat ( "ESCAPED BY \' {0}\' " , EscapeCharacter ) ;
106
-
107
- if ( sqlCommandFragment . Length > 0 )
108
- {
109
- sqlCommandMain . AppendFormat ( "FIELDS {0}" , sqlCommandFragment . ToString ( ) ) ;
110
- sqlCommandFragment . Clear ( ) ;
111
- }
112
-
113
- if ( ! string . IsNullOrEmpty ( LinePrefix ) )
114
- sqlCommandFragment . AppendFormat ( "STARTING BY \' {0}\' " , LinePrefix ) ;
115
-
116
- if ( LineTerminator != defaultLineTerminator )
117
- sqlCommandFragment . AppendFormat ( "TERMINATED BY \' {0}\' " , LineTerminator ) ;
118
-
119
- if ( sqlCommandFragment . Length > 0 )
120
- sqlCommandMain . AppendFormat ( "LINES {0}" , sqlCommandFragment . ToString ( ) ) ;
121
-
122
- if ( NumberOfLinesToSkip > 0 )
123
- sqlCommandMain . AppendFormat ( "IGNORE {0} LINES " , NumberOfLinesToSkip ) ;
124
-
125
- if ( Columns . Count > 0 )
126
- {
127
- sqlCommandMain . Append ( "(" ) ;
128
- sqlCommandMain . Append ( Columns [ 0 ] ) ;
129
- for ( int i = 1 ; i < Columns . Count ; i ++ )
130
- sqlCommandMain . AppendFormat ( ",{0}" , Columns [ i ] ) ;
131
- sqlCommandMain . Append ( ") " ) ;
132
- }
133
-
134
- if ( Expressions . Count > 0 )
135
- {
136
- sqlCommandMain . Append ( "SET " ) ;
137
- sqlCommandMain . Append ( Expressions [ 0 ] ) ;
138
- for ( int i = 1 ; i < Expressions . Count ; i ++ )
139
- sqlCommandMain . AppendFormat ( ",{0}" , Expressions [ i ] ) ;
140
- }
141
-
142
- return sqlCommandMain . ToString ( ) ;
143
- }
144
-
145
60
public int Load ( ) => LoadAsync ( IOBehavior . Synchronous , CancellationToken . None ) . GetAwaiter ( ) . GetResult ( ) ;
146
61
147
62
public Task < int > LoadAsync ( ) => LoadAsync ( IOBehavior . Asynchronous , CancellationToken . None ) . AsTask ( ) ;
@@ -180,7 +95,7 @@ internal async ValueTask<int> LoadAsync(IOBehavior ioBehavior, CancellationToken
180
95
s_sources . Add ( FileName , Source ! ) ;
181
96
}
182
97
183
- bool closeConnection = false ;
98
+ var closeConnection = false ;
184
99
if ( Connection . State != ConnectionState . Open )
185
100
{
186
101
closeConnection = true ;
@@ -193,16 +108,14 @@ internal async ValueTask<int> LoadAsync(IOBehavior ioBehavior, CancellationToken
193
108
if ( Local && ! Connection . AllowLoadLocalInfile )
194
109
throw new NotSupportedException ( "To use MySqlBulkLoader.Local=true, set AllowLoadLocalInfile=true in the connection string. See https://fl.vu/mysql-load-data" ) ;
195
110
196
- using ( var cmd = new MySqlCommand ( BuildSqlCommand ( ) , Connection , Connection . CurrentTransaction )
111
+ using var cmd = new MySqlCommand ( CreateSql ( ) , Connection , Connection . CurrentTransaction )
197
112
{
198
113
AllowUserVariables = true ,
199
114
CommandTimeout = Timeout ,
200
- } )
201
- {
202
- var result = await cmd . ExecuteNonQueryAsync ( ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
203
- closeStream = false ;
204
- return result ;
205
- }
115
+ } ;
116
+ var result = await cmd . ExecuteNonQueryAsync ( ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
117
+ closeStream = false ;
118
+ return result ;
206
119
}
207
120
finally
208
121
{
@@ -214,6 +127,62 @@ internal async ValueTask<int> LoadAsync(IOBehavior ioBehavior, CancellationToken
214
127
}
215
128
}
216
129
130
+ internal const string SourcePrefix = ":SOURCE:" ;
131
+
132
+ internal object ? Source { get ; set ; }
133
+
134
+ private string CreateSql ( )
135
+ {
136
+ var sb = new StringBuilder ( "LOAD DATA " ) ;
137
+
138
+ sb . Append ( Priority switch
139
+ {
140
+ MySqlBulkLoaderPriority . Low => "LOW_PRIORITY " ,
141
+ MySqlBulkLoaderPriority . Concurrent => "LOCAL " ,
142
+ _ => "" ,
143
+ } ) ;
144
+
145
+ if ( Local )
146
+ sb . Append ( "LOCAL " ) ;
147
+
148
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "INFILE '{0}' " , MySqlHelper . EscapeString ( FileName ! ) ) ;
149
+
150
+ sb . Append ( ConflictOption switch
151
+ {
152
+ MySqlBulkLoaderConflictOption . Replace => "REPLACE " ,
153
+ MySqlBulkLoaderConflictOption . Ignore => "IGNORE " ,
154
+ _ => "" ,
155
+ } ) ;
156
+
157
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "INTO TABLE {0} " , TableName ) ;
158
+
159
+ if ( CharacterSet is object )
160
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "CHARACTER SET {0} " , CharacterSet ) ;
161
+
162
+ var fieldsTerminatedBy = FieldTerminator is null ? "" : "TERMINATED BY '{0}' " . FormatInvariant ( MySqlHelper . EscapeString ( FieldTerminator ) ) ;
163
+ var fieldsEnclosedBy = FieldQuotationCharacter == default ? "" : "{0}ENCLOSED BY '{1}' " . FormatInvariant ( FieldQuotationOptional ? "OPTIONALLY " : "" , MySqlHelper . EscapeString ( FieldQuotationCharacter . ToString ( ) ) ) ;
164
+ var fieldsEscapedBy = EscapeCharacter == default ? "" : "ESCAPED BY '{0}' " . FormatInvariant ( MySqlHelper . EscapeString ( EscapeCharacter . ToString ( ) ) ) ;
165
+ if ( fieldsTerminatedBy . Length + fieldsEnclosedBy . Length + fieldsEscapedBy . Length > 0 )
166
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "FIELDS {0}{1}{2}" , fieldsTerminatedBy , fieldsEnclosedBy , fieldsEscapedBy ) ;
167
+
168
+ var linesTerminatedBy = LineTerminator is null ? "" : "TERMINATED BY '{0}' " . FormatInvariant ( MySqlHelper . EscapeString ( LineTerminator ) ) ;
169
+ var linesStartingBy = LinePrefix is null ? "" : "STARTING BY '{0}' " . FormatInvariant ( MySqlHelper . EscapeString ( LinePrefix ) ) ;
170
+ if ( linesTerminatedBy . Length + linesStartingBy . Length > 0 )
171
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "LINES {0}{1}" , linesTerminatedBy , linesStartingBy ) ;
172
+
173
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "IGNORE {0} LINES " , NumberOfLinesToSkip ) ;
174
+
175
+ if ( Columns . Count > 0 )
176
+ sb . AppendFormat ( CultureInfo . InvariantCulture , "({0}) " , string . Join ( "," , Columns ) ) ;
177
+
178
+ if ( Expressions . Count > 0 )
179
+ sb . AppendFormat ( "SET {0}" , string . Join ( "," , Expressions ) ) ;
180
+
181
+ sb . Append ( ';' ) ;
182
+
183
+ return sb . ToString ( ) ;
184
+ }
185
+
217
186
private Stream CreateFileStream ( string fileName )
218
187
{
219
188
try
@@ -226,12 +195,6 @@ private Stream CreateFileStream(string fileName)
226
195
}
227
196
}
228
197
229
- private static string GenerateSourceFileName ( ) => SourcePrefix + Guid . NewGuid ( ) . ToString ( "N" ) ;
230
-
231
- internal const string SourcePrefix = ":SOURCE:" ;
232
-
233
- internal object ? Source { get ; set ; }
234
-
235
198
internal static object GetAndRemoveSource ( string sourceKey )
236
199
{
237
200
lock ( s_lock )
@@ -255,5 +218,10 @@ internal static bool TryGetAndRemoveSource(string sourceKey, [NotNullWhen(true)]
255
218
256
219
return false ;
257
220
}
221
+
222
+ private static string GenerateSourceFileName ( ) => SourcePrefix + Guid . NewGuid ( ) . ToString ( "N" ) ;
223
+
224
+ static readonly object s_lock = new object ( ) ;
225
+ static readonly Dictionary < string , object > s_sources = new Dictionary < string , object > ( ) ;
258
226
}
259
227
}
0 commit comments