|
19 | 19 | import hashlib |
20 | 20 | from pathlib import Path |
21 | 21 | import tempfile |
| 22 | +import shlex |
| 23 | + |
22 | 24 |
|
23 | 25 | def download_tool(url): |
24 | 26 | """ |
@@ -784,41 +786,68 @@ def write_dataframe_to_csv(dataframe, outname): |
784 | 786 | return |
785 | 787 |
|
786 | 788 |
|
787 | | - |
788 | | -def deduplicate_final_csv(csv_path, subset=None): |
def deduplicate_final_csv(csv_path: str, subset=None):
    """
    Very low-RAM de-duplication of a finished CSV via external ``sort -u``.

    The file is streamed through the system ``sort`` (which spills to disk
    when its buffer fills), so memory use stays small regardless of file
    size.

    Parameters
    ----------
    csv_path : str
        Path to the written CSV; gzipped input is detected by a ``.gz``
        extension.
    subset : list[str] | None
        Ignored — this implementation only performs full-line
        de-duplication.  A warning is printed when a subset is requested.

    Side effects
    ------------
    Writes a gzipped, de-duplicated copy.  If *csv_path* already ends in
    ``.gz`` it is replaced in place; otherwise ``<csv_path>.gz`` is created
    and the uncompressed original is removed.  The header line is kept as
    the first output line; body rows come out in sorted order.

    Raises
    ------
    RuntimeError
        If any stage of the shell pipeline exits non-zero.
    """
    if subset is not None:
        print("Warning: 'subset' is ignored by sort/uniq dedup (full-line dedupe).")

    is_gz = csv_path.endswith(".gz")
    out_path = csv_path if is_gz else f"{csv_path}.gz"

    # Create the temp file in the destination directory so the final
    # os.replace() is an atomic same-filesystem rename (mkstemp's default
    # temp dir may live on a different filesystem).
    out_dir = os.path.dirname(os.path.abspath(out_path)) or "."
    tmp_fd, tmp_path = tempfile.mkstemp(suffix=".csv.gz", dir=out_dir)
    os.close(tmp_fd)

    # Decompressor / passthrough for the source file.
    src_cmd = f"gzip -cd {shlex.quote(csv_path)}" if is_gz else f"cat {shlex.quote(csv_path)}"

    # Detect GNU sort so memory/tmpdir tuning options can be passed safely.
    def _has_gnu_sort():
        try:
            out = subprocess.run(["sort", "--version"], capture_output=True, text=True)
            return out.returncode == 0 and "GNU coreutils" in (out.stdout + out.stderr)
        except Exception:
            return False

    sort_opts = []
    if _has_gnu_sort():
        # Cap in-core memory at 1G and spill temporary runs into the CWD.
        sort_opts += ["-S", "1G", "--temporary-directory", "."]
    # POSIX 'sort -u' == 'sort | uniq' for full-line dedupe, one fewer process.
    sort_cmd = " ".join(["sort", "-u"] + [shlex.quote(o) for o in sort_opts])

    try:
        # Grab the header line first.  No 'pipefail' here on purpose:
        # 'head' closing the pipe early makes the decompressor die of
        # SIGPIPE, which pipefail would misreport as a failure.
        header = subprocess.check_output(
            f"{src_cmd} | head -n 1",
            shell=True,
            executable="/bin/bash",
            text=True,
        ).rstrip("\n")

        # Emit header, then the sorted+deduped body, gzip the whole stream.
        # - LC_ALL=C is *exported* so that sort itself uses byte-wise
        #   collation (a 'LC_ALL=C gzip ...' prefix would only affect the
        #   decompressor, not sort).
        # - pipefail makes a failure in any stage (e.g. a truncated .gz)
        #   surface as a non-zero exit instead of silently producing a
        #   corrupt output file.
        cmd = (
            "set -o pipefail; export LC_ALL=C; "
            "{ "
            f"printf '%s\\n' {shlex.quote(header)}; "
            f"{src_cmd} | tail -n +2 | {sort_cmd}; "
            f"}} | gzip -c > {shlex.quote(tmp_path)}"
        )
        subprocess.run(cmd, shell=True, check=True, executable="/bin/bash")

        os.replace(tmp_path, out_path)
        if not is_gz and os.path.exists(csv_path):
            os.remove(csv_path)
        print(f"De-duplicated and gzipped file written to: {out_path}")
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Deduplication failed with exit code {e.returncode}") from e
    finally:
        # Remove the temp file on *any* failure path (success renames it
        # away first, so this is a no-op then).
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        gc.collect()
822 | 851 |
|
823 | 852 |
|
824 | 853 |
|
@@ -929,7 +958,7 @@ def main(): |
929 | 958 | print(f"Done. Dataset written to {args.outname}") |
930 | 959 |
|
931 | 960 | print("Running global de-duplication pass …") |
932 | | - deduplicate_final_csv(args.outname) # subset=None ⇒ all columns |
| 961 | + deduplicate_final_csv(args.outname) |
933 | 962 | print("All done.") |
934 | 963 |
|
935 | 964 |
|
|
0 commit comments