-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path05_Batch_API_Pipeline.py
More file actions
69 lines (61 loc) · 2.12 KB
/
05_Batch_API_Pipeline.py
File metadata and controls
69 lines (61 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import os

from openai import OpenAI
"""
Industrial Best Practice: Batch API for Cost & Throughput
---------------------------------------------------------
For non-real-time tasks (classification, data extraction, summarizing millions of documents),
use the Batch API to:
1. Save 50% on token costs.
2. Avoid standard rate limits (batch jobs draw on a separate, higher-throughput quota).
3. Scale horizontally without managing concurrent threads.
"""
# Shared API client used by every function in this module.
# The key is read from the OPENAI_API_KEY environment variable so that a real
# secret never has to be committed to source control. The "sk-..." placeholder
# is kept as a fallback so the module can still be imported when the variable
# is unset or empty.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY") or "sk-...")
def create_batch_file(tasks, path="batch_tasks.jsonl", model="gpt-4o-mini", max_tokens=1000):
    """
    Write one Batch API request per task to a JSON Lines file.

    Each task becomes a single line holding a JSON request object: a POST to
    ``/v1/chat/completions`` whose body carries one user message with the task
    text. Requests are tagged ``request-<index>`` (0-based, in input order) via
    ``custom_id``.

    Args:
        tasks: Iterable of prompt strings. An empty iterable yields an empty file.
        path: Destination file. It is created or overwritten.
        model: Value written to the ``model`` field of every request body.
        max_tokens: Value written to the ``max_tokens`` field of every request body.

    Returns:
        None.
    """
    # Explicit encoding and newline keep the output identical on every
    # platform (no locale-dependent codec, no "\r\n" translation on Windows).
    with open(path, "w", encoding="utf-8", newline="\n") as f:
        f.writelines(
            json.dumps(
                {
                    "custom_id": f"request-{i}",
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": model,
                        "messages": [{"role": "user", "content": task}],
                        "max_tokens": max_tokens,
                    },
                }
            )
            + "\n"
            for i, task in enumerate(tasks)
        )
def submit_batch(path="batch_tasks.jsonl"):
    """
    Upload a batch request file and start a batch job for it.

    Args:
        path: The .jsonl request file to upload. Defaults to the file name
            this script has always used.

    Returns:
        The ``id`` of the batch job returned by ``client.batches.create``.
    """
    # 1. Upload file
    # The context manager closes the handle even if the upload raises;
    # previously the file object was opened inline and never closed.
    with open(path, "rb") as batch_input:
        batch_file = client.files.create(
            file=batch_input,
            purpose="batch",
        )

    # 2. Create batch job
    batch_job = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",  # Currently the only supported window
        metadata={"description": "Nightly data processing"},
    )

    print(f"Batch Job Created: {batch_job.id}")
    return batch_job.id
def check_status(batch_id):
    """
    Look up a batch job, report its state, and hand back the retrieved object.

    Prints the job's ``status``; when that status is "completed" the
    ``output_file_id`` is printed as well.

    Args:
        batch_id: Identifier passed to ``client.batches.retrieve``.

    Returns:
        The object returned by ``client.batches.retrieve``.
    """
    batch = client.batches.retrieve(batch_id)
    state = batch.status
    print(f"Status: {state}")

    # Nothing more to report until the job has finished.
    if state != "completed":
        return batch

    print(f"Output File ID: {batch.output_file_id}")
    # Use client.files.content(batch.output_file_id) to download
    return batch
if __name__ == "__main__":
    # Sample prompts illustrating the kinds of task that suit batch processing.
    sample_tasks = [
        "Classify this as SPAM or NOT: Buy cheap watches!",
        "Summarize the history of Rome in 1 sentence.",
        "What is 2+2?",
    ]

    # The real pipeline calls are left disabled so that running this script
    # does not touch the network. Enable them once an API key is configured:
    # create_batch_file(sample_tasks)
    # submit_batch()

    print("Batch API Pipeline structure ready.")