Skip to content

Commit 78f9194

Browse files
authored
Add functionalities in DataModule and data loaders + tests datasets and DataModule (#453)
* Add num_workers and pin_memory arguments to DataLoader and DataModule tests
1 parent 2866eca commit 78f9194

File tree

5 files changed

+455
-29
lines changed

5 files changed

+455
-29
lines changed

pina/data/data_module.py

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import warnings
23
from lightning.pytorch import LightningDataModule
34
import torch
45
from ..label_tensor import LabelTensor
@@ -8,6 +9,7 @@
89
from .dataset import PinaDatasetFactory
910
from ..collector import Collector
1011

12+
1113
class DummyDataloader:
1214
""""
1315
Dummy dataloader used when batch size is None. It callects all the data
@@ -57,7 +59,7 @@ def __init__(self, max_conditions_lengths, dataset=None):
5759
self.max_conditions_lengths = max_conditions_lengths
5860
self.callable_function = self._collate_custom_dataloader if \
5961
max_conditions_lengths is None else (
60-
self._collate_standard_dataloader)
62+
self._collate_standard_dataloader)
6163
self.dataset = dataset
6264

6365
def _collate_custom_dataloader(self, batch):
@@ -95,7 +97,7 @@ def __call__(self, batch):
9597

9698

9799
class PinaSampler:
98-
def __new__(self, dataset, batch_size, shuffle, automatic_batching):
100+
def __new__(cls, dataset, shuffle):
99101

100102
if (torch.distributed.is_available() and
101103
torch.distributed.is_initialized()):
@@ -123,15 +125,35 @@ def __init__(self,
123125
batch_size=None,
124126
shuffle=True,
125127
repeat=False,
126-
automatic_batching=False
128+
automatic_batching=False,
129+
num_workers=0,
130+
pin_memory=False,
127131
):
128132
"""
129-
Initialize the object, creating dataset based on input problem
130-
:param problem: Problem where data are defined
131-
:param train_size: number/percentage of elements in train split
132-
:param test_size: number/percentage of elements in test split
133-
:param val_size: number/percentage of elements in evaluation split
134-
:param batch_size: batch size used for training
133+
Initialize the object, creating datasets based on the input problem.
134+
135+
:param problem: The problem defining the dataset.
136+
:type problem: AbstractProblem
137+
:param train_size: Fraction or number of elements in the training split.
138+
:type train_size: float
139+
:param test_size: Fraction or number of elements in the test split.
140+
:type test_size: float
141+
:param val_size: Fraction or number of elements in the validation split.
142+
:type val_size: float
143+
:param predict_size: Fraction or number of elements in the prediction split.
144+
:type predict_size: float
145+
:param batch_size: Batch size used for training. If None, the entire dataset is used per batch.
146+
:type batch_size: int or None
147+
:param shuffle: Whether to shuffle the dataset before splitting.
148+
:type shuffle: bool
149+
:param repeat: Whether to repeat the dataset indefinitely.
150+
:type repeat: bool
151+
:param automatic_batching: Whether to enable automatic batching.
152+
:type automatic_batching: bool
153+
:param num_workers: Number of worker threads for data loading. Default 0 (serial loading)
154+
:type num_workers: int
155+
:param pin_memory: Whether to use pinned memory for faster data transfer to GPU. (Default False)
156+
:type pin_memory: bool
135157
"""
136158
logging.debug('Start initialization of Pina DataModule')
137159
logging.info('Start initialization of Pina DataModule')
@@ -170,6 +192,15 @@ def __init__(self,
170192
collector = Collector(problem)
171193
collector.store_fixed_data()
172194
collector.store_sample_domains()
195+
if batch_size is None and num_workers != 0:
196+
warnings.warn(
197+
"Setting num_workers when batch_size is None has no effect on "
198+
"the DataLoading process.")
199+
if batch_size is None and pin_memory:
200+
warnings.warn("Setting pin_memory to True has no effect when "
201+
"batch_size is None.")
202+
self.num_workers = num_workers
203+
self.pin_memory = pin_memory
173204
self.collector_splits = self._create_splits(collector, splits_dict)
174205
self.transfer_batch_to_device = self._transfer_batch_to_device
175206

@@ -271,20 +302,27 @@ def _apply_shuffle(condition_dict, len_data):
271302
dataset_dict[key].update({condition_name: data})
272303
return dataset_dict
273304

274-
275305
def _create_dataloader(self, split, dataset):
276306
shuffle = self.shuffle if split == 'train' else False
307+
# Suppress the warning about num_workers.
308+
# In many cases, especially for PINNs, serial data loading can outperform parallel data loading.
309+
warnings.filterwarnings(
310+
"ignore",
311+
message=(
312+
r"The '(train|val|test)_dataloader' does not have many workers which may be a bottleneck."),
313+
module="lightning.pytorch.trainer.connectors.data_connector"
314+
)
277315
# Use custom batching (good if batch size is large)
278316
if self.batch_size is not None:
279-
sampler = PinaSampler(dataset, self.batch_size,
280-
shuffle, self.automatic_batching)
317+
sampler = PinaSampler(dataset, shuffle)
281318
if self.automatic_batching:
282319
collate = Collator(self.find_max_conditions_lengths(split))
283320

284321
else:
285322
collate = Collator(None, dataset)
286323
return DataLoader(dataset, self.batch_size,
287-
collate_fn=collate, sampler=sampler)
324+
collate_fn=collate, sampler=sampler,
325+
num_workers=self.num_workers)
288326
dataloader = DummyDataloader(dataset)
289327
dataloader.dataset = self._transfer_batch_to_device(
290328
dataloader.dataset, self.trainer.strategy.root_device, 0)

pina/trainer.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ def __init__(self,
1818
predict_size=0.,
1919
compile=None,
2020
automatic_batching=None,
21+
num_workers=None,
22+
pin_memory=None,
2123
**kwargs):
2224
"""
2325
PINA Trainer class for costumizing every aspect of training via flags.
@@ -44,6 +46,10 @@ def __init__(self,
4446
performed. Please avoid using automatic batching when batch_size is
4547
large, default False.
4648
:type automatic_batching: bool
49+
:param num_workers: Number of worker threads for data loading. Default 0 (serial loading)
50+
:type num_workers: int
51+
:param pin_memory: Whether to use pinned memory for faster data transfer to GPU. (Default False)
52+
:type pin_memory: bool
4753
4854
:Keyword Arguments:
4955
The additional keyword arguments specify the training setup
@@ -60,6 +66,14 @@ def __init__(self,
6066
check_consistency(automatic_batching, bool)
6167
if compile is not None:
6268
check_consistency(compile, bool)
69+
if pin_memory is not None:
70+
check_consistency(pin_memory, bool)
71+
else:
72+
pin_memory = False
73+
if num_workers is not None:
74+
check_consistency(pin_memory, int)
75+
else:
76+
num_workers = 0
6377
if train_size + test_size + val_size + predict_size > 1:
6478
raise ValueError('train_size, test_size, val_size and predict_size '
6579
'must sum up to 1.')
@@ -93,19 +107,16 @@ def __init__(self,
93107
compile = False
94108
if automatic_batching is None:
95109
automatic_batching = False
96-
110+
97111
# set attributes
98112
self.compile = compile
99-
self.automatic_batching = automatic_batching
100-
self.train_size = train_size
101-
self.test_size = test_size
102-
self.val_size = val_size
103-
self.predict_size = predict_size
104113
self.solver = solver
105114
self.batch_size = batch_size
106115
self._move_to_device()
107116
self.data_module = None
108-
self._create_loader()
117+
self._create_datamodule(train_size, test_size, val_size, predict_size,
118+
batch_size, automatic_batching, pin_memory,
119+
num_workers)
109120

110121
# logging
111122
self.logging_kwargs = {
@@ -127,7 +138,15 @@ def _move_to_device(self):
127138
pb.unknown_parameters[key] = torch.nn.Parameter(
128139
pb.unknown_parameters[key].data.to(device))
129140

130-
def _create_loader(self):
141+
def _create_datamodule(self,
142+
train_size,
143+
test_size,
144+
val_size,
145+
predict_size,
146+
batch_size,
147+
automatic_batching,
148+
pin_memory,
149+
num_workers):
131150
"""
132151
This method is used here because is resampling is needed
133152
during training, there is no need to define to touch the
@@ -136,21 +155,23 @@ def _create_loader(self):
136155
if not self.solver.problem.are_all_domains_discretised:
137156
error_message = '\n'.join([
138157
f"""{" " * 13} ---> Domain {key} {
139-
"sampled" if key in self.solver.problem.discretised_domains else
140-
"not sampled"}""" for key in
158+
"sampled" if key in self.solver.problem.discretised_domains else
159+
"not sampled"}""" for key in
141160
self.solver.problem.domains.keys()
142161
])
143162
raise RuntimeError('Cannot create Trainer if not all conditions '
144163
'are sampled. The Trainer got the following:\n'
145164
f'{error_message}')
146165
self.data_module = PinaDataModule(
147166
self.solver.problem,
148-
train_size=self.train_size,
149-
test_size=self.test_size,
150-
val_size=self.val_size,
151-
predict_size=self.predict_size,
152-
batch_size=self.batch_size,
153-
automatic_batching=self.automatic_batching)
167+
train_size=train_size,
168+
test_size=test_size,
169+
val_size=val_size,
170+
predict_size=predict_size,
171+
batch_size=batch_size,
172+
automatic_batching=automatic_batching,
173+
num_workers=num_workers,
174+
pin_memory=pin_memory)
154175

155176
def train(self, **kwargs):
156177
"""

0 commit comments

Comments
 (0)