11import argparse
22import json
33import os
4- import time
54from datetime import datetime
65from functools import partial
76from pathlib import Path
1110from jose import jwt
1211from workflows .transport .pika_transport import PikaTransport
1312
14- dlq_dump_path = Path ("./DLQ" )
1513
16-
17- def dlq_purge (queue : str , rabbitmq_credentials : Path ) -> list [Path ]:
14+ def dlq_purge (
15+ dlq_dump_path : Path , queue : str , rabbitmq_credentials : Path
16+ ) -> list [Path ]:
1817 transport = PikaTransport ()
1918 transport .load_configuration_file (rabbitmq_credentials )
2019 transport .connect ()
@@ -26,26 +25,8 @@ def dlq_purge(queue: str, rabbitmq_credentials: Path) -> list[Path]:
2625 def receive_dlq_message (header : dict , message : dict ) -> None :
2726 idlequeue .put_nowait ("start" )
2827 header ["x-death" ][0 ]["time" ] = datetime .timestamp (header ["x-death" ][0 ]["time" ])
29-
30- timestamp = time .localtime (int (header ["x-death" ][0 ]["time" ]))
31- filepath = dlq_dump_path / time .strftime ("%Y-%m-%d" , timestamp )
32- filepath .mkdir (parents = True , exist_ok = True )
33- filename = filepath / (
34- f"{ queue } -"
35- + time .strftime ("%Y%m%d-%H%M%S" , timestamp )
36- + "-"
37- + str (header ["message-id" ])
38- )
39-
40- dlqmsg = {
41- "exported" : {
42- "date" : time .strftime ("%Y-%m-%d" ),
43- "time" : time .strftime ("%H:%M:%S" ),
44- },
45- "header" : header ,
46- "message" : message ,
47- }
48-
28+ filename = dlq_dump_path / (f"{ queue } -" + str (header ["message-id" ]))
29+ dlqmsg = {"header" : header , "message" : message }
4930 with filename .open ("w" ) as fh :
5031 json .dump (dlqmsg , fh , indent = 2 , sort_keys = True )
5132 print (f"Message { header ['message-id' ]} exported to { filename } " )
@@ -60,11 +41,10 @@ def receive_dlq_message(header: dict, message: dict) -> None:
6041 acknowledgement = True ,
6142 )
6243 try :
63- idlequeue .get (True , 3 )
6444 while True :
6545 idlequeue .get (True , 0.1 )
6646 except Empty :
67- print ("Done. " )
47+ print ("Done dlq purge " )
6848 transport .disconnect ()
6949 return exported_messages
7050
@@ -75,20 +55,10 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
7555 transport .connect ()
7656
7757 for f , dlqfile in enumerate (messages_path ):
78- if not Path (dlqfile ).is_file ():
79- print (f"Ignoring missing file { dlqfile } " )
58+ if not dlqfile .is_file ():
8059 continue
8160 with open (dlqfile ) as fh :
8261 dlqmsg = json .load (fh )
83- print (f"Parsing message from { dlqfile } " )
84- if (
85- not isinstance (dlqmsg , dict )
86- or not dlqmsg .get ("header" )
87- or not dlqmsg .get ("message" )
88- ):
89- print (f"File { dlqfile } is not a valid DLQ message." )
90- continue
91-
9262 header = dlqmsg ["header" ]
9363 header ["dlq-reinjected" ] = "True"
9464
@@ -109,7 +79,7 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
10979 headers = clean_header ,
11080 )
11181 dlqfile .unlink ()
112- print (f"Done { dlqfile } \n " )
82+ print (f"Reinjected { dlqfile } \n " )
11383
11484 transport .disconnect ()
11585
@@ -158,6 +128,9 @@ def run():
158128 help = "Token username" ,
159129 required = True ,
160130 )
131+ parser .add_argument (
132+ "-d" , "--dir" , default = "DLQ" , help = "Directory to export messages to"
133+ )
161134 args = parser .parse_args ()
162135
163136 # Set the environment variable then read it by importing the security config
@@ -174,18 +147,17 @@ def run():
174147 )
175148
176149 # Purge the queue and repost/reinject any messages found
150+ dlq_dump_path = Path (args .dir )
151+ dlq_dump_path .mkdir (parents = True , exist_ok = True )
177152 exported_messages = dlq_purge (
178- security_config .feedback_queue , security_config .rabbitmq_credentials
153+ dlq_dump_path ,
154+ security_config .feedback_queue ,
155+ security_config .rabbitmq_credentials ,
179156 )
180157 handle_failed_posts (exported_messages , token )
181158 handle_dlq_messages (exported_messages , security_config .rabbitmq_credentials )
182159
183160 # Clean up any created directories
184- for date_directory in dlq_dump_path .glob ("*" ):
185- try :
186- date_directory .rmdir ()
187- except OSError :
188- print (f"Cannot remove { date_directory } as it is not empty" )
189161 try :
190162 dlq_dump_path .rmdir ()
191163 except OSError :
0 commit comments