Skip to content

Commit 49c32f9

Browse files
CAMEL-23267: Use a LinkedHashMap as inProgressRepository for file component
1 parent 11da8f8 commit 49c32f9

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
189189
+ "org.apache.camel.spi.IdempotentRepository. The in-progress repository is used to account the current in "
190190
+ "progress files being consumed. By default a memory based repository is used.")
191191
protected IdempotentRepository inProgressRepository
192-
= MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IN_PROGRESS_CACHE_SIZE);
192+
= MemoryIdempotentRepository.memoryIdempotentRepositoryInsertionOrder(DEFAULT_IN_PROGRESS_CACHE_SIZE);
193193
@UriParam(label = "consumer,advanced", description = "When consuming, a local work directory can be used to "
194194
+ "store the remote file content directly in local files, to avoid loading the content into memory. This "
195195
+ "is beneficial, if you consume a very big remote file and thus can conserve memory.")
@@ -1502,7 +1502,7 @@ public IdempotentRepository getInProgressRepository() {
15021502

15031503
/**
15041504
* A pluggable in-progress repository org.apache.camel.spi.IdempotentRepository. The in-progress repository is used
1505-
* to account the current in progress files being consumed. By default a memory based repository is used.
1505+
* to account the current in progress files being consumed. By default, a memory based repository is used.
15061506
*/
15071507
public void setInProgressRepository(IdempotentRepository inProgressRepository) {
15081508
this.inProgressRepository = inProgressRepository;

core/camel-support/src/main/java/org/apache/camel/support/processor/idempotent/MemoryIdempotentRepository.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717
package org.apache.camel.support.processor.idempotent;
1818

19+
import java.util.LinkedHashMap;
1920
import java.util.Map;
21+
import java.util.Map.Entry;
2022
import java.util.concurrent.locks.Lock;
2123
import java.util.concurrent.locks.ReentrantLock;
2224

@@ -77,6 +79,21 @@ public static IdempotentRepository memoryIdempotentRepository(int cacheSize) {
7779
return answer;
7880
}
7981

82+
/**
83+
* Creates a new memory based repository using a {@link java.util.LinkedHashMap} as its store, with the given
84+
* maximum capacity. When a new entry is added and the store has reached its maximum capacity, the oldest entry is
85+
* removed.
86+
*/
87+
public static IdempotentRepository memoryIdempotentRepositoryInsertionOrder(int cacheSize) {
88+
LinkedHashMap<String, Object> map = new LinkedHashMap<>() {
89+
@Override
90+
protected boolean removeEldestEntry(Entry<String, Object> eldest) {
91+
return size() > cacheSize;
92+
}
93+
};
94+
return memoryIdempotentRepository(map);
95+
}
96+
8097
/**
8198
* Creates a new memory based repository using the given {@link Map} to use to store the processed message ids.
8299
* <p/>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.support;
18+
19+
import java.io.IOException;
20+
21+
import org.apache.camel.spi.IdempotentRepository;
22+
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.junit.jupiter.api.Assertions.assertFalse;
26+
import static org.junit.jupiter.api.Assertions.assertTrue;
27+
28+
class MemoryIdempotentRepositoryTest {
29+
30+
@Test
31+
void repositoryEvictsOldestEntryWhenRepositoryIsFull() throws IOException {
32+
final int cacheSize = 5;
33+
final int entriesNotFittingInRepository = 4;
34+
try (IdempotentRepository repository = MemoryIdempotentRepository.memoryIdempotentRepositoryInsertionOrder(
35+
cacheSize)) {
36+
37+
for (int i = 0; i < cacheSize + entriesNotFittingInRepository; i++) {
38+
repository.add(String.valueOf(i));
39+
}
40+
41+
for (int i = entriesNotFittingInRepository; i < cacheSize + entriesNotFittingInRepository; i++) {
42+
assertTrue(repository.contains(String.valueOf(i)), "Repository should contain entry " + i);
43+
}
44+
for (int i = 0; i < cacheSize - entriesNotFittingInRepository; i++) {
45+
assertFalse(repository.contains(String.valueOf(i)), "Repository should not contain entry " + i);
46+
}
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)