Skip to content
This repository was archived by the owner on May 27, 2025. It is now read-only.

Commit c171387

Browse files
committed
update callbacks
1 parent 92af333 commit c171387

File tree

6 files changed

+43
-62
lines changed

6 files changed

+43
-62
lines changed

backend/src/logger/application_insights_workflow_callbacks.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
)
1414

1515
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
16-
from datashaper.workflow.workflow_callbacks import NoopWorkflowCallbacks
16+
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
1717
from opentelemetry._logs import (
1818
get_logger_provider,
1919
set_logger_provider,
@@ -121,7 +121,7 @@ def _format_details(self, details: Dict[str, Any] | None = None) -> Dict[str, An
121121
return {}
122122
return {"custom_dimensions": {**self._properties, **unwrap_dict(details)}}
123123

124-
def on_workflow_start(self, name: str, instance: object) -> None:
124+
def workflow_start(self, name: str, instance: object) -> None:
125125
"""Execute this callback when a workflow starts."""
126126
self._workflow_name = name
127127
self._processed_workflow_steps.append(name)
@@ -142,7 +142,7 @@ def on_workflow_start(self, name: str, instance: object) -> None:
142142
message, stack_info=False, extra=self._format_details(details=details)
143143
)
144144

145-
def on_workflow_end(self, name: str, instance: object) -> None:
145+
def workflow_end(self, name: str, instance: object) -> None:
146146
"""Execute this callback when a workflow ends."""
147147
message = f"Index: {self._index_name} -- " if self._index_name else ""
148148
workflow_progress = (
@@ -161,7 +161,7 @@ def on_workflow_end(self, name: str, instance: object) -> None:
161161
message, stack_info=False, extra=self._format_details(details=details)
162162
)
163163

164-
def on_error(
164+
def error(
165165
self,
166166
message: str,
167167
cause: Optional[BaseException] = None,
@@ -178,24 +178,18 @@ def on_error(
178178
extra=self._format_details(details=details),
179179
)
180180

181-
def on_warning(self, message: str, details: Optional[dict] = None) -> None:
181+
def warning(self, message: str, details: Optional[dict] = None) -> None:
182182
"""A call back handler for when a warning occurs."""
183183
self._logger.warning(
184184
message, stack_info=False, extra=self._format_details(details=details)
185185
)
186186

187-
def on_log(self, message: str, details: Optional[dict] = None) -> None:
187+
def log(self, message: str, details: Optional[dict] = None) -> None:
188188
"""A call back handler for when a log message occurs."""
189189
self._logger.info(
190190
message, stack_info=False, extra=self._format_details(details=details)
191191
)
192192

193-
def on_measure(
194-
self, name: str, value: float, details: Optional[dict] = None
195-
) -> None:
196-
"""A call back handler for when a measurement occurs."""
197-
raise NotImplementedError("on_measure() not supported by this logger.")
198-
199193

200194
def unwrap_dict(input_dict, parent_key="", sep="_"):
201195
"""

backend/src/logger/blob_workflow_callbacks.py

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@
44
from datetime import datetime
55
from typing import (
66
Any,
7-
Optional,
87
)
98

109
from azure.storage.blob import BlobServiceClient
11-
from datashaper import NoopWorkflowCallbacks
1210
from devtools import pformat
11+
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
1312

1413

1514
class BlobWorkflowCallbacks(NoopWorkflowCallbacks):
@@ -72,23 +71,7 @@ def _write_log(self, log: dict[str, Any]):
7271
blob_client.append_block(pformat(log, indent=2) + "\n")
7372
self._num_blocks += 1
7473

75-
def on_error(
76-
self,
77-
message: str,
78-
cause: BaseException | None = None,
79-
stack: str | None = None,
80-
details: dict | None = None,
81-
):
82-
"""Report an error."""
83-
self._write_log({
84-
"type": "error",
85-
"data": message,
86-
"cause": str(cause),
87-
"stack": stack,
88-
"details": details,
89-
})
90-
91-
def on_workflow_start(self, name: str, instance: object) -> None:
74+
def workflow_start(self, name: str, instance: object) -> None:
9275
"""Execute this callback when a workflow starts."""
9376
self._workflow_name = name
9477
self._processed_workflow_steps.append(name)
@@ -111,7 +94,7 @@ def on_workflow_start(self, name: str, instance: object) -> None:
11194
"details": details,
11295
})
11396

114-
def on_workflow_end(self, name: str, instance: object) -> None:
97+
def workflow_end(self, name: str, instance: object) -> None:
11598
"""Execute this callback when a workflow ends."""
11699
message = f"Index: {self._index_name} -- " if self._index_name else ""
117100
workflow_progress = (
@@ -132,16 +115,26 @@ def on_workflow_end(self, name: str, instance: object) -> None:
132115
"details": details,
133116
})
134117

135-
def on_warning(self, message: str, details: dict | None = None):
118+
def error(
119+
self,
120+
message: str,
121+
cause: BaseException | None = None,
122+
stack: str | None = None,
123+
details: dict | None = None,
124+
):
125+
"""Report an error."""
126+
self._write_log({
127+
"type": "error",
128+
"data": message,
129+
"cause": str(cause),
130+
"stack": stack,
131+
"details": details,
132+
})
133+
134+
def warning(self, message: str, details: dict | None = None):
136135
"""Report a warning."""
137136
self._write_log({"type": "warning", "data": message, "details": details})
138137

139-
def on_log(self, message: str, details: dict | None = None):
138+
def log(self, message: str, details: dict | None = None):
140139
"""Report a generic log message."""
141140
self._write_log({"type": "log", "data": message, "details": details})
142-
143-
def on_measure(
144-
self, name: str, value: float, details: Optional[dict] = None
145-
) -> None:
146-
"""A call back handler for when a measurement occurs."""
147-
pass

backend/src/logger/console_workflow_callbacks.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
Optional,
1212
)
1313

14-
from datashaper.workflow.workflow_callbacks import NoopWorkflowCallbacks
14+
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
1515

1616

1717
class ConsoleWorkflowCallbacks(NoopWorkflowCallbacks):
@@ -107,7 +107,7 @@ def _format_details(self, details: Dict[str, Any] | None = None) -> Dict[str, An
107107
details = {}
108108
return {**self._properties, **details}
109109

110-
def on_workflow_start(self, name: str, instance: object) -> None:
110+
def workflow_start(self, name: str, instance: object) -> None:
111111
"""Execute this callback when a workflow starts."""
112112
self._workflow_name = name
113113
self._processed_workflow_steps.append(name)
@@ -128,7 +128,7 @@ def on_workflow_start(self, name: str, instance: object) -> None:
128128
message, stack_info=False, extra=self._format_details(details=details)
129129
)
130130

131-
def on_workflow_end(self, name: str, instance: object) -> None:
131+
def workflow_end(self, name: str, instance: object) -> None:
132132
"""Execute this callback when a workflow ends."""
133133
message = f"Index: {self._index_name} -- " if self._index_name else ""
134134
workflow_progress = (
@@ -147,7 +147,7 @@ def on_workflow_end(self, name: str, instance: object) -> None:
147147
message, stack_info=False, extra=self._format_details(details=details)
148148
)
149149

150-
def on_error(
150+
def error(
151151
self,
152152
message: str,
153153
cause: Optional[BaseException] = None,
@@ -164,20 +164,14 @@ def on_error(
164164
extra=self._format_details(details=details),
165165
)
166166

167-
def on_warning(self, message: str, details: Optional[dict] = None) -> None:
167+
def warning(self, message: str, details: Optional[dict] = None) -> None:
168168
"""A call back handler for when a warning occurs."""
169169
self._logger.warning(
170170
message, stack_info=False, extra=self._format_details(details=details)
171171
)
172172

173-
def on_log(self, message: str, details: Optional[dict] = None) -> None:
173+
def log(self, message: str, details: Optional[dict] = None) -> None:
174174
"""A call back handler for when a log message occurs."""
175175
self._logger.info(
176176
message, stack_info=False, extra=self._format_details(details=details)
177177
)
178-
179-
def on_measure(
180-
self, name: str, value: float, details: Optional[dict] = None
181-
) -> None:
182-
"""A call back handler for when a measurement occurs."""
183-
pass

backend/src/logger/load_logger.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from pathlib import Path
66
from typing import List
77

8-
from datashaper import WorkflowCallbacks, WorkflowCallbacksManager
98
from graphrag.callbacks.file_workflow_callbacks import FileWorkflowCallbacks
9+
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
1010

1111
from src.api.azure_clients import AzureClientManager
1212
from src.logger.application_insights_workflow_callbacks import (
@@ -23,7 +23,7 @@ def load_pipeline_logger(
2323
index_name: str = "",
2424
num_workflow_steps: int = 0,
2525
) -> WorkflowCallbacks:
26-
"""Create a callback manager and register a list of loggers.
26+
"""Create and load a list of loggers.
2727
2828
Loggers may be configured as generic loggers or associated with a specified indexing job.
2929
"""
@@ -32,7 +32,7 @@ def load_pipeline_logger(
3232
reporters.append(Reporters.CONSOLE)
3333

3434
azure_client_manager = AzureClientManager()
35-
callback_manager = WorkflowCallbacksManager()
35+
logger_callbacks = []
3636
for reporter in reporters:
3737
match reporter:
3838
case Reporters.BLOB:
@@ -48,7 +48,7 @@ def load_pipeline_logger(
4848
).exists():
4949
blob_service_client.create_container(container_root)
5050
# register the blob reporter
51-
callback_manager.register(
51+
logger_callbacks.append(
5252
BlobWorkflowCallbacks(
5353
blob_service_client=blob_service_client,
5454
container_name=container_name,
@@ -57,10 +57,10 @@ def load_pipeline_logger(
5757
)
5858
)
5959
case Reporters.FILE:
60-
callback_manager.register(FileWorkflowCallbacks(dir=reporting_dir))
60+
logger_callbacks.append(FileWorkflowCallbacks(dir=reporting_dir))
6161
case Reporters.APP_INSIGHTS:
6262
if os.getenv("APP_INSIGHTS_CONNECTION_STRING"):
63-
callback_manager.register(
63+
logger_callbacks.append(
6464
ApplicationInsightsWorkflowCallbacks(
6565
connection_string=os.environ[
6666
"APP_INSIGHTS_CONNECTION_STRING"
@@ -70,11 +70,11 @@ def load_pipeline_logger(
7070
)
7171
)
7272
case Reporters.CONSOLE:
73-
callback_manager.register(
73+
logger_callbacks.append(
7474
ConsoleWorkflowCallbacks(
7575
index_name=index_name, num_workflow_steps=num_workflow_steps
7676
)
7777
)
7878
case _:
7979
print(f"WARNING: unknown reporter type: {reporter}. Skipping.")
80-
return callback_manager
80+
return logger_callbacks

backend/src/logger/logger_singleton.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
from urllib.parse import urlparse
66

7-
from datashaper import WorkflowCallbacks
7+
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
88

99
from src.logger.load_logger import load_pipeline_logger
1010
from src.logger.typing import Reporters

backend/src/logger/pipeline_job_workflow_callbacks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4-
from datashaper.workflow.workflow_callbacks import NoopWorkflowCallbacks
4+
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
55

66
from src.typing.pipeline import PipelineJobState
77
from src.utils.pipeline import PipelineJob

0 commit comments

Comments
 (0)