22import json
33from datetime import datetime
44from functools import partial
5+ from inspect import getfullargspec
56from pathlib import Path
67from queue import Empty , Queue
78
8- import requests
9- from jose import jwt
9+ from sqlmodel import Session
1010from workflows .transport .pika_transport import PikaTransport
1111
12+ import murfey .server .api .auth
13+ import murfey .server .api .bootstrap
14+ import murfey .server .api .clem
15+ import murfey .server .api .display
16+ import murfey .server .api .file_io_frontend
17+ import murfey .server .api .file_io_instrument
18+ import murfey .server .api .hub
19+ import murfey .server .api .instrument
20+ import murfey .server .api .mag_table
21+ import murfey .server .api .processing_parameters
22+ import murfey .server .api .prometheus
23+ import murfey .server .api .session_control
24+ import murfey .server .api .session_info
25+ import murfey .server .api .websocket
26+ import murfey .server .api .workflow
27+ from murfey .server .murfey_db import get_murfey_db_session
1228from murfey .util .config import security_from_file
1329
1430
@@ -85,26 +101,43 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
85101 transport .disconnect ()
86102
87103
88- def handle_failed_posts (messages_path : list [Path ], token : str ):
104+ def handle_failed_posts (messages_path : list [Path ], murfey_db : Session ):
89105 """Deal with any messages that have been sent as failed client posts"""
90106 for json_file in messages_path :
91107 with open (json_file , "r" ) as json_data :
92108 message = json .load (json_data )
93-
94- if not message .get ("message" ) or not message ["message" ].get ("url" ):
95- print (f"{ json_file } is not a failed client post" )
109+ router_name = message .get ("router_name" , "" )
110+ router_base = router_name .split ("." )[0 ]
111+ function_name = message .get ("function_name" , "" )
112+ if not router_name or not function_name :
113+ print (
114+ f"Cannot repost { json_file } as it does not have a router or function name"
115+ )
96116 continue
97- dest = message ["message" ]["url" ]
98- message_json = message ["message" ]["json" ]
99117
100- response = requests . post (
101- dest , json = message_json , headers = { "Authorization" : f"Bearer { token } " }
118+ function_to_call = getattr (
119+ getattr ( murfey . server . api , router_base ), function_name
102120 )
103- if response .status_code != 200 :
104- print (f"Failed to repost { json_file } " )
105- else :
121+ expected_args = getfullargspec (function_to_call ).args
122+
123+ call_kwargs = message .get ("kwargs" , {})
124+ call_data = message .get ("data" , {})
125+ function_call_dict = {}
126+
127+ for call_arg in expected_args :
128+ if call_arg in call_kwargs .keys ():
129+ function_call_dict [call_arg ] = call_kwargs [call_arg ]
130+ elif call_arg == "db" :
131+ function_call_dict ["db" ] = murfey_db
132+ else :
133+ function_call_dict [call_arg ] = call_data
134+
135+ try :
136+ function_to_call (** function_call_dict )
106137 print (f"Reposted { json_file } " )
107138 json_file .unlink ()
139+ except Exception as e :
140+ print (f"Failed to post { json_file } to { function_name } : { e } " )
108141
109142
110143def run ():
@@ -123,26 +156,14 @@ def run():
123156 help = "Security config file" ,
124157 required = True ,
125158 )
126- parser .add_argument (
127- "-u" ,
128- "--username" ,
129- help = "Token username" ,
130- required = True ,
131- )
132159 parser .add_argument (
133160 "-d" , "--dir" , default = "DLQ" , help = "Directory to export messages to"
134161 )
135162 args = parser .parse_args ()
136163
137164 # Read the security config file
138165 security_config = security_from_file (args .config )
139-
140- # Get the token to post to the api with
141- token = jwt .encode (
142- {"user" : args .username },
143- security_config .auth_key ,
144- algorithm = security_config .auth_algorithm ,
145- )
166+ murfey_db = get_murfey_db_session (security_config )
146167
147168 # Purge the queue and repost/reinject any messages found
148169 dlq_dump_path = Path (args .dir )
@@ -152,7 +173,7 @@ def run():
152173 security_config .feedback_queue ,
153174 security_config .rabbitmq_credentials ,
154175 )
155- handle_failed_posts (exported_messages , token )
176+ handle_failed_posts (exported_messages , murfey_db )
156177 handle_dlq_messages (exported_messages , security_config .rabbitmq_credentials )
157178
158179 # Clean up any created directories
0 commit comments