109109import java .util .Optional ;
110110import java .util .Set ;
111111import java .util .concurrent .Callable ;
112+ import java .util .concurrent .ConcurrentHashMap ;
112113import java .util .concurrent .ExecutionException ;
113114import java .util .concurrent .Executors ;
114115import java .util .concurrent .Future ;
115116import java .util .concurrent .Semaphore ;
116117import java .util .concurrent .TimeUnit ;
117118import java .util .concurrent .atomic .AtomicLong ;
119+ import java .util .stream .Collectors ;
118120import org .apache .beam .fn .harness .logging .QuotaEvent ;
119121import org .apache .beam .fn .harness .logging .QuotaEvent .QuotaEventCloseable ;
120122import org .apache .beam .runners .core .metrics .MonitoringInfoConstants ;
@@ -172,7 +174,12 @@ public class BigQueryServicesImpl implements BigQueryServices {
172174
173175 // The approximate maximum payload of rows for an insertAll request.
174176 // We set it to 9MB, which leaves room for request overhead.
175- private static final Integer MAX_BQ_ROW_PAYLOAD = 9 * 1024 * 1024 ;
// Row payload limit expressed in whole megabytes; the byte limit and the description below are
// both derived from this value.
177+ private static final Integer MAX_BQ_ROW_PAYLOAD_MB = 9 ;
178+
// The same limit in bytes; this is the value row and batch sizes are compared against.
179+ private static final Integer MAX_BQ_ROW_PAYLOAD_BYTES = MAX_BQ_ROW_PAYLOAD_MB * 1024 * 1024 ;
180+
// Human-readable form of the limit (the MB value followed by "MB") for use in messages.
181+ private static final String MAX_BQ_ROW_PAYLOAD_DESC =
182+ String .format ("%sMB" , MAX_BQ_ROW_PAYLOAD_MB );
176183
177184 // The initial backoff for polling the status of a BigQuery job.
178185 private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration .standardSeconds (1 );
@@ -596,6 +603,7 @@ public static class DatasetServiceImpl implements DatasetService {
596603 private final PipelineOptions options ;
597604 private final long maxRowsPerBatch ;
598605 private final long maxRowBatchSize ;
606+ private final Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
599607 // aggregate the total time spent in exponential backoff
600608 private final Counter throttlingMsecs =
601609 Metrics .counter (DatasetServiceImpl .class , Metrics .THROTTLE_TIME_COUNTER_NAME );
@@ -1149,22 +1157,53 @@ <T> long insertAll(
11491157 // If this row's encoding by itself is larger than the maximum row payload, then it's
11501158 // impossible to insert into BigQuery, and so we send it out through the dead-letter
11511159 // queue.
1152- if (nextRowSize >= MAX_BQ_ROW_PAYLOAD ) {
1160+ if (nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES ) {
11531161 InsertErrors error =
11541162 new InsertErrors ()
11551163 .setErrors (ImmutableList .of (new ErrorProto ().setReason ("row-too-large" )));
11561164 // We verify whether the retryPolicy parameter expects us to retry. If it does, then
11571165 // it will return true. Otherwise it will return false.
1158- Boolean isRetry = retryPolicy .shouldRetry (new InsertRetryPolicy .Context (error ));
1159- if (isRetry ) {
1160- throw new RuntimeException (
1166+ if (retryPolicy .shouldRetry (new InsertRetryPolicy .Context (error ))) {
1167+ // Obtain table schema
1168+ TableSchema tableSchema = null ;
1169+ try {
1170+ String tableSpec = BigQueryHelpers .toTableSpec (ref );
1171+ if (tableSchemaCache .containsKey (tableSpec )) {
1172+ tableSchema = tableSchemaCache .get (tableSpec );
1173+ } else {
1174+ Table table = getTable (ref );
1175+ if (table != null ) {
1176+ tableSchema =
1177+ TableRowToStorageApiProto .schemaToProtoTableSchema (table .getSchema ());
1178+ tableSchemaCache .put (tableSpec , tableSchema );
1179+ }
1180+ }
1181+ } catch (Exception e ) {
1182+ LOG .warn ("Failed to get table schema" , e );
1183+ }
1184+
1185+ // Validate row schema
1186+ String rowDetails = "" ;
1187+ if (tableSchema != null ) {
1188+ rowDetails = validateRowSchema (row , tableSchema );
1189+ }
1190+
1191+ // Basic log to return
1192+ String bqLimitLog =
11611193 String .format (
1162- "We have observed a row that is %s bytes in size and exceeded BigQueryIO"
1163- + " limit of 9MB. While BigQuery supports request sizes up to 10MB,"
1164- + " BigQueryIO sets the limit at 9MB to leave room for request"
1165- + " overhead. You may change your retry strategy to unblock this"
1166- + " pipeline, and the row will be output as a failed insert." ,
1167- nextRowSize ));
1194+ "We have observed a row of size %s bytes exceeding the "
1195+ + "BigQueryIO limit of %s." ,
1196+ nextRowSize , MAX_BQ_ROW_PAYLOAD_DESC );
1197+
1198+ // Add on row schema diff details if present
1199+ if (!rowDetails .isEmpty ()) {
1200+ bqLimitLog +=
1201+ String .format (
1202+ " This is probably due to a schema "
1203+ + "mismatch. Problematic row had extra schema fields: %s." ,
1204+ rowDetails );
1205+ }
1206+ throw new RuntimeException (bqLimitLog );
11681207 } else {
11691208 numFailedRows += 1 ;
11701209 errorContainer .add (failedInserts , error , ref , rowsToPublish .get (rowIndex ));
@@ -1177,7 +1216,7 @@ <T> long insertAll(
11771216 // If adding the next row will push the request above BQ row limits, or
11781217 // if the current batch of elements is larger than the targeted request size,
11791218 // we immediately go and issue the data insertion.
1180- if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD
1219+ if (dataSize + nextRowSize >= MAX_BQ_ROW_PAYLOAD_BYTES
11811220 || dataSize >= maxRowBatchSize
11821221 || rows .size () + 1 > maxRowsPerBatch ) {
11831222 // If the row does not fit into the insert buffer, then we take the current buffer,
@@ -1317,6 +1356,48 @@ <T> long insertAll(
13171356 }
13181357 }
13191358
1359+ /**
1360+ * Validates a {@link TableRow} for logging, comparing the provided row against a BigQuery
1361+ * schema. The formatted string shows the field names in the row indicating any mismatches
1362+ * unknown entries.
1363+ *
1364+ * <p>For example, a {@link TableRow} with a "names" field, where the schema expects
1365+ * "name" would return "names".
1366+ *
1367+ * <pre>{@code {'name': java.lang.String}</pre>
1368+ *
1369+ * <p>If a field exists in the row but not in the schema,
1370+ * "Unknown fields" is prefixed to the log.</p>
1371+ *
1372+ * @param row The {@link TableRow} to validate.
1373+ * @param tableSchema The {@link TableSchema} to check against.
1374+ * @return A string representation of the row, indicating any schema mismatches.
1375+ */
1376+ private String validateRowSchema (TableRow row , TableSchema tableSchema ) {
1377+ // Creates bqSchemaFields containing field names
1378+ Set <String > bqSchemaFields =
1379+ tableSchema .getFieldsList ().stream ().map (f -> f .getName ()).collect (Collectors .toSet ());
1380+
1381+ // Validate
1382+ String rowDetails =
1383+ row .keySet ().stream ()
1384+ .map (
1385+ fieldName -> {
1386+ if (!bqSchemaFields .contains (fieldName )) {
1387+ return fieldName ;
1388+ }
1389+ return "" ;
1390+ })
1391+ .filter (s -> !s .isEmpty ())
1392+ .collect (Collectors .joining (", " , "{Unknown fields: " , "}" ));
1393+
1394+ // Shorten row details if too long for human readability
1395+ if (rowDetails .length () > 1024 ) {
1396+ rowDetails = rowDetails .substring (0 , 1024 ) + "...}" ;
1397+ }
1398+ return rowDetails ;
1399+ }
1400+
13201401 @ Override
13211402 public <T > long insertAll (
13221403 TableReference ref ,
0 commit comments