1919logger = logging .getLogger (__name__ )
2020
2121
22+ def _extract_proto_value (field ):
23+ """Extract value from protobuf wrapper types (StringValue, UInt32Value, etc.) when serialized to dict."""
24+ if field is None :
25+ return None
26+ if isinstance (field , dict ):
27+ return field .get ('value' )
28+ return field
29+
30+
2231def _execute_asset_refresh_task (playbook_task_execution_log ):
2332 """Execute asset refresh task using the playbook infrastructure"""
2433 try :
@@ -28,22 +37,25 @@ def _execute_asset_refresh_task(playbook_task_execution_log):
2837 drd_proxy_agent = task .get ('drd_proxy_agent' , {})
2938
3039 asset_refresh = drd_proxy_agent .get ('asset_refresh' , {})
31- connector_name = asset_refresh .get ('connector_name' )
32- connector_type = asset_refresh .get ('connector_type' )
33- extractor_method = asset_refresh .get ('extractor_method' ) # Optional field for specific method
34-
40+ # Extract values from protobuf wrapper types (they become {'value': 'x'} in JSON)
41+ connector_name = _extract_proto_value (asset_refresh .get ('connector_name' ))
42+ connector_type = _extract_proto_value (asset_refresh .get ('connector_type' ))
43+ extractor_method = _extract_proto_value (asset_refresh .get ('extractor_method' ))
44+
3545 logger .info (f'_execute_asset_refresh_task:: Starting asset refresh for connector: { connector_name } , '
3646 f'type: { connector_type } , request_id: { request_id } , method: { extractor_method } ' )
37-
47+
3848 if not request_id or not connector_name or not connector_type :
3949 raise ValueError (f'Missing required fields: request_id={ request_id } , connector_name={ connector_name } , connector_type={ connector_type } ' )
4050
4151 # Handle native kubernetes mode or find connector in loaded connections
4252 loaded_connections = settings .LOADED_CONNECTIONS if settings .LOADED_CONNECTIONS else {}
4353 credentials_dict = None
44-
54+
4555 # Check if this is a native kubernetes connector
46- if settings .NATIVE_KUBERNETES_API_MODE and connector_type == 'KUBERNETES' :
56+ # connector_type is now an integer (Source enum value) - KUBERNETES = 47
57+ is_kubernetes = connector_type == 47 or connector_type == 'KUBERNETES' or str (connector_type ) == '47'
58+ if settings .NATIVE_KUBERNETES_API_MODE and is_kubernetes :
4759 # For native kubernetes, we don't need loaded connections
4860 credentials_dict = {}
4961 logger .info (f'Using native Kubernetes mode for connector: { connector_name } ' )
@@ -56,7 +68,10 @@ def _execute_asset_refresh_task(playbook_task_execution_log):
5668 break
5769
5870 if credentials_dict is None :
59- raise ValueError (f'Connector not found or no credentials: { connector_name } ' )
71+ available_connectors = list (loaded_connections .keys ()) if loaded_connections else []
72+ raise ValueError (f'Connector not found or no credentials: { connector_name } . '
73+ f'Available connectors in config: { available_connectors } . '
74+ f'Please ensure the connector is configured in the VPC agent credentials.' )
6075
6176 # Execute asset refresh for the specific connector
6277 from asset_manager .tasks import populate_connector_metadata , extractor_async_method_call
0 commit comments