1+ import asyncio
2+ import ssl
3+ import logging
4+ import os
5+ import json
6+ import sys
7+ from aiokafka .abc import AbstractTokenProvider
8+ from aiokafka import AIOKafkaProducer
9+ from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
10+ from botocore .session import get_session
11+
12+ # --- Configuration Section ---
13+ logging .basicConfig (
14+ level = logging .INFO ,
15+ format = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
16+ )
17+ log = logging .getLogger ("msk_iam_producer" )
18+
19+ # === MSK Cluster Config ===
20+ BOOTSTRAP_SERVERS = "b-1-public.your-msk-cluster.amazonaws.com:9198,b-2-public.your-msk-cluster.amazonaws.com:9198"
21+ TOPICS = ["example-topic" ]
22+
23+ # === AWS IAM Config ===
24+ AWS_ACCESS_KEY_ID = None
25+ AWS_SECRET_ACCESS_KEY = None
26+ AWS_SESSION_TOKEN = None
27+
28+ # === Producer Config ===
29+ CLIENT_ID = "iam-producer-instance-1"
30+ PRODUCER_NAME = "iam-producer-instance-1"
31+ MAX_BATCH_SIZE = 16384
32+ LINGER_MS = 100
33+
34+ examples_path = os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' ))
35+ sys .path .append (examples_path )
36+ from json_generator import generate_random_json
37+
38+
39+ class MSKIAMTokenProvider (AbstractTokenProvider ):
40+ """Custom TokenProvider that uses botocore to handle credential discovery."""
41+ def __init__ (self , access_key = None , secret_key = None , session_token = None ):
42+ self ._botocore_session = get_session ()
43+ if access_key and secret_key :
44+ log .info ("Overwriting default credentials with keys provided directly in the script." )
45+ self ._botocore_session .set_credentials (access_key , secret_key , session_token )
46+ else :
47+ log .info ("Using botocore's default credential discovery." )
48+
49+ self .region = self ._botocore_session .get_config_variable ('region' )
50+ if not self .region :
51+ raise ValueError ("AWS Region not found." )
52+ log .info (f"Token provider initialized for region: { self .region } " )
53+
54+ async def token (self ) -> str :
55+ token , _ = MSKAuthTokenProvider .generate_auth_token (self .region , self ._botocore_session )
56+ return token
57+
58+
59+ async def create_producer (client_id , ** aws_creds ):
60+ """Creates and starts an MSK IAM-compatible AIOKafkaProducer."""
61+ log .info (f"Creating async producer with client.id: { client_id } " )
62+ context = ssl .create_default_context ()
63+ context .check_hostname = False
64+ context .verify_mode = ssl .CERT_NONE
65+ token_provider = MSKIAMTokenProvider (** aws_creds )
66+
67+ producer = AIOKafkaProducer (
68+ bootstrap_servers = BOOTSTRAP_SERVERS ,
69+ client_id = client_id ,
70+ security_protocol = "SASL_SSL" ,
71+ sasl_mechanism = "OAUTHBEARER" ,
72+ sasl_oauth_token_provider = token_provider ,
73+ ssl_context = context ,
74+ key_serializer = lambda k : k .encode ("utf-8" ),
75+ value_serializer = lambda v : json .dumps (v ).encode ("utf-8" ),
76+ max_batch_size = MAX_BATCH_SIZE ,
77+ linger_ms = LINGER_MS ,
78+ )
79+ await producer .start ()
80+ return producer
81+
82+
83+ async def send_messages_to_topics (producer , topics , producer_name , num_messages = 50 ):
84+ """Generates and sends messages using the provided producer instance."""
85+ successful = 0
86+ failed = 0
87+ log .info (f"Sending { num_messages } messages from logical producer '{ producer_name } ' to topics: { topics } " )
88+
89+ for i in range (num_messages ):
90+ try :
91+ message = generate_random_json (min_size_kb = 1 )
92+ message ['message_number' ] = i + 1
93+ message ['producer' ] = producer_name
94+ key = f"msg-{ i + 1 } "
95+ for topic in topics :
96+ await producer .send (topic , key = key , value = message )
97+ log .info (f"Queued message #{ i + 1 } with key '{ key } ' for all topics." )
98+ successful += 1
99+ except Exception as e :
100+ failed += 1
101+ log .error (f"Failed to queue message #{ i + 1 } : { e } " )
102+
103+ log .info ("Flushing final messages..." )
104+ await producer .flush ()
105+ log .info (f"--- '{ producer_name } ' Summary: { successful } queued, { failed } failed ---" )
106+
107+
108+ async def main ():
109+ """The main entry point for the script."""
110+ producer = None
111+ try :
112+ producer = await create_producer (
113+ CLIENT_ID ,
114+ access_key = AWS_ACCESS_KEY_ID ,
115+ secret_key = AWS_SECRET_ACCESS_KEY ,
116+ session_token = AWS_SESSION_TOKEN
117+ )
118+ await send_messages_to_topics (producer , TOPICS , PRODUCER_NAME )
119+ except Exception as e :
120+ log .exception (f"A critical error occurred in main: { e } " )
121+ finally :
122+ if producer :
123+ log .info ("Attempting to stop the producer..." )
124+ try :
125+ await producer .flush ()
126+ await producer .stop ()
127+ log .info ("Producer stopped successfully." )
128+ except Exception as e :
129+ log .exception (f"An error occurred while stopping the producer: { e } " )
130+
131+
132+ if __name__ == "__main__" :
133+ asyncio .run (main ())
0 commit comments