Commit 00b57ce

Support write mode when writing to socket (#148)
1 parent 4926e13 commit 00b57ce

File tree: 1 file changed (+8 −3 lines)

spark_utils/common/functions.py

Lines changed: 8 additions & 3 deletions
@@ -88,14 +88,16 @@ def write_to_socket(
     partition_by: Optional[List[str]] = None,
     partition_count: Optional[int] = None,
     write_options: Optional[Dict[str, str]] = None,
+    mode: str = "overwrite",
 ) -> None:
     """Writes data to socket
 
     :param data: Dataframe to write
     :param socket: Socket to write to
     :param partition_by: List of column names to partition by
     :param partition_count: Number of partitions to split result into.
-    :param write_options: Write options passed to spark (e.g. Parquet options
+    :param write_options: Write options passed to spark (e.g. Parquet options)
+    :param mode: Write mode
       found here: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option)
     """
     write_options = write_options or {}
@@ -105,10 +107,13 @@ def write_to_socket(
 
     # ignore all external write options as Iceberg writer will take care of those
     if socket.data_format.startswith("iceberg"):
-        data.writeTo(socket.data_path).createOrReplace()
+        if mode == "overwrite":
+            data.writeTo(socket.data_path).createOrReplace()
+        if mode == "append":
+            data.writeTo(socket.data_path).append()
         return
 
-    writer = data.write.mode("overwrite").options(**write_options)
+    writer = data.write.mode(mode).options(**write_options)
 
     if partition_by:
         writer = writer.partitionBy(*partition_by)
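
For context, a minimal sketch of how the updated function might be called. The DataFrame, socket object, and partition column below are hypothetical; only the new mode parameter and its "overwrite"/"append" values come from this commit.

    from spark_utils.common.functions import write_to_socket

    # df and output_socket are assumed to be created elsewhere; "event_date" is an
    # illustrative partition column, not part of this commit.
    write_to_socket(
        data=df,
        socket=output_socket,
        partition_by=["event_date"],
        mode="append",  # new in this commit; the default remains "overwrite"
    )

For Iceberg sockets the writer branches on mode (createOrReplace vs. append); for all other formats the mode string is passed straight to data.write.mode(mode).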
