@@ -73,7 +73,12 @@ def extract_dynamic(self):
7373 if last_runtime :
7474 data [name ] = self .glue_context .create_dynamic_frame .from_options (
7575 connection_type = "s3" ,
76- connection_options = {"paths" : [self .source_path ], "recurse" : True },
76+ connection_options = {
77+ "paths" : [self .source_path ],
78+ "recurse" : True ,
79+ "groupFiles" : "inPartition" ,
80+ "groupSize" : "104857600" ,
81+ },
7782 format = "json" ,
7883 ).filter (
7984 f = lambda x , n = name : (x ["host" ].endswith (n ))
@@ -83,7 +88,12 @@ def extract_dynamic(self):
8388 else :
8489 data [name ] = self .glue_context .create_dynamic_frame .from_options (
8590 connection_type = "s3" ,
86- connection_options = {"paths" : [self .source_path ], "recurse" : True },
91+ connection_options = {
92+ "paths" : [self .source_path ],
93+ "recurse" : True ,
94+ "groupFiles" : "inPartition" ,
95+ "groupSize" : "104857600" ,
96+ },
8797 format = "json" ,
8898 ).filter (f = lambda x , n = name : x ["host" ].endswith (n ))
8999
@@ -109,7 +119,7 @@ def load(self, data):
109119 self .logger .info (
110120 f"Attempting to load dataframe { name } into { self .target_path } { name } "
111121 )
112- dataframe .coalesce ( 1 ). write .mode ("append" ).partitionBy (
122+ dataframe .write .mode ("append" ).partitionBy (
113123 * self .partition_cols
114124 ).parquet (f"{ self .target_path } { name } " )
115125 except :
0 commit comments