
Commit 43f324f

Merge pull request #1304 from NASA-IMPACT/1232-process-the-full-text-dump
1232 process the full text dump
2 parents e518189 + 14b1fea commit 43f324f

File tree

4 files changed: +453 -0 lines changed


CHANGELOG.md

Lines changed: 6 additions & 0 deletions

@@ -12,7 +12,13 @@ For each PR made, an entry should be added to this changelog. It should contain

- etc.

## Changelog

### 3.1.??

- 1232-process-the-full-text-dump
  - Description: A script was added at `/scripts/sde_dump_processing/clean_text_dump.py` which cleans text dumps from Sinequa. The Sinequa dump does not respect normal CSV newline formatting, so a dump of 1.8 million records becomes a CSV of 900 million lines. The script detects the header, assembles records from the three possible sources (SDE, TDAMM, and scrapers), and produces a final, clean CSV. It has a simple CLI for setting the input and output paths, log verbosity, etc. (see the usage sketch after this changelog excerpt). Because the input files can be very large, the script streams them instead of holding them in memory.
  - Changes:
    - add file `/scripts/sde_dump_processing/clean_text_dump.py`

### 3.1.0

- 1209-bug-fix-document-type-creator-form
  - Description: The dropdown on the pattern creation form needs to default to multi, since the doc type creator form is used for the majority of multi-URL pattern creations. This should be applied to doc types, division types, and titles as well.
  - Changes:
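
As noted in the 3.1.?? entry above, the new script exposes a simple CLI. The following usage sketch comes from the script's own docstring; the input dump name and output path are examples, not required values:

    # Clean a raw Sinequa dump (invocation taken from the script docstring).
    python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v

    # Optionally compress the cleaned CSV afterwards, also per the docstring.
    7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv
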
scripts/sde_dump_processing/clean_text_dump.py

Lines changed: 357 additions & 0 deletions

@@ -0,0 +1,357 @@
"""
to run
python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v

to compress
7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv
"""

import argparse
import csv
import logging
import os
import sys
import time
from datetime import timedelta

# Set up logging
# Define a custom formatter to include milliseconds
log_formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
# Get the root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)  # Default level
# Create a handler and set the formatter
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_formatter)
# Add the handler to the logger
# Check if handlers already exist to avoid duplicates in interactive environments
if not logger.hasHandlers():
    logger.addHandler(stream_handler)


def assemble_records_generator(input_filename, skip_header=False, progress_interval=100000):
    """
    Reads the input file line by line and yields assembled multi-line records.
    Optionally skips the first line if it's a header.
    """
    current_record = ""
    record_started = False
    lines_processed = 0
    start_time = time.time()
    last_report_time = start_time
    record_count = 0  # Keep track of records yielded

    logger.info(f"Starting to process lines from {input_filename}")

    try:
        with open(input_filename, encoding="utf-8") as infile:
            # Skip the header line if requested
            if skip_header:
                try:
                    first_line = next(infile)
                    lines_processed += 1
                    logger.info(f"Skipped header line: {first_line.rstrip()}")
                except StopIteration:
                    logger.warning("Input file appears to be empty or only contains a header.")
                    return  # Stop if file is empty after header

            # Process remaining lines
            while True:
                try:
                    line = next(infile)
                    lines_processed += 1

                    # Report progress periodically based on lines read
                    if lines_processed % progress_interval == 0:
                        current_time = time.time()
                        elapsed = current_time - start_time
                        rate = lines_processed / elapsed if elapsed > 0 else 0

                        # Only log if at least 5 seconds have passed since the last report
                        if current_time - last_report_time >= 5:
                            logger.info(f"Read {lines_processed:,} lines - " f"Rate: {rate:.0f} lines/sec")
                            last_report_time = current_time

                    line = line.rstrip("\n")

                    # Check if this line is the start of a new record
                    is_record_start = (
                        line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/") or line.startswith("/scrapers/")
                    )

                    # Skip lines until the first record is found
                    # (This check might be less relevant if the first data line always starts a record)
                    if not record_started and not is_record_start:
                        # If we skipped the header, the first line *should* be a record start
                        # Log a warning if it's not, but continue processing anyway
                        if skip_header and lines_processed == 2:  # Only warn on the first actual data line
                            logger.warning(
                                f"First data line does not start with a recognized record prefix: {line[:100]}..."
                            )
                        # continue -> We should actually process this line if it's part of the first record

                    if is_record_start:
                        # If we've already been building a record, yield it
                        if current_record:
                            yield current_record
                            record_count += 1

                        # Start a new record
                        current_record = line
                        record_started = True
                    elif record_started:  # Only append if a record has started
                        # Append to the current record
                        current_record += "\n" + line
                    # Handle case where a non-starting line is encountered before the first record start
                    # This might happen if skip_header=False and there's preamble before the first /SDE/
                    elif not record_started and not is_record_start:
                        continue  # Explicitly skip lines before the first record start marker

                except StopIteration:  # End of file
                    break
                except UnicodeDecodeError as e:
                    logger.warning(f"Skipping line {lines_processed} due to UnicodeDecodeError: {e}")
                    continue  # Skip the problematic line

        # Yield the last record if there is one
        if current_record:
            yield current_record
            record_count += 1

        elapsed = time.time() - start_time
        logger.info(
            f"Finished reading {lines_processed:,} lines and found {record_count:,} raw records in {elapsed:.2f} seconds"  # noqa
        )

    except FileNotFoundError:
        logger.error(f"Input file not found: {input_filename}")
        raise


def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000):
    """
    Processes records from an iterator, yielding valid processed records.
    Requires expected_fields to be specified.
    """
    if expected_fields is None:
        raise ValueError("process_records_generator requires 'expected_fields' to be specified.")

    processed_count = 0
    invalid_records = 0
    start_time = time.time()
    last_report_time = start_time
    input_record_index = 0  # Track input records processed for logging

    logger.info(f"Starting record processing (expecting {expected_fields} fields)...")

    for i, record in enumerate(raw_records_iterator):
        input_record_index = i + 1
        # Split into AT MOST expected_fields parts. The last part will contain the rest.
        parts = record.split(delimiter, expected_fields - 1)

        if len(parts) == expected_fields:
            # Replace literal newlines with '\\n' string in the text field
            # The text field is the last one, index expected_fields - 1
            text_field_index = expected_fields - 1
            if text_field_index < len(parts):  # Ensure index exists
                parts[text_field_index] = parts[text_field_index].replace("\n", "\\n")
            else:
                # This case should technically not happen if len(parts) == expected_fields
                # but adding a safeguard.
                logger.warning(
                    f"Record ~{input_record_index}: Field index {text_field_index} out of bounds unexpectedly."
                )

            processed_count += 1
            yield parts  # Yield the valid processed record
        else:
            invalid_records += 1
            if invalid_records <= 10:  # Log only the first 10 invalid records
                logger.warning(
                    f"Record ~{input_record_index}: Expected {expected_fields} fields, got {len(parts)}. Skipping."
                )
            # Optionally log the problematic record content (careful with large fields)
            # logger.debug(f"Problematic record content (first 100 chars): {record[:100]}")
            # Do not yield invalid records

        # Report progress periodically based on input records processed
        if input_record_index % batch_size == 0:
            current_time = time.time()
            if current_time - last_report_time >= 5:
                elapsed = current_time - start_time
                rate = input_record_index / elapsed if elapsed > 0 else 0
                logger.info(
                    f"Processed {input_record_index:,} raw records ({processed_count:,} valid) - "
                    f"Rate: {rate:.0f} records/sec"
                )
                last_report_time = current_time

    if invalid_records > 10:
        logger.warning(f"Additional {invalid_records - 10} invalid records were skipped (not shown in logs)")

    logger.info(
        f"Finished processing {input_record_index:,} raw records. Found {processed_count:,} valid records and {invalid_records:,} invalid records."  # noqa
    )


def write_output_file(output_filename, processed_records_iterator, header=None, batch_size=100000):
    """Write the processed records from an iterator to the output CSV file."""
    # Create the directory if it doesn't exist
    output_dir = os.path.dirname(output_filename)
    # Check if output_dir is not empty before creating
    if output_dir and not os.path.exists(output_dir):
        logger.info(f"Creating output directory: {output_dir}")
        os.makedirs(output_dir, exist_ok=True)

    records_written = 0
    start_time = time.time()
    last_report_time = start_time

    logger.info(f"Starting to write records to {output_filename}")

    try:
        with open(output_filename, "w", encoding="utf-8", newline="") as outfile:
            writer = csv.writer(outfile)

            # Write header if provided
            if header:
                logger.info(f"Writing header: {header}")
                writer.writerow(header)
            else:
                logger.warning("No header provided for output file.")

            # Write records as they come from the iterator
            for i, record in enumerate(processed_records_iterator):
                writer.writerow(record)
                records_written += 1

                # Report progress periodically
                if records_written % batch_size == 0:
                    current_time = time.time()
                    if current_time - last_report_time >= 5:
                        elapsed = current_time - start_time
                        rate = records_written / elapsed if elapsed > 0 else 0
                        logger.info(f"Wrote {records_written:,} records - " f"Rate: {rate:.0f} records/sec")
                        last_report_time = current_time

        total_time = time.time() - start_time
        logger.info(f"Finished writing {records_written:,} records in {total_time:.2f} seconds")

    except OSError as e:
        logger.error(f"Error writing to output file {output_filename}: {e}")
        raise

    return records_written


def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=False):
    """
    Process a large CSV file iteratively using generators.
    Determines header and field count from the first line of the input file.
    """
    # Adjust logging level based on verbosity - should be set in main()
    if verbose:
        logger.setLevel(logging.INFO)

    logger.info(f"Processing {input_filename} to {output_filename} with delimiter '{delimiter}'")
    total_start_time = time.time()

    header = None
    expected_fields = 0

    # --- Read header and determine expected fields ---
    try:
        with open(input_filename, encoding="utf-8") as infile:
            first_line = next(infile).rstrip("\n")
            header = first_line.split(delimiter)
            expected_fields = len(header)
            logger.info(f"Detected header: {header}")
            logger.info(f"Expecting {expected_fields} fields based on header.")
            if expected_fields == 0:
                logger.error("Header line is empty or could not be split. Cannot proceed.")
                return -1
    except FileNotFoundError:
        logger.error(f"Input file not found: {input_filename}")
        return -1  # Indicate file not found error
    except StopIteration:
        logger.error(f"Input file is empty: {input_filename}")
        return -1  # Indicate empty file error
    except Exception as e:
        logger.error(f"Error reading header from {input_filename}: {e}", exc_info=True)
        return -1  # Indicate general error during header read
    # --- End header reading ---

    try:
        # Create iterators/generators
        # Pass skip_header=True to avoid processing the header line again
        raw_records_iter = assemble_records_generator(input_filename, skip_header=True)
        # Pass the dynamically determined expected_fields
        processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields)

        # Write to output file by consuming the processed records iterator, using the detected header
        records_written = write_output_file(output_filename, processed_records_iter, header)

        total_elapsed = time.time() - total_start_time
        rate = records_written / total_elapsed if total_elapsed > 0 else 0
        logger.info(
            f"Complete! Processed {records_written:,} valid records in {timedelta(seconds=int(total_elapsed))} "
            f"(Average rate: {rate:.1f} records/sec)"
        )
        return records_written

    except FileNotFoundError:
        # Error already logged by assemble_records_generator if it happens later
        # This is unlikely if header reading succeeded, but kept for safety
        logger.error(f"Input file disappeared after reading header: {input_filename}")
        return -1
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {e}", exc_info=True)
        return -1  # Indicate general error


def main():
    """Main entry point for the script."""
    # Set up command line arguments
    parser = argparse.ArgumentParser(
        description="Process large CSV files with special formatting efficiently, detecting header automatically."
    )
    parser.add_argument("input", help="Input file path")
    parser.add_argument("output", help="Output file path")
    parser.add_argument("--delimiter", default="༜", help="Field delimiter character (default: '༜')")
    # Removed --fields argument
    parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed progress information (INFO level)")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress all output except errors (ERROR level)")

    args = parser.parse_args()

    # Adjust logging level based on args BEFORE calling processing function
    if args.quiet:
        logger.setLevel(logging.ERROR)
    elif args.verbose:
        logger.setLevel(logging.INFO)
    else:
        # Default level if neither -v nor -q is specified
        logger.setLevel(logging.WARNING)

    # Process the CSV file using the arguments passed
    # No longer passing expected_fields
    records_processed = process_large_csv(
        args.input, args.output, delimiter=args.delimiter, verbose=args.verbose  # Pass verbose flag
    )

    if records_processed >= 0 and not args.quiet:
        # Use standard print for final user confirmation message
        print(f"Successfully processed {records_processed:,} records.")
        return 0
    elif records_processed < 0:
        # Error messages already logged
        print("Processing failed. Check logs for details.", file=sys.stderr)
        return 1
    else:  # records_processed == 0, potentially valid but maybe unexpected
        if not args.quiet:
            print("Processing finished, but 0 valid records were written.")
        return 0


if __name__ == "__main__":
    sys.exit(main())
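
To make the diff easier to skim, here is a short, self-contained sketch of the core transformation. It is not part of the commit; the record content and field names below are invented, but the split-with-maxsplit and newline-escaping steps mirror what process_records_generator does to each assembled record:

    # Illustration only (not part of the committed script).
    DELIMITER = "༜"
    expected_fields = 3  # e.g. a dump whose header has three fields: url, title, text (hypothetical)

    # One assembled record: an /SDE/ path, a title, and a text field that spans two lines.
    raw_record = f"/SDE/example/doc1{DELIMITER}Example title{DELIMITER}first line of text\nsecond line of text"

    # Split into at most expected_fields parts; the last part keeps everything that follows.
    parts = raw_record.split(DELIMITER, expected_fields - 1)

    if len(parts) == expected_fields:
        # Escape literal newlines in the last (text) field, exactly as the script does.
        parts[-1] = parts[-1].replace("\n", "\\n")

    print(parts)
    # ['/SDE/example/doc1', 'Example title', 'first line of text\\nsecond line of text']
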

0 commit comments
