diff --git a/build_dlio2.sh b/build_dlio2.sh
new file mode 100755
index 00000000..bd93c735
--- /dev/null
+++ b/build_dlio2.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker build -t dlio .
\ No newline at end of file
diff --git a/configs/workload/dlrm.yaml b/configs/workload/dlrm.yaml
new file mode 100644
index 00000000..1049e99c
--- /dev/null
+++ b/configs/workload/dlrm.yaml
@@ -0,0 +1,34 @@
+model: dlrm
+
+framework: pytorch
+
+workflow:
+  generate_data: False
+  train: True
+  do_eval: True
+
+dataset:
+  data_folder: data/dlrm
+  format: bin
+  num_files_train: 1
+  num_files_eval: 1
+  num_samples_per_file: 4195198976
+  record_length: 327680
+  keep_files: True
+  eval_num_samples_per_file: 91681240
+
+reader:
+  data_loader: terabyte
+  batch_size: 2048
+  batch_size_eval: 16384
+  sample_shuffle: random
+
+train:
+  epochs: 1
+  computation_time: 0.064296
+  total_training_steps: 32768
+  total_eval_steps: 2048
+
+evaluation:
+  eval_time: 0.0843
+  steps_between_evals: 16384
\ No newline at end of file
diff --git a/configs/workload/unet3d.yaml b/configs/workload/unet3d.yaml
index 28d1fddb..c68146fe 100644
--- a/configs/workload/unet3d.yaml
+++ b/configs/workload/unet3d.yaml
@@ -5,14 +5,12 @@ framework: pytorch
 
 workflow:
   generate_data: False
   train: True
-  evaluation: True
   checkpoint: True
 
 dataset:
   data_folder: ./data/unet3d/
   format: npz
   num_files_train: 168
-  num_files_eval: 42
   num_samples_per_file: 1
   record_length: 234560851
   record_length_stdev: 109346892
@@ -20,18 +18,13 @@ dataset:
 
 reader:
   data_loader: pytorch
-  batch_size: 2
-  batch_size_eval: 1
+  batch_size: 4
   read_threads: 4
 
 train:
   epochs: 10
-  computation_time: 0.753
+  computation_time: 1.3604
 
-evaluation:
-  eval_time: 5.8
-  epochs_between_evals: 2
-
 checkpoint:
   checkpoint_after_epoch: 5
   epochs_between_checkpoints: 2
diff --git a/exec.sh b/exec.sh
new file mode 100755
index 00000000..ca3fb70f
--- /dev/null
+++ b/exec.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+mpirun -np 8 python3 src/dlio_benchmark.py workload=dlrm
+cp -r /workspace/dlio/hydra_log/dlrm/* /workspace/dlio/save_spot
\ No newline at end of file
diff --git a/launch_dlio.sh b/launch_dlio.sh
new file mode 100644
index 00000000..b8b302ea
--- /dev/null
+++ b/launch_dlio.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker run -t dlio python ./src/dlio_benchmark.py ++workload.workflow.generate_data=True
\ No newline at end of file
diff --git a/run_dlio2.sh b/run_dlio2.sh
new file mode 100755
index 00000000..4e43708c
--- /dev/null
+++ b/run_dlio2.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+container_name=${1:-dlrm_dlio2}
+
+# Remove the existing container if a previous run was interrupted
+if [ "$(docker ps -a | grep "$container_name")" ]
+then
+    docker rm "$container_name"
+fi
+
+# docker run -it --rm --name=$container_name --gpus all -v /raid/data/dlrm_dlio2/dlio2:/workspace/dlio/data/dlrm dlio:latest /bin/bash exec.sh
+sudo docker run -it \
+--rm \
+--name=$container_name \
+--gpus all \
+-v /raid/data/dlrm_dlio2/dlio2:/workspace/dlio/data/dlrm \
+-v /raid/data/dlrm_dlio2/dlio2_log:/workspace/dlio/save_spot \
+dlio:latest \
+/bin/bash exec.sh
\ No newline at end of file
diff --git a/src/common/enumerations.py b/src/common/enumerations.py
index 399444ef..35ae317c 100644
--- a/src/common/enumerations.py
+++ b/src/common/enumerations.py
@@ -90,6 +90,7 @@ class FormatType(Enum):
     HDF5_OPT = 'hdf5_opt'
     JPEG = 'jpeg'
     PNG = 'png'
+    BIN = 'bin'
 
     def __str__(self):
         return self.value
@@ -100,6 +101,7 @@ class DataLoaderType(Enum):
     """
     TENSORFLOW='tensorflow'
     PYTORCH='pytorch'
+    TERABYTE='terabyte'
     NONE='none'
 
     def __str__(self):
         return self.value
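Note on the enum additions above: the YAML strings `format: bin` and `data_loader: terabyte` in dlrm.yaml are parsed straight through the Enum constructors, so they must match the new member values exactly. A minimal round-trip sketch (abridged enums, not the full definitions from src/common/enumerations.py):

```python
# Abridged sketch mirroring src/common/enumerations.py; only the members
# relevant to this change are shown.
from enum import Enum

class FormatType(Enum):
    PNG = 'png'
    BIN = 'bin'            # new: raw int32 records for the DLRM terabyte dataset

    def __str__(self):
        return self.value

class DataLoaderType(Enum):
    PYTORCH = 'pytorch'
    TERABYTE = 'terabyte'  # new: batch-level reader for the DLRM binary files

    def __str__(self):
        return self.value

# config.py constructs the enums directly from the YAML strings:
assert FormatType('bin') is FormatType.BIN
assert DataLoaderType('terabyte') is DataLoaderType.TERABYTE
```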
diff --git a/src/data_generator/bin_generator.py b/src/data_generator/bin_generator.py
new file mode 100644
index 00000000..cf0754d2
--- /dev/null
+++ b/src/data_generator/bin_generator.py
@@ -0,0 +1,83 @@
+"""
+The binary file generator designed for simulating DLRM in DLIO.
+"""
+
+from src.common.enumerations import Compression
+from src.data_generator.data_generator import DataGenerator
+
+import logging
+import numpy as np
+from numpy import random
+import math
+import os
+
+from src.utils.utility import progress
+from shutil import copyfile
+
+"""
+Generator for creating data in BIN format.
+"""
+
+class BINGenerator(DataGenerator):
+    def __init__(self):
+        super().__init__()
+
+    def generate(self):
+        """
+        Generate binary data for training and testing.
+        """
+        super().generate()
+
+        for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size):
+            progress(i+1, self.total_files_to_generate, "Generating Binary Data")
+            out_path_spec = self.storage.get_uri(self._file_list[i])
+            # File size differs depending on whether this is a training or an evaluation file
+            if i < self.num_files_train:
+                # Generate training files in fixed-size segments to bound peak memory
+                segment_size = 91681240*5
+                num_instance = self.num_samples  # 4195198976 for DLRM training
+                parts = math.ceil(num_instance / segment_size)
+                for k in range(0, parts):
+                    num_written = segment_size if k < parts-1 else num_instance - k*segment_size
+                    X_int = np.random.randint(2557264, size=(num_written, 13))
+                    X_cat = np.random.randint(8831335, size=(num_written, 26))
+                    y = np.random.randint(2, size=num_written)
+                    np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
+                    np_data = np_data.astype(np.int32)
+                    if self.compression != Compression.ZIP:
+                        with open(out_path_spec, 'ab') as output_file:
+                            output_file.write(np_data.tobytes())
+                            output_file.flush()
+                            os.fsync(output_file.fileno())
+            else:
+                # Generate evaluation files
+
+                # Old implementation that built the whole file in memory and wrote it once at the end:
+                #
+                # num_instance = self.eval_num_samples_per_file  # estimated as 6548660*14
+                # X_int = np.random.randint(2557264, size=(num_instance, 13))
+                # X_cat = np.random.randint(8831335, size=(num_instance, 26))
+                # y = np.random.randint(2, size=num_instance)
+                # np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
+                # np_data = np_data.astype(np.int32)
+                # if self.compression != Compression.ZIP:
+                #     with open(out_path_spec, 'wb') as output_file:
+                #         output_file.write(np_data.tobytes())
+
+                segment_size = 91681240*5
+                num_instance = self.eval_num_samples_per_file  # 91681240 for DLRM evaluation
+                parts = math.ceil(num_instance / segment_size)
+                for k in range(0, parts):
+                    num_written = segment_size if k < parts-1 else num_instance - k*segment_size
+                    X_int = np.random.randint(2557264, size=(num_written, 13))
+                    X_cat = np.random.randint(8831335, size=(num_written, 26))
+                    y = np.random.randint(2, size=num_written)
+                    np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
+                    np_data = np_data.astype(np.int32)
+                    if self.compression != Compression.ZIP:
+                        with open(out_path_spec, 'ab') as output_file:
+                            output_file.write(np_data.tobytes())
+                            output_file.flush()
+                            os.fsync(output_file.fileno())
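Each generated sample is one int32 label, 13 int32 dense features, and 26 int32 categorical features, i.e. 40 x 4 = 160 bytes. That is also why dlrm.yaml sets `record_length: 327680`: the terabyte reader consumes whole 2048-sample batches, and one batch is exactly 327680 bytes. A quick sanity check of the arithmetic:

```python
import numpy as np

tot_fea = 1 + 13 + 26                                     # label + dense + sparse
bytes_per_sample = tot_fea * np.dtype(np.int32).itemsize  # 40 * 4 = 160 bytes
batch_size = 2048                                         # reader.batch_size in dlrm.yaml
print(bytes_per_sample, bytes_per_sample * batch_size)    # 160 327680 == dataset.record_length
```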
diff --git a/src/data_generator/data_generator.py b/src/data_generator/data_generator.py
index d438a454..0ccfb2ec 100644
--- a/src/data_generator/data_generator.py
+++ b/src/data_generator/data_generator.py
@@ -52,6 +52,8 @@ def __init__(self):
         self.storage = StorageFactory().get_storage(self._args.storage_type,
                                                     self._args.storage_root,
                                                     self._args.framework)
+        self.eval_num_samples_per_file = self._args.eval_num_samples_per_file
+
     @abstractmethod
     def generate(self):
         if self.my_rank == 0:
diff --git a/src/data_generator/generator_factory.py b/src/data_generator/generator_factory.py
index a0aac65e..21b2773d 100644
--- a/src/data_generator/generator_factory.py
+++ b/src/data_generator/generator_factory.py
@@ -23,6 +23,7 @@
 from src.data_generator.npz_generator import NPZGenerator
 from src.data_generator.jpeg_generator import JPEGGenerator
 from src.data_generator.png_generator import PNGGenerator
+from src.data_generator.bin_generator import BINGenerator
@@ -44,5 +45,7 @@ def get_generator(type):
             return JPEGGenerator()
         elif type == FormatType.PNG:
             return PNGGenerator()
+        elif type == FormatType.BIN:
+            return BINGenerator()
         else:
             raise Exception(str(ErrorCodes.EC1001))
\ No newline at end of file
diff --git a/src/dlio_benchmark.py b/src/dlio_benchmark.py
index 4e22a8af..c028a0bd 100644
--- a/src/dlio_benchmark.py
+++ b/src/dlio_benchmark.py
@@ -112,6 +112,7 @@ def __init__(self, cfg):
         self.num_files_train = self.args.num_files_train
         self.num_samples = self.args.num_samples_per_file
         self.total_training_steps = self.args.total_training_steps
+        self.total_eval_steps = self.args.total_eval_steps
 
         self.epochs = self.args.epochs
         self.batch_size = self.args.batch_size
@@ -142,6 +143,9 @@ def __init__(self, cfg):
         self.eval_after_epoch = self.args.eval_after_epoch
         self.epochs_between_evals = self.args.epochs_between_evals
+        self.steps_between_evals = self.args.steps_between_evals
+        self.eval_num_samples = self.args.eval_num_samples_per_file
+
         # Hold various lists/dicts for statistics
         self.time_to_load_train_batch = []
         self.time_to_process_train_batch = []
@@ -193,8 +197,8 @@ def _eval(self, epoch):
         step = 1
         total = math.floor(self.num_samples * self.num_files_eval / self.batch_size_eval / self.comm_size)
         t0 = time()
-        reader = self.framework.get_reader(DatasetType.VALID)
-        for batch in reader.next():
+
+        for batch in self.framework.get_reader(DatasetType.VALID).next():
             self.stats.eval_batch_loaded(epoch, step, t0)
 
             if self.eval_time > 0:
@@ -207,18 +211,24 @@ def _eval(self, epoch):
             self.stats.eval_batch_processed(epoch, step, t0)
 
             step += 1
-            if step > total:
+
+            # Stop once the eval set is exhausted or the configured step cap is hit.
+            # Guard against the -1 default, which would otherwise end eval after one batch.
+            if step > total or (self.total_eval_steps > 0 and step >= self.total_eval_steps):
                 return step - 1
 
             self.framework.barrier()
             t0 = time()
 
+        return step - 1
+
     def _train(self, epoch):
         """
         Training loop for reading the dataset and performing training computations.
         :return: returns total steps.
         """
-        block = 1   # A continuous period of training steps, ended by checkpointing
+        block = 1   # A continuous period of training steps, ended by checkpointing or by in-epoch evaluation
         block_step = overall_step = 1  # Steps are taken within blocks
         max_steps = math.floor(self.num_samples * self.num_files_train / self.batch_size / self.comm_size)
@@ -245,6 +255,31 @@ def _train(self, epoch):
 
             self.stats.batch_processed(epoch, overall_step, block, t0)
 
+            # Perform in-epoch evaluation every steps_between_evals steps, if configured;
+            # evaluation is assumed to run on all GPUs. Note the explicit > 0 guard:
+            # with the -1 default, `overall_step % -1` would be 0 on every step.
+            if self.steps_between_evals > 0 and overall_step % self.steps_between_evals == 0:
+                # Before starting the evaluation, terminate the current block
+                self.stats.end_block(epoch, block, block_step)
+
+                # Initialize the eval data loader and perform evaluation
+                self.stats.start_eval(epoch)
+                self.framework.get_reader(DatasetType.VALID).read(epoch)
+                self.framework.barrier()
+                self._eval(epoch)
+                self.stats.end_eval(epoch)
+                self.framework.barrier()
+                self.framework.get_reader(DatasetType.VALID).finalize()
+
+                # Checkpoint after evaluation
+                self.stats.start_ckpt(epoch, block, overall_step)
+                self.framework.checkpoint(epoch, overall_step)
+                self.stats.end_ckpt(epoch, block)
+                self.framework.barrier()
+
+                # Start recording the next block
+                self.stats.start_block(epoch, block)
+
             if self.do_checkpoint and (self.steps_between_checkpoints>=0) and overall_step == self.next_checkpoint_step:
                 self.stats.end_block(epoch, block, block_step)
@@ -266,7 +301,7 @@ def _train(self, epoch):
                 if (block_step!=1 and self.do_checkpoint) or (not self.do_checkpoint):
                     self.stats.end_block(epoch, block, block_step-1)
                 break
-            
+
             overall_step += 1
             t0 = time()
@@ -314,8 +349,6 @@ def run(self):
                 self.stats.end_epoch(epoch, steps)
                 logging.debug(f"{utcnow()} Rank {self.my_rank} returned after {steps} steps.")
 
-
-
             self.framework.barrier()
             self.framework.get_reader(DatasetType.TRAIN).finalize()
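With the dlrm.yaml values, and assuming the 8-rank `mpirun` from exec.sh, the new logic evaluates twice per epoch, and each evaluation is terminated by the `total_eval_steps` cap rather than by exhausting the eval file. A back-of-the-envelope sketch (note that `total` in `_eval` is still derived from the training `num_samples_per_file`, so the explicit cap is what governs in practice):

```python
import math

steps_between_evals  = 16384       # evaluation.steps_between_evals
total_training_steps = 32768       # train.total_training_steps
print([s for s in range(1, total_training_steps + 1)
       if s % steps_between_evals == 0])           # [16384, 32768] -> two evals per epoch

# _eval's `total` uses the *training* samples-per-file:
total = math.floor(4195198976 * 1 / 16384 / 8)     # num_samples * num_files_eval / batch_size_eval / comm_size
print(total, min(total, 2048))                     # 32006 2048 -> the total_eval_steps cap wins
```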
diff --git a/src/reader/reader_factory.py b/src/reader/reader_factory.py
index 3d6de76a..570fe683 100644
--- a/src/reader/reader_factory.py
+++ b/src/reader/reader_factory.py
@@ -54,6 +54,10 @@ def get_reader(type, data_loader, dataset_type):
         from src.reader.torch_data_loader_reader import TorchDataLoaderReader
         return TorchDataLoaderReader(dataset_type)
     # Implement other data loaders here
+    # Terabyte data loader, added to support the DLRM simulation
+    elif data_loader == DataLoaderType.TERABYTE:
+        from src.reader.terabyte_data_loader_reader import TeraBinLoaderReader
+        return TeraBinLoaderReader(dataset_type)
     else:
         print("Data Loader %s is not implemented" %data_loader)
         raise Exception(str(ErrorCodes.EC1004))
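The import happens inside the branch, matching the factory's existing pattern: a reader module (and its framework dependency) is only imported when that loader is actually selected. A simplified sketch of the same dispatch in isolation (plain strings instead of `DataLoaderType` members, `ValueError` instead of `ErrorCodes`):

```python
def get_reader(data_loader, dataset_type):
    # Lazy imports: torch is never imported for a tensorflow run, and vice versa
    if data_loader == 'pytorch':
        from src.reader.torch_data_loader_reader import TorchDataLoaderReader
        return TorchDataLoaderReader(dataset_type)
    elif data_loader == 'terabyte':
        from src.reader.terabyte_data_loader_reader import TeraBinLoaderReader
        return TeraBinLoaderReader(dataset_type)
    raise ValueError(f"Data loader {data_loader} is not implemented")
```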
diff --git a/src/reader/terabyte_data_loader_reader.py b/src/reader/terabyte_data_loader_reader.py
new file mode 100644
index 00000000..e986cdac
--- /dev/null
+++ b/src/reader/terabyte_data_loader_reader.py
@@ -0,0 +1,136 @@
+import math
+import logging
+import os
+
+import numpy as np
+import torch
+from torch.utils.data import Dataset, DataLoader, RandomSampler
+from torch.utils.data.distributed import DistributedSampler
+
+from src.common.enumerations import Shuffle, DatasetType
+from src.reader.reader_handler import FormatReader
+from src.utils.utility import utcnow
+
+
+class TeraBinLoaderReader(FormatReader):
+    """
+    Terabyte DataLoader reader and iterator logic.
+
+    The PyTorch data loader is file-format agnostic, so this reader is not technically
+    separate from the other formats; it is a revised version of torch_data_loader_reader
+    that reflects the data loader used for the binary terabyte dataset in DLRM.
+    """
+
+    class TerabyteBinDataset(Dataset):
+        def __init__(self, data_file, batch_size=1, max_ind_range=-1, bytes_per_feature=4):
+            self.tar_fea = 1   # single target
+            self.den_fea = 13  # 13 dense features
+            self.spa_fea = 26  # 26 sparse features
+            self.tad_fea = self.tar_fea + self.den_fea
+            self.tot_fea = self.tad_fea + self.spa_fea
+            self.batch_size = batch_size
+            self.max_ind_range = max_ind_range
+            self.bytes_per_entry = (bytes_per_feature * self.tot_fea * batch_size)
+
+            self.num_entries = math.ceil(os.path.getsize(data_file) / self.bytes_per_entry)
+
+            print('data file:', data_file, 'number of batches:', self.num_entries)
+            self.file = open(data_file, 'rb')
+
+        def __len__(self):
+            return self.num_entries
+
+        def __getitem__(self, idx):
+            # idx indexes whole batches, not individual samples
+            self.file.seek(idx * self.bytes_per_entry, 0)
+            raw_data = self.file.read(self.bytes_per_entry)
+            array = np.frombuffer(raw_data, dtype=np.int32)
+            tensor = torch.from_numpy(array).view((-1, self.tot_fea))
+
+            return _transform_features(x_int_batch=tensor[:, 1:14],
+                                       x_cat_batch=tensor[:, 14:],
+                                       y_batch=tensor[:, 0],
+                                       max_ind_range=self.max_ind_range,
+                                       flag_input_torch_tensor=True)
+
+        def __del__(self):
+            self.file.close()
+
+    def __init__(self, dataset_type):
+        super().__init__(dataset_type)
+
+    def read(self, epoch_number):
+        # The superclass function initializes the file list
+        super().read(epoch_number)
+
+        do_shuffle = True if self.sample_shuffle != Shuffle.OFF else False
+
+        # There is only one training file and one validation file, with shared file access
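One subtlety worth calling out: `TerabyteBinDataset.__getitem__` returns a whole pre-built batch, so the `DataLoader` below is created with `batch_size=None` to disable automatic batching, and the `RandomSampler` therefore shuffles batches rather than individual samples. A self-contained sketch of that interaction (stand-in dataset, toy shapes):

```python
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler

class BatchDataset(Dataset):
    """Stand-in for TerabyteBinDataset: each item is already a full batch."""
    def __len__(self):
        return 8                          # 8 pre-built batches
    def __getitem__(self, idx):
        return torch.full((4, 40), idx)   # one (batch_size=4, tot_fea=40) batch

ds = BatchDataset()
loader = DataLoader(ds, batch_size=None, sampler=RandomSampler(ds))
for batch in loader:
    print(batch.shape)                    # torch.Size([4, 40]) -- no extra batch dim added
```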
+        if self.dataset_type == DatasetType.TRAIN:
+            dataset = self.TerabyteBinDataset(self._file_list[0], batch_size=self.batch_size, max_ind_range=10000000)
+
+            # shuffle=do_shuffle is omitted here because the sampler option is
+            # mutually exclusive with shuffle
+            self._dataset = DataLoader(
+                dataset,
+                batch_size=None,
+                batch_sampler=None,
+                num_workers=0,
+                collate_fn=None,
+                pin_memory=False,
+                drop_last=False,
+                sampler=RandomSampler(dataset)
+            )
+        elif self.dataset_type == DatasetType.VALID:
+            dataset = self.TerabyteBinDataset(self._file_list[0], batch_size=self.batch_size, max_ind_range=10000000)
+
+            self._dataset = DataLoader(
+                dataset,
+                batch_size=None,
+                batch_sampler=None,
+                shuffle=do_shuffle,
+                num_workers=0,
+                collate_fn=None,
+                pin_memory=False,
+                drop_last=False,
+            )
+        else:
+            print("ERROR: UNKNOWN DatasetType")
+
+        # The epoch must be set in DistributedSampler to ensure proper shuffling
+        # https://pytorch.org/docs/stable/data.html#torch.utils.data.distributed.DistributedSampler
+        # self._dataset.sampler.set_epoch(epoch_number)
+
+        logging.debug(f"{utcnow()} Rank {self.my_rank} will read {len(self._dataset) * self.batch_size} samples")
+
+    def next(self):
+        super().next()
+
+        logging.debug(f"{utcnow()} Rank {self.my_rank} should read {len(self._dataset)} batches")
+
+        for batch in self._dataset:
+            yield batch
+
+    def finalize(self):
+        pass
+
+
+def _transform_features(
+    x_int_batch, x_cat_batch, y_batch, max_ind_range, flag_input_torch_tensor=False
+):
+    if max_ind_range > 0:
+        x_cat_batch = x_cat_batch % max_ind_range
+
+    if flag_input_torch_tensor:
+        x_int_batch = torch.log(x_int_batch.clone().detach().type(torch.float) + 1)
+        x_cat_batch = x_cat_batch.clone().detach().type(torch.long)
+        y_batch = y_batch.clone().detach().type(torch.float32).view(-1, 1)
+    else:
+        x_int_batch = torch.log(torch.tensor(x_int_batch, dtype=torch.float) + 1)
+        x_cat_batch = torch.tensor(x_cat_batch, dtype=torch.long)
+        y_batch = torch.tensor(y_batch, dtype=torch.float32).view(-1, 1)
+
+    batch_size = x_cat_batch.shape[0]
+    feature_count = x_cat_batch.shape[1]
+    lS_o = torch.arange(batch_size).reshape(1, -1).repeat(feature_count, 1)
+
+    return x_int_batch, lS_o, x_cat_batch.t(), y_batch.view(-1, 1)
\ No newline at end of file
diff --git a/src/reader/torch_data_loader_reader.py b/src/reader/torch_data_loader_reader.py
index 6cb177be..504aa7a4 100644
--- a/src/reader/torch_data_loader_reader.py
+++ b/src/reader/torch_data_loader_reader.py
@@ -117,7 +117,7 @@ def read(self, epoch_number):
         prefetch_factor = self.prefetch_size
         if prefetch_factor>0:
             if self.my_rank==0:
-                logging.info(f"{utcnow()} Prefetch size is {prefetch_size}; prefetch factor of {prefetch_factor} will be set to Torch DataLoader.")
+                logging.info(f"{utcnow()} Prefetch size is {self.prefetch_size}; prefetch factor of {prefetch_factor} will be set to Torch DataLoader.")
         else:
             if self.my_rank==0:
                 logging.info(f"{utcnow()} Prefetch size is 0; a default prefetch factor of 2 will be set to Torch DataLoader.")
diff --git a/src/utils/config.py b/src/utils/config.py
index 60959e6c..9002735f 100644
--- a/src/utils/config.py
+++ b/src/utils/config.py
@@ -88,6 +88,11 @@ class ConfigArguments:
     num_subfolders_eval: int = 0
     iostat_devices: ClassVar[List[str]] = []
 
+    # Added to support evaluation every few steps within one epoch
+    steps_between_evals: int = -1        # steps between evals; -1 disables in-epoch evaluation
+    eval_num_samples_per_file: int = 1
+    total_eval_steps: int = -1           # cap on eval batches per evaluation; -1 means no cap
+
     def __init__(self):
        """ Virtually private constructor. """
        if ConfigArguments.__instance is not None:
@@ -163,6 +168,8 @@ def LoadConfig(args, config):
             args.format = FormatType(config['dataset']['format'])
         if 'keep_files' in config['dataset']:
             args.keep_files = config['dataset']['keep_files']
+        if 'eval_num_samples_per_file' in config['dataset']:
+            args.eval_num_samples_per_file = config['dataset']['eval_num_samples_per_file']
 
     # data reader
     reader = None
@@ -208,6 +215,8 @@ def LoadConfig(args, config):
             args.computation_time_stdev = config['train']['computation_time_stdev']
         if 'seed' in config['train']:
             args.seed = config['train']['seed']
+        if 'total_eval_steps' in config['train']:
+            args.total_eval_steps = config['train']['total_eval_steps']
 
     if 'evaluation' in config:
         if 'eval_time' in config['evaluation']:
@@ -218,6 +227,8 @@ def LoadConfig(args, config):
         if 'eval_after_epoch' in config['evaluation']:
             args.eval_after_epoch = config['evaluation']['eval_after_epoch']
         if 'epochs_between_evals' in config['evaluation']:
             args.epochs_between_evals = config['evaluation']['epochs_between_evals']
+        if 'steps_between_evals' in config['evaluation']:
+            args.steps_between_evals = config['evaluation']['steps_between_evals']
 
     if 'checkpoint' in config:
         if 'checkpoint_folder' in config['checkpoint']:
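The -1 defaults above are sentinels meaning "disabled", which is why the guard added in `_train` checks `steps_between_evals > 0` before the modulo test: in Python, any integer modulo -1 is 0, so an unguarded `overall_step % steps_between_evals == 0` would trigger evaluation on every step for workloads that never set the option. A two-line demonstration:

```python
for step in (1, 2, 16384):
    print(step % -1 == 0)   # True every time -- hence the explicit > 0 guard
```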
""" if ConfigArguments.__instance is not None: @@ -163,6 +168,8 @@ def LoadConfig(args, config): args.format = FormatType(config['dataset']['format']) if 'keep_files' in config['dataset']: args.keep_files = config['dataset']['keep_files'] + if 'eval_num_samples_per_file' in config['dataset']: + args.eval_num_samples_per_file = config['dataset']['eval_num_samples_per_file'] # data reader reader=None @@ -208,6 +215,8 @@ def LoadConfig(args, config): args.computation_time_stdev = config['train']['computation_time_stdev'] if 'seed' in config['train']: args.seed = config['train']['seed'] + if 'total_eval_steps' in config['train']: + args.total_eval_steps = config['train']['total_eval_steps'] if 'evaluation' in config: if 'eval_time' in config['evaluation']: @@ -218,6 +227,8 @@ def LoadConfig(args, config): args.eval_after_epoch = config['evaluation']['eval_after_epoch'] if 'epochs_between_evals' in config['evaluation']: args.epochs_between_evals = config['evaluation']['epochs_between_evals'] + if 'steps_between_evals' in config['evaluation']: + args.steps_between_evals = config['evaluation']['steps_between_evals'] if 'checkpoint' in config: if 'checkpoint_folder' in config['checkpoint']: diff --git a/src/utils/statscounter.py b/src/utils/statscounter.py index fd68b965..f6be9871 100644 --- a/src/utils/statscounter.py +++ b/src/utils/statscounter.py @@ -92,6 +92,7 @@ def start_eval(self, epoch): self.load_and_proc_times[epoch]['load']['eval'] = [] self.load_and_proc_times[epoch]['proc']['eval'] = [] + # Solution: add one more key such as the step to avoid overlap def end_eval(self, epoch): if self.my_rank == 0: ts = utcnow()