
Commit e6fd1dc

don't send batch again in multiple runs
1 parent e309696 commit e6fd1dc

File tree

10 files changed: +206 -60 lines changed

automation-api/.rules

Lines changed: 1 addition & 0 deletions

```diff
@@ -10,6 +10,7 @@
 # How to test the gm-eval command
 - create a temp folder in project root
 - in the temp folder, run the gm-eval commands as you need
+- don't create files your self, always use gm-eval download to get configurations.
 - double check the downloaded ai_eval_sheets. If they contains many data (more than 5 questions and 5 prompts), stop and confirm if you can continue first.
 
 # Environment
```
Lines changed: 130 additions & 0 deletions
# GM-Eval Skip Existing Response Files - ✅ COMPLETED

## Overview

The gm-eval send, send-file, and evaluate commands are not properly checking whether response files already exist before sending batches to LLM providers. This leads to unnecessary API calls, wasted resources, and potential duplicate processing.

## Current Problems

1. **send command**: Always sends batches even if `*-response.jsonl` files exist
2. **send-file command**: Always processes files without checking for existing responses
3. **evaluate command**: When using `--send`, may send evaluation batches without checking if evaluation response files exist
4. **Inconsistent behavior**: Only the LiteLLM batch job checks for existing files; other providers don't
## Plan

### 1. Update Batch Job Base Class

- Modify `BaseBatchJob.__init__()` to properly set the `_is_completed` flag when the response file exists
- Update the `send()` method to check completion status before processing
- Ensure consistent behavior across all provider implementations

### 2. Update Individual Batch Job Implementations

- **OpenAI**: Add response file check in `send()` method before creating batch
- **Anthropic**: Add response file check in `send()` method
- **Vertex**: Add response file check in `send()` method
- **Mistral**: Add response file check in `send()` method
- **LiteLLM**: Already implemented correctly; verify behavior

### 3. Add Skip Logic Messages

- Log clear messages when skipping due to existing response files
- Include file paths in skip messages for clarity
- Differentiate between "already processing" and "already completed" states

### 4. Testing Strategy

- Test each provider with existing response files
- Verify that the `--wait` flag works correctly when files already exist
- Test force re-processing options if needed
- Verify evaluate command behavior with the `--send` flag
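Steps 1 and 3 above can be sketched together as a minimal base class. This is an illustrative sketch, not the repository's `base.py`: the `-response.jsonl` naming scheme and the `.processing` marker-file handling are assumptions inferred from the diffs in this commit.

```python
import logging
import os

logger = logging.getLogger("gm_eval")


class BaseBatchJob:
    """Minimal sketch of the shared skip logic; not the repository's class."""

    def __init__(self, jsonl_path: str):
        self.jsonl_path = jsonl_path
        self._batch_id = None
        self._output_path = self._get_output_path()
        self._processing_file = f"{self._output_path}.processing"
        # Step 1: mark the job completed when the response file already exists
        self._is_completed = os.path.exists(self._output_path)
        # Resume an in-flight batch if a processing marker was left behind
        if os.path.exists(self._processing_file):
            with open(self._processing_file, "r") as f:
                self._batch_id = f.read().strip()

    def _get_output_path(self) -> str:
        # Assumed naming scheme: prompts.jsonl -> prompts-response.jsonl
        base, _ = os.path.splitext(self.jsonl_path)
        return f"{base}-response.jsonl"

    @property
    def is_completed(self) -> bool:
        """Check if the batch job is already completed."""
        return self._is_completed

    def should_skip_processing(self) -> bool:
        """Step 3: log clearly and report whether processing should be skipped."""
        if self._is_completed:
            logger.info(f"Response file already exists: {self._output_path}")
            logger.info("Skipping batch processing - job already completed")
            return True
        return False
```

Under this design, each provider's `send()` can start with `if self.should_skip_processing(): return self._output_path`, which is the pattern the provider diffs in this commit apply.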
## Implementation Details

### Expected Behavior

When a command is run and the response file already exists:

1. Log: "Response file already exists: {path}"
2. Log: "Skipping batch processing for {model_config_id}"
3. Return success without making API calls
4. If `--wait` is specified, it should still work (return the existing file path)
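The four expected steps condense into a check-before-submit flow. The sketch below uses a standalone function with hypothetical names (`send_batch`, `submit_fn`) rather than the real command code, and assumes the `-response.jsonl` naming scheme:

```python
import logging
import os

logger = logging.getLogger("gm_eval")


def send_batch(jsonl_path: str, submit_fn) -> str:
    """Sketch: skip the provider call entirely when a response already exists."""
    base, _ = os.path.splitext(jsonl_path)
    output_path = f"{base}-response.jsonl"  # assumed naming scheme

    if os.path.exists(output_path):
        logger.info(f"Response file already exists: {output_path}")
        logger.info(f"Skipping batch processing for {jsonl_path}")
        return output_path  # success, and no API call was made

    # Only now talk to the provider; record the batch id in a marker file
    batch_id = submit_fn(jsonl_path)
    with open(f"{output_path}.processing", "w") as f:
        f.write(batch_id)
    return batch_id
```

The key property is that the existence check happens before `submit_fn` is ever invoked, so a second run with the response file in place makes zero provider calls.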
### Files to Modify

- `automation-api/lib/pilot/batchjob/base.py`
- `automation-api/lib/pilot/batchjob/openai.py`
- `automation-api/lib/pilot/batchjob/anthropic.py`
- `automation-api/lib/pilot/batchjob/vertex.py`
- `automation-api/lib/pilot/batchjob/mistral.py`
- Potentially `automation-api/lib/pilot/generate_eval_prompts.py` for the evaluate command
## Success Criteria

- All gm-eval commands skip processing when response files exist
- Clear logging messages indicate when and why processing is skipped
- No breaking changes to existing functionality
- Consistent behavior across all LLM providers
- Test coverage for skip scenarios

## Future Considerations

- Add a `--force` flag to override skip behavior when needed
- Consider checksums to detect if input files changed since response generation
- Add validation to ensure response files are complete/valid before skipping
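The checksum idea could work roughly as follows. This is a hypothetical sketch: the `.sha256` sidecar-file name and both function names are assumptions, not anything in the repository.

```python
import hashlib
import os


def _sha256(path: str) -> str:
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def record_response_checksum(jsonl_path: str, output_path: str) -> None:
    # Store the input's digest next to the response (sidecar name is assumed)
    with open(f"{output_path}.sha256", "w") as f:
        f.write(_sha256(jsonl_path))


def input_changed_since_response(jsonl_path: str, output_path: str) -> bool:
    """True when the recorded input digest no longer matches the input file."""
    sidecar = f"{output_path}.sha256"
    if not os.path.exists(sidecar):
        return False  # nothing recorded; keep the current skip behavior
    with open(sidecar) as f:
        return f.read().strip() != _sha256(jsonl_path)
```

The skip logic could then treat a stale response file as if it did not exist, instead of unconditionally skipping.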
## Summary of What Has Been Done

### December 7, 2025: Complete Implementation - FINISHED

**Successfully implemented skip logic for all gm-eval commands**

#### Core Implementation Achievements:

- **Updated BaseBatchJob class** with common skip logic:
  - Added `is_completed` property to check completion status
  - Added `should_skip_processing()` method with clear logging
  - Ensured consistent behavior across all provider implementations

- **Updated all batch job implementations**:
  - **OpenAI**: Fixed to use base class initialization and added skip logic
  - **Anthropic**: Updated to inherit properly and use skip logic
  - **Vertex**: Modified to use base class with custom output path handling
  - **Mistral**: Updated to inherit base class and added skip logic
  - **LiteLLM**: Refactored to use common skip logic method

- **Fixed batch processing with wait flag**:
  - Updated `process_batch()` to detect when batches are skipped
  - Properly handle the `--wait` flag when response files already exist
  - Avoid attempting to wait for non-existent batch jobs
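The wait-flag fix described above can be sketched like this. The real `process_batch()` signature is not shown in this commit, and `wait_for_completion` is an assumed method name, so treat this as an illustration of the control flow only:

```python
def process_batch(job, wait: bool = False) -> str:
    """Sketch of the corrected wait handling: never wait on a skipped batch."""
    result = job.send()  # when skipped, send() already returns the response path
    if job.is_completed:
        # The batch was skipped - the response file exists on disk, so there
        # is no live batch job to wait on (this was the "batch job not
        # started" bug).
        return result
    if wait:
        result = job.wait_for_completion()  # assumed method name
    return result
```

Checking `job.is_completed` before entering the wait path is what lets `--wait` succeed whether the batch is freshly submitted or skipped.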
#### Testing Results:

- **Unit tests**: All batch job classes correctly identify existing response files
- **send command**: Skips processing when response files exist
- **send-file command**: Skips processing when response files exist
- **run command**: End-to-end test successful with skip logic
- **Wait functionality**: Correctly handles existing files without errors

#### Key Features Implemented:

1. **Consistent Skip Logic**: All providers now check for existing response files before processing
2. **Clear Logging**: Users see informative messages when processing is skipped
3. **Wait Flag Compatibility**: `--wait` works correctly whether a batch is new or skipped
4. **No Breaking Changes**: Existing functionality preserved while adding skip capability
5. **Provider Agnostic**: Same behavior across OpenAI, Anthropic, Vertex, Mistral, and LiteLLM

#### Critical Bug Fixes:

- **Fixed the "batch job not started" error**: When skipping due to existing files, the wait logic now handles this correctly
- **Proper inheritance**: All batch job classes now properly inherit from BaseBatchJob
- **Consistent output paths**: All providers use the same output path calculation logic

### Final Implementation Summary

The gm-eval commands now properly skip sending batches when response files already exist:

#### New Behavior:

- **send command**: Checks for `*-response.jsonl` files and skips batch creation if they exist
- **send-file command**: Checks for response files and skips processing if they exist
- **evaluate command**: Skips sending evaluation batches if evaluation response files exist
- **run command**: Handles skip logic throughout the entire pipeline

#### User Experience Improvements:

- Clear log messages: "Response file already exists: {path}"
- Skip notification: "Skipping batch processing - job already completed"
- Wait flag support: "Response file already exists - no need to wait"
- Results indication: "Results already available at: {path}"

#### Technical Implementation:

- All batch job classes inherit consistent skip logic from BaseBatchJob
- Skip detection happens before any API calls are made
- Existing functionality completely preserved
- No configuration changes required

The implementation successfully prevents unnecessary API calls, reduces costs, and improves user experience while maintaining full backward compatibility.

automation-api/lib/pilot/batchjob/anthropic.py

Lines changed: 5 additions & 10 deletions
```diff
@@ -12,7 +12,6 @@
 from lib.app_singleton import AppSingleton
 from lib.config import read_config
 
-from ..utils import get_output_path
 from .base import BaseBatchJob
 
 logger = AppSingleton().get_logger()
@@ -37,17 +36,9 @@ def __init__(self, jsonl_path: str):
         Args:
             jsonl_path: Path to JSONL file containing prompts
         """
-        self.jsonl_path = jsonl_path
-        self._batch_id = None
-        self._output_path = get_output_path(jsonl_path)
-        self._processing_file = f"{self._output_path}.processing"
+        super().__init__(jsonl_path)
         self._client = _get_client()
 
-        # Check if job is already being processed
-        if os.path.exists(self._processing_file):
-            with open(self._processing_file, "r") as f:
-                self._batch_id = f.read().strip()
-
     def send(self) -> str:
         """
         Submit batch job to Anthropic.
@@ -56,6 +47,10 @@ def send(self) -> str:
             batch_id: Unique identifier for the batch job
         """
         try:
+            # Check if response file already exists
+            if self.should_skip_processing():
+                return self._output_path
+
             # Check for existing processing file
             if os.path.exists(self._processing_file):
                 logger.info("Batch already being processed.")
```

automation-api/lib/pilot/batchjob/base.py

Lines changed: 18 additions & 0 deletions
```diff
@@ -115,6 +115,24 @@ def output_path(self) -> str:
         """Get the output file path."""
         return self._output_path
 
+    @property
+    def is_completed(self) -> bool:
+        """Check if the batch job is already completed."""
+        return self._is_completed
+
+    def should_skip_processing(self) -> bool:
+        """
+        Check if batch processing should be skipped.
+
+        Returns:
+            True if response file already exists and processing should be skipped
+        """
+        if self._is_completed:
+            logger.info(f"Response file already exists: {self._output_path}")
+            logger.info("Skipping batch processing - job already completed")
+            return True
+        return False
+
     def _get_output_path(self) -> str:
         """Calculate output path from input path."""
         base_name = os.path.splitext(os.path.basename(self.jsonl_path))[0]
```

automation-api/lib/pilot/batchjob/litellm.py

Lines changed: 4 additions & 13 deletions
```diff
@@ -11,7 +11,6 @@
 from lib.app_singleton import AppSingleton
 from lib.config import read_config
 
-from ..utils import get_output_path
 from .base import BaseBatchJob
 
 logger = AppSingleton().get_logger()
@@ -38,18 +37,11 @@ def __init__(self, jsonl_path: str, provider: Optional[str] = None, num_processe
             provider: API provider (e.g., "alibaba")
             num_processes: Number of processes to use for parallel processing
         """
-        self.jsonl_path = jsonl_path
+        super().__init__(jsonl_path)
         self._provider = provider
         self._num_processes = num_processes
-        self._output_path = get_output_path(jsonl_path)
         self._batch_id = jsonl_path
 
-        # Check if job is already completed
-        if os.path.exists(self._output_path):
-            self._is_completed = True
-        else:
-            self._is_completed = False
-
     def send(self) -> str:
         """
         Process all prompts in the JSONL file.
@@ -60,9 +52,8 @@ def send(self) -> str:
             result: the output path, or empty string if failed to send prompts
         """
         try:
-            # Check if already completed
-            if self._is_completed and os.path.exists(self._output_path):
-                logger.info(f"Batch {self._batch_id} already completed.")
+            # Check if response file already exists
+            if self.should_skip_processing():
                 return self._output_path
 
             # Process all prompts
@@ -91,7 +82,7 @@ def check_status(self) -> str:
         Returns:
             status: Job status string ("completed" or "n/a")
         """
-        if self._is_completed or os.path.exists(self._output_path):
+        if self.is_completed or os.path.exists(self._output_path):
             return "completed"
         else:
             return "n/a"
```

automation-api/lib/pilot/batchjob/mistral.py

Lines changed: 6 additions & 12 deletions
```diff
@@ -10,7 +10,7 @@
 from lib.app_singleton import AppSingleton
 from lib.config import read_config
 
-from ..utils import generate_batch_id, get_output_path
+from ..utils import generate_batch_id
 from .base import BaseBatchJob
 
 logger = AppSingleton().get_logger()
@@ -84,19 +84,9 @@ def __init__(
             model_id: Mistral model ID to use (e.g., mistral-small-latest, codestral-latest)
             timeout_hours: Number of hours after which the job should expire (default: 24, max: 168)
         """
-        self.jsonl_path = jsonl_path
+        super().__init__(jsonl_path)
         self.model_id = model_id or "mistral-small-latest"
         self.timeout_hours = timeout_hours
-        self._batch_id = None
-        self._output_path = get_output_path(jsonl_path)
-        self._processing_file = f"{self._output_path}.processing"
-
-        # Check if job is already being processed
-        if os.path.exists(self._processing_file):
-            with open(self._processing_file, "r") as f:
-                self._batch_id = f.read().strip()
-
-        # initialize client
         self._client = _get_client()
 
     def send(self) -> str:
@@ -107,6 +97,10 @@ def send(self) -> str:
             batch_id: Unique identifier for the batch job
         """
         try:
+            # Check if response file already exists
+            if self.should_skip_processing():
+                return self._output_path
+
             # Check for existing processing file
             if os.path.exists(self._processing_file):
                 logger.info("Batch already being processed.")
```

automation-api/lib/pilot/batchjob/openai.py

Lines changed: 6 additions & 12 deletions
```diff
@@ -10,7 +10,7 @@
 from lib.app_singleton import AppSingleton
 from lib.config import read_config
 
-from ..utils import generate_batch_id, get_output_path
+from ..utils import generate_batch_id
 from .base import BaseBatchJob
 
 logger = AppSingleton().get_logger()
@@ -47,18 +47,8 @@ def __init__(self, jsonl_path: str, provider: str = "openai"):
             jsonl_path: Path to JSONL file containing prompts
             provider: API provider ("openai" or "alibaba")
         """
-        self.jsonl_path = jsonl_path
-        self._batch_id = None
+        super().__init__(jsonl_path)
         self._provider = provider
-        self._output_path = get_output_path(jsonl_path)
-        self._processing_file = f"{self._output_path}.processing"
-
-        # Check if job is already being processed
-        if os.path.exists(self._processing_file):
-            with open(self._processing_file, "r") as f:
-                self._batch_id = f.read().strip()
-
-        # initialize client
         self._client = _get_client(provider)
 
     def send(self) -> str:
@@ -69,6 +59,10 @@ def send(self) -> str:
             batch_id: Unique identifier for the batch job
         """
         try:
+            # Check if response file already exists
+            if self.should_skip_processing():
+                return self._output_path
+
             # Check for existing processing file
             if os.path.exists(self._processing_file):
                 logger.info("Batch already being processed.")
```

automation-api/lib/pilot/batchjob/vertex.py

Lines changed: 17 additions & 6 deletions
```diff
@@ -40,11 +40,18 @@ def __init__(self, jsonl_path: str, model_id: str):
             jsonl_path: Path to JSONL file containing prompts
             model_id: The model to send to
         """
-        self.jsonl_path = jsonl_path
+        # Initialize base class with custom output path for Vertex AI
+        super().__init__(jsonl_path)
+
+        # Override the output path for Vertex AI specific naming
         _, output_path = get_batch_id_and_output_path(jsonl_path)
-        self._batch_id = None
         self._output_path = output_path
         self._processing_file = f"{self._output_path}.processing"
+
+        # Check if response file exists and update completion status
+        if os.path.exists(self._output_path):
+            self._is_completed = True
+
         self._model_id = model_id
 
         # find custom id mapping file
@@ -63,10 +70,10 @@ def __init__(self, jsonl_path: str, model_id: str):
             custom_id = row["prompt_id"]
             self._custom_id_mapping[prompt_text] = custom_id
 
-        # Check if job is already being processed
-        if os.path.exists(self._processing_file):
-            with open(self._processing_file, "r") as f:
-                self._batch_id = f.read().strip()
+        # Check if job is already being processed
+        if os.path.exists(self._processing_file):
+            with open(self._processing_file, "r") as f:
+                self._batch_id = f.read().strip()
 
         # initial vertexai
         config = read_config()
@@ -91,6 +98,10 @@ def send(self) -> str:
             batch_id: Unique identifier for the batch job
         """
         try:
+            # Check if response file already exists
+            if self.should_skip_processing():
+                return self._output_path
+
             # Check for existing processing file
             if os.path.exists(self._processing_file):
                 logger.info("Batch already being processed.")
```

automation-api/lib/pilot/gm_eval/commands/run.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -167,6 +167,7 @@ def handle(args: argparse.Namespace) -> int:
         wait=True,  # Always wait for send step
         processes=args.processes,
         timeout_hours=args.timeout_hours,
+        force_regenerate=False,  # Default to not force regenerate
     )
     result = send.handle(send_args)
     if result != 0:
```
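The one-line change above pins `force_regenerate=False` in the namespace that run.py forwards to the send step, so repeated runs reuse existing response files. The helper below is purely illustrative (`build_send_args` is not a function in the repo) and sketches the forwarded defaults:

```python
import argparse


def build_send_args(processes: int, timeout_hours: int) -> argparse.Namespace:
    """Illustrative sketch of the arguments run.py forwards to the send step."""
    return argparse.Namespace(
        wait=True,               # the run pipeline always waits for the send step
        processes=processes,
        timeout_hours=timeout_hours,
        force_regenerate=False,  # do not resend when a response file exists
    )
```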
