Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit 902c035

Browse files
committed
Various USM fixes coming from samples in issue #4
usm: USM cipher identity fix usm: Always prepend header data usm: Allow HCACodec key specification in get_audios Add `save()` helper methods to builder classes
1 parent e4f1bb2 commit 902c035

File tree

4 files changed

+128
-56
lines changed

4 files changed

+128
-56
lines changed

PyCriCodecsEx/acb.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,3 +320,12 @@ def build(self) -> bytes:
320320
# Check whether all AWB indices are valid
321321
binary = UTFBuilder(self.acb.dictarray, encoding=self.acb.encoding, table_name=self.acb.table_name)
322322
return binary.bytes()
323+
324+
def save(self, filepath: str) -> None:
325+
"""Saves the built ACB to a file.
326+
327+
Args:
328+
filepath (str): The path to save the ACB file to.
329+
"""
330+
with open(filepath, "wb") as f:
331+
f.write(self.build())

PyCriCodecsEx/awb.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,13 @@ def build(self) -> bytes:
165165
if len(fl) % self.align != 0 and idx != len(self.infiles) - 1:
166166
fl = fl.ljust(len(fl) + (self.align - (len(fl) % self.align)), b"\x00")
167167
outfile.write(fl)
168-
return outfile.getvalue()
168+
return outfile.getvalue()
169+
170+
def save(self, filepath: str) -> None:
171+
"""Saves the built AWB to a file.
172+
173+
Args:
174+
filepath (str): The path to save the AWB file to.
175+
"""
176+
with open(filepath, "wb") as f:
177+
f.write(self.build())

PyCriCodecsEx/usm.py

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class USMCrypt:
2121
videomask2: bytearray
2222
audiomask: bytearray
2323
usm_key: int = 0
24+
2425
def init_key(self, key: str):
2526
if type(key) == str:
2627
if len(key) <= 16:
@@ -97,13 +98,14 @@ def init_key(self, key: str):
9798
self.audiomask[x] = self.videomask2[x]
9899

99100
# Decrypt SFV chunks or ALP chunks, should only be used if the video data is encrypted.
100-
def VideoMask(self, memObj: bytearray) -> bytearray:
101+
def VideoMask(self, memObj: bytearray) -> bytearray:
101102
head = memObj[:0x40]
102-
memObj = memObj[0x40:]
103+
memObj = memObj[0x40:]
103104
size = len(memObj)
104105
# memObj len is a cached property, very fast to lookup
105106
if size <= 0x200:
106107
return head + memObj
108+
assert type(memObj) == bytearray, "memObj must be a bytearray."
107109
data_view = memoryview(memObj).cast("Q")
108110

109111
# mask 2
@@ -112,24 +114,48 @@ def VideoMask(self, memObj: bytearray) -> bytearray:
112114
vmask = self.videomask2
113115
vmask_view = memoryview(vmask).cast("Q")
114116

115-
mask_index = 0
116-
117117
for i in range(32, size // 8):
118-
data_view[i] ^= mask_view[mask_index]
119-
mask_view[mask_index] = data_view[i] ^ vmask_view[mask_index]
120-
mask_index = (mask_index + 1) % 4
121-
118+
data_view[i] ^= mask_view[i % 4]
119+
mask_view[i % 4] = data_view[i] ^ vmask_view[i % 4]
120+
122121
# mask 1
123122
mask = bytearray(self.videomask1)
124123
mask_view = memoryview(mask).cast("Q")
125-
mask_index = 0
126124
for i in range(32):
127-
mask_view[mask_index] ^= data_view[i + 32]
128-
data_view[i] ^= mask_view[mask_index]
129-
mask_index = (mask_index + 1) % 4
130-
125+
mask_view[i % 4] ^= data_view[i + 32]
126+
data_view[i] ^= mask_view[i % 4]
131127
return head + memObj
132128

129+
# Encrypt SFV chunks or ALP chunks, should only be used if the video data needs to be encrypted.
130+
def VideoMaskInv(self, memObj: bytearray) -> bytearray:
131+
head = memObj[:0x40]
132+
memObj = memObj[0x40:]
133+
size = len(memObj)
134+
# memObj len is a cached property, very fast to lookup
135+
if size <= 0x200:
136+
return head + memObj
137+
assert type(memObj) == bytearray, "memObj must be a bytearray."
138+
data_view = memoryview(memObj).cast("Q")
139+
140+
# mask 1
141+
mask = bytearray(self.videomask1)
142+
mask_view = memoryview(mask).cast("Q")
143+
for i in range(32):
144+
mask_view[i % 4] ^= data_view[i + 32]
145+
data_view[i] ^= mask_view[i % 4]
146+
147+
# mask 2
148+
mask = bytearray(self.videomask2)
149+
mask_view = memoryview(mask).cast("Q")
150+
vmask = self.videomask2
151+
vmask_view = memoryview(vmask).cast("Q")
152+
153+
for i in range(32, size // 8):
154+
temp = data_view[i]
155+
data_view[i] ^= mask_view[i % 4]
156+
mask_view[i % 4] = temp ^ vmask_view[i % 4]
157+
return head + memObj
158+
133159
# Decrypts SFA chunks, should just be used with ADX files.
134160
def AudioMask(self, memObj: bytearray) -> bytearray:
135161
head = memObj[:0x140]
@@ -202,7 +228,7 @@ def framerate(self):
202228
# Lesson learned. Do NOT trust the metadata.
203229
# num, denom = self.stream["r_frame_rate"].split("/")
204230
# return int(int(num) / int(denom))
205-
return 1 / min((dt for _, _, _, dt in self.frames()))
231+
return 1 / min((dt for _, _, _, dt in self.frames() if dt))
206232

207233
@cached_property
208234
def avg_framerate(self):
@@ -211,7 +237,7 @@ def avg_framerate(self):
211237
# if avg_frame_rate:
212238
# num, denom = avg_frame_rate.split("/")
213239
# return int(int(num) / int(denom))
214-
return self.frame_count / sum((dt for _, _, _, dt in self.frames()))
240+
return self.frame_count / sum((dt for _, _, _, dt in self.frames() if dt))
215241

216242
@property
217243
def packets(self):
@@ -232,7 +258,8 @@ def frame_count(self):
232258
def frames(self):
233259
"""Generator of [frame data, frame dict, is keyframe, duration]"""
234260
offsets = [int(packet["pos"]) for packet in self.packets] + [self.filesize]
235-
for i, frame in enumerate(self.packets):
261+
offsets[0] = 0 # Includes the metadata packet as well
262+
for i, frame in enumerate(self.packets):
236263
frame_size = offsets[i + 1] - offsets[i]
237264
self.file.seek(offsets[i])
238265
raw_frame = self.file.read(frame_size)
@@ -266,8 +293,9 @@ def generate_SFV(self, builder: "USMBuilder"):
266293
0,
267294
0,
268295
)
296+
data += b"\x00" * padlen
269297
if builder.encrypt:
270-
data = builder.VideoMask(data)
298+
data = builder.VideoMaskInv(bytearray(data))
271299
SFV_chunk += data
272300
SFV_chunk = SFV_chunk.ljust(datalen + 0x18 + padlen + 0x8, b"\x00")
273301
SFV_list.append(SFV_chunk)
@@ -354,7 +382,7 @@ def __init__(self, filename : str | BinaryIO, key: str | int = None):
354382
355383
Args:
356384
filename (str): The path to the USM file.
357-
key (str, optional): The decryption key. Either int64 or a hex string. Defaults to None.
385+
key (str, optional): The USM decryption key. Either int64 or a hex string. Defaults to None.
358386
"""
359387
self.filename = filename
360388
self.decrypt = False
@@ -507,19 +535,19 @@ def get_video(self) -> VP9Codec | H264Codec | MPEG1Codec:
507535
stream.filename = sfname
508536
return stream
509537

510-
def get_audios(self, hca_key = 0, hca_subkey = 0) -> List[ADXCodec | HCACodec]:
538+
def get_audios(self, hca_key = -1, hca_subkey = 0) -> List[ADXCodec | HCACodec]:
511539
"""Create a list of audio codecs from the available streams.
512540
513541
Args:
514-
hca_key (int, optional): The HCA decryption key. Either int64 or a hex string. Defaults to 0 - in which
542+
hca_key (int, optional): The HCA decryption key. Either int64 or a hex string. Defaults to -1, in which
515543
case the key for USM (if used) would also be used for HCA decryption.
516544
hca_subkey (int, optional): The HCA decryption subkey. Either int64 or a hex string. Defaults to 0.
517545
"""
518546
match self.audio_codec:
519547
case ADXCodec.AUDIO_CODEC:
520548
return [ADXCodec(s[2], s[1]) for s in self.streams if s[0] == USMChunckHeaderType.SFA.value]
521549
case HCACodec.AUDIO_CODEC:
522-
return [HCACodec(s[2], s[1], key=hca_key or self.usm_key, subkey=hca_subkey) for s in self.streams if s[0] == USMChunckHeaderType.SFA.value] # HCAs are never encrypted in USM
550+
return [HCACodec(s[2], s[1], key=hca_key if hca_key != -1 else self.usm_key, subkey=hca_subkey) for s in self.streams if s[0] == USMChunckHeaderType.SFA.value] # HCAs are never encrypted in USM
523551
case _:
524552
return []
525553

@@ -530,23 +558,23 @@ class USMBuilder(USMCrypt):
530558

531559
key: int = None
532560
encrypt: bool = False
533-
encrypt_audio: bool = False
534561

535562
def __init__(
536563
self,
537-
key = None,
538-
encrypt_audio = False
564+
key = None
539565
) -> None:
540566
"""Initialize the USMBuilder from set source files.
541567
542568
Args:
543-
key (str | int, optional): The encryption key. Either int64 or a hex string. Defaults to None.
544-
encrypt_audio (bool, optional): Whether to also encrypt the audio. Defaults to False.
569+
key (str | int, optional): The USM encryption key. Either int64 or a hex string. Defaults to None.
570+
571+
Note:
572+
For USM with key set, HCA audio streams *usually* use the same key for encryption.
573+
Thus when adding HCA audio streams, make sure your HCACodec is initialized with a key itself.
545574
"""
546575
if key:
547576
self.init_key(key)
548577
self.encrypt = True
549-
self.encrypt_audio = encrypt_audio
550578
self.audio_streams = []
551579

552580
def add_video(self, video : str | H264Codec | VP9Codec | MPEG1Codec):
@@ -624,6 +652,15 @@ def chunk_key_sort(chunk):
624652
self.usm += chunks
625653
return self.usm
626654

655+
def save(self, filepath: str) -> None:
656+
"""Saves the built USM to a file.
657+
658+
Args:
659+
filepath (str): The path to save the USM file to.
660+
"""
661+
with open(filepath, "wb") as f:
662+
f.write(self.build())
663+
627664
def _build_header(
628665
self, SFV_list: list, SFA_chunks: list, SBT_chunks: list # TODO: Not used
629666
) -> bytes:

Tests/issue4.py

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,58 @@
11
# USM Sample from Digimon Story: Cyber Sleuth (PC)
22
# Uses same key for both HCA and USM decryption
3-
# HCA Key {2897314143465725881}, // 283553DCE3FD5FB9
4-
# USM Key 2897314143465725881
3+
USM_KEY = 2897314143465725881
54
from . import sample_file_path, temp_file_path
6-
from PyCriCodecsEx.usm import USM, USMBuilder, ADXCodec, HCACodec
7-
8-
def test_usm_decode_and_mux():
9-
usm = USM(temp_file_path('S01_B.usm'), 2897314143465725881)
5+
from PyCriCodecsEx.usm import USM, USMBuilder, ADXCodec, HCACodec, VP9Codec
6+
import ffmpeg, os
7+
def mux_av(video_src: str, audio_src: str, output: str, delete: bool = False):
8+
(
9+
ffmpeg.output(
10+
ffmpeg.input(video_src),
11+
ffmpeg.input(audio_src),
12+
output,
13+
vcodec='copy',
14+
acodec='copy',
15+
).overwrite_output()
16+
).run()
17+
if delete:
18+
print('* Cleaning up temporary files')
19+
os.unlink(video_src)
20+
os.unlink(audio_src)
21+
print(f'* Result available at: {output}')
22+
def decode_one(path : str):
23+
usm = USM(path, USM_KEY)
1024
audio = usm.get_audios()
1125
video = usm.get_video()
1226
audio = audio[0] if audio else None
13-
# Mux into MP4
14-
import ffmpeg, os
15-
def mux_av(video_src: str, audio_src: str, output: str, delete: bool = False):
16-
(
17-
ffmpeg.output(
18-
ffmpeg.input(video_src),
19-
ffmpeg.input(audio_src),
20-
output,
21-
vcodec='copy',
22-
acodec='copy',
23-
).overwrite_output()
24-
).run()
25-
if delete:
26-
print('* Cleaning up temporary files')
27-
os.unlink(video_src)
28-
os.unlink(audio_src)
29-
print(f'* Result available at: {output}')
30-
saved_video = temp_file_path('tmp_video.mp4')
31-
saved_audio = temp_file_path('tmp_audio.wav')
32-
saved_hca = temp_file_path('tmp_audio.hca')
33-
result = temp_file_path('muxed_result1.mp4')
34-
open(saved_hca,'wb').write(audio.get_hca())
27+
return audio, video
28+
def test_usm_decode_and_mux():
29+
saved_video = temp_file_path('tmp_video.ivf') # <- must be IVF container
30+
saved_audio = temp_file_path('tmp_audio.wav')
31+
result = temp_file_path('muxed_result1.mp4')
32+
# Decode
33+
audio, video = decode_one(temp_file_path('S01_B.usm'))
3534
video.save(saved_video)
3635
audio.save(saved_audio)
36+
# Mux
3737
mux_av(saved_video, saved_audio, result)
3838
print('Remux Done.')
39+
# Rebuild
40+
usm_builder = USMBuilder(USM_KEY)
41+
usm_builder.add_video(saved_video)
42+
usm_builder.add_audio(HCACodec(saved_audio, key=USM_KEY))
43+
rebuilt_usm_path = temp_file_path('rebuilt_usm.usm')
44+
usm_builder.save(rebuilt_usm_path)
45+
print(f'Rebuilt USM saved at: {rebuilt_usm_path}')
46+
# Decoded again
47+
audio2, video2 = decode_one(rebuilt_usm_path)
48+
saved_video2 = temp_file_path('tmp_video2.ivf') # <- must be IVF container
49+
saved_audio2 = temp_file_path('tmp_audio2.wav')
50+
result2 = temp_file_path('muxed_result2.mp4')
51+
video2.save(saved_video2)
52+
audio2.save(saved_audio2)
53+
# Mux again
54+
mux_av(saved_video2, saved_audio2, result2)
55+
print('Remux Done (decode-mux).')
3956

4057
if __name__ == "__main__":
4158
test_usm_decode_and_mux()

0 commit comments

Comments
 (0)