@@ -220,11 +220,12 @@ def _get_query_result(self, jobs, query):
220
220
job_data = self ._get_job_data (query )
221
221
insert_response = jobs .insert (projectId = project_id , body = job_data ).execute ()
222
222
self .current_job_id = insert_response ["jobReference" ]["jobId" ]
223
+ self .current_job_location = insert_response ["jobReference" ]["location" ]
223
224
current_row = 0
224
225
query_reply = _get_query_results (
225
226
jobs ,
226
227
project_id = project_id ,
227
- location = self ._get_location () ,
228
+ location = self .current_job_location ,
228
229
job_id = self .current_job_id ,
229
230
start_index = current_row ,
230
231
)
@@ -241,13 +242,11 @@ def _get_query_result(self, jobs, query):
241
242
242
243
query_result_request = {
243
244
"projectId" : project_id ,
244
- "jobId" : query_reply [ "jobReference" ][ "jobId" ] ,
245
+ "jobId" : self . current_job_id ,
245
246
"startIndex" : current_row ,
247
+ "location" : self .current_job_location ,
246
248
}
247
249
248
- if self ._get_location ():
249
- query_result_request ["location" ] = self ._get_location ()
250
-
251
250
query_reply = jobs .getQueryResults (** query_result_request ).execute ()
252
251
253
252
columns = [
@@ -314,33 +313,41 @@ def get_schema(self, get_stats=False):
314
313
WHERE table_schema NOT IN ('information_schema')
315
314
"""
316
315
316
+ location_dataset_ids = {}
317
317
schema = {}
318
- queries = []
319
318
for dataset in datasets :
320
319
dataset_id = dataset ["datasetReference" ]["datasetId" ]
321
320
location = dataset ["location" ]
322
321
if self ._get_location () and location != self ._get_location ():
323
322
logger .debug ("dataset location is different: %s" , location )
324
323
continue
325
- query = query_base .format (dataset_id = dataset_id )
326
- queries .append (query )
327
-
328
- query = "\n UNION ALL\n " .join (queries )
329
- results , error = self .run_query (query , None )
330
- if error is not None :
331
- self ._handle_run_query_error (error )
332
-
333
- for row in results ["rows" ]:
334
- table_name = "{0}.{1}" .format (row ["table_schema" ], row ["table_name" ])
335
- if table_name not in schema :
336
- schema [table_name ] = {"name" : table_name , "columns" : []}
337
- schema [table_name ]["columns" ].append (
338
- {
339
- "name" : row ["field_path" ],
340
- "type" : row ["data_type" ],
341
- "description" : row ["description" ],
342
- }
343
- )
324
+
325
+ if location not in location_dataset_ids :
326
+ location_dataset_ids [location ] = []
327
+ location_dataset_ids [location ].append (dataset_id )
328
+
329
+ for location , datasets in location_dataset_ids .items ():
330
+ queries = []
331
+ for dataset_id in datasets :
332
+ query = query_base .format (dataset_id = dataset_id )
333
+ queries .append (query )
334
+
335
+ query = "\n UNION ALL\n " .join (queries )
336
+ results , error = self .run_query (query , None )
337
+ if error is not None :
338
+ self ._handle_run_query_error (error )
339
+
340
+ for row in results ["rows" ]:
341
+ table_name = "{0}.{1}" .format (row ["table_schema" ], row ["table_name" ])
342
+ if table_name not in schema :
343
+ schema [table_name ] = {"name" : table_name , "columns" : []}
344
+ schema [table_name ]["columns" ].append (
345
+ {
346
+ "name" : row ["field_path" ],
347
+ "type" : row ["data_type" ],
348
+ "description" : row ["description" ],
349
+ }
350
+ )
344
351
345
352
return list (schema .values ())
346
353
@@ -374,7 +381,7 @@ def run_query(self, query, user):
374
381
self ._get_bigquery_service ().jobs ().cancel (
375
382
projectId = self ._get_project_id (),
376
383
jobId = self .current_job_id ,
377
- location = self ._get_location () ,
384
+ location = self .current_job_location ,
378
385
).execute ()
379
386
380
387
raise
0 commit comments