Skip to content

Commit 8d42e40

Browse files
committed
[DOP-30603] Fix parsing file format options
1 parent 1f2a4e8 commit 8d42e40

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Fix some file format options being ignored by the SyncMaster worker:
2+
* ``XML(root_tag, row_tag)``
3+
* ``Excel(start_cell, include_header)``
4+
* ``CSV(include_header, line_sep)``
5+
* ``JSON(line_sep)``, ``JSONLine(line_sep)``

syncmaster/dto/transfers.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,30 @@ def __post_init__(self):
6161

6262
self.options.setdefault("if_exists", "replace_overlapping_partitions")
6363

64+
@staticmethod
65+
def _rewrite_option_name(file_format: dict, from_name: str, to_name: str): # noqa: WPS602
66+
if from_name in file_format:
67+
file_format[to_name] = file_format.pop(from_name)
68+
6469
def _get_file_format(self, file_format: dict) -> CSV | JSONLine | JSON | Excel | XML | ORC | Parquet:
6570
file_type = file_format.pop("type", None)
66-
# XML at spark-xml has no "none" option https://github.com/databricks/spark-xml?tab=readme-ov-file#features
67-
if file_type == "xml" and file_format.get("compression") == "none":
68-
file_format.pop("compression")
71+
if file_type == "xml":
72+
self._rewrite_option_name(file_format, "root_tag", "rootTag")
73+
self._rewrite_option_name(file_format, "row_tag", "rowTag")
74+
# XML at spark-xml has no "none" option https://github.com/databricks/spark-xml?tab=readme-ov-file#features
75+
if file_format.get("compression") == "none":
76+
file_format.pop("compression")
77+
78+
if file_type == "excel":
79+
self._rewrite_option_name(file_format, "include_header", "header")
80+
self._rewrite_option_name(file_format, "start_cell", "dataAddress")
81+
82+
if file_type == "csv":
83+
self._rewrite_option_name(file_format, "line_sep", "lineSep")
84+
self._rewrite_option_name(file_format, "include_header", "header")
85+
86+
if file_type == "json" or file_type == "jsonline":
87+
self._rewrite_option_name(file_format, "line_sep", "lineSep")
6988

7089
parser_class = self._format_parsers.get(file_type)
7190
if parser_class is not None:
@@ -111,7 +130,7 @@ def __post_init__(self):
111130
@dataclass
112131
class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
113132
type: ClassVar[str] = "iceberg_rest_s3"
114-
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}")
133+
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}") # noqa: WPS237
115134

116135
def __post_init__(self):
117136
super().__post_init__()

0 commit comments

Comments
 (0)