6969from .messages .read_all_data_agreement_template_response import ReadAllDataAgreementTemplateResponseMessage
7070from .messages .data_controller_details import DataControllerDetailsMessage
7171from .messages .data_controller_details_response import DataControllerDetailsResponseMessage
72+ from .messages .existing_connections import ExistingConnectionsMessage
7273
7374from .models .data_agreement_model import DATA_AGREEMENT_V1_SCHEMA_CONTEXT , DataAgreementEventSchema , DataAgreementV1 , DataAgreementPersonalData , DataAgreementV1Schema
7475from .models .read_data_agreement_model import ReadDataAgreementBody
8889from .models .json_ld_processed_response_model import JSONLDProcessedResponseBody
8990from .models .json_ld_processed_model import JSONLDProcessedBody
9091from .models .data_controller_model import DataController , DataControllerSchema
92+ from .models .existing_connections_model import ExistingConnectionsBody
9193
9294from .utils .diddoc import DIDDoc
9395from .utils .did .mydata_did import DIDMyData
@@ -146,6 +148,9 @@ class ADAManager:
146148 # Record for data controller details
147149 RECORD_TYPE_DATA_CONTROLLER_DETAILS = "data_controller_details"
148150
151+ # Record for existing connection details.
152+ RECORD_TYPE_EXISTING_CONNECTION = "existing_connection"
153+
149154 DATA_AGREEMENT_RECORD_TYPE = "dataagreement_record"
150155
151156 def __init__ (self , context : InjectionContext ) -> None :
@@ -1549,7 +1554,7 @@ async def create_and_store_data_agreement_in_wallet(self, data_agreement: dict,
15491554 await data_agreement_v1_record .save (self .context )
15501555
15511556 return data_agreement_v1_record
1552-
1557+
15531558 async def publish_data_agreement_in_wallet (self , data_agreement_id : str ) -> DataAgreementV1Record :
15541559 """
15551560 Publish a data agreement in the wallet.
@@ -1579,13 +1584,15 @@ async def publish_data_agreement_in_wallet(self, data_agreement_id: str) -> Data
15791584 {"data_agreement_id" : data_agreement_id }
15801585 ).fetch_single ()
15811586
1582- personal_data_new_list_for_proof_request = json .loads (storage_record .value )
1587+ personal_data_new_list_for_proof_request = json .loads (
1588+ storage_record .value )
15831589
15841590 # Delete the temporary record
15851591 await storage .delete_record (storage_record )
15861592
15871593 # Generate data agreement model class instance
1588- data_agreement : DataAgreementV1 = DataAgreementV1Schema ().load (data_agreement_record .data_agreement )
1594+ data_agreement : DataAgreementV1 = DataAgreementV1Schema ().load (
1595+ data_agreement_record .data_agreement )
15891596
15901597 if data_agreement .method_of_use == DataAgreementV1Record .METHOD_OF_USE_DATA_SOURCE :
15911598 # If method-of-use is "data-source", then create a schema and credential defintion
@@ -4100,9 +4107,8 @@ async def send_read_all_data_agreement_template_message(self, conn_id: str) -> N
41004107 # Send JSONLD Processed Message
41014108 if responder :
41024109 await responder .send_reply (read_all_data_agreement_template_message , connection_id = connection_record .connection_id )
4103-
41044110
4105- async def fetch_org_details_from_igrantio (self )-> str :
4111+ async def fetch_org_details_from_igrantio (self ) -> str :
41064112 """
41074113 Fetch org details from iGrant.io.
41084114 """
@@ -4139,7 +4145,7 @@ async def fetch_org_details_from_igrantio(self)-> str:
41394145
41404146 for exclude_key in exclude_keys :
41414147 organization_details .pop (exclude_key , None )
4142-
4148+
41434149 data_controller = DataController (
41444150 organisation_id = organization_details ["ID" ],
41454151 organisation_name = organization_details ["Name" ],
@@ -4151,7 +4157,7 @@ async def fetch_org_details_from_igrantio(self)-> str:
41514157 policy_url = organization_details ["PolicyURL" ],
41524158 eula_url = organization_details ["EulaURL" ]
41534159 ).to_json ()
4154-
4160+
41554161 return data_controller
41564162
41574163 async def create_or_update_data_controller_details_in_wallet (self ) -> StorageRecord :
@@ -4169,7 +4175,6 @@ async def create_or_update_data_controller_details_in_wallet(self) -> StorageRec
41694175 self .RECORD_TYPE_DATA_CONTROLLER_DETAILS
41704176 ).fetch_single ()
41714177
4172-
41734178 data_controller = await self .fetch_org_details_from_igrantio ()
41744179
41754180 await storage .update_record_value (storage_record , data_controller )
@@ -4197,14 +4202,13 @@ async def create_or_update_data_controller_details_in_wallet(self) -> StorageRec
41974202 result ,
41984203 )
41994204
4200-
42014205 await storage .add_record (storage_record )
42024206
42034207 return result
4204-
4208+
42054209 except ADAManagerError as err :
42064210 pass
4207-
4211+
42084212 return result
42094213
42104214 async def process_data_controller_details_message (self , data_controller_details_message : DataControllerDetailsMessage , receipt : MessageReceipt ) -> None :
@@ -4225,7 +4229,8 @@ async def process_data_controller_details_message(self, data_controller_details_
42254229 data_controller_details : str = await self .create_or_update_data_controller_details_in_wallet ()
42264230
42274231 # Construct Data Controller model class
4228- data_controller : DataController = DataControllerSchema ().load (json .loads (data_controller_details ))
4232+ data_controller : DataController = DataControllerSchema ().load (
4233+ json .loads (data_controller_details ))
42294234
42304235 # Construct DataControllerDetailsResponseMessage
42314236 data_controller_details_response_message = DataControllerDetailsResponseMessage (
@@ -4239,7 +4244,7 @@ async def process_data_controller_details_message(self, data_controller_details_
42394244
42404245 if responder :
42414246 await responder .send_reply (data_controller_details_response_message , connection_id = self .context .connection_record .connection_id )
4242-
4247+
42434248 async def send_data_controller_details_message (self , conn_id : str ) -> None :
42444249 """Send data controller details message."""
42454250
@@ -4278,3 +4283,124 @@ async def send_data_controller_details_message(self, conn_id: str) -> None:
42784283 # Send message
42794284 if responder :
42804285 await responder .send_reply (data_controller_details_message , connection_id = connection_record .connection_id )
4286+
4287+ async def process_existing_connections_message (self , existing_connections_message : ExistingConnectionsMessage , receipt : MessageReceipt ) -> None :
4288+ """Processing connections/1.0/exists message."""
4289+
4290+ # Storage instance
4291+ storage = await self .context .inject (BaseStorage )
4292+
4293+ invitation_key = receipt .recipient_verkey
4294+
4295+ # fetch current connection record using invitation key
4296+ connection = await ConnectionRecord .retrieve_by_invitation_key (
4297+ self .context , invitation_key )
4298+
4299+ # Fetch existing connections record for the current connection.
4300+
4301+ existing_connection = await storage .search_records (
4302+ type_filter = self .RECORD_TYPE_EXISTING_CONNECTION ,
4303+ tag_query = {
4304+ "connection_id" : connection .connection_id
4305+ }
4306+ ).fetch_all ()
4307+
4308+ if existing_connection :
4309+ # delete existing connections record
4310+ existing_connection = existing_connection [0 ]
4311+ await storage .delete_record (existing_connection )
4312+
4313+ existing_connection = None
4314+
4315+ # fetch the existing connection by did
4316+ existing_connection = await ConnectionRecord .retrieve_by_did (
4317+ self .context , their_did = None , my_did = existing_connections_message .body .theirdid )
4318+
4319+ # create existing_connections record with connection_id, did, connection_status available
4320+ record_tags = {
4321+ "existing_connection_id" : existing_connection .connection_id ,
4322+ "my_did" : existing_connection .my_did ,
4323+ "connection_status" : "available" ,
4324+ "connection_id" : connection .connection_id
4325+ }
4326+
4327+ record = StorageRecord (
4328+ self .RECORD_TYPE_EXISTING_CONNECTION ,
4329+ connection .connection_id ,
4330+ record_tags
4331+ )
4332+ await storage .add_record (record )
4333+
4334+ # updating the current connection invitation status to inactive
4335+ connection .state = ConnectionRecord .STATE_INACTIVE
4336+ await connection .save (context = self .context )
4337+
4338+ async def send_existing_connections_message (self , theirdid : str , connection_id : str ) -> None :
4339+ """Send existing connections message."""
4340+
4341+ # Responder instance
4342+ responder : DispatcherResponder = await self .context .inject (BaseResponder , required = False )
4343+
4344+ try :
4345+ connection_mgr = ConnectionManager (self .context )
4346+
4347+ # Retrieve connection record by id
4348+ connection_record : ConnectionRecord = await ConnectionRecord .retrieve_by_id (
4349+ self .context ,
4350+ connection_id
4351+ )
4352+
4353+ connection_invitation : ConnectionInvitation = await connection_record .retrieve_invitation (self .context )
4354+
4355+ request = await connection_mgr .create_request (connection_record )
4356+
4357+ except StorageError as err :
4358+
4359+ raise ADAManagerError (
4360+ f"Failed to retrieve connection record: { err } "
4361+ )
4362+
4363+ # From and to mydata dids
4364+ from_did : DIDMyData = DIDMyData .from_public_key_b58 (
4365+ request .connection .did , key_type = KeyType .ED25519
4366+ )
4367+ to_did : DIDMyData = DIDMyData .from_public_key_b58 (
4368+ request .connection .did , key_type = KeyType .ED25519
4369+ )
4370+
4371+ # Construct ExistingConnectionsMessage Message
4372+ existing_connections_message = ExistingConnectionsMessage (
4373+ from_did = from_did .did ,
4374+ to_did = to_did .did ,
4375+ created_time = round (time .time () * 1000 ),
4376+ body = ExistingConnectionsBody (
4377+ theirdid = theirdid
4378+ )
4379+ )
4380+
4381+ # Send message
4382+ if responder :
4383+ await responder .send_reply (existing_connections_message , connection_id = connection_record .connection_id )
4384+
4385+ async def fetch_existing_connections_record_for_current_connection (self , connection_id : str ) -> dict :
4386+ """
4387+ Fetch existing connections record for the current connection.
4388+ """
4389+
4390+ # Storage instance
4391+ storage = await self .context .inject (BaseStorage )
4392+
4393+ # Fetch existing connections record for the current connection.
4394+
4395+ existing_connection = await storage .search_records (
4396+ type_filter = self .RECORD_TYPE_EXISTING_CONNECTION ,
4397+ tag_query = {
4398+ "connection_id" : connection_id
4399+ }
4400+ ).fetch_all ()
4401+
4402+ if existing_connection :
4403+ existing_connection = existing_connection [0 ]
4404+ return existing_connection .tags
4405+ else :
4406+ return {}
0 commit comments