@@ -432,6 +432,7 @@ class _TargetConnectorContext:
432432 value_fields_schema : list [FieldSchema ]
433433 value_decoder : Callable [[Any ], Any ]
434434 index_options : IndexOptions
435+ setup_state : Any
435436
436437
437438def _build_args (
@@ -472,7 +473,7 @@ class _TargetConnector:
472473 """
473474
474475 _spec_cls : type [Any ]
475- _setup_key_type : Any
476+ _persistent_key_type : Any
476477 _setup_state_cls : type [Any ]
477478 _connector_cls : type [Any ]
478479
@@ -486,12 +487,12 @@ class _TargetConnector:
486487 def __init__ (
487488 self ,
488489 spec_cls : type [Any ],
489- setup_key_type : Any ,
490+ persistent_key_type : Any ,
490491 setup_state_cls : type [Any ],
491492 connector_cls : type [Any ],
492493 ):
493494 self ._spec_cls = spec_cls
494- self ._setup_key_type = setup_key_type
495+ self ._persistent_key_type = persistent_key_type
495496 self ._setup_state_cls = setup_state_cls
496497 self ._connector_cls = connector_cls
497498
@@ -550,7 +551,7 @@ def _analyze_mutate_mutation_type(
550551 raise ValueError (
551552 f"Method { connector_cls .__name__ } .mutate(*args) parameter must be a tuple with "
552553 f"2 elements (tuple[SpecType, dict[str, ValueStruct]], spec and mutation in dict), "
553- "got {args_type }"
554+ f "got { analyzed_args_type . core_type } "
554555 )
555556
556557 def create_export_context (
@@ -590,6 +591,7 @@ def create_export_context(
590591 value_fields_schema = value_fields_schema ,
591592 value_decoder = value_decoder ,
592593 index_options = index_options ,
594+ setup_state = None ,
593595 )
594596
595597 def get_persistent_key (self , export_context : _TargetConnectorContext ) -> Any :
@@ -602,27 +604,28 @@ def get_persistent_key(self, export_context: _TargetConnectorContext) -> Any:
602604 return dump_engine_object (self ._get_persistent_key_fn (* args ))
603605
604606 def get_setup_state (self , export_context : _TargetConnectorContext ) -> Any :
605- get_persistent_state_fn = getattr (self ._connector_cls , "get_setup_state" , None )
606- if get_persistent_state_fn is None :
607+ get_setup_state_fn = getattr (self ._connector_cls , "get_setup_state" , None )
608+ if get_setup_state_fn is None :
607609 state = export_context .spec
608610 if not isinstance (state , self ._setup_state_cls ):
609611 raise ValueError (
610612 f"Expect a get_setup_state() method for { self ._connector_cls } that returns an instance of { self ._setup_state_cls } "
611613 )
612614 else :
613615 args = _build_args (
614- get_persistent_state_fn ,
616+ get_setup_state_fn ,
615617 1 ,
616618 spec = export_context .spec ,
617619 key_fields_schema = export_context .key_fields_schema ,
618620 value_fields_schema = export_context .value_fields_schema ,
619621 index_options = export_context .index_options ,
620622 )
621- state = get_persistent_state_fn (* args )
623+ state = get_setup_state_fn (* args )
622624 if not isinstance (state , self ._setup_state_cls ):
623625 raise ValueError (
624- f"Method { get_persistent_state_fn .__name__ } must return an instance of { self ._setup_state_cls } , got { type (state )} "
626+ f"Method { get_setup_state_fn .__name__ } must return an instance of { self ._setup_state_cls } , got { type (state )} "
625627 )
628+ export_context .setup_state = state
626629 return dump_engine_object (state )
627630
628631 def check_state_compatibility (
@@ -644,7 +647,10 @@ def check_state_compatibility(
644647 )
645648 return dump_engine_object (compatibility )
646649
647- async def prepare_async (self , export_context : _TargetConnectorContext ) -> None :
650+ async def prepare_async (
651+ self ,
652+ export_context : _TargetConnectorContext ,
653+ ) -> None :
648654 prepare_fn = getattr (self ._connector_cls , "prepare" , None )
649655 if prepare_fn is None :
650656 export_context .prepared_spec = export_context .spec
@@ -653,14 +659,15 @@ async def prepare_async(self, export_context: _TargetConnectorContext) -> None:
653659 prepare_fn ,
654660 1 ,
655661 spec = export_context .spec ,
662+ setup_state = export_context .setup_state ,
656663 key_fields_schema = export_context .key_fields_schema ,
657664 value_fields_schema = export_context .value_fields_schema ,
658665 )
659666 async_prepare_fn = to_async_call (prepare_fn )
660667 export_context .prepared_spec = await async_prepare_fn (* args )
661668
662669 def describe_resource (self , raw_key : Any ) -> str :
663- key = load_engine_object (self ._setup_key_type , raw_key )
670+ key = load_engine_object (self ._persistent_key_type , raw_key )
664671 describe_fn = getattr (self ._connector_cls , "describe" , None )
665672 if describe_fn is None :
666673 return str (key )
@@ -671,7 +678,7 @@ async def apply_setup_changes_async(
671678 changes : list [tuple [Any , list [dict [str , Any ] | None ], dict [str , Any ] | None ]],
672679 ) -> None :
673680 for raw_key , previous , current in changes :
674- key = load_engine_object (self ._setup_key_type , raw_key )
681+ key = load_engine_object (self ._persistent_key_type , raw_key )
675682 prev_specs = [
676683 load_engine_object (self ._setup_state_cls , spec )
677684 if spec is not None
@@ -715,7 +722,7 @@ async def mutate_async(
715722def target_connector (
716723 * ,
717724 spec_cls : type [Any ],
718- setup_key_type : Any = Any ,
725+ persistent_key_type : Any = Any ,
719726 setup_state_cls : type [Any ] | None = None ,
720727) -> Callable [[type ], type ]:
721728 """
@@ -729,7 +736,7 @@ def target_connector(
729736 # Register the target connector.
730737 def _inner (connector_cls : type ) -> type :
731738 connector = _TargetConnector (
732- spec_cls , setup_key_type , setup_state_cls or spec_cls , connector_cls
739+ spec_cls , persistent_key_type , setup_state_cls or spec_cls , connector_cls
733740 )
734741 _engine .register_target_connector (spec_cls .__name__ , connector )
735742 return connector_cls
0 commit comments