
Commit 912448f

delta validation sqs
1 parent 50977be commit 912448f

File tree

2 files changed: +439 -0 lines changed


compare_delta_folders.py

Lines changed: 317 additions & 0 deletions
@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
Compare opt-out records between regular delta and SQS delta folders for a given date.

This script downloads all delta files from both folders and verifies that all opt-out
records in the regular delta folder are present in the SQS delta folder.

Delta file format: Each entry is 72 bytes (32-byte hash + 32-byte ID + 8-byte timestamp)

Usage:
    python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07
    python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\
        --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta
"""
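# A minimal illustrative sketch (not executed by this script) of the 72-byte
# entry layout described above; hashlib and the sample values are assumptions
# for illustration only:
#
#     import hashlib, struct
#     entry = hashlib.sha256(b"identity").digest()      # 32-byte identity hash
#     entry += hashlib.sha256(b"advertising").digest()  # 32-byte advertising ID
#     entry += struct.pack('<Q', 1730956800)            # 8-byte little-endian timestamp
#     assert len(entry) == 72                           # one complete delta entry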
import argparse
import struct
import sys
import traceback
from datetime import datetime
from typing import List, Set, Tuple

try:
    import boto3
    from botocore.exceptions import ClientError
except ImportError:
    print("Error: boto3 not installed. Run: pip install boto3")
    sys.exit(1)


class OptOutRecord:
    """Represents a single opt-out record (hash + id + timestamp)"""

    ENTRY_SIZE = 72  # 32 (identity_hash) + 32 (advertising_id) + 8 (timestamp)

    def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int):
        self.identity_hash = identity_hash
        self.advertising_id = advertising_id
        self.timestamp = timestamp

    def is_sentinel(self) -> bool:
        """Check if this is a sentinel entry (start or end)"""
        return (self.identity_hash == b'\x00' * 32 or
                self.identity_hash == b'\xff' * 32)

    def __hash__(self):
        """Return hash for set/dict operations"""
        return hash((self.identity_hash, self.advertising_id, self.timestamp))

    def __eq__(self, other):
        """Compare two OptOutRecord instances for equality"""
        if not isinstance(other, OptOutRecord):
            return False
        return (self.identity_hash == other.identity_hash and
                self.advertising_id == other.advertising_id and
                self.timestamp == other.timestamp)

    def __repr__(self):
        """Return string representation of the opt-out record"""
        hash_hex = self.identity_hash.hex()[:16]
        id_hex = self.advertising_id.hex()[:16]
        try:
            dt = datetime.fromtimestamp(self.timestamp)
            dt_str = dt.strftime('%Y-%m-%d %H:%M:%S')
        except (ValueError, OSError, OverflowError):
            dt_str = "INVALID_TS"
        return f"OptOutRecord(hash={hash_hex}..., id={id_hex}..., ts={self.timestamp} [{dt_str}])"
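# Note: __hash__ and __eq__ cover all three fields, so the set operations in
# analyze_differences() compare records by value. A record whose timestamp
# differs between the two folders is reported as missing/extra, not as common.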

def parse_records_from_file(data: bytes) -> List[OptOutRecord]:
    """Parse opt-out records from a delta file, skipping sentinels"""
    records = []
    offset = 0
    entry_size = OptOutRecord.ENTRY_SIZE  # 72 bytes: 32 + 32 + 8

    while offset + entry_size <= len(data):
        identity_hash = data[offset:offset + 32]  # 32 bytes
        advertising_id = data[offset + 32:offset + 64]  # 32 bytes
        timestamp = struct.unpack('<Q', data[offset + 64:offset + 72])[0]  # 8 bytes

        record = OptOutRecord(identity_hash, advertising_id, timestamp)

        # Only add data records, skip sentinels
        if not record.is_sentinel():
            records.append(record)

        offset += entry_size

    return records
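# Note: the loop above only consumes complete 72-byte entries; any trailing
# partial entry (e.g. from a truncated file) is silently ignored rather than
# raising an error.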

def download_from_s3(bucket: str, key: str) -> bytes:
    """Download file from S3"""
    try:
        s3 = boto3.client('s3')
        response = s3.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except ClientError as error:
        print(f"Error downloading s3://{bucket}/{key}: {error}")
        raise


def list_files_in_folder(bucket: str, prefix: str) -> List[str]:
    """List all .dat files in an S3 folder"""
    try:
        s3 = boto3.client('s3')
        files = []
        paginator = s3.get_paginator('list_objects_v2')

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            if 'Contents' not in page:
                continue
            for obj in page['Contents']:
                if obj['Key'].endswith('.dat'):
                    files.append(obj['Key'])

        return sorted(files)
    except ClientError as error:
        print(f"Error listing files in s3://{bucket}/{prefix}: {error}")
        raise


def load_records_from_folder(
    bucket: str, prefix: str, date_folder: str
) -> Tuple[Set[OptOutRecord], dict]:
    """Load all opt-out records from all files in a folder"""
    full_prefix = f"{prefix}{date_folder}/"

    print(f"\n📂 Loading files from s3://{bucket}/{full_prefix}")
    files = list_files_in_folder(bucket, full_prefix)

    if not files:
        print(" ⚠️ No .dat files found")
        return set(), {}

    print(f" Found {len(files)} delta files")

    all_records = set()
    file_stats = {}

    for i, file_key in enumerate(files, 1):
        filename = file_key.split('/')[-1]
        print(f" [{i}/{len(files)}] Downloading {filename}...", end='', flush=True)

        try:
            data = download_from_s3(bucket, file_key)
            records = parse_records_from_file(data)

            all_records.update(records)
            total_entries_in_file = len(data) // OptOutRecord.ENTRY_SIZE
            file_stats[filename] = {
                'size': len(data),
                'entries': len(records),
                'total_entries': total_entries_in_file,  # Includes sentinels
                'file_key': file_key
            }

            print(f" {len(records)} records")
        except (ClientError, struct.error, ValueError) as error:
            print(f" ERROR: {error}")
            continue

    return all_records, file_stats
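# Note: records accumulate into a set, so identical (hash, id, timestamp)
# tuples appearing in multiple files collapse into one record; the per-file
# 'entries' counts in file_stats may therefore sum to more than the size of
# the returned record set.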

def analyze_differences(regular_records: Set[OptOutRecord],
                        sqs_records: Set[OptOutRecord],
                        show_samples: int = 10) -> bool:
    """Analyze and report differences between record sets"""

    print("\n📊 Analysis Results")
    print(f" Regular delta records: {len(regular_records):,}")
    print(f" SQS delta records: {len(sqs_records):,}")

    # Records in regular but not in SQS (MISSING from SQS)
    missing_in_sqs = regular_records - sqs_records

    # Records in SQS but not in regular (EXTRA in SQS)
    extra_in_sqs = sqs_records - regular_records

    # Common records
    common = regular_records & sqs_records

    print(f" Common records: {len(common):,}")
    print(f" Missing from SQS: {len(missing_in_sqs):,}")
    print(f" Extra in SQS: {len(extra_in_sqs):,}")

    all_good = True

    if missing_in_sqs:
        print(f"\n❌ MISSING: {len(missing_in_sqs)} records in regular delta are NOT in SQS delta")
        print(f" Sample of missing records (first {min(show_samples, len(missing_in_sqs))}):")
        for i, record in enumerate(list(missing_in_sqs)[:show_samples], 1):
            print(f" {i}. {record}")
        if len(missing_in_sqs) > show_samples:
            print(f" ... and {len(missing_in_sqs) - show_samples} more")
        all_good = False
    else:
        print("\n✅ All records from regular delta are present in SQS delta")

    if extra_in_sqs:
        print(f"\n⚠️ EXTRA: {len(extra_in_sqs)} records in SQS delta are NOT in regular delta")
        print(" (This might be okay if SQS captured additional opt-outs)")
        print(f" Sample of extra records (first {min(show_samples, len(extra_in_sqs))}):")
        for i, record in enumerate(list(extra_in_sqs)[:show_samples], 1):
            print(f" {i}. {record}")
        if len(extra_in_sqs) > show_samples:
            print(f" ... and {len(extra_in_sqs) - show_samples} more")

    return all_good


def print_file_stats(regular_stats: dict, sqs_stats: dict) -> None:
    """Print file statistics for both folders"""
    print("\n📈 File Statistics")

    print(f"\n Regular Delta Files: {len(regular_stats)}")
    if regular_stats:
        total_size = sum(s['size'] for s in regular_stats.values())
        total_entries = sum(s['entries'] for s in regular_stats.values())
        print(f" Total size: {total_size:,} bytes")
        print(f" Total entries: {total_entries:,}")
        print(f" Avg entries/file: {total_entries / len(regular_stats):.1f}")

    print(f"\n SQS Delta Files: {len(sqs_stats)}")
    if sqs_stats:
        total_size = sum(s['size'] for s in sqs_stats.values())
        total_entries = sum(s['entries'] for s in sqs_stats.values())
        print(f" Total size: {total_size:,} bytes")
        print(f" Total entries: {total_entries:,}")
        print(f" Avg entries/file: {total_entries / len(sqs_stats):.1f}")


def main() -> None:
    """Main entry point for comparing opt-out delta folders."""
    parser = argparse.ArgumentParser(
        description='Compare opt-out records between regular and SQS delta folders',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Compare folders for a specific date
  python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07

  # Use custom prefixes
  python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\
    --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta
"""
    )

    parser.add_argument('--bucket', required=True,
                        help='S3 bucket name')
    parser.add_argument('--date', required=True,
                        help='Date folder to compare (e.g., 2025-11-07)')
    parser.add_argument('--regular-prefix', default='optout/delta/',
                        help='S3 prefix for regular delta files (default: optout/delta/)')
    parser.add_argument('--sqs-prefix', default='sqs-delta/delta/',
                        help='S3 prefix for SQS delta files (default: sqs-delta/delta/)')
    parser.add_argument('--show-samples', type=int, default=10,
                        help='Number of sample records to show for differences (default: 10)')

    args = parser.parse_args()

    print("=" * 80)
    print(f"🔍 Comparing Opt-Out Delta Files for {args.date}")
    print("=" * 80)
    print(f"Bucket: {args.bucket}")
    print(f"Regular prefix: {args.regular_prefix}")
    print(f"SQS prefix: {args.sqs_prefix}")

    try:
        # Load all records from both folders
        regular_records, regular_stats = load_records_from_folder(
            args.bucket, args.regular_prefix, args.date
        )

        sqs_records, sqs_stats = load_records_from_folder(
            args.bucket, args.sqs_prefix, args.date
        )

        if not regular_records and not sqs_records:
            print("\n❌ No records found in either folder")
            sys.exit(1)

        if not regular_records:
            print("\n⚠️ No records in regular delta folder")

        if not sqs_records:
            print("\n⚠️ No records in SQS delta folder")

        # Print file statistics
        print_file_stats(regular_stats, sqs_stats)

        # Analyze differences
        all_good = analyze_differences(regular_records, sqs_records, args.show_samples)

        print("\n" + "=" * 80)
        if all_good:
            print("✅ SUCCESS: All regular delta records are present in SQS delta")
            print("=" * 80)
            sys.exit(0)
        else:
            print("❌ FAILURE: Some regular delta records are missing from SQS delta")
            print("=" * 80)
            sys.exit(1)

    except (ClientError, ValueError, OSError) as error:
        print(f"\n❌ Error: {error}")
        traceback.print_exc()
        sys.exit(1)
    except Exception as error:  # pylint: disable=broad-except
        print(f"\n❌ Unexpected error: {error}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()
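# Exit codes as implemented above: 0 when every regular-delta record is present
# in the SQS delta, 1 on missing records, empty folders, or errors. A hypothetical
# CI wiring might be:
#
#     python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \
#         && echo "SQS delta verified"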
