Add new scheduler with step granularity #161
Annotations: 4 errors

unittest
Process completed with exit code 1.
Failed Test: tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode: The test failed in the call phase due to an assertion error.

self = <tests.trainer.trainer_test.TestFullyAsyncMode testMethod=test_fully_async_mode>
    def test_fully_async_mode(self):
        config = get_template_config()
        config.project = "unittest"
        config.name = f"fully_async_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        config.checkpoint_root_dir = get_checkpoint_path()
        config.buffer.total_epochs = 1
        config.buffer.batch_size = 4
        config.cluster.gpu_per_node = 2
        config.cluster.node_num = 1
        config.model.model_path = get_model_path()
        config.buffer.explorer_input.taskset = get_unittest_dataset_config("countdown")
        config.buffer.trainer_input.experience_buffer = StorageConfig(
            name="exp_buffer",
            storage_type=StorageType.QUEUE,
            wrap_in_ray=True,
        )
        config.synchronizer.sync_method = SyncMethod.CHECKPOINT
        config.synchronizer.sync_interval = 8
        config.monitor.monitor_type = "tensorboard"
        trainer_config = deepcopy(config)
        trainer_config.mode = "train"
        trainer_config.check_and_update()
        explorer1_config = deepcopy(config)
        explorer1_config.mode = "explore"
        explorer1_config.explorer.name = "explorer1"
        config.cluster.gpu_per_node = 1
        config.cluster.node_num = 1
        explorer1_config.explorer.rollout_model.engine_num = 1
        explorer1_config.explorer.rollout_model.tensor_parallel_size = 1
        explorer1_config.buffer.explorer_output = StorageConfig(
            name="exp_buffer",
            storage_type=StorageType.QUEUE,
            wrap_in_ray=True,
        )
        explorer2_config = deepcopy(explorer1_config)
        explorer1_config.check_and_update()
        trainer_process = multiprocessing.Process(target=run_trainer, args=(trainer_config,))
        trainer_process.start()
        ray.init(ignore_reinit_error=True)
        while True:
            try:
                ray.get_actor("queue-exp_buffer", namespace=trainer_config.ray_namespace)
                break
            except ValueError:
                print("waiting for trainer to start.")
                time.sleep(5)
        explorer_process_1 = multiprocessing.Process(target=run_explorer, args=(explorer1_config,))
        explorer_process_1.start()
        time.sleep(20)
        explorer2_config.explorer.name = "explorer2"
        explorer2_config.check_and_update()
        explorer_process_2 = multiprocessing.Process(target=run_explorer, args=(explorer2_config,))
        explorer_process_2.start()
        explorer_process_1.join()
        explorer_process_2.join()
        # wait for trainer process to finish.
        trainer_process.join(timeout=200)
        # check the tensorboard
        parser = TensorBoardParser(
            os.path.join(trainer_config.monitor.cache_dir, "tensorboard", "trainer")
        )
        actor_metrics = parser.metric_list("actor")
>       self.assertEqual(parser.metric_max_step(actor_metrics[0]), 8)
E       AssertionError: 4 != 8

tests/trainer/trainer_test.py:372: AssertionError
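The assertion checks, via TensorBoardParser, that the maximum step logged under the first actor metric is 8, but only step 4 was recorded. One way to see what actually landed in the event files is to read them directly with TensorBoard's EventAccumulator. The sketch below is a standalone diagnostic, not part of the repository's test utilities; the log directory is a hypothetical placeholder mirroring os.path.join(trainer_config.monitor.cache_dir, "tensorboard", "trainer").

# Diagnostic sketch (assumption: not a repository utility): print the highest
# step recorded for each "actor" scalar tag in a TensorBoard log directory.
import os

from tensorboard.backend.event_processing.event_accumulator import EventAccumulator


def max_step_per_actor_tag(logdir: str) -> dict:
    acc = EventAccumulator(logdir)
    acc.Reload()  # load all event files found under logdir
    steps = {}
    for tag in acc.Tags().get("scalars", []):
        if tag.startswith("actor"):
            events = acc.Scalars(tag)
            steps[tag] = max(e.step for e in events) if events else None
    return steps


if __name__ == "__main__":
    # Hypothetical path; the test builds it from trainer_config.monitor.cache_dir.
    logdir = os.path.join("/mnt/checkpoints/unittest", "tensorboard", "trainer")
    for tag, step in sorted(max_step_per_actor_tag(logdir).items()):
        print(f"{tag}: max step = {step}")

If every actor tag tops out at step 4, the trainer genuinely stopped logging early (for example, if it was still running when trainer_process.join(timeout=200) returned), rather than the parser misreading the events.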
Failed Test: tests/buffer/file_test.py::TestFileBuffer::test_file_writer
tests/buffer/file_test.py::TestFileBuffer::test_file_writer: The test failed in the call phase with a FileNotFoundError.

self = <tests.buffer.file_test.TestFileBuffer testMethod=test_file_writer>
    async def test_file_writer(self):
        writer = get_buffer_writer(
            self.config.buffer.trainer_input.experience_buffer, self.config.buffer
        )
        await writer.acquire()
        writer.write(
            [
                {"prompt": "hello world"},
                {"prompt": "hi"},
            ]
        )
        await writer.write_async(
            [
                {"prompt": "My name is"},
                {"prompt": "What is your name?"},
            ]
        )
        await writer.release()
        file_wrapper = ray.get_actor("json-test_buffer")
        self.assertIsNotNone(file_wrapper)
        file_path = default_storage_path(
            self.config.buffer.trainer_input.experience_buffer, self.config.buffer
        )
>       with open(file_path, "r") as f:
E       FileNotFoundError: [Errno 2] No such file or directory: '/mnt/checkpoints/unittest/test/buffer/test_buffer.jsonl'

tests/buffer/file_test.py:119: FileNotFoundError
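The file is opened immediately after `await writer.release()`, but the JSONL file is managed by a Ray actor ("json-test_buffer"), so one possible cause is that the write has not yet been flushed to disk when the test reads it. A defensive pattern, sketched below under the assumption that the file is eventually created, is to poll for it with a timeout before opening; wait_for_file is a hypothetical helper, not an API of the buffer module.

# Hypothetical helper (not part of the buffer module): poll for the output file
# before reading it, to guard against asynchronous flushing by the Ray-backed writer.
import os
import time


def wait_for_file(path: str, timeout: float = 30.0, interval: float = 0.5) -> None:
    """Block until `path` exists and is non-empty, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(path) and os.path.getsize(path) > 0:
            return
        time.sleep(interval)
    raise TimeoutError(f"{path} did not appear within {timeout} seconds")


# Hypothetical usage in the test, just before the failing open():
#     wait_for_file(file_path)
#     with open(file_path, "r") as f:
#         ...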
unittest
Process completed with exit code 1.