Skip to content

Commit 12cebba

Browse files
author
David Coe
committed
successfully resetting Entra token during long query
1 parent d79a941 commit 12cebba

File tree

3 files changed

+57
-25
lines changed

3 files changed

+57
-25
lines changed

csharp/src/Drivers/BigQuery/BigQueryConnection.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System;
1919
using System.Collections.Generic;
2020
using System.Collections.ObjectModel;
21+
using System.Diagnostics;
2122
using System.Linq;
2223
using System.Net.Http;
2324
using System.Text;
@@ -65,7 +66,6 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties)
6566
this.properties = new ReadOnlyDictionary<string, string>(modifiedProperties);
6667
this.httpClient = new HttpClient();
6768
}
68-
6969
public Func<Task>? UpdateToken { get; set; }
7070

7171
internal BigQueryClient? Client { get; private set; }
@@ -1225,6 +1225,8 @@ private string Sanitize(string? input)
12251225
{
12261226
try
12271227
{
1228+
Debug.WriteLine($"{DateTime.Now.ToString()} - Trading {entraAccessToken}");
1229+
12281230
var requestBody = new
12291231
{
12301232
scope = BigQueryConstants.EntraIdScope,
@@ -1245,6 +1247,8 @@ private string Sanitize(string? input)
12451247

12461248
BigQueryStsTokenResponse? bigQueryTokenResponse = JsonSerializer.Deserialize<BigQueryStsTokenResponse>(responseBody);
12471249

1250+
Debug.WriteLine($"{DateTime.Now.ToString()} - Traded for Google token {bigQueryTokenResponse?.AccessToken}");
1251+
12481252
return bigQueryTokenResponse?.AccessToken;
12491253
}
12501254
catch (Exception ex)

csharp/src/Drivers/BigQuery/BigQueryStatement.cs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public BigQueryStatement(BigQueryConnection bigQueryConnection)
5959

6060
private GoogleCredential Credential => this.bigQueryConnection.Credential ?? throw new AdbcException("Credential cannot be null");
6161

62-
6362
public override QueryResult ExecuteQuery()
6463
{
6564
//Func<Task<QueryResult>> func = () => ExecuteQueryInternalAsync();
@@ -74,6 +73,8 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
7473

7574
BigQueryJob job = await this.Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);
7675

76+
JobReference jobReference = job.Reference;
77+
7778
GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
7879

7980
if (this.Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
@@ -83,16 +84,13 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
8384
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
8485
}
8586

86-
Task<BigQueryResults> resultsTask = job.GetQueryResultsAsync(getQueryResultsOptions);
87-
8887
DateTime start = DateTime.Now;
8988

90-
Func<Task<bool>> func = () => Task.Run(() =>
89+
Func<Task<BigQueryJob>> checkJobStatus = async () =>
9190
{
9291
while (true)
9392
{
94-
var jobWithStatus = this.Client.GetJob(job.Reference);
95-
Debug.WriteLine($"Job state is {jobWithStatus.State}");
93+
var jobWithStatus = await this.Client.GetJobAsync(jobReference);
9694

9795
if (jobWithStatus.State == JobState.Done)
9896
{
@@ -104,49 +102,57 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
104102
DateTime end = DateTime.Now;
105103
TimeSpan duration = end - start;
106104
Debug.WriteLine($"Done at {end.ToString()} after {duration.TotalMinutes}");
107-
return true;
105+
return jobWithStatus;
108106
}
109107
}
110-
});
108+
};
111109

112110
Debug.WriteLine($"Starting ExecuteWithRetriesAsync at {start.ToString()}");
113111

114-
await AdbcRetryManager.ExecuteWithRetriesAsync<bool>(this, func);
112+
await AdbcRetryManager.ExecuteWithRetriesAsync<BigQueryJob>(this, checkJobStatus);
115113

116114
Debug.WriteLine($"Getting results at {DateTime.Now.ToString()}");
117115

118-
BigQueryResults results = resultsTask.Result;
116+
Func<Task<BigQueryResults>> getJobResults = async () =>
117+
{
118+
// if the authentication token was reset, then we need a new job with the latest token
119+
BigQueryJob completedJob = await this.Client.GetJobAsync(jobReference);
120+
return await completedJob.GetQueryResultsAsync();
121+
};
122+
123+
BigQueryResults results = await AdbcRetryManager.ExecuteWithRetriesAsync(this, getJobResults);
119124

120125
Debug.WriteLine($"Results received at {DateTime.Now.ToString()}");
121126

122127
TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(this.Credential);
123128
clientMgr.UpdateToken = () => Task.Run(() =>
124129
{
130+
Debug.WriteLine($"TokenProtectedReadClientManger updating token at {DateTime.Now.ToString()}");
125131
this.bigQueryConnection.SetCredential();
126132
clientMgr.UpdateCredential(this.Credential);
127133
});
128134

129-
if (results.TableReference == null)
135+
if (results?.TableReference == null)
130136
{
131137
// To get the results of all statements in a multi-statement query, enumerate the child jobs and call jobs.getQueryResults on each of them.
132138
// Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements
133139
ListJobsOptions listJobsOptions = new ListJobsOptions();
134-
listJobsOptions.ParentJobId = results.JobReference.JobId;
135-
PagedEnumerable<JobList, BigQueryJob> joblist = this.Client.ListJobs(listJobsOptions);
140+
listJobsOptions.ParentJobId = results?.JobReference.JobId;
141+
PagedAsyncEnumerable<JobList, BigQueryJob> joblist = this.Client.ListJobsAsync(listJobsOptions);
136142
BigQueryJob firstQueryJob = new BigQueryJob(this.Client, job.Resource);
137-
foreach (BigQueryJob childJob in joblist)
143+
await foreach (BigQueryJob childJob in joblist)
138144
{
139-
var tempJob = this.Client.GetJob(childJob.Reference);
145+
var tempJob = await this.Client.GetJobAsync(childJob.Reference);
140146
var query = tempJob.Resource?.Configuration?.Query;
141147
if (query != null && query.DestinationTable != null && query.DestinationTable.ProjectId != null && query.DestinationTable.DatasetId != null && query.DestinationTable.TableId != null)
142148
{
143149
firstQueryJob = tempJob;
144150
}
145151
}
146-
results = firstQueryJob.GetQueryResults();
152+
results = await firstQueryJob.GetQueryResultsAsync();
147153
}
148154

149-
if (results.TableReference == null)
155+
if (results?.TableReference == null)
150156
{
151157
throw new AdbcException("There is no query statement");
152158
}
@@ -156,6 +162,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
156162
string table = $"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
157163

158164
int maxStreamCount = 1;
165+
159166
if (this.Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string? maxStreamCountString) == true)
160167
{
161168
if (int.TryParse(maxStreamCountString, out int count))
@@ -166,28 +173,34 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
166173
}
167174
}
168175
}
176+
169177
ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };
170178
ReadSession rrs = clientMgr.ReadClient.CreateReadSession("projects/" + results.TableReference.ProjectId, rs, maxStreamCount);
171179

172180
long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;
173181

182+
Debug.WriteLine($"Starting to read streams at {DateTime.Now.ToString()}");
183+
174184
var readers = rrs.Streams
175185
.Select(s => ReadChunkWithRetries(clientMgr, s.Name))
176186
.Where(chunk => chunk != null)
177187
.Cast<IArrowReader>();
178188

189+
Debug.WriteLine($"Creating Arrow stream at {DateTime.Now.ToString()}");
190+
179191
IArrowArrayStream stream = new MultiArrowReader(TranslateSchema(results.Schema), readers);
180192

193+
Debug.WriteLine($"Returning results at {DateTime.Now.ToString()}");
194+
181195
return new QueryResult(totalRows, stream);
182196
}
183197

184198
public override UpdateResult ExecuteUpdate()
185199
{
186-
Func<Task<UpdateResult>> func = () => Task.Run(() => ExecuteUpdateInternal());
187-
return AdbcRetryManager.ExecuteWithRetriesAsync<UpdateResult>(this, func).Result;
200+
return ExecuteUpdateInternalAsync().GetAwaiter().GetResult();
188201
}
189202

190-
private UpdateResult ExecuteUpdateInternal()
203+
private async Task<UpdateResult> ExecuteUpdateInternalAsync()
191204
{
192205
QueryOptions options = ValidateOptions();
193206
GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
@@ -199,7 +212,7 @@ private UpdateResult ExecuteUpdateInternal()
199212
getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
200213
}
201214

202-
BigQueryResults result = this.Client.ExecuteQuery(
215+
BigQueryResults result = await this.Client.ExecuteQueryAsync(
203216
SqlQuery,
204217
parameters: null,
205218
queryOptions: options,

csharp/test/Drivers/BigQuery/AuthenticationTests.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public AuthenticationTests(ITestOutputHelper outputHelper)
4646
}
4747

4848
/// <summary>
49-
/// Validates if the Entra token can sign in and refresh.
49+
/// Validates if the Entra token can sign in.
5050
/// </summary>
5151
[SkippableFact, Order(1)]
5252
public void CanSignInWithEntraToken()
@@ -70,8 +70,11 @@ public void CanSignInWithEntraToken()
7070
Tests.DriverTests.CanExecuteQuery(queryResult, environment.ExpectedResultsCount, environment.Name);
7171
}
7272

73+
/// <summary>
74+
/// Validates if the Entra token can sign in and refresh.
75+
/// </summary>
7376
[SkippableFact, Order(1)]
74-
public void CanRefreshToken()
77+
public void CanSignInWithAndRefreshEntraToken()
7578
{
7679
BigQueryTestEnvironment? environment = _environments.Where(x => x.AuthenticationType == BigQueryConstants.EntraIdAuthenticationType).FirstOrDefault();
7780
Assert.NotNull(environment);
@@ -86,10 +89,20 @@ public void CanRefreshToken()
8689
_outputHelper.WriteLine("Successfully set a new token");
8790
});
8891

92+
// create a query that takes 75 minutes because Entra tokens typically expire in 60 minutes
8993
AdbcStatement statement = connection.CreateStatement();
90-
statement.SqlQuery = "SELECT COUNT(*) FROM UNNEST(GENERATE_ARRAY(1, 1000000, 1)) AS a, UNNEST(GENERATE_ARRAY(1, 1000000, 1)) AS b WHERE RAND() < 0.0000001";
94+
statement.SqlQuery = @"
95+
DECLARE end_time TIMESTAMP;
96+
SET end_time = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 75 MINUTE);
97+
98+
WHILE CURRENT_TIMESTAMP() < end_time DO
99+
END WHILE;
100+
101+
SELECT 'Query completed after 75 minutes' AS result;";
91102

92103
QueryResult queryResult = statement.ExecuteQuery();
104+
105+
_outputHelper.WriteLine($"Retrieve query result with {queryResult.RowCount} rows");
93106
}
94107

95108
private string GetAccessToken(BigQueryTestEnvironment environment)
@@ -107,6 +120,8 @@ private string GetAccessToken(BigQueryTestEnvironment environment)
107120
TokenRequestContext requestContext = new TokenRequestContext(environment.EntraConfiguration.Scopes, claims: claimJson);
108121
AccessToken accessToken = credential.GetToken(requestContext);
109122

123+
_outputHelper.WriteLine($"Access token is {accessToken.Token}");
124+
110125
return accessToken.Token;
111126
}
112127
}

0 commit comments

Comments
 (0)