55import logging
66import traceback
77from dataclasses import InitVar , dataclass
8- from typing import Any , List , Mapping , Tuple , Dict , Optional
8+ from typing import Any , Dict , List , Mapping , Optional , Tuple
99
1010from airbyte_cdk import AbstractSource
1111from airbyte_cdk .sources .declarative .checks .connection_checker import ConnectionChecker
@@ -47,7 +47,7 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T
4747 return False , error_message
4848
4949 def check_connection (
50- self , source : AbstractSource , logger : logging .Logger , config : Mapping [str , Any ]
50+ self , source : AbstractSource , logger : logging .Logger , config : Mapping [str , Any ]
5151 ) -> Tuple [bool , Any ]:
5252 """Checks the connection to the source and its streams."""
5353 try :
@@ -64,18 +64,26 @@ def check_connection(
6464 f"{ stream_name } is not part of the catalog. Expected one of { list (stream_name_to_stream .keys ())} ."
6565 )
6666
67- stream_availability , message = self ._check_stream_availability (stream_name_to_stream , stream_name , logger )
67+ stream_availability , message = self ._check_stream_availability (
68+ stream_name_to_stream , stream_name , logger
69+ )
6870 if not stream_availability :
6971 return stream_availability , message
7072
71- should_check_dynamic_streams = hasattr (source , "resolved_manifest" ) and hasattr (source , "dynamic_streams" ) and self .dynamic_streams_check_configs
73+ should_check_dynamic_streams = (
74+ hasattr (source , "resolved_manifest" )
75+ and hasattr (source , "dynamic_streams" )
76+ and self .dynamic_streams_check_configs
77+ )
7278
7379 if should_check_dynamic_streams :
7480 return self ._check_dynamic_streams_availability (source , stream_name_to_stream , logger )
7581
7682 return True , None
7783
78- def _check_stream_availability (self , stream_name_to_stream : Dict [str , Any ], stream_name : str , logger : logging .Logger ) -> Tuple [bool , Any ]:
84+ def _check_stream_availability (
85+ self , stream_name_to_stream : Dict [str , Any ], stream_name : str , logger : logging .Logger
86+ ) -> Tuple [bool , Any ]:
7987 """Checks if streams are available."""
8088 availability_strategy = HttpAvailabilityStrategy ()
8189 try :
@@ -90,47 +98,61 @@ def _check_stream_availability(self, stream_name_to_stream: Dict[str, Any], stre
9098 return True , None
9199
92100 def _check_dynamic_streams_availability (
93- self , source : AbstractSource , stream_name_to_stream : Dict [str , Any ], logger : logging .Logger
101+ self , source : AbstractSource , stream_name_to_stream : Dict [str , Any ], logger : logging .Logger
94102 ) -> Tuple [bool , Any ]:
95103 """Checks the availability of dynamic streams."""
96104 dynamic_streams = source .resolved_manifest .get ("dynamic_streams" , []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
97105 dynamic_stream_name_to_dynamic_stream = {
98106 ds .get ("name" , f"dynamic_stream_{ i } " ): ds for i , ds in enumerate (dynamic_streams )
99107 }
100- generated_streams = self ._map_generated_streams (source .dynamic_streams ) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
108+ generated_streams = self ._map_generated_streams (source .dynamic_streams ) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
101109
102- for check_config in self .dynamic_streams_check_configs : # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
110+ for check_config in self .dynamic_streams_check_configs : # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
103111 if check_config .dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream :
104- return False , f"Dynamic stream { check_config .dynamic_stream_name } is not found in manifest."
112+ return (
113+ False ,
114+ f"Dynamic stream { check_config .dynamic_stream_name } is not found in manifest." ,
115+ )
105116
106117 generated = generated_streams .get (check_config .dynamic_stream_name , [])
107- stream_availability , message = self ._check_generated_streams_availability (generated , stream_name_to_stream , logger , check_config .stream_count )
118+ stream_availability , message = self ._check_generated_streams_availability (
119+ generated , stream_name_to_stream , logger , check_config .stream_count
120+ )
108121 if not stream_availability :
109122 return stream_availability , message
110123
111124 return True , None
112125
113- def _map_generated_streams (self , dynamic_streams : List [Dict [str , Any ]]) -> Dict [str , List [Dict [str , Any ]]]:
126+ def _map_generated_streams (
127+ self , dynamic_streams : List [Dict [str , Any ]]
128+ ) -> Dict [str , List [Dict [str , Any ]]]:
114129 """Maps dynamic stream names to their corresponding generated streams."""
115130 mapped_streams : Dict [str , List [Dict [str , Any ]]] = {}
116131 for stream in dynamic_streams :
117132 mapped_streams .setdefault (stream ["dynamic_stream_name" ], []).append (stream )
118133 return mapped_streams
119134
120135 def _check_generated_streams_availability (
121- self , generated_streams : List [Dict [str , Any ]], stream_name_to_stream : Dict [str , Any ],
122- logger : logging .Logger , max_count : int
136+ self ,
137+ generated_streams : List [Dict [str , Any ]],
138+ stream_name_to_stream : Dict [str , Any ],
139+ logger : logging .Logger ,
140+ max_count : int ,
123141 ) -> Tuple [bool , Any ]:
124142 """Checks availability of generated dynamic streams."""
125143 availability_strategy = HttpAvailabilityStrategy ()
126144 for declarative_stream in generated_streams [: min (max_count , len (generated_streams ))]:
127145 stream = stream_name_to_stream [declarative_stream ["name" ]]
128146 try :
129- stream_is_available , reason = availability_strategy .check_availability (stream , logger )
147+ stream_is_available , reason = availability_strategy .check_availability (
148+ stream , logger
149+ )
130150 if not stream_is_available :
131151 message = f"Dynamic Stream { stream .name } is not available: { reason } "
132152 logger .warning (message )
133153 return False , message
134154 except Exception as error :
135- return self ._log_error (logger , f"checking availability of dynamic stream { stream .name } " , error )
155+ return self ._log_error (
156+ logger , f"checking availability of dynamic stream { stream .name } " , error
157+ )
136158 return True , None
0 commit comments