Skip to content

Commit b400d2e

Browse files
feat: separate ray logs and service logs
1 parent 37cbfcf commit b400d2e

File tree

17 files changed

+482
-389
lines changed

17 files changed

+482
-389
lines changed

graphgen/bases/__init__.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22
from .base_generator import BaseGenerator
33
from .base_kg_builder import BaseKGBuilder
44
from .base_llm_wrapper import BaseLLMWrapper
5+
from .base_operator import BaseOperator
56
from .base_partitioner import BasePartitioner
67
from .base_reader import BaseReader
78
from .base_searcher import BaseSearcher
89
from .base_splitter import BaseSplitter
9-
from .base_storage import (
10-
BaseGraphStorage,
11-
BaseKVStorage,
12-
BaseListStorage,
13-
StorageNameSpace,
14-
)
10+
from .base_storage import BaseGraphStorage, BaseKVStorage, StorageNameSpace
1511
from .base_tokenizer import BaseTokenizer
1612
from .datatypes import Chunk, Config, Node, QAPair, Token

graphgen/bases/base_operator.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import inspect
2+
import os
3+
from abc import ABC, abstractmethod
4+
from typing import Iterable, Union
5+
6+
import pandas as pd
7+
import ray
8+
9+
from graphgen.utils import CURRENT_LOGGER_VAR, set_logger
10+
11+
12+
class BaseOperator(ABC):
    """Abstract base class for batch operators that log to a per-worker file.

    On construction the operator derives a short worker identifier from the
    Ray runtime context and opens a dedicated logger that writes to
    ``<working_dir>/logs/<op_name>_<worker>.log``.  Calling the instance binds
    that logger to ``CURRENT_LOGGER_VAR`` while ``process`` runs.

    Subclasses must implement :meth:`process`.
    """

    def __init__(self, working_dir: str = "cache", op_name: Union[str, None] = None):
        """Set up the operator name and its per-worker logger.

        Args:
            working_dir: Base directory; log files go into its ``logs``
                sub-directory.
            op_name: Name used in the log file name and logger name.
                Defaults to the concrete class name.
        """
        self.op_name = op_name or self.__class__.__name__

        log_dir = os.path.join(working_dir, "logs")
        # Make sure the target directory exists before a log file inside it
        # is requested; harmless if it is already there.
        os.makedirs(log_dir, exist_ok=True)

        try:
            ctx = ray.get_runtime_context()
            worker_id = ctx.get_actor_id() or ctx.get_worker_id()
            # Use the leading hex characters: Ray actor IDs carry the job ID
            # in their trailing bytes, so a suffix would be identical for
            # every actor of the same job and their log files would collide.
            worker_id_short = worker_id[:6] if worker_id else "driver"
        except Exception as e:
            # Deliberate best-effort: the operator must stay usable outside
            # of an initialised Ray runtime. The logger does not exist yet,
            # hence the plain print.
            print(
                "Warning: Could not get Ray worker ID, defaulting to 'local'. Exception:",
                e,
            )
            worker_id_short = "local"

        # e.g. cache/logs/ChunkService_a1b2c3.log
        log_file = os.path.join(log_dir, f"{self.op_name}_{worker_id_short}.log")

        self.logger = set_logger(
            log_file=log_file, name=f"{self.op_name}.{worker_id_short}", force=True
        )

        self.logger.info(
            "[%s] Operator initialized on Worker %s", self.op_name, worker_id_short
        )

    def __call__(
        self, batch: pd.DataFrame
    ) -> Union[pd.DataFrame, Iterable[pd.DataFrame]]:
        """Run :meth:`process` on ``batch`` with this operator's logger active.

        This is a generator function: nothing runs until the result is
        iterated.  If ``process`` returns a generator its items are yielded
        one by one; any other return value is yielded as a single item.

        Args:
            batch: The input batch handed to ``process``.

        Yields:
            Whatever ``process`` produces for the batch.
        """
        # NOTE(review): the context variable stays set while this generator is
        # suspended at a ``yield`` and is only reset once iteration finishes
        # or the generator is closed - confirm callers consume it in the same
        # context it was started in.
        logger_token = CURRENT_LOGGER_VAR.set(self.logger)
        try:
            result = self.process(batch)
            if inspect.isgenerator(result):
                yield from result
            else:
                yield result
        finally:
            CURRENT_LOGGER_VAR.reset(logger_token)

    @abstractmethod
    def process(self, batch):
        """Transform one batch; must be overridden by subclasses.

        May return a single result or a generator of results (both are
        handled by ``__call__``).
        """
        raise NotImplementedError("Subclasses must implement the process method.")

    def get_logger(self):
        """Return the logger created for this operator instance."""
        return self.logger

graphgen/bases/base_storage.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ def upsert(self, data: dict[str, T]):
4141
def drop(self):
4242
raise NotImplementedError
4343

44+
def reload(self):
45+
raise NotImplementedError
46+
4447

4548
class BaseGraphStorage(StorageNameSpace):
4649
def has_node(self, node_id: str) -> bool:
@@ -88,3 +91,6 @@ def upsert_edge(
8891

8992
def delete_node(self, node_id: str):
9093
raise NotImplementedError
94+
95+
def reload(self):
96+
raise NotImplementedError

0 commit comments

Comments
 (0)