12 | 12 | from itertools import chain |
13 | 13 | from typing import List |
14 | 14 |
15 | | -from py4j.protocol import Py4JError |
16 | 15 | from pyspark.sql import DataFrame, SparkSession |
17 | 16 | from pyspark.sql.functions import col, concat, concat_ws, expr, lit, regexp_replace, to_date, transform, when |
18 | 17 | from pyspark.sql.types import ArrayType, DecimalType, StringType, StructType |
@@ -636,110 +635,6 @@ def write_csv_file( |
636 | 635 | return df_record_count |
637 | 636 |
638 | 637 |
639 | | -def hadoop_copy_merge( |
640 | | - spark: SparkSession, |
641 | | - parts_dir: str, |
642 | | - header: str, |
643 | | - part_merge_group_size: int, |
644 | | - logger=None, |
645 | | - file_format="csv", |
646 | | -) -> List[str]: |
647 | | - """PySpark impl of Hadoop 2.x copyMerge() (deprecated in Hadoop 3.x) |
648 | | - Merges files from a provided input directory and then redivides them |
649 | | - into multiple files based on merge group size. |
650 | | - Args: |
651 | | - spark: passed-in active SparkSession |
652 | | - parts_dir: Path to the dir that contains the input parts files. The parts dir name |
653 | | - determines the name of the merged files. ``parts_dir`` must not have a trailing slash. |
654 | | - header: A comma-separated list of field names, to be placed as the first row of every final CSV file. |
655 | | - Individual part files must NOT therefore be created with their own header. |
656 | | - part_merge_group_size: Final CSV data will be subdivided into numbered files. This indicates how many part files |
657 | | - should be combined into a numbered file. |
658 | | - logger: The logger to use. If none is provided (e.g. to log to console or stdout), the underlying JVM-based |
659 | | - Logger will be extracted from the ``spark`` ``SparkSession`` and used as the logger. |
660 | | - file_format: The format of the part files and the format of the final merged file, e.g. "csv" |
661 | | - file_format: The format of the part files and the format of the final merged file, e.g. "csv" |
662 | | - Returns: |
663 | | - A list of file paths where each element in the list denotes a path to |
664 | | - a merged file that was generated during the copy merge. |
665 | | - """ |
666 | | - overwrite = True |
667 | | - hadoop = spark.sparkContext._jvm.org.apache.hadoop |
668 | | - conf = spark.sparkContext._jsc.hadoopConfiguration() |
669 | | - |
670 | | - # Guard against incorrectly formatted argument value |
671 | | - parts_dir = parts_dir.rstrip("/") |
672 | | - |
673 | | - parts_dir_path = hadoop.fs.Path(parts_dir) |
674 | | - |
675 | | - fs = parts_dir_path.getFileSystem(conf) |
676 | | - |
677 | | - if not fs.exists(parts_dir_path): |
678 | | - raise ValueError("Source directory {} does not exist".format(parts_dir)) |
679 | | - |
680 | | - file = parts_dir |
681 | | - file_path = hadoop.fs.Path(file) |
682 | | - |
683 | | - # Don't delete first if disallowing overwrite. |
684 | | - if not overwrite and fs.exists(file_path): |
685 | | - raise Py4JError(spark._jvm.org.apache.hadoop.fs.FileAlreadyExistsException(f"{str(file_path)} already exists")) |
686 | | - part_files = [] |
687 | | - |
688 | | - for f in fs.listStatus(parts_dir_path): |
689 | | - if f.isFile(): |
690 | | - # Sometimes part files can be empty, we need to ignore them |
691 | | - if f.getLen() == 0: |
692 | | - continue |
693 | | - file_path = f.getPath() |
694 | | - if file_path.getName().startswith("_"): |
695 | | - logger.debug(f"Skipping non-part file: {file_path.getName()}") |
696 | | - continue |
697 | | - logger.debug(f"Including part file: {file_path.getName()}") |
698 | | - part_files.append(f.getPath()) |
699 | | - if not part_files: |
700 | | - logger.warning("Source directory is empty with no part files. Attempting creation of file with CSV header only") |
701 | | - out_stream = None |
702 | | - try: |
703 | | - merged_file_path = f"{parts_dir}.{file_format}" |
704 | | - out_stream = fs.create(hadoop.fs.Path(merged_file_path), overwrite) |
705 | | - out_stream.writeBytes(header + "\n") |
706 | | - finally: |
707 | | - if out_stream is not None: |
708 | | - out_stream.close() |
709 | | - return [merged_file_path] |
710 | | - |
711 | | - part_files.sort(key=lambda f: str(f)) # put parts in order by part number for merging |
712 | | - paths_to_merged_files = [] |
713 | | - for parts_file_group in _merge_grouper(part_files, part_merge_group_size): |
714 | | - part_suffix = f"_{str(parts_file_group.part).zfill(2)}" if parts_file_group.part else "" |
715 | | - partial_merged_file = f"{parts_dir}.partial{part_suffix}" |
716 | | - partial_merged_file_path = hadoop.fs.Path(partial_merged_file) |
717 | | - merged_file_path = f"{parts_dir}{part_suffix}.{file_format}" |
718 | | - paths_to_merged_files.append(merged_file_path) |
719 | | - # Make path a hadoop path because we are working with a hadoop file system |
720 | | - merged_file_path = hadoop.fs.Path(merged_file_path) |
721 | | - if overwrite and fs.exists(merged_file_path): |
722 | | - fs.delete(merged_file_path, True) |
723 | | - out_stream = None |
724 | | - try: |
725 | | - if fs.exists(partial_merged_file_path): |
726 | | - fs.delete(partial_merged_file_path, True) |
727 | | - out_stream = fs.create(partial_merged_file_path) |
728 | | - out_stream.writeBytes(header + "\n") |
729 | | - _merge_file_parts(fs, out_stream, conf, hadoop, partial_merged_file_path, parts_file_group.file_list) |
730 | | - finally: |
731 | | - if out_stream is not None: |
732 | | - out_stream.close() |
733 | | - try: |
734 | | - fs.rename(partial_merged_file_path, merged_file_path) |
735 | | - except Exception: |
736 | | - if fs.exists(partial_merged_file_path): |
737 | | - fs.delete(partial_merged_file_path, True) |
738 | | - logger.exception("Exception encountered. See logs") |
739 | | - raise |
740 | | - return paths_to_merged_files |
741 | | - |
742 | | - |
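
For reviewers who have not used the removed helper: the sketch below shows roughly how `hadoop_copy_merge` was invoked from a CSV export path. It is a minimal, hedged example; the directory name, schema, header string, and group size are illustrative assumptions, not values taken from this PR.

```python
# Illustrative usage of the removed helper; paths and values are hypothetical.
import logging

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
logger = logging.getLogger(__name__)

# Part files are written WITHOUT headers; hadoop_copy_merge prepends the single
# header row to every merged output file, per its docstring.
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "val"])
parts_dir = "hdfs:///tmp/export/my_table"  # no trailing slash
df.coalesce(3).write.mode("overwrite").csv(parts_dir, header=False)

merged_paths = hadoop_copy_merge(
    spark=spark,
    parts_dir=parts_dir,
    header="id,val",
    part_merge_group_size=2,  # combine 2 part files per final numbered CSV
    logger=logger,
)
# merged_paths is a list like ["hdfs:///tmp/export/my_table_01.csv", ...];
# the numeric suffix is omitted when everything fits into a single merge group.
```

Because the helper writes the header itself, callers were expected not to emit per-part headers (`header=False` above), which is what the removed docstring calls out.
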
743 | 638 | def _merge_file_parts(fs, out_stream, conf, hadoop, partial_merged_file_path, part_file_list): |
744 | 639 | """Read-in files in alphabetical order and append them one by one to the merged file""" |
745 | 640 |
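
The body of `_merge_file_parts` (kept by this PR) falls outside the hunk shown above. As a non-authoritative sketch of what such a helper typically does, assuming the standard Hadoop `IOUtils.copyBytes` API reached through the same py4j `hadoop` gateway object used elsewhere in this module:

```python
def _merge_file_parts_sketch(fs, out_stream, conf, hadoop, partial_merged_file_path, part_file_list):
    """Sketch only: append each part file, in the given order, to the open output stream."""
    for part_path in part_file_list:
        in_stream = fs.open(part_path)
        try:
            # Copy the part's bytes into the partial merged file; the trailing False
            # tells Hadoop's IOUtils not to close either stream on our behalf.
            hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)
        finally:
            in_stream.close()
```
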