@@ -341,3 +341,126 @@ async def fetch_relayed_messages(
341341 "to" : [relay_did ],
342342 }
343343 await send_http_message (dmp , my_did , message , target = relay_did )
344+
345+
346+ # '##:::::'##:'########:'########:::'######:::'#######:::'######::'##:::'##:
347+ # ##:'##: ##: ##.....:: ##.... ##:'##... ##:'##.... ##:'##... ##: ##::'##::
348+ # ##: ##: ##: ##::::::: ##:::: ##: ##:::..:: ##:::: ##: ##:::..:: ##:'##:::
349+ # ##: ##: ##: ######::: ########::. ######:: ##:::: ##: ##::::::: #####::::
350+ # ##: ##: ##: ##...:::: ##.... ##::..... ##: ##:::: ##: ##::::::: ##. ##:::
351+ # ##: ##: ##: ##::::::: ##:::: ##:'##::: ##: ##:::: ##: ##::: ##: ##:. ##::
352+ # . ###. ###:: ########: ########::. ######::. #######::. ######:: ##::. ##:
353+ # :...::...:::........::........::::......::::.......::::......:::..::::..::
354+
355+ # The following functions are used for establishing a persistant WebSocket
356+ # connection with a mediator/relay. This allows messages to be delivered as
357+ # soon as they arrive instead of waiting to be picked up with the pickup
358+ # protocol.
359+
360+ # TODO add comments to the below code segments and tidy it up
361+
362+ try :
363+ import websockets
364+ import asyncio
365+ except ImportError :
366+ import warnings
367+
368+ warnings .warn (
369+ "Missing websockets or asyncio import, live-delivery will be unavailable" ,
370+ ImportWarning ,
371+ )
372+
373+
374+ async def handle_websocket (
375+ dmp : DIDCommMessaging ,
376+ relay_did : DID ,
377+ mediator_websocket : Awaitable ["websockets.WebSocketClientProtocol" ],
378+ live_delivery_message : bytes ,
379+ callback : Callable [[Dict [str , Any ]], Awaitable [None ]] = _message_callback ,
380+ ):
381+ """Loop over messages received and process them."""
382+ async with mediator_websocket as websocket :
383+ await websocket .send (live_delivery_message )
384+ LOG .debug ("Connected to WebSocket and requested Live Delivery!" )
385+ while True :
386+ message = await websocket .recv ()
387+ LOG .debug ("Received message over websocket" )
388+
389+ try :
390+ unpacked_message , metadata = await dmp .packaging .unpack (message )
391+ msg = unpacked_message .decode ()
392+ msg = json .loads (msg )
393+ LOG .debug ("Received websocket message %s" , msg ["type" ])
394+ if msg ["from" ] != relay_did :
395+ await callback (msg )
396+
397+ except Exception as err :
398+ LOG .error ("Error encountered while decrypting websocket message" )
399+ LOG .exception (err )
400+
401+ await websocket .close ()
402+
403+
404+ async def activate_websocket (
405+ dmp : DIDCommMessaging ,
406+ my_did : DID ,
407+ relay_did : DID ,
408+ callback : Callable [[Dict [str , Any ]], Awaitable [None ]] = _message_callback ,
409+ create_task : bool = False ,
410+ ) -> Union [Awaitable [None ], "asyncio.Task" ]:
411+ """Connect to a websocket and request message forwarding."""
412+
413+ message = {
414+ "type" : "https://didcomm.org/messagepickup/3.0/live-delivery-change" ,
415+ "id" : str (uuid .uuid4 ()),
416+ "body" : {
417+ "live_delivery" : True ,
418+ },
419+ "from" : my_did ,
420+ "to" : [relay_did ],
421+ }
422+ packed = await dmp .pack (
423+ message = message ,
424+ to = relay_did ,
425+ frm = my_did ,
426+ )
427+ endpoint = packed .get_endpoint ("ws" )
428+ LOG .info ("Relay Websocket Address: %s" , endpoint )
429+ if endpoint :
430+ LOG .info ("Found Relay websocket, connecting" )
431+ mediator_websocket = websockets .connect (uri = endpoint )
432+ websocket_handler = handle_websocket (
433+ dmp ,
434+ relay_did ,
435+ mediator_websocket ,
436+ packed .message ,
437+ callback ,
438+ )
439+ if create_task :
440+ return asyncio .create_task (websocket_handler )
441+ else :
442+ return websocket_handler
443+
444+
445+ async def websocket_loop (
446+ dmp : DIDCommMessaging ,
447+ my_did : DID ,
448+ relay_did : DID ,
449+ callback : Callable [[Dict [str , Any ]], Awaitable [None ]] = _message_callback ,
450+ ) -> None :
451+ """Run the websocket handler in a task and reconnect on failure."""
452+
453+ async def create_task ():
454+ return await activate_websocket (dmp , my_did , relay_did , callback , True )
455+
456+ mediator_websocket_proc = await create_task ()
457+
458+ while True :
459+ await asyncio .sleep (5 )
460+ if mediator_websocket_proc .done ():
461+ LOG .exception (mediator_websocket_proc .exception ())
462+ try :
463+ LOG .error ("Websocket died, re-establishing connection!" )
464+ except Exception :
465+ pass
466+ mediator_websocket_proc = await create_task ()
0 commit comments