Skip to content

Commit 208d438

Browse files
committed
optimized tqdm to finish everything in a single loop
1 parent 46d98c9 commit 208d438

File tree

2 files changed

+90
-50
lines changed

2 files changed

+90
-50
lines changed

nbs/project/experiments.ipynb

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -378,25 +378,50 @@
378378
" if name_prefix:\n",
379379
" name = f\"{name_prefix}-{name}\"\n",
380380
"\n",
381-
" # Create tasks for all items\n",
382-
" tasks = []\n",
383-
" for item in dataset:\n",
384-
" tasks.append(wrapped_experiment(item))\n",
385-
"\n",
386-
" # Use as_completed with tqdm for progress tracking\n",
387-
" results = []\n",
388-
" for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):\n",
389-
" result = await future\n",
390-
" # Add each result to experiment view as it completes\n",
391-
" if result is not None:\n",
392-
" results.append(result)\n",
393-
"\n",
394-
" # upload results to experiment view\n",
395-
" experiment_view = self.create_experiment(name=name, model=experiment_model)\n",
396-
" for result in results:\n",
397-
" experiment_view.append(result)\n",
398-
"\n",
399-
" return experiment_view\n",
381+
" experiment_view = None\n",
382+
" try:\n",
383+
" # Create the experiment view upfront\n",
384+
" experiment_view = self.create_experiment(name=name, model=experiment_model)\n",
385+
" \n",
386+
" # Create tasks for all items\n",
387+
" tasks = []\n",
388+
" for item in dataset:\n",
389+
" tasks.append(wrapped_experiment(item))\n",
390+
"\n",
391+
" # Calculate total operations (processing + appending)\n",
392+
" total_operations = len(tasks) * 2 # Each item requires processing and appending\n",
393+
" \n",
394+
" # Use tqdm for combined progress tracking\n",
395+
" results = []\n",
396+
" progress_bar = tqdm(total=total_operations, desc=\"Running experiment\")\n",
397+
" \n",
398+
" # Process all items\n",
399+
" for future in asyncio.as_completed(tasks):\n",
400+
" result = await future\n",
401+
" if result is not None:\n",
402+
" results.append(result)\n",
403+
" progress_bar.update(1) # Update for task completion\n",
404+
" \n",
405+
" # Append results to experiment view\n",
406+
" for result in results:\n",
407+
" experiment_view.append(result)\n",
408+
" progress_bar.update(1) # Update for append operation\n",
409+
" \n",
410+
" progress_bar.close()\n",
411+
" return experiment_view\n",
412+
" \n",
413+
" except Exception as e:\n",
414+
" # Clean up the experiment if there was an error and it was created\n",
415+
" if experiment_view is not None:\n",
416+
" try:\n",
417+
" # Delete the experiment (you might need to implement this method)\n",
418+
" sync_version = async_to_sync(self._ragas_api_client.delete_experiment)\n",
419+
" sync_version(project_id=self.project_id, experiment_id=experiment_view.experiment_id)\n",
420+
" except Exception as cleanup_error:\n",
421+
" print(f\"Failed to clean up experiment after error: {cleanup_error}\")\n",
422+
" \n",
423+
" # Re-raise the original exception\n",
424+
" raise e\n",
400425
"\n",
401426
" wrapped_experiment.__setattr__(\"run_async\", run_async)\n",
402427
" return t.cast(ExperimentProtocol, wrapped_experiment)\n",
@@ -431,7 +456,6 @@
431456
"# create a test experiment function\n",
432457
"@p.experiment(TextExperimentModel)\n",
433458
"async def test_experiment(item: TestModel):\n",
434-
" print(item)\n",
435459
" return TextExperimentModel(**item.model_dump(), response=\"test response\", is_correct=\"yes\")\n"
436460
]
437461
},
@@ -444,22 +468,13 @@
444468
"name": "stderr",
445469
"output_type": "stream",
446470
"text": [
447-
"100%|██████████| 3/3 [00:00<00:00, 7752.87it/s]\n"
448-
]
449-
},
450-
{
451-
"name": "stdout",
452-
"output_type": "stream",
453-
"text": [
454-
"name='test item 2' description='test item 2 description' price=200.0\n",
455-
"name='test item 1' description='test item 1 description' price=100.0\n",
456-
"name='test item 3' description='test item 3 description' price=300.0\n"
471+
"Running experiment: 100%|██████████| 6/6 [00:01<00:00, 3.84it/s]\n"
457472
]
458473
},
459474
{
460475
"data": {
461476
"text/plain": [
462-
"Experiment(name=keen_backus, model=TextExperimentModel)"
477+
"Experiment(name=gallant_torvalds, model=TextExperimentModel)"
463478
]
464479
},
465480
"execution_count": null,

ragas_experimental/project/experiments.py

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -162,25 +162,50 @@ async def run_async(dataset: Dataset, name: t.Optional[str] = None):
162162
if name_prefix:
163163
name = f"{name_prefix}-{name}"
164164

165-
# Create tasks for all items
166-
tasks = []
167-
for item in dataset:
168-
tasks.append(wrapped_experiment(item))
169-
170-
# Use as_completed with tqdm for progress tracking
171-
results = []
172-
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
173-
result = await future
174-
# Add each result to experiment view as it completes
175-
if result is not None:
176-
results.append(result)
177-
178-
# upload results to experiment view
179-
experiment_view = self.create_experiment(name=name, model=experiment_model)
180-
for result in results:
181-
experiment_view.append(result)
182-
183-
return experiment_view
165+
experiment_view = None
166+
try:
167+
# Create the experiment view upfront
168+
experiment_view = self.create_experiment(name=name, model=experiment_model)
169+
170+
# Create tasks for all items
171+
tasks = []
172+
for item in dataset:
173+
tasks.append(wrapped_experiment(item))
174+
175+
# Calculate total operations (processing + appending)
176+
total_operations = len(tasks) * 2 # Each item requires processing and appending
177+
178+
# Use tqdm for combined progress tracking
179+
results = []
180+
progress_bar = tqdm(total=total_operations, desc="Running experiment")
181+
182+
# Process all items
183+
for future in asyncio.as_completed(tasks):
184+
result = await future
185+
if result is not None:
186+
results.append(result)
187+
progress_bar.update(1) # Update for task completion
188+
189+
# Append results to experiment view
190+
for result in results:
191+
experiment_view.append(result)
192+
progress_bar.update(1) # Update for append operation
193+
194+
progress_bar.close()
195+
return experiment_view
196+
197+
except Exception as e:
198+
# Clean up the experiment if there was an error and it was created
199+
if experiment_view is not None:
200+
try:
201+
# Delete the experiment (you might need to implement this method)
202+
sync_version = async_to_sync(self._ragas_api_client.delete_experiment)
203+
sync_version(project_id=self.project_id, experiment_id=experiment_view.experiment_id)
204+
except Exception as cleanup_error:
205+
print(f"Failed to clean up experiment after error: {cleanup_error}")
206+
207+
# Re-raise the original exception
208+
raise e
184209

185210
wrapped_experiment.__setattr__("run_async", run_async)
186211
return t.cast(ExperimentProtocol, wrapped_experiment)

0 commit comments

Comments
 (0)