Skip to content

Commit ddf3ae1

Browse files
Support for BigQuery DateTime format
1 parent d1166a4 commit ddf3ae1

File tree

1 file changed

+50
-4
lines changed

1 file changed

+50
-4
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@
6060

6161
import java.io.IOException;
6262
import java.security.GeneralSecurityException;
63+
import java.time.Instant;
64+
import java.time.LocalDate;
65+
import java.time.LocalDateTime;
66+
import java.time.LocalTime;
67+
import java.time.format.DateTimeFormatter;
6368
import java.util.ArrayList;
6469
import java.util.List;
6570
import java.util.Map;
@@ -271,10 +276,15 @@ private static void runQuery(Configuration configuration,
271276
FieldList fieldList = bqTable.getDefinition().getSchema().getFields();
272277
for (String columnName : parameterMap.keySet()) {
273278
String parameterType = fieldList.get(columnName).getType().name();
274-
QueryParameter queryParameter = new QueryParameter().setName(columnName)
275-
.setParameterType(new QueryParameterType().setType(parameterType));
276-
QueryParameterValue value = new QueryParameterValue().setValue(parameterMap.get(columnName));
277-
queryParameters.add(queryParameter.setParameterValue(value));
279+
String rawValue = parameterMap.get(columnName);
280+
String normalizedValue = normalizeValueForBigQuery(parameterType, rawValue);
281+
282+
QueryParameter queryParameter = new QueryParameter()
283+
.setName(columnName)
284+
.setParameterType(new QueryParameterType().setType(parameterType))
285+
.setParameterValue(new QueryParameterValue().setValue(normalizedValue));
286+
287+
queryParameters.add(queryParameter);
278288
}
279289
queryConfig.setParameterMode("NAMED");
280290
queryConfig.setQueryParameters(queryParameters);
@@ -320,6 +330,42 @@ public void progress() {
320330
}
321331
}
322332

333+
private static String normalizeValueForBigQuery(String parameterType, String rawValue) {
334+
try {
335+
switch (parameterType) {
336+
case "DATE":
337+
LocalDate date = LocalDate.parse(rawValue,
338+
DateTimeFormatter.ofPattern("[yyyy-MM-dd][MM/dd/yyyy][dd-MM-yyyy]"));
339+
return date.toString();
340+
341+
case "DATETIME":
342+
LocalDateTime datetime = LocalDateTime.parse(rawValue,
343+
DateTimeFormatter.ofPattern("[yyyy-MM-dd HH:mm:ss][yyyy-MM-dd'T'HH:mm:ss]"));
344+
return datetime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
345+
346+
case "TIME":
347+
LocalTime time = LocalTime.parse(rawValue,
348+
DateTimeFormatter.ofPattern("[HH:mm:ss][HH:mm:ss.SSSSSS]"));
349+
return time.toString();
350+
351+
case "TIMESTAMP":
352+
if (rawValue.matches("\\d+")) {
353+
Instant instant = Instant.ofEpochMilli(Long.parseLong(rawValue));
354+
return instant.toString();
355+
}
356+
Instant instant = Instant.parse(rawValue.replace(" ", "T").replace("UTC", "Z"));
357+
return instant.toString();
358+
359+
default:
360+
return rawValue;
361+
}
362+
} catch (Exception e) {
363+
throw new IllegalArgumentException(
364+
String.format("Invalid value '%s' for BigQuery type %s", rawValue, parameterType), e);
365+
}
366+
}
367+
368+
323369
/**
324370
* Gets the Job Reference for the BQ job to execute.
325371
*

0 commit comments

Comments
 (0)