Skip to content

Commit 23ccd19

Browse files
UN-2301 [FIX] Capture connector error logs in ETL and display in frontend (#1650)
* Adding logs to worker-logging
* Adding logs to worker-logging
* minor change in source connector
* PR reviews
* small change

---------

Co-authored-by: Chandrasekharan M <[email protected]>
1 parent 6f8c169 commit 23ccd19

File tree

6 files changed

+136
-46
lines changed

6 files changed

+136
-46
lines changed

backend/workflow_manager/endpoint_v2/destination.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,24 @@ def _get_source_endpoint_for_workflow(
126126

127127
def validate(self) -> None:
128128
connection_type = self.endpoint.connection_type
129-
connector: ConnectorInstance = self.endpoint.connector_instance
129+
connector: ConnectorInstance | None = self.endpoint.connector_instance
130+
130131
if connection_type is None:
132+
error_msg = "Missing destination connection type"
133+
self.workflow_log.log_error(logger, error_msg)
131134
raise MissingDestinationConnectionType()
132135
if connection_type not in WorkflowEndpoint.ConnectionType.values:
136+
error_msg = f"Invalid destination connection type: {connection_type}"
137+
self.workflow_log.log_error(logger, error_msg)
133138
raise InvalidDestinationConnectionType()
134-
if (
139+
# Check if connector is required but missing
140+
requires_connector = (
135141
connection_type != WorkflowEndpoint.ConnectionType.API
136142
and connection_type != WorkflowEndpoint.ConnectionType.MANUALREVIEW
137-
and connector is None
138-
):
143+
)
144+
if requires_connector and connector is None:
145+
error_msg = "Destination connector not configured"
146+
self.workflow_log.log_error(logger, error_msg)
139147
raise DestinationConnectorNotConfigured()
140148

141149
# Validate database connection if it's a database destination
@@ -147,10 +155,18 @@ def validate(self) -> None:
147155
connector_settings=connector.connector_metadata,
148156
)
149157
engine = db_class.get_engine()
158+
self.workflow_log.log_info(logger, "Database connection test successful")
150159
if hasattr(engine, "close"):
151160
engine.close()
161+
except ConnectorError as e:
162+
error_msg = f"Database connector validation failed: {e}"
163+
self.workflow_log.log_error(logger, error_msg)
164+
logger.exception(error_msg)
165+
raise
152166
except Exception as e:
153-
logger.error(f"Database connection failed: {str(e)}")
167+
error_msg = f"Unexpected error during database validation: {e}"
168+
self.workflow_log.log_error(logger, error_msg)
169+
logger.exception(error_msg)
154170
raise
155171

156172
def _should_handle_hitl(

backend/workflow_manager/endpoint_v2/source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,20 @@ def _get_endpoint_for_workflow(
124124
def validate(self) -> None:
    """Check that the source endpoint is configured well enough to run.

    Every failure is reported through ``self.workflow_log`` before the
    matching exception is raised, so the reason is recorded in the
    workflow log as well as carried by the exception.

    Raises:
        MissingSourceConnectionType: The endpoint has no connection type.
        InvalidSourceConnectionType: The connection type is not one of
            ``WorkflowEndpoint.ConnectionType.values``.
        SourceConnectorNotConfigured: A non-API source has no connector
            instance attached.
    """
    connection_type = self.endpoint.connection_type
    # The connector may legitimately be absent (API sources need none), so
    # the annotation is optional, matching the destination-side validate().
    connector: ConnectorInstance | None = self.endpoint.connector_instance

    if connection_type is None:
        error_msg = "Missing source connection type"
        self.workflow_log.log_error(logger, error_msg)
        raise MissingSourceConnectionType()

    if connection_type not in WorkflowEndpoint.ConnectionType.values:
        error_msg = f"Invalid source connection type: {connection_type}"
        self.workflow_log.log_error(logger, error_msg)
        raise InvalidSourceConnectionType()

    # Only API sources may run without a connector instance.
    if connection_type != WorkflowEndpoint.ConnectionType.API and connector is None:
        error_msg = "Source connector not configured"
        self.workflow_log.log_error(logger, error_msg)
        raise SourceConnectorNotConfigured()
133142

134143
def valid_file_patterns(self, required_patterns: list[Any]) -> list[str]:

backend/workflow_manager/utils/workflow_log.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@
44
from utils.local_context import StateStore
55

66
from unstract.core.pubsub_helper import LogPublisher
7-
from unstract.workflow_execution.enums import (
8-
LogComponent,
9-
LogLevel,
10-
LogStage,
11-
LogState,
12-
)
7+
from unstract.workflow_execution.enums import LogComponent, LogLevel, LogStage, LogState
138

149

1510
class WorkflowLog:
@@ -22,7 +17,8 @@ def __init__(
2217
pipeline_id: str | None = None,
2318
):
2419
log_events_id: str | None = StateStore.get(Common.LOG_EVENTS_ID)
25-
self.messaging_channel = log_events_id if log_events_id else pipeline_id
20+
# Ensure messaging_channel is never None - use execution_id as fallback
21+
self.messaging_channel = log_events_id or pipeline_id or str(execution_id)
2622
self.execution_id = str(execution_id)
2723
self.file_execution_id = str(file_execution_id) if file_execution_id else None
2824
self.organization_id = str(organization_id) if organization_id else None

workers/shared/workflow/destination_connector.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@
2020
from dataclasses import dataclass
2121
from typing import TYPE_CHECKING, Any, Optional
2222

23-
from shared.enums import QueueResultStatus
23+
from shared.enums import DestinationConfigKey, QueueResultStatus
2424

2525
# Import database utils (stable path)
2626
from shared.infrastructure.database.utils import WorkerDatabaseUtils
27+
from shared.infrastructure.logging import WorkerLogger
28+
from shared.infrastructure.logging.helpers import log_file_error, log_file_info
2729
from shared.models.result_models import QueueResult
30+
from shared.utils.api_result_cache import get_api_cache_manager
2831
from shared.utils.manual_review_factory import (
2932
get_manual_review_service,
3033
has_manual_review_plugin,
3134
)
35+
from shared.workflow.connectors.service import WorkerConnectorService
36+
from shared.workflow.logger_helper import WorkflowLoggerHelper
3237

3338
from unstract.connectors.connectorkit import Connectorkit
3439
from unstract.connectors.exceptions import ConnectorError
@@ -53,12 +58,6 @@
5358
ExecutionFileHandler,
5459
)
5560

56-
from ..enums import DestinationConfigKey
57-
from ..infrastructure.logging import WorkerLogger
58-
from ..infrastructure.logging.helpers import log_file_error, log_file_info
59-
from ..utils.api_result_cache import get_api_cache_manager
60-
from .connectors.service import WorkerConnectorService
61-
6261
if TYPE_CHECKING:
6362
from ..api_client import InternalAPIClient
6463

@@ -198,6 +197,9 @@ def __init__(self, config: DestinationConfig, workflow_log=None):
198197
self.settings = config.settings
199198
self.workflow_log = workflow_log
200199

200+
# Initialize logger helper for safe logging operations
201+
self.logger_helper = WorkflowLoggerHelper(workflow_log)
202+
201203
# Store destination connector instance details
202204
self.connector_id = config.connector_id
203205
self.connector_settings = config.connector_settings
@@ -971,7 +973,7 @@ def insert_into_db(
971973
logger.info(f"Successfully inserted data into database table {table_name}")
972974

973975
# Log to UI with file_execution_id for better correlation
974-
if self.workflow_log and hasattr(self, "current_file_execution_id"):
976+
if hasattr(self, "current_file_execution_id"):
975977
log_file_info(
976978
self.workflow_log,
977979
self.current_file_execution_id,
@@ -1219,7 +1221,7 @@ def copy_output_to_output_directory(
12191221
logger.error(error_message)
12201222

12211223
# Log to UI with file_execution_id
1222-
if self.workflow_log and hasattr(self, "current_file_execution_id"):
1224+
if hasattr(self, "current_file_execution_id"):
12231225
log_file_info(
12241226
self.workflow_log,
12251227
self.current_file_execution_id,
@@ -1233,7 +1235,7 @@ def copy_output_to_output_directory(
12331235
logger.info(success_message)
12341236

12351237
# Log to UI
1236-
if self.workflow_log and hasattr(self, "current_file_execution_id"):
1238+
if hasattr(self, "current_file_execution_id"):
12371239
log_file_info(
12381240
self.workflow_log,
12391241
self.current_file_execution_id,
@@ -1246,7 +1248,7 @@ def copy_output_to_output_directory(
12461248
logger.info(success_message)
12471249

12481250
# Log to UI
1249-
if self.workflow_log and hasattr(self, "current_file_execution_id"):
1251+
if hasattr(self, "current_file_execution_id"):
12501252
log_file_info(
12511253
self.workflow_log,
12521254
self.current_file_execution_id,
@@ -1258,7 +1260,7 @@ def copy_output_to_output_directory(
12581260
logger.error(error_msg, exc_info=True)
12591261

12601262
# Log error to UI
1261-
if self.workflow_log and hasattr(self, "current_file_execution_id"):
1263+
if hasattr(self, "current_file_execution_id"):
12621264
log_file_info(
12631265
self.workflow_log,
12641266
self.current_file_execution_id,
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Workflow logger helper for safe logging operations.
2+
3+
This module provides a helper class to handle workflow logging operations
4+
safely, eliminating the need for repetitive conditional checks throughout
5+
the codebase.
6+
"""
7+
8+
import logging
9+
10+
from shared.infrastructure.logging.workflow_logger import WorkerWorkflowLogger
11+
12+
13+
class WorkflowLoggerHelper:
    """Null-safe wrapper around an optional workflow logger.

    Connector classes may be constructed without a workflow log. Routing
    their log calls through this helper lets them log unconditionally:
    each call is forwarded when a workflow log was supplied and is a
    no-op when it was not.
    """

    def __init__(self, workflow_log: "WorkerWorkflowLogger | None" = None) -> None:
        """Store the workflow log to forward to.

        Args:
            workflow_log: Object exposing ``log_info(logger, message)`` and
                ``log_error(logger, message)``, or ``None`` to turn every
                call on this helper into a no-op.
        """
        self.workflow_log = workflow_log

    def log_info(self, logger: logging.Logger, message: str) -> None:
        """Forward an info message to the workflow log, if one is set.

        Args:
            logger: Standard logger handed through to the workflow log.
            message: The message to log.
        """
        # Compare against None explicitly: a logger object that happens to
        # evaluate falsy (e.g. defines __len__/__bool__) must still be used.
        if self.workflow_log is not None:
            self.workflow_log.log_info(logger, message)

    def log_error(self, logger: logging.Logger, message: str) -> None:
        """Forward an error message to the workflow log, if one is set.

        Args:
            logger: Standard logger handed through to the workflow log.
            message: The error message to log.
        """
        # Same explicit None check as log_info, for the same reason.
        if self.workflow_log is not None:
            self.workflow_log.log_error(logger, message)

workers/shared/workflow/source_connector.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
from dataclasses import dataclass
1414
from typing import Any
1515

16-
from unstract.core.data_models import ConnectionType as CoreConnectionType
16+
from shared.infrastructure.logging.logger import WorkerLogger
17+
from shared.workflow.logger_helper import WorkflowLoggerHelper
1718

18-
from ..infrastructure.logging.logger import WorkerLogger
19+
from unstract.core.data_models import ConnectionType as CoreConnectionType
1920

2021
logger = WorkerLogger.get_logger(__name__)
2122

@@ -94,6 +95,7 @@ def __init__(self, config: SourceConfig, workflow_log=None):
9495
self.connection_type = config.connection_type
9596
self.settings = config.settings
9697
self.workflow_log = workflow_log
98+
self.logger_helper = WorkflowLoggerHelper(workflow_log)
9799

98100
# Store connector instance details
99101
self.connector_id = config.connector_id
@@ -110,27 +112,39 @@ def get_fsspec_fs(self):
110112
111113
This method replicates backend logic for getting filesystem access.
112114
"""
113-
if self.connection_type == self.ConnectionType.API_STORAGE:
114-
# API storage uses workflow execution storage
115-
from unstract.filesystem import FileStorageType, FileSystem
115+
try:
116+
if self.connection_type == self.ConnectionType.API_STORAGE:
117+
# API storage uses workflow execution storage
118+
from unstract.filesystem import FileStorageType, FileSystem
119+
120+
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
121+
return file_system.get_file_storage()
116122

117-
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
118-
return file_system.get_file_storage()
123+
if not self.connector_id or not self.connector_settings:
124+
error_msg = (
125+
"Source connector not configured - missing connector_id or settings"
126+
)
127+
self.logger_helper.log_error(logger, error_msg)
128+
raise Exception(error_msg)
119129

120-
if not self.connector_id or not self.connector_settings:
121-
raise Exception("Source connector not configured")
130+
# Get the connector instance using connectorkit
131+
from unstract.connectors.connectorkit import Connectorkit
122132

123-
# Get the connector instance using connectorkit
124-
from unstract.connectors.connectorkit import Connectorkit
133+
connectorkit = Connectorkit()
134+
connector_class = connectorkit.get_connector_class_by_connector_id(
135+
self.connector_id
136+
)
137+
connector_instance = connector_class(self.connector_settings)
125138

126-
connectorkit = Connectorkit()
127-
connector_class = connectorkit.get_connector_class_by_connector_id(
128-
self.connector_id
129-
)
130-
connector_instance = connector_class(self.connector_settings)
139+
# Get fsspec filesystem
140+
fs = connector_instance.get_fsspec_fs()
141+
return fs
131142

132-
# Get fsspec filesystem
133-
return connector_instance.get_fsspec_fs()
143+
except Exception as e:
144+
error_msg = f"Failed to initialize source connector filesystem: {str(e)}"
145+
self.logger_helper.log_error(logger, error_msg)
146+
logger.error(error_msg)
147+
raise
134148

135149
def read_file_content(self, file_path: str) -> bytes:
136150
"""Read file content from source connector.
@@ -164,7 +178,6 @@ def list_files(
164178
List of file information dictionaries
165179
"""
166180
fs = self.get_fsspec_fs()
167-
168181
# Implementation would list files using fsspec
169182
# This is a simplified version
170183
try:
@@ -186,7 +199,9 @@ def list_files(
186199

187200
return files
188201
except Exception as e:
189-
logger.error(f"Failed to list files from source: {e}")
202+
error_msg = f"Failed to list files from source connector directory '{input_directory}': {str(e)}"
203+
self.logger_helper.log_error(logger, error_msg)
204+
logger.error(error_msg)
190205
return []
191206

192207
def validate(self) -> None:
@@ -198,11 +213,15 @@ def validate(self) -> None:
198213
self.ConnectionType.API,
199214
self.ConnectionType.API_STORAGE,
200215
]:
201-
raise Exception(f"Invalid source connection type: {connection_type}")
216+
error_msg = f"Invalid source connection type: {connection_type}"
217+
self.logger_helper.log_error(logger, error_msg)
218+
raise Exception(error_msg)
202219

203220
if connection_type == self.ConnectionType.FILESYSTEM:
204221
if not self.connector_id or not self.connector_settings:
205-
raise Exception("Filesystem source requires connector configuration")
222+
error_msg = "Filesystem source requires connector configuration"
223+
self.logger_helper.log_error(logger, error_msg)
224+
raise Exception(error_msg)
206225

207226
def get_config(self) -> SourceConfig:
208227
"""Get serializable configuration for the source connector."""

0 commit comments

Comments
 (0)