@@ -4,7 +4,7 @@ use etl::store::schema::SchemaStore;
 use etl::store::state::StateStore;
 use etl::types::{Cell, Event, PgLsn, TableId, TableName, TableRow};
 use etl::{bail, etl_error};
-use gcp_bigquery_client::storage::{TableBatch, TableDescriptor};
+use gcp_bigquery_client::storage::TableDescriptor;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Display;
 use std::iter;
@@ -14,7 +14,6 @@ use tokio::sync::Mutex;
 use tracing::{debug, info, warn};
 
 use crate::bigquery::client::{BigQueryClient, BigQueryOperationType};
-use crate::bigquery::encoding::BigQueryTableRow;
 use crate::bigquery::{BigQueryDatasetId, BigQueryTableId};
 use crate::metrics::register_metrics;
 
@@ -306,6 +305,9 @@ where
             .await?;
 
         // Optimistically skip table creation if we've already seen this sequenced table.
+        //
+        // Note that if the table is deleted outside ETL while the cache still marks it as
+        // created, inserts will fail because the missing table won't be recreated.
         if !inner.created_tables.contains(&sequenced_bigquery_table_id) {
            self.client
                .create_table_if_missing(
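
For context, a minimal sketch of the optimistic creation cache the new comment describes, assuming the cache is a HashSet behind the destination's inner lock; the `String` ids and the `ensure_table` future stand in for the crate's real `SequencedBigQueryTableId` and BigQuery call:

use std::collections::HashSet;

/// Skip the BigQuery round trip when the table id is already cached. Note the
/// trade-off documented in the diff: if the table is dropped outside ETL, the
/// stale cache entry means the table won't be recreated and inserts will fail.
async fn create_if_uncached<F>(
    cache: &mut HashSet<String>,
    table_id: &str,
    ensure_table: F,
) -> Result<(), String>
where
    F: std::future::Future<Output = Result<(), String>>,
{
    if !cache.contains(table_id) {
        ensure_table.await?;
        cache.insert(table_id.to_owned());
    }
    Ok(())
}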
@@ -343,45 +345,6 @@ where
         Ok((sequenced_bigquery_table_id, Arc::new(table_descriptor)))
     }
 
-    /// Streams table batches to BigQuery concurrently without holding locks.
-    ///
-    /// This method can operate without locking because:
-    /// - The BigQuery client is thread-safe and uses internal buffering
-    /// - Table preparation is completed before calling this method
-    /// - Multiple streaming operations can execute concurrently
-    async fn stream_table_batches_concurrent_with_fallback(
-        &self,
-        client: &BigQueryClient,
-        table_batches: Vec<TableBatch<BigQueryTableRow>>,
-        max_concurrent_streams: usize,
-    ) -> EtlResult<(usize, usize)> {
-        // First attempt - optimistically assume all tables exist
-        let result = client
-            .stream_table_batches_concurrent(table_batches, max_concurrent_streams)
-            .await;
-
-        match result {
-            Ok((bytes_sent, bytes_received)) => Ok((bytes_sent, bytes_received)),
-            Err(err) => {
-                // From our testing, when trying to send data to a missing table, this is the error that is
-                // returned:
-                // `Status { code: PermissionDenied, message: "Permission 'TABLES_UPDATE_DATA' denied on
-                // resource 'x' (or it may not exist).", source: None }`
-                //
-                // If we get permission denied, we assume that a table doesn't exist.
-                // For now, we'll return the error since reconstructing batches is complex.
-                if err.kind() == ErrorKind::PermissionDenied {
-                    warn!("one or more tables not found during concurrent streaming");
-                    // TODO: figure out how we could get per-table errors here and try to recreate the
-                    // tables.
-                    Err(err)
-                } else {
-                    Err(err)
-                }
-            }
-        }
-    }
-
     /// Adds a table to the creation cache to avoid redundant existence checks.
     fn add_to_created_tables_cache(inner: &mut Inner, table_id: &SequencedBigQueryTableId) {
         if inner.created_tables.contains(table_id) {
@@ -391,11 +354,6 @@ where
         inner.created_tables.insert(table_id.clone());
     }
 
-    /// Removes a table from the creation cache when it's found to not exist.
-    fn remove_from_created_tables_cache(inner: &mut Inner, table_id: &SequencedBigQueryTableId) {
-        inner.created_tables.remove(table_id);
-    }
-
     /// Retrieves the current sequenced table ID or creates a new one starting at version 0.
     async fn get_or_create_sequenced_bigquery_table_id(
         &self,
@@ -455,6 +413,7 @@ where
             .create_or_replace_view(&self.dataset_id, view_name, &target_table_id.to_string())
             .await?;
 
+        // We insert/overwrite the new (view -> sequenced BigQuery table id) mapping.
         inner
             .created_views
             .insert(view_name.clone(), target_table_id.clone());
@@ -507,11 +466,8 @@ where
         // Stream all the batches concurrently.
         if !table_batches.is_empty() {
             let (bytes_sent, bytes_received) = self
-                .stream_table_batches_concurrent_with_fallback(
-                    &self.client,
-                    table_batches,
-                    self.max_concurrent_streams,
-                )
+                .client
+                .stream_table_batches_concurrent(table_batches, self.max_concurrent_streams)
                 .await?;
 
             // Logs with egress_metric = true can be used to identify egress logs.
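
A sketch of the simplified call path after this change: with the fallback wrapper gone, a PermissionDenied from a missing table now propagates to the caller via `?`. The type and method names are taken from the diff (crate imports elided); the function shape and the log line are illustrative assumptions:

/// Assumes `BigQueryClient`, `TableBatch<BigQueryTableRow>`, and `EtlResult`
/// from the surrounding crate; only the call pattern mirrors the diff.
async fn stream_batches(
    client: &BigQueryClient,
    table_batches: Vec<TableBatch<BigQueryTableRow>>,
    max_concurrent_streams: usize,
) -> EtlResult<()> {
    if !table_batches.is_empty() {
        // Errors (including PermissionDenied for missing tables) bubble up here.
        let (bytes_sent, bytes_received) = client
            .stream_table_batches_concurrent(table_batches, max_concurrent_streams)
            .await?;
        // Logs with egress_metric = true identify egress volume, as in the diff.
        info!(egress_metric = true, bytes_sent, bytes_received, "streamed table batches");
    }
    Ok(())
}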
@@ -547,7 +503,6 @@ where
             }
 
             let event = event_iter.next().unwrap();
-
             match event {
                 Event::Insert(mut insert) => {
                     let sequence_number =
@@ -594,6 +549,7 @@ where
                 }
                 _ => {
                     // Every other event type is currently not supported.
+                    debug!("skipping unsupported event in BigQuery");
                 }
             }
         }
@@ -617,11 +573,8 @@ where
 
         if !table_batches.is_empty() {
             let (bytes_sent, bytes_received) = self
-                .stream_table_batches_concurrent_with_fallback(
-                    &self.client,
-                    table_batches,
-                    self.max_concurrent_streams,
-                )
+                .client
+                .stream_table_batches_concurrent(table_batches, self.max_concurrent_streams)
                 .await?;
 
             // Logs with egress_metric = true can be used to identify egress logs.
@@ -766,7 +719,7 @@ where
         );
 
         // We remove the old table from the cache since it's no longer necessary.
-        Self::remove_from_created_tables_cache(&mut inner, &sequenced_bigquery_table_id);
+        inner.created_tables.remove(&sequenced_bigquery_table_id);
 
         // Schedule cleanup of the previous table. We do not care to track this task since
         // if it fails, users can clean up the table on their own, but the view will still point
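
A hypothetical sketch of the cache eviction this hunk inlines: once the view has been repointed at the new versioned table, the old sequenced id is removed from `created_tables` directly (the dedicated helper was deleted earlier in this diff). Modeling the cache as a `HashSet<String>` is an assumption for illustration:

use std::collections::HashSet;

/// Drop the cache entry for a rotated-out table. The view already points at
/// the replacement table; the old table itself is dropped by a detached task,
/// and a failure there only leaves an orphaned table behind.
fn evict_rotated_table(created_tables: &mut HashSet<String>, old_table_id: &str) {
    created_tables.remove(old_table_id);
}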