Skip to content

Commit a061df7

Browse files
committed
ACVP: Adjust to actual ACVP flow
This commit ports pq-code-package/mlkem-native#1052 With HashML-DSA support added in #498, we now have full coverage of the ACVP tests allowing us to switch to the actual ACVP flow: In real ACVP validation, the internalProjection is not available. Rather, one gets a prompt containing the inputs and has to produce a result json that has the match the expected results hold by the ACVP server. This commit modifies the acvp_client.py to follow this flow and aligns it with the client in mlkem-native. This in theory allows to perform CAVP validation without any changes, but I have not done that yet. Resolves #294 Signed-off-by: Matthias J. Kannwischer <[email protected]>
1 parent 91cece9 commit a061df7

File tree

1 file changed

+155
-71
lines changed

1 file changed

+155
-71
lines changed

test/acvp_client.py

Lines changed: 155 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
#!/usr/bin/env python3
2+
# Copyright (c) The mlkem-native project authors
23
# Copyright (c) The mldsa-native project authors
34
# SPDX-License-Identifier: Apache-2.0 OR ISC OR MIT
45

56
# ACVP client for ML-DSA
6-
#
7-
# Processes 'internalProjection.json' files from
8-
# https://github.com/usnistgov/ACVP-Server/blob/master/gen-val/json-files
9-
#
7+
# See https://pages.nist.gov/ACVP/draft-celi-acvp-ml-dsa.html and
8+
# https://github.com/usnistgov/ACVP-Server/tree/master/gen-val/json-files
109
# Invokes `acvp_mldsa{lvl}` under the hood.
1110

1211
import argparse
@@ -20,7 +19,7 @@
2019

2120
# Check if we need to use a wrapper for execution (e.g. QEMU)
2221
exec_prefix = os.environ.get("EXEC_WRAPPER", "")
23-
exec_prefix = [exec_prefix] if exec_prefix != "" else []
22+
exec_prefix = exec_prefix.split(" ") if exec_prefix != "" else []
2423

2524

2625
def download_acvp_files(version="v1.1.0.40"):
@@ -29,9 +28,12 @@ def download_acvp_files(version="v1.1.0.40"):
2928

3029
# Files we need to download for ML-KEM
3130
files_to_download = [
32-
"ML-DSA-keyGen-FIPS204/internalProjection.json",
33-
"ML-DSA-sigGen-FIPS204/internalProjection.json",
34-
"ML-DSA-sigVer-FIPS204/internalProjection.json",
31+
"ML-DSA-keyGen-FIPS204/prompt.json",
32+
"ML-DSA-keyGen-FIPS204/expectedResults.json",
33+
"ML-DSA-sigGen-FIPS204/prompt.json",
34+
"ML-DSA-sigGen-FIPS204/expectedResults.json",
35+
"ML-DSA-sigVer-FIPS204/prompt.json",
36+
"ML-DSA-sigVer-FIPS204/expectedResults.json",
3537
]
3638

3739
# Create directory structure
@@ -65,22 +67,36 @@ def download_acvp_files(version="v1.1.0.40"):
6567
return True
6668

6769

68-
def loadAcvpData(internalProjection):
69-
with open(internalProjection, "r") as f:
70-
internalProjectionData = json.load(f)
71-
return (internalProjection, internalProjectionData)
70+
def loadAcvpData(prompt, expectedResults):
71+
with open(prompt, "r") as f:
72+
promptData = json.load(f)
73+
expectedResultsData = None
74+
if expectedResults is not None:
75+
with open(expectedResults, "r") as f:
76+
expectedResultsData = json.load(f)
77+
78+
return (prompt, promptData, expectedResults, expectedResultsData)
7279

7380

7481
def loadDefaultAcvpData(version="v1.1.0.40"):
7582
data_dir = f"test/.acvp-data/{version}/files"
7683
acvp_jsons_for_version = [
77-
f"{data_dir}/ML-DSA-keyGen-FIPS204/internalProjection.json",
78-
f"{data_dir}/ML-DSA-sigGen-FIPS204/internalProjection.json",
79-
f"{data_dir}/ML-DSA-sigVer-FIPS204/internalProjection.json",
84+
(
85+
f"{data_dir}/ML-DSA-keyGen-FIPS204/prompt.json",
86+
f"{data_dir}/ML-DSA-keyGen-FIPS204/expectedResults.json",
87+
),
88+
(
89+
f"{data_dir}/ML-DSA-sigGen-FIPS204/prompt.json",
90+
f"{data_dir}/ML-DSA-sigGen-FIPS204/expectedResults.json",
91+
),
92+
(
93+
f"{data_dir}/ML-DSA-sigVer-FIPS204/prompt.json",
94+
f"{data_dir}/ML-DSA-sigVer-FIPS204/expectedResults.json",
95+
),
8096
]
8197
acvp_data = []
82-
for internalProjection in acvp_jsons_for_version:
83-
acvp_data.append(loadAcvpData(internalProjection))
98+
for prompt, expectedResults in acvp_jsons_for_version:
99+
acvp_data.append(loadAcvpData(prompt, expectedResults))
84100
return acvp_data
85101

86102

@@ -107,6 +123,8 @@ def get_acvp_binary(tg):
107123

108124
def run_keyGen_test(tg, tc):
109125
info(f"Running keyGen test case {tc['tcId']} ... ", end="")
126+
127+
results = {"tcId": tc["tcId"]}
110128
acvp_bin = get_acvp_binary(tg)
111129
assert tg["testType"] == "AFT"
112130
acvp_call = exec_prefix + [
@@ -120,14 +138,12 @@ def run_keyGen_test(tg, tc):
120138
err(f"{acvp_call} failed with error code {result.returncode}")
121139
err(result.stderr)
122140
exit(1)
123-
# Extract results and compare to expected data
141+
# Extract results
124142
for l in result.stdout.splitlines():
125143
(k, v) = l.split("=")
126-
if v != tc[k]:
127-
err("FAIL!")
128-
err(f"Mismatching result for {k}: expected {tc[k]}, got {v}")
129-
exit(1)
130-
info("OK")
144+
results[k] = v
145+
info("done")
146+
return results
131147

132148

133149
def compute_hash(msg, alg):
@@ -163,13 +179,13 @@ def compute_hash(msg, alg):
163179

164180
def run_sigGen_test(tg, tc):
165181
info(f"Running sigGen test case {tc['tcId']} ... ", end="")
182+
results = {"tcId": tc["tcId"]}
166183
acvp_bin = get_acvp_binary(tg)
167184

168185
assert tg["testType"] == "AFT"
169186

170187
is_deterministic = tg["deterministic"] is True
171-
172-
if tg["preHash"] == "preHash":
188+
if "preHash" in tg and tg["preHash"] == "preHash":
173189
assert len(tc["context"]) <= 2 * 255
174190

175191
# Use specialized SHAKE256 function that computes hash internally
@@ -200,7 +216,7 @@ def run_sigGen_test(tg, tc):
200216
f"hashAlg={tc['hashAlg']}",
201217
]
202218
elif tg["signatureInterface"] == "external":
203-
assert tc["hashAlg"] == "none"
219+
assert "hashAlg" not in tc or tc["hashAlg"] == "none"
204220
assert len(tc["context"]) <= 2 * 255
205221
assert len(tc["message"]) <= 2 * 65536
206222

@@ -213,7 +229,7 @@ def run_sigGen_test(tg, tc):
213229
f"context={tc['context']}",
214230
]
215231
else: # signatureInterface=internal
216-
assert tc["hashAlg"] == "none"
232+
assert "hashAlg" not in tc or tc["hashAlg"] == "none"
217233
externalMu = 0
218234
if tg["externalMu"] is True:
219235
externalMu = 1
@@ -242,21 +258,20 @@ def run_sigGen_test(tg, tc):
242258
err(f"{acvp_call} failed with error code {result.returncode}")
243259
err(result.stderr)
244260
exit(1)
245-
# Extract results and compare to expected data
261+
# Extract results
246262
for l in result.stdout.splitlines():
247263
(k, v) = l.split("=")
248-
if v != tc[k]:
249-
err("FAIL!")
250-
err(f"Mismatching result for {k}: expected {tc[k]}, got {v}")
251-
exit(1)
252-
info("OK")
264+
results[k] = v
265+
info("done")
266+
return results
253267

254268

255269
def run_sigVer_test(tg, tc):
256270
info(f"Running sigVer test case {tc['tcId']} ... ", end="")
271+
results = {"tcId": tc["tcId"]}
257272
acvp_bin = get_acvp_binary(tg)
258273

259-
if tg["preHash"] == "preHash":
274+
if "preHash" in tg and tg["preHash"] == "preHash":
260275
assert len(tc["context"]) <= 2 * 255
261276

262277
# Use specialized SHAKE256 function that computes hash internally
@@ -281,7 +296,7 @@ def run_sigVer_test(tg, tc):
281296
f"hashAlg={tc['hashAlg']}",
282297
]
283298
elif tg["signatureInterface"] == "external":
284-
assert tc["hashAlg"] == "none"
299+
assert "hashAlg" not in tc or tc["hashAlg"] == "none"
285300
assert len(tc["context"]) <= 2 * 255
286301
assert len(tc["message"]) <= 2 * 65536
287302

@@ -294,7 +309,7 @@ def run_sigVer_test(tg, tc):
294309
f"pk={tc['pk']}",
295310
]
296311
else: # signatureInterface=internal
297-
assert tc["hashAlg"] == "none"
312+
assert "hashAlg" not in tc or tc["hashAlg"] == "none"
298313
externalMu = 0
299314
if tg["externalMu"] is True:
300315
externalMu = 1
@@ -314,61 +329,127 @@ def run_sigVer_test(tg, tc):
314329
]
315330

316331
result = subprocess.run(acvp_call, encoding="utf-8", capture_output=True)
317-
318-
if (result.returncode == 0) != tc["testPassed"]:
319-
err("FAIL!")
320-
err(
321-
f"Mismatching verification result: expected {tc['testPassed']}, got {result.returncode == 0}"
322-
)
323-
exit(1)
324-
info("OK")
325-
326-
327-
def runTestSingle(internalProjectionName, internalProjection):
328-
info(f"Running ACVP tests for {internalProjectionName}")
329-
330-
assert internalProjection["algorithm"] == "ML-DSA"
332+
# Extract results
333+
results["testPassed"] = result.returncode == 0
334+
info("done")
335+
return results
336+
337+
338+
def runTestSingle(promptName, prompt, expectedResultName, expectedResult, output):
339+
info(f"Running ACVP tests for {promptName}")
340+
341+
assert expectedResult is not None or output is not None
342+
343+
# The ACVTS data structure is very slightly different from the sample files
344+
# in the usnistgov/ACVP-Server Github repository:
345+
# The prompt consists of a 2-element list, where the first element is
346+
# solely consisting of {"acvVersion": "1.0"} and the second element is
347+
# the usual prompt containing the test values.
348+
# See https://pages.nist.gov/ACVP/draft-celi-acvp-ml-dsa.txt for details.
349+
# We automatically detect that case here and extract the second element
350+
isAcvts = False
351+
if type(prompt) is list:
352+
isAcvts = True
353+
assert len(prompt) == 2
354+
acvVersion = prompt[0]
355+
assert len(acvVersion) == 1
356+
prompt = prompt[1]
357+
358+
assert prompt["algorithm"] == "ML-DSA"
331359
assert (
332-
internalProjection["mode"] == "keyGen"
333-
or internalProjection["mode"] == "sigGen"
334-
or internalProjection["mode"] == "sigVer"
360+
prompt["mode"] == "keyGen"
361+
or prompt["mode"] == "sigGen"
362+
or prompt["mode"] == "sigVer"
335363
)
336364

337365
# copy top level fields into the results
338-
results = internalProjection.copy()
366+
results = prompt.copy()
339367

340368
results["testGroups"] = []
341-
for tg in internalProjection["testGroups"]:
369+
for tg in prompt["testGroups"]:
342370
tgResult = {
343371
"tgId": tg["tgId"],
344372
"tests": [],
345373
}
346374
results["testGroups"].append(tgResult)
347375
for tc in tg["tests"]:
348-
if internalProjection["mode"] == "keyGen":
376+
if prompt["mode"] == "keyGen":
349377
result = run_keyGen_test(tg, tc)
350-
elif internalProjection["mode"] == "sigGen":
378+
elif prompt["mode"] == "sigGen":
351379
result = run_sigGen_test(tg, tc)
352-
elif internalProjection["mode"] == "sigVer":
380+
elif prompt["mode"] == "sigVer":
353381
result = run_sigVer_test(tg, tc)
354382
tgResult["tests"].append(result)
355383

384+
# In case the testvectors are from the ACVTS server, it is expected
385+
# that the acvVersion is included in the output results.
386+
# See note on ACVTS data structure above.
387+
if isAcvts is True:
388+
results = [acvVersion, results]
389+
390+
# Compare to expected results
391+
if expectedResult is not None:
392+
info(f"Comparing results with {expectedResultName}")
393+
# json.dumps() is guaranteed to preserve insertion order (since Python 3.7)
394+
# Enforce strictly the same order as in the expected Result
395+
if json.dumps(results) != json.dumps(expectedResult):
396+
err("FAIL!")
397+
err(f"Mismatching result for {promptName}")
398+
exit(1)
399+
info("OK")
400+
else:
401+
info(
402+
"Results could not be validated as no expected resulted were provided to --expected"
403+
)
404+
405+
# Write results to file
406+
if output is not None:
407+
info(f"Writing results to {output}")
408+
with open(output, "w") as f:
409+
json.dump(results, f)
410+
411+
412+
def runTest(data, output):
413+
# if output is defined we expect only one input
414+
assert output is None or len(data) == 1
356415

357-
def runTest(data):
358-
for internalProjectionName, internalProjection in data:
359-
runTestSingle(internalProjectionName, internalProjection)
416+
for promptName, prompt, expectedResultName, expectedResult in data:
417+
runTestSingle(promptName, prompt, expectedResultName, expectedResult, output)
360418
info("ALL GOOD!")
361419

362420

363-
def test(version="v1.1.0.40"):
364-
# load data from downloaded files
365-
data = loadDefaultAcvpData(version)
421+
def test(prompt, expected, output, version="v1.1.0.40"):
422+
assert (
423+
prompt is not None or output is None
424+
), "cannot produce output if there is no input"
425+
426+
assert prompt is None or (
427+
output is not None or expected is not None
428+
), "if there is a prompt, either output or expectedResult required"
366429

367-
runTest(data)
430+
# if prompt is passed, use it
431+
if prompt is not None:
432+
data = [loadAcvpData(prompt, expected)]
433+
else:
434+
# load data from downloaded files
435+
data = loadDefaultAcvpData(version)
368436

437+
runTest(data, output)
369438

370-
parser = argparse.ArgumentParser()
371439

440+
parser = argparse.ArgumentParser()
441+
parser.add_argument(
442+
"-p", "--prompt", help="Path to prompt file in json format", required=False
443+
)
444+
parser.add_argument(
445+
"-e",
446+
"--expected",
447+
help="Path to expectedResults file in json format",
448+
required=False,
449+
)
450+
parser.add_argument(
451+
"-o", "--output", help="Path to output file in json format", required=False
452+
)
372453
parser.add_argument(
373454
"--version",
374455
"-v",
@@ -377,9 +458,12 @@ def test(version="v1.1.0.40"):
377458
)
378459
args = parser.parse_args()
379460

380-
# Download files if needed
381-
if not download_acvp_files(args.version):
382-
print("Failed to download ACVP test files", file=sys.stderr)
383-
sys.exit(1)
461+
if args.prompt is None:
462+
print(f"Using ACVP test vectors version {args.version}", file=sys.stderr)
463+
464+
# Download files if needed
465+
if not download_acvp_files(args.version):
466+
print("Failed to download ACVP test files", file=sys.stderr)
467+
sys.exit(1)
384468

385-
test(args.version)
469+
test(args.prompt, args.expected, args.output, args.version)

0 commit comments

Comments
 (0)