@@ -1,12 +1,13 @@
 #!/usr/bin/env python3
+"""Compare opt-out records between regular and SQS delta folders in S3."""
 
 import argparse
 import struct
 import sys
 import traceback
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple
 
 try:
     import boto3
@@ -28,6 +29,8 @@
 
 
 class OptOutRecord:
+    """Represents a single opt-out record from a delta file."""
+
     ENTRY_SIZE = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE + TIMESTAMP_AND_METADATA_SIZE
 
     def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int):
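ENTRY_SIZE pins each record to a fixed-width binary layout: identity hash, then advertising id, then a timestamp/metadata word. The three size constants are defined in a part of the file this diff does not show, so the sketch below assumes illustrative values of 32, 32, and 8 bytes and a little-endian timestamp; it is not the file's actual parsing code.

import struct

# Assumed sizes for illustration only; the real constants live earlier in the file.
IDENTITY_HASH_SIZE = 32
ADVERTISING_ID_SIZE = 32
TIMESTAMP_AND_METADATA_SIZE = 8
ENTRY_SIZE = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE + TIMESTAMP_AND_METADATA_SIZE  # 72

def unpack_entry(entry: bytes) -> tuple:
    """Split one fixed-width entry into its three fields."""
    identity_hash = entry[:IDENTITY_HASH_SIZE]
    advertising_id = entry[IDENTITY_HASH_SIZE:IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE]
    # Byte order is an assumption; the real decoding is not visible in this diff.
    (timestamp,) = struct.unpack('<Q', entry[-TIMESTAMP_AND_METADATA_SIZE:])
    return identity_hash, advertising_id, timestamp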
@@ -36,19 +39,20 @@ def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int):
         self.timestamp = timestamp
 
     def is_sentinel(self) -> bool:
+        """Return True if this record is a sentinel (all zeros or all ones)."""
         return (self.identity_hash == b'\x00' * IDENTITY_HASH_SIZE or
                 self.identity_hash == b'\xff' * IDENTITY_HASH_SIZE)
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash((self.identity_hash, self.advertising_id))
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OptOutRecord):
-            return False
+            return NotImplemented
         return (self.identity_hash == other.identity_hash and
                 self.advertising_id == other.advertising_id)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         hash_hex = self.identity_hash.hex()[:16]
         id_hex = self.advertising_id.hex()[:16]
         try:
@@ -60,6 +64,7 @@ def __repr__(self):
 
 
 def parse_records_from_file(data: bytes) -> List[OptOutRecord]:
+    """Parse binary data into a list of OptOutRecords, filtering invalid entries."""
     records = []
     offset = 0
     entry_size = OptOutRecord.ENTRY_SIZE
@@ -88,6 +93,7 @@ def parse_records_from_file(data: bytes) -> List[OptOutRecord]:
 
 
 def get_cached_file(bucket: str, key: str) -> Optional[bytes]:
+    """Return cached file contents if available, otherwise None."""
     filename = key.split('/')[-1]
     cache_path = Path(CACHE_DIR) / bucket / filename
     if cache_path.exists():
@@ -96,6 +102,7 @@ def get_cached_file(bucket: str, key: str) -> Optional[bytes]:
 
 
 def save_to_cache(bucket: str, key: str, data: bytes) -> None:
+    """Save file data to local cache directory."""
     filename = key.split('/')[-1]
     cache_path = Path(CACHE_DIR) / bucket / filename
     cache_path.parent.mkdir(parents=True, exist_ok=True)
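The body of download_from_s3 is collapsed in this diff, but its signature appears in the next hunk header, and together with get_cached_file and save_to_cache it forms a cache-aside pattern. A plausible reconstruction, assuming boto3's standard get_object call, with the returned bool flagging a cache hit:

def download_from_s3(bucket: str, key: str) -> Tuple[bytes, bool]:
    """Hypothetical sketch: serve from the local cache, else fetch from S3."""
    cached = get_cached_file(bucket, key)
    if cached is not None:
        return cached, True  # cache hit
    s3 = boto3.client('s3')
    data = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    save_to_cache(bucket, key, data)
    return data, False  # fetched fresh from S3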
@@ -120,6 +127,7 @@ def download_from_s3(bucket: str, key: str) -> Tuple[bytes, bool]:
 
 
 def list_dat_files(bucket: str, prefix: str) -> List[str]:
+    """List all .dat files in the given S3 bucket and prefix."""
     try:
         s3 = boto3.client('s3')
         files = []
@@ -141,6 +149,7 @@ def list_dat_files(bucket: str, prefix: str) -> List[str]:
 def load_records_from_folder(
     bucket: str, prefix: str, date_folder: str, quiet: bool = False
 ) -> Tuple[Set[OptOutRecord], Dict[str, dict]]:
+    """Load all records from a single date folder, returning records and file stats."""
     full_prefix = f"{prefix}{date_folder}/"
     files = list_dat_files(bucket, full_prefix)
 
@@ -157,7 +166,8 @@ def load_records_from_folder(
         filename = file_key.split('/')[-1]
         if not quiet:
             cache_info = f" ({cached_count} cached)" if cached_count > 0 else ""
-            print(f"\r{date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}", end='', flush=True)
+            progress = f"\r{date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}"
+            print(progress, end='', flush=True)
 
         try:
             data, from_cache = download_from_s3(bucket, file_key)
@@ -180,14 +190,16 @@ def load_records_from_folder(
 
     if not quiet:
         cache_info = f" ({cached_count} cached)" if cached_count > 0 else ""
-        print(f"\r{date_folder}: {len(files)} files, {total_records} records{cache_info}" + " " * 20)
+        summary = f"\r{date_folder}: {len(files)} files, {total_records} records{cache_info}"
+        print(summary + " " * 20)
 
     return all_records, file_stats
 
 
 def load_records_from_multiple_folders(
     bucket: str, prefix: str, date_folders: List[str], quiet: bool = False
 ) -> Tuple[Set[OptOutRecord], Dict[str, dict]]:
+    """Load and merge records from multiple date folders."""
     all_records = set()
     all_stats = {}
 
@@ -202,6 +214,11 @@ def load_records_from_multiple_folders(
 def analyze_differences(regular_records: Set[OptOutRecord],
                         sqs_records: Set[OptOutRecord],
                         show_samples: int = 10) -> bool:
+    """
+    Compare record sets and print differences.
+
+    Returns True if all regular records exist in SQS.
+    """
     print("\n\n📊 Analysis Results (unique records)")
     print(f"\nRegular: {len(regular_records):,}")
     print(f"    SQS: {len(sqs_records):,}")
@@ -237,6 +254,7 @@ def analyze_differences(regular_records: Set[OptOutRecord],
 
 
 def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict]) -> None:
+    """Print summary statistics for regular and SQS delta files."""
     print("\n\n📈 File Statistics")
 
     print(f"\nRegular Delta Files: {len(regular_stats)}")
@@ -257,6 +275,7 @@ def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict])
 
 
 def main() -> None:
+    """Entry point: parse arguments and run the comparison."""
     parser = argparse.ArgumentParser(
         description='Compare opt-out records between regular and SQS delta folders'
     )
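Since __hash__ and __eq__ key each record on (identity_hash, advertising_id) and ignore the timestamp, the comparison in analyze_differences reduces to plain set algebra. A toy illustration, assuming the 32-byte field widths from the earlier sketch:

regular = {OptOutRecord(b'\x01' * 32, b'\x0a' * 32, 1700000000),
           OptOutRecord(b'\x02' * 32, b'\x0b' * 32, 1700000100)}
sqs = {OptOutRecord(b'\x01' * 32, b'\x0a' * 32, 1700000005)}  # same key, newer timestamp

missing_from_sqs = regular - sqs  # in regular but absent from SQS
extra_in_sqs = sqs - regular      # in SQS but absent from regular
all_present = regular <= sqs      # the boolean analyze_differences returns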