Skip to content

Commit 06537b4

Browse files
committed
Add configurable concurrency parameters to synthesis and evaluation services
- Added max_concurrent_topics field to SynthesisRequest (1-100, default: 5)
- Added max_workers field to EvaluationRequest (1-100, default: 4)
- Updated all synthesis services to use request.max_concurrent_topics
- Updated all evaluator services to use request.max_workers
- Added validation constraints to prevent invalid values
- Updated example payloads in main.py to include new parameters
- Services now respect API-configurable concurrency limits while maintaining defaults

This allows users to optimize performance based on their infrastructure and workload requirements via API parameters.
1 parent 00eb07a commit 06537b4

File tree

6 files changed

+35
-14
lines changed

6 files changed

+35
-14
lines changed

app/main.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1262,6 +1262,7 @@ async def get_example_payloads(use_case:UseCase):
12621262
"technique": "sft",
12631263
"topics": ["python_basics", "data_structures"],
12641264
"is_demo": True,
1265+
"max_concurrent_topics": 5,
12651266
"examples": [
12661267
{
12671268
"question": "How do you create a list in Python and add elements to it?",
@@ -1288,6 +1289,7 @@ async def get_example_payloads(use_case:UseCase):
12881289
"technique": "sft",
12891290
"topics": ["basic_queries", "joins"],
12901291
"is_demo": True,
1292+
"max_concurrent_topics": 5,
12911293
"schema": "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(100), email VARCHAR(255));\nCREATE TABLE orders (id INT PRIMARY KEY, user_id INT, amount DECIMAL(10,2), FOREIGN KEY (user_id) REFERENCES users(id));",
12921294
"examples":[
12931295
{
@@ -1316,6 +1318,7 @@ async def get_example_payloads(use_case:UseCase):
13161318
"topics": ["topic 1", "topic 2"],
13171319
"custom_prompt": "Give your instructions here",
13181320
"is_demo": True,
1321+
"max_concurrent_topics": 5,
13191322

13201323
"examples":[
13211324
{

app/models/request_models.py

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -138,7 +138,13 @@ class SynthesisRequest(BaseModel):
138138
example_path: Optional[str] = None
139139
schema: Optional[str] = None # Added schema field
140140
custom_prompt: Optional[str] = None
141-
display_name: Optional[str] = None
141+
display_name: Optional[str] = None
142+
max_concurrent_topics: Optional[int] = Field(
143+
default=5,
144+
ge=1,
145+
le=100,
146+
description="Maximum number of concurrent topics to process (1-100)"
147+
)
142148

143149
# Optional model parameters with defaults
144150
model_params: Optional[ModelParameters] = Field(
@@ -156,7 +162,7 @@ class SynthesisRequest(BaseModel):
156162
"technique": "sft",
157163
"topics": ["python_basics", "data_structures"],
158164
"is_demo": True,
159-
165+
"max_concurrent_topics": 5
160166

161167
}
162168
}
@@ -209,6 +215,12 @@ class EvaluationRequest(BaseModel):
209215
display_name: Optional[str] = None
210216
output_key: Optional[str] = 'Prompt'
211217
output_value: Optional[str] = 'Completion'
218+
max_workers: Optional[int] = Field(
219+
default=4,
220+
ge=1,
221+
le=100,
222+
description="Maximum number of worker threads for parallel evaluation (1-100)"
223+
)
212224

213225
# Export configuration
214226
export_type: str = "local" # "local" or "s3"
@@ -227,7 +239,8 @@ class EvaluationRequest(BaseModel):
227239
"inference_type": "aws_bedrock",
228240
"import_path": "qa_pairs_llama3-1-70b-instruct-v1:0_20241114_212837_test.json",
229241
"import_type": "local",
230-
"export_type":"local"
242+
"export_type":"local",
243+
"max_workers": 4
231244

232245
}
233246
}

app/services/evaluator_legacy_service.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ class EvaluatorLegacyService:
2424
def __init__(self, max_workers: int = 4):
2525
self.bedrock_client = get_bedrock_client()
2626
self.db = DatabaseManager()
27-
self.max_workers = max_workers
27+
self.max_workers = max_workers # Default max workers (configurable via request)
2828
self.guard = ContentGuardrail()
2929
self._setup_logging()
3030

@@ -155,7 +155,8 @@ def evaluate_topic(self, topic: str, qa_pairs: List[Dict], model_handler, reques
155155
failed_pairs = []
156156

157157
try:
158-
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
158+
max_workers = request.max_workers or self.max_workers
159+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
159160
try:
160161
evaluate_func = partial(
161162
self.evaluate_single_pair,
@@ -287,8 +288,9 @@ def evaluate_results(self, request: EvaluationRequest, job_name=None,is_demo: bo
287288
# Add to appropriate topic list
288289
transformed_data['results'][topic].append(qa_pair)
289290

290-
self.logger.info(f"Processing {len(transformed_data['results'])} topics with {self.max_workers} workers")
291-
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
291+
max_workers = request.max_workers or self.max_workers
292+
self.logger.info(f"Processing {len(transformed_data['results'])} topics with {max_workers} workers")
293+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
292294
future_to_topic = {
293295
executor.submit(
294296
self.evaluate_topic,

app/services/evaluator_service.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,10 @@
2121
class EvaluatorService:
2222
"""Service for evaluating freeform data rows using Claude with parallel processing (Freeform technique only)"""
2323

24-
def __init__(self, max_workers: int = 4):
24+
def __init__(self, max_workers: int = 5):
2525
self.bedrock_client = get_bedrock_client()
2626
self.db = DatabaseManager()
27-
self.max_workers = max_workers
27+
self.max_workers = max_workers # Default max workers (configurable via request)
2828
self.guard = ContentGuardrail()
2929
self._setup_logging()
3030

@@ -143,7 +143,8 @@ def evaluate_rows(self, rows: List[Dict[str, Any]], model_handler, request: Eval
143143
failed_rows = []
144144

145145
try:
146-
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
146+
max_workers = request.max_workers or self.max_workers
147+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
147148
try:
148149
evaluate_func = partial(
149150
self.evaluate_single_row,

app/services/synthesis_legacy_service.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
class SynthesisLegacyService:
3737
"""Legacy service for generating synthetic QA pairs (SFT and Custom_Workflow only)"""
3838
QUESTIONS_PER_BATCH = 5 # Maximum questions per batch
39-
MAX_CONCURRENT_TOPICS = 5 # Limit concurrent I/O operations
39+
MAX_CONCURRENT_TOPICS = 5 # Default limit for concurrent I/O operations (configurable via request)
4040

4141

4242
def __init__(self):
@@ -313,7 +313,8 @@ async def generate_examples(self, request: SynthesisRequest , job_name = None, i
313313

314314
# Create thread pool
315315
loop = asyncio.get_event_loop()
316-
with ThreadPoolExecutor(max_workers=self.MAX_CONCURRENT_TOPICS) as executor:
316+
max_workers = request.max_concurrent_topics or self.MAX_CONCURRENT_TOPICS
317+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
317318
topic_futures = [
318319
loop.run_in_executor(
319320
executor,

app/services/synthesis_service.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
class SynthesisService:
3737
"""Service for generating synthetic freeform data (Freeform technique only)"""
3838
QUESTIONS_PER_BATCH = 5 # Maximum questions per batch
39-
MAX_CONCURRENT_TOPICS = 5 # Limit concurrent I/O operations
39+
MAX_CONCURRENT_TOPICS = 5 # Default limit for concurrent I/O operations (configurable via request)
4040

4141

4242
def __init__(self):
@@ -368,7 +368,8 @@ async def generate_freeform(self, request: SynthesisRequest, job_name=None, is_d
368368

369369
# Create thread pool
370370
loop = asyncio.get_event_loop()
371-
with ThreadPoolExecutor(max_workers=self.MAX_CONCURRENT_TOPICS) as executor:
371+
max_workers = request.max_concurrent_topics or self.MAX_CONCURRENT_TOPICS
372+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
372373
topic_futures = [
373374
loop.run_in_executor(
374375
executor,

0 commit comments

Comments (0)