@@ -140,9 +140,21 @@ class AtlanConnectionCategory(str, Enum):
140
140
CUSTOM = "custom"
141
141
142
142
143
- class AtlanConnectorType (str , Enum ):
143
+ class AtlanConnectorType (str , Enum , metaclass = utils . ExtendableEnumMeta ):
144
144
category : AtlanConnectionCategory
145
145
146
+ @classmethod
147
+ def get_values (cls ):
148
+ return [member .value for member in cls ._member_map_ .values ()]
149
+
150
+ @classmethod
151
+ def get_names (cls ):
152
+ return list (cls ._member_map_ .keys ())
153
+
154
+ @classmethod
155
+ def get_items (cls ):
156
+ return [(name , member .value ) for name , member in cls ._member_map_ .items ()]
157
+
146
158
@classmethod
147
159
def _get_connector_type_from_qualified_name (
148
160
cls , qualified_name : str
@@ -152,12 +164,17 @@ def _get_connector_type_from_qualified_name(
152
164
raise ValueError (
153
165
f"Qualified name '{ qualified_name } ' does not contain enough segments."
154
166
)
155
- connector_type_key = tokens [1 ].upper ()
156
- # Check if the connector_type_key exists in AtlanConnectorType
167
+
168
+ connector_value = tokens [1 ]
169
+ # Ensure the enum name is converted to UPPER_SNAKE_CASE from kebab-case
170
+ connector_type_key = tokens [1 ].replace ("-" , "_" ).upper ()
171
+
172
+ # Check if the connector_type_key exists in AtlanConnectorType;
173
+ # if so, return it directly. Otherwise, it may be a custom type.
157
174
if connector_type_key not in AtlanConnectorType .__members__ :
158
- raise ValueError (
159
- f"Could not determine AtlanConnectorType from ' { qualified_name } '; "
160
- f"' { connector_type_key } ' is not a valid connector type."
175
+ return AtlanConnectorType . CREATE_CUSTOM (
176
+ name = connector_type_key ,
177
+ value = connector_value ,
161
178
)
162
179
return AtlanConnectorType [connector_type_key ]
163
180
@@ -169,6 +186,12 @@ def __new__(
169
186
obj .category = category
170
187
return obj
171
188
189
+ @classmethod
190
+ def CREATE_CUSTOM (
191
+ cls , name : str , value : str , category = AtlanConnectionCategory .CUSTOM
192
+ ) -> "AtlanConnectorType" :
193
+ return cls .add_value (name , value , category )
194
+
172
195
def to_qualified_name (self ):
173
196
return f"default/{ self .value } /{ int (utils .get_epoch_timestamp ())} "
174
197
@@ -193,14 +216,22 @@ def get_connector_name(
193
216
fields = qualified_name .split ("/" )
194
217
if len (fields ) != qualified_name_len :
195
218
raise ValueError (err )
219
+
220
+ connector_value = fields [1 ]
221
+ # Try enum conversion; fallback to custom connector if it fails
196
222
try :
197
- connector_name = AtlanConnectorType (fields [1 ]).value # type:ignore
198
- if attribute_name != "connection_qualified_name" :
199
- connection_qn = f"{ fields [0 ]} /{ fields [1 ]} /{ fields [2 ]} "
200
- return connection_qn , connector_name
201
- return connector_name
202
- except ValueError as e :
203
- raise ValueError (err ) from e
223
+ connector_name = AtlanConnectorType (connector_value ).value # type: ignore
224
+ except ValueError :
225
+ custom_connection = AtlanConnectorType .CREATE_CUSTOM (
226
+ # Ensure the enum name is converted to UPPER_SNAKE_CASE from kebab-case
227
+ name = connector_value .replace ("-" , "_" ).upper (),
228
+ value = connector_value ,
229
+ )
230
+ connector_name = custom_connection .value
231
+ if attribute_name != "connection_qualified_name" :
232
+ connection_qn = f"{ fields [0 ]} /{ fields [1 ]} /{ fields [2 ]} "
233
+ return connection_qn , connector_name
234
+ return connector_name
204
235
205
236
SNOWFLAKE = ("snowflake" , AtlanConnectionCategory .WAREHOUSE )
206
237
TABLEAU = ("tableau" , AtlanConnectionCategory .BI )
0 commit comments