-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathparallel_operation_example.py
More file actions
436 lines (355 loc) · 14 KB
/
parallel_operation_example.py
File metadata and controls
436 lines (355 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Unified interface for LLM providers using OpenAI format
# https://github.com/muxi-ai/onellm
#
# Copyright (C) 2025 Ran Aroussi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
# ============================================================================ #
# OneLLM EXAMPLE: Parallel Operations
# ============================================================================ #
#
# This example demonstrates how to use OneLLM to achieve high throughput by
# processing multiple requests in parallel. This is particularly useful for:
#
# - Batch processing of many prompts/queries
# - Implementing efficient serverless functions or APIs
# - Reducing overall latency when making many independent LLM calls
# - Combining multiple model outputs (ensemble approaches)
#
# CODEBASE RELATIONSHIP:
# ----------------------
# This example leverages OneLLM's support for:
# - Asynchronous API (acreate methods)
# - Thread safety for concurrent usage
# - Provider-agnostic interface across multiple models
# - Usage data for performance tracking
#
# RELATED EXAMPLES:
# ----------------
# - chat_completion_example.py: Basic usage of the chat completion API
# - fallback_example.py: Using fallbacks for reliability
# - retry_example.py: Automatic retries for transient failures
#
# REQUIREMENTS:
# ------------
# - OneLLM
# - multitasking
# ============================================================================ #
"""
import os
import time
import asyncio
import multitasking
import concurrent.futures
from typing import List, Dict, Any
import onellm
from onellm import ChatCompletion
# Pull the API key from the OPENAI_API_KEY environment variable; when the
# variable is unset the placeholder string below is used instead.
api_key = os.getenv("OPENAI_API_KEY", "your-api-key-here")

# Hand the key to the onellm module via its module-level attribute.
onellm.api_key = api_key
# Subjects the example asks the model about
TOPICS = [
    "Artificial Intelligence",
    "Quantum Computing",
    "Sustainable Energy",
    "Space Exploration",
    "Biotechnology",
    "Neuroscience",
    "Robotics",
    "Blockchain",
    "3D Printing",
    "Virtual Reality",
]

# One prompt per topic, in the same order as TOPICS; these are the requests
# that the examples below send in parallel
QUERIES = list(
    map("Explain {} in simple terms. Keep it under 100 words.".format, TOPICS)
)
# ========================== Async Approach ==========================
async def process_single_query_async(
    query: str,
    model: str = "openai/gpt-3.5-turbo"
) -> Dict[str, Any]:
    """
    Send one prompt through the async chat completion API.

    Args:
        query: Prompt text, sent as a single "user" message
        model: Model identifier forwarded to ChatCompletion.acreate

    Returns:
        Dict with three keys: "query" (the prompt as given), "response"
        (the content of the first choice's message) and "tokens" (the
        total_tokens figure from the response's usage data)
    """
    print(f"Processing: {query[:30]}...")

    # Await the completion; the conversation is just the one user turn
    completion = await ChatCompletion.acreate(
        model=model,
        messages=[{"role": "user", "content": query}],
        temperature=0.7,
        max_tokens=150
    )

    # Only the first choice is used
    first_choice = completion.choices[0]

    return {
        "query": query,
        "response": first_choice.message["content"],
        "tokens": completion.usage.total_tokens
    }
async def process_all_queries_async(
    queries: List[str],
    model: str = "openai/gpt-3.5-turbo"
) -> List[Dict[str, Any]]:
    """
    Run every query concurrently and collect the results.

    One process_single_query_async coroutine is created per query and all
    of them are handed to asyncio.gather in a single call, so the requests
    are awaited together rather than one after another.

    Args:
        queries: Prompts to send
        model: Model identifier used for every prompt

    Returns:
        The per-query result dicts as returned by asyncio.gather
    """
    # One coroutine per prompt; gather() schedules them all at once
    pending = (process_single_query_async(prompt, model) for prompt in queries)

    return await asyncio.gather(*pending)
# ========================== Thread Pool Approach ==========================
@multitasking.task  # Decorator from multitasking library
def process_in_thread(
    query: str,
    model: str,
    results: List[Dict[str, Any]],
    index: int
) -> None:
    """
    Run one query through the synchronous API and record the outcome.

    Nothing is returned; the outcome is written into ``results[index]``.
    On success that slot holds a dict with "query", "response" and "tokens";
    if anything raises, it holds a dict with "query" and "error" (the
    exception rendered with str()).

    Args:
        query: Prompt text, sent as a single "user" message
        model: Model identifier forwarded to ChatCompletion.create
        results: Shared list that receives this query's outcome
        index: Slot in ``results`` reserved for this query
    """
    print(f"Thread processing: {query[:30]}...")

    conversation = [{"role": "user", "content": query}]

    try:
        # Blocking call: the synchronous API is used here rather than acreate
        completion = ChatCompletion.create(
            model=model,
            messages=conversation,
            temperature=0.7,
            max_tokens=150
        )

        outcome = {
            "query": query,
            "response": completion.choices[0].message["content"],
            "tokens": completion.usage.total_tokens
        }

        # Write into this query's own slot
        results[index] = outcome
    except Exception as exc:
        # Record the failure in the slot instead of raising
        results[index] = {"query": query, "error": str(exc)}
def process_all_queries_threaded(
    queries: List[str],
    model: str = "openai/gpt-3.5-turbo"
) -> List[Dict[str, Any]]:
    """
    Fan the queries out through the multitasking library and wait for them.

    The multitasking thread limit is set to the number of queries, capped
    at 10. Each query is handed to process_in_thread together with a shared
    results list and the slot it should fill, and the function returns once
    multitasking.wait_for_tasks() has returned.

    Args:
        queries: Prompts to send
        model: Model identifier used for every prompt

    Returns:
        List with one slot per query, filled in by process_in_thread
        (a slot is None if nothing was written to it)
    """
    total = len(queries)

    # No more than 10 threads, and no more than there are queries
    multitasking.set_max_threads(min(10, total))

    # Pre-sized so every task can write to its own position
    results = [None for _ in range(total)]

    for position, prompt in enumerate(queries):
        process_in_thread(prompt, model, results, position)

    # Block until multitasking reports all tasks finished
    multitasking.wait_for_tasks()

    return results
# ========================== Post-Processing in Parallel ==========================
def analyze_response(result: Dict[str, Any]) -> Dict[str, Any]:
    """
    Attach simple text statistics to a single query result.

    The analysis is a stand-in for real post-processing: a short sleep
    followed by a word count, a keyword-based sentiment label and a
    words-per-token ratio.

    Results that carry no "response" text (for example the
    {"query", "error"} records that process_in_thread stores when a request
    fails) cannot be analyzed; they are returned as an unmodified copy
    instead of raising KeyError.

    Args:
        result: Query result dict; expected to hold "response" (str) and
            "tokens" (int) on success

    Returns:
        New dict with every entry of ``result`` plus an "analysis" entry
        holding "word_count", "sentiment" ("positive" when the response
        contains "amazing", case-insensitively, otherwise "neutral") and
        "efficiency" (word_count divided by the token count). Results
        without response text are copied without an "analysis" entry.
    """
    response_text = result.get("response")
    if response_text is None:
        # Failed request or missing text: nothing to analyze
        return dict(result)

    # Simulate time-consuming analysis work
    time.sleep(0.1)

    word_count = len(response_text.split())
    sentiment = "positive" if "amazing" in response_text.lower() else "neutral"

    # A missing, None or zero token count falls back to 1 so the ratio is
    # always defined
    token_count = result.get("tokens") or 1

    return {
        **result,
        "analysis": {
            "word_count": word_count,
            "sentiment": sentiment,
            "efficiency": word_count / token_count
        }
    }
def parallel_post_process(
    results: List[Dict[str, Any]],
    max_workers: int = 4
) -> List[Dict[str, Any]]:
    """
    Run analyze_response over every result using a pool of worker threads.

    A concurrent.futures.ThreadPoolExecutor is created for the duration of
    the call and analyze_response is mapped over ``results`` on it.

    Args:
        results: Query results to analyze
        max_workers: Upper bound on the number of worker threads

    Returns:
        List of analyze_response outputs, one per entry of ``results`` and
        in the same order
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    with pool:
        # map() yields outputs in input order; list() drains it before the
        # pool is shut down on leaving the with-block
        return list(pool.map(analyze_response, results))
# ========================== Mixed Approach (Advanced) ==========================
async def combined_parallel_processing(
    queries: List[str],
    model: str = "openai/gpt-3.5-turbo",
    max_workers: int = 4
) -> List[Dict[str, Any]]:
    """
    Combine async API calls with thread-based post-processing.

    The queries are first sent concurrently with process_all_queries_async.
    The blocking parallel_post_process step is then run through the event
    loop's default executor, so the coroutine awaits it instead of blocking
    the event loop thread.

    Args:
        queries: List of queries to process
        model: Model identifier to use
        max_workers: Maximum number of worker threads for post-processing

    Returns:
        Fully processed results with analysis
    """
    # Stage 1: all API calls, concurrently
    raw_results = await process_all_queries_async(queries, model)

    # Stage 2: post-processing off the event loop thread.
    # get_running_loop() is the recommended way to obtain the loop from
    # inside a coroutine; here it returns the loop that is executing this
    # coroutine.
    loop = asyncio.get_running_loop()
    processed_results = await loop.run_in_executor(
        None,  # Use default executor
        lambda: parallel_post_process(raw_results, max_workers)
    )

    return processed_results
# ========================== Main Example Code ==========================
async def run_async_example():
    """
    Time the asyncio-based approach on QUERIES and print a short report.

    Returns:
        The list of results from process_all_queries_async
    """
    print("\n=== Running Async Example ===")

    started = time.time()
    results = await process_all_queries_async(QUERIES)
    duration = time.time() - started

    count = len(results)
    print(f"Processed {count} queries in {duration:.2f} seconds")
    print(f"Average time per query: {duration / count:.2f} seconds")

    # Show the first result as a sample
    sample = results[0]
    print("\nSample response:")
    print(f"Query: {sample['query']}")
    print(f"Response: {sample['response']}")

    return results
def run_threaded_example():
    """
    Run the thread pool example and measure performance.

    Prints the total and per-query wall-clock time, then the first result
    as a sample. The threaded path records a failed request as a
    {"query", "error"} dict rather than raising, so the sample shows the
    error message when the first request failed instead of crashing on the
    missing "response" key.

    Returns:
        The list of results from process_all_queries_threaded
    """
    print("\n=== Running Thread Pool Example ===")

    start_time = time.time()
    results = process_all_queries_threaded(QUERIES)
    duration = time.time() - start_time

    print(f"Processed {len(results)} queries in {duration:.2f} seconds")
    print(f"Average time per query: {duration/len(results):.2f} seconds")

    # Display first result as sample
    sample = results[0]
    print("\nSample response:")
    print(f"Query: {sample['query']}")
    if "error" in sample:
        # Failed requests carry "error" instead of "response"
        print(f"Error: {sample['error']}")
    else:
        print(f"Response: {sample['response']}")

    return results
async def run_combined_example():
    """
    Time the combined async + post-processing approach and print a report.

    Returns:
        The list of results from combined_parallel_processing
    """
    print("\n=== Running Combined Approach Example ===")

    started = time.time()
    results = await combined_parallel_processing(QUERIES)
    duration = time.time() - started

    count = len(results)
    print(f"Processed {count} queries in {duration:.2f} seconds")
    print(f"Average time per query: {duration / count:.2f} seconds")

    # Show the first result, including its analysis, as a sample
    sample = results[0]
    print("\nSample response with analysis:")
    print(f"Query: {sample['query']}")
    print(f"Response: {sample['response']}")
    print(f"Analysis: {sample['analysis']}")

    return results
async def compare_approaches():
    """
    Compare all three approaches against a sequential baseline.

    The baseline awaits the first three QUERIES one at a time. Each parallel
    approach (async, thread pool, combined) then processes all of QUERIES,
    and its average time per query is compared with the baseline's average
    to give a speedup factor. Everything is reported on stdout; nothing is
    returned.
    """
    print("\n=== Comparing All Approaches ===")

    # Sequential processing for baseline
    print("\nSequential Processing (Baseline):")
    start_time = time.time()
    sequential_results = []
    for query in QUERIES[:3]:  # Use fewer queries for baseline
        result = await process_single_query_async(query)
        sequential_results.append(result)
    sequential_duration = time.time() - start_time

    # Derive the count from what was actually processed so the report stays
    # correct if fewer than three queries are available
    baseline_count = len(sequential_results)
    baseline_average = sequential_duration / baseline_count
    print(f"Processed {baseline_count} queries sequentially in {sequential_duration:.2f} seconds")
    print(f"Average time per query: {baseline_average:.2f} seconds")

    def report(results, duration):
        """Print count, average time and speedup vs. the baseline average."""
        average = duration / len(results)
        print(f"Processed {len(results)} queries in {duration:.2f} seconds")
        print(f"Average time per query: {average:.2f} seconds")
        print(f"Speedup vs sequential: {baseline_average / average:.2f}x")

    # Async approach
    print("\nAsynchronous Approach:")
    start_time = time.time()
    async_results = await process_all_queries_async(QUERIES)
    async_duration = time.time() - start_time
    report(async_results, async_duration)

    # Thread pool approach
    print("\nThread Pool Approach:")
    start_time = time.time()
    threaded_results = process_all_queries_threaded(QUERIES)
    threaded_duration = time.time() - start_time
    report(threaded_results, threaded_duration)

    # Combined approach
    print("\nCombined Approach:")
    start_time = time.time()
    combined_results = await combined_parallel_processing(QUERIES)
    combined_duration = time.time() - start_time
    report(combined_results, combined_duration)

    print("\nConclusion:")
    print("The best approach depends on your use case:")
    print("- Async: best for pure I/O-bound workloads")
    print("- Thread pool: simple to use for mixed workloads")
    print("- Combined: most control but more complex")
if __name__ == "__main__":
    # Banner shown before any example runs
    for banner_line in (
        "Parallel Operation Example with OneLLM",
        "======================================",
        "This example demonstrates different approaches",
        "to process multiple LLM requests in parallel.",
    ):
        print(banner_line)

    # Select which example to run (uncomment one)
    asyncio.run(run_async_example())
    # run_threaded_example()
    # asyncio.run(run_combined_example())
    # asyncio.run(compare_approaches())  # This runs all approaches and compares them