
Commit 5279857

committed
update the processing script
1 parent c650645 commit 5279857

File tree

1 file changed
Lines changed: 348 additions & 35 deletions
@@ -1,44 +1,357 @@
+"""
+to run
+python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v
+
+to compress
+7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv
+"""
+
+import argparse
 import csv
+import logging
+import os
+import sys
+import time
+from datetime import timedelta

+# Set up logging
+# Define a custom formatter to include milliseconds
+log_formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
+# Get the root logger
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)  # Default level
+# Create a handler and set the formatter
+stream_handler = logging.StreamHandler()
+stream_handler.setFormatter(log_formatter)
+# Add the handler to the logger
+# Check if handlers already exist to avoid duplicates in interactive environments
+if not logger.hasHandlers():
+    logger.addHandler(stream_handler)

-def process_large_csv(input_filename, output_filename):
-    # Open the input file for reading and the output file for writing.
-    with open(input_filename, encoding="utf-8") as infile, open(
-        output_filename, "w", encoding="utf-8", newline=""
-    ) as outfile:
-
-        writer = csv.writer(outfile)
-        # Write header if needed:
-        writer.writerow(["id", "url1", "title", "collection", "treepath", "sourcestr56", "text", "sourcebool3"])
-
-        current_record = ""
-        for line in infile:
-            line = line.rstrip("\n")
-            # Skip lines until the first record is found.
-            if not current_record and not (line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/")):
-                continue
-            if line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/"):
-                if current_record:
-                    parts = current_record.split("༜", 7)
-                    if len(parts) == 8:
-                        parts[6] = parts[6].replace("\n", "\\n")
-                        writer.writerow(parts)
-                    else:
-                        print("Warning: Expected 8 fields, got", len(parts))
-                current_record = line
-            else:
-                current_record += "\n" + line

-        # After the loop, process the last accumulated record.
+def assemble_records_generator(input_filename, skip_header=False, progress_interval=100000):
+    """
+    Reads the input file line by line and yields assembled multi-line records.
+    Optionally skips the first line if it's a header.
+    """
+    current_record = ""
+    record_started = False
+    lines_processed = 0
+    start_time = time.time()
+    last_report_time = start_time
+    record_count = 0  # Keep track of records yielded
+
+    logger.info(f"Starting to process lines from {input_filename}")
+
+    try:
+        with open(input_filename, encoding="utf-8") as infile:
+            # Skip the header line if requested
+            if skip_header:
+                try:
+                    first_line = next(infile)
+                    lines_processed += 1
+                    logger.info(f"Skipped header line: {first_line.rstrip()}")
+                except StopIteration:
+                    logger.warning("Input file appears to be empty or only contains a header.")
+                    return  # Stop if file is empty after header
+
+            # Process remaining lines
+            while True:
+                try:
+                    line = next(infile)
+                    lines_processed += 1
+
+                    # Report progress periodically based on lines read
+                    if lines_processed % progress_interval == 0:
+                        current_time = time.time()
+                        elapsed = current_time - start_time
+                        rate = lines_processed / elapsed if elapsed > 0 else 0
+
+                        # Only log if at least 5 seconds have passed since the last report
+                        if current_time - last_report_time >= 5:
+                            logger.info(f"Read {lines_processed:,} lines - " f"Rate: {rate:.0f} lines/sec")
+                            last_report_time = current_time
+
+                    line = line.rstrip("\n")
+
+                    # Check if this line is the start of a new record
+                    is_record_start = (
+                        line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/") or line.startswith("/scrapers/")
+                    )
+
+                    # Skip lines until the first record is found
+                    # (This check might be less relevant if the first data line always starts a record)
+                    if not record_started and not is_record_start:
+                        # If we skipped the header, the first line *should* be a record start
+                        # Log a warning if it's not, but continue processing anyway
+                        if skip_header and lines_processed == 2:  # Only warn on the first actual data line
+                            logger.warning(
+                                f"First data line does not start with a recognized record prefix: {line[:100]}..."
+                            )
+                        # continue -> We should actually process this line if it's part of the first record
+
+                    if is_record_start:
+                        # If we've already been building a record, yield it
+                        if current_record:
+                            yield current_record
+                            record_count += 1
+
+                        # Start a new record
+                        current_record = line
+                        record_started = True
+                    elif record_started:  # Only append if a record has started
+                        # Append to the current record
+                        current_record += "\n" + line
+                    # Handle case where a non-starting line is encountered before the first record start
+                    # This might happen if skip_header=False and there's preamble before the first /SDE/
+                    elif not record_started and not is_record_start:
+                        continue  # Explicitly skip lines before the first record start marker
+
+                except StopIteration:  # End of file
+                    break
+                except UnicodeDecodeError as e:
+                    logger.warning(f"Skipping line {lines_processed} due to UnicodeDecodeError: {e}")
+                    continue  # Skip the problematic line
+
+        # Yield the last record if there is one
         if current_record:
-            parts = current_record.split("༜", 7)
-            if len(parts) == 8:
-                parts[6] = parts[6].replace("\n", "\\n")
-                writer.writerow(parts)
+            yield current_record
+            record_count += 1
+
+        elapsed = time.time() - start_time
+        logger.info(
+            f"Finished reading {lines_processed:,} lines and found {record_count:,} raw records in {elapsed:.2f} seconds"  # noqa
+        )
+
+    except FileNotFoundError:
+        logger.error(f"Input file not found: {input_filename}")
+        raise
+
+
+def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000):
+    """
+    Processes records from an iterator, yielding valid processed records.
+    Requires expected_fields to be specified.
+    """
+    if expected_fields is None:
+        raise ValueError("process_records_generator requires 'expected_fields' to be specified.")
+
+    processed_count = 0
+    invalid_records = 0
+    start_time = time.time()
+    last_report_time = start_time
+    input_record_index = 0  # Track input records processed for logging
+
+    logger.info(f"Starting record processing (expecting {expected_fields} fields)...")
+
+    for i, record in enumerate(raw_records_iterator):
+        input_record_index = i + 1
+        # Split into AT MOST expected_fields parts. The last part will contain the rest.
+        parts = record.split(delimiter, expected_fields - 1)
+
+        if len(parts) == expected_fields:
+            # Replace literal newlines with '\\n' string in the text field
+            # The text field is the last one, index expected_fields - 1
+            text_field_index = expected_fields - 1
+            if text_field_index < len(parts):  # Ensure index exists
+                parts[text_field_index] = parts[text_field_index].replace("\n", "\\n")
+            else:
+                # This case should technically not happen if len(parts) == expected_fields
+                # but adding a safeguard.
+                logger.warning(
+                    f"Record ~{input_record_index}: Field index {text_field_index} out of bounds unexpectedly."
+                )
+
+            processed_count += 1
+            yield parts  # Yield the valid processed record
+        else:
+            invalid_records += 1
+            if invalid_records <= 10:  # Log only the first 10 invalid records
+                logger.warning(
+                    f"Record ~{input_record_index}: Expected {expected_fields} fields, got {len(parts)}. Skipping."
+                )
+            # Optionally log the problematic record content (careful with large fields)
+            # logger.debug(f"Problematic record content (first 100 chars): {record[:100]}")
+            # Do not yield invalid records
+
+        # Report progress periodically based on input records processed
+        if input_record_index % batch_size == 0:
+            current_time = time.time()
+            if current_time - last_report_time >= 5:
+                elapsed = current_time - start_time
+                rate = input_record_index / elapsed if elapsed > 0 else 0
+                logger.info(
+                    f"Processed {input_record_index:,} raw records ({processed_count:,} valid) - "
+                    f"Rate: {rate:.0f} records/sec"
+                )
+                last_report_time = current_time
+
+    if invalid_records > 10:
+        logger.warning(f"Additional {invalid_records - 10} invalid records were skipped (not shown in logs)")
+
+    logger.info(
+        f"Finished processing {input_record_index:,} raw records. Found {processed_count:,} valid records and {invalid_records:,} invalid records."  # noqa
+    )
+
+
+def write_output_file(output_filename, processed_records_iterator, header=None, batch_size=100000):
+    """Write the processed records from an iterator to the output CSV file."""
+    # Create the directory if it doesn't exist
+    output_dir = os.path.dirname(output_filename)
+    # Check if output_dir is not empty before creating
+    if output_dir and not os.path.exists(output_dir):
+        logger.info(f"Creating output directory: {output_dir}")
+        os.makedirs(output_dir, exist_ok=True)
+
+    records_written = 0
+    start_time = time.time()
+    last_report_time = start_time
+
+    logger.info(f"Starting to write records to {output_filename}")
+
+    try:
+        with open(output_filename, "w", encoding="utf-8", newline="") as outfile:
+            writer = csv.writer(outfile)
+
+            # Write header if provided
+            if header:
+                logger.info(f"Writing header: {header}")
+                writer.writerow(header)
             else:
-                print("Warning: Expected 8 fields, got", len(parts))
+                logger.warning("No header provided for output file.")
+
+            # Write records as they come from the iterator
+            for i, record in enumerate(processed_records_iterator):
+                writer.writerow(record)
+                records_written += 1
+
+                # Report progress periodically
+                if records_written % batch_size == 0:
+                    current_time = time.time()
+                    if current_time - last_report_time >= 5:
+                        elapsed = current_time - start_time
+                        rate = records_written / elapsed if elapsed > 0 else 0
+                        logger.info(f"Wrote {records_written:,} records - " f"Rate: {rate:.0f} records/sec")
+                        last_report_time = current_time
+
+        total_time = time.time() - start_time
+        logger.info(f"Finished writing {records_written:,} records in {total_time:.2f} seconds")
+
+    except OSError as e:
+        logger.error(f"Error writing to output file {output_filename}: {e}")
+        raise
+
+    return records_written
+
+
+def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=False):
+    """
+    Process a large CSV file iteratively using generators.
+    Determines header and field count from the first line of the input file.
+    """
+    # Adjust logging level based on verbosity - should be set in main()
+    if verbose:
+        logger.setLevel(logging.INFO)
+
+    logger.info(f"Processing {input_filename} to {output_filename} with delimiter '{delimiter}'")
+    total_start_time = time.time()
+
+    header = None
+    expected_fields = 0
+
+    # --- Read header and determine expected fields ---
+    try:
+        with open(input_filename, encoding="utf-8") as infile:
+            first_line = next(infile).rstrip("\n")
+            header = first_line.split(delimiter)
+            expected_fields = len(header)
+            logger.info(f"Detected header: {header}")
+            logger.info(f"Expecting {expected_fields} fields based on header.")
+            if expected_fields == 0:
+                logger.error("Header line is empty or could not be split. Cannot proceed.")
+                return -1
+    except FileNotFoundError:
+        logger.error(f"Input file not found: {input_filename}")
+        return -1  # Indicate file not found error
+    except StopIteration:
+        logger.error(f"Input file is empty: {input_filename}")
+        return -1  # Indicate empty file error
+    except Exception as e:
+        logger.error(f"Error reading header from {input_filename}: {e}", exc_info=True)
+        return -1  # Indicate general error during header read
+    # --- End header reading ---
+
+    try:
+        # Create iterators/generators
+        # Pass skip_header=True to avoid processing the header line again
+        raw_records_iter = assemble_records_generator(input_filename, skip_header=True)
+        # Pass the dynamically determined expected_fields
+        processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields)
+
+        # Write to output file by consuming the processed records iterator, using the detected header
+        records_written = write_output_file(output_filename, processed_records_iter, header)
+
+        total_elapsed = time.time() - total_start_time
+        rate = records_written / total_elapsed if total_elapsed > 0 else 0
+        logger.info(
+            f"Complete! Processed {records_written:,} valid records in {timedelta(seconds=int(total_elapsed))} "
+            f"(Average rate: {rate:.1f} records/sec)"
+        )
+        return records_written
+
+    except FileNotFoundError:
+        # Error already logged by assemble_records_generator if it happens later
+        # This is unlikely if header reading succeeded, but kept for safety
+        logger.error(f"Input file disappeared after reading header: {input_filename}")
+        return -1
+    except Exception as e:
+        logger.error(f"An unexpected error occurred during processing: {e}", exc_info=True)
+        return -1  # Indicate general error
+
+
+def main():
+    """Main entry point for the script."""
+    # Set up command line arguments
+    parser = argparse.ArgumentParser(
+        description="Process large CSV files with special formatting efficiently, detecting header automatically."
+    )
+    parser.add_argument("input", help="Input file path")
+    parser.add_argument("output", help="Output file path")
+    parser.add_argument("--delimiter", default="༜", help="Field delimiter character (default: '༜')")
+    # Removed --fields argument
+    parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed progress information (INFO level)")
+    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress all output except errors (ERROR level)")
+
+    args = parser.parse_args()
+
+    # Adjust logging level based on args BEFORE calling processing function
+    if args.quiet:
+        logger.setLevel(logging.ERROR)
+    elif args.verbose:
+        logger.setLevel(logging.INFO)
+    else:
+        # Default level if neither -v nor -q is specified
+        logger.setLevel(logging.WARNING)
+
+    # Process the CSV file using the arguments passed
+    # No longer passing expected_fields
+    records_processed = process_large_csv(
+        args.input, args.output, delimiter=args.delimiter, verbose=args.verbose  # Pass verbose flag
+    )
+
+    if records_processed >= 0 and not args.quiet:
+        # Use standard print for final user confirmation message
+        print(f"Successfully processed {records_processed:,} records.")
+        return 0
+    elif records_processed < 0:
+        # Error messages already logged
+        print("Processing failed. Check logs for details.", file=sys.stderr)
+        return 1
+    else:  # records_processed == 0, potentially valid but maybe unexpected
+        if not args.quiet:
+            print("Processing finished, but 0 valid records were written.")
+        return 0


 if __name__ == "__main__":
-    # Replace with your actual file names.
-    process_large_csv("./inputs/dump_delimeter.csv", "./outputs/cleaned_dump_delimeter.csv")
+    sys.exit(main())
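For reference, the core transformation the new generators perform (group dump lines into records by their path prefix, split each record on the '༜' delimiter, escape embedded newlines in the trailing text field) can be illustrated on a tiny in-memory sample. This is a simplified sketch with invented sample lines and a three-field layout, not the committed code, which detects the header and field count from the dump itself:

```python
# Simplified illustration of the record-assembly and splitting steps in this commit.
# The sample lines, the three-field layout, and the field values are invented for
# demonstration only; the real script derives the field count from the dump header.
DELIMITER = "༜"
PREFIXES = ("/SDE/", "/SDE-TDAMM/", "/scrapers/")

sample_lines = [
    "/SDE/page-1༜Title One༜First line of text",
    "second line of the same record",
    "/SDE-TDAMM/page-2༜Title Two༜Single-line text",
]

# Group lines into records: a new record starts at a recognized prefix,
# other lines are appended to the record currently being built.
records, current = [], ""
for line in sample_lines:
    if line.startswith(PREFIXES):
        if current:
            records.append(current)
        current = line
    elif current:
        current += "\n" + line
if current:
    records.append(current)

# Split each record into at most `expected_fields` parts and escape
# literal newlines in the last (text) field.
expected_fields = 3
for record in records:
    parts = record.split(DELIMITER, expected_fields - 1)
    if len(parts) == expected_fields:
        parts[-1] = parts[-1].replace("\n", "\\n")
        print(parts)
# ['/SDE/page-1', 'Title One', 'First line of text\\nsecond line of the same record']
# ['/SDE-TDAMM/page-2', 'Title Two', 'Single-line text']
```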

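The cleaned file that write_output_file produces is a standard comma-separated CSV, so it can be read back with the csv module; the only quirk is that literal newlines in the text column were escaped as the two-character sequence "\n". A minimal, hedged read-back sketch follows; the outputs/input.csv path, the "id" and "text" column names, and the un-escaping step are assumptions based on the docstring and the previous hard-coded header, not something this commit guarantees:

```python
import csv

# Assumed path from the docstring example; assumed "id"/"text" columns from the
# old hard-coded header. The real header is whatever the dump's first line contains.
with open("outputs/input.csv", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        # Approximate inverse of the escaping applied by process_records_generator.
        text = (row.get("text") or "").replace("\\n", "\n")
        print(row.get("id"), len(text))
        break  # just peek at the first record
```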