diff --git a/mydata_did/v1_0/kafka_publisher.py b/mydata_did/v1_0/kafka_publisher.py new file mode 100644 index 0000000..7d3a3af --- /dev/null +++ b/mydata_did/v1_0/kafka_publisher.py @@ -0,0 +1,58 @@ +from confluent_kafka import Producer +import os +import json +import logging +from enum import Enum +from dataclasses import dataclass, asdict +import typing + +LOGGER = logging.getLogger(__name__) + +class DataAgreementOperations(Enum): + DACREATE = "DataAgreementCreate" + DAUPDATE = "DataAgreementUpdate" + DADELETE = "DataAgreementDelete" + DAPUBLISH = "DataAgreementPublish" + DAPERSONALDATAUPDATE = "DataAgreementPersonalDataUpdate" + DAPERSONALDATADELETE = "DataAgreementPersonalDataDelete" + + +@dataclass +class KafkaMessage: + payload: str + org_id: str + +@dataclass +class PublishEventToKafkaTopic: + key: str + message: str + topic: typing.Optional[str] = None + +async def publish_event_to_kafka_topic(publish_payload: PublishEventToKafkaTopic): + kafka_server_address = os.environ.get("KAFKA_SERVER_ADDRESS", 'localhost:9092') + igrantio_org_id = os.environ.get("IGRANTIO_ORG_ID") + + kafka_message = KafkaMessage(payload=publish_payload.message, org_id=igrantio_org_id) + kafka_message_str = json.dumps(asdict(kafka_message)) + + kafka_producer_configuration = { + 'bootstrap.servers': kafka_server_address, + } + kafka_producer = Producer(kafka_producer_configuration) + + def kafka_event_delivery_callback_handler(err: str, msg: str): + if err is not None: + log_message = f"Message delivery failed: {err}" + else: + log_message = f'Message delivered to {msg.topic()}' + LOGGER.debug(log_message) + + # Publish event to Kafka topic + kafka_producer.produce(publish_payload.topic,key=publish_payload.key, value=kafka_message_str, callback=kafka_event_delivery_callback_handler) + + # Flush to ensure that the message is sent to the Kafka broker + kafka_producer.flush() + +async def publish_event_to_data_agreement_topic(publish_payload: PublishEventToKafkaTopic): + publish_payload.topic = "data_agreement" + await publish_event_to_kafka_topic(publish_payload) \ No newline at end of file diff --git a/mydata_did/v1_0/manager.py b/mydata_did/v1_0/manager.py index 273648a..6cf2a4f 100644 --- a/mydata_did/v1_0/manager.py +++ b/mydata_did/v1_0/manager.py @@ -4055,7 +4055,15 @@ def serialize_data_agreement_record(self, *, data_agreement_records: typing.List return data_agreement_record_list if is_list else data_agreement_record_list[0] - async def create_data_agreement_and_personal_data_records(self, *, data_agreement: dict, existing_schema_id: str = None, draft: bool = False, existing_version: int = None, existing_data_agreement_id: str = None, update_ssi_payload: bool = True, existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]: + async def create_data_agreement_and_personal_data_records(self, + *, + data_agreement: dict, + existing_schema_id: str = None, + draft: bool = False, + existing_version: int = None, + existing_data_agreement_id: str = None, + update_ssi_payload: bool = True, + existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]: """ Create data agreement and personal data records. @@ -4526,7 +4534,7 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N if len(personal_data) != 0: # Create new data agreement with incremented version - await self.create_data_agreement_and_personal_data_records( + (new_data_agreement_record, new_data_agreement_dict) = await self.create_data_agreement_and_personal_data_records( data_agreement=data_agreement_dict.serialize(), draft=data_agreement_record.is_draft, existing_version=data_agreement_dict.data_agreement_template_version, @@ -4538,6 +4546,8 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N data_agreement_record._publish_flag = False await data_agreement_record.save(self.context) + return data_agreement_record.data_agreement_id, personal_data_record.attribute_name + except StorageError as e: raise ADAManagerError( f"Failed to delete data agreement; Reason: {e.roll_up}" diff --git a/mydata_did/v1_0/routes.py b/mydata_did/v1_0/routes.py index 6b83480..f6d8b67 100644 --- a/mydata_did/v1_0/routes.py +++ b/mydata_did/v1_0/routes.py @@ -53,6 +53,7 @@ from .utils.util import str_to_bool, bool_to_str, comma_separated_str_to_list, get_slices from .utils.regex import MYDATA_DID from .utils.jsonld.data_agreement import sign_data_agreement, verify_data_agreement, verify_data_agreement_with_proof_chain +from .kafka_publisher import publish_event_to_data_agreement_topic, DataAgreementOperations, PublishEventToKafkaTopic LOGGER = logging.getLogger(__name__) @@ -1572,6 +1573,16 @@ async def create_and_store_data_agreement_in_wallet_v2(request: web.BaseRequest) if not data_agreement_v2_record: raise web.HTTPBadRequest(reason="Data agreement not created") + + # Notify the iGrant.io backend about the creation of data agreement. + + daObject = { + "data_agreement_id": data_agreement_v2_record.data_agreement_id, + "data_agreement": data_agreement_v2_record.data_agreement, + "publish_flag": data_agreement_v2_record.publish_flag, + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DACREATE.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err @@ -1624,6 +1635,15 @@ async def publish_data_agreement_handler(request: web.BaseRequest): if not data_agreement_v1_record: raise web.HTTPBadRequest(reason="Data agreement not published") + + # Notify the iGrant.io backend about the publish of data agreement. + + daObject = { + "data_agreement_id": data_agreement_id, + "publish_flag": data_agreement_v1_record.publish_flag, + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPUBLISH.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err @@ -1773,6 +1793,17 @@ async def update_data_agreement_in_wallet_v2(request: web.BaseRequest): draft=draft ) + # Notify the iGrant.io backend about the updation of data agreement. + + daObject = { + "data_agreement_id": data_agreement_v2_record.data_agreement_id, + "data_agreement": data_agreement_v2_record.data_agreement, + "publish_flag": data_agreement_v2_record.publish_flag, + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAUPDATE.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) + + except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err @@ -1812,6 +1843,14 @@ async def delete_data_agreement_in_wallet(request: web.BaseRequest): # Delete data agreement in the wallet await mydata_did_manager.delete_data_agreement_in_wallet(data_agreement_id=data_agreement_id) + # Notify iGrant.io backend about data agreement deletion + + daObject = { + "data_agreement_id": data_agreement_id + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DADELETE.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) + except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err @@ -2023,6 +2062,16 @@ async def update_da_personal_data_in_wallet(request: web.BaseRequest): updated_description=attribute_description ) + # Notify the iGrant.io backend about the updation of data agreement personal data + daObject = { + "data_agreement_id": personal_data_dict['data_agreement']['data_agreement_id'], + "attribute_id": personal_data_dict["attribute_id"], + "attribute_name": personal_data_dict["attribute_name"], + "attribute_description": personal_data_dict["attribute_description"], + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPERSONALDATAUPDATE.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) + except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err @@ -2065,9 +2114,17 @@ async def delete_da_personal_data_in_wallet(request: web.BaseRequest): try: - await ada_mgr.delete_da_personal_data_in_wallet( + data_agreement_id, attribute_name = await ada_mgr.delete_da_personal_data_in_wallet( personal_data_id=personal_data_id ) + # Notify the iGrant.io backend about the deletion of data agreement personal data + + daObject = { + "data_agreement_id": data_agreement_id, + "attribute_name": attribute_name + } + publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPERSONALDATADELETE.value, message=json.dumps(daObject)) + await publish_event_to_data_agreement_topic(publish_payload) except ADAManagerError as err: raise web.HTTPBadRequest(reason=err.roll_up) from err diff --git a/requirements.txt b/requirements.txt index cbaddc3..44b464e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,4 +56,5 @@ wrapt==1.12.1 yarl==1.5.1 py-multibase==1.0.3 validators==0.18.2 -semver==2.13.0 \ No newline at end of file +semver==2.13.0 +confluent-kafka==2.1.1 \ No newline at end of file