Skip to content

Commit f7b7b36

Remove s3 eventual consistency guardrails. #464
1 parent b5ce7c8 commit f7b7b36

15 files changed: +145 additions, -324 deletions
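Context for the change: Amazon S3 now provides strong read-after-write consistency, so the `wr.s3.wait_objects_exist(...)` guardrail calls that used to follow every write are no longer needed before reading the data back. Below is a minimal sketch of the calling pattern this commit removes versus the one it keeps, mirroring the library and test-suite diffs that follow; the bucket path and DataFrame are placeholders, not values taken from this commit.

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"c0": [0, 1]})

# Old pattern: capture the written object paths and block until S3 reported
# them as visible, guarding against the former eventual-consistency model.
# paths = wr.s3.to_parquet(df=df, path="s3://my-bucket/my-prefix/", dataset=True)["paths"]
# wr.s3.wait_objects_exist(paths=paths)

# New pattern: write and read back immediately, with no wait step.
wr.s3.to_parquet(df=df, path="s3://my-bucket/my-prefix/", dataset=True)
df2 = wr.s3.read_parquet(path="s3://my-bucket/my-prefix/", dataset=True)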

README.md

Lines changed: 2 additions & 2 deletions
@@ -161,8 +161,8 @@ Please [send a Pull Request](https://github.com/awslabs/aws-data-wrangler/edit/m
 
 ## Amazon SageMaker Data Wrangler?
 
-**Amazon SageMaker Data Wrangler** is a new SageMaker Studio feature that has a similar name but has nothing to do with the **AWS Data Wrangler** project.
+**Amazon SageMaker Data Wrangler** is a new SageMaker Studio feature that has a similar name but has a different purpose than the **AWS Data Wrangler** open source project.
 
 * **AWS Data Wrangler** is open source, runs anywhere, and is focused on code.
 
-* **Amazon SageMaker Data Wrangler** is specific for the SageMaker environment, it is **NOT** open source, and is focused on a visual interface.
+* **Amazon SageMaker Data Wrangler** is specific for the SageMaker Studio environment and is focused on a visual interface.

awswrangler/athena/_read.py

Lines changed: 0 additions & 3 deletions
@@ -221,11 +221,9 @@ def _fetch_parquet_result(
     metadata_path: str = manifest_path.replace("-manifest.csv", ".metadata")
     _logger.debug("manifest_path: %s", manifest_path)
     _logger.debug("metadata_path: %s", metadata_path)
-    s3.wait_objects_exist(paths=[manifest_path], use_threads=False, boto3_session=boto3_session)
     paths: List[str] = _extract_ctas_manifest_paths(path=manifest_path, boto3_session=boto3_session)
     if not paths:
         return _empty_dataframe_response(bool(chunked), query_metadata)
-    s3.wait_objects_exist(paths=paths, use_threads=False, boto3_session=boto3_session)
     ret = s3.read_parquet(
         path=paths, use_threads=use_threads, boto3_session=boto3_session, chunked=chunked, categories=categories
     )
@@ -257,7 +255,6 @@ def _fetch_csv_result(
         chunked = _chunksize is not None
         return _empty_dataframe_response(chunked, query_metadata)
     path: str = query_metadata.output_location
-    s3.wait_objects_exist(paths=[path], use_threads=False, boto3_session=boto3_session)
     _logger.debug("Start CSV reading from %s", path)
     ret = s3.read_csv(
         path=[path],

awswrangler/athena/_utils.py

Lines changed: 0 additions & 1 deletion
@@ -121,7 +121,6 @@ def _fetch_txt_result(
     if query_metadata.output_location is None or query_metadata.output_location.endswith(".txt") is False:
         return pd.DataFrame()
     path: str = query_metadata.output_location
-    s3.wait_objects_exist(paths=[path], use_threads=False, boto3_session=boto3_session)
     _logger.debug("Start TXT reading from %s", path)
     df = s3.read_csv(
         path=[path],

tests/test__routines.py

Lines changed: 23 additions & 42 deletions
@@ -14,7 +14,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 1 - Warm up
     df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -26,9 +26,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c0": "0"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert df.shape == df2.shape
     assert df.c0.sum() == df2.c0.sum()
@@ -43,7 +42,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 2 - Overwrite
     df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -55,9 +54,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c1": "1"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert df.shape == df2.shape
     assert df.c1.sum() == df2.c1.sum()
@@ -72,7 +70,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 3 - Append
     df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8")
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -84,9 +82,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c1": "1"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert len(df.columns) == len(df2.columns)
     assert len(df.index) * 2 == len(df2.index)
@@ -102,7 +99,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 4 - Append + New Column
     df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, None, None]})
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -114,9 +111,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c1": "1", "c2": "2"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert len(df2.columns) == 2
     assert len(df2.index) == 9
@@ -133,7 +129,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 5 - Append + New Column + Wrong Types
     df = pd.DataFrame({"c2": [1], "c3": [True], "c1": ["1"]})
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -145,9 +141,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c1": "1!", "c2": "2!", "c3": "3"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert len(df2.columns) == 3
     assert len(df2.index) == 10
@@ -165,7 +160,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 6 - Overwrite Partitioned
     df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -178,9 +173,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c0": "zero", "c1": "one"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert df.shape == df2.shape
     assert df.c1.sum() == df2.c1.sum()
@@ -196,7 +190,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 7 - Overwrite Partitions
     df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]})
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -209,9 +203,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c0": "zero", "c1": "one"},
         concurrent_partitioning=concurrent_partitioning,
         use_threads=use_threads,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert len(df2.columns) == 2
     assert len(df2.index) == 3
@@ -228,7 +221,7 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
 
     # Round 8 - Overwrite Partitions + New Column + Wrong Type
     df = pd.DataFrame({"c0": [1, 2], "c1": ["1", "3"], "c2": [True, False]})
-    paths = wr.s3.to_parquet(
+    wr.s3.to_parquet(
         df=df,
         path=path,
         dataset=True,
@@ -241,9 +234,8 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
         columns_comments={"c0": "zero", "c1": "one", "c2": "two"},
         use_threads=use_threads,
         concurrent_partitioning=concurrent_partitioning,
-    )["paths"]
+    )
     assert wr.catalog.get_table_number_of_versions(table=glue_table, database=glue_database) == 1
-    wr.s3.wait_objects_exist(paths=paths, use_threads=use_threads)
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads)
     assert len(df2.columns) == 3
     assert len(df2.index) == 4
@@ -264,8 +256,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 1 - Warm up
     df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -291,8 +282,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 2 - Overwrite
     df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -318,8 +308,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 3 - Append
     df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int16")
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -347,8 +336,7 @@ def test_routine_1(glue_database, glue_table, path):
     # Round 4 - Append + New Column
     df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, 1, None]})
     df["c1"] = df["c1"].astype("Int16")
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -376,8 +364,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 5 - Overwrite Partitioned
     df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["c1"])["paths"]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["c1"])
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -404,10 +391,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 6 - Overwrite Partitions
     df = pd.DataFrame({"c0": [None, "boo"], "c1": [0, 2]})
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[
-        "paths"
-    ]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
@@ -435,10 +419,7 @@ def test_routine_1(glue_database, glue_table, path):
 
     # Round 7 - Overwrite Partitions + New Column
     df = pd.DataFrame({"c0": ["bar", None], "c1": [1, 3], "c2": [True, False]})
-    paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[
-        "paths"
-    ]
-    wr.s3.wait_objects_exist(paths=paths)
+    wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])
     wr.s3.store_parquet_metadata(
         path=path,
         dataset=True,
