2525from inspect import signature
2626from typing import TYPE_CHECKING , Any , Awaitable , Callable , ClassVar
2727
28+ from prometheus_client import Gauge
29+
2830from twisted .internet .error import ConnectError , DNSLookupError
2931from twisted .web .server import Request
3032
4749
4850logger = logging .getLogger (__name__ )
4951
50- _pending_outgoing_requests = meter . create_up_down_counter (
52+ _pending_outgoing_requests = Gauge (
5153 "synapse_pending_outgoing_replication_requests" ,
52- description = "Number of active outgoing replication requests, by replication method name" ,
54+ "Number of active outgoing replication requests, by replication method name" ,
55+ labelnames = ["name" , SERVER_NAME_LABEL ],
5356)
5457
5558_outgoing_request_counter = meter .create_counter (
@@ -209,9 +212,9 @@ def make_client(cls, hs: "HomeServer") -> Callable:
209212
210213 instance_map = hs .config .worker .instance_map
211214
212- outgoing_gauge = meter . create_up_down_counter (
213- "synapse_pending_outgoing_replication_requests" ,
214- description = "Number of active outgoing replication requests, by replication method name" ,
215+ outgoing_gauge = _pending_outgoing_requests . labels (
216+ name = cls . NAME ,
217+ ** { SERVER_NAME_LABEL : server_name } ,
215218 )
216219
217220 replication_secret = None
@@ -228,143 +231,65 @@ async def send_request(
228231 streams = hs .get_replication_command_handler ().get_streams_to_replicate ()
229232 replication = hs .get_replication_data_handler ()
230233
231- outgoing_gauge .add (
232- 1 ,
233- {"name" : cls .NAME , SERVER_NAME_LABEL : server_name },
234- )
235- if instance_name == local_instance_name :
236- raise Exception ("Trying to send HTTP request to self" )
237- if instance_name not in instance_map :
238- raise Exception (
239- "Instance %r not in 'instance_map' config" % (instance_name ,)
240- )
241-
242- data = await cls ._serialize_payload (** kwargs )
243-
244- if cls .METHOD != "GET" and cls .WAIT_FOR_STREAMS :
245- # Include the current stream positions that we write to. We
246- # don't do this for GETs as they don't have a body, and we
247- # generally assume that a GET won't rely on data we have
248- # written.
249- if _STREAM_POSITION_KEY in data :
234+ with outgoing_gauge .track_inprogress ():
235+ if instance_name == local_instance_name :
236+ raise Exception ("Trying to send HTTP request to self" )
237+ if instance_name not in instance_map :
250238 raise Exception (
251- "data to send contains %r key" , _STREAM_POSITION_KEY
239+ "Instance %r not in 'instance_map' config" % ( instance_name ,)
252240 )
253241
254- data [_STREAM_POSITION_KEY ] = {
255- "streams" : {
256- stream .NAME : stream .minimal_local_current_token ()
257- for stream in streams
258- },
259- "instance_name" : local_instance_name ,
260- }
261-
262- url_args = [
263- urllib .parse .quote (kwargs [name ], safe = "" ) for name in cls .PATH_ARGS
264- ]
265-
266- if cls .CACHE :
267- txn_id = random_string (10 )
268- url_args .append (txn_id )
269-
270- if cls .METHOD == "POST" :
271- request_func : Callable [..., Awaitable [Any ]] = client .post_json_get_json
272- elif cls .METHOD == "PUT" :
273- request_func = client .put_json
274- elif cls .METHOD == "GET" :
275- request_func = client .get_json
276- else :
277- # We have already asserted in the constructor that a
278- # compatible was picked, but lets be paranoid.
279- raise Exception (
280- "Unknown METHOD on %s replication endpoint" % (cls .NAME ,)
281- )
282-
283- # Hard code a special scheme to show this only used for replication. The
284- # instance_name will be passed into the ReplicationEndpointFactory to
285- # determine connection details from the instance_map.
286- uri = "synapse-replication://%s/_synapse/replication/%s/%s" % (
287- instance_name ,
288- cls .NAME ,
289- "/" .join (url_args ),
290- )
242+ data = await cls ._serialize_payload (** kwargs )
291243
292- headers : Dict [bytes , List [bytes ]] = {}
293- # Add an authorization header, if configured.
294- if replication_secret :
295- headers [b"Authorization" ] = [b"Bearer " + replication_secret ]
296- opentracing .inject_header_dict (headers , check_destination = False )
297-
298- try :
299- # Keep track of attempts made so we can bail if we don't manage to
300- # connect to the target after N tries.
301- attempts = 0
302- # We keep retrying the same request for timeouts. This is so that we
303- # have a good idea that the request has either succeeded or failed
304- # on the master, and so whether we should clean up or not.
305- while True :
306- try :
307- result = await request_func (uri , data , headers = headers )
308- break
309- except RequestTimedOutError :
310- if not cls .RETRY_ON_TIMEOUT :
311- raise
312-
313- logger .warning ("%s request timed out; retrying" , cls .NAME )
314-
315- # If we timed out we probably don't need to worry about backing
316- # off too much, but lets just wait a little anyway.
317- await clock .sleep (1 )
318- except (ConnectError , DNSLookupError ) as e :
319- if not cls .RETRY_ON_CONNECT_ERROR :
320- raise
321- if attempts > cls .RETRY_ON_CONNECT_ERROR_ATTEMPTS :
322- raise
323-
324- delay = 2 ** attempts
325- logger .warning (
326- "%s request connection failed; retrying in %ds: %r" ,
327- cls .NAME ,
328- delay ,
329- e ,
244+ if cls .METHOD != "GET" and cls .WAIT_FOR_STREAMS :
245+ # Include the current stream positions that we write to. We
246+ # don't do this for GETs as they don't have a body, and we
247+ # generally assume that a GET won't rely on data we have
248+ # written.
249+ if _STREAM_POSITION_KEY in data :
250+ raise Exception (
251+ "data to send contains %r key" , _STREAM_POSITION_KEY
330252 )
331253
332- await clock .sleep (delay )
333- attempts += 1
334- except HttpResponseException as e :
335- # We convert to SynapseError as we know that it was a SynapseError
336- # on the main process that we should send to the client. (And
337- # importantly, not stack traces everywhere)
338- _outgoing_request_counter .add (
339- 1 ,
340- {
341- "name" : cls .NAME ,
342- "code" : e .code ,
343- SERVER_NAME_LABEL : server_name ,
344- },
345- )
346- raise e .to_synapse_error ()
347- except Exception as e :
348- _outgoing_request_counter .add (
349- 1 ,
350- {
351- "name" : cls .NAME ,
352- "code" : "ERR" ,
353- SERVER_NAME_LABEL : server_name ,
354- },
254+ data [_STREAM_POSITION_KEY ] = {
255+ "streams" : {
256+ stream .NAME : stream .minimal_local_current_token ()
257+ for stream in streams
258+ },
259+ "instance_name" : local_instance_name ,
260+ }
261+
262+ url_args = [
263+ urllib .parse .quote (kwargs [name ], safe = "" ) for name in cls .PATH_ARGS
264+ ]
265+
266+ if cls .CACHE :
267+ txn_id = random_string (10 )
268+ url_args .append (txn_id )
269+
270+ if cls .METHOD == "POST" :
271+ request_func : Callable [..., Awaitable [Any ]] = (
272+ client .post_json_get_json
273+ )
274+ elif cls .METHOD == "PUT" :
275+ request_func = client .put_json
276+ elif cls .METHOD == "GET" :
277+ request_func = client .get_json
278+ else :
279+ # We have already asserted in the constructor that a
280+ # compatible was picked, but lets be paranoid.
281+ raise Exception (
282+ "Unknown METHOD on %s replication endpoint" % (cls .NAME ,)
283+ )
284+
285+ # Hard code a special scheme to show this only used for replication. The
286+ # instance_name will be passed into the ReplicationEndpointFactory to
287+ # determine connection details from the instance_map.
288+ uri = "synapse-replication://%s/_synapse/replication/%s/%s" % (
289+ instance_name ,
290+ cls .NAME ,
291+ "/" .join (url_args ),
355292 )
356- raise SynapseError (
357- 502 , f"Failed to talk to { instance_name } process"
358- ) from e
359-
360- _outgoing_request_counter .add (
361- 1 ,
362- {
363- "name" : cls .NAME ,
364- "code" : 200 ,
365- SERVER_NAME_LABEL : server_name ,
366- },
367- )
368293
369294 headers : dict [bytes , list [bytes ]] = {}
370295 # Add an authorization header, if configured.
@@ -412,27 +337,32 @@ async def send_request(
412337 # We convert to SynapseError as we know that it was a SynapseError
413338 # on the main process that we should send to the client. (And
414339 # importantly, not stack traces everywhere)
415- _outgoing_request_counter .labels (
416- name = cls .NAME ,
417- code = e .code ,
418- ** {SERVER_NAME_LABEL : server_name },
419- ).inc ()
340+ _outgoing_request_counter .add (
341+ 1 ,
342+ {
343+ "name" : cls .NAME ,
344+ "code" : e .code ,
345+ SERVER_NAME_LABEL : server_name ,
346+ },
347+ )
420348 raise e .to_synapse_error ()
421349 except Exception as e :
422- _outgoing_request_counter .labels (
423- name = cls .NAME ,
424- code = "ERR" ,
425- ** {SERVER_NAME_LABEL : server_name },
426- ).inc ()
350+ _outgoing_request_counter .add (
351+ 1 ,
352+ {
353+ "name" : cls .NAME ,
354+ "code" : "ERR" ,
355+ SERVER_NAME_LABEL : server_name ,
356+ },
357+ )
427358 raise SynapseError (
428359 502 , f"Failed to talk to { instance_name } process"
429360 ) from e
430361
431- _outgoing_request_counter .labels (
432- name = cls .NAME ,
433- code = 200 ,
434- ** {SERVER_NAME_LABEL : server_name },
435- ).inc ()
362+ _outgoing_request_counter .add (
363+ 1 ,
364+ {"name" : cls .NAME , "code" : 200 , SERVER_NAME_LABEL : server_name },
365+ )
436366
437367 # Wait on any streams that the remote may have written to.
438368 for stream_name , position in result .pop (
@@ -444,7 +374,7 @@ async def send_request(
444374 position = position ,
445375 )
446376
447- return result
377+ return result
448378
449379 return send_request
450380
0 commit comments