1717 * under the License.
1818 */
1919package org .apache .iceberg .data ;
20-
20+ import java .io .IOException ;
21+ import java .nio .ByteBuffer ;
22+ import java .nio .charset .StandardCharsets ;
23+ import java .security .MessageDigest ;
24+ import java .security .NoSuchAlgorithmException ;
25+ import java .util .ArrayList ;
2126import java .util .Collection ;
27+ import java .util .HashMap ;
2228import java .util .List ;
2329import java .util .Map ;
2430import java .util .Set ;
4450import org .apache .iceberg .types .Types ;
4551import org .apache .iceberg .util .StructLikeSet ;
4652import org .apache .iceberg .util .StructProjection ;
53+ import org .junit .jupiter .api .TestTemplate ;
4754import org .slf4j .Logger ;
4855import org .slf4j .LoggerFactory ;
4956
@@ -55,14 +62,16 @@ public abstract class DeleteFilter<T> {
5562 private final List <DeleteFile > eqDeletes ;
5663 private final Schema requiredSchema ;
5764 private final Accessor <StructLike > posAccessor ;
58- private final boolean hasIsDeletedColumn ;
65+ private final boolean hasIsDeletedClumn ;
5966 private final int isDeletedColumnPosition ;
6067 private final DeleteCounter counter ;
6168
6269 private volatile DeleteLoader deleteLoader = null ;
6370 private PositionDeleteIndex deleteRowPositions = null ;
6471 private List <Predicate <T >> isInDeleteSets = null ;
6572 private Predicate <T > eqDeleteRows = null ;
73+ private final int batchSize = 100 ; // Number of delete files to process per batch
74+
6675
6776 protected DeleteFilter (
6877 String filePath ,
@@ -149,7 +158,6 @@ private DeleteLoader deleteLoader() {
149158 if (deleteLoader == null ) {
150159 synchronized (this ) {
151160 if (deleteLoader == null ) {
152- this .deleteLoader = newDeleteLoader ();
153161 }
154162 }
155163 }
@@ -161,39 +169,97 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
161169 return applyEqDeletes (applyPosDeletes (records ));
162170 }
163171
164- private List <Predicate <T >> applyEqDeletes () {
172+
173+ // refer to final int batchsize at the top
174+ // this is starter code, need to write a test to explore method further.
175+ public List <Predicate <T >> applyEqDeletes () {
165176 if (isInDeleteSets != null ) {
166177 return isInDeleteSets ;
167178 }
168179
169- isInDeleteSets = Lists . newArrayList ();
180+ isInDeleteSets = new ArrayList <> ();
170181 if (eqDeletes .isEmpty ()) {
171182 return isInDeleteSets ;
172183 }
173184
174- Multimap <Set <Integer >, DeleteFile > filesByDeleteIds =
175- Multimaps .newMultimap (Maps .newHashMap (), Lists ::newArrayList );
185+ List <DeleteFile > currentBatch = new ArrayList <>();
176186 for (DeleteFile delete : eqDeletes ) {
187+ currentBatch .add (delete );
188+
189+ if (currentBatch .size () >= batchSize ) {
190+ processBatchAndAddPredicates (currentBatch );
191+ currentBatch .clear ();
192+ }
193+ }
194+
195+ if (!currentBatch .isEmpty ()) {
196+ processBatchAndAddPredicates (currentBatch );
197+ }
198+
199+ return isInDeleteSets ;
200+ }
201+
202+ private void processBatchAndAddPredicates (List <DeleteFile > deleteBatch ) {
203+ Multimap <Set <Integer >, DeleteFile > filesByDeleteIds =
204+ Multimaps .newMultimap (Maps .newHashMap (), ArrayList ::new );
205+
206+ for (DeleteFile delete : deleteBatch ) {
177207 filesByDeleteIds .put (Sets .newHashSet (delete .equalityFieldIds ()), delete );
178208 }
179209
180210 for (Map .Entry <Set <Integer >, Collection <DeleteFile >> entry :
181- filesByDeleteIds .asMap ().entrySet ()) {
211+ filesByDeleteIds .asMap ().entrySet ()) {
182212 Set <Integer > ids = entry .getKey ();
183213 Iterable <DeleteFile > deletes = entry .getValue ();
184214
185215 Schema deleteSchema = TypeUtil .select (requiredSchema , ids );
186216
187- // a projection to select and reorder fields of the file schema to match the delete rows
188217 StructProjection projectRow = StructProjection .create (requiredSchema , deleteSchema );
218+ Map <Integer , List <StructLike >> hashBuckets = new HashMap <>();
219+ for (DeleteFile delete : deletes ) {
220+ for (StructLike deleteRecord : deleteLoader ().loadEqualityDeletes ((Iterable <DeleteFile >) delete , deleteSchema )) {
221+ StructLike projectedDeleteRecord = projectRow .wrap (deleteRecord );
222+
223+ int hash = computeHash (projectedDeleteRecord );
224+
225+ hashBuckets .computeIfAbsent (hash , k -> new ArrayList <>()).add (projectedDeleteRecord );
226+ }
227+ }
228+
229+ Predicate <T > isInDeleteSet = record -> {
230+ StructLike wrappedRecord = projectRow .wrap (asStructLike (record ));
231+
232+ int hash = computeHash (wrappedRecord );
233+
234+ if (!hashBuckets .containsKey (hash )) {
235+ return false ;
236+ }
237+
238+ List <StructLike > deleteRecords = hashBuckets .get (hash );
239+ for (StructLike deleteRecord : deleteRecords ) {
240+ if (deleteRecord .equals (wrappedRecord )) {
241+ return true ;
242+ }
243+ }
244+ return false ;
245+ };
189246
190- StructLikeSet deleteSet = deleteLoader ().loadEqualityDeletes (deletes , deleteSchema );
191- Predicate <T > isInDeleteSet =
192- record -> deleteSet .contains (projectRow .wrap (asStructLike (record )));
193247 isInDeleteSets .add (isInDeleteSet );
194248 }
249+ }
195250
196- return isInDeleteSets ;
251+
252+ private int computeHash (StructLike record ) {
253+ try {
254+ MessageDigest digest = MessageDigest .getInstance ("SHA-1" );
255+ byte [] bytes = record .toString ().getBytes (StandardCharsets .UTF_8 );
256+ byte [] hashBytes = digest .digest (bytes );
257+
258+ // Convert the first 4 bytes of the hash into an integer
259+ return ByteBuffer .wrap (hashBytes ).getInt ();
260+ } catch (NoSuchAlgorithmException e ) {
261+ throw new RuntimeException ("Error computing hash" , e );
262+ }
197263 }
198264
199265 public CloseableIterable <T > findEqualityDeleteRows (CloseableIterable <T > records ) {
0 commit comments