@@ -91,8 +91,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
9191 uuid = UUID .randomUUID ();
9292 configuration = BigQueryUtils .getBigQueryConfig (config .getServiceAccountFilePath (), config .getProject ());
9393
94- String bucket = config .bucket ;
95- if (config . bucket == null ) {
94+ String bucket = config .getBucket () ;
95+ if (bucket == null ) {
9696 bucket = uuid .toString ();
9797 // By default, this option is false, meaning the job can not delete the bucket. So enable it only when bucket name
9898 // is not provided.
@@ -106,7 +106,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
106106 String temporaryGcsPath = String .format ("gs://%s/hadoop/input/%s" , bucket , uuid );
107107 AvroBigQueryInputFormat .setTemporaryCloudStorageDirectory (configuration , temporaryGcsPath );
108108 AvroBigQueryInputFormat .setEnableShardedExport (configuration , false );
109- BigQueryConfiguration .configureBigQueryInput (configuration , config .getProject (), config .dataset , config .table );
109+ BigQueryConfiguration .configureBigQueryInput (configuration , config .getDatasetProject (),
110+ config .getDataset (), config .getTable ());
110111
111112 Job job = Job .getInstance (configuration );
112113 job .setOutputKeyClass (LongWritable .class );
@@ -141,7 +142,7 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
141142 public void onRunFinish (boolean succeeded , BatchSourceContext context ) {
142143 org .apache .hadoop .fs .Path gcsPath = new org .apache .hadoop .fs .Path (String .format ("gs://%s" , uuid .toString ()));
143144 try {
144- if (config .bucket == null ) {
145+ if (config .getBucket () == null ) {
145146 FileSystem fs = gcsPath .getFileSystem (configuration );
146147 if (fs .exists (gcsPath )) {
147148 fs .delete (gcsPath , true );
@@ -163,18 +164,20 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
163164 */
164165 @ Path ("getSchema" )
165166 public Schema getSchema (BigQuerySourceConfig request ) throws Exception {
166- Table table = BigQueryUtils .getBigQueryTable (request .getServiceAccountFilePath (), request .getProject (),
167- request .dataset , request .table );
168- if (table == null ) {
167+ String dataset = request .getDataset ();
168+ String table = request .getTable ();
169+ String project = request .getDatasetProject ();
170+ Table bqTable = BigQueryUtils .getBigQueryTable (request .getServiceAccountFilePath (), project , dataset , table );
171+ if (bqTable == null ) {
169172 // Table does not exist
170- throw new IllegalArgumentException (String .format ("BigQuery table '%s.%s' does not exist" ,
171- request . dataset , request . table ));
173+ throw new IllegalArgumentException (String .format ("BigQuery table '%s:%s .%s' does not exist" ,
174+ project , dataset , table ));
172175 }
173176
174- com .google .cloud .bigquery .Schema bgSchema = table .getDefinition ().getSchema ();
177+ com .google .cloud .bigquery .Schema bgSchema = bqTable .getDefinition ().getSchema ();
175178 if (bgSchema == null ) {
176- throw new IllegalArgumentException (String .format ("Cannot read from table '%s.%s' because it has no schema." ,
177- request . dataset , request . table ));
179+ throw new IllegalArgumentException (String .format ("Cannot read from table '%s:%s .%s' because it has no schema." ,
180+ project , dataset , table ));
178181 }
179182 List <Schema .Field > fields = getSchemaFields (bgSchema );
180183 return Schema .recordOf ("output" , fields );
@@ -185,32 +188,34 @@ public Schema getSchema(BigQuerySourceConfig request) throws Exception {
185188 * {@link #getSchema(BigQuerySourceConfig)} method.
186189 */
187190 private void validateOutputSchema () throws IOException {
188- Table table = BigQueryUtils .getBigQueryTable (config .getServiceAccountFilePath (), config .getProject (),
189- config .dataset , config .table );
191+ String dataset = config .getDataset ();
192+ String tableName = config .getTable ();
193+ String project = config .getDatasetProject ();
194+ Table table = BigQueryUtils .getBigQueryTable (config .getServiceAccountFilePath (), project , dataset , tableName );
190195 if (table == null ) {
191196 // Table does not exist
192- throw new IllegalArgumentException (String .format ("BigQuery table '%s.%s' does not exist." , config . dataset ,
193- config . table ));
197+ throw new IllegalArgumentException (String .format ("BigQuery table '%s:%s .%s' does not exist." ,
198+ project , dataset , tableName ));
194199 }
195200
196201 com .google .cloud .bigquery .Schema bgSchema = table .getDefinition ().getSchema ();
197202 if (bgSchema == null ) {
198- throw new IllegalArgumentException (String .format ("Cannot read from table '%s.%s' because it has no schema." ,
199- config . dataset , config . table ));
203+ throw new IllegalArgumentException (String .format ("Cannot read from table '%s:%s .%s' because it has no schema." ,
204+ project , dataset , tableName ));
200205 }
201206
202207 // Output schema should not have more fields than BigQuery table
203208 List <String > diff = BigQueryUtils .getSchemaMinusBqFields (config .getSchema ().getFields (), bgSchema .getFields ());
204209 if (!diff .isEmpty ()) {
205210 throw new IllegalArgumentException (String .format ("Output schema has field(s) '%s' which are not present in table"
206- + " '%s.%s' schema." , diff , config . dataset , config . table ));
211+ + " '%s:%s .%s' schema." , diff , project , dataset , tableName ));
207212 }
208213
209214 FieldList fields = bgSchema .getFields ();
210215 // Match output schema field type with bigquery column type
211216 for (Schema .Field field : config .getSchema ().getFields ()) {
212217 validateSimpleTypes (field );
213- BigQueryUtils .validateFieldSchemaMatches (fields .get (field .getName ()), field , config . dataset , config . table );
218+ BigQueryUtils .validateFieldSchemaMatches (fields .get (field .getName ()), field , dataset , tableName );
214219 }
215220 }
216221
0 commit comments