Skip to content
This repository was archived by the owner on Aug 4, 2025. It is now read-only.

Commit 1aa8f27

Browse files
authored
cnap: add event log replay and verify RTMR value with measurement (#185)
Signed-off-by: Longyin Hu <[email protected]>
1 parent c0effba commit 1aa8f27

File tree

2 files changed

+134
-5
lines changed

2 files changed

+134
-5
lines changed

cnap/core/eventlog.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""A EventLog module.
2+
3+
This module provides functions related to event log processing. The event log can be fetched by
4+
Confidential Cloud-Native Primitives (CCNP), and is defined according to several tcg supported event
5+
log formats defined in TCG_PCClient Spec, Canonical Eventlog Spec, etc.
6+
7+
CCNP: https://github.com/intel/confidential-cloud-native-primitives
8+
TCG_PCClient Spec:
9+
https://trustedcomputinggroup.org/wp-content/uploads/TCG_PCClientSpecPlat_TPM_2p0_1p04_pub.pdf
10+
Canonical Eventlog Spec:
11+
https://trustedcomputinggroup.org/wp-content/uploads/TCG_IWG_CEL_v1_r0p41_pub.pdf
12+
13+
Functions:
14+
replay_event_log: Replay event logs by IMR index.
15+
verify_event_log: Verify event log by comparing IMR value from CCNP fetching and replayed from
16+
event log.
17+
"""
18+
import logging
19+
import base64
20+
from hashlib import sha1, sha256, sha384, sha512
21+
22+
from ccnp.eventlog.eventlog_sdk import CCEventLogEntry, CCAlgorithms
23+
from ccnp import Measurement, MeasurementType
24+
25+
LOG = logging.getLogger(__name__)
26+
27+
IMR_VERIFY_COUNT = 3
28+
29+
def replay_event_log(event_logs: list[CCEventLogEntry]) -> dict:
30+
"""Replay event logs by Integrated Measurement Register (IMR) index.
31+
32+
Args:
33+
event_logs (list[CCEventLogEntry]): Event logs fetched by CCNP.
34+
35+
Returns:
36+
dict: A dictionary containing the replay result displayed by IMR index and hash algorithm.
37+
Layer 1 key of the dict is the IMR index, the value is another dict which using the hash
38+
algorithm as the key and the replayed measurement as value.
39+
Sample value:
40+
{ 0: { 12: <measurement_replayed>}}
41+
"""
42+
measurement_dict = {}
43+
for event_log in event_logs:
44+
# pylint: disable-next=consider-iterating-dictionary
45+
if event_log.reg_idx not in measurement_dict.keys():
46+
measurement_dict[event_log.reg_idx] = {}
47+
48+
alg_id = event_log.alg_id.algo_id
49+
# Check algorithm type and prepare for replay
50+
if alg_id == CCAlgorithms.ALG_SHA1:
51+
algo = sha1()
52+
elif alg_id == CCAlgorithms.ALG_SHA384:
53+
algo = sha384()
54+
elif alg_id == CCAlgorithms.ALG_SHA256:
55+
algo = sha256()
56+
elif alg_id == CCAlgorithms.ALG_SHA512:
57+
algo = sha512()
58+
else:
59+
LOG.error("Unsupported hash algorithm %d", alg_id)
60+
continue
61+
62+
# Initialize value if alg_id not found in dict
63+
if alg_id not in measurement_dict[event_log.reg_idx].keys():
64+
measurement_dict[event_log.reg_idx][alg_id] = bytearray(
65+
event_log.alg_id.digest_size)
66+
67+
# Do replay and update the result into dict
68+
digest = list(map(int, event_log.digest.strip('[]').split(' ')))
69+
# pylint: disable-next=consider-using-f-string
70+
digest_hex = ''.join('{:02x}'.format(i) for i in digest)
71+
algo.update(measurement_dict[event_log.reg_idx][alg_id] + bytes.fromhex(digest_hex))
72+
measurement_dict[event_log.reg_idx][alg_id] = algo.digest()
73+
74+
return measurement_dict
75+
76+
def verify_event_log(measurement_dict: dict) -> bool:
77+
"""Verify event log by comparing IMR value from CCNP fetching and replayed via event log.
78+
79+
IMR: Integrated Measurement Register.
80+
81+
Args:
82+
measurement_dict (dict): The event logs replay result displayed by IMR index and hash
83+
algorithm.
84+
85+
Returns:
86+
bool: True if event log verify success, False if event log verify failed.
87+
"""
88+
LOG.info("Verify IMR measurement value and replayed value from event logs")
89+
for index in range(IMR_VERIFY_COUNT):
90+
# Fectch IMR measurement
91+
LOG.info("Fetch measurements in IMR[%d]", index)
92+
imr_measurement = base64.b64decode(Measurement.get_platform_measurement(
93+
MeasurementType.TYPE_TDX_RTMR, None, index))
94+
LOG.info("IMR[%d](measurement): %s", index, imr_measurement.hex())
95+
96+
# Get IMR value from replayed event log
97+
if index not in measurement_dict or measurement_dict[index] == {}:
98+
LOG.error("IMR[%d] verify failed, the replayed value from event logs doesn't exist",
99+
index)
100+
return False
101+
for value in measurement_dict[index].values():
102+
imr_replayed = value
103+
break
104+
105+
LOG.info("IMR[%d](replayed): %s", index, imr_replayed.hex())
106+
if imr_measurement == imr_replayed:
107+
LOG.info("IMR[%d] passed the verification.", index)
108+
else:
109+
LOG.error("IMR[%d] did not pass the verification.", index)
110+
return False
111+
112+
return True

cnap/core/keybroker.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
import struct
1616
import requests
1717

18-
from ccnp import Quote
18+
from ccnp import Eventlog, Quote
1919
from cryptography.hazmat.primitives import hashes
2020
from cryptography.hazmat.primitives import serialization
2121
from cryptography.hazmat.primitives.asymmetric import rsa
2222
from cryptography.hazmat.primitives.asymmetric import padding
2323
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
2424

25+
from core.eventlog import replay_event_log, verify_event_log
26+
2527
LOG = logging.getLogger(__name__)
2628

2729
# Set the connection timeout to 10s
@@ -68,17 +70,19 @@ class SimpleKeyBrokerClient(KeyBrokerClientBase):
6870
key (SWK) to encrypt the user key (wrapped_key).
6971
- Encrypt the SWK by the public key from client (wrapped_swk).
7072
For a key broker client, here is an example flow to get a key from KBS:
73+
- Get and replay all event logs, and verify by the measurement register.
7174
- Generate 2048 bit RSA key pair (a public key and a private key).
7275
- Encode the public key to base64 for transferring (user_data).
7376
- Get quote in the TEE with the hash of the public key for measurement (quote).
7477
- Request wrapped_key and wrapped_swk from KBS with quote and user_data.
7578
- Decrypt the user key by the SWK.
7679
"""
77-
def get_key(self, server_url: str, key_id: str) -> bytes:
80+
def get_key(self, server_url: str, key_id: str) -> bytes: # pylint: disable=too-many-locals
7881
"""Get model key by key ID from the KBS.
7982
80-
This method construct the request headers and body to request the wrapped_key and
81-
wrapped_swk from KBS, decrypt the user key by SWK and return the key.
83+
This method get and replay all event logs, and verify by the measurement register, then
84+
construct the request headers and body to request the wrapped_key and wrapped_swk from KBS,
85+
decrypt the user key by SWK and return the key.
8286
8387
A example requests and response:
8488
- request headers:
@@ -105,14 +109,27 @@ def get_key(self, server_url: str, key_id: str) -> bytes:
105109
106110
Raises:
107111
ValueError: If the server_url or key_id is None.
108-
RuntimeError: If get quote or get key failed.
112+
RuntimeError: If get or verify event log failed, and if get quote or get key failed.
109113
NotImplementedError: If the subclasses don't implement the method.
110114
"""
111115
if server_url is None:
112116
raise ValueError("KBS server url can not be None")
113117
if key_id is None:
114118
raise ValueError("KBS key id can not be None")
115119

120+
# Get and verify event logs before get quote.
121+
# The exectuion environment judgment will be implemented by ccnp in the future.
122+
LOG.debug("Getting event log by CCNP")
123+
event_logs = Eventlog.get_platform_eventlog()
124+
if event_logs is None:
125+
raise RuntimeError("Get event log failed")
126+
measurement_dict = replay_event_log(event_logs)
127+
if verify_event_log(measurement_dict):
128+
LOG.info("Event log verify successfully.\n")
129+
else:
130+
LOG.error("Event log verify failed.\n")
131+
raise RuntimeError("Event log verify failed.")
132+
116133
private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
117134
pubkey = private_key.public_key()
118135
pubkey_der = pubkey.public_bytes(encoding=serialization.Encoding.DER,

0 commit comments

Comments
 (0)