@@ -22,6 +22,7 @@
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.cloud.bigquery.RangePartitioning;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableDefinition.Type;
 import com.google.cloud.bigquery.TimePartitioning;
@@ -52,6 +53,8 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.GeneralSecurityException;
@@ -68,6 +71,7 @@
  */
 public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class);
   private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
     new AvroBigQueryInputFormatWithScopes();
 
@@ -132,19 +136,27 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
     String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
     String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
     String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
+    String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null);
+    String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null);
     Integer readTimeout = configuration.getInt(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT,
       GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
 
     com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
-      datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, readTimeout);
+      datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null,
+      readTimeout);
     Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
+    Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter();
+    StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();
 
     String query;
     if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) {
-      query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter);
+      query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter,
+        limit, orderBy);
     } else {
-      query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId,
-        tableName, serviceAccount, isServiceAccountFilePath);
+      query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId,
+        datasetId,
+        tableName, limit, orderBy,
+        isPartitionFilterRequired, tableDefinition);
     }
 
     if (query != null) {
@@ -166,30 +178,35 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
   }
 
   @VisibleForTesting
-  String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project,
-                       String datasetProject, String dataset, String table, @Nullable String serviceAccount,
-                       @Nullable Boolean isServiceAccountFilePath) {
-    if (partitionFromDate == null && partitionToDate == null && filter == null) {
-      return null;
-    }
-    String queryTemplate = "select * from `%s` where %s";
-    com.google.cloud.bigquery.Table sourceTable =
-      BigQueryUtil.getBigQueryTable(datasetProject, dataset, table, serviceAccount, isServiceAccountFilePath, null,
-        null);
-    StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
+  String generateQuery(String partitionFromDate, String partitionToDate, String filter,
+    String datasetProject, String dataset, String table, String limit, String orderBy,
+    Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) {
+
+    RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
     TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
-    if (timePartitioning == null && filter == null) {
-      return null;
-    }
     StringBuilder condition = new StringBuilder();
+    String partitionCondition = null;
 
     if (timePartitioning != null) {
-      String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
-        partitionToDate);
-      condition.append(timePartitionCondition);
+      if (partitionFromDate == null && partitionToDate == null
+        && Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) {
+        partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition);
+      } else if (partitionFromDate != null || partitionToDate != null) {
+        partitionCondition =
+          BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+            partitionToDate);
+      }
+    } else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired,
+      Boolean.TRUE)) {
+      partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition(
+        tableDefinition);
+    }
+
+    if (!Strings.isNullOrEmpty(partitionCondition)) {
+      condition.append("(").append(partitionCondition).append(")");
     }
 
-    if (filter != null) {
+    if (!Strings.isNullOrEmpty(filter)) {
       if (condition.length() == 0) {
         condition.append(filter);
       } else {
@@ -198,20 +215,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
     }
 
     String tableName = datasetProject + "." + dataset + "." + table;
-    return String.format(queryTemplate, tableName, condition.toString());
+    StringBuilder query = new StringBuilder("select * from ").append(tableName);
+
+    if (condition.length() > 0) {
+      query.append(" where ").append(condition);
+    }
+
+    if (!Strings.isNullOrEmpty(orderBy)) {
+      query.append(" order by ").append(orderBy);
+    }
+
+    if (!Strings.isNullOrEmpty(limit)) {
+      query.append(" limit ").append(limit);
+    }
+
+    LOG.debug("Generated BigQuery query for job: {}", query);
+    return query.toString();
   }
 
   @VisibleForTesting
-  String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) {
-    String queryTemplate = "select * from `%s`%s";
-    StringBuilder condition = new StringBuilder();
-
+  String generateQueryForMaterializingView(String datasetProject, String dataset, String table,
+    String filter, String limit, String orderBy) {
+    String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table);
+    StringBuilder query = new StringBuilder("select * from ").append(tableName);
     if (!Strings.isNullOrEmpty(filter)) {
-      condition.append(String.format(" where %s", filter));
+      query.append(" where ").append(filter);
     }
 
-    String tableName = datasetProject + "." + dataset + "." + table;
-    return String.format(queryTemplate, tableName, condition.toString());
+    if (!Strings.isNullOrEmpty(orderBy)) {
+      query.append(" order by ").append(orderBy);
+    }
+
+    if (!Strings.isNullOrEmpty(limit)) {
+      query.append(" limit ").append(limit);
+    }
+
+    return query.toString();
   }
 
   /**
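For reference, a minimal sketch (not part of the diff) of how the reworked generateQuery could be exercised. It assumes test code in the same package (generateQuery is package-private), a hypothetical table day-partitioned on a "ts" column with require-partition-filter enabled, and illustrative argument values; the exact partition predicate is whatever BigQueryUtil.generateDefaultTimePartitionCondition returns.

import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TimePartitioning;

// Illustrative sketch only; names and values here are assumptions, not from the change.
class GeneratedQuerySketch {
  static String sample() {
    // Day-partitioned table on a hypothetical "ts" column.
    StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
      .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("ts").build())
      .build();
    // No from/to dates and no filter, but the table requires a partition filter,
    // so the default time-partition condition is generated.
    return new PartitionedBigQueryInputFormat().generateQuery(
      null, null, null,                               // partitionFromDate, partitionToDate, filter
      "sampleproject", "sample_dataset", "sample_table",
      "1000", "ts desc",                              // limit, orderBy
      Boolean.TRUE, tableDefinition);                 // isPartitionFilterRequired, tableDefinition
    // Expected shape: "select * from sampleproject.sample_dataset.sample_table
    //   where (<default time-partition condition from BigQueryUtil>) order by ts desc limit 1000"
  }
}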