2020import com .google .api .services .bigquery .model .JobConfiguration ;
2121import com .google .api .services .bigquery .model .JobConfigurationQuery ;
2222import com .google .api .services .bigquery .model .JobReference ;
23+ import com .google .api .services .bigquery .model .QueryParameter ;
24+ import com .google .api .services .bigquery .model .QueryParameterType ;
25+ import com .google .api .services .bigquery .model .QueryParameterValue ;
2326import com .google .api .services .bigquery .model .Table ;
2427import com .google .api .services .bigquery .model .TableReference ;
28+ import com .google .cloud .bigquery .FieldList ;
2529import com .google .cloud .bigquery .StandardTableDefinition ;
2630import com .google .cloud .bigquery .TableDefinition .Type ;
2731import com .google .cloud .bigquery .TimePartitioning ;
4044import io .cdap .cdap .api .exception .ErrorCategory ;
4145import io .cdap .cdap .api .exception .ErrorType ;
4246import io .cdap .cdap .api .exception .ErrorUtils ;
47+ import io .cdap .plugin .common .ConfigUtil ;
4348import io .cdap .plugin .gcp .bigquery .util .BigQueryConstants ;
4449import io .cdap .plugin .gcp .bigquery .util .BigQueryUtil ;
4550import io .cdap .plugin .gcp .common .GCPUtils ;
@@ -134,6 +139,7 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
134139 String filter = configuration .get (BigQueryConstants .CONFIG_FILTER , null );
135140 Integer readTimeout = configuration .getInt (BigQueryConstants .CONFIG_BQ_HTTP_READ_TIMEOUT ,
136141 GCPUtils .BQ_DEFAULT_READ_TIMEOUT_SECONDS );
142+ String parameterMap = configuration .get (BigQueryConstants .CONFIG_FILTER_PARAMETER_MAP , null );
137143
138144 com .google .cloud .bigquery .Table bigQueryTable = BigQueryUtil .getBigQueryTable (
139145 datasetProjectId , datasetId , tableName , serviceAccount , isServiceAccountFilePath , null , readTimeout );
@@ -150,11 +156,16 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
150156 if (query != null ) {
151157 TableReference sourceTable = new TableReference ().setDatasetId (datasetId ).setProjectId (datasetProjectId )
152158 .setTableId (tableName );
159+ com .google .cloud .bigquery .Table bqTable = BigQueryUtil .getBigQueryTable (datasetProjectId , datasetId , tableName ,
160+ serviceAccount , isServiceAccountFilePath ,
161+ null ,
162+ null );
153163 String location = bigQueryHelper .getTable (sourceTable ).getLocation ();
154164 String temporaryTableName = configuration .get (BigQueryConstants .CONFIG_TEMPORARY_TABLE_NAME );
155165 TableReference exportTableReference = createExportTableReference (type , datasetProjectId , datasetId ,
156166 temporaryTableName , configuration );
157- runQuery (configuration , bigQueryHelper , projectId , exportTableReference , query , location );
167+ runQuery (configuration , bigQueryHelper , projectId , exportTableReference , query , location , bqTable ,
168+ parameterMap );
158169
159170 // Default values come from BigquerySource config, and can be overridden by config.
160171 configuration .set (BigQueryConfiguration .INPUT_PROJECT_ID .getKey (),
@@ -244,14 +255,30 @@ private static void runQuery(Configuration configuration,
244255 String projectId ,
245256 TableReference tableRef ,
246257 String query ,
247- String location )
258+ String location ,
259+ com .google .cloud .bigquery .Table bqTable ,
260+ @ Nullable String parameterMapString )
248261 throws IOException , InterruptedException {
249262
250263 // Create a query statement and query request object.
251264 JobConfigurationQuery queryConfig = new JobConfigurationQuery ();
252265 queryConfig .setAllowLargeResults (true );
253266 queryConfig .setQuery (query );
254267 queryConfig .setUseLegacySql (false );
268+ if (!Strings .isNullOrEmpty (parameterMapString )) {
269+ Map <String , String > parameterMap = ConfigUtil .parseKeyValueConfig (parameterMapString , "," , "=" );
270+ List <QueryParameter > queryParameters = new ArrayList <>();
271+ FieldList fieldList = bqTable .getDefinition ().getSchema ().getFields ();
272+ for (String columnName : parameterMap .keySet ()) {
273+ String parameterType = fieldList .get (columnName ).getType ().name ();
274+ QueryParameter queryParameter = new QueryParameter ().setName (columnName )
275+ .setParameterType (new QueryParameterType ().setType (parameterType ));
276+ QueryParameterValue value = new QueryParameterValue ().setValue (parameterMap .get (columnName ));
277+ queryParameters .add (queryParameter .setParameterValue (value ));
278+ }
279+ queryConfig .setParameterMode ("NAMED" );
280+ queryConfig .setQueryParameters (queryParameters );
281+ }
255282
256283 // Set the table to put results into.
257284 queryConfig .setDestinationTable (tableRef );
0 commit comments