4646 from azure.identity import DefaultAzureCredential
4747
4848 # Get events list from environment variable
49- events_list = json.loads(os.environ.get('EVENTS_LIST', ' [{"test": "test"}]'))
49+ events_list = [{"test": n} for n in range(20)]
5050
5151 # Create queue service client using managed identity
5252 account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
7474 permissions :
7575 id-token : write
7676 contents : read
77+ strategy :
78+ matrix :
79+ worker : [1, 2, 3]
7780 steps :
7881 - name : Checkout code
7982 uses : actions/checkout@v4
@@ -107,6 +110,7 @@ jobs:
107110 # Create queue service client using managed identity
108111 account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
109112 queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
113+ worker_id = os.environ.get('WORKER_ID')
110114
111115 # Use DefaultAzureCredential which will automatically use the managed identity
112116 credential = DefaultAzureCredential()
@@ -122,47 +126,49 @@ jobs:
122126 messages = queue_client.peek_messages(max_messages=1)
123127 if messages:
124128 break
125- print(f"No messages yet, attempt {attempt + 1}/{max_attempts}")
129+ print(f"Worker {worker_id}: No messages yet, attempt {attempt + 1}/{max_attempts}")
126130 time.sleep(2) # Wait 2 seconds between attempts
127131 attempt += 1
128132 except Exception as e:
129- print(f"Error peeking messages: {e}")
133+ print(f"Worker {worker_id}: Error peeking messages: {e}")
130134 time.sleep(2)
131135 attempt += 1
132136
133137 if attempt >= max_attempts:
134- print(" No messages found in queue after waiting - exiting successfully")
135- exit(0) # Exit with success code
138+ print(f"Worker {worker_id}: No messages found in queue after waiting - exiting successfully")
139+ exit(0)
136140
137141 # Receive and process messages
138142 while True:
139143 try:
140144 messages = queue_client.receive_messages(messages_per_page=1)
141145 if not messages:
142- print(" No more messages in queue - exiting successfully")
146+ print(f"Worker {worker_id}: No more messages in queue - exiting successfully")
143147 exit(0)
144148
145149 for message in messages:
146150 try:
147151 content = json.loads(message.content)
148- print(f"Received message: {json.dumps(content, indent=2)}")
152+ print(f"Worker {worker_id}: Received message: {json.dumps(content, indent=2)}")
149153 # Delete the message after processing
150154 queue_client.delete_message(message.id, message.pop_receipt)
151155
152156 # Check if there are more messages
153157 peek_messages = queue_client.peek_messages(max_messages=1)
154158 if not peek_messages:
155- print(" No more messages in queue - exiting successfully")
159+ print(f"Worker {worker_id}: No more messages in queue - exiting successfully")
156160 exit(0)
157161 except json.JSONDecodeError as e:
158- print(f"Error decoding message: {e}")
159- print(f"Raw message content: {message.content}")
162+ print(f"Worker {worker_id}: Error decoding message: {e}")
163+ print(f"Worker {worker_id}: Raw message content: {message.content}")
160164 except Exception as e:
161- print(f"Error processing messages: {e}")
165+ print(f"Worker {worker_id}: Error processing messages: {e}")
162166 break
163167
164- print(" Finished processing all messages")
168+ print(f"Worker {worker_id}: Finished processing all messages")
165169 EOF
170+ env :
171+ WORKER_ID : ${{ matrix.worker }}
166172
167173 cleanup-queue :
168174 needs : read-events
0 commit comments