Commit 499ef23

Added execution metrics for BQ Pushdown jobs
1 parent 9f76f9c commit 499ef23

File tree

4 files changed, +107 -1 lines changed


src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java

Lines changed: 2 additions & 0 deletions
@@ -150,12 +150,14 @@ public void executeJoin() {
     if (queryJob == null) {
       throw new SQLEngineException("BigQuery job not found: " + jobId);
     } else if (queryJob.getStatus().getError() != null) {
+      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
       throw new SQLEngineException(String.format(
         "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
         jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
     }

     LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
+    BigQuerySQLEngineUtils.logJobMetrics(queryJob);
   }

   @Override

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java

Lines changed: 2 additions & 0 deletions
@@ -124,12 +124,14 @@ public BigQuerySelectDataset execute() {
     if (queryJob == null) {
       throw new SQLEngineException("BigQuery job not found: " + jobId);
     } else if (queryJob.getStatus().getError() != null) {
+      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
       throw new SQLEngineException(String.format(
         "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
         jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
     }

     LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
+    BigQuerySQLEngineUtils.logJobMetrics(queryJob);
     return this;
   }

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java

Lines changed: 2 additions & 0 deletions
@@ -228,6 +228,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,

     // Check for errors
     if (queryJob.getStatus().getError() != null) {
+      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
       LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
                 jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
                 queryJob.getStatus().getError().toString());

@@ -241,6 +242,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
     LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
              sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
              destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
+    BigQuerySQLEngineUtils.logJobMetrics(queryJob);

     return SQLWriteResult.success(datasetName, numRows);
   }

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java

Lines changed: 101 additions & 1 deletion
@@ -18,11 +18,15 @@

 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.QueryStage;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.gson.Gson;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
 import io.cdap.cdap.etl.api.join.JoinCondition;
@@ -41,6 +45,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;

 /**
@@ -49,6 +54,7 @@
 public class BigQuerySQLEngineUtils {

   private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngineUtils.class);
+  private static final Gson GSON = new Gson();

   public static final String GCS_PATH_FORMAT = BigQuerySinkUtils.GS_PATH_FORMAT + "/%s";
   public static final String BQ_TABLE_NAME_FORMAT = "%s_%s";
@@ -218,7 +224,7 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression

   /**
    * Validates stages for a Join on Key operation
-   *
+   * <p>
    * TODO: Update logic once BQ SQL engine joins support multiple outer join tables
    *
    * @param joinDefinition Join Definition to validate
@@ -292,4 +298,98 @@ public static Map<String, String> getJobTags(String operation) {
     labels.put("pushdown_operation", operation);
     return Collections.unmodifiableMap(labels);
   }
+
+  /**
+   * Logs execution metrics for a BigQuery Job using the class Logger.
+   *
+   * @param job BigQuery Job
+   */
+  public static void logJobMetrics(Job job) {
+    // Ensure job has statistics information before dereferencing it
+    if (job.getStatistics() == null) {
+      LOG.warn("No statistics were found for BigQuery job {}", job.getJobId());
+      return;
+    }
+
+    String startTimeStr = getISODateTimeString(job.getStatistics().getStartTime());
+    String endTimeStr = getISODateTimeString(job.getStatistics().getEndTime());
+    String executionTimeStr = getExecutionTimeString(job.getStatistics().getStartTime(),
+                                                     job.getStatistics().getEndTime());
+
+    // Print detailed query statistics if available
+    if (job.getStatistics() instanceof JobStatistics.QueryStatistics) {
+      JobStatistics.QueryStatistics queryStatistics = (JobStatistics.QueryStatistics) job.getStatistics();
+      LOG.info("Metrics for job {}:\n" +
+                 "  Start: {} ,\n" +
+                 "  End: {} ,\n" +
+                 "  Execution time: {} ,\n" +
+                 "  Processed Bytes: {} ,\n" +
+                 "  Billed Bytes: {} ,\n" +
+                 "  Total Slot ms: {} ,\n" +
+                 "  Records per stage (read/write): {}",
+               job.getJobId().getJob(),
+               startTimeStr,
+               endTimeStr,
+               executionTimeStr,
+               queryStatistics.getTotalBytesProcessed(),
+               queryStatistics.getTotalBytesBilled(),
+               queryStatistics.getTotalSlotMs(),
+               getQueryStageRecordCounts(queryStatistics.getQueryPlan()));
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Additional Metrics for job {}:\n" +
+                    "  Query Plan: {} ,\n" +
+                    "  Query Timeline: {}\n",
+                  job.getJobId().getJob(),
+                  GSON.toJson(queryStatistics.getQueryPlan()),
+                  GSON.toJson(queryStatistics.getTimeline()));
+      }
+
+      return;
+    }
+
+    // Print basic metrics
+    LOG.info("Metrics for job: {}\n" +
+               "  Start: {} ,\n" +
+               "  End: {} ,\n" +
+               "  Execution time: {}",
+             job.getJobId().getJob(),
+             startTimeStr,
+             endTimeStr,
+             executionTimeStr);
+  }
+
+  private static String getISODateTimeString(Long epoch) {
+    if (epoch == null) {
+      return "N/A";
+    }
+
+    return Instant.ofEpochMilli(epoch).toString();
+  }
+
+  private static String getExecutionTimeString(Long startEpoch, Long endEpoch) {
+    if (startEpoch == null || endEpoch == null) {
+      return "N/A";
+    }
+
+    return (endEpoch - startEpoch) + " ms";
+  }
+
+  private static String getQueryStageRecordCounts(List<QueryStage> queryPlan) {
+    if (queryPlan == null || queryPlan.isEmpty()) {
+      return "N/A";
+    }
+
+    return queryPlan.stream()
+      .map(qs -> formatRecordCount(qs.getRecordsRead()) + "/" + formatRecordCount(qs.getRecordsWritten()))
+      .collect(Collectors.joining(" , ", "[ ", " ]"));
+  }
+
+  private static String formatRecordCount(Long val) {
+    if (val == null) {
+      return "N/A";
+    }
+
+    return val.toString();
+  }
 }
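
For reference, a minimal sketch of how the new helper is invoked, mirroring the call pattern this commit applies in BigQueryJoinDataset, BigQuerySelectDataset, and BigQueryWrite. The wrapper class, method name, and the getJob lookup below are illustrative assumptions and not part of the change; the real call sites already hold a Job reference.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Job;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;

// Hypothetical usage example; not part of this commit.
public class JobMetricsUsageExample {

  public static void logMetricsFor(BigQuery bigQuery, String jobId) {
    // Assumed lookup of a completed job by its ID.
    Job queryJob = bigQuery.getJob(jobId);

    if (queryJob == null) {
      throw new SQLEngineException("BigQuery job not found: " + jobId);
    } else if (queryJob.getStatus().getError() != null) {
      // Failure path: metrics are logged before the error is surfaced.
      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
      throw new SQLEngineException("Error executing BigQuery Job: " + jobId);
    }

    // Success path: metrics are logged after the job completes.
    BigQuerySQLEngineUtils.logJobMetrics(queryJob);
  }
}

Logging on both paths means slot, byte, and per-stage record counts are captured even when a Pushdown job fails.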
