|
| 1 | +import queue |
| 2 | +import sys |
| 3 | +from datetime import datetime |
| 4 | +from multiprocessing.pool import ThreadPool |
| 5 | + |
1 | 6 | from volcenginesdkarkruntime import Ark |
2 | 7 |
|
3 | 8 | # Authentication |
|
10 | 15 | # or specify ak&sk by Ark(ak="${YOUR_AK}", sk="${YOUR_SK}"). |
11 | 16 | # To get your ak&sk, please refer to this document(https://www.volcengine.com/docs/6291/65568) |
 12 | 17 | # For more information, please check this document (https://www.volcengine.com/docs/82379/1263279) |
13 | | -client = Ark() |
| 18 | + |
| 19 | + |
def worker(
    worker_id: int,
    client: "Ark",
    requests: "queue.Queue[dict]",
):
    """Consume chat requests from *requests* until the ``None`` sentinel is seen.

    Args:
        worker_id: Index of this worker, used only for the startup log line.
        client: Ark SDK client used to issue batch chat completions.
        requests: Queue of keyword-argument dicts for
            ``client.batch_chat.completions.create``; a ``None`` entry signals
            that no more requests will arrive.
    """
    print(f"Worker {worker_id} is starting.")

    while True:
        request = requests.get()

        # Compare against None explicitly: the original `if not request`
        # would also treat a falsy (e.g. empty) dict as the shutdown signal
        # and wrongly terminate every worker.
        if request is None:
            # Put the sentinel back so sibling workers also terminate.
            requests.put(request)
            return

        try:
            # Issue the batch chat completion request.
            completion = client.batch_chat.completions.create(**request)
            print(completion)
        except Exception as e:
            # Best-effort demo: report the failure but keep consuming tasks.
            print(e, file=sys.stderr)
        finally:
            # Mark the item done even on failure so queue accounting stays correct.
            requests.task_done()
| 44 | + |
| 45 | + |
def main():
    """Drive `task_num` mocked batch chat requests through a thread-pool of workers."""
    start = datetime.now()
    max_concurrent_tasks, task_num = 1000, 10000

    requests = queue.Queue()
    # Batch inference can take a long time per request; allow up to 24 hours.
    client = Ark(timeout=24 * 3600)

    # Mock `task_num` identical tasks.
    for _ in range(task_num):
        requests.put(
            {
                "model": "${YOUR_ENDPOINT_ID}",
                "messages": [
                    {
                        "role": "system",
                        "content": "你是豆包,是由字节跳动开发的 AI 人工智能助手",
                    },
                    {"role": "user", "content": "常见的十字花科植物有哪些?"},
                ],
            }
        )

    # Sentinel meaning "no more requests"; workers re-queue it so every
    # worker eventually sees it and exits.
    requests.put(None)

    # Create `max_concurrent_tasks` workers and start them. Submit each worker
    # exactly once (the original enqueued each worker twice, doubling the
    # number of submitted worker tasks).
    with ThreadPool(max_concurrent_tasks) as pool:
        for i in range(max_concurrent_tasks):
            pool.apply_async(worker, args=(i, client, requests))

        # Wait for all requests to be processed.
        pool.close()
        pool.join()

    client.close()

    end = datetime.now()
    print(f"Total time: {end - start}, Total task: {task_num}")
| 85 | + |
14 | 86 |
|
# Run the demo only when executed as a script (not on import).
if __name__ == "__main__":
    main()
0 commit comments