Skip to content

Commit 25a4c2c

Browse files
authored
Add checksum caching (#134)
* Implement calculate_checksum on Instance * Use new checksumming option throughout * Rev version
1 parent d73e22a commit 25a4c2c

File tree

8 files changed

+149
-79
lines changed

8 files changed

+149
-79
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2017 the HERA Collaboration
2+
# Licensed under the 2-clause BSD License.
3+
4+
"""Add cached checksums
5+
6+
Revision ID: 38a604ac628b
7+
Revises: 1def8c988372
8+
Create Date: 2025-02-20 11:12:54.446089
9+
10+
"""
11+
import sqlalchemy as sa
12+
13+
from alembic import op
14+
15+
revision = "38a604ac628b"
16+
down_revision = "1def8c988372"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade():
    """Add the nullable checksum-cache columns to the ``instances`` table."""
    # Batch mode keeps the migration usable on backends with limited
    # ALTER TABLE support (e.g. SQLite).
    with op.batch_alter_table("instances") as batch_op:
        batch_op.add_column(
            sa.Column("calculated_checksum", sa.String(), nullable=True)
        )
        # BigInteger rather than Integer: on backends where INTEGER is 32-bit
        # (e.g. PostgreSQL) a plain Integer cannot hold sizes of 2 GiB or more.
        batch_op.add_column(
            sa.Column("calculated_size", sa.BigInteger(), nullable=True)
        )
        batch_op.add_column(sa.Column("checksum_time", sa.DateTime(), nullable=True))
28+
29+
30+
def downgrade():
    """Remove the checksum-cache columns from the ``instances`` table."""
    # Use batch mode, matching upgrade(), so the column drops also work on
    # backends without native ALTER TABLE ... DROP COLUMN support (e.g. older
    # SQLite). Columns are dropped in the reverse order of their creation.
    with op.batch_alter_table("instances") as batch_op:
        batch_op.drop_column("checksum_time")
        batch_op.drop_column("calculated_size")
        batch_op.drop_column("calculated_checksum")

librarian_background/check_integrity.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from schedule import CancelJob
1010
from sqlalchemy.orm import Session
1111

12-
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
12+
from hera_librarian.utils import compare_checksums
1313
from librarian_server.database import get_session
1414
from librarian_server.orm import Instance, StoreMetadata
1515
from librarian_server.orm.file import CorruptFile, File
@@ -81,10 +81,7 @@ def core(self, session: Session):
8181
for file in files:
8282
# Now we can check the integrity of each file.
8383
try:
84-
hash_function = get_hash_function_from_hash(file.file.checksum)
85-
path_info = store.store_manager.path_info(
86-
file.path, hash_function=hash_function
87-
)
84+
checksum, size = file.calculate_checksum(session=session, commit=True)
8885
except FileNotFoundError:
8986
all_files_fine = False
9087
logger.error(
@@ -98,7 +95,7 @@ def core(self, session: Session):
9895
# Compare checksum to database
9996
expected_checksum = file.file.checksum
10097

101-
if compare_checksums(expected_checksum, path_info.checksum):
98+
if compare_checksums(expected_checksum, checksum):
10299
logger.info(
103100
"Instance {} on store {} has been validated (Instance: {})",
104101
file.path,
@@ -120,8 +117,8 @@ def core(self, session: Session):
120117
if corrupt_file is None:
121118
corrupt_file = CorruptFile.new_corrupt_file(
122119
instance=file,
123-
size=path_info.size,
124-
checksum=path_info.checksum,
120+
size=size,
121+
checksum=checksum,
125122
)
126123
session.add(corrupt_file)
127124
session.commit()
@@ -134,7 +131,7 @@ def core(self, session: Session):
134131
file.path,
135132
store.name,
136133
expected_checksum,
137-
path_info.checksum,
134+
checksum,
138135
file.id,
139136
)
140137
if all_files_fine:

librarian_background/corruption_fixer.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
CorruptionResendResponse,
1818
)
1919
from hera_librarian.transfer import TransferStatus
20-
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
20+
from hera_librarian.utils import compare_checksums
2121
from librarian_server.database import get_session
2222
from librarian_server.orm.file import CorruptFile, File
2323
from librarian_server.orm.instance import Instance
@@ -91,13 +91,11 @@ def core(self, session: Session) -> bool:
9191

9292
# Step A: Check that the file is actually corrupt
9393
try:
94-
hash_function = get_hash_function_from_hash(potential_file.checksum)
95-
store = potential_instance.store
96-
path_info = store.store_manager.path_info(
97-
potential_instance.path, hash_function=hash_function
94+
potential_checksum, potential_size = (
95+
potential_instance.calculate_checksum(session=session, commit=True)
9896
)
9997

100-
if compare_checksums(potential_file.checksum, path_info.checksum):
98+
if compare_checksums(potential_file.checksum, potential_checksum):
10199
logger.info(
102100
"CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} "
103101
"but we just checked the checksums: {chk_a}=={chk_b} and the file is fine "
@@ -106,7 +104,7 @@ def core(self, session: Session) -> bool:
106104
name=corrupt.file_name,
107105
inst_id=corrupt.instance_id,
108106
chk_a=potential_file.checksum,
109-
chk_b=path_info.checksum,
107+
chk_b=potential_checksum,
110108
)
111109
session.delete(corrupt)
112110
session.commit()

librarian_server/api/corrupt.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
CorruptionResendRequest,
1414
CorruptionResendResponse,
1515
)
16-
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
16+
from hera_librarian.utils import compare_checksums
1717
from librarian_server.orm.file import File
1818
from librarian_server.orm.instance import Instance, RemoteInstance
1919
from librarian_server.orm.librarian import Librarian
@@ -99,12 +99,11 @@ def user_and_librarian_validation_flow(
9999
),
100100
)
101101

102-
hash_function = get_hash_function_from_hash(file.checksum)
103-
path_info = best_instance.store.store_manager.path_info(
104-
best_instance.path, hash_function=hash_function
102+
recomputed_checksum, _ = best_instance.calculate_checksum(
103+
session=session, commit=True, force_recompute=True
105104
)
106105

107-
if not compare_checksums(file.checksum, path_info.checksum):
106+
if not compare_checksums(file.checksum, recomputed_checksum):
108107
logger.error(
109108
"Our copy of the file {} is corrupt, we cannot send it to {}",
110109
file_name,

librarian_server/api/validate.py

Lines changed: 17 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
FileValidationResponse,
2828
FileValidationResponseItem,
2929
)
30-
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
30+
from hera_librarian.utils import compare_checksums
3131

3232
from ..database import yield_session
3333
from ..logger import log
@@ -39,80 +39,42 @@
3939

4040
router = APIRouter(prefix="/api/v2/validate")
4141

42-
VALIDATION_TIMEOUT = datetime.timedelta(hours=8)
43-
VALIDATION_CACHE = {}
44-
45-
46-
async def cached_calculate_checksum_of_local_copy(
47-
original_checksum: str,
48-
original_size: int,
49-
path_info_function: callable,
50-
path: Path,
51-
store_id: int,
52-
instance_id: int,
53-
):
54-
key = f"{original_checksum}-{instance_id}"
55-
56-
cached = VALIDATION_CACHE.get(key, None)
57-
58-
if cached is None or (
59-
(datetime.datetime.now(datetime.timezone.utc) - cached[1]) > VALIDATION_TIMEOUT
60-
):
61-
result = await asyncify(calculate_checksum_of_local_copy)(
62-
original_checksum=original_checksum,
63-
original_size=original_size,
64-
path_info_function=path_info_function,
65-
path=path,
66-
store_id=store_id,
67-
instance_id=instance_id,
68-
)
69-
70-
VALIDATION_CACHE[key] = (result, datetime.datetime.now(datetime.timezone.utc))
71-
else:
72-
log.info(
73-
f"Using cached result for instance {instance_id}", instance_id=instance_id
74-
)
75-
result = cached[0]
76-
77-
return result
78-
7942

8043
def calculate_checksum_of_local_copy(
8144
original_checksum: str,
8245
original_size: int,
83-
path_info_function: callable,
84-
path: Path,
85-
store_id: int,
86-
instance_id: int,
46+
instance: Instance,
47+
session: Session,
8748
):
8849
start = perf_counter()
89-
hash_function = get_hash_function_from_hash(original_checksum)
9050
try:
91-
path_info = path_info_function(path, hash_function=hash_function)
51+
current_checksum, current_size = instance.calculate_checksum(
52+
session=session, commit=True
53+
)
9254
response = FileValidationResponseItem(
9355
librarian=server_settings.name,
94-
store=store_id,
95-
instance_id=instance_id,
56+
store=instance.store_id,
57+
instance_id=instance.id,
9658
original_checksum=original_checksum,
9759
original_size=original_size,
98-
current_checksum=path_info.checksum,
99-
current_size=path_info.size,
60+
current_checksum=current_checksum,
61+
current_size=current_size,
10062
computed_same_checksum=compare_checksums(
101-
original_checksum, path_info.checksum
63+
original_checksum, current_checksum
10264
),
10365
)
10466
end = perf_counter()
10567

10668
log.debug(
107-
f"Calculated path info for {instance_id} ({path_info.size} B) "
108-
f"in {end - start:.2f} seconds."
69+
f"Calculated path info for {response.instance_id} / {instance.path} "
70+
f"({response.current_size} B) in {end - start:.2f} seconds"
10971
)
11072

11173
return [response]
11274
except FileNotFoundError:
11375
# A mistakenly 'available' file that is not actually available.
11476
log.error(
115-
f"File {path} in store {store_id} marked as available but does not exist."
77+
f"File {instance.path} in store {instance.store_id} marked as available but does not exist."
11678
)
11779

11880
return []
@@ -224,13 +186,11 @@ async def validate_file(
224186
if not instance.available:
225187
continue
226188

227-
this_checksum_info = cached_calculate_checksum_of_local_copy(
189+
this_checksum_info = asyncify(calculate_checksum_of_local_copy)(
228190
original_checksum=file.checksum,
229191
original_size=file.size,
230-
path_info_function=instance.store.store_manager.path_info,
231-
path=instance.path,
232-
store_id=instance.store.id,
233-
instance_id=instance.id,
192+
instance=instance,
193+
session=session,
234194
)
235195

236196
coroutines.append(this_checksum_info)

librarian_server/orm/instance.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from sqlalchemy.orm import Session
1212

1313
from hera_librarian.deletion import DeletionPolicy
14+
from hera_librarian.utils import get_hash_function_from_hash
1415

1516
from .. import database as db
1617
from ..settings import server_settings
@@ -50,6 +51,13 @@ class Instance(db.Base):
5051
available = db.Column(db.Boolean, nullable=False)
5152
"Whether or not this file is available on our librarian."
5253

54+
calculated_checksum = db.Column(db.String, nullable=True)
55+
"The checksum that has been calculated for this on-disk instance"
56+
calculated_size = db.Column(db.Integer, nullable=True)
57+
"The size of the file that was calculated at the same time as the checksum"
58+
checksum_time = db.Column(db.DateTime, nullable=True)
59+
"The time at which the calculated_checksum was checked"
60+
5361
@classmethod
5462
def new_instance(
5563
self,
@@ -124,6 +132,79 @@ def delete(
124132

125133
return
126134

135+
def calculate_checksum(
    self,
    session: Session,
    commit: bool = True,
    force_recompute: bool = False,
) -> tuple[str, int]:
    """
    Calculate the checksum of the instance on disk.

    The checksum stored on this row is returned instead when it was
    calculated less than ``server_settings.checksum_timeout`` ago and
    ``force_recompute`` is not set.

    Parameters
    ----------
    session: Session
        The database session to use; this is committed to if commit=True below.
        Session must be active as we make sub-queries for file.
    commit: bool = True
        Whether to commit any new changes to the database.
    force_recompute: bool = False
        Re-compute the checksum even when a cached version is available.

    Returns
    -------
    checksum: str
        The checksum, calculated or drawn from the database.
    size: int
        Size of the on-disk file in bytes.

    Raises
    ------
    FileNotFoundError
        If the file was not found on disk.
    """

    current_time = datetime.now(timezone.utc)

    has_cached_value = (
        self.checksum_time is not None and self.calculated_checksum is not None
    )

    if has_cached_value and not force_recompute:
        # NOTE(review): astimezone() interprets a naive datetime as system
        # local time. If the database hands checksum_time back without tzinfo,
        # confirm that this round-trip yields the intended instant.
        checksum_time = self.checksum_time.astimezone(timezone.utc)
        if (current_time - checksum_time) < server_settings.checksum_timeout:
            logger.info(
                "Returning a cached checksum from {time} for instance {id} at "
                "{path} - {checksum}",
                time=checksum_time,
                id=self.id,
                path=self.path,
                checksum=self.calculated_checksum,
            )
            return self.calculated_checksum, self.calculated_size

    # We must calculate the checksum fresh. The hash function is derived from
    # the parent file's recorded checksum so the two values are comparable.
    hash_function = get_hash_function_from_hash(self.file.checksum)
    path_info = self.store.store_manager.path_info(self.path, hash_function)

    # Log the value we just computed; self.calculated_checksum still holds the
    # previous (possibly None) value at this point.
    logger.info(
        "Calculated a fresh checksum at {time} for instance {id} at {path} - {checksum}",
        time=current_time,
        id=self.id,
        path=self.path,
        checksum=path_info.checksum,
    )

    # Cache the result on the row so later calls can skip the disk read.
    self.calculated_checksum = path_info.checksum
    self.calculated_size = path_info.size
    self.checksum_time = current_time

    if commit:
        session.commit()

    return self.calculated_checksum, self.calculated_size
207+
127208

128209
class RemoteInstance(db.Base):
129210
"""

librarian_server/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
deserialized from the available librarian config path.
44
"""
55

6+
import datetime
67
import os
78
from pathlib import Path
89
from typing import TYPE_CHECKING, Optional
@@ -179,6 +180,7 @@ class ServerSettings(BaseSettings):
179180

180181
# Checksumming options
181182
checksum_threads: int = 4
183+
checksum_timeout: datetime.timedelta = datetime.timedelta(days=1)
182184

183185
model_config = SettingsConfigDict(env_prefix="librarian_server_")
184186

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ exclude=["*tests*"]
77
[project]
88
name="hera_librarian"
99
requires-python = ">=3.10"
10-
version = "3.1.2"
10+
version = "4.0.0"
1111
dependencies = [
1212
"alembic",
1313
"argon2-cffi",

0 commit comments

Comments
 (0)