Add new scheduler with step granularity #170
Annotations
3 errors
|
unittest
Process completed with exit code 1.
|
|
Failed Test: tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode: The test failed in the call phase - self = <tests.trainer.trainer_test.TestFullyAsyncMode testMethod=test_fully_async_mode>
def test_fully_async_mode(self):
config = get_template_config()
config.project = "unittest"
config.name = f"fully_async_{datetime.now().strftime('%Y%m%d%H%M%S')}"
config.checkpoint_root_dir = get_checkpoint_path()
config.buffer.total_epochs = 1
config.buffer.batch_size = 4
config.cluster.gpu_per_node = 2
config.cluster.node_num = 1
config.model.model_path = get_model_path()
config.buffer.explorer_input.taskset = get_unittest_dataset_config("countdown")
config.buffer.trainer_input.experience_buffer = StorageConfig(
name="exp_buffer",
storage_type=StorageType.QUEUE,
wrap_in_ray=True,
)
config.synchronizer.sync_method = SyncMethod.CHECKPOINT
config.synchronizer.sync_interval = 8
config.monitor.monitor_type = "tensorboard"
trainer_config = deepcopy(config)
trainer_config.mode = "train"
trainer_config.check_and_update()
explorer1_config = deepcopy(config)
explorer1_config.mode = "explore"
explorer1_config.explorer.name = "explorer1"
config.cluster.gpu_per_node = 1
config.cluster.node_num = 1
explorer1_config.explorer.rollout_model.engine_num = 1
explorer1_config.explorer.rollout_model.tensor_parallel_size = 1
explorer1_config.buffer.explorer_output = StorageConfig(
name="exp_buffer",
storage_type=StorageType.QUEUE,
wrap_in_ray=True,
)
explorer2_config = deepcopy(explorer1_config)
explorer1_config.check_and_update()
trainer_process = multiprocessing.Process(target=run_trainer, args=(trainer_config,))
trainer_process.start()
ray.init(ignore_reinit_error=True)
while True:
try:
ray.get_actor("queue-exp_buffer", namespace=trainer_config.ray_namespace)
break
except ValueError:
print("waiting for trainer to start.")
time.sleep(5)
explorer_process_1 = multiprocessing.Process(target=run_explorer, args=(explorer1_config,))
explorer_process_1.start()
time.sleep(5)
explorer2_config.explorer.name = "explorer2"
explorer2_config.check_and_update()
explorer_process_2 = multiprocessing.Process(target=run_explorer, args=(explorer2_config,))
explorer_process_2.start()
explorer_process_1.join()
explorer_process_2.join()
# wait for trainer process to finish.
trainer_process.join(timeout=200)
# check the tensorboard
parser = TensorBoardParser(
os.path.join(trainer_config.monitor.cache_dir, "tensorboard", "trainer")
)
actor_metrics = parser.metric_list("actor")
self.assertEqual(parser.metric_max_step(actor_metrics[0]), 8)
parser = TensorBoardParser(
os.path.join(explorer1_config.monitor.cache_dir, "tensorboard", "explorer1")
)
rollout_metrics = parser.metric_list("rollout")
self.assertEqual(parser.metric_max_step(rollout_metrics[0]), 4)
parser = TensorBoardParser(
os.path.join(explorer2_config.monitor.cache_dir, "tensorboard", "explorer2")
)
rollout_metrics = parser.metric_list("rollout")
self.assertEqual(parser.metric_max_step(rollout_metrics[0]), 4)
# check the checkpoint
explorer1_cache = CacheManager(explorer1_config)
cache = explorer1_cache.load_explorer()
> self.assertEqual(cache["latest_iteration"], 4)
E KeyError: 'latest_iteration'
tests/trainer/trainer_test.py:388: KeyError
|
|
unittest
Process completed with exit code 1.
|