
BigQuery: make jobs awaitable #18

Description

@dkapitan

I know BigQuery jobs are asynchronous by default. However, I am struggling to make my data pipeline async end-to-end.

Looking at this JS example, I thought the most Pythonic approach would be to make a BigQuery job awaitable. However, I can't get that to work in Python, i.e. I get errors when calling await client.query(query). Looking at the source code, I don't see which method returns an awaitable object.
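
For reference, this is roughly the call that fails (a minimal sketch; the query is just a placeholder):

import asyncio

from google.cloud import bigquery

client = bigquery.Client()

async def main():
    # client.query() returns a QueryJob, which is not awaitable,
    # so this raises a TypeError
    rows = await client.query('SELECT 1')
    print(list(rows))

asyncio.run(main())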

I have little experience in writing async Python code and found this example that wraps jobs in an async def coroutine:

import asyncio

from google.cloud import bigquery


class BQApi(object):
    def __init__(self):
        # BQ_CONFIG is an application-level settings dict defined elsewhere
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
        job = self.api.query(query, **kwargs)
        task = asyncio.create_task(self.coroutine_job(job))
        return await task

    @staticmethod
    async def coroutine_job(job):
        # job.result() is a blocking call, so this coroutine still blocks the event loop
        return job.result()
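
Since job.result() blocks, the coroutine above still ties up the event loop while the query runs. One workaround that would keep the loop free is to push the blocking call onto the default executor (a sketch of a workaround, not an existing client API):

import asyncio

from google.cloud import bigquery

client = bigquery.Client()

async def exec_query(query, **kwargs) -> bigquery.table.RowIterator:
    loop = asyncio.get_event_loop()
    job = client.query(query, **kwargs)
    # job.result() blocks until the job finishes, so run it in the
    # default ThreadPoolExecutor instead of on the event loop
    return await loop.run_in_executor(None, job.result)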

The google.api_core.operation.Operation documentation shows how to use add_done_callback to asynchronously wait for long-running operations. I have tried that, but the following yields AttributeError: 'QueryJob' object has no attribute '_condition':

from concurrent.futures import ThreadPoolExecutor, as_completed

from google.cloud import bigquery

bq = bigquery.Client()
query1 = 'SELECT 1'
query2 = 'SELECT 2'

def my_callback(future):
    result = future.result()

operations = [bq.query(query1), bq.query(query2)]
for operation in operations:
    operation.add_done_callback(my_callback)

results2 = []
# fails here: as_completed() expects concurrent.futures.Future instances
for future in as_completed(operations):
    results2.append(list(future.result()))
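
If I instead submit the blocking result() calls to a ThreadPoolExecutor, as_completed gets real concurrent.futures.Future objects, which should avoid the AttributeError (again a sketch of a workaround, not what I am proposing for the library):

from concurrent.futures import ThreadPoolExecutor, as_completed

from google.cloud import bigquery

bq = bigquery.Client()
jobs = [bq.query('SELECT 1'), bq.query('SELECT 2')]

results = []
with ThreadPoolExecutor() as executor:
    # executor.submit() returns a concurrent.futures.Future,
    # which is what as_completed() expects
    futures = [executor.submit(job.result) for job in jobs]
    for future in as_completed(futures):
        results.append(list(future.result()))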

Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable?

Or am I missing something, and is there a Pythonic way to use the BigQuery client with the async/await pattern?
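
To make the request concrete, I imagine an awaitable could be as simple as a polling wrapper along these lines (await_job and the poll interval are placeholders, not an existing or proposed API surface):

import asyncio

from google.cloud import bigquery

client = bigquery.Client()

async def await_job(job, poll_interval=1.0):
    # hypothetical wrapper: job.done() checks the job state (a short
    # blocking API call), and asyncio.sleep() hands control back to
    # the event loop in between checks
    while not job.done():
        await asyncio.sleep(poll_interval)
    return job.result()

async def main():
    jobs = [client.query('SELECT 1'), client.query('SELECT 2')]
    results = await asyncio.gather(*(await_job(job) for job in jobs))
    print([list(rows) for rows in results])

asyncio.run(main())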

Metadata

Labels

api: bigquery (Issues related to the googleapis/python-bigquery API)
type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design)
Python 3 Only (This work would involve supporting Python 3 only code paths)
