@@ -182,85 +182,87 @@ def make_client(cls, hs):
182182 )
183183
184184 @trace (opname = "outgoing_replication_request" )
185- @outgoing_gauge .track_inprogress ()
186185 async def send_request (* , instance_name = "master" , ** kwargs ):
187- if instance_name == local_instance_name :
188- raise Exception ("Trying to send HTTP request to self" )
189- if instance_name == "master" :
190- host = master_host
191- port = master_port
192- elif instance_name in instance_map :
193- host = instance_map [instance_name ].host
194- port = instance_map [instance_name ].port
195- else :
196- raise Exception (
197- "Instance %r not in 'instance_map' config" % (instance_name ,)
186+ with outgoing_gauge .track_inprogress ():
187+ if instance_name == local_instance_name :
188+ raise Exception ("Trying to send HTTP request to self" )
189+ if instance_name == "master" :
190+ host = master_host
191+ port = master_port
192+ elif instance_name in instance_map :
193+ host = instance_map [instance_name ].host
194+ port = instance_map [instance_name ].port
195+ else :
196+ raise Exception (
197+ "Instance %r not in 'instance_map' config" % (instance_name ,)
198+ )
199+
200+ data = await cls ._serialize_payload (** kwargs )
201+
202+ url_args = [
203+ urllib .parse .quote (kwargs [name ], safe = "" ) for name in cls .PATH_ARGS
204+ ]
205+
206+ if cls .CACHE :
207+ txn_id = random_string (10 )
208+ url_args .append (txn_id )
209+
210+ if cls .METHOD == "POST" :
211+ request_func = client .post_json_get_json
212+ elif cls .METHOD == "PUT" :
213+ request_func = client .put_json
214+ elif cls .METHOD == "GET" :
215+ request_func = client .get_json
216+ else :
217+ # We have already asserted in the constructor that a
218+ # compatible was picked, but lets be paranoid.
219+ raise Exception (
220+ "Unknown METHOD on %s replication endpoint" % (cls .NAME ,)
221+ )
222+
223+ uri = "http://%s:%s/_synapse/replication/%s/%s" % (
224+ host ,
225+ port ,
226+ cls .NAME ,
227+ "/" .join (url_args ),
198228 )
199229
200- data = await cls ._serialize_payload (** kwargs )
201-
202- url_args = [
203- urllib .parse .quote (kwargs [name ], safe = "" ) for name in cls .PATH_ARGS
204- ]
205-
206- if cls .CACHE :
207- txn_id = random_string (10 )
208- url_args .append (txn_id )
209-
210- if cls .METHOD == "POST" :
211- request_func = client .post_json_get_json
212- elif cls .METHOD == "PUT" :
213- request_func = client .put_json
214- elif cls .METHOD == "GET" :
215- request_func = client .get_json
216- else :
217- # We have already asserted in the constructor that a
218- # compatible was picked, but lets be paranoid.
219- raise Exception (
220- "Unknown METHOD on %s replication endpoint" % (cls .NAME ,)
221- )
222-
223- uri = "http://%s:%s/_synapse/replication/%s/%s" % (
224- host ,
225- port ,
226- cls .NAME ,
227- "/" .join (url_args ),
228- )
229-
230- try :
231- # We keep retrying the same request for timeouts. This is so that we
232- # have a good idea that the request has either succeeded or failed on
233- # the master, and so whether we should clean up or not.
234- while True :
235- headers : Dict [bytes , List [bytes ]] = {}
236- # Add an authorization header, if configured.
237- if replication_secret :
238- headers [b"Authorization" ] = [b"Bearer " + replication_secret ]
239- opentracing .inject_header_dict (headers , check_destination = False )
240- try :
241- result = await request_func (uri , data , headers = headers )
242- break
243- except RequestTimedOutError :
244- if not cls .RETRY_ON_TIMEOUT :
245- raise
246-
247- logger .warning ("%s request timed out; retrying" , cls .NAME )
248-
249- # If we timed out we probably don't need to worry about backing
250- # off too much, but lets just wait a little anyway.
251- await clock .sleep (1 )
252- except HttpResponseException as e :
253- # We convert to SynapseError as we know that it was a SynapseError
254- # on the main process that we should send to the client. (And
255- # importantly, not stack traces everywhere)
256- _outgoing_request_counter .labels (cls .NAME , e .code ).inc ()
257- raise e .to_synapse_error ()
258- except Exception as e :
259- _outgoing_request_counter .labels (cls .NAME , "ERR" ).inc ()
260- raise SynapseError (502 , "Failed to talk to main process" ) from e
261-
262- _outgoing_request_counter .labels (cls .NAME , 200 ).inc ()
263- return result
230+ try :
231+ # We keep retrying the same request for timeouts. This is so that we
232+ # have a good idea that the request has either succeeded or failed
233+ # on the master, and so whether we should clean up or not.
234+ while True :
235+ headers : Dict [bytes , List [bytes ]] = {}
236+ # Add an authorization header, if configured.
237+ if replication_secret :
238+ headers [b"Authorization" ] = [
239+ b"Bearer " + replication_secret
240+ ]
241+ opentracing .inject_header_dict (headers , check_destination = False )
242+ try :
243+ result = await request_func (uri , data , headers = headers )
244+ break
245+ except RequestTimedOutError :
246+ if not cls .RETRY_ON_TIMEOUT :
247+ raise
248+
249+ logger .warning ("%s request timed out; retrying" , cls .NAME )
250+
251+ # If we timed out we probably don't need to worry about backing
252+ # off too much, but lets just wait a little anyway.
253+ await clock .sleep (1 )
254+ except HttpResponseException as e :
255+ # We convert to SynapseError as we know that it was a SynapseError
256+ # on the main process that we should send to the client. (And
257+ # importantly, not stack traces everywhere)
258+ _outgoing_request_counter .labels (cls .NAME , e .code ).inc ()
259+ raise e .to_synapse_error ()
260+ except Exception as e :
261+ _outgoing_request_counter .labels (cls .NAME , "ERR" ).inc ()
262+ raise SynapseError (502 , "Failed to talk to main process" ) from e
263+
264+ _outgoing_request_counter .labels (cls .NAME , 200 ).inc ()
265+ return result
264266
265267 return send_request
266268
0 commit comments