15
15
from IPython .display import HTML , display
16
16
from pandas import DataFrame
17
17
from sqlalchemy import inspect
18
+ from sqlalchemy .engine .base import Engine
18
19
from sqlalchemy .engine .reflection import Inspector
19
20
from sqlalchemy .exc import NoSuchTableError
20
21
from sqlalchemy .types import TypeEngine
@@ -412,7 +413,7 @@ def run(self, invoked_as: str, args: List[str]) -> Tuple[DataFrame, bool]:
412
413
# down below will then raise NoSuchTableError.
413
414
column_dicts = inspector .get_columns (relation_name , schema = schema )
414
415
except NoSuchTableError :
415
- self . _raise_from_no_such_table (schema , relation_name )
416
+ _raise_from_no_such_table (schema , relation_name )
416
417
417
418
# 'Pivot' the dicts from get_columns()
418
419
names = []
@@ -553,31 +554,34 @@ def run(self, invoked_as: str, args: List[str]) -> None:
553
554
# * Uploading batches of successfully discovered relations
554
555
# * Catching any exception and reporting it back to gate. Will suppress the exception.
555
556
#
556
- with RelationStructureMessager (ds_id ) as messenger :
557
- inspector = self .get_inspector ()
558
557
559
- # This and delta() just for development timing figures. Could become yet another
560
- # timer context manager implementation.
561
- start = time .monotonic ()
558
+ # This and delta() just for development timing figures. Could become yet another
559
+ # timer context manager implementation.
560
+ start = time .monotonic ()
562
561
563
- def delta () -> float :
564
- """Record new timing section, kindof like a stopwatch lap timer.
565
- Returns the prior 'lap' time.
566
- """
567
- nonlocal start
562
+ def delta () -> float :
563
+ """Record new timing section, kindof like a stopwatch lap timer.
564
+ Returns the prior 'lap' time.
565
+ """
566
+ nonlocal start
568
567
569
- now = time .monotonic ()
570
- ret = now - start
571
- start = now
568
+ now = time .monotonic ()
569
+ ret = now - start
570
+ start = now
572
571
573
- return ret
572
+ return ret
573
+
574
+ with RelationStructureMessager (ds_id ) as messenger :
575
+ inspector = self .get_inspector ()
574
576
575
577
relations_and_kinds = self .all_table_and_views (inspector )
576
578
print (f'Discovered { len (relations_and_kinds )} relations in { delta ()} ' )
577
579
578
580
# Introspect each relation concurrently.
579
581
# TODO: Take minimum concurrency as a param?
580
- with ThreadPoolExecutor (max_workers = self .MAX_INTROSPECTION_THREADS ) as executor :
582
+ with ThreadPoolExecutor (
583
+ max_workers = self .get_max_threadpool_workers (inspector )
584
+ ) as executor :
581
585
future_to_relation = {
582
586
executor .submit (
583
587
self .fully_introspect , inspector , schema_name , relation_name , kind
@@ -589,10 +593,8 @@ def delta() -> float:
589
593
schema_name , relation_name , kind = future_to_relation [future ]
590
594
messenger .queue_for_delivery (future .result ())
591
595
592
- table_introspection_delta = delta ()
593
- print (
594
- f'Done introspecting and messaging gate in { table_introspection_delta } , amortized { table_introspection_delta / len (relations_and_kinds )} s per relation'
595
- )
596
+ table_introspection_delta = delta ()
597
+ print (f'Done introspecting and messaging gate in { table_introspection_delta } ' )
596
598
597
599
# run() contract: return what to bind to the SQL cell variable name, and if display() needs
598
600
# to be called on it. Nothing and nope!
@@ -602,14 +604,27 @@ def delta() -> float:
602
604
# All of the rest of the methods end up assisting run(), directly or indirectly
603
605
###
604
606
605
- def all_table_and_views (self , inspector ) -> List [Tuple [str , str , str ]]:
607
+ @classmethod
608
+ def get_max_threadpool_workers (cls , inspector : SchemaStrippingInspector ) -> int :
609
+ """Determine max concurrency for introspecting this sort of database.
610
+
611
+ BigQuery, in particular, seems to be not totally threadsafe, ENG-5808
612
+ """
613
+
614
+ if inspector .engine .driver == 'bigquery' :
615
+ return 1
616
+ else :
617
+ return cls .MAX_INTROSPECTION_THREADS
618
+
619
+ @classmethod
620
+ def all_table_and_views (cls , inspector ) -> List [Tuple [str , str , str ]]:
606
621
"""Returns list of (schema name, relation name, table-or-view) tuples"""
607
622
608
623
results = []
609
624
610
625
default_schema = inspector .default_schema_name
611
626
all_schemas = set (inspector .get_schema_names ())
612
- all_schemas .difference_update (self .AVOID_SCHEMAS )
627
+ all_schemas .difference_update (cls .AVOID_SCHEMAS )
613
628
if default_schema and default_schema not in all_schemas :
614
629
all_schemas .add (default_schema )
615
630
@@ -632,23 +647,24 @@ def all_table_and_views(self, inspector) -> List[Tuple[str, str, str]]:
632
647
633
648
return results
634
649
650
+ @classmethod
635
651
def fully_introspect (
636
- self , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str , kind : str
652
+ cls , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str , kind : str
637
653
) -> RelationStructureDescription :
638
654
"""Drive introspecting into this single relation, making all the necessary Introspector API
639
655
calls to learn all of the relation's sub-structures.
640
656
641
657
Returns a RelationStructureDescription pydantic model, suitable to POST back to Gate with.
642
658
"""
643
659
644
- columns = self .introspect_columns (inspector , schema_name , relation_name )
660
+ columns = cls .introspect_columns (inspector , schema_name , relation_name )
645
661
646
662
# Always introspect indexes, even if a view, because materialized views
647
663
# can have indexes.
648
- indexes = self .introspect_indexes (inspector , schema_name , relation_name )
664
+ indexes = cls .introspect_indexes (inspector , schema_name , relation_name )
649
665
650
666
# Likewise unique constraints? Those _might_ be definable on materialized views?
651
- unique_constraints = self .introspect_unique_constraints (
667
+ unique_constraints = cls .introspect_unique_constraints (
652
668
inspector , schema_name , relation_name
653
669
)
654
670
@@ -660,14 +676,14 @@ def fully_introspect(
660
676
foreign_keys = []
661
677
else :
662
678
view_definition = None
663
- primary_key_name , primary_key_columns = self .introspect_primary_key (
679
+ primary_key_name , primary_key_columns = cls .introspect_primary_key (
664
680
inspector , relation_name , schema_name
665
681
)
666
- check_constraints = self .introspect_check_constraints (
682
+ check_constraints = cls .introspect_check_constraints (
667
683
inspector , schema_name , relation_name
668
684
)
669
685
670
- foreign_keys = self .introspect_foreign_keys (inspector , schema_name , relation_name )
686
+ foreign_keys = cls .introspect_foreign_keys (inspector , schema_name , relation_name )
671
687
672
688
print (f'Introspected { kind } { schema_name } .{ relation_name } ' )
673
689
@@ -685,8 +701,9 @@ def fully_introspect(
685
701
foreign_keys = foreign_keys ,
686
702
)
687
703
704
+ @classmethod
688
705
def introspect_foreign_keys (
689
- self , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str
706
+ cls , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str
690
707
) -> List [ForeignKeysModel ]:
691
708
"""Introspect all foreign keys for a table, describing the results as a List[ForeignKeysModel]"""
692
709
@@ -716,8 +733,9 @@ def introspect_foreign_keys(
716
733
717
734
return fkeys
718
735
736
+ @classmethod
719
737
def introspect_check_constraints (
720
- self , inspector , schema_name , relation_name
738
+ cls , inspector , schema_name , relation_name
721
739
) -> List [CheckConstraintModel ]:
722
740
"""Introspect all check constraints for a table, describing the results as a List[CheckConstraintModel]"""
723
741
@@ -734,8 +752,9 @@ def introspect_check_constraints(
734
752
735
753
return constraints
736
754
755
+ @classmethod
737
756
def introspect_unique_constraints (
738
- self , inspector , schema_name , relation_name
757
+ cls , inspector , schema_name , relation_name
739
758
) -> List [UniqueConstraintModel ]:
740
759
"""Introspect all unique constraints for a table, describing the results as a List[UniqueConstraintModel]"""
741
760
@@ -752,7 +771,8 @@ def introspect_unique_constraints(
752
771
753
772
return constraints
754
773
755
- def introspect_indexes (self , inspector , schema_name , relation_name ) -> List [IndexModel ]:
774
+ @classmethod
775
+ def introspect_indexes (cls , inspector , schema_name , relation_name ) -> List [IndexModel ]:
756
776
"""Introspect all indexes for a table or materialized view, describing the results as a List[IndexModel]"""
757
777
indexes = []
758
778
@@ -769,8 +789,9 @@ def introspect_indexes(self, inspector, schema_name, relation_name) -> List[Inde
769
789
770
790
return indexes
771
791
792
+ @classmethod
772
793
def introspect_primary_key (
773
- self , inspector : SchemaStrippingInspector , relation_name : str , schema_name : str
794
+ cls , inspector : SchemaStrippingInspector , relation_name : str , schema_name : str
774
795
) -> Tuple [Optional [str ], List [str ]]:
775
796
"""Introspect the primary key of a table, returning the pkey name and list of columns in the primary key (if any).
776
797
@@ -792,8 +813,9 @@ def introspect_primary_key(
792
813
# No primary key to be returned.
793
814
return None , []
794
815
816
+ @classmethod
795
817
def introspect_columns (
796
- self , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str
818
+ cls , inspector : SchemaStrippingInspector , schema_name : str , relation_name : str
797
819
) -> List [ColumnModel ]:
798
820
column_dicts = inspector .get_columns (relation_name , schema = schema_name )
799
821
@@ -1235,6 +1257,15 @@ def default_schema_name(self) -> Optional[str]:
1235
1257
# BigQuery, Trino dialects may end up returning None.
1236
1258
return self .underlying_inspector .default_schema_name
1237
1259
1260
+ @property
1261
+ def engine (self ) -> Engine :
1262
+ return self .underlying_inspector .engine
1263
+
1264
+ # Value-added properties
1265
+ @property
1266
+ def is_bigquery (self ) -> bool :
1267
+ return self .engine .driver == 'bigquery'
1268
+
1238
1269
def get_schema_names (self ) -> List [str ]:
1239
1270
return self .underlying_inspector .get_schema_names ()
1240
1271
@@ -1243,6 +1274,11 @@ def get_columns(self, relation_name: str, schema: Optional[str] = None) -> List[
1243
1274
1244
1275
@handle_not_implemented ('(unobtainable)' )
1245
1276
def get_view_definition (self , view_name : str , schema : Optional [str ] = None ) -> str :
1277
+ if self .is_bigquery :
1278
+ # Sigh. Have to explicitly interpolate schema into view name, else
1279
+ # underlying driver code complains. Not even joking.
1280
+ view_name = f'{ schema } .{ view_name } '
1281
+
1246
1282
return self .underlying_inspector .get_view_definition (view_name , schema = schema )
1247
1283
1248
1284
def get_pk_constraint (self , table_name : str , schema : Optional [str ] = None ) -> dict :
0 commit comments