Skip to content

Commit cb36134

Browse files
committed
feat: include functional test and improve GPU and testing-mode support in subset selection
- Add testing_mode configuration to SystemConfig
- Enhance GPU and device handling in subset selection
- Modify process_folds_with_gpu to support CPU fallback in testing mode
- Remove query_description from EncoderConfig
- Update get_default_num_gpus to handle testing scenarios
- Increase batch size for similarity matrix computation
- Improve error handling and logging for GPU/device availability

Signed-off-by: eshwarprasadS <eshwarprasad.s01@gmail.com>
1 parent 27af55f commit cb36134

File tree

4 files changed

+169
-29
lines changed

4 files changed

+169
-29
lines changed

src/instructlab/sdg/encoders/arctic_encoder.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,6 @@ def encode(
185185

186186
return embeddings if return_tensors else embeddings.numpy()
187187

188-
def encode_queries(
189-
self, queries: Union[str, List[str]], instruction: str = "", **kwargs
190-
) -> Union[torch.Tensor, np.ndarray]:
191-
"""Specialized method for encoding queries."""
192-
return self.encode(queries, instruction=instruction, **kwargs)
193-
194-
def encode_corpus(
195-
self, corpus: Union[str, List[str]], instruction: str = "", **kwargs
196-
) -> Union[torch.Tensor, np.ndarray]:
197-
"""Specialized method for encoding corpus documents."""
198-
return self.encode(corpus, instruction=instruction, **kwargs)
199-
200188

201189
def cleanup():
202190
if dist.is_initialized():

src/instructlab/sdg/subset_selection.py

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ class EncoderConfig:
8686
default="Generate embeddings that capture the core meaning of user-assistant conversations, ensuring the embeddings can be clustered based on semantic similarity for subset selection.",
8787
metadata={"advanced": True},
8888
)
89-
query_description: str = field(default="Conversation", metadata={"advanced": True})
9089
encoder_type: str = field(default="arctic", metadata={"advanced": True})
9190
encoder_model: str = field(
9291
default="Snowflake/snowflake-arctic-embed-l-v2.0", metadata={"advanced": True}
@@ -112,12 +111,15 @@ class TemplateConfig:
112111
class SystemConfig:
113112
"""System-related configuration parameters."""
114113

115-
num_gpus: int = field(
116-
default_factory=get_default_num_gpus, metadata={"advanced": True}
117-
)
114+
num_gpus: int = field(init=False) # Don't initialize in __init__
118115
seed: int = field(default=42, metadata={"advanced": True})
119116
max_retries: int = field(default=3, metadata={"advanced": True})
120117
retry_delay: int = field(default=30, metadata={"advanced": True})
118+
testing_mode: bool = field(default=False, metadata={"advanced": True})
119+
120+
def __post_init__(self):
121+
"""Initialize num_gpus after other fields are set."""
122+
self.num_gpus = get_default_num_gpus(testing_mode=self.testing_mode)
121123

122124

123125
@dataclass
@@ -302,7 +304,6 @@ def process_batch(self, batch_texts: List[str], output_file: str) -> Optional[in
302304
self.encoder.encode(
303305
inputs=batch_texts,
304306
instruction=self.config.encoder.instruction,
305-
query_description=self.config.encoder.query_description,
306307
)
307308
.cpu()
308309
.numpy()
@@ -545,6 +546,7 @@ def select_subsets(
545546
self.config.subset_sizes,
546547
len(embeddings), # Pass total samples for absolute size calculation
547548
self.config.basic.epsilon,
549+
self.config.system.testing_mode, # Explicitly pass testing_mode
548550
)
549551
)
550552
start_fold = end_fold
@@ -731,12 +733,30 @@ def _save_subset(self, subset_data, output_file: str, input_file: str):
731733

732734
def process_folds_with_gpu(args):
733735
"""
734-
Process folds on GPU with support for both percentage and absolute size specifications.
736+
Process folds on GPU or CPU with support for both percentage and absolute size specifications.
735737
"""
736-
gpu_id, gpu_folds_info, embeddings, subset_sizes, total_samples, epsilon = args
738+
(
739+
gpu_id,
740+
gpu_folds_info,
741+
embeddings,
742+
subset_sizes,
743+
total_samples,
744+
epsilon,
745+
testing_mode,
746+
) = args
747+
737748
try:
738-
torch.cuda.set_device(gpu_id)
739-
device = f"cuda:{gpu_id}"
749+
if torch.cuda.is_available():
750+
torch.cuda.set_device(gpu_id)
751+
device = f"cuda:{gpu_id}"
752+
else:
753+
if not testing_mode:
754+
raise RuntimeError("GPU processing required but CUDA is not available")
755+
logger.warning(
756+
"Running in CPU mode for testing. Production use requires GPU acceleration."
757+
)
758+
device = "cpu"
759+
740760
results = []
741761
for fold_idx, fold_indices in gpu_folds_info:
742762
try:
@@ -747,7 +767,7 @@ def process_folds_with_gpu(args):
747767
logger.info(f"Computing similarity matrix for fold {fold_idx + 1}")
748768
max_sim_mat = compute_pairwise_dense(
749769
fold_embeddings,
750-
batch_size=50,
770+
batch_size=50000,
751771
metric="cosine",
752772
device=device,
753773
scaling="additive",
@@ -848,18 +868,21 @@ def get_encoder_class(encoder_type: str):
848868

849869

850870
def subset_datasets(
851-
input_files: List[str], subset_sizes: List[Union[int, float]], **kwargs: Any
871+
input_files: List[str],
872+
subset_sizes: List[Union[int, float]],
873+
testing_mode: bool = False,
874+
**kwargs: Any,
852875
) -> None:
853876
"""Create subsets of datasets using facility location for diverse subset selection."""
854877

855878
# Get system's available GPU count
856-
available_gpus = get_default_num_gpus()
879+
available_gpus = get_default_num_gpus(testing_mode=testing_mode)
857880

858881
# Create configuration groups
859882
basic_config = BasicConfig()
860883
encoder_config = EncoderConfig()
861884
template_config = TemplateConfig()
862-
system_config = SystemConfig()
885+
system_config = SystemConfig(testing_mode=testing_mode)
863886

864887
# Update configuration groups from kwargs
865888
for key, value in kwargs.items():
@@ -892,7 +915,6 @@ def subset_datasets(
892915

893916
try:
894917
logger.info(f"Processing configuration: {config}")
895-
896918
processor = DataProcessor(
897919
config, get_encoder_class(config.encoder.encoder_type)
898920
)
@@ -905,4 +927,5 @@ def subset_datasets(
905927
finally:
906928
# Cleanup
907929
gc.collect()
908-
torch.cuda.empty_cache()
930+
if torch.cuda.is_available():
931+
torch.cuda.empty_cache()

src/instructlab/sdg/utils/subset_selection_utils.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,20 @@ def wrapper(self, *args, **kwargs):
6262
return wrapper
6363

6464

65-
def get_default_num_gpus() -> int:
66-
"""Get the default number of GPUs based on available CUDA devices."""
65+
def get_default_num_gpus(testing_mode: bool = False) -> int:
66+
"""
67+
Get the default number of GPUs based on available CUDA devices.
68+
69+
Args:
70+
testing_mode (bool): If True, allows CPU usage with warnings. For testing only.
71+
"""
6772
if not torch.cuda.is_available():
73+
if testing_mode:
74+
logger.warning(
75+
"No CUDA devices detected. Running in testing mode with CPU. "
76+
"Production use requires GPU acceleration."
77+
)
78+
return 1
6879
raise RuntimeError(
6980
"No CUDA devices detected. This functionality requires at least one GPU."
7081
)
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Standard
2+
from multiprocessing import set_start_method
3+
from pathlib import Path
4+
import json
5+
import logging
6+
import os
7+
import tempfile
8+
import uuid
9+
10+
# Third Party
11+
from datasets import Dataset
12+
import pytest
13+
import torch
14+
15+
# First Party
16+
from instructlab.sdg.subset_selection import subset_datasets
17+
18+
19+
def create_test_data(num_samples=50):
    """Create synthetic conversation data similar to the real dataset.

    Each record mimics the production schema: a system/user/assistant
    message triple, JSON-encoded metadata, and a unique string id.

    Args:
        num_samples: Number of conversation records to generate.

    Returns:
        A list of ``num_samples`` conversation dicts.
    """
    topics = ["stars", "galaxies", "planets", "nebulae", "black holes"]

    def _record(topic):
        # Build one synthetic conversation about *topic*.
        metadata = {
            "sdg_document": f"Test document about {topic}",
            "domain": "astronomy",
            "dataset": "test_dataset",
            "dataset_type": "test",
        }
        return {
            "messages": [
                {"content": "", "role": "system"},
                {
                    "content": f"Document:\nThis is a test document about {topic} in astronomy.\nIt contains synthetic data for testing purposes.\nThe document discusses various properties of {topic}.\n\nWhat are the main characteristics of {topic}?",
                    "role": "user",
                },
                {
                    "content": f"This is a test response about {topic} characteristics.",
                    "role": "assistant",
                },
            ],
            # Metadata is serialized to JSON, matching the real pipeline input.
            "metadata": json.dumps(metadata),
            "id": str(uuid.uuid4()),
        }

    # Cycle through the topics so the dataset contains repeated semantic
    # clusters for the subset-selection algorithm to pick from.
    return [_record(topics[i % len(topics)]) for i in range(num_samples)]
52+
53+
54+
def test_subset_datasets_functional():
    """Functional test for subset_datasets."""
    # Spawn start method is required for CUDA-safe multiprocessing.
    set_start_method("spawn", force=True)
    logger = logging.getLogger(__name__)

    try:
        # All inputs and outputs live in a throwaway directory.
        with tempfile.TemporaryDirectory() as workdir:
            # Write synthetic conversations to a JSONL input file.
            samples = create_test_data(num_samples=50)
            input_path = Path(workdir) / "test_data.jsonl"
            with open(input_path, "w") as handle:
                handle.writelines(json.dumps(rec) + "\n" for rec in samples)

            logger.info(f"Created test file with {len(samples)} samples")

            out_root = os.path.join(workdir, "output")

            # Exercise the full pipeline end-to-end in CPU/testing mode.
            subset_datasets(
                input_files=[str(input_path)],
                output_dir=out_root,
                batch_size=10,  # Small batch size for testing
                num_folds=2,  # Fewer folds for faster testing
                subset_sizes=[20],  # Select 20 samples
                num_gpus=2,  # Use 2 threads
                encoder_type="arctic",
                encoder_model="Snowflake/snowflake-arctic-embed-l-v2.0",
                epsilon=0.1,  # Small epsilon for small dataset
                testing_mode=True,  # Enable testing mode
            )

            # Verify every expected artifact was produced.
            dataset_name = "test_data"
            dataset_dir = os.path.join(out_root, dataset_name)
            expected_outputs = [
                (
                    os.path.join(dataset_dir, "embeddings", "embeddings.h5"),
                    "Embeddings file not found",
                ),
                (
                    os.path.join(
                        dataset_dir, f"{dataset_name}_samples_20_subset.jsonl"
                    ),
                    "20-sample subset file not found",
                ),
                (
                    os.path.join(
                        out_root,
                        f"{dataset_name}_fl_2_partitions_samples_20_metadata.npz",
                    ),
                    "Metadata file for 20-sample subset not found",
                ),
            ]
            for artifact_path, failure_message in expected_outputs:
                assert os.path.exists(artifact_path), failure_message
    finally:
        # Release cached GPU memory so later tests start from a clean slate.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

0 commit comments

Comments
 (0)