ValueError: Default process group has not been initialized, please make sure to call init_process_group. #20690

@hanyangyu1021

Description

Bug description

My dataset is ordered, so I don't want shuffling in distributed sampling. I followed solutions online and wrote my own sampler setup, but it raises `ValueError: Default process group has not been initialized, please make sure to call init_process_group.` How can I solve this?

```python
import argparse
import os
import pickle

import torch
import lightning as L
from torch_geometric.data import DataLoader
from torch.utils.data import DistributedSampler
from lightning.pytorch.callbacks import LearningRateMonitor
from lightning.pytorch.loggers import WandbLogger
from lightning.pytorch.strategies import DDPStrategy

# GraphDiffusion and FixDatasetSythetic are defined in my project code.


def train_dataloader2(train_dataset, num_workers, batch_size, num_gpus):
    """Returns a dataloader for training according to hparams.

    Returns:
        DataLoader: DataLoader ready to deliver samples for training
    """
    # define a distributed sampler in case we are using multiple GPUs
    if num_gpus > 1:
        sampler = DistributedSampler(train_dataset, shuffle=False)
    # only use the sampler if using multiple GPUs
    return DataLoader(
        train_dataset,
        shuffle=False,
        num_workers=num_workers,
        batch_size=batch_size,
        pin_memory=False,
        sampler=sampler if num_gpus > 1 else None)


if __name__ == '__main__':
    ################################################################################################################
    # Args
    parser = argparse.ArgumentParser()
    parser.add_argument('--run_name', type=str, default='moregpu', help='Name of the run.')
    parser.add_argument('--record', type=int, default=0,
                        help='Whether to log the training and save models [0, 1].')
    parser.add_argument('--use_wandb', type=int, default=0,
                        help='Log training on weights and biases [0, 1]. You might need to log in to wandb.')
    parser.add_argument('--save_path', type=str, default='./runs',
                        help='Where the config and models will be saved.')
    parser.add_argument('--fine_tune', type=int, default=0,
                        help='Whether to train from scratch (0), or fine-tune an existing model (1).')
    parser.add_argument('--model_path', type=str, default='./checkpoints',
                        help='If fine-tuning, path to where that model is saved.')
    parser.add_argument('--model_name', type=str, default='model.pt',
                        help='If fine-tuning, the name of the model file.')
    parser.add_argument('--compile_models', type=int, default=0,
                        help='If fine-tuning, whether to compile models. When not fine-tuning, it is defined in the config.')
    parser.add_argument('--data_path_train', type=str, default='./poses/',  # ./datatry/
                        help='Path to the training data.')
    parser.add_argument('--batch_size', type=int, default=16,
                        help='Batch size for fine-tuning. When not fine-tuning, it is defined in the config.')
    parser.add_argument('--data_path_val', type=str, default='./poses/',  # ./datatry/
                        help='Path to the validation data.')
    args = parser.parse_args()

    gpu_count = torch.cuda.device_count()
    print(f"GPU number is: {gpu_count}")
    record = bool(args.record)
    use_wandb = bool(args.use_wandb)
    fine_tune = bool(args.fine_tune)
    compile_models = bool(args.compile_models)
    run_name = args.run_name
    save_path = args.save_path
    model_path = args.model_path
    model_name = args.model_name
    data_path_train = args.data_path_train
    data_path_val = args.data_path_val
    bs = args.batch_size
    ################################################################################################################
    save_dir = f'{save_path}/{run_name}' if record else None

    if record and not os.path.exists(save_dir):
        os.makedirs(save_dir)

    if fine_tune:
        config = pickle.load(open(f'{model_path}/config.pkl', 'rb'))
        config['compile_models'] = False
        config['batch_size'] = bs
        config['save_dir'] = save_dir
        config['record'] = record
        # TODO: Here you can change other parameters from the ones used to train the initial model.
        model = GraphDiffusion.load_from_checkpoint(f'{model_path}/{model_name}', config=config, strict=True,
                                                    map_location=config['device']).to(config['device'])
        if compile_models:
            model.model.compile_models()
    else:
        # NOTE: when training from scratch, `config` must already be defined at this point
        # (its construction is omitted from this snippet).
        config['save_dir'] = save_dir
        config['record'] = record
        model = GraphDiffusion(config).to(config['device'])
    ################################################################################################################
    dset_val = FixDatasetSythetic(data_path_val, 4, rand_g_prob=0, total=20, val=True)
    dataloader_val = DataLoader(dset_val, batch_size=config['batch_size'], shuffle=False, num_workers=0, pin_memory=True)

    dset = FixDatasetSythetic(data_path_train, 16, rand_g_prob=config['randomize_g_prob'], total=20)
    dataloader = train_dataloader2(dset, num_workers=2, batch_size=config['batch_size'], num_gpus=gpu_count)
    # dataloader = DataLoader(dset, batch_size=config['batch_size'], drop_last=True, shuffle=False, persistent_workers=True,
    #                         num_workers=2, pin_memory=True)
    ################################################################################################################
    logger = None
    if record:
        if use_wandb:
            logger = WandbLogger(project='Instant Policy',
                                 name=f'{run_name}',
                                 save_dir=save_dir,
                                 log_model=False)
        # Dump config to save_dir
        pickle.dump(config, open(f'{save_dir}/config.pkl', 'wb'))
    lr_monitor = LearningRateMonitor(logging_interval='step')
    trainer = L.Trainer(
        enable_checkpointing=False,  # We save the models manually.
        accelerator=config['device'],
        devices=gpu_count,
        strategy=DDPStrategy(find_unused_parameters=True),
        max_steps=config['num_iters'],
        enable_progress_bar=True,
        precision='16-mixed',
        val_check_interval=1000,  # TODO: might want to change that.
        num_sanity_val_steps=2,
        check_val_every_n_epoch=None,
        logger=logger,
        log_every_n_steps=1000,  # TODO: might want to change that.
        gradient_clip_val=1,
        gradient_clip_algorithm='norm',
        callbacks=[lr_monitor],
        use_distributed_sampler=False,
    )

    trainer.fit(
        model=model,
        train_dataloaders=dataloader,
        val_dataloaders=dataloader_val,
    )
```
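
For context: when `num_replicas` and `rank` are not passed explicitly, `DistributedSampler` resolves them from the default process group inside its constructor. In the script above the sampler is built at script level, before `trainer.fit()` has launched DDP and called `init_process_group`, which is presumably what raises this `ValueError`. A common workaround is to build the dataloader inside the `LightningModule`'s `train_dataloader()` hook, which Lightning only calls after the process group exists; the sketch below uses hypothetical names (`MyModule` standing in for `GraphDiffusion`), not the project's actual code. Alternatively, leaving the Trainer's `use_distributed_sampler` at its default of `True` and passing `shuffle=False` to a plain `DataLoader` lets Lightning inject an unshuffled `DistributedSampler` automatically.

```python
import torch.distributed as dist
import lightning as L
from torch.utils.data import DataLoader, DistributedSampler


class MyModule(L.LightningModule):  # hypothetical stand-in for GraphDiffusion
    def __init__(self, train_dataset, batch_size, num_workers=2):
        super().__init__()
        self.train_dataset = train_dataset
        self.batch_size = batch_size
        self.num_workers = num_workers

    def train_dataloader(self):
        # By the time this hook runs under DDPStrategy, init_process_group has
        # already been called, so DistributedSampler can query rank/world size.
        sampler = None
        if dist.is_available() and dist.is_initialized():
            sampler = DistributedSampler(self.train_dataset, shuffle=False)
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=False,  # keep the dataset order
            num_workers=self.num_workers,
            sampler=sampler,
        )
```

With this approach, `trainer.fit(model)` is called without a `train_dataloaders=` argument, so Lightning invokes the hook on each rank after DDP setup and the sampler sees the correct world size.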

What version are you seeing the problem on?

v2.2, v2.5

How to reproduce the bug
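
A minimal, self-contained trigger (a sketch, independent of the full script above) is constructing the sampler before any process group exists:

```python
# Minimal sketch of the failure: DistributedSampler's constructor queries the
# default process group for rank/world size when they are not passed explicitly.
from torch.utils.data import DistributedSampler

sampler = DistributedSampler(range(10), shuffle=False)
# -> ValueError: Default process group has not been initialized,
#    please make sure to call init_process_group.
```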

Error messages and logs

# Error messages and logs here please

Environment

Current environment
#- PyTorch Lightning Version (e.g., 2.5.0):
#- PyTorch Version (e.g., 2.5):
#- Python version (e.g., 3.12):
#- OS (e.g., Linux):
#- CUDA/cuDNN version:
#- GPU models and configuration:
#- How you installed Lightning(`conda`, `pip`, source):

More info

No response
