|
58 | 58 | import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
59 | 59 | import org.apache.hadoop.util.Progressable; |
60 | 60 |
|
| 61 | + |
61 | 62 | import java.io.IOException; |
62 | 63 | import java.security.GeneralSecurityException; |
63 | 64 | import java.time.Instant; |
|
66 | 67 | import java.time.LocalTime; |
67 | 68 | import java.time.format.DateTimeFormatter; |
68 | 69 | import java.util.ArrayList; |
| 70 | +import java.util.LinkedHashMap; |
69 | 71 | import java.util.List; |
70 | 72 | import java.util.Map; |
71 | 73 | import java.util.Objects; |
@@ -271,16 +273,30 @@ private static void runQuery(Configuration configuration, |
271 | 273 | queryConfig.setQuery(query); |
272 | 274 | queryConfig.setUseLegacySql(false); |
273 | 275 | if (!Strings.isNullOrEmpty(parameterMapString)) { |
274 | | - Map<String, String> parameterMap = ConfigUtil.parseKeyValueConfig(parameterMapString, ",", "="); |
| 276 | + Map<String, String> parameterMap = new LinkedHashMap<>(); |
| 277 | + for (String entry : parameterMapString.split(",")) { |
| 278 | + int idx = entry.indexOf('='); |
| 279 | + if (idx < 0) { |
| 280 | + throw new IllegalArgumentException("Invalid entry: " + entry + ". Expected format alias=column=value"); |
| 281 | + } |
| 282 | + String alias = entry.substring(0, idx); |
| 283 | + String colAndVal = entry.substring(idx + 1); |
| 284 | + parameterMap.put(alias, colAndVal); |
| 285 | + } |
| 286 | + |
275 | 287 | List<QueryParameter> queryParameters = new ArrayList<>(); |
276 | 288 | FieldList fieldList = bqTable.getDefinition().getSchema().getFields(); |
277 | 289 | for (String alias : parameterMap.keySet()) { |
278 | 290 | String raw = parameterMap.get(alias); // "sys_updated_on=2018-12-11T23" |
279 | | - String[] parts = raw.split("=", 2); // [ "sys_updated_on", "2018-12-11T23" ] |
| 291 | + String[] parts = raw.split("=", 2); |
| 292 | + if (parts.length < 2) { |
| 293 | + throw new IllegalArgumentException( |
| 294 | + String.format("Invalid parameter entry for alias '%s': '%s'. Expected format 'column=value'.", |
| 295 | + alias, raw)); |
| 296 | + } |
280 | 297 |
|
281 | 298 | String columnName = parts[0]; |
282 | 299 | String rawValue = parts[1]; |
283 | | - |
284 | 300 | String parameterType = fieldList.get(columnName).getType().name(); |
285 | 301 | String normalizedValue = normalizeValueForBigQuery(parameterType, rawValue); |
286 | 302 |
|
@@ -341,12 +357,19 @@ private static String normalizeValueForBigQuery(String parameterType, String raw |
341 | 357 | switch (parameterType) { |
342 | 358 | case "DATE": |
343 | 359 | LocalDate date = LocalDate.parse(rawValue, |
344 | | - DateTimeFormatter.ofPattern("[yyyy-MM-dd][MM/dd/yyyy][dd-MM-yyyy]")); |
| 360 | + DateTimeFormatter.ofPattern("[yyyy-MM-dd][MM/dd/yyyy][dd-MM-yyyy][yyyy/MM/dd]")); |
345 | 361 | return date.toString(); |
346 | 362 |
|
347 | 363 | case "DATETIME": |
348 | 364 | LocalDateTime datetime = LocalDateTime.parse(rawValue, |
349 | | - DateTimeFormatter.ofPattern("[yyyy-MM-dd HH:mm:ss][yyyy-MM-dd'T'HH:mm:ss]")); |
| 365 | + DateTimeFormatter.ofPattern( |
| 366 | + "[yyyy-MM-dd HH:mm:ss][yyyy-MM-dd'T'HH:mm:ss]" + |
| 367 | + "[yyyy/MM/dd HH:mm:ss][yyyy/MM/dd]" + |
| 368 | + "[MM/dd/yyyy HH:mm:ss][MM/dd/yyyy]" + |
| 369 | + "[dd-MM-yyyy HH:mm:ss][dd-MM-yyyy]")); |
| 370 | + if (datetime.toLocalTime().equals(LocalTime.MIDNIGHT) && rawValue.length() <= 10) { |
| 371 | + datetime = datetime.withHour(0).withMinute(0).withSecond(0); |
| 372 | + } |
350 | 373 | return datetime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); |
351 | 374 |
|
352 | 375 | case "TIME": |
|
0 commit comments