|
| 1 | +import os |
| 2 | +import time |
| 3 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
| 4 | + |
| 5 | +import anthropic |
| 6 | + |
# Module-level shared Anthropic client, reused by every thread below.
# Requires ANTHROPIC_API_KEY to be set in the environment — os.environ[...]
# raises KeyError at import time if it is missing.
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
| 8 | + |
| 9 | + |
def make_request(messages):
    """Send *messages* to the model and return the response's usage stats.

    Uses the module-level ``client``; max_tokens is kept tiny (10) because
    only the usage accounting matters for this rate-limit probe, not the
    completion text.
    """
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=10,
        messages=messages,
    )
    return response.usage
| 15 | + |
| 16 | + |
def make_message(text):
    """Wrap *text* as a user-role message with a single text content block."""
    text_block = {"type": "text", "text": text}
    return {"role": "user", "content": [text_block]}
| 27 | + |
| 28 | + |
def add_cache_control(message: dict, cache_type="ephemeral"):
    """Mark the first content block of *message* as cacheable, in place."""
    first_block = message["content"][0]
    first_block["cache_control"] = {"type": cache_type}
| 31 | + |
| 32 | + |
def remove_cache_control(message: dict):
    """Strip any cache_control marker from the first content block, in place.

    A no-op when the marker is absent.
    """
    message["content"][0].pop("cache_control", None)
| 36 | + |
| 37 | + |
def test_rate_limit_single(thread_id):
    """Run one 5-turn conversation against the API to probe rate limits.

    The first turn sends a very large message, later turns append medium
    ones; only the newest message carries a cache_control marker, which is
    removed again after each successful request so exactly one marker is
    present per call. Stops early on the first API error.
    """
    # Create ~100k token message that will be cached
    big_text = "This is a large block of text for caching. " * 10000  # ~100k tokens
    # NOTE(review): at ~10 tokens per repeat this is closer to ~20k tokens,
    # not 10k as originally labeled — confirm against the API's token count.
    medium_text = "This is a large block of text for caching. " * 2000  # ~10k tokens

    print(f"Thread {thread_id}: Starting rate limit test with cached content...")

    # Rebuild conversation each time (simulating web agent)
    messages = []

    # Add all previous conversation turns
    for i in range(5):
        if i == 0:
            messages.append(make_message(big_text))
            # Timer starts on the first turn, so every dt below is
            # cumulative elapsed time since the conversation began.
            t0 = time.time()
        else:
            messages.append(make_message(medium_text))
        # Only the latest message is marked cacheable for this request.
        add_cache_control(messages[-1])
        try:
            usage = make_request(messages)
            dt = time.time() - t0
            print(f"{dt:.2f}: Thread {thread_id}: {usage}")
        except Exception as e:
            # Rate-limit (or any other) error ends this thread's probe;
            # note: break skips the remove_cache_control cleanup below.
            print(f"Thread {thread_id}: Error - {e}")
            break
        # Unmark before the next turn so the marker moves to the new tail.
        remove_cache_control(messages[-1])
| 64 | + |
| 65 | + |
def test_rate_limit_parallel(num_threads=3):
    """Run test_rate_limit_single concurrently on *num_threads* workers.

    Each worker gets its own integer thread id; per-worker exceptions are
    printed as they complete rather than propagated.
    """
    print(f"Starting parallel rate limit test with {num_threads} threads...")

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        pending = [
            executor.submit(test_rate_limit_single, tid)
            for tid in range(num_threads)
        ]
        for finished in as_completed(pending):
            try:
                finished.result()
            except Exception as exc:
                print(f"Thread completed with error: {exc}")
| 77 | + |
| 78 | + |
def test_rate_limit():
    """Single-threaded variant: run one rate-limit probe as thread id 0."""
    test_rate_limit_single(0)
| 82 | + |
| 83 | + |
| 84 | +if __name__ == "__main__": |
| 85 | + # Use parallel version to quickly exhaust rate limits |
| 86 | + test_rate_limit_parallel(num_threads=3) |
| 87 | + |
| 88 | + # Or use original single-threaded version |
| 89 | + # test_rate_limit() |
0 commit comments