diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e87774d5..5e43d2c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -291,4 +291,16 @@ jobs: run: | source ${VENV_PATH}/bin/activate rm -rf output data checkpoints - mpirun -np 2 ${DLIO_EXEC} workload=llama_8b_zero3 ++workload.model.parallelism.data=1024 ++workload.checkpoint.mode=subset \ No newline at end of file + mpirun -np 2 ${DLIO_EXEC} workload=llama_8b_zero3 ++workload.model.parallelism.data=1024 ++workload.checkpoint.mode=subset + - name: test_model_comms + run: | + source ${VENV_PATH}/bin/activate + rm -rf output data checkpoints + mpirun -np 2 pytest -k test_resnet_model_with_comms_enabled -v + rm -rf data + - name: test_model_compute + run: | + source ${VENV_PATH}/bin/activate + rm -rf output data checkpoints + mpirun -np 2 pytest -k test_resnet_model_with_compute_enabled -v + rm -rf data \ No newline at end of file diff --git a/dlio_benchmark/checkpointing/base_checkpointing.py b/dlio_benchmark/checkpointing/base_checkpointing.py index d0ecc838..768bf152 100644 --- a/dlio_benchmark/checkpointing/base_checkpointing.py +++ b/dlio_benchmark/checkpointing/base_checkpointing.py @@ -46,6 +46,7 @@ class BaseCheckpointing(ABC): def __init__(self, ext): #TODO(Huihuo): Add support for checkpointing rng states for transformer type of architecture + #TODO: Consider actual model instances - Model.SLEEP is default self.ext = ext self.args = ConfigArguments.get_instance() self.checkpoint_storage = StorageFactory().get_storage(self.args.storage_type, self.args.checkpoint_folder, diff --git a/dlio_benchmark/common/enumerations.py b/dlio_benchmark/common/enumerations.py index e8245b39..08188e58 100644 --- a/dlio_benchmark/common/enumerations.py +++ b/dlio_benchmark/common/enumerations.py @@ -108,6 +108,27 @@ class FrameworkType(Enum): def __str__(self): return self.value + +class Model(Enum): + """ + Different Model Architectures + """ + RESNET = 'resnet50' + UNET= 'unet3d' + 
BERT = 'bert' + SLEEP = 'sleep' + DEFAULT = 'default' + + def __str__(self): + return self.value + +class Loss(Enum): + """ + Loss functions for models + """ + MSE = 'mse' + CE = 'cross_entropy' + NONE = 'none' class ComputationType(Enum): """ diff --git a/dlio_benchmark/configs/workload/resnet50_a100.yaml b/dlio_benchmark/configs/workload/resnet50_a100.yaml index 018600e4..25419366 100644 --- a/dlio_benchmark/configs/workload/resnet50_a100.yaml +++ b/dlio_benchmark/configs/workload/resnet50_a100.yaml @@ -9,8 +9,8 @@ workflow: train: True dataset: - num_files_train: 1024 - num_samples_per_file: 1251 + num_files_train: 2 + num_samples_per_file: 800 record_length_bytes: 114660.07 record_length_bytes_resize: 150528 data_folder: data/resnet50 diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py index 80a5729c..a8cc9873 100644 --- a/dlio_benchmark/framework/framework.py +++ b/dlio_benchmark/framework/framework.py @@ -70,9 +70,10 @@ def stop_framework_profiler(self): @abstractmethod def trace_object(self, string, step, r): pass - - def model(epoch, batch, computation_time): - sleep(computation_time) + + @abstractmethod + def model(self, epoch, batch, computation_time): + pass @abstractmethod def compute(self, batch, epoch_number, step, computation_time): diff --git a/dlio_benchmark/framework/framework_factory.py b/dlio_benchmark/framework/framework_factory.py index 1aa88f73..98de0b20 100644 --- a/dlio_benchmark/framework/framework_factory.py +++ b/dlio_benchmark/framework/framework_factory.py @@ -15,7 +15,7 @@ limitations under the License. 
""" -from dlio_benchmark.common.enumerations import FrameworkType +from dlio_benchmark.common.enumerations import FrameworkType, Model from dlio_benchmark.common.error_code import ErrorCodes @@ -24,12 +24,12 @@ def __init__(self): pass @staticmethod - def get_framework(framework_type, profiling): + def get_framework(framework_type, profiling, model: Model = Model.SLEEP, communication: bool = False): if framework_type == FrameworkType.TENSORFLOW: from dlio_benchmark.framework.tf_framework import TFFramework - return TFFramework.get_instance(profiling) + return TFFramework.get_instance(profiling, model, communication) elif framework_type == FrameworkType.PYTORCH: from dlio_benchmark.framework.torch_framework import TorchFramework - return TorchFramework.get_instance(profiling) + return TorchFramework.get_instance(profiling, model, communication) else: raise Exception(str(ErrorCodes.EC1001)) \ No newline at end of file diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 6566ab39..aec452ec 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -20,6 +20,7 @@ from time import time, sleep from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory +from dlio_benchmark.model.model_factory import ModelFactory from dlio_benchmark.utils.utility import utcnow, DLIOMPI from dlio_benchmark.utils.utility import Profile, sleep from dlio_benchmark.common.error_code import ErrorCodes @@ -27,7 +28,7 @@ from dlio_benchmark.reader.reader_factory import ReaderFactory from dlio_benchmark.profiler.profiler_factory import ProfilerFactory from dlio_benchmark.storage.storage_factory import StorageFactory -from dlio_benchmark.common.enumerations import FrameworkType, Profiler, FormatType, DatasetType, MetadataType, \ +from dlio_benchmark.common.enumerations import FrameworkType, Model, Profiler, FormatType, 
DatasetType, MetadataType, \ DataLoaderType import tensorflow as tf @@ -43,15 +44,19 @@ class TFFramework(Framework): __instance = None @dlp.log_init - def __init__(self, profiling): + def __init__(self, profiling, model: Model = Model.SLEEP, communication: bool = False): super().__init__() self.profiling = profiling + self._model = ModelFactory.create_model(FrameworkType.TENSORFLOW, model, communication, gpu_id=DLIOMPI.get_instance().local_rank()) # TODO: Temporary fix, need to separate the iostat profiler (needed for report gen) and the others if profiling: if self.args.profiler != Profiler.IOSTAT: self.tensorboard = ProfilerFactory.get_profiler(Profiler.NONE) else: self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD) + + + # self.model = DDP(model) self.reader_handler = None @dlp.log @@ -64,10 +69,10 @@ def get_type(self): return FrameworkType.TENSORFLOW @staticmethod - def get_instance(profiling): + def get_instance(profiling, model: Model = Model.SLEEP, communication: bool = False): """ Static access method. 
""" if TFFramework.__instance is None: - TFFramework.__instance = TFFramework(profiling) + TFFramework.__instance = TFFramework(profiling, model, communication) return TFFramework.__instance @dlp.log @@ -87,9 +92,18 @@ def trace_object(self, string, step, r): @dlp.log def compute(self, batch, epoch_number, step, computation_time): - return self.model(batch, computation_time) + return self.model(epoch_number, batch, computation_time) # tf.function(self.model)(epoch_number, step, computation_time) + + def model(self, epoch, batch, computation_time): + if self._model is None: + sleep(computation_time + ) + else: + self._model.compute(batch) + + @dlp.log def get_loader(self, dataset_type=DatasetType.TRAIN): if dataset_type == DatasetType.TRAIN: @@ -145,3 +159,4 @@ def get_data(self, id, data, offset=None, length=None): @dlp.log def isfile(self, id): return tf.io.gfile.exists(id) and not tf.io.gfile.isdir(id) + diff --git a/dlio_benchmark/framework/torch_framework.py b/dlio_benchmark/framework/torch_framework.py index d3398f55..d13be261 100644 --- a/dlio_benchmark/framework/torch_framework.py +++ b/dlio_benchmark/framework/torch_framework.py @@ -1,22 +1,28 @@ """ - Copyright (c) 2025, UChicago Argonne, LLC - All Rights Reserved - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Copyright (c) 2025, UChicago Argonne, LLC +All Rights Reserved + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ from dlio_benchmark.common.error_code import ErrorCodes -from dlio_benchmark.common.enumerations import FormatType, FrameworkType, DatasetType, DataLoaderType +from dlio_benchmark.common.enumerations import ( + FormatType, + FrameworkType, + DatasetType, + DataLoaderType, + Model, +) from dlio_benchmark.data_loader.data_loader_factory import DataLoaderFactory from dlio_benchmark.framework.framework import Framework, DummyTraceObject from dlio_benchmark.common.constants import MODULE_AI_FRAMEWORK @@ -24,6 +30,7 @@ import torch import functools import logging +from dlio_benchmark.model.model_factory import ModelFactory from dlio_benchmark.utils.utility import utcnow, DLIOMPI from dlio_benchmark.utils.utility import Profile @@ -58,10 +65,12 @@ class TorchFramework(Framework): __instance = None @dlp.log_init - def __init__(self, profiling): + def __init__(self, profiling, model: Model = Model.SLEEP, communication: bool = False): super().__init__() self.profiling = profiling self.reader_handler = None + # TODO: Check if we need to add config for gpu, use local_rank or 0 always maybe + self._model = ModelFactory.create_model(FrameworkType.PYTORCH, model, communication, gpu_id=DLIOMPI.get_instance().local_rank()) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): @@ -74,10 +83,10 @@ def get_type(self): return FrameworkType.PYTORCH @staticmethod - def get_instance(profiling): - """ Static access method. 
""" + def get_instance(profiling, model: Model = Model.SLEEP, communication: bool = False): + """Static access method.""" if TorchFramework.__instance is None: - TorchFramework.__instance = TorchFramework(profiling) + TorchFramework.__instance = TorchFramework(profiling, model, communication) return TorchFramework.__instance @dlp.log @@ -94,7 +103,15 @@ def trace_object(self, string, step, r): @dlp.log def compute(self, batch, epoch_number, step, computation_time): - return self.model(batch, computation_time) + return self.model(epoch_number, batch, computation_time) + + def model(self, epoch, batch, computation_time): + if self._model is None: + print("sleeping") + sleep(computation_time) + else: + print("Using model to compute") + self._model.compute(batch) @dlp.log def get_loader(self, dataset_type=DatasetType.TRAIN): diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index 684296e0..720f76b4 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -40,7 +40,7 @@ from dlio_benchmark.utils.statscounter import StatsCounter from hydra.core.config_store import ConfigStore from dlio_benchmark.utils.config import LoadConfig, ConfigArguments, GetConfig -from dlio_benchmark.common.enumerations import Profiler, DatasetType, StorageType, MetadataType, FormatType +from dlio_benchmark.common.enumerations import Model, Profiler, DatasetType, StorageType, MetadataType, FormatType from dlio_benchmark.profiler.profiler_factory import ProfilerFactory from dlio_benchmark.framework.framework_factory import FrameworkFactory from dlio_benchmark.data_generator.generator_factory import GeneratorFactory @@ -73,10 +73,8 @@ def __init__(self, cfg): global dftracer, dftracer_initialize, dftracer_finalize t0 = time() - self.args = ConfigArguments.get_instance() + self.args : ConfigArguments = ConfigArguments.get_instance() # type: ignore LoadConfig(self.args, cfg) - self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, - 
self.args.framework) self.output_folder = self.args.output_folder os.makedirs(self.args.output_folder, mode=0o755, exist_ok=True) @@ -85,10 +83,18 @@ def __init__(self, cfg): self.comm_size = self.args.comm_size = DLIOMPI.get_instance().size() self.data_folder = self.args.data_folder self.storage_root = self.args.storage_root + try: + model_enum = Model(self.args.model) + except: + model_enum = Model.DEFAULT + if not self.args.compute: + model_enum = Model.DEFAULT + self.framework = FrameworkFactory().get_framework(self.args.framework, + self.args.do_profiling, model_enum, self.args.communication) + self.storage = StorageFactory().get_storage(self.args.storage_type, self.args.storage_root, + self.args.framework) if self.args.storage_root: self.storage.create_namespace(exist_ok=True) - self.framework = FrameworkFactory().get_framework(self.args.framework, - self.args.do_profiling) # Delete previous logfile if self.my_rank == 0: @@ -126,10 +132,10 @@ def __init__(self, cfg): self.num_subfolders_eval = self.args.num_subfolders_eval self.num_samples = self.args.num_samples_per_file self.total_training_steps = self.args.total_training_steps + self.computation_time = self.args.computation_time self.epochs = self.args.epochs self.batch_size = self.args.batch_size - self.computation_time = self.args.computation_time if self.do_profiling: self.profiler = ProfilerFactory().get_profiler(self.args.profiler) diff --git a/dlio_benchmark/model/__init__.py b/dlio_benchmark/model/__init__.py new file mode 100644 index 00000000..3548ecf1 --- /dev/null +++ b/dlio_benchmark/model/__init__.py @@ -0,0 +1,2 @@ +from dlio_benchmark.model.model_factory import ModelFactory +from dlio_benchmark.model.model import UnifiedModel \ No newline at end of file diff --git a/dlio_benchmark/model/impl/resnet.py b/dlio_benchmark/model/impl/resnet.py new file mode 100644 index 00000000..e8f90e21 --- /dev/null +++ b/dlio_benchmark/model/impl/resnet.py @@ -0,0 +1,173 @@ + + + +from 
dlio_benchmark.common.enumerations import FrameworkType, Loss +from dlio_benchmark.model.model import UnifiedModel +from typing import Any, Optional, Tuple, Type, Union + +#TODO: Verify correctness of resnet50 + +class ResNet50Block: + """ResNet50 basic block""" + + def __init__(self, layer_factory, framework: FrameworkType, in_channels: int, + out_channels: int, stride: int = 1, downsample: Optional[Any] = None): + self.framework = framework + self.conv1 = layer_factory.conv2d(in_channels, out_channels, 1, bias=False) + self.bn1 = layer_factory.batch_norm(out_channels) + self.conv2 = layer_factory.conv2d(out_channels, out_channels, 3, + stride=stride, padding=1, bias=False) + self.bn2 = layer_factory.batch_norm(out_channels) + self.conv3 = layer_factory.conv2d(out_channels, out_channels * 4, 1, bias=False) + self.bn3 = layer_factory.batch_norm(out_channels * 4) + self.relu = layer_factory.relu() + self.downsample = downsample + + def __call__(self, x: Any) -> Any: + residual = x + + out = self.conv1(x) + out = self.bn1(out) + out = self.relu(out) + + out = self.conv2(out) + out = self.bn2(out) + out = self.relu(out) + + out = self.conv3(out) + out = self.bn3(out) + + if self.downsample is not None: + residual = self.downsample(x) + + # Element-wise addition + if self.framework == FrameworkType.PYTORCH: + out += residual + else: # TensorFlow + import tensorflow as tf + out = tf.add(out, residual) + + out = self.relu(out) + return out + +class ResNet50(UnifiedModel): + """ResNet50 implementation""" + + def __init__(self, framework: FrameworkType, communication, gpu_id, num_classes: int = 1000): + super().__init__(framework, Loss.CE, communication, gpu_id) + self.num_classes = num_classes + self.build_model() + # bound_forward = partial(self.forward, self) + self._model = self.layer_factory.get_model(self.forward) + if framework == FrameworkType.PYTORCH: + # TODO Make configurable + import torch + self.layer_factory.set_optimizer(torch.optim.SGD, 1, + momentum=1, + 
weight_decay=1) + else: + import tensorflow as tf + self.layer_factory.set_optimizer(tf.optimizers.SGD, 0.1) + + def build_model(self): + """Build ResNet50 architecture""" + # Initial convolution + self.conv1 = self.layer_factory.conv2d(3, 64, 7, stride=2, padding=3, bias=False) + self.bn1 = self.layer_factory.batch_norm(64) + self.relu = self.layer_factory.relu() + self.maxpool = self.layer_factory.max_pool2d(3, stride=2) + + # ResNet blocks (simplified - just showing the concept) + self.layer1 = self._make_layer(64, 64, 3, stride=1) + self.layer2 = self._make_layer(256, 128, 4, stride=2) + self.layer3 = self._make_layer(512, 256, 6, stride=2) + self.layer4 = self._make_layer(1024, 512, 3, stride=2) + + # Final layers + self.avgpool = self.layer_factory.adaptive_avg_pool2d((1, 1)) + self.flatten = self.layer_factory.flatten() + self.fc = self.layer_factory.linear(2048, self.num_classes) + + def _make_layer(self, in_channels: int, out_channels: int, blocks: int, stride: int = 1): + """Create a ResNet layer with multiple blocks""" + layers = [] + downsample = None + + if stride != 1 or in_channels != out_channels * 4: + downsample_conv = self.layer_factory.conv2d(in_channels, out_channels * 4, 1, + stride=stride, bias=False) + downsample_bn = self.layer_factory.batch_norm(out_channels * 4) + # In practice, you'd need to sequence these layers properly + downsample = lambda x: downsample_bn(downsample_conv(x)) + + layers.append(ResNet50Block(self.layer_factory, self.framework, in_channels, + out_channels, stride, downsample)) + + for _ in range(1, blocks): + layers.append(ResNet50Block(self.layer_factory, self.framework, + out_channels * 4, out_channels)) + + return layers + + def forward(self, x: Any) -> Any: + """Forward pass through ResNet50""" + x = self.conv1(x) + x = self.bn1(x) + x = self.relu(x) + x = self.maxpool(x) + + # Apply ResNet blocks + for block in self.layer1: + x = block(x) + for block in self.layer2: + x = block(x) + for block in self.layer3: + x = 
block(x) + for block in self.layer4: + x = block(x) + + x = self.avgpool(x) + x = self.flatten(x) + x = self.fc(x) + + return x + + def validate_data(self, data: Any) -> Tuple[Any, Any]: + try: + if self.framework == FrameworkType.PYTORCH: + import torch + if isinstance(data, torch.Tensor): + if len(data.shape) == 3: + # Duplicate array thrice to make three channels + data = data.unsqueeze(1).repeat(1, 3, 1, 1) + elif len(data.shape) == 2: + data = data.unsqueeze(1).unsqueeze(2).repeat(1, 3, 1, 1) + input_data = data.float() + # We can generate target data, since it is not input/output + target = torch.zeros((input_data.shape[0], self.num_classes)) + else: + input_data, target = data + else: # TensorFlow + import tensorflow as tf + if isinstance(data, tf.Tensor): + if len(data.shape) == 3: + # Duplicate array thrice to make three channels + data = tf.expand_dims(data, axis=1) + data = tf.repeat(data, repeats=3, axis=1) + # this gives shape (1,3, x,x) + # I need shape (1, x, x, 3) + data = tf.transpose(data, perm=[0, 2, 3, 1]) + elif len(data.shape) == 2: + data = tf.reshape(data, [1, *data.shape, 1]) + + # cast to float32 + input_data = tf.cast(data, tf.float32) + data = tf.cast(data, tf.float32) + target = tf.zeros((input_data.shape[0],1000)) + else: + input_data, target = data + except Exception as e: + raise ValueError(f"Invalid data format: {e}") + + return input_data, target + \ No newline at end of file diff --git a/dlio_benchmark/model/layer.py b/dlio_benchmark/model/layer.py new file mode 100644 index 00000000..29281be9 --- /dev/null +++ b/dlio_benchmark/model/layer.py @@ -0,0 +1,122 @@ +from abc import ABC, abstractmethod + +from typing import Any, Optional, Tuple, Type + + +class LayerFactoryBase(ABC): + """Abstract base class for layer factory classes.""" + + @abstractmethod + def compute(self, input_data: Any, target: Any) -> None: + pass + + + @abstractmethod + def set_optimizer(self, optimizer): + pass + + @abstractmethod + def conv2d( + self, + 
in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, + ): + pass + + @abstractmethod + def conv1d( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, + ): + pass + + @abstractmethod + def batch_norm(self, num_features: int): + pass + + @abstractmethod + def relu( + self, + ): + pass + + @abstractmethod + def leaky_relu(self, negative_slope: float = 0.01): + pass + + @abstractmethod + def sigmoid( + self, + ): + pass + + @abstractmethod + def tanh( + self, + ): + pass + + @abstractmethod + def softmax(self, dim: int = -1): + pass + + @abstractmethod + def max_pool2d(self, kernel_size: int, stride: Optional[int] = None): + pass + + @abstractmethod + def adaptive_avg_pool2d(self, output_size: Tuple[int, int]): + pass + + @abstractmethod + def linear(self, in_features: int, out_features: int, bias: bool = True): + pass + + @abstractmethod + def flatten( + self, + ): + pass + + @abstractmethod + def dropout(self, p: float = 0.5): + pass + + @abstractmethod + def lstm( + self, + input_size: int, + hidden_size: int, + num_layers: int = 1, + batch_first: bool = True, + dropout: float = 0.0, + ): + pass + + @abstractmethod + def gru( + self, + input_size: int, + hidden_size: int, + num_layers: int = 1, + batch_first: bool = True, + dropout: float = 0.0, + ): + pass + + @abstractmethod + def embedding(self, num_embeddings: int, embedding_dim: int): + pass + + @abstractmethod + def layer_norm(self, normalized_shape: int): + pass diff --git a/dlio_benchmark/model/loss_fn.py b/dlio_benchmark/model/loss_fn.py new file mode 100644 index 00000000..ed8bcf94 --- /dev/null +++ b/dlio_benchmark/model/loss_fn.py @@ -0,0 +1,30 @@ + + + +from typing import Any +from dlio_benchmark.common.enumerations import Loss + + +def torch_loss(loss: Loss) -> Any: + """Convert a Loss enumeration to a PyTorch loss function.""" + import torch.nn as nn + if loss 
== Loss.MSE: + return nn.MSELoss() + elif loss == Loss.CE: + return nn.CrossEntropyLoss() + elif loss == Loss.NONE: + return None + else: + raise ValueError(f"Unsupported loss function: {loss}") + + +def tf_loss(loss: Loss) -> Any: + import tensorflow.keras.losses as losses # type: ignore + if loss == Loss.MSE: + return losses.MeanSquaredError() + elif loss == Loss.CE: + return losses.CategoricalCrossentropy() + elif loss == Loss.NONE: + return None + else: + raise ValueError(f"Unsupported loss function: {loss}") diff --git a/dlio_benchmark/model/model.py b/dlio_benchmark/model/model.py new file mode 100644 index 00000000..aa90283e --- /dev/null +++ b/dlio_benchmark/model/model.py @@ -0,0 +1,56 @@ + + +from dlio_benchmark.common.enumerations import FrameworkType, Loss + +from abc import ABC, abstractmethod +from typing import Any, Tuple, Type + +from dlio_benchmark.model.loss_fn import tf_loss, torch_loss +from dlio_benchmark.model.tf_layer import TensorFlowLayers +from dlio_benchmark.model.torch_layer import PyTorchLayers + + + +class UnifiedModel(ABC): + """Abstract base class for unified models""" + + def __init__(self, framework: FrameworkType, loss_function: Loss, communication: bool = False, gpu_id: int = -1) -> None: + self.framework = framework + self.layers = [] + self._model = None + # Initialize the appropriate layer factory + if framework == FrameworkType.PYTORCH: + self.layer_factory = PyTorchLayers(torch_loss(loss_function), communication, gpu_id) + elif framework == FrameworkType.TENSORFLOW: + self.layer_factory = TensorFlowLayers(tf_loss(loss_function), communication, gpu_id) + else: + raise ValueError(f"Unsupported framework: {framework}") + + @abstractmethod + def build_model(self) -> None: + """Build the model architecture""" + pass + + @abstractmethod + def forward(self, x: Any) -> Any: + """Forward pass through the model""" + pass + + @abstractmethod + def validate_data(self, data: Any) -> Tuple[Any, Any]: + """Validate the input batch data and 
return input and target tensors""" + pass + + + def compute(self, batch): + input_data, target_data = self.validate_data(batch) + self.layer_factory.compute(input_data, target_data) + +class TorchModel(UnifiedModel): + """Torch implementation of the unified model""" + + def __init__(self): + super().__init__(FrameworkType.PYTORCH) + + + diff --git a/dlio_benchmark/model/model_factory.py b/dlio_benchmark/model/model_factory.py new file mode 100644 index 00000000..624bf236 --- /dev/null +++ b/dlio_benchmark/model/model_factory.py @@ -0,0 +1,16 @@ +from dlio_benchmark.common.enumerations import FrameworkType, Model +from dlio_benchmark.model.model import UnifiedModel +from dlio_benchmark.model.impl.resnet import ResNet50 + + +class ModelFactory: + def __init__(self) -> None: + pass + + @staticmethod + def create_model(framework: FrameworkType, model_type: Model, communication: bool = False, gpu_id: int = -1) -> UnifiedModel: + if model_type == Model.RESNET: + return ResNet50(framework, communication, gpu_id) + elif model_type in (Model.SLEEP, Model.DEFAULT): + return None + raise ValueError(f"Unsupported model type: {model_type}") \ No newline at end of file diff --git a/dlio_benchmark/model/tf_layer.py b/dlio_benchmark/model/tf_layer.py new file mode 100644 index 00000000..ba05e80b --- /dev/null +++ b/dlio_benchmark/model/tf_layer.py @@ -0,0 +1,276 @@ +from typing import Any, Optional, Tuple +import tensorflow as tf +from tensorflow import keras + +from dlio_benchmark.model.layer import LayerFactoryBase # type: ignore + + +class TensorFlowLayers(LayerFactoryBase): + """Factory class for creating TensorFlow layers""" + + def __init__(self, loss_function, communication: bool = False, gpu_id: int = -1) -> None: + super().__init__() + self._model = None + self._optimizer = None + self._loss_function = loss_function + self._layer_registry = {} # Track created layers similar to PyTorch + self.communication = communication + self.gpu_id = gpu_id + + def 
_register_layer(self, layer: keras.layers.Layer, name: Optional[str] = None) -> keras.layers.Layer: + """Register a layer for automatic model building""" + if name is None: + name = f"layer_{len(self._layer_registry)}" + self._layer_registry[name] = layer + return layer + + def conv2d( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, + ): + layer = keras.layers.Conv2D( + filters=out_channels, + kernel_size=kernel_size, + strides=stride, + padding="same" if padding > 0 else "valid", + use_bias=bias, + ) + return self._register_layer(layer) + + def conv1d( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, + ): + layer = keras.layers.Conv1D( + filters=out_channels, + kernel_size=kernel_size, + strides=stride, + padding="same" if padding > 0 else "valid", + use_bias=bias, + ) + return self._register_layer(layer) + + def batch_norm(self, num_features: int): + layer = keras.layers.BatchNormalization() + return self._register_layer(layer) + + def relu(self): + layer = keras.layers.ReLU() + return self._register_layer(layer) + + def leaky_relu(self, negative_slope: float = 0.01): + layer = keras.layers.LeakyReLU(alpha=negative_slope) + return self._register_layer(layer) + + def sigmoid(self): + layer = keras.layers.Activation("sigmoid") + return self._register_layer(layer) + + def tanh(self): + layer = keras.layers.Activation("tanh") + return self._register_layer(layer) + + def softmax(self, dim: int = -1): + layer = keras.layers.Softmax(axis=dim) + return self._register_layer(layer) + + def max_pool2d(self, kernel_size: int, stride: Optional[int] = None): + if stride is None: + stride = kernel_size + layer = keras.layers.MaxPooling2D(pool_size=kernel_size, strides=stride) + return self._register_layer(layer) + + def adaptive_avg_pool2d(self, output_size: Tuple[int, int]): + if output_size == (1, 1): + layer = 
keras.layers.GlobalAveragePooling2D() + else: + raise NotImplementedError( + "Adaptive pooling with custom output size not implemented for TensorFlow" + ) + return self._register_layer(layer) + + def linear(self, in_features: int, out_features: int, bias: bool = True): + layer = keras.layers.Dense(out_features, use_bias=bias) + return self._register_layer(layer) + + def flatten(self): + layer = keras.layers.Flatten() + return self._register_layer(layer) + + def dropout(self, p: float = 0.5): + layer = keras.layers.Dropout(rate=p) + return self._register_layer(layer) + + def lstm( + self, + input_size: int, + hidden_size: int, + num_layers: int = 1, + batch_first: bool = True, + dropout: float = 0.0, + ): + layer = keras.layers.LSTM( + hidden_size, dropout=dropout, return_sequences=num_layers > 1 + ) + return self._register_layer(layer) + + def gru( + self, + input_size: int, + hidden_size: int, + num_layers: int = 1, + batch_first: bool = True, + dropout: float = 0.0, + ): + layer = keras.layers.GRU( + hidden_size, dropout=dropout, return_sequences=num_layers > 1 + ) + return self._register_layer(layer) + + def embedding(self, num_embeddings: int, embedding_dim: int): + layer = keras.layers.Embedding(num_embeddings, embedding_dim) + return self._register_layer(layer) + + def layer_norm(self, normalized_shape: int): + layer = keras.layers.LayerNormalization() + return self._register_layer(layer) + def get_model(self, forward_fn: Any) -> keras.Model: + """ + Constructs and returns a Keras Model. + Args: + forward_fn: The function that defines the forward pass of the model. + This function will be used as the `call` method of the Keras Model. + Returns: + A Keras Model instance. + """ + if self._model is not None: + return self._model + + class ModelWrapper(keras.Model): + """ + A Keras Model subclass that wraps the forward pass and tracks layers. + It uses the provided `forward_fn` for its `call` method. 
+ """ + def __init__(self, layer_registry): + super().__init__() + self._layer_dict = {} + + # Register all layers from the factory's registry as attributes of this model + for name, layer in layer_registry.items(): + setattr(self, name, layer) + self._layer_dict[name] = layer + + def call(self, inputs: Any, training: Optional[bool] = None) -> Any: + """ + The forward pass method for the Keras Model. + It calls the provided `forward_fn`. + """ + # The forward_fn is expected to be a partial function (e.g., partial(self.forward, self)) + # where the first 'self' is the actual model instance (like ResNet50). + # So, simply calling forward_fn with inputs is correct. + return forward_fn(inputs) + + def get_layer_dict(self): + """Returns the dictionary of registered layers.""" + return self._layer_dict + + def list_layers(self): + """Prints all registered layers.""" + print("Registered layers:") + for name, layer in self._layer_dict.items(): + print(f" {name}: {type(layer).__name__}") + + # Instantiate the ModelWrapper + self._model = ModelWrapper(self._layer_registry) + if self.communication: + import horovod.tensorflow as hvd + hvd.init() + self._model = hvd.DistributedOptimizer(self._model) + + # layer_dict = self._model.get_layer_dict() + # print(f"Registered layers: {list(layer_dict.keys())}") + + # print("Trainable variables:") + # if not self._model.trainable_variables: + # print(" No trainable variables found yet. They will appear after the model processes its first input.") + # for var in self._model.trainable_variables: + # print(f" {var.name}: {var.shape}") + + return self._model + + + def set_optimizer(self, optimizer, *args, **kwargs): + assert self._model is not None, "Model must be set before optimizer." + self._optimizer = optimizer(*args, **kwargs) + self._model.compile(optimizer=self._optimizer) + + + def compute(self, input_data, target) -> None: + assert self._model is not None, "Model must be set before compute step." 
+ assert self._optimizer is not None, "Optimizer must be set before compute step." + + # Perform the forward pass and backward pass within this function + # This ensures the GradientTape correctly records all operations. + # print Input shape of _model + + if self.communication: + raise NotImplementedError("Distributed training with tensorflow ( horovod ) is not implemented.") + import horovod.tensorflow as hvd + tape = hvd.DistributedGradientTape() + else: + tape = tf.GradientTape() + with tape: + # 1. Forward pass + pred = self._model(input_data, training=True) + + # Convert target to tensor if needed + target = tf.convert_to_tensor(target) + + # 2. Calculate loss + # TODO: Make loss configurable + loss = self._loss_function(target, pred) + # loss = keras.losses.MeanSquaredError()(target, pred) + + # 3. Calculate gradients + gradients = tape.gradient(loss, self._model.trainable_variables) + + # 4. Apply gradients (backward pass) + filtered_gradients_and_vars = [] + for grad, var in zip(gradients, self._model.trainable_variables): + if grad is not None: + filtered_gradients_and_vars.append((grad, var)) + else: + print(f"Warning: Gradient is None for variable: {var.name}. Skipping update for this variable.") + + if not filtered_gradients_and_vars: + print("Warning: No valid gradients found to apply. 
Optimizer step skipped.") + else: + self._optimizer.apply_gradients(filtered_gradients_and_vars) + + + + def reset_model(self): + """Reset the model to allow creating a new one""" + self._model = None + self._optimizer = None + self._layer_registry = {} + + def get_layer_registry(self): + """Return the current layer registry""" + return self._layer_registry.copy() + + def list_registered_layers(self): + """Print all registered layers in the factory""" + print("Factory layer registry:") + for name, layer in self._layer_registry.items(): + print(f" {name}: {type(layer).__name__}") \ No newline at end of file diff --git a/dlio_benchmark/model/torch_layer.py b/dlio_benchmark/model/torch_layer.py new file mode 100644 index 00000000..5b68870e --- /dev/null +++ b/dlio_benchmark/model/torch_layer.py @@ -0,0 +1,263 @@ +from typing import Any, Optional, Tuple +import torch +import torch.nn as nn + + + +class PyTorchLayers: + """Factory class for creating PyTorch layers""" + + def __init__(self, loss_function, communication: bool = False, gpu_id: int = -1) -> None: + super().__init__() + self._optimizer = None + self._model = None + self._loss_function = loss_function + self._layer_registry = {} # Track created layers + self.communication = communication + self.gpu_id = gpu_id + + def _register_layer(self, layer: nn.Module, name: Optional[str] = None) -> nn.Module: + """Register a layer for automatic model building""" + if name is None: + name = f"layer_{len(self._layer_registry)}" + self._layer_registry[name] = layer + return layer + + def conv2d( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, + ): + layer = nn.Conv2d( + in_channels, + out_channels, + kernel_size, + stride=stride, + padding=padding, + bias=bias, + ) + return self._register_layer(layer) + + def conv1d( + self, + in_channels: int, + out_channels: int, + kernel_size: int, + stride: int = 1, + padding: int = 0, + bias: bool = True, 
class PyTorchLayers:
    """Factory for PyTorch layers and for assembling them into a trainable model.

    Every ``conv2d``/``linear``/... call creates an ``nn.Module``, records it in
    an internal registry, and returns it.  :meth:`get_model` then builds a
    single ``nn.Module`` that owns all registered layers (so their parameters
    are visible to the optimizer) and whose ``forward`` delegates to a
    caller-supplied function.
    """

    def __init__(self, loss_function, communication: bool = False, gpu_id: int = -1) -> None:
        super().__init__()
        self._optimizer = None
        self._model = None
        self._loss_function = loss_function
        # name -> nn.Module, populated by _register_layer
        self._layer_registry = {}
        # When True, torch.distributed is initialized and the model is DDP-wrapped.
        self.communication = communication
        # gpu_id < 0 means CPU execution.
        self.gpu_id = gpu_id

    def _register_layer(self, layer: nn.Module, name: Optional[str] = None) -> nn.Module:
        """Record *layer* under *name* (auto-generated when omitted) and return it."""
        if name is None:
            name = f"layer_{len(self._layer_registry)}"
        self._layer_registry[name] = layer
        return layer

    def conv2d(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int = 1,
        padding: int = 0,
        bias: bool = True,
    ):
        """Create and register a 2-D convolution."""
        layer = nn.Conv2d(
            in_channels, out_channels, kernel_size,
            stride=stride, padding=padding, bias=bias,
        )
        return self._register_layer(layer)

    def conv1d(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int = 1,
        padding: int = 0,
        bias: bool = True,
    ):
        """Create and register a 1-D convolution."""
        layer = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            stride=stride, padding=padding, bias=bias,
        )
        return self._register_layer(layer)

    def batch_norm(self, num_features: int):
        """Create and register a 2-D batch norm."""
        return self._register_layer(nn.BatchNorm2d(num_features))

    def relu(self):
        """Create and register an in-place ReLU."""
        return self._register_layer(nn.ReLU(inplace=True))

    def leaky_relu(self, negative_slope: float = 0.01):
        """Create and register an in-place LeakyReLU."""
        return self._register_layer(nn.LeakyReLU(negative_slope=negative_slope, inplace=True))

    def sigmoid(self):
        """Create and register a Sigmoid."""
        return self._register_layer(nn.Sigmoid())

    def tanh(self):
        """Create and register a Tanh."""
        return self._register_layer(nn.Tanh())

    def softmax(self, dim: int = -1):
        """Create and register a Softmax over dimension ``dim``."""
        return self._register_layer(nn.Softmax(dim=dim))

    def max_pool2d(self, kernel_size: int, stride: Optional[int] = None):
        """Create and register a 2-D max-pool (stride defaults to kernel_size)."""
        if stride is None:
            stride = kernel_size
        return self._register_layer(nn.MaxPool2d(kernel_size, stride=stride))

    def adaptive_avg_pool2d(self, output_size: Tuple[int, int]):
        """Create and register an adaptive average pool."""
        return self._register_layer(nn.AdaptiveAvgPool2d(output_size))

    def linear(self, in_features: int, out_features: int, bias: bool = True):
        """Create and register a fully-connected layer."""
        return self._register_layer(nn.Linear(in_features, out_features, bias=bias))

    def flatten(self):
        """Create and register a Flatten layer."""
        return self._register_layer(nn.Flatten())

    def dropout(self, p: float = 0.5):
        """Create and register a Dropout layer."""
        return self._register_layer(nn.Dropout(p=p))

    def lstm(
        self,
        input_size: int,
        hidden_size: int,
        num_layers: int = 1,
        batch_first: bool = True,
        dropout: float = 0.0,
    ):
        """Create and register an LSTM."""
        layer = nn.LSTM(
            input_size, hidden_size,
            num_layers=num_layers, batch_first=batch_first, dropout=dropout,
        )
        return self._register_layer(layer)

    def gru(
        self,
        input_size: int,
        hidden_size: int,
        num_layers: int = 1,
        batch_first: bool = True,
        dropout: float = 0.0,
    ):
        """Create and register a GRU."""
        layer = nn.GRU(
            input_size, hidden_size,
            num_layers=num_layers, batch_first=batch_first, dropout=dropout,
        )
        return self._register_layer(layer)

    def embedding(self, num_embeddings: int, embedding_dim: int):
        """Create and register an Embedding."""
        return self._register_layer(nn.Embedding(num_embeddings, embedding_dim))

    def layer_norm(self, normalized_shape: int):
        """Create and register a LayerNorm."""
        return self._register_layer(nn.LayerNorm(normalized_shape))

    def get_model(self, forward_fn: Any) -> nn.Module:
        """Build (once) and return the ``nn.Module`` wrapping all registered layers.

        Args:
            forward_fn: Callable used as the model's forward pass; expected to
                be pre-bound to the architecture object.

        Returns:
            The (possibly DDP-wrapped) model; cached after the first call.
        """
        if self._model is not None:
            return self._model

        if self.communication:
            # Lazily initialize torch.distributed via MPI rank/size; rank 0
            # broadcasts its hostname as the rendezvous address.
            from dlio_benchmark.utils.utility import DLIOMPI
            import torch.distributed as dist
            import socket
            import os
            rank = DLIOMPI.get_instance().rank()
            master_addr = socket.gethostname() if rank == 0 else None
            master_addr = DLIOMPI.get_instance().comm().bcast(master_addr, root=0)
            world_size = DLIOMPI.get_instance().size()
            os.environ["MASTER_ADDR"] = master_addr
            # TODO(review): hard-coded port may collide with concurrent jobs.
            os.environ["MASTER_PORT"] = str(2345)
            dist.init_process_group(
                backend="nccl" if torch.cuda.is_available() else "gloo",
                rank=rank,
                world_size=world_size,
            )

        class Model(nn.Module):
            """Wrapper module owning all registered layers; forward delegates
            to the captured ``forward_fn``."""

            def __init__(self, layer_registry):
                super().__init__()
                # setattr makes each layer a submodule, so its parameters are
                # returned by self.parameters().
                for name, layer in layer_registry.items():
                    setattr(self, name, layer)

            def _register_layer_list(self, layer_list, list_name):
                """Register a list of layers or ResNet-style blocks."""
                for i, item in enumerate(layer_list):
                    if hasattr(item, 'conv1'):  # framework-agnostic ResNet block
                        self._register_block_layers(item, f"{list_name}_{i}")
                    elif isinstance(item, nn.Module):
                        setattr(self, f"{list_name}_{i}", item)
                    else:
                        print(f"Warning: {list_name} contains unsupported type: {type(item)}")

            def _register_block_layers(self, block, prefix):
                """Register the sub-layers of a framework-agnostic block."""
                for attr_name in ['conv1', 'bn1', 'conv2', 'bn2', 'conv3', 'bn3', 'relu']:
                    if hasattr(block, attr_name):
                        layer = getattr(block, attr_name)
                        if isinstance(layer, nn.Module):
                            setattr(self, f"{prefix}_{attr_name}", layer)
                        else:
                            print(f"Warning: {attr_name} in {prefix} is not a nn.Module: {type(layer)}")
                    else:
                        print(f"Warning: {attr_name} not found in block {prefix}")

                downsample = getattr(block, 'downsample', None)
                if downsample is None:
                    return
                if isinstance(downsample, nn.Module):
                    setattr(self, f"{prefix}_downsample", downsample)
                elif callable(downsample):
                    # A lambda downsample may capture nn.Modules in its
                    # closure; pull them out so their parameters register.
                    try:
                        for i, cell in enumerate(downsample.__closure__ or ()):
                            # BUG FIX: the old `cell.cell_contents and
                            # isinstance(...)` truthiness test silently skipped
                            # falsy modules (e.g. an empty nn.Sequential).
                            if isinstance(cell.cell_contents, nn.Module):
                                setattr(self, f"{prefix}_downsample_{i}", cell.cell_contents)
                    except Exception:  # FIX: was a bare `except:`
                        print("Warning: Could not register downsample layers from closure.")
                else:
                    print(f"Warning: Unsupported downsample type for {prefix}: {type(block.downsample)}")

            def forward(self, x: Any) -> Any:
                return forward_fn(x)

        self._model = Model(self._layer_registry)
        # TODO(review): should the device be selected by rank instead of config?
        if self.gpu_id >= 0:
            if torch.cuda.is_available():
                self._model = self._model.cuda("cuda:{}".format(self.gpu_id))
            else:
                print("Warning: CUDA not available, running on CPU.")
                self._model = self._model.cpu()
        if self.communication:
            from torch.nn.parallel import DistributedDataParallel as DDP
            self._model = DDP(self._model)
        return self._model

    def set_optimizer(self, optimizer, *args, **kwargs):
        """Instantiate ``optimizer`` over the model's parameters."""
        assert self._model is not None, "Model must be set before optimizer."
        self._optimizer = optimizer(self._model.parameters(), *args, **kwargs)

    def compute(self, input_data, target) -> None:
        """One training step: zero grads, forward, loss, backward, update."""
        assert self._model is not None
        assert self._optimizer is not None

        self._model.zero_grad()
        if self.gpu_id >= 0 and torch.cuda.is_available():
            input_data = input_data.cuda("cuda:{}".format(self.gpu_id))
            target = target.cuda("cuda:{}".format(self.gpu_id))
        pred = self._model(input_data)
        loss = self._loss_function(pred, target)
        loss.backward()
        self._optimizer.step()
# --- tests/dlio_benchmark_test.py additions ---

@pytest.mark.parametrize("framework", [FrameworkType.TENSORFLOW, FrameworkType.PYTORCH])
def test_resnet_model_with_compute_enabled(framework) -> None:
    """Run one epoch of the resnet50 workload with real compute enabled."""
    init()
    clean()
    with initialize_config_dir(version_base=None, config_dir=config_dir):
        cfg = compose(
            config_name="config",
            overrides=[
                "++workload.workflow.train=True",
                "++workload.workflow.generate_data=True",
                "++workload.model.name=resnet50",
                f"++workload.framework={framework}",
                f"++workload.reader.data_loader={framework}",
                "++workload.train.epochs=1",
                "++workload.dataset.num_files_train=16",
                "++workload.reader.read_threads=1",
                "++workload.train.compute=True",
            ],
        )
        # FIX: the return value was previously assigned to an unused variable.
        run_benchmark(cfg)
    finalize()


@pytest.mark.parametrize("framework", [FrameworkType.TENSORFLOW, FrameworkType.PYTORCH])
def test_resnet_model_with_comms_enabled(framework) -> None:
    """Run one epoch of the resnet50 workload with compute and communication
    (distributed training) enabled."""
    init()
    clean()
    if comm.rank == 0:
        logging.info("")
        logging.info("=" * 80)
        logging.info(
            f" DLIO test for ResNet model with format in {framework} framework"
        )
        logging.info("=" * 80)
    with initialize_config_dir(version_base=None, config_dir=config_dir):
        cfg = compose(
            config_name="config",
            overrides=[
                "++workload.workflow.train=True",
                "++workload.workflow.generate_data=True",
                "++workload.model.name=resnet50",
                f"++workload.framework={framework}",
                f"++workload.reader.data_loader={framework}",
                "++workload.train.epochs=1",
                "++workload.dataset.num_files_train=16",
                "++workload.dataset.num_samples_per_file=256",
                "++workload.dataset.record_length_bytes=114660.07",
                "++workload.dataset.record_length_bytes_resize=150528",
                "++workload.reader.read_threads=1",
                "++workload.train.communication=True",
                "++workload.train.compute=True",
            ],
        )
        # FIX: the return value was previously assigned to an unused variable.
        run_benchmark(cfg)
    finalize()


if __name__ == '__main__':
    # FIX: the previous version nested a second `if __name__ == '__main__':`
    # guard inside this block; it is always true here and was redundant.
    # for framework in [FrameworkType.TENSORFLOW, FrameworkType.PYTORCH]:
    for framework in [FrameworkType.PYTORCH]:
        print(f"\nRunning test_resnet_model_with_comms_enabled for {framework}...")
        try:
            test_resnet_model_with_comms_enabled(framework)
            print(f"[SUCCESS] test_resnet_model_with_comms_enabled({framework}) completed.")
        except Exception as e:
            print(f"[FAIL] test_resnet_model_with_comms_enabled({framework}) failed: {e}")


# --- tests/unit_model_factory.py ---

# (framework, model, input shape, expected output tensor type)
params = [
    (FrameworkType.PYTORCH, Model.RESNET, (1, 3, 224, 224), torch.Tensor),
    (FrameworkType.TENSORFLOW, Model.RESNET, (1, 224, 224, 3), tf.Tensor),
    (FrameworkType.PYTORCH, Model.RESNET, (1, 3, 64, 64), torch.Tensor),  # Increased size from 32x32 to 64x64
    (FrameworkType.TENSORFLOW, Model.RESNET, (1, 64, 64, 3), tf.Tensor),  # Increased size from 32x32 to 64x64
]


@pytest.mark.parametrize("framework, model_type, input_shape, expected_type", params)
def test_model_factory_output_type(framework, model_type, input_shape, expected_type):
    """Smoke-test ModelFactory models: one forward/backward step per framework."""
    model = ModelFactory().create_model(framework, model_type, communication=False, gpu_id=0)
    if framework == FrameworkType.PYTORCH:
        input_data = torch.randn(*input_shape)
    else:
        input_data = tf.random.normal(input_shape)
    # Random 1000-class target for the backward pass.
    random_out = torch.randn(1, 1000) if framework == FrameworkType.PYTORCH else tf.random.normal((1, 1000))
    assert random_out.shape == (1, 1000)
    # NOTE(review): compute() is called with a single [input, target] list here,
    # while PyTorchLayers.compute takes (input, target) as two arguments —
    # confirm the ModelFactory wrapper's contract matches this call.
    model.compute([input_data, random_out])


if __name__ == "__main__":
    for param in params:
        test_model_factory_output_type(*param)