2222import com .google .api .services .bigquery .model .JobReference ;
2323import com .google .api .services .bigquery .model .Table ;
2424import com .google .api .services .bigquery .model .TableReference ;
25+ import com .google .cloud .bigquery .RangePartitioning ;
2526import com .google .cloud .bigquery .StandardTableDefinition ;
2627import com .google .cloud .bigquery .TableDefinition .Type ;
2728import com .google .cloud .bigquery .TimePartitioning ;
4950import org .apache .hadoop .mapreduce .RecordReader ;
5051import org .apache .hadoop .mapreduce .lib .input .FileSplit ;
5152import org .apache .hadoop .util .Progressable ;
53+ import org .slf4j .Logger ;
54+ import org .slf4j .LoggerFactory ;
5255
5356import java .io .IOException ;
5457import java .security .GeneralSecurityException ;
6568 */
6669public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat <LongWritable , GenericData .Record > {
6770
// Class-scoped SLF4J logger, keyed on PartitionedBigQueryInputFormat.
71+ private static final Logger LOG = LoggerFactory .getLogger (PartitionedBigQueryInputFormat .class );
// Input format instance held by this class; created eagerly as an AvroBigQueryInputFormatWithScopes
// producing LongWritable keys and Avro GenericData.Record values.
// NOTE(review): presumably split/record-reader calls are forwarded to this instance - confirm against
// the rest of the class, which is not visible here.
6872 private InputFormat <LongWritable , GenericData .Record > delegateInputFormat =
6973 new AvroBigQueryInputFormatWithScopes ();
7074
@@ -128,17 +132,24 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
128132 String partitionFromDate = configuration .get (BigQueryConstants .CONFIG_PARTITION_FROM_DATE , null );
129133 String partitionToDate = configuration .get (BigQueryConstants .CONFIG_PARTITION_TO_DATE , null );
130134 String filter = configuration .get (BigQueryConstants .CONFIG_FILTER , null );
135+ String limit = configuration .get (BigQueryConstants .CONFIG_LIMIT , null );
136+ String orderBy = configuration .get (BigQueryConstants .CONFIG_ORDER_BY , null );
131137
132138 com .google .cloud .bigquery .Table bigQueryTable = BigQueryUtil .getBigQueryTable (
133- datasetProjectId , datasetId , tableName , serviceAccount , isServiceAccountFilePath );
139+ datasetProjectId , datasetId , tableName , serviceAccount , isServiceAccountFilePath , null );
134140 Type type = Objects .requireNonNull (bigQueryTable ).getDefinition ().getType ();
141+ Boolean isPartitionFilterRequired = bigQueryTable .getRequirePartitionFilter ();
142+ StandardTableDefinition tableDefinition = Objects .requireNonNull (bigQueryTable ).getDefinition ();
135143
136144 String query ;
137145 if (type == Type .VIEW || type == Type .MATERIALIZED_VIEW || type == Type .EXTERNAL ) {
138- query = generateQueryForMaterializingView (datasetProjectId , datasetId , tableName , filter );
146+ query = generateQueryForMaterializingView (datasetProjectId , datasetId , tableName , filter ,
147+ limit , orderBy );
139148 } else {
140- query = generateQuery (partitionFromDate , partitionToDate , filter , projectId , datasetProjectId , datasetId ,
141- tableName , serviceAccount , isServiceAccountFilePath );
149+ query = generateQuery (partitionFromDate , partitionToDate , filter , datasetProjectId ,
150+ datasetId ,
151+ tableName , limit , orderBy ,
152+ isPartitionFilterRequired , tableDefinition );
142153 }
143154
144155 if (query != null ) {
@@ -160,30 +171,41 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
160171 }
161172
162173 @ VisibleForTesting
163- String generateQuery (String partitionFromDate , String partitionToDate , String filter , String project ,
164- String datasetProject , String dataset , String table , @ Nullable String serviceAccount ,
165- @ Nullable Boolean isServiceAccountFilePath ) {
166- if (partitionFromDate == null && partitionToDate == null && filter == null ) {
174+ String generateQuery (String partitionFromDate , String partitionToDate , String filter ,
175+ String datasetProject , String dataset , String table , String limit , String orderBy ,
176+ Boolean isPartitionFilterRequired , StandardTableDefinition tableDefinition ) {
177+
178+ if (Strings .isNullOrEmpty (filter ) && Strings .isNullOrEmpty (orderBy ) && Strings .isNullOrEmpty (
179+ limit )
180+ && Strings .isNullOrEmpty (partitionFromDate ) && Strings .isNullOrEmpty (partitionToDate )) {
167181 return null ;
168182 }
169- String queryTemplate = "select * from `%s` where %s" ;
170- com .google .cloud .bigquery .Table sourceTable = BigQueryUtil .getBigQueryTable (datasetProject , dataset , table ,
171- serviceAccount ,
172- isServiceAccountFilePath );
173- StandardTableDefinition tableDefinition = Objects .requireNonNull (sourceTable ).getDefinition ();
183+
184+ RangePartitioning rangePartitioning = tableDefinition .getRangePartitioning ();
174185 TimePartitioning timePartitioning = tableDefinition .getTimePartitioning ();
175- if (timePartitioning == null && filter == null ) {
176- return null ;
177- }
178186 StringBuilder condition = new StringBuilder ();
187+ String partitionCondition = null ;
179188
180189 if (timePartitioning != null ) {
181- String timePartitionCondition = BigQueryUtil .generateTimePartitionCondition (tableDefinition , partitionFromDate ,
182- partitionToDate );
183- condition .append (timePartitionCondition );
190+ if (partitionFromDate == null && partitionToDate == null
191+ && Objects .equals (isPartitionFilterRequired , Boolean .TRUE )) {
192+ partitionCondition = BigQueryUtil .generateDefaultTimePartitionCondition (tableDefinition );
193+ } else if (partitionFromDate != null || partitionToDate != null ) {
194+ partitionCondition =
195+ BigQueryUtil .generateTimePartitionCondition (tableDefinition , partitionFromDate ,
196+ partitionToDate );
197+ }
198+ } else if (rangePartitioning != null && Objects .equals (isPartitionFilterRequired ,
199+ Boolean .TRUE )) {
200+ partitionCondition = BigQueryUtil .generateDefaultRangePartitionCondition (
201+ tableDefinition );
184202 }
185203
186- if (filter != null ) {
204+ if (!Strings .isNullOrEmpty (partitionCondition )) {
205+ condition .append ("(" ).append (partitionCondition ).append (")" );
206+ }
207+
208+ if (!Strings .isNullOrEmpty (filter )) {
187209 if (condition .length () == 0 ) {
188210 condition .append (filter );
189211 } else {
@@ -192,20 +214,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
192214 }
193215
194216 String tableName = datasetProject + "." + dataset + "." + table ;
195- return String .format (queryTemplate , tableName , condition .toString ());
217+ StringBuilder query = new StringBuilder ("select * from " ).append (tableName );
218+
219+ if (condition .length () > 0 ) {
220+ query .append (" where " ).append (condition );
221+ }
222+
223+ if (!Strings .isNullOrEmpty (orderBy )) {
224+ query .append (" order by " ).append (orderBy );
225+ }
226+
227+ if (!Strings .isNullOrEmpty (limit )) {
228+ query .append (" limit " ).append (limit );
229+ }
230+
231+ LOG .debug ("Generated BigQuery query for job: {}" , query );
232+ return query .toString ();
196233 }
197234
198235 @ VisibleForTesting
199- String generateQueryForMaterializingView (String datasetProject , String dataset , String table , String filter ) {
200- String queryTemplate = "select * from `%s`%s" ;
201- StringBuilder condition = new StringBuilder ( );
202-
236+ String generateQueryForMaterializingView (String datasetProject , String dataset , String table ,
237+ String filter , String limit , String orderBy ) {
238+ String tableName = String . format ( "`%s.%s.%s`" , datasetProject , dataset , table );
239+ StringBuilder query = new StringBuilder ( "select * from " ). append ( tableName );
203240 if (!Strings .isNullOrEmpty (filter )) {
204- condition .append (String . format ( " where %s" , filter ) );
241+ query .append (" where " ). append ( filter );
205242 }
206243
207- String tableName = datasetProject + "." + dataset + "." + table ;
208- return String .format (queryTemplate , tableName , condition .toString ());
244+ if (!Strings .isNullOrEmpty (orderBy )) {
245+ query .append (" order by " ).append (orderBy );
246+ }
247+
248+ if (!Strings .isNullOrEmpty (limit )) {
249+ query .append (" limit " ).append (limit );
250+ }
251+
252+ return query .toString ();
209253 }
210254
211255 /**
0 commit comments