Skip to content

Commit d193f20

Browse files
authored
Enhance Logger (agentscope-ai#217)
1 parent 65b441d commit d193f20

38 files changed

+413
-187
lines changed

.github/workflows/unittest.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,12 @@ jobs:
7676
TYPE="${{ steps.test_type.outputs.type }}"
7777
if [ "$TYPE" = "all" ]; then
7878
echo "tests_run=true" >> $GITHUB_ENV
79-
docker compose exec trinity-node-1 pytest tests -v -s --ignore=tests/data --ctrf report.json
79+
docker compose exec trinity-node-1 pytest tests -v -s --ctrf report.json
8080
elif [ "$TYPE" = "diff" ]; then
8181
if [ -s ../../../test_dirs.txt ]; then
8282
echo "tests_run=true" >> $GITHUB_ENV
8383
TEST_DIRS=$(cat ../../../test_dirs.txt | xargs)
84-
docker compose exec trinity-node-1 pytest $TEST_DIRS -v -s --ignore=tests/data --ctrf report.json
84+
docker compose exec trinity-node-1 pytest $TEST_DIRS -v -s --ctrf report.json
8585
else
8686
echo "No changed modules detected, skipping tests."
8787
echo "tests_run=false" >> $GITHUB_ENV
@@ -90,7 +90,7 @@ jobs:
9090
MODULE="${{ steps.test_type.outputs.module }}"
9191
if [ -n "$MODULE" ]; then
9292
echo "tests_run=true" >> $GITHUB_ENV
93-
docker compose exec trinity-node-1 pytest tests/$MODULE -v -s --ignore=tests/data --ctrf report.json
93+
docker compose exec trinity-node-1 pytest tests/$MODULE -v -s --ctrf report.json
9494
else
9595
echo "No module specified, skipping tests."
9696
echo "tests_run=false" >> $GITHUB_ENV

docs/sphinx_doc/source/tutorial/trinity_configs.md

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ monitor:
4040
data_processor:
4141
# Preprocessing data settings
4242
...
43+
44+
service:
45+
# Services to use
46+
...
47+
48+
log:
49+
# Ray actor logging
50+
...
4351
```
4452

4553
Each of these sections will be explained in detail below.
@@ -395,28 +403,36 @@ trainer:
395403

396404
---
397405

398-
## Data Processor Configuration
406+
## Service Configuration
399407

400-
Configures preprocessing and data cleaning pipelines.
408+
Configures services used by Trinity-RFT. Only the Data Juicer service is supported for now.
401409

402410
```yaml
403-
data_processor:
404-
source_data_path: /PATH/TO/DATASET
405-
load_kwargs:
406-
split: 'train'
407-
format:
408-
prompt_key: 'question'
409-
response_key: 'answer'
410-
dj_config_path: 'tests/test_configs/active_iterator_test_dj_cfg.yaml'
411-
clean_strategy: 'iterative'
412-
db_url: 'postgresql://{username}@localhost:5432/{db_name}'
411+
service:
412+
data_juicer:
413+
server_url: 'http://127.0.0.1:5005'
414+
auto_start: true
415+
port: 5005
416+
```
417+
418+
- `server_url`: The URL of the Data Juicer server.
419+
- `auto_start`: Whether to automatically start the data juicer service.
420+
- `port`: The port for Data Juicer service when `auto_start` is true.
421+
422+
---
423+
424+
## Log Configuration
425+
426+
Ray actor logging configuration.
427+
428+
```yaml
429+
log:
430+
level: INFO
431+
group_by_node: False
413432
```
414433

415-
- `source_data_path`: Path to the task dataset.
416-
- `load_kwargs`: Arguments passed to HuggingFace’s `load_dataset()`.
417-
- `dj_config_path`: Path to Data-Juicer configuration for cleaning.
418-
- `clean_strategy`: Strategy for iterative data cleaning.
419-
- `db_url`: Database URL if using SQL backend.
434+
- `level`: The logging level (supports `DEBUG`, `INFO`, `WARNING`, `ERROR`).
435+
- `group_by_node`: Whether to group logs by node IP. If set to `True`, an actor's logs will be saved to `<checkpoint_root_dir>/<project>/<name>/log/<node_ip>/<actor_name>.log`, otherwise they will be saved to `<checkpoint_root_dir>/<project>/<name>/log/<actor_name>.log`.
420436

421437
---
422438

tests/tools.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
def get_template_config() -> Config:
1414
config_path = os.path.join(os.path.dirname(__file__), "template", "config.yaml")
1515
config = load_config(config_path)
16-
config.ray_namespace = ray.get_runtime_context().namespace
16+
if ray.is_initialized():
17+
config.ray_namespace = ray.get_runtime_context().namespace
18+
else:
19+
config.ray_namespace = "trinity_unittest"
1720
return config
1821

1922

tests/trainer/trainer_test.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@
2121
)
2222
from trinity.cli.launcher import bench, both, explore, train
2323
from trinity.common.config import Config, StorageConfig
24-
from trinity.common.constants import StorageType, SyncMethod, SyncStyle
24+
from trinity.common.constants import (
25+
LOG_DIR_ENV_VAR,
26+
LOG_LEVEL_ENV_VAR,
27+
StorageType,
28+
SyncMethod,
29+
SyncStyle,
30+
)
2531
from trinity.common.models.utils import get_checkpoint_dir_with_step_num
2632
from trinity.manager.manager import CacheManager
2733

@@ -355,12 +361,28 @@ def tearDown(self):
355361

356362

357363
def run_trainer(config: Config) -> None:
358-
ray.init(namespace=config.ray_namespace)
364+
ray.init(
365+
namespace=config.ray_namespace,
366+
runtime_env={
367+
"env_vars": {
368+
LOG_DIR_ENV_VAR: config.log.save_dir,
369+
LOG_LEVEL_ENV_VAR: "INFO",
370+
}
371+
},
372+
)
359373
train(config)
360374

361375

362376
def run_explorer(config: Config) -> None:
363-
ray.init(namespace=config.ray_namespace)
377+
ray.init(
378+
namespace=config.ray_namespace,
379+
runtime_env={
380+
"env_vars": {
381+
LOG_DIR_ENV_VAR: config.log.save_dir,
382+
LOG_LEVEL_ENV_VAR: "INFO",
383+
}
384+
},
385+
)
364386
explore(config)
365387

366388

@@ -487,6 +509,22 @@ def test_fully_async_mode(self, name, use_priority_queue):
487509
)[1],
488510
8,
489511
)
512+
log_files = os.listdir(os.path.join(explorer1_config.checkpoint_job_dir, "log"))
513+
self.assertTrue("trainer.log" in log_files)
514+
self.assertTrue("synchronizer.log" in log_files)
515+
self.assertTrue("explorer1.log" in log_files)
516+
self.assertTrue("explorer2.log" in log_files)
517+
self.assertTrue("explorer1_runner_0.log" in log_files)
518+
self.assertTrue("explorer1_runner_7.log" in log_files)
519+
self.assertTrue("explorer2_runner_0.log" in log_files)
520+
self.assertTrue("explorer2_runner_7.log" in log_files)
521+
self.assertTrue("explorer1_experience_pipeline.log" in log_files)
522+
self.assertTrue("explorer2_experience_pipeline.log" in log_files)
523+
files_to_check = ["trainer.log", "synchronizer.log", "explorer1.log", "explorer2.log"]
524+
for file_name in files_to_check:
525+
with open(os.path.join(explorer1_config.checkpoint_job_dir, "log", file_name)) as f:
526+
lines = f.readlines()
527+
self.assertTrue(len(lines) > 0)
490528
ray.shutdown()
491529

492530
def tearDown(self):

tests/utils/log_test.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import logging
2+
import os
3+
import shutil
4+
import unittest
5+
6+
import ray
7+
from ray.runtime_env import RuntimeEnv
8+
9+
from tests.tools import get_template_config
10+
from trinity.common.constants import (
11+
LOG_DIR_ENV_VAR,
12+
LOG_LEVEL_ENV_VAR,
13+
LOG_NODE_IP_ENV_VAR,
14+
)
15+
from trinity.utils.log import get_logger
16+
17+
18+
def log_outside_actor(log_level=logging.INFO):
19+
logger = get_logger("outside_actor", level=log_level)
20+
logger.info("Outside logger initialized")
21+
logger.debug("Outside logger initialized")
22+
23+
24+
class ModuleInActor:
25+
def __init__(self):
26+
self.logger = get_logger("module_in_actor", in_ray_actor=True)
27+
self.logger.info("ModuleInActor initialized")
28+
self.logger.debug("ModuleInActor initialized")
29+
30+
31+
class ModuleInActor2:
32+
def __init__(self):
33+
# a module created inside an actor should automatically inherit the logger created by the root actor
34+
self.logger = get_logger("module_in_actor2")
35+
self.logger.info("ModuleInActor2 initialized")
36+
self.logger.debug("ModuleInActor2 initialized")
37+
38+
39+
@ray.remote
40+
class ActorInActor:
41+
"""An actor created inside an actor"""
42+
43+
def __init__(self, parent_name, log_level):
44+
self.logger = get_logger(f"{parent_name}_nested", in_ray_actor=True, level=log_level)
45+
self.logger.info("ActorInActor initialized")
46+
self.logger.debug("ActorInActor initialized")
47+
48+
49+
@ray.remote
50+
class LogActor:
51+
def __init__(self, aid: int, log_level=logging.INFO):
52+
assert os.environ.get(LOG_DIR_ENV_VAR) is not None, "LOG_DIR_ENV_VAR must be set"
53+
self.logger = get_logger(f"actor_{aid}", in_ray_actor=True, level=log_level)
54+
self.logger.info(f"LogActor {aid} initialized ")
55+
self.logger.debug(f"LogActor {aid} initialized")
56+
self.aid = aid
57+
self.actor = ActorInActor.remote(f"actor_{aid}", log_level)
58+
ray.get(self.actor.__ray_ready__.remote())
59+
60+
def log_info(self, message: str):
61+
self.logger.info(f"LogActor {self.aid} info: {message}")
62+
self.logger.debug(f"LogActor {self.aid} debug: {message}")
63+
ModuleInActor()
64+
ModuleInActor2()
65+
66+
67+
class LogTest(unittest.TestCase):
68+
def setUp(self):
69+
if ray.is_initialized():
70+
ray.shutdown()
71+
self.config = get_template_config()
72+
self.config.check_and_update()
73+
self.log_dir = self.config.log.save_dir
74+
shutil.rmtree(self.log_dir, ignore_errors=True)
75+
os.makedirs(self.log_dir, exist_ok=True)
76+
77+
def test_no_actor_log(self):
78+
ray.init(
79+
namespace=self.config.ray_namespace,
80+
runtime_env=RuntimeEnv(
81+
env_vars={LOG_DIR_ENV_VAR: self.log_dir, LOG_LEVEL_ENV_VAR: "INFO"}
82+
),
83+
)
84+
try:
85+
logger = get_logger("outside_actor", level=logging.DEBUG)
86+
logger.info("Outside logger initialized")
87+
logger.debug("Outside logger initialized")
88+
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "outside_actor.log")))
89+
90+
logger = get_logger(
91+
"outside_actor", in_ray_actor=True
92+
) # in_ray_actor should not take effect
93+
logger.info("Outside logger initialized")
94+
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "outside_actor.log")))
95+
96+
finally:
97+
ray.shutdown(_exiting_interpreter=True)
98+
99+
def test_actor_log(self):
100+
ray.init(
101+
namespace=self.config.ray_namespace,
102+
runtime_env=RuntimeEnv(
103+
env_vars={
104+
LOG_DIR_ENV_VAR: self.log_dir,
105+
LOG_LEVEL_ENV_VAR: "INFO",
106+
}
107+
),
108+
)
109+
try:
110+
actor1 = LogActor.remote(1, log_level=logging.INFO)
111+
actor2 = LogActor.remote(2, log_level=logging.DEBUG)
112+
actor3 = LogActor.remote(3, log_level=None)
113+
ray.get(actor1.log_info.remote("Test message"))
114+
ray.get(actor2.log_info.remote("Test message"))
115+
ray.get(actor3.log_info.remote("Test message"))
116+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_1.log")))
117+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_2.log")))
118+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_3.log")))
119+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_1_nested.log")))
120+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_2_nested.log")))
121+
self.assertTrue(os.path.exists(os.path.join(self.log_dir, "actor_3_nested.log")))
122+
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "module_in_actor.log")))
123+
self.assertFalse(os.path.exists(os.path.join(self.log_dir, "module_in_actor2.log")))
124+
with open(os.path.join(self.log_dir, "actor_1.log"), "r") as f:
125+
lines = f.readlines()
126+
self.assertEqual(len(lines), 4)
127+
with open(os.path.join(self.log_dir, "actor_2.log"), "r") as f:
128+
lines = f.readlines()
129+
self.assertEqual(len(lines), 8)
130+
with open(os.path.join(self.log_dir, "actor_3.log"), "r") as f:
131+
lines = f.readlines()
132+
self.assertEqual(len(lines), 4)
133+
with open(os.path.join(self.log_dir, "actor_1_nested.log"), "r") as f:
134+
lines = f.readlines()
135+
self.assertEqual(len(lines), 1)
136+
with open(os.path.join(self.log_dir, "actor_2_nested.log"), "r") as f:
137+
lines = f.readlines()
138+
self.assertEqual(len(lines), 2)
139+
with open(os.path.join(self.log_dir, "actor_3_nested.log"), "r") as f:
140+
lines = f.readlines()
141+
self.assertEqual(len(lines), 1)
142+
finally:
143+
ray.shutdown(_exiting_interpreter=True)
144+
145+
def test_group_by_node(self):
146+
ray.init(
147+
namespace=self.config.ray_namespace,
148+
runtime_env=RuntimeEnv(
149+
env_vars={
150+
LOG_DIR_ENV_VAR: self.log_dir,
151+
LOG_LEVEL_ENV_VAR: "INFO",
152+
LOG_NODE_IP_ENV_VAR: "1",
153+
}
154+
),
155+
)
156+
try:
157+
actor = LogActor.remote(1, log_level=logging.INFO)
158+
ray.get(actor.log_info.remote("Test message"))
159+
ips = os.listdir(self.config.log.save_dir)
160+
self.assertTrue(len(ips) > 0)
161+
for ip in ips:
162+
self.assertTrue(os.path.isdir(os.path.join(self.config.log.save_dir, ip)))
163+
ip_logs = os.listdir(os.path.join(self.config.log.save_dir, ip))
164+
self.assertTrue(len(ip_logs) > 0)
165+
finally:
166+
ray.shutdown(_exiting_interpreter=True)

trinity/buffer/pipelines/experience_pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class ExperiencePipeline:
3434
"""
3535

3636
def __init__(self, config: Config):
37-
self.logger = get_logger(__name__)
37+
self.logger = get_logger(f"{config.explorer.name}_experience_pipeline", in_ray_actor=True)
3838
load_plugins()
3939
pipeline_config = config.data_processor.experience_pipeline
4040
buffer_config = config.buffer

trinity/buffer/ray_wrapper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class DBWrapper:
3535
"""
3636

3737
def __init__(self, storage_config: StorageConfig, config: BufferConfig) -> None:
38-
self.logger = get_logger(__name__)
38+
self.logger = get_logger(f"sql_{storage_config.name}")
3939
if storage_config.path is None:
4040
storage_config.path = default_storage_path(storage_config, config)
4141
self.engine = create_engine(storage_config.path, poolclass=NullPool)
@@ -220,7 +220,7 @@ class QueueWrapper:
220220
"""A wrapper of an async queue."""
221221

222222
def __init__(self, storage_config: StorageConfig, config: BufferConfig) -> None:
223-
self.logger = get_logger(__name__)
223+
self.logger = get_logger(f"queue_{storage_config.name}")
224224
self.config = config
225225
self.capacity = storage_config.capacity
226226
self.queue = QueueBuffer.get_queue(storage_config, config)

trinity/buffer/reader/queue_reader.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88
from trinity.buffer.ray_wrapper import QueueWrapper
99
from trinity.common.config import BufferConfig, StorageConfig
1010
from trinity.common.constants import ReadStrategy, StorageType
11-
from trinity.utils.log import get_logger
12-
13-
logger = get_logger(__name__)
1411

1512

1613
class QueueReader(BufferReader):

trinity/buffer/writer/queue_writer.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
from trinity.buffer.ray_wrapper import QueueWrapper
88
from trinity.common.config import BufferConfig, StorageConfig
99
from trinity.common.constants import StorageType
10-
from trinity.utils.log import get_logger
11-
12-
logger = get_logger(__name__)
1310

1411

1512
class QueueWriter(BufferWriter):

0 commit comments

Comments
 (0)