
Commit 083092a

marevol and claude authored
Improve data store handling with thread safety and resource management enhancements (#2954)
* Improve data store handling with thread safety and resource management enhancements

This commit addresses several improvements in the data store processing implementation:

**Thread Safety Improvements:**
- DataStoreFactory: Changed dataStoreMap from LinkedHashMap to ConcurrentHashMap for thread-safe concurrent access
- DataStoreFactory: Made dataStoreNames and lastLoadedTime volatile to ensure visibility across threads
- DataStoreFactory: Made getDataStoreNames() synchronized to prevent race conditions during cache refresh
- AbstractDataStore: Made alive field volatile to ensure proper visibility in multi-threaded scenarios
- FileListIndexUpdateCallbackImpl: Changed deleteUrlList from ArrayList to CopyOnWriteArrayList for thread-safe access from executor threads

**Resource Handling:**
- DataStoreFactory: Added null check for jarFiles array to prevent NullPointerException
- DataStoreFactory: Added existence check for XML configuration files before attempting to read them, reducing unnecessary exception handling

**Code Quality:**
- AbstractDataStore: Moved static logger declaration before instance fields following Java best practices
- FileListIndexUpdateCallbackImpl: Fixed incorrect StringUtil import (was using Apache POI's StringUtil instead of Fess's)

These improvements enhance the reliability and thread safety of the data store processing layer, particularly important for concurrent crawling operations.

* Optimize deleteUrlList by replacing CopyOnWriteArrayList with ArrayList

Changed deleteUrlList from CopyOnWriteArrayList to ArrayList since all access is already protected by synchronized(indexUpdateCallback) blocks. CopyOnWriteArrayList creates a copy on every write operation which is unnecessary overhead when synchronization is already in place.

Changes:
- Replaced CopyOnWriteArrayList with ArrayList for deleteUrlList
- Added synchronized block in commit() method for consistency
- Updated javadoc to clarify synchronization strategy

This improves performance for batch delete operations by eliminating unnecessary array copying on each URL addition.

* Fix ProcessHelperTest timing issues by using long-running processes

Fixed two failing tests that were using 'echo' commands which complete almost instantly, making it impossible to verify process running state:

1. test_startProcess_replaceExistingProcess:
   - Changed from 'echo' to 'sleep 10' commands
   - Added proper wait time before checking process state
   - Added verification that only one process exists for the session
2. test_destroyProcess_withRunningProcess:
   - Changed from 'echo' to 'sleep 10' command
   - Simplified polling logic - just wait 100ms for process to start
   - Updated exit code assertion (forcibly destroyed processes may have non-zero exit codes)

These changes ensure the tests can reliably verify that processes are running before attempting to check their state or destroy them.

* Add comprehensive thread safety tests for data store improvements

Added extensive test coverage for the thread safety improvements made to the data store handling layer.

DataStoreFactoryTest additions:
- test_add_concurrentAccess: Verifies ConcurrentHashMap handles concurrent additions
- test_getDataStore_concurrentWithAdd: Tests concurrent read/write operations
- test_getDataStoreNames_concurrentAccess: Verifies synchronized method correctness
- test_volatileFields_visibility: Ensures volatile fields are visible across threads
- test_cacheRefresh_withConcurrentReads: Validates cache refresh with concurrent access
- test_nullSafety_concurrentAccess: Tests null handling under concurrent load

AbstractDataStoreTest additions:
- test_aliveField_volatileVisibility: Verifies volatile alive field visibility
- test_stop_volatileVisibility: Tests stop() method sets alive and is visible
- test_stop_concurrentAccess: Validates concurrent calls to stop()
- test_aliveField_concurrentReadWrite: Tests concurrent read/write of alive field
- test_getName_concurrentAccess: Verifies getName() is thread-safe

FileListIndexUpdateCallbackImplTest additions:
- test_deleteUrlList_synchronizedAccess: Validates ArrayList with synchronized blocks
- test_deleteUrlList_concurrentReads: Tests concurrent reads from deleteUrlList
- test_deleteUrlList_clearOperation: Verifies clear() operation thread safety
- test_deleteUrlList_iteration: Tests iteration with proper synchronization
- test_deleteUrlList_isEmptyCheck: Validates isEmpty() check under concurrent access

These tests verify:
1. ConcurrentHashMap correctly handles concurrent operations in DataStoreFactory
2. Volatile fields ensure proper visibility across threads
3. Synchronized methods prevent race conditions during cache refresh
4. ArrayList works correctly when all access is properly synchronized
5. No ConcurrentModificationException occurs in any concurrent scenario

All tests use minimal or no sleep to maintain fast test execution time.

* Revert dataStoreMap from ConcurrentHashMap to LinkedHashMap

Based on feedback that dataStoreMap is not accessed by multiple threads, reverted the ConcurrentHashMap change back to LinkedHashMap.

Changes:
- Reverted dataStoreMap from ConcurrentHashMap to LinkedHashMap
- Removed ConcurrentHashMap import
- Removed dataStoreMap-related concurrency tests:
  * test_add_concurrentAccess
  * test_getDataStore_concurrentWithAdd
  * test_nullSafety_concurrentAccess

Kept volatile fields and synchronized getDataStoreNames():
- lastLoadedTime and dataStoreNames remain volatile (for cache visibility)
- getDataStoreNames() remains synchronized (for cache refresh safety)
- Related tests for getDataStoreNames() concurrency are maintained

The dataStoreMap is accessed in single-threaded context during application initialization and data store registration.

---------

Co-authored-by: Claude <[email protected]>
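The ProcessHelperTest fix in the commit message rests on one idea: a process must live long enough for the test to observe it running. The sketch below illustrates that idea only. `LongRunningProcessDemo` and its method are hypothetical names, the code does not use the Fess ProcessHelper API, and it assumes a Unix-like system with `sleep` on the PATH.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; it is not the ProcessHelper API from Fess. */
final class LongRunningProcessDemo {

    /**
     * Starts "sleep 10", records whether it is running, then destroys it.
     * Returns true only if the process was alive before the destroy call
     * and had exited within five seconds afterwards.
     */
    static boolean runningThenDestroyed() {
        try {
            // Assumption: a Unix-like system where the "sleep" command is on the PATH.
            final Process process = new ProcessBuilder("sleep", "10").start();
            // With "echo" this check would race against process exit.
            final boolean wasRunning = process.isAlive();
            process.destroyForcibly();
            final boolean exited = process.waitFor(5, TimeUnit.SECONDS);
            return wasRunning && exited && !process.isAlive();
        } catch (final IOException e) {
            throw new IllegalStateException("could not start the process", e);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The sketch deliberately does not assert a specific exit code, in line with the commit's note that forcibly destroyed processes may exit with a non-zero code.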
1 parent 44146b4 commit 083092a

7 files changed, +801 -504 lines changed


src/main/java/org/codelibs/fess/ds/AbstractDataStore.java

Lines changed: 9 additions & 8 deletions
@@ -43,13 +43,6 @@
  */
 public abstract class AbstractDataStore implements DataStore {

-    /**
-     * Default constructor.
-     */
-    public AbstractDataStore() {
-        // nothing
-    }
-
     private static final Logger logger = LogManager.getLogger(AbstractDataStore.class);

     /**
@@ -64,8 +57,16 @@ public AbstractDataStore() {

     /**
      * The flag to check if the data store is alive.
+     * Volatile to ensure visibility across threads.
      */
-    protected boolean alive = true;
+    protected volatile boolean alive = true;
+
+    /**
+     * Default constructor.
+     */
+    public AbstractDataStore() {
+        // nothing
+    }

     /**
      * Register this data store.
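The `volatile` modifier on `alive` matters most when one thread loops on the flag while another thread clears it. Without `volatile`, the Java memory model permits the looping thread to keep using a stale value. The following is a minimal sketch of that pattern; `StoppableWorker` is a hypothetical class written for illustration and is not part of Fess.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical worker used only for illustration; not a Fess class. */
final class StoppableWorker {
    // volatile: the write in stop() becomes visible to the thread looping in run()
    private volatile boolean alive = true;
    private final CountDownLatch started = new CountDownLatch(1);

    void stop() {
        alive = false;
    }

    void run() {
        started.countDown();
        while (alive) {
            Thread.onSpinWait(); // stand-in for one unit of work
        }
    }

    /** Starts a worker, stops it from the calling thread, and reports whether it exited in time. */
    static boolean stopsWithin(final long timeoutMillis) {
        final StoppableWorker worker = new StoppableWorker();
        final Thread thread = new Thread(worker::run);
        thread.setDaemon(true);
        thread.start();
        try {
            worker.started.await(); // make sure the loop is running before stopping it
            worker.stop();
            thread.join(timeoutMillis);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !thread.isAlive();
    }
}
```

Calling `StoppableWorker.stopsWithin(5_000L)` is expected to return `true`, because the volatile write in `stop()` is guaranteed to become visible to the loop.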

src/main/java/org/codelibs/fess/ds/DataStoreFactory.java

Lines changed: 13 additions & 3 deletions
@@ -71,13 +71,14 @@ public class DataStoreFactory {
      * Cached array of available data store names discovered from plugin JAR files.
      * This cache is refreshed periodically based on the lastLoadedTime.
      */
-    protected String[] dataStoreNames = StringUtil.EMPTY_STRINGS;
+    protected volatile String[] dataStoreNames = StringUtil.EMPTY_STRINGS;

     /**
      * Timestamp of the last time data store names were loaded from plugin files.
      * Used to implement a time-based cache refresh mechanism.
+     * Volatile to ensure visibility across threads.
      */
-    protected long lastLoadedTime = 0;
+    protected volatile long lastLoadedTime = 0;

     /**
      * Creates a new instance of DataStoreFactory.
@@ -130,7 +131,7 @@ public DataStore getDataStore(final String name) {
      *
      * @return array of data store names sorted alphabetically, never null
      */
-    public String[] getDataStoreNames() {
+    public synchronized String[] getDataStoreNames() {
         final long now = ComponentUtil.getSystemHelper().getCurrentTimeAsLong();
         if (now - lastLoadedTime > 60000L) {
             final List<String> nameList = loadDataStoreNameList();
@@ -154,9 +155,18 @@ public String[] getDataStoreNames() {
     protected List<String> loadDataStoreNameList() {
         final Set<String> nameSet = new HashSet<>();
         final File[] jarFiles = ResourceUtil.getPluginJarFiles(PluginHelper.ArtifactType.DATA_STORE.getId());
+        if (jarFiles == null) {
+            return nameSet.stream().sorted().collect(Collectors.toList());
+        }
         for (final File jarFile : jarFiles) {
             try (FileSystem fs = FileSystems.newFileSystem(jarFile.toPath(), ClassLoader.getSystemClassLoader())) {
                 final Path xmlPath = fs.getPath("fess_ds++.xml");
+                if (!Files.exists(xmlPath)) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Configuration file not found in {}", jarFile.getAbsolutePath());
+                    }
+                    continue;
+                }
                 try (InputStream is = Files.newInputStream(xmlPath)) {
                     final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
                     factory.setFeature(org.codelibs.fess.crawler.Constants.FEATURE_SECURE_PROCESSING, true);
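The hunks above combine a `synchronized` accessor with `volatile` cache fields and a 60-second refresh window. The sketch below shows how such a time-based cache can behave in isolation. `TimedNameCache`, its constructor parameters, and the injected clock are assumptions made for illustration; only the 60000 ms threshold and the sorted result are taken from the diff.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical cache for illustration; it only mirrors the shape of getDataStoreNames(). */
final class TimedNameCache {
    private static final long REFRESH_INTERVAL_MS = 60_000L;

    private final Supplier<List<String>> loader; // assumed stand-in for loadDataStoreNameList()
    private final LongSupplier clock;            // assumed stand-in for the system time helper

    private volatile String[] names = new String[0];
    private volatile long lastLoadedTime = 0L;

    TimedNameCache(final Supplier<List<String>> loader, final LongSupplier clock) {
        this.loader = loader;
        this.clock = clock;
    }

    /** synchronized: only one thread at a time performs the check-then-reload sequence. */
    synchronized String[] getNames() {
        final long now = clock.getAsLong();
        if (now - lastLoadedTime > REFRESH_INTERVAL_MS) {
            names = loader.get().stream().sorted().toArray(String[]::new);
            lastLoadedTime = now;
        }
        return names;
    }
}
```

Injecting the clock is a design choice of this sketch, not of the commit; it lets a test move time forward without sleeping.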

src/main/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImpl.java

Lines changed: 10 additions & 5 deletions
@@ -34,7 +34,7 @@

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.poi.util.StringUtil;
+import org.codelibs.core.lang.StringUtil;
 import org.codelibs.fess.Constants;
 import org.codelibs.fess.crawler.builder.RequestDataBuilder;
 import org.codelibs.fess.crawler.client.CrawlerClient;
@@ -89,8 +89,11 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
     /** Factory for creating crawler clients to handle different URL schemes. */
     protected CrawlerClientFactory crawlerClientFactory;

-    /** List of URLs to be deleted, cached for batch processing. */
-    protected List<String> deleteUrlList = new ArrayList<>(100);
+    /**
+     * List of URLs to be deleted, cached for batch processing.
+     * All access is synchronized via indexUpdateCallback lock.
+     */
+    protected List<String> deleteUrlList = new ArrayList<>();

     /** Maximum size of the delete URL cache before batch deletion is triggered. */
     protected int maxDeleteDocumentCacheSize;
@@ -585,8 +588,10 @@ public void commit() {
             executor.shutdownNow();
         }

-        if (!deleteUrlList.isEmpty()) {
-            deleteDocuments();
+        synchronized (indexUpdateCallback) {
+            if (!deleteUrlList.isEmpty()) {
+                deleteDocuments();
+            }
         }
         indexUpdateCallback.commit();
     }
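The change above relies on a plain `ArrayList` being safe as long as every read and write happens while holding the same lock. Here is a minimal sketch of that strategy under stated assumptions: `DeleteUrlBuffer` is a hypothetical class, its private `lock` object plays the role of `indexUpdateCallback`, a counter stands in for the real batch delete, and the `>=` flush threshold is a guess rather than the Fess behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer for illustration; not the Fess callback implementation. */
final class DeleteUrlBuffer {
    private final Object lock = new Object();            // stands in for indexUpdateCallback
    private final List<String> urls = new ArrayList<>(); // guarded by lock
    private final int maxSize;
    private int deletedCount;                            // guarded by lock

    DeleteUrlBuffer(final int maxSize) {
        this.maxSize = maxSize;
    }

    void add(final String url) {
        synchronized (lock) {
            urls.add(url);
            if (urls.size() >= maxSize) { // assumed threshold rule
                flushLocked();
            }
        }
    }

    void commit() {
        synchronized (lock) {
            if (!urls.isEmpty()) {
                flushLocked();
            }
        }
    }

    /** Caller must hold lock. Counts the batch instead of deleting real documents. */
    private void flushLocked() {
        deletedCount += urls.size();
        urls.clear();
    }

    int deletedCount() {
        synchronized (lock) {
            return deletedCount;
        }
    }

    /** Adds URLs from several threads, commits, and returns how many URLs were flushed. */
    static int addConcurrently(final int threads, final int urlsPerThread, final int maxSize) {
        final DeleteUrlBuffer buffer = new DeleteUrlBuffer(maxSize);
        final List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            final Thread worker = new Thread(() -> {
                for (int i = 0; i < urlsPerThread; i++) {
                    buffer.add("file:/doc-" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (final Thread worker : workers) {
                worker.join();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        buffer.commit();
        return buffer.deletedCount();
    }
}
```

Because every access to `urls` happens inside `synchronized (lock)`, no URL is lost or flushed twice, and the per-write array copy of `CopyOnWriteArrayList` is avoided.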

src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java

Lines changed: 231 additions & 0 deletions
@@ -108,4 +108,235 @@ public void test_convertValue() {
         value = " ";
         assertNull(dataStore.convertValue(Constants.DEFAULT_SCRIPT, value, paramMap));
     }
+
+    // ========== Thread Safety Tests ==========
+
+    /**
+     * Test that the volatile alive field is visible across threads.
+     * One thread sets alive to false, other threads should see the change immediately.
+     */
+    public void test_aliveField_volatileVisibility() throws Exception {
+        // Ensure alive starts as true
+        assertTrue(dataStore.alive);
+
+        final int readerThreadCount = 10;
+        final Thread[] readerThreads = new Thread[readerThreadCount];
+        final boolean[][] observations = new boolean[readerThreadCount][100];
+
+        // Start reader threads that continuously check alive field
+        for (int i = 0; i < readerThreadCount; i++) {
+            final int threadIndex = i;
+            readerThreads[i] = new Thread(() -> {
+                for (int j = 0; j < 100; j++) {
+                    observations[threadIndex][j] = dataStore.alive;
+                    // Small yield to allow context switching
+                    Thread.yield();
+                }
+            });
+            readerThreads[i].start();
+        }
+
+        // Writer thread sets alive to false
+        Thread writerThread = new Thread(() -> {
+            dataStore.alive = false;
+        });
+        writerThread.start();
+        writerThread.join();
+
+        // Wait for all reader threads to complete
+        for (Thread thread : readerThreads) {
+            thread.join();
+        }
+
+        // Verify that alive was changed to false
+        assertFalse("alive should be false after writer thread", dataStore.alive);
+
+        // At least some observations from reader threads should have seen false
+        // due to volatile ensuring visibility
+        int falseCount = 0;
+        for (int i = 0; i < readerThreadCount; i++) {
+            for (int j = 0; j < 100; j++) {
+                if (!observations[i][j]) {
+                    falseCount++;
+                }
+            }
+        }
+        assertTrue("Some threads should have observed alive=false", falseCount > 0);
+    }
+
+    /**
+     * Test stop() method sets alive to false and is visible to other threads.
+     */
+    public void test_stop_volatileVisibility() throws Exception {
+        assertTrue(dataStore.alive);
+
+        final boolean[] observedValues = new boolean[10];
+        final Thread[] threads = new Thread[10];
+
+        // Call stop() in main thread
+        dataStore.stop();
+
+        // Multiple threads read the alive field
+        for (int i = 0; i < 10; i++) {
+            final int index = i;
+            threads[i] = new Thread(() -> {
+                observedValues[index] = dataStore.alive;
+            });
+        }
+
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        // All threads should observe alive=false due to volatile
+        for (int i = 0; i < 10; i++) {
+            assertFalse("Thread " + i + " should see alive=false", observedValues[i]);
+        }
+    }
+
+    /**
+     * Test concurrent access to stop() method.
+     * Multiple threads call stop() simultaneously - should be safe.
+     */
+    public void test_stop_concurrentAccess() throws Exception {
+        final int threadCount = 10;
+        final Thread[] threads = new Thread[threadCount];
+        final Exception[] exceptions = new Exception[threadCount];
+
+        for (int i = 0; i < threadCount; i++) {
+            final int index = i;
+            threads[i] = new Thread(() -> {
+                try {
+                    dataStore.stop();
+                } catch (Exception e) {
+                    exceptions[index] = e;
+                }
+            });
+        }
+
+        // Start all threads
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        // Wait for all threads to complete
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        // Verify no exceptions occurred
+        for (int i = 0; i < threadCount; i++) {
+            assertNull("Thread " + i + " threw exception", exceptions[i]);
+        }
+
+        // Verify alive is false
+        assertFalse("alive should be false after all stop() calls", dataStore.alive);
+    }
+
+    /**
+     * Test that multiple threads can safely read alive field while one writes.
+     */
+    public void test_aliveField_concurrentReadWrite() throws Exception {
+        dataStore.alive = true;
+
+        final int readerCount = 5;
+        final int iterations = 1000;
+        final Thread[] readers = new Thread[readerCount];
+        final Exception[] exceptions = new Exception[readerCount + 1];
+        final int[] trueCount = new int[readerCount];
+        final int[] falseCount = new int[readerCount];
+
+        // Start reader threads
+        for (int i = 0; i < readerCount; i++) {
+            final int index = i;
+            readers[i] = new Thread(() -> {
+                try {
+                    for (int j = 0; j < iterations; j++) {
+                        if (dataStore.alive) {
+                            trueCount[index]++;
+                        } else {
+                            falseCount[index]++;
+                        }
+                    }
+                } catch (Exception e) {
+                    exceptions[index] = e;
+                }
+            });
+        }
+
+        // Start writer thread that toggles alive
+        Thread writer = new Thread(() -> {
+            try {
+                for (int i = 0; i < 100; i++) {
+                    dataStore.alive = !dataStore.alive;
+                    Thread.yield();
+                }
+            } catch (Exception e) {
+                exceptions[readerCount] = e;
+            }
+        });
+
+        // Start all threads
+        for (Thread reader : readers) {
+            reader.start();
+        }
+        writer.start();
+
+        // Wait for completion
+        for (Thread reader : readers) {
+            reader.join();
+        }
+        writer.join();
+
+        // Verify no exceptions
+        for (int i = 0; i <= readerCount; i++) {
+            assertNull("Thread " + i + " threw exception", exceptions[i]);
+        }
+
+        // Verify all readers completed all iterations
+        for (int i = 0; i < readerCount; i++) {
+            assertEquals("Reader " + i + " should complete all iterations", iterations, trueCount[i] + falseCount[i]);
+        }
+    }
306+
/**
307+
* Test getName() method can be called concurrently without issues.
308+
*/
309+
public void test_getName_concurrentAccess() throws Exception {
310+
final int threadCount = 10;
311+
final Thread[] threads = new Thread[threadCount];
312+
final String[] results = new String[threadCount];
313+
final Exception[] exceptions = new Exception[threadCount];
314+
315+
for (int i = 0; i < threadCount; i++) {
316+
final int index = i;
317+
threads[i] = new Thread(() -> {
318+
try {
319+
for (int j = 0; j < 100; j++) {
320+
results[index] = dataStore.getName();
321+
}
322+
} catch (Exception e) {
323+
exceptions[index] = e;
324+
}
325+
});
326+
}
327+
328+
for (Thread thread : threads) {
329+
thread.start();
330+
}
331+
332+
for (Thread thread : threads) {
333+
thread.join();
334+
}
335+
336+
// Verify no exceptions
337+
for (int i = 0; i < threadCount; i++) {
338+
assertNull("Thread " + i + " threw exception", exceptions[i]);
339+
assertEquals("Thread " + i + " got wrong name", "Test", results[i]);
340+
}
341+
}
111342
}
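Several of the tests above repeat the same scaffolding: create N threads, start them, join them, and check an exceptions array. One possible way to factor that out is a small harness that releases all tasks from a common start gate and collects failures. `ConcurrentHarness` and `runConcurrently` are hypothetical names; this is a sketch of an alternative, not code from the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical test helper for illustration; not part of the Fess test suite. */
final class ConcurrentHarness {

    /**
     * Runs the task once on each of threadCount threads, released together,
     * and returns every Throwable thrown by a task (empty if all succeeded).
     */
    static List<Throwable> runConcurrently(final int threadCount, final Runnable task) {
        final ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        final CountDownLatch startGate = new CountDownLatch(1);
        final List<Future<?>> futures = new ArrayList<>();
        final List<Throwable> failures = new ArrayList<>();
        try {
            for (int i = 0; i < threadCount; i++) {
                futures.add(pool.submit(() -> {
                    startGate.await(); // block until every task has been submitted
                    task.run();
                    return null;
                }));
            }
            startGate.countDown(); // release all tasks at once
            for (final Future<?> future : futures) {
                try {
                    future.get();
                } catch (final ExecutionException e) {
                    failures.add(e.getCause());
                }
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return failures;
    }
}
```

With such a helper, a check like the one in test_stop_concurrentAccess could shrink to asserting that `runConcurrently(10, dataStore::stop)` returns an empty list. The start gate also increases the chance that the calls really overlap, which plain start-then-join loops do not guarantee.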
