Commit a33dc9a

[SYNPY-1584] Add a buffer.tell when truncating bytes during dataframe upload and drop writing header to csv (#1193)

1 parent a64023d, commit a33dc9a
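
The title names two related changes to the chunked DataFrame upload path: every part is now appended with header=False, so the column-name line is no longer re-emitted inside get_partial_dataframe_chunk (the header line is still captured separately in _chunk_and_upload_df), and the cached byte count is refreshed with buffer.tell() after the buffer is trimmed. As a rough illustration of the first point (a hedged sketch, not the library's code), pandas appends only data rows when header=False:

import pandas as pd
from io import BytesIO

# Minimal sketch, assuming a pandas version that accepts a binary buffer for to_csv
# (the same pattern get_partial_dataframe_chunk uses). With header=False, repeated
# appends to the shared buffer never re-emit the "a,b" column-name line.
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
buffer = BytesIO()
df.to_csv(buffer, mode="a", header=False, index=False, float_format="%.12g")
df.to_csv(buffer, mode="a", header=False, index=False, float_format="%.12g")
print(buffer.getvalue())  # only data rows, e.g. b'1,3\n2,4\n1,3\n2,4\n'; no header line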

File tree

3 files changed: +70 -15 lines

synapseclient/core/upload/upload_utils.py
synapseclient/models/mixins/table_components.py
tests/integration/synapseclient/models/async/test_table_async.py

synapseclient/core/upload/upload_utils.py (2 additions, 3 deletions)

@@ -55,7 +55,6 @@ def get_partial_dataframe_chunk(
         (total_size_of_chunks_being_uploaded - ((part_number - 1) * part_size)),
         part_size,
     )
-    header_written = False
     # TODO: This is an area for optimization. It is possible to avoid writing the entire
     # dataframe to a buffer and then reading the buffer to get the bytes. Instead, we
     # might be able to do something like keeping markers at each 100 row increment how
@@ -68,12 +67,11 @@ def get_partial_dataframe_chunk(
         df.iloc[offset_start:end].to_csv(
             buffer,
             mode="a",
-            header=(part_number == 1 and not header_written),
+            header=False,
             index=False,
             float_format="%.12g",
             **(to_csv_kwargs or {}),
         )
-        header_written = True
         number_of_bytes_in_buffer = buffer.tell()
         # Drop data from the front of the buffer until total_offset is 0
         if total_offset > 0 and total_offset >= number_of_bytes_in_buffer:
@@ -89,6 +87,7 @@ def get_partial_dataframe_chunk(
             buffer.truncate(0)
             buffer.write(copy_of_data)
             total_offset = 0
+            number_of_bytes_in_buffer = buffer.tell()

         if number_of_bytes_in_buffer >= max_bytes_to_read:
             # Return maximum number of bytes that can be read from the buffer
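
The last hunk is the buffer.tell fix named in the commit title: after the already-uploaded prefix is dropped and the remaining bytes are written back, the cached number_of_bytes_in_buffer has to be re-read, otherwise the number_of_bytes_in_buffer >= max_bytes_to_read check that follows runs against the stale, pre-truncate size. A minimal standalone sketch of that pattern (illustrative values, simplified from the function above):

from io import BytesIO

# Hedged, simplified version of the trimming step in get_partial_dataframe_chunk.
buffer = BytesIO(b"0123456789")                      # pretend these are CSV bytes for a part
number_of_bytes_in_buffer = len(buffer.getvalue())   # 10 bytes before trimming
total_offset = 4                                     # bytes consumed by earlier parts (made up)

buffer.seek(total_offset)
copy_of_data = buffer.read()                         # keep only the unread tail: b"456789"
buffer.seek(0)
buffer.truncate(0)
buffer.write(copy_of_data)
total_offset = 0
number_of_bytes_in_buffer = buffer.tell()            # the added line: now 6, not the stale 10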

synapseclient/models/mixins/table_components.py (1 addition, 9 deletions)

@@ -3468,10 +3468,8 @@ async def _chunk_and_upload_df(

         chunks_to_upload = []
         size_of_chunk = 0
-        previous_chunk_byte_offset = 0
         buffer = BytesIO()
         total_df_bytes = 0
-        size_of_header = 0
         header_line = None
         md5_hashlib = hashlib.new("md5", usedforsecurity=False)  # nosec
         line_start_index_for_chunk = 0
@@ -3494,28 +3492,23 @@ async def _chunk_and_upload_df(
             if start == 0:
                 buffer.seek(0)
                 header_line = buffer.readline()
-                size_of_header = len(header_line)
-                previous_chunk_byte_offset = size_of_header
             md5_hashlib.update(buffer.getvalue())

             if size_of_chunk >= insert_size_bytes:
                 chunks_to_upload.append(
                     (
-                        previous_chunk_byte_offset,
                         size_of_chunk,
                         md5_hashlib.hexdigest(),
                         line_start_index_for_chunk,
                         line_end_index_for_chunk,
                     )
                 )
-                previous_chunk_byte_offset = size_of_header
                 size_of_chunk = 0
                 line_start_index_for_chunk = line_end_index_for_chunk
                 md5_hashlib = hashlib.new("md5", usedforsecurity=False)  # nosec
         if size_of_chunk > 0:
             chunks_to_upload.append(
                 (
-                    previous_chunk_byte_offset,
                     size_of_chunk,
                     md5_hashlib.hexdigest(),
                     line_start_index_for_chunk,
@@ -3558,7 +3551,6 @@ async def _chunk_and_upload_df(
         wait_for_update_semaphore = asyncio.Semaphore(value=1)
         part = 0
         for (
-            byte_chunk_offset,
             size_of_chunk,
             md5,
             line_start,
@@ -3569,7 +3561,7 @@ async def _chunk_and_upload_df(
             self._stream_and_update_from_df(
                 client=client,
                 size_of_chunk=size_of_chunk,
-                byte_chunk_offset=byte_chunk_offset,
+                byte_chunk_offset=0,
                 md5=md5,
                 csv_table_descriptor=csv_table_descriptor,
                 job_timeout=job_timeout,
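
With the header no longer written into each part, every chunk's CSV payload starts at byte zero, so _chunk_and_upload_df can drop the previous_chunk_byte_offset / size_of_header bookkeeping and pass byte_chunk_offset=0 directly. Below is a self-contained sketch of the bookkeeping that remains in this hunk, with made-up rows and threshold (the real code hashes the whole buffer, and hashlib's usedforsecurity flag needs Python 3.9+):

import hashlib

# Hypothetical, stripped-down version of the chunk bookkeeping in _chunk_and_upload_df:
# accumulate CSV row bytes, hash them per chunk, and queue
# (size_of_chunk, md5, line_start, line_end) tuples -- no byte offset any more.
rows = [b"a,1\n", b"b,2\n", b"c,3\n"]   # stand-in for CSV-encoded DataFrame rows
insert_size_bytes = 8                   # made-up threshold; the real one is configurable

chunks_to_upload = []
size_of_chunk = 0
line_start_index_for_chunk = 0
line_end_index_for_chunk = 0
md5_hashlib = hashlib.new("md5", usedforsecurity=False)  # nosec

for line_end_index_for_chunk, row_bytes in enumerate(rows, start=1):
    md5_hashlib.update(row_bytes)
    size_of_chunk += len(row_bytes)
    if size_of_chunk >= insert_size_bytes:
        chunks_to_upload.append(
            (
                size_of_chunk,
                md5_hashlib.hexdigest(),
                line_start_index_for_chunk,
                line_end_index_for_chunk,
            )
        )
        size_of_chunk = 0
        line_start_index_for_chunk = line_end_index_for_chunk
        md5_hashlib = hashlib.new("md5", usedforsecurity=False)  # nosec

if size_of_chunk > 0:  # flush whatever is left after the loop
    chunks_to_upload.append(
        (
            size_of_chunk,
            md5_hashlib.hexdigest(),
            line_start_index_for_chunk,
            line_end_index_for_chunk,
        )
    )

# chunks_to_upload now holds two tuples: lines 0-2 and lines 2-3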

tests/integration/synapseclient/models/async/test_table_async.py (67 additions, 3 deletions)

@@ -1,8 +1,11 @@
 import json
 import os
+import random
+import string
 import tempfile
 import uuid
 from typing import Callable
+from unittest import skip

 import pandas as pd
 import pytest
@@ -901,9 +904,6 @@ async def test_store_rows_as_df_being_split_and_uploaded(
                 "large_string": [large_string_a for _ in range(200)],
             }
         )
-        filepath = f"{tempfile.mkdtemp()}/upload_{uuid.uuid4()}.csv"
-        self.schedule_for_cleanup(filepath)
-        data_for_table.to_csv(filepath, index=False, float_format="%.12g")

         # WHEN I store the rows to the table
         await table.store_rows_async(
@@ -937,6 +937,70 @@ async def test_store_rows_as_df_being_split_and_uploaded(
         # Note: DataFrames have a minimum of 100 rows per batch
         assert spy_send_job.call_count == 2

+    @skip("Skip in normal testing because the large size makes it slow")
+    async def test_store_rows_as_large_df_being_split_and_uploaded(
+        self, project_model: Project, mocker: MockerFixture
+    ) -> None:
+        # GIVEN a table in Synapse
+        table_name = str(uuid.uuid4())
+        table = Table(
+            name=table_name,
+            parent_id=project_model.id,
+            columns=[
+                Column(name="column_string", column_type=ColumnType.STRING),
+                Column(name="column_to_order_on", column_type=ColumnType.INTEGER),
+                Column(
+                    name="large_string",
+                    column_type=ColumnType.LARGETEXT,
+                ),
+            ],
+        )
+        table = await table.store_async(synapse_client=self.syn)
+        self.schedule_for_cleanup(table.id)
+        spy_send_job = mocker.spy(asynchronous_job_module, "send_job_async")
+
+        # AND data that will be split into multiple parts
+        rows_in_table = 20
+        random_string = "".join(random.choices(string.ascii_uppercase, k=500000))
+        data_for_table = pd.DataFrame(
+            {
+                "column_string": [f"value{i}" for i in range(rows_in_table)],
+                "column_to_order_on": [i for i in range(rows_in_table)],
+                "large_string": [random_string for _ in range(rows_in_table)],
+            }
+        )
+
+        # WHEN I store the rows to the table
+        await table.store_rows_async(
+            values=data_for_table,
+            schema_storage_strategy=None,
+            synapse_client=self.syn,
+            insert_size_bytes=1 * utils.KB,
+        )
+
+        # AND I query the table
+        results = await query_async(
+            f"SELECT * FROM {table.id} ORDER BY column_to_order_on ASC",
+            synapse_client=self.syn,
+        )
+
+        # THEN the data in the columns should match
+        pd.testing.assert_series_equal(
+            results["column_string"], data_for_table["column_string"]
+        )
+        pd.testing.assert_series_equal(
+            results["column_to_order_on"], data_for_table["column_to_order_on"]
+        )
+        pd.testing.assert_series_equal(
+            results["large_string"], data_for_table["large_string"]
+        )
+
+        # AND `rows_in_table` rows exist on the table
+        assert len(results) == rows_in_table
+
+        # AND The spy should have been called in multiple batches
+        assert spy_send_job.call_count == 1
+

 class TestUpsertRows:
     @pytest.fixture(autouse=True, scope="function")
