Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions mydata_did/v1_0/kafka_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from confluent_kafka import Producer
import os
import json
import logging
from enum import Enum
from dataclasses import dataclass, asdict
import typing

LOGGER = logging.getLogger(__name__)

class DataAgreementOperations(Enum):
    """Kafka message keys identifying data-agreement lifecycle events.

    The string values are used as the Kafka message ``key`` when publishing
    notifications, so consumers can route on the operation type.
    """
    DACREATE = "DataAgreementCreate"
    DAUPDATE = "DataAgreementUpdate"
    DADELETE = "DataAgreementDelete"
    DAPUBLISH = "DataAgreementPublish"
    DAPERSONALDATAUPDATE = "DataAgreementPersonalDataUpdate"
    DAPERSONALDATADELETE = "DataAgreementPersonalDataDelete"


@dataclass
class KafkaMessage:
    """Envelope serialized (via ``asdict`` + ``json.dumps``) as the Kafka message value."""

    # JSON-encoded event body supplied by the caller.
    payload: str
    # iGrant.io organisation identifier, read from the IGRANTIO_ORG_ID env var.
    org_id: str

@dataclass
class PublishEventToKafkaTopic:
    """Parameters for publishing a single event to a Kafka topic."""

    # Kafka message key, e.g. a DataAgreementOperations value.
    key: str
    # Event body; wrapped in a KafkaMessage before being sent.
    message: str
    # Destination topic; may be left None and filled in by a topic-specific helper.
    topic: typing.Optional[str] = None

async def publish_event_to_kafka_topic(publish_payload: PublishEventToKafkaTopic) -> None:
    """Publish a single event to the Kafka topic named in *publish_payload*.

    The message value is a JSON-serialized :class:`KafkaMessage` wrapping
    ``publish_payload.message`` together with the organisation id taken from
    the ``IGRANTIO_ORG_ID`` environment variable. The broker address is read
    from ``KAFKA_SERVER_ADDRESS`` (default ``localhost:9092``).

    NOTE(review): ``Producer.flush()`` is a blocking call, so despite being
    ``async`` this coroutine stalls the running event loop while flushing;
    consider ``loop.run_in_executor`` if this becomes a bottleneck.

    :param publish_payload: key, message body and destination topic.
    """
    kafka_server_address = os.environ.get("KAFKA_SERVER_ADDRESS", 'localhost:9092')
    igrantio_org_id = os.environ.get("IGRANTIO_ORG_ID")
    if igrantio_org_id is None:
        # Still publish, but make the misconfiguration visible instead of
        # silently sending "org_id": null.
        LOGGER.warning("IGRANTIO_ORG_ID is not set; publishing event with org_id=null")

    kafka_message = KafkaMessage(payload=publish_payload.message, org_id=igrantio_org_id)
    kafka_message_str = json.dumps(asdict(kafka_message))

    kafka_producer_configuration = {
        'bootstrap.servers': kafka_server_address,
    }
    kafka_producer = Producer(kafka_producer_configuration)

    # confluent-kafka invokes this with (KafkaError | None, Message) — the
    # parameters are deliberately left unannotated (the previous ``str``
    # annotations were incorrect; the code calls ``msg.topic()``).
    def kafka_event_delivery_callback_handler(err, msg):
        if err is not None:
            # Delivery failures are errors, not debug noise.
            LOGGER.error("Message delivery failed: %s", err)
        else:
            LOGGER.debug("Message delivered to %s", msg.topic())

    # Publish event to Kafka topic.
    kafka_producer.produce(
        publish_payload.topic,
        key=publish_payload.key,
        value=kafka_message_str,
        callback=kafka_event_delivery_callback_handler,
    )

    # Flush to ensure that the message is handed to the Kafka broker
    # (and delivery callbacks fire) before returning.
    kafka_producer.flush()

async def publish_event_to_data_agreement_topic(publish_payload: PublishEventToKafkaTopic):
    """Publish *publish_payload* to the fixed ``data_agreement`` Kafka topic.

    Sets ``publish_payload.topic`` to ``"data_agreement"`` and delegates to
    :func:`publish_event_to_kafka_topic`.
    """
    # Route the event to the data-agreement topic before delegating.
    publish_payload.topic = "data_agreement"
    return await publish_event_to_kafka_topic(publish_payload)
14 changes: 12 additions & 2 deletions mydata_did/v1_0/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4055,7 +4055,15 @@ def serialize_data_agreement_record(self, *, data_agreement_records: typing.List

return data_agreement_record_list if is_list else data_agreement_record_list[0]

async def create_data_agreement_and_personal_data_records(self, *, data_agreement: dict, existing_schema_id: str = None, draft: bool = False, existing_version: int = None, existing_data_agreement_id: str = None, update_ssi_payload: bool = True, existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]:
async def create_data_agreement_and_personal_data_records(self,
*,
data_agreement: dict,
existing_schema_id: str = None,
draft: bool = False,
existing_version: int = None,
existing_data_agreement_id: str = None,
update_ssi_payload: bool = True,
existing_data_agreement_record: DataAgreementV1Record = None) -> typing.Tuple[DataAgreementV1Record, dict]:
"""
Create data agreement and personal data records.

Expand Down Expand Up @@ -4526,7 +4534,7 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N

if len(personal_data) != 0:
# Create new data agreement with incremented version
await self.create_data_agreement_and_personal_data_records(
(new_data_agreement_record, new_data_agreement_dict) = await self.create_data_agreement_and_personal_data_records(
data_agreement=data_agreement_dict.serialize(),
draft=data_agreement_record.is_draft,
existing_version=data_agreement_dict.data_agreement_template_version,
Expand All @@ -4538,6 +4546,8 @@ async def delete_da_personal_data_in_wallet(self, *, personal_data_id: str) -> N
data_agreement_record._publish_flag = False
await data_agreement_record.save(self.context)

return data_agreement_record.data_agreement_id, personal_data_record.attribute_name

except StorageError as e:
raise ADAManagerError(
f"Failed to delete data agreement; Reason: {e.roll_up}"
Expand Down
59 changes: 58 additions & 1 deletion mydata_did/v1_0/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from .utils.util import str_to_bool, bool_to_str, comma_separated_str_to_list, get_slices
from .utils.regex import MYDATA_DID
from .utils.jsonld.data_agreement import sign_data_agreement, verify_data_agreement, verify_data_agreement_with_proof_chain
from .kafka_publisher import publish_event_to_data_agreement_topic, DataAgreementOperations, PublishEventToKafkaTopic


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -1572,6 +1573,16 @@ async def create_and_store_data_agreement_in_wallet_v2(request: web.BaseRequest)

if not data_agreement_v2_record:
raise web.HTTPBadRequest(reason="Data agreement not created")

# Notify the iGrant.io backend about the creation of data agreement.

daObject = {
"data_agreement_id": data_agreement_v2_record.data_agreement_id,
"data_agreement": data_agreement_v2_record.data_agreement,
"publish_flag": data_agreement_v2_record.publish_flag,
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DACREATE.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -1624,6 +1635,15 @@ async def publish_data_agreement_handler(request: web.BaseRequest):

if not data_agreement_v1_record:
raise web.HTTPBadRequest(reason="Data agreement not published")

            # Notify the iGrant.io backend about the publication of the data agreement.

daObject = {
"data_agreement_id": data_agreement_id,
"publish_flag": data_agreement_v1_record.publish_flag,
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPUBLISH.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -1773,6 +1793,17 @@ async def update_data_agreement_in_wallet_v2(request: web.BaseRequest):
draft=draft
)

        # Notify the iGrant.io backend about the update of the data agreement.

daObject = {
"data_agreement_id": data_agreement_v2_record.data_agreement_id,
"data_agreement": data_agreement_v2_record.data_agreement,
"publish_flag": data_agreement_v2_record.publish_flag,
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAUPDATE.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)


except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err

Expand Down Expand Up @@ -1812,6 +1843,14 @@ async def delete_data_agreement_in_wallet(request: web.BaseRequest):
# Delete data agreement in the wallet
await mydata_did_manager.delete_data_agreement_in_wallet(data_agreement_id=data_agreement_id)

# Notify iGrant.io backend about data agreement deletion

daObject = {
"data_agreement_id": data_agreement_id
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DADELETE.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err

Expand Down Expand Up @@ -2023,6 +2062,16 @@ async def update_da_personal_data_in_wallet(request: web.BaseRequest):
updated_description=attribute_description
)

        # Notify the iGrant.io backend about the update of data agreement personal data
daObject = {
"data_agreement_id": personal_data_dict['data_agreement']['data_agreement_id'],
"attribute_id": personal_data_dict["attribute_id"],
"attribute_name": personal_data_dict["attribute_name"],
"attribute_description": personal_data_dict["attribute_description"],
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPERSONALDATAUPDATE.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)

except ADAManagerError as err:

raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down Expand Up @@ -2065,9 +2114,17 @@ async def delete_da_personal_data_in_wallet(request: web.BaseRequest):

try:

await ada_mgr.delete_da_personal_data_in_wallet(
data_agreement_id, attribute_name = await ada_mgr.delete_da_personal_data_in_wallet(
personal_data_id=personal_data_id
)
# Notify the iGrant.io backend about the deletion of data agreement personal data

daObject = {
"data_agreement_id": data_agreement_id,
"attribute_name": attribute_name
}
publish_payload = PublishEventToKafkaTopic(key=DataAgreementOperations.DAPERSONALDATADELETE.value, message=json.dumps(daObject))
await publish_event_to_data_agreement_topic(publish_payload)

except ADAManagerError as err:
raise web.HTTPBadRequest(reason=err.roll_up) from err
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ wrapt==1.12.1
yarl==1.5.1
py-multibase==1.0.3
validators==0.18.2
semver==2.13.0
semver==2.13.0
confluent-kafka==2.1.1