11"""
2- Folder status and integrity checking utilities for ArchiveBox.
2+ Folder utilities for ArchiveBox.
3+
4+ Note: This file only contains legacy cleanup utilities.
5+ The DB is the single source of truth - use Snapshot.objects queries for all status checks.
36"""
47
58__package__ = 'archivebox.misc'
811import json
912import shutil
1013from pathlib import Path
11- from itertools import chain
12- from typing import Dict , Optional , List , Tuple , TYPE_CHECKING
13-
14- from django .db .models import QuerySet
14+ from typing import Tuple , List
1515
1616from archivebox .config import DATA_DIR , CONSTANTS
1717from archivebox .misc .util import enforce_types
1818
19- if TYPE_CHECKING :
20- from core .models import Snapshot
21-
22-
23- def _is_valid_snapshot (snapshot : 'Snapshot' ) -> bool :
24- """Check if a snapshot's data directory is valid"""
25- dir_exists = Path (snapshot .output_dir ).exists ()
26- index_exists = (Path (snapshot .output_dir ) / "index.json" ).exists ()
27- if not dir_exists :
28- return False
29- if dir_exists and not index_exists :
30- return False
31- if dir_exists and index_exists :
32- try :
33- with open (Path (snapshot .output_dir ) / "index.json" , 'r' ) as f :
34- data = json .load (f )
35- return snapshot .url == data .get ('url' )
36- except Exception :
37- pass
38- return False
39-
40-
def _is_corrupt_snapshot(snapshot: 'Snapshot') -> bool:
    """Report whether a snapshot's data directory exists but fails validation.

    A snapshot whose ``output_dir`` is absent is not considered corrupt; one
    whose directory exists is corrupt exactly when ``_is_valid_snapshot``
    rejects it.
    """
    output_dir_present = Path(snapshot.output_dir).exists()
    return output_dir_present and not _is_valid_snapshot(snapshot)
46-
47-
def get_indexed_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
    """Map each snapshot in the queryset to its output directory.

    No archive-status or on-disk validity check is performed.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.

    Returns:
        Dict keyed by ``snapshot.output_dir`` with the snapshot as value.
    """
    folders: Dict[str, 'Snapshot'] = {}
    for snap in snapshots.iterator(chunk_size=500):
        folders[snap.output_dir] = snap
    return folders
54-
55-
def get_archived_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
    """Map archived snapshots in the queryset to their output directories.

    A snapshot is kept when its ``is_archived`` attribute is truthy; the
    data directory itself is not inspected here.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.

    Returns:
        Dict keyed by ``snapshot.output_dir`` with the snapshot as value.
    """
    archived: Dict[str, 'Snapshot'] = {}
    for snap in snapshots.iterator(chunk_size=500):
        if snap.is_archived:
            archived[snap.output_dir] = snap
    return archived
63-
64-
def get_unarchived_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
    """Map not-yet-archived snapshots in the queryset to their output directories.

    A snapshot is kept when its ``is_archived`` attribute is falsy; the data
    directory itself is not inspected here.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.

    Returns:
        Dict keyed by ``snapshot.output_dir`` with the snapshot as value.
    """
    unarchived: Dict[str, 'Snapshot'] = {}
    for snap in snapshots.iterator(chunk_size=500):
        if snap.is_archived:
            continue
        unarchived[snap.output_dir] = snap
    return unarchived
72-
73-
def get_present_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
    """List the directories that actually exist under the archive folder.

    Args:
        snapshots: Accepted but not consulted; lookups go through
            ``Snapshot.objects`` directly.
        out_dir: Data directory containing ``CONSTANTS.ARCHIVE_DIR_NAME``.

    Returns:
        Dict keyed by directory *name* (not full path). The value is the
        Snapshot whose ``timestamp`` equals that name, or None when no such
        row exists. Empty when the archive folder is missing.
    """
    from core.models import Snapshot

    present: Dict[str, Optional['Snapshot']] = {}
    archive_root = out_dir / CONSTANTS.ARCHIVE_DIR_NAME
    if archive_root.exists():
        for child in archive_root.iterdir():
            if not child.is_dir():
                continue
            try:
                present[child.name] = Snapshot.objects.get(timestamp=child.name)
            except Snapshot.DoesNotExist:
                # Directory on disk with no matching DB row.
                present[child.name] = None
    return present
91-
92-
def get_valid_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
    """Map snapshots that pass ``_is_valid_snapshot`` to their output directories.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.

    Returns:
        Dict keyed by ``snapshot.output_dir`` with the snapshot as value.
    """
    valid: Dict[str, 'Snapshot'] = {}
    for snap in snapshots.iterator(chunk_size=500):
        if _is_valid_snapshot(snap):
            valid[snap.output_dir] = snap
    return valid
100-
101-
def get_invalid_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
    """Collect every folder reported as duplicate, orphaned, corrupted or unrecognized.

    The four finders run in that order and their results are merged in the
    same order, so for a path reported by several finders the value from the
    last one wins.

    Args:
        snapshots: Queryset forwarded to each finder.
        out_dir: Forwarded to each finder as ``out_dir``.

    Returns:
        Merged dict of the four finders' results.
    """
    finders = (
        get_duplicate_folders,
        get_orphaned_folders,
        get_corrupted_folders,
        get_unrecognized_folders,
    )
    invalid: Dict[str, Optional['Snapshot']] = {}
    for finder in finders:
        invalid.update(finder(snapshots, out_dir=out_dir))
    return invalid
109-
110-
def get_duplicate_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
    """Find folders whose snapshot repeats an already-seen timestamp or URL.

    Two sources are walked in order: first every snapshot in ``snapshots``,
    then every directory under ``CONSTANTS.ARCHIVE_DIR`` whose name is not a
    timestamp in ``snapshots`` but does match some row in the Snapshot table.
    The second and later occurrences of a timestamp or URL are reported.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.
            NOTE(review): the scan uses ``CONSTANTS.ARCHIVE_DIR`` rather than
            ``out_dir`` -- confirm whether that is intentional.

    Returns:
        Dict keyed by folder path with the duplicating snapshot as value.
        Empty when the archive directory does not exist.
    """
    from core.models import Snapshot as SnapshotModel

    seen_urls: Dict[str, int] = {}
    seen_timestamps: Dict[str, int] = {}
    duplicates: Dict[str, Optional['Snapshot']] = {}

    archive_root = CONSTANTS.ARCHIVE_DIR
    if not archive_root.exists():
        return duplicates

    # Obtain the directory iterator before walking the queryset, as the
    # original generator expression did.
    dir_entries = archive_root.iterdir()

    def _record(folder, snap):
        """Count snap's timestamp and URL; flag folder on any repeat."""
        if not snap:
            return
        timestamp_hits = seen_timestamps.get(snap.timestamp, 0) + 1
        seen_timestamps[snap.timestamp] = timestamp_hits
        if timestamp_hits > 1:
            duplicates[folder] = snap

        url_hits = seen_urls.get(snap.url, 0) + 1
        seen_urls[snap.url] = url_hits
        if url_hits > 1:
            duplicates[folder] = snap

    # Pass 1: snapshots from the supplied queryset.
    for snap in snapshots.iterator(chunk_size=500):
        _record(snap.output_dir, snap)

    # Pass 2: on-disk directories not covered by the queryset.
    for entry in dir_entries:
        if not entry.is_dir() or snapshots.filter(timestamp=entry.name).exists():
            continue
        try:
            match = SnapshotModel.objects.get(timestamp=entry.name)
        except SnapshotModel.DoesNotExist:
            match = None
        _record(str(entry), match)

    return duplicates
151-
152-
def get_orphaned_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
    """Find archive directories that have an index.json but no matching snapshot.

    Only the existence of ``index.json`` is checked; its contents are not
    parsed here.

    Args:
        snapshots: Queryset checked for a row whose ``timestamp`` equals the
            directory name.
        out_dir: Accepted but not used by this function.
            NOTE(review): the scan uses ``CONSTANTS.ARCHIVE_DIR`` rather than
            ``out_dir`` -- confirm whether that is intentional.

    Returns:
        Dict keyed by directory path, every value None. Empty when the
        archive directory does not exist.
    """
    orphans: Dict[str, Optional['Snapshot']] = {}

    archive_root = CONSTANTS.ARCHIVE_DIR
    if not archive_root.exists():
        return orphans

    for child in archive_root.iterdir():
        if not child.is_dir():
            continue
        if not (child / "index.json").exists():
            continue
        if snapshots.filter(timestamp=child.name).exists():
            continue
        orphans[str(child)] = None
    return orphans
167-
168-
def get_corrupted_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
    """Map snapshots flagged by ``_is_corrupt_snapshot`` to their output directories.

    Args:
        snapshots: Queryset to read; iterated in chunks of 500.
        out_dir: Accepted but not used by this function.

    Returns:
        Dict keyed by ``snapshot.output_dir`` with the snapshot as value.
    """
    return {
        snap.output_dir: snap
        for snap in snapshots.iterator(chunk_size=500)
        if _is_corrupt_snapshot(snap)
    }
176-
177-
def get_unrecognized_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, None]:
    """Find archive directories with no usable index that the queryset does not know.

    A directory is reported when either:
      * it contains an ``index.json`` that cannot be read or parsed, or
      * it has no ``index.json`` and no snapshot in ``snapshots`` has a
        ``timestamp`` equal to the directory name.

    Args:
        snapshots: Queryset consulted for matching timestamps.
        out_dir: Data directory containing ``CONSTANTS.ARCHIVE_DIR_NAME``.

    Returns:
        Dict keyed by directory path, every value None. Empty when the
        archive folder does not exist.
    """
    unrecognized_folders: Dict[str, None] = {}

    archive_dir = Path(out_dir) / CONSTANTS.ARCHIVE_DIR_NAME
    if not archive_dir.exists():
        return unrecognized_folders

    for entry in archive_dir.iterdir():
        if not entry.is_dir():
            continue

        index_path = entry / "index.json"
        if index_path.exists():
            try:
                # Decode explicitly as UTF-8 so a valid index with non-ASCII
                # text is not rejected under a different locale encoding.
                with open(index_path, 'r', encoding='utf-8') as f:
                    json.load(f)
            except Exception:
                # Best-effort scan: any read/parse failure marks the folder.
                unrecognized_folders[str(entry)] = None
        elif not snapshots.filter(timestamp=entry.name).exists():
            unrecognized_folders[str(entry)] = None
    return unrecognized_folders
201-
20219
20320@enforce_types
20421def fix_invalid_folder_locations (out_dir : Path = DATA_DIR ) -> Tuple [List [str ], List [str ]]:
@@ -210,11 +27,7 @@ def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], L
21027 """
21128 fixed = []
21229 cant_fix = []
213- archive_dir = out_dir / CONSTANTS .ARCHIVE_DIR_NAME
214- if not archive_dir .exists ():
215- return fixed , cant_fix
216-
217- for entry in os .scandir (archive_dir ):
30+ for entry in os .scandir (out_dir / CONSTANTS .ARCHIVE_DIR_NAME ):
21831 if entry .is_dir (follow_symlinks = True ):
21932 index_path = Path (entry .path ) / 'index.json'
22033 if index_path .exists ():
@@ -230,7 +43,7 @@ def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], L
23043 continue
23144
23245 if not entry .path .endswith (f'/{ timestamp } ' ):
233- dest = archive_dir / timestamp
46+ dest = out_dir / CONSTANTS . ARCHIVE_DIR_NAME / timestamp
23447 if dest .exists ():
23548 cant_fix .append (entry .path )
23649 else :
0 commit comments