diff --git a/CHANGELOG.md b/CHANGELOG.md
index 84efc532..1c0d7ca1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,13 @@ For each PR made, an entry should be added to this changelog. It should contain
 - etc.
 ## Changelog
+### 3.1.??
+- 1232-process-the-full-text-dump
+  - Description: Adds the script `scripts/sde_dump_processing/clean_sde_dump.py`, which cleans full-text dumps exported from Sinequa. The Sinequa dump does not respect normal CSV newline formatting, so a dump of 1.8 million records becomes a CSV of roughly 900 million physical lines. The script detects the header row and reassembles records from the three possible source prefixes (`/SDE/`, `/SDE-TDAMM/`, and `/scrapers/`) to produce a final, clean CSV. It has a simple CLI for setting the input and output paths, the field delimiter, and the log verbosity. Because the input files can be very large, the script streams them instead of holding them in memory.
+  - Changes:
+    - add file `scripts/sde_dump_processing/clean_sde_dump.py`
+### 3.1.0
 - 1209-bug-fix-document-type-creator-form
   - Description: The dropdown on the pattern creation form needs to be set as multi as the default option since this is why the doc type creator form is used for the majority of multi-URL pattern creations. This should be applied to doc types, division types, and titles as well.
   - Changes:
diff --git a/scripts/sde_dump_processing/clean_sde_dump.py b/scripts/sde_dump_processing/clean_sde_dump.py
new file mode 100644
index 00000000..11ac191c
--- /dev/null
+++ b/scripts/sde_dump_processing/clean_sde_dump.py
@@ -0,0 +1,357 @@
+"""
+To run:
+python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v
+
+To compress the output:
+7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv
+"""
+
+import argparse
+import csv
+import logging
+import os
+import sys
+import time
+from datetime import timedelta
+
+# Set up logging
+# Define a custom formatter to include milliseconds
+log_formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
+# Get the root logger
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)  # Default level
+# Create a handler and set the formatter
+stream_handler = logging.StreamHandler()
+stream_handler.setFormatter(log_formatter)
+# Add the handler to the logger
+# Check if handlers already exist to avoid duplicates in interactive environments
+if not logger.hasHandlers():
+    logger.addHandler(stream_handler)
+
+
+def assemble_records_generator(input_filename, skip_header=False, progress_interval=100000):
+    """
+    Reads the input file line by line and yields assembled multi-line records.
+    Optionally skips the first line if it's a header.
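+
+    Illustrative sketch (hypothetical lines, not taken from a real dump): given the
+    raw physical lines
+
+        /SDE/example-collection/doc-1༜...
+        continuation of the previous record's text
+        /scrapers/example-source/doc-2༜...
+
+    the first two lines are joined with a newline into a single record, because only
+    lines starting with /SDE/, /SDE-TDAMM/, or /scrapers/ begin a new record.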
+ """ + current_record = "" + record_started = False + lines_processed = 0 + start_time = time.time() + last_report_time = start_time + record_count = 0 # Keep track of records yielded + + logger.info(f"Starting to process lines from {input_filename}") + + try: + with open(input_filename, encoding="utf-8") as infile: + # Skip the header line if requested + if skip_header: + try: + first_line = next(infile) + lines_processed += 1 + logger.info(f"Skipped header line: {first_line.rstrip()}") + except StopIteration: + logger.warning("Input file appears to be empty or only contains a header.") + return # Stop if file is empty after header + + # Process remaining lines + while True: + try: + line = next(infile) + lines_processed += 1 + + # Report progress periodically based on lines read + if lines_processed % progress_interval == 0: + current_time = time.time() + elapsed = current_time - start_time + rate = lines_processed / elapsed if elapsed > 0 else 0 + + # Only log if at least 5 seconds have passed since the last report + if current_time - last_report_time >= 5: + logger.info(f"Read {lines_processed:,} lines - " f"Rate: {rate:.0f} lines/sec") + last_report_time = current_time + + line = line.rstrip("\n") + + # Check if this line is the start of a new record + is_record_start = ( + line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/") or line.startswith("/scrapers/") + ) + + # Skip lines until the first record is found + # (This check might be less relevant if the first data line always starts a record) + if not record_started and not is_record_start: + # If we skipped the header, the first line *should* be a record start + # Log a warning if it's not, but continue processing anyway + if skip_header and lines_processed == 2: # Only warn on the first actual data line + logger.warning( + f"First data line does not start with a recognized record prefix: {line[:100]}..." + ) + # continue -> We should actually process this line if it's part of the first record + + if is_record_start: + # If we've already been building a record, yield it + if current_record: + yield current_record + record_count += 1 + + # Start a new record + current_record = line + record_started = True + elif record_started: # Only append if a record has started + # Append to the current record + current_record += "\n" + line + # Handle case where a non-starting line is encountered before the first record start + # This might happen if skip_header=False and there's preamble before the first /SDE/ + elif not record_started and not is_record_start: + continue # Explicitly skip lines before the first record start marker + + except StopIteration: # End of file + break + except UnicodeDecodeError as e: + logger.warning(f"Skipping line {lines_processed} due to UnicodeDecodeError: {e}") + continue # Skip the problematic line + + # Yield the last record if there is one + if current_record: + yield current_record + record_count += 1 + + elapsed = time.time() - start_time + logger.info( + f"Finished reading {lines_processed:,} lines and found {record_count:,} raw records in {elapsed:.2f} seconds" # noqa + ) + + except FileNotFoundError: + logger.error(f"Input file not found: {input_filename}") + raise + + +def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000): + """ + Processes records from an iterator, yielding valid processed records. + Requires expected_fields to be specified. 
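+
+    Illustrative sketch (hypothetical record, not taken from a real dump): with
+    expected_fields=3 and the default delimiter, the record
+
+        doc-1༜https://example.gov/page༜first line of text
+        second line of text
+
+    is split with record.split(delimiter, 2), and the newline embedded in the last
+    (text) field is replaced with the two-character sequence "\\n", so the yielded
+    parts are ["doc-1", "https://example.gov/page",
+    "first line of text\\nsecond line of text"].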
+    """
+    if expected_fields is None:
+        raise ValueError("process_records_generator requires 'expected_fields' to be specified.")
+
+    processed_count = 0
+    invalid_records = 0
+    start_time = time.time()
+    last_report_time = start_time
+    input_record_index = 0  # Track input records processed for logging
+
+    logger.info(f"Starting record processing (expecting {expected_fields} fields)...")
+
+    for i, record in enumerate(raw_records_iterator):
+        input_record_index = i + 1
+        # Split into AT MOST expected_fields parts. The last part will contain the rest.
+        parts = record.split(delimiter, expected_fields - 1)
+
+        if len(parts) == expected_fields:
+            # Replace literal newlines with '\\n' string in the text field
+            # The text field is the last one, index expected_fields - 1
+            text_field_index = expected_fields - 1
+            if text_field_index < len(parts):  # Ensure index exists
+                parts[text_field_index] = parts[text_field_index].replace("\n", "\\n")
+            else:
+                # This case should technically not happen if len(parts) == expected_fields
+                # but adding a safeguard.
+                logger.warning(
+                    f"Record ~{input_record_index}: Field index {text_field_index} out of bounds unexpectedly."
+                )
+
+            processed_count += 1
+            yield parts  # Yield the valid processed record
+        else:
+            invalid_records += 1
+            if invalid_records <= 10:  # Log only the first 10 invalid records
+                logger.warning(
+                    f"Record ~{input_record_index}: Expected {expected_fields} fields, got {len(parts)}. Skipping."
+                )
+            # Optionally log the problematic record content (careful with large fields)
+            # logger.debug(f"Problematic record content (first 100 chars): {record[:100]}")
+            # Do not yield invalid records
+
+        # Report progress periodically based on input records processed
+        if input_record_index % batch_size == 0:
+            current_time = time.time()
+            if current_time - last_report_time >= 5:
+                elapsed = current_time - start_time
+                rate = input_record_index / elapsed if elapsed > 0 else 0
+                logger.info(
+                    f"Processed {input_record_index:,} raw records ({processed_count:,} valid) - "
+                    f"Rate: {rate:.0f} records/sec"
+                )
+                last_report_time = current_time
+
+    if invalid_records > 10:
+        logger.warning(f"Additional {invalid_records - 10} invalid records were skipped (not shown in logs)")
+
+    logger.info(
+        f"Finished processing {input_record_index:,} raw records. Found {processed_count:,} valid records and {invalid_records:,} invalid records."
# noqa + ) + + +def write_output_file(output_filename, processed_records_iterator, header=None, batch_size=100000): + """Write the processed records from an iterator to the output CSV file.""" + # Create the directory if it doesn't exist + output_dir = os.path.dirname(output_filename) + # Check if output_dir is not empty before creating + if output_dir and not os.path.exists(output_dir): + logger.info(f"Creating output directory: {output_dir}") + os.makedirs(output_dir, exist_ok=True) + + records_written = 0 + start_time = time.time() + last_report_time = start_time + + logger.info(f"Starting to write records to {output_filename}") + + try: + with open(output_filename, "w", encoding="utf-8", newline="") as outfile: + writer = csv.writer(outfile) + + # Write header if provided + if header: + logger.info(f"Writing header: {header}") + writer.writerow(header) + else: + logger.warning("No header provided for output file.") + + # Write records as they come from the iterator + for i, record in enumerate(processed_records_iterator): + writer.writerow(record) + records_written += 1 + + # Report progress periodically + if records_written % batch_size == 0: + current_time = time.time() + if current_time - last_report_time >= 5: + elapsed = current_time - start_time + rate = records_written / elapsed if elapsed > 0 else 0 + logger.info(f"Wrote {records_written:,} records - " f"Rate: {rate:.0f} records/sec") + last_report_time = current_time + + total_time = time.time() - start_time + logger.info(f"Finished writing {records_written:,} records in {total_time:.2f} seconds") + + except OSError as e: + logger.error(f"Error writing to output file {output_filename}: {e}") + raise + + return records_written + + +def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=False): + """ + Process a large CSV file iteratively using generators. + Determines header and field count from the first line of the input file. + """ + # Adjust logging level based on verbosity - should be set in main() + if verbose: + logger.setLevel(logging.INFO) + + logger.info(f"Processing {input_filename} to {output_filename} with delimiter '{delimiter}'") + total_start_time = time.time() + + header = None + expected_fields = 0 + + # --- Read header and determine expected fields --- + try: + with open(input_filename, encoding="utf-8") as infile: + first_line = next(infile).rstrip("\n") + header = first_line.split(delimiter) + expected_fields = len(header) + logger.info(f"Detected header: {header}") + logger.info(f"Expecting {expected_fields} fields based on header.") + if expected_fields == 0: + logger.error("Header line is empty or could not be split. 
Cannot proceed.") + return -1 + except FileNotFoundError: + logger.error(f"Input file not found: {input_filename}") + return -1 # Indicate file not found error + except StopIteration: + logger.error(f"Input file is empty: {input_filename}") + return -1 # Indicate empty file error + except Exception as e: + logger.error(f"Error reading header from {input_filename}: {e}", exc_info=True) + return -1 # Indicate general error during header read + # --- End header reading --- + + try: + # Create iterators/generators + # Pass skip_header=True to avoid processing the header line again + raw_records_iter = assemble_records_generator(input_filename, skip_header=True) + # Pass the dynamically determined expected_fields + processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields) + + # Write to output file by consuming the processed records iterator, using the detected header + records_written = write_output_file(output_filename, processed_records_iter, header) + + total_elapsed = time.time() - total_start_time + rate = records_written / total_elapsed if total_elapsed > 0 else 0 + logger.info( + f"Complete! Processed {records_written:,} valid records in {timedelta(seconds=int(total_elapsed))} " + f"(Average rate: {rate:.1f} records/sec)" + ) + return records_written + + except FileNotFoundError: + # Error already logged by assemble_records_generator if it happens later + # This is unlikely if header reading succeeded, but kept for safety + logger.error(f"Input file disappeared after reading header: {input_filename}") + return -1 + except Exception as e: + logger.error(f"An unexpected error occurred during processing: {e}", exc_info=True) + return -1 # Indicate general error + + +def main(): + """Main entry point for the script.""" + # Set up command line arguments + parser = argparse.ArgumentParser( + description="Process large CSV files with special formatting efficiently, detecting header automatically." + ) + parser.add_argument("input", help="Input file path") + parser.add_argument("output", help="Output file path") + parser.add_argument("--delimiter", default="༜", help="Field delimiter character (default: '༜')") + # Removed --fields argument + parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed progress information (INFO level)") + parser.add_argument("-q", "--quiet", action="store_true", help="Suppress all output except errors (ERROR level)") + + args = parser.parse_args() + + # Adjust logging level based on args BEFORE calling processing function + if args.quiet: + logger.setLevel(logging.ERROR) + elif args.verbose: + logger.setLevel(logging.INFO) + else: + # Default level if neither -v nor -q is specified + logger.setLevel(logging.WARNING) + + # Process the CSV file using the arguments passed + # No longer passing expected_fields + records_processed = process_large_csv( + args.input, args.output, delimiter=args.delimiter, verbose=args.verbose # Pass verbose flag + ) + + if records_processed >= 0 and not args.quiet: + # Use standard print for final user confirmation message + print(f"Successfully processed {records_processed:,} records.") + return 0 + elif records_processed < 0: + # Error messages already logged + print("Processing failed. 
Check logs for details.", file=sys.stderr) + return 1 + else: # records_processed == 0, potentially valid but maybe unexpected + if not args.quiet: + print("Processing finished, but 0 valid records were written.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/sde_dump_processing/validate_csv_structure.py b/scripts/sde_dump_processing/validate_csv_structure.py new file mode 100644 index 00000000..97180742 --- /dev/null +++ b/scripts/sde_dump_processing/validate_csv_structure.py @@ -0,0 +1,53 @@ +import csv +import sys + + +def analyze_output_csv(filename): + # Increase CSV field size limit + csv.field_size_limit(sys.maxsize) + + total_rows = 0 + correct_rows = 0 + incorrect_rows = 0 + + with open(filename, encoding="utf-8") as f: + reader = csv.reader(f) + + # Attempt to read a header row + header = next(reader, None) + if header: + print("Header row:", header) + + for row_index, row in enumerate(reader, start=2): + total_rows += 1 + + # Check for exactly 8 columns + if len(row) != 8: + incorrect_rows += 1 + print(f"[WARNING] Row {row_index} has {len(row)} columns (expected 8). Row data") + continue + + # Optional: ensure last column is strictly 'true' or 'false' + if row[7] not in ("true", "false"): + incorrect_rows += 1 + print(f"[WARNING] Row {row_index} last column not 'true' or 'false': {row[0]}, {row[7][:100]}") + continue + + correct_rows += 1 + + print("\n=== ANALYSIS COMPLETE ===") + print(f"Total data rows (excluding header): {total_rows}") + print(f"Correctly formatted rows: {correct_rows}") + print(f"Incorrectly formatted rows: {incorrect_rows}") + + +def main(): + import sys + + filename = sys.argv[1] if len(sys.argv) > 1 else "outputs/cleaned_dump_delimeter.csv" + print(f"Analyzing: {filename}") + analyze_output_csv(filename) + + +if __name__ == "__main__": + main() diff --git a/scripts/sde_dump_processing/view_one_row.py b/scripts/sde_dump_processing/view_one_row.py new file mode 100644 index 00000000..2ee98479 --- /dev/null +++ b/scripts/sde_dump_processing/view_one_row.py @@ -0,0 +1,37 @@ +import csv +import sys + + +def write_id_and_url_for_row(filename, row_number, output_file): + # Increase CSV field size limit so large fields won't cause errors + csv.field_size_limit(sys.maxsize) + + with open(filename, encoding="utf-8") as f: + reader = csv.reader(f) + + # Skip the header row + next(reader, None) + + # IMPORTANT: start=2 to match the validation script's row numbering + for current_row_index, row in enumerate(reader, start=2): + if current_row_index == row_number: + with open(output_file, "w", encoding="utf-8", newline="") as out_f: + writer = csv.writer(out_f) + writer.writerow(row) + return + + # If you get here, that row_number didn't exist + with open(output_file, "w", encoding="utf-8") as out_f: + out_f.write(f"Row {row_number} does not exist in {filename}.\n") + + +def main(): + # Example usage: + filename = "outputs/cleaned_dump_delimeter.csv" + output_file = "outputs/row_output.csv" + desired_row_number = 175655 # or wherever you want + write_id_and_url_for_row(filename, desired_row_number, output_file) + + +if __name__ == "__main__": + main()