diff --git a/src/main/java/org/codelibs/fess/ds/AbstractDataStore.java b/src/main/java/org/codelibs/fess/ds/AbstractDataStore.java index e6f638342..38059927f 100644 --- a/src/main/java/org/codelibs/fess/ds/AbstractDataStore.java +++ b/src/main/java/org/codelibs/fess/ds/AbstractDataStore.java @@ -43,13 +43,6 @@ */ public abstract class AbstractDataStore implements DataStore { - /** - * Default constructor. - */ - public AbstractDataStore() { - // nothing - } - private static final Logger logger = LogManager.getLogger(AbstractDataStore.class); /** @@ -64,8 +57,16 @@ public AbstractDataStore() { /** * The flag to check if the data store is alive. + * Volatile to ensure visibility across threads. */ - protected boolean alive = true; + protected volatile boolean alive = true; + + /** + * Default constructor. + */ + public AbstractDataStore() { + // nothing + } /** * Register this data store. diff --git a/src/main/java/org/codelibs/fess/ds/DataStoreFactory.java b/src/main/java/org/codelibs/fess/ds/DataStoreFactory.java index bc5df0721..d76a9b21f 100644 --- a/src/main/java/org/codelibs/fess/ds/DataStoreFactory.java +++ b/src/main/java/org/codelibs/fess/ds/DataStoreFactory.java @@ -71,13 +71,14 @@ public class DataStoreFactory { * Cached array of available data store names discovered from plugin JAR files. * This cache is refreshed periodically based on the lastLoadedTime. */ - protected String[] dataStoreNames = StringUtil.EMPTY_STRINGS; + protected volatile String[] dataStoreNames = StringUtil.EMPTY_STRINGS; /** * Timestamp of the last time data store names were loaded from plugin files. * Used to implement a time-based cache refresh mechanism. + * Volatile to ensure visibility across threads. */ - protected long lastLoadedTime = 0; + protected volatile long lastLoadedTime = 0; /** * Creates a new instance of DataStoreFactory. @@ -130,7 +131,7 @@ public DataStore getDataStore(final String name) { * * @return array of data store names sorted alphabetically, never null */ - public String[] getDataStoreNames() { + public synchronized String[] getDataStoreNames() { final long now = ComponentUtil.getSystemHelper().getCurrentTimeAsLong(); if (now - lastLoadedTime > 60000L) { final List nameList = loadDataStoreNameList(); @@ -154,9 +155,18 @@ public String[] getDataStoreNames() { protected List loadDataStoreNameList() { final Set nameSet = new HashSet<>(); final File[] jarFiles = ResourceUtil.getPluginJarFiles(PluginHelper.ArtifactType.DATA_STORE.getId()); + if (jarFiles == null) { + return nameSet.stream().sorted().collect(Collectors.toList()); + } for (final File jarFile : jarFiles) { try (FileSystem fs = FileSystems.newFileSystem(jarFile.toPath(), ClassLoader.getSystemClassLoader())) { final Path xmlPath = fs.getPath("fess_ds++.xml"); + if (!Files.exists(xmlPath)) { + if (logger.isDebugEnabled()) { + logger.debug("Configuration file not found in {}", jarFile.getAbsolutePath()); + } + continue; + } try (InputStream is = Files.newInputStream(xmlPath)) { final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); factory.setFeature(org.codelibs.fess.crawler.Constants.FEATURE_SECURE_PROCESSING, true); diff --git a/src/main/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImpl.java b/src/main/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImpl.java index 92a6b6b18..d37fac9b9 100644 --- a/src/main/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImpl.java +++ b/src/main/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImpl.java @@ -34,7 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.poi.util.StringUtil; +import org.codelibs.core.lang.StringUtil; import org.codelibs.fess.Constants; import org.codelibs.fess.crawler.builder.RequestDataBuilder; import org.codelibs.fess.crawler.client.CrawlerClient; @@ -89,8 +89,11 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback { /** Factory for creating crawler clients to handle different URL schemes. */ protected CrawlerClientFactory crawlerClientFactory; - /** List of URLs to be deleted, cached for batch processing. */ - protected List deleteUrlList = new ArrayList<>(100); + /** + * List of URLs to be deleted, cached for batch processing. + * All access is synchronized via indexUpdateCallback lock. + */ + protected List deleteUrlList = new ArrayList<>(); /** Maximum size of the delete URL cache before batch deletion is triggered. */ protected int maxDeleteDocumentCacheSize; @@ -585,8 +588,10 @@ public void commit() { executor.shutdownNow(); } - if (!deleteUrlList.isEmpty()) { - deleteDocuments(); + synchronized (indexUpdateCallback) { + if (!deleteUrlList.isEmpty()) { + deleteDocuments(); + } } indexUpdateCallback.commit(); } diff --git a/src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java b/src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java index 6d58a9aa9..81ce5118d 100644 --- a/src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java +++ b/src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java @@ -108,4 +108,235 @@ public void test_convertValue() { value = " "; assertNull(dataStore.convertValue(Constants.DEFAULT_SCRIPT, value, paramMap)); } + + // ========== Thread Safety Tests ========== + + /** + * Test that the volatile alive field is visible across threads. + * One thread sets alive to false, other threads should see the change immediately. + */ + public void test_aliveField_volatileVisibility() throws Exception { + // Ensure alive starts as true + assertTrue(dataStore.alive); + + final int readerThreadCount = 10; + final Thread[] readerThreads = new Thread[readerThreadCount]; + final boolean[][] observations = new boolean[readerThreadCount][100]; + + // Start reader threads that continuously check alive field + for (int i = 0; i < readerThreadCount; i++) { + final int threadIndex = i; + readerThreads[i] = new Thread(() -> { + for (int j = 0; j < 100; j++) { + observations[threadIndex][j] = dataStore.alive; + // Small yield to allow context switching + Thread.yield(); + } + }); + readerThreads[i].start(); + } + + // Writer thread sets alive to false + Thread writerThread = new Thread(() -> { + dataStore.alive = false; + }); + writerThread.start(); + writerThread.join(); + + // Wait for all reader threads to complete + for (Thread thread : readerThreads) { + thread.join(); + } + + // Verify that alive was changed to false + assertFalse("alive should be false after writer thread", dataStore.alive); + + // At least some observations from reader threads should have seen false + // due to volatile ensuring visibility + int falseCount = 0; + for (int i = 0; i < readerThreadCount; i++) { + for (int j = 0; j < 100; j++) { + if (!observations[i][j]) { + falseCount++; + } + } + } + assertTrue("Some threads should have observed alive=false", falseCount > 0); + } + + /** + * Test stop() method sets alive to false and is visible to other threads. + */ + public void test_stop_volatileVisibility() throws Exception { + assertTrue(dataStore.alive); + + final boolean[] observedValues = new boolean[10]; + final Thread[] threads = new Thread[10]; + + // Call stop() in main thread + dataStore.stop(); + + // Multiple threads read the alive field + for (int i = 0; i < 10; i++) { + final int index = i; + threads[i] = new Thread(() -> { + observedValues[index] = dataStore.alive; + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // All threads should observe alive=false due to volatile + for (int i = 0; i < 10; i++) { + assertFalse("Thread " + i + " should see alive=false", observedValues[i]); + } + } + + /** + * Test concurrent access to stop() method. + * Multiple threads call stop() simultaneously - should be safe. + */ + public void test_stop_concurrentAccess() throws Exception { + final int threadCount = 10; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + + for (int i = 0; i < threadCount; i++) { + final int index = i; + threads[i] = new Thread(() -> { + try { + dataStore.stop(); + } catch (Exception e) { + exceptions[index] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + } + + // Verify alive is false + assertFalse("alive should be false after all stop() calls", dataStore.alive); + } + + /** + * Test that multiple threads can safely read alive field while one writes. + */ + public void test_aliveField_concurrentReadWrite() throws Exception { + dataStore.alive = true; + + final int readerCount = 5; + final int iterations = 1000; + final Thread[] readers = new Thread[readerCount]; + final Exception[] exceptions = new Exception[readerCount + 1]; + final int[] trueCount = new int[readerCount]; + final int[] falseCount = new int[readerCount]; + + // Start reader threads + for (int i = 0; i < readerCount; i++) { + final int index = i; + readers[i] = new Thread(() -> { + try { + for (int j = 0; j < iterations; j++) { + if (dataStore.alive) { + trueCount[index]++; + } else { + falseCount[index]++; + } + } + } catch (Exception e) { + exceptions[index] = e; + } + }); + } + + // Start writer thread that toggles alive + Thread writer = new Thread(() -> { + try { + for (int i = 0; i < 100; i++) { + dataStore.alive = !dataStore.alive; + Thread.yield(); + } + } catch (Exception e) { + exceptions[readerCount] = e; + } + }); + + // Start all threads + for (Thread reader : readers) { + reader.start(); + } + writer.start(); + + // Wait for completion + for (Thread reader : readers) { + reader.join(); + } + writer.join(); + + // Verify no exceptions + for (int i = 0; i <= readerCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + } + + // Verify all readers completed all iterations + for (int i = 0; i < readerCount; i++) { + assertEquals("Reader " + i + " should complete all iterations", iterations, trueCount[i] + falseCount[i]); + } + } + + /** + * Test getName() method can be called concurrently without issues. + */ + public void test_getName_concurrentAccess() throws Exception { + final int threadCount = 10; + final Thread[] threads = new Thread[threadCount]; + final String[] results = new String[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + + for (int i = 0; i < threadCount; i++) { + final int index = i; + threads[i] = new Thread(() -> { + try { + for (int j = 0; j < 100; j++) { + results[index] = dataStore.getName(); + } + } catch (Exception e) { + exceptions[index] = e; + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + assertEquals("Thread " + i + " got wrong name", "Test", results[i]); + } + } } diff --git a/src/test/java/org/codelibs/fess/ds/DataStoreFactoryTest.java b/src/test/java/org/codelibs/fess/ds/DataStoreFactoryTest.java index 80dd8f610..02f67b0b1 100644 --- a/src/test/java/org/codelibs/fess/ds/DataStoreFactoryTest.java +++ b/src/test/java/org/codelibs/fess/ds/DataStoreFactoryTest.java @@ -337,4 +337,151 @@ protected void storeData(DataConfig dataConfig, IndexUpdateCallback callback, Da // Test implementation } } + + // ========== Thread Safety Tests for getDataStoreNames() ========== + + /** + * Test concurrent getDataStoreNames calls to verify synchronized method works correctly. + * Multiple threads call getDataStoreNames() simultaneously. + */ + public void test_getDataStoreNames_concurrentAccess() throws Exception { + final int threadCount = 10; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + final String[][] results = new String[threadCount][]; + + DataStoreFactory testFactory = new DataStoreFactory() { + @Override + protected List loadDataStoreNameList() { + return List.of("Store1", "Store2", "Store3"); + } + }; + + for (int i = 0; i < threadCount; i++) { + final int index = i; + threads[i] = new Thread(() -> { + try { + results[index] = testFactory.getDataStoreNames(); + } catch (Exception e) { + exceptions[index] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + assertNotNull("Thread " + i + " got null result", results[i]); + assertEquals("Thread " + i + " got wrong array length", 3, results[i].length); + } + } + + /** + * Test that volatile fields ensure visibility across threads. + * Verify that changes to lastLoadedTime are visible to all threads. + */ + public void test_volatileFields_visibility() throws Exception { + final DataStoreFactory testFactory = new DataStoreFactory() { + @Override + protected List loadDataStoreNameList() { + return List.of("Store1"); + } + }; + + // First call to initialize + testFactory.getDataStoreNames(); + + final long[] observedTimes = new long[10]; + final Thread[] threads = new Thread[10]; + + // Trigger cache refresh by setting old time + testFactory.lastLoadedTime = System.currentTimeMillis() - 70000L; + + // One thread updates + Thread updater = new Thread(() -> { + testFactory.getDataStoreNames(); + }); + + updater.start(); + updater.join(); + + // Multiple threads read the lastLoadedTime + for (int i = 0; i < 10; i++) { + final int index = i; + threads[i] = new Thread(() -> { + observedTimes[index] = testFactory.lastLoadedTime; + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // All threads should see the updated time (not 0 or the old value) + long firstTime = observedTimes[0]; + assertTrue("Time should be updated", firstTime > 0); + for (int i = 1; i < 10; i++) { + assertEquals("All threads should see same time due to volatile", firstTime, observedTimes[i]); + } + } + + /** + * Test cache refresh mechanism with concurrent access. + * Verifies that cache is refreshed correctly even with concurrent readers. + */ + public void test_cacheRefresh_withConcurrentReads() throws Exception { + final int[] loadCount = { 0 }; + final DataStoreFactory testFactory = new DataStoreFactory() { + @Override + protected List loadDataStoreNameList() { + synchronized (loadCount) { + loadCount[0]++; + } + return List.of("Store1", "Store2"); + } + }; + + // First load + testFactory.getDataStoreNames(); + assertEquals(1, loadCount[0]); + + // Simulate cache expiry + testFactory.lastLoadedTime = System.currentTimeMillis() - 70000L; + + // Multiple threads try to read simultaneously + final int threadCount = 5; + final Thread[] threads = new Thread[threadCount]; + + for (int i = 0; i < threadCount; i++) { + threads[i] = new Thread(() -> { + testFactory.getDataStoreNames(); + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Due to synchronized method, only one thread should reload + // The count might be 2 (one initial + one reload) or slightly higher due to timing + assertTrue("Load count should be small due to synchronization", loadCount[0] <= 3); + } } \ No newline at end of file diff --git a/src/test/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImplTest.java b/src/test/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImplTest.java index dd22ff2ef..6713d892e 100644 --- a/src/test/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImplTest.java +++ b/src/test/java/org/codelibs/fess/ds/callback/FileListIndexUpdateCallbackImplTest.java @@ -186,4 +186,374 @@ public void test_isUrlCrawlable_nullUrlWithoutPattern() { assertTrue(result); } + + // ========== Thread Safety Tests ========== + + /** + * Test that deleteUrlList access is thread-safe with synchronized blocks. + * This test verifies the ArrayList implementation works correctly when + * all access is properly synchronized via indexUpdateCallback lock. + */ + public void test_deleteUrlList_synchronizedAccess() throws Exception { + // Create a mock IndexUpdateCallback for synchronization + IndexUpdateCallback mockCallback = new IndexUpdateCallback() { + @Override + public void store(DataStoreParams paramMap, Map dataMap) { + } + + @Override + public long getDocumentSize() { + return 0; + } + + @Override + public long getExecuteTime() { + return 0; + } + + @Override + public void commit() { + } + }; + + FileListIndexUpdateCallbackImpl callback = new FileListIndexUpdateCallbackImpl(mockCallback, null, 1); + callback.setMaxDeleteDocumentCacheSize(1000); + + final int threadCount = 10; + final int urlsPerThread = 100; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + + // Multiple threads add URLs to deleteUrlList + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[i] = new Thread(() -> { + try { + for (int j = 0; j < urlsPerThread; j++) { + // Access is synchronized internally in the actual implementation + // Here we just verify no concurrent modification exceptions occur + String url = "http://example.com/thread" + threadIndex + "/doc" + j; + // Simulate the synchronized access that happens in deleteDocument() + synchronized (mockCallback) { + callback.deleteUrlList.add(url); + } + } + } catch (Exception e) { + exceptions[threadIndex] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + } + + // Verify all URLs were added + synchronized (mockCallback) { + assertEquals("All URLs should be added", threadCount * urlsPerThread, callback.deleteUrlList.size()); + } + } + + /** + * Test concurrent reads from deleteUrlList while synchronized. + * Verifies that ArrayList can be safely read when properly synchronized. + */ + public void test_deleteUrlList_concurrentReads() throws Exception { + IndexUpdateCallback mockCallback = new IndexUpdateCallback() { + @Override + public void store(DataStoreParams paramMap, Map dataMap) { + } + + @Override + public long getDocumentSize() { + return 0; + } + + @Override + public long getExecuteTime() { + return 0; + } + + @Override + public void commit() { + } + }; + + FileListIndexUpdateCallbackImpl callback = new FileListIndexUpdateCallbackImpl(mockCallback, null, 1); + + // Pre-populate with some URLs + synchronized (mockCallback) { + for (int i = 0; i < 100; i++) { + callback.deleteUrlList.add("http://example.com/doc" + i); + } + } + + final int threadCount = 10; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + final int[] sizes = new int[threadCount]; + final boolean[][] containsResults = new boolean[threadCount][10]; + + // Multiple threads read from deleteUrlList + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[i] = new Thread(() -> { + try { + synchronized (mockCallback) { + sizes[threadIndex] = callback.deleteUrlList.size(); + for (int j = 0; j < 10; j++) { + containsResults[threadIndex][j] = callback.deleteUrlList.contains("http://example.com/doc" + j); + } + } + } catch (Exception e) { + exceptions[threadIndex] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + assertEquals("Thread " + i + " should see correct size", 100, sizes[i]); + for (int j = 0; j < 10; j++) { + assertTrue("Thread " + i + " should find doc" + j, containsResults[i][j]); + } + } + } + + /** + * Test that clear operation on deleteUrlList is thread-safe. + */ + public void test_deleteUrlList_clearOperation() throws Exception { + IndexUpdateCallback mockCallback = new IndexUpdateCallback() { + @Override + public void store(DataStoreParams paramMap, Map dataMap) { + } + + @Override + public long getDocumentSize() { + return 0; + } + + @Override + public long getExecuteTime() { + return 0; + } + + @Override + public void commit() { + } + }; + + FileListIndexUpdateCallbackImpl callback = new FileListIndexUpdateCallbackImpl(mockCallback, null, 1); + + final int iterations = 10; + final Thread[] threads = new Thread[2]; + final Exception[] exceptions = new Exception[2]; + + // One thread adds, another clears + threads[0] = new Thread(() -> { + try { + for (int i = 0; i < iterations; i++) { + synchronized (mockCallback) { + callback.deleteUrlList.add("http://example.com/doc" + i); + } + Thread.yield(); + } + } catch (Exception e) { + exceptions[0] = e; + } + }); + + threads[1] = new Thread(() -> { + try { + for (int i = 0; i < iterations; i++) { + synchronized (mockCallback) { + callback.deleteUrlList.clear(); + } + Thread.yield(); + } + } catch (Exception e) { + exceptions[1] = e; + } + }); + + // Start both threads + threads[0].start(); + threads[1].start(); + + // Wait for completion + threads[0].join(); + threads[1].join(); + + // Verify no exceptions occurred (the main goal is no ConcurrentModificationException) + assertNull("Add thread threw exception", exceptions[0]); + assertNull("Clear thread threw exception", exceptions[1]); + } + + /** + * Test iteration over deleteUrlList while synchronized. + * This simulates what happens in deleteDocuments() method. + */ + public void test_deleteUrlList_iteration() throws Exception { + IndexUpdateCallback mockCallback = new IndexUpdateCallback() { + @Override + public void store(DataStoreParams paramMap, Map dataMap) { + } + + @Override + public long getDocumentSize() { + return 0; + } + + @Override + public long getExecuteTime() { + return 0; + } + + @Override + public void commit() { + } + }; + + FileListIndexUpdateCallbackImpl callback = new FileListIndexUpdateCallbackImpl(mockCallback, null, 1); + + // Pre-populate + synchronized (mockCallback) { + for (int i = 0; i < 50; i++) { + callback.deleteUrlList.add("http://example.com/doc" + i); + } + } + + final int threadCount = 5; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + final int[][] counts = new int[threadCount][1]; + + // Multiple threads iterate over the list + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[i] = new Thread(() -> { + try { + synchronized (mockCallback) { + for (String url : callback.deleteUrlList) { + counts[threadIndex][0]++; + // Simulate some processing + assertNotNull(url); + } + } + } catch (Exception e) { + exceptions[threadIndex] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for completion + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + assertEquals("Thread " + i + " should iterate over all URLs", 50, counts[i][0]); + } + } + + /** + * Test isEmpty() check on deleteUrlList with concurrent access. + * This simulates the check in commit() method. + */ + public void test_deleteUrlList_isEmptyCheck() throws Exception { + IndexUpdateCallback mockCallback = new IndexUpdateCallback() { + @Override + public void store(DataStoreParams paramMap, Map dataMap) { + } + + @Override + public long getDocumentSize() { + return 0; + } + + @Override + public long getExecuteTime() { + return 0; + } + + @Override + public void commit() { + } + }; + + FileListIndexUpdateCallbackImpl callback = new FileListIndexUpdateCallbackImpl(mockCallback, null, 1); + + final int threadCount = 10; + final Thread[] threads = new Thread[threadCount]; + final Exception[] exceptions = new Exception[threadCount]; + + // Threads alternate between adding and checking isEmpty + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[i] = new Thread(() -> { + try { + for (int j = 0; j < 100; j++) { + synchronized (mockCallback) { + if (threadIndex % 2 == 0) { + callback.deleteUrlList.add("http://example.com/doc" + threadIndex + "_" + j); + } else { + boolean empty = callback.deleteUrlList.isEmpty(); + // Just verify the check doesn't throw exception + if (!empty) { + assertTrue(callback.deleteUrlList.size() > 0); + } + } + } + } + } catch (Exception e) { + exceptions[threadIndex] = e; + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for completion + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred + for (int i = 0; i < threadCount; i++) { + assertNull("Thread " + i + " threw exception", exceptions[i]); + } + } } diff --git a/src/test/java/org/codelibs/fess/helper/ProcessHelperTest.java b/src/test/java/org/codelibs/fess/helper/ProcessHelperTest.java index 1c41a2b66..cfd67fada 100644 --- a/src/test/java/org/codelibs/fess/helper/ProcessHelperTest.java +++ b/src/test/java/org/codelibs/fess/helper/ProcessHelperTest.java @@ -149,10 +149,9 @@ public void test_startProcess_invalidCommand() { public void test_startProcess_replaceExistingProcess() { String sessionId = "test_replace"; - // Use 'cat' which waits for input, making it a long-running process - // This is more reliable than 'echo' which completes immediately - List cmdList1 = Arrays.asList("cat"); - List cmdList2 = Arrays.asList("cat"); + // Use sleep commands that run longer so we can verify they're running + List cmdList1 = Arrays.asList("sleep", "10"); + List cmdList2 = Arrays.asList("sleep", "10"); Consumer pbCall = pb -> { pb.redirectErrorStream(true); }; @@ -162,31 +161,21 @@ public void test_startProcess_replaceExistingProcess() { JobProcess jobProcess1 = processHelper.startProcess(sessionId, cmdList1, pbCall); assertNotNull(jobProcess1); - // Poll for process to be running (max 50 times, 100ms interval = 5 seconds max) - boolean isRunning = false; - for (int i = 0; i < 50; i++) { - if (processHelper.isProcessRunning(sessionId)) { - isRunning = true; - break; - } - Thread.sleep(100); - } - assertTrue("First process did not become running within timeout", isRunning); + // Wait for first process to be running + Thread.sleep(100); + assertTrue(processHelper.isProcessRunning(sessionId)); // Start second process with same session ID (should replace first) JobProcess jobProcess2 = processHelper.startProcess(sessionId, cmdList2, pbCall); assertNotNull(jobProcess2); - // Poll for the replacement process to be running - isRunning = false; - for (int i = 0; i < 50; i++) { - if (processHelper.isProcessRunning(sessionId)) { - isRunning = true; - break; - } - Thread.sleep(100); - } - assertTrue("Replacement process did not become running within timeout", isRunning); + // Wait for second process to be running + Thread.sleep(100); + assertTrue(processHelper.isProcessRunning(sessionId)); + + // Verify we still have only one process for this session + Set sessionIds = processHelper.getRunningSessionIdSet(); + assertEquals(1, sessionIds.stream().filter(id -> id.equals(sessionId)).count()); // Clean up processHelper.destroyProcess(sessionId); @@ -230,7 +219,8 @@ public void test_multipleProcesses() { public void test_destroyProcess_withRunningProcess() { String sessionId = "test_destroy"; - List cmdList = Arrays.asList("echo", "hello"); + // Use sleep command that runs longer so we can verify it's running + List cmdList = Arrays.asList("sleep", "10"); Consumer pbCall = pb -> { pb.redirectErrorStream(true); }; @@ -239,26 +229,18 @@ public void test_destroyProcess_withRunningProcess() { JobProcess jobProcess = processHelper.startProcess(sessionId, cmdList, pbCall); assertNotNull(jobProcess); - // Poll for process to be running (max 50 times, 100ms interval) - boolean isRunning = false; - for (int i = 0; i < 50; i++) { - if (processHelper.isProcessRunning(sessionId)) { - isRunning = true; - break; - } - Thread.sleep(100); - } - assertTrue("Process did not become running within timeout", isRunning); + // Wait for process to start + Thread.sleep(100); - // Wait a bit for the process to start - Thread.sleep(50); + // Verify process is running + assertTrue("Process did not become running within timeout", processHelper.isProcessRunning(sessionId)); // Destroy the process int exitCode = processHelper.destroyProcess(sessionId); assertFalse(processHelper.isProcessRunning(sessionId)); - // Exit code should be 0 or -1 depending on timing - assertTrue(exitCode == 0 || exitCode == -1); + // Exit code should be non-zero for forcibly destroyed process, or -1 + assertTrue(exitCode != 0 || exitCode == -1); } catch (Exception e) { fail("Unexpected exception: " + e.getMessage()); } @@ -462,453 +444,4 @@ public void test_nullCallback() { } } - // ========== Race Condition and Concurrent Access Tests ========== - - public void test_concurrentStartProcess_sameSessionId() throws Exception { - String sessionId = "test_concurrent_start"; - List cmdList = Arrays.asList("sleep", "0.5"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - // Create multiple threads that try to start process with same sessionId - final int threadCount = 3; - final Thread[] threads = new Thread[threadCount]; - final JobProcess[] results = new JobProcess[threadCount]; - final Exception[] exceptions = new Exception[threadCount]; - - try { - for (int i = 0; i < threadCount; i++) { - final int index = i; - threads[i] = new Thread(() -> { - try { - results[index] = processHelper.startProcess(sessionId, cmdList, pbCall); - } catch (Exception e) { - exceptions[index] = e; - } - }); - } - - // Start all threads simultaneously - for (Thread thread : threads) { - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(3000); - } - - // Verify that only one process is running for the session - Set sessionIds = processHelper.getRunningSessionIdSet(); - assertTrue(sessionIds.contains(sessionId) || sessionIds.isEmpty()); - - // At most one session should be running - assertTrue(sessionIds.size() <= 1); - } finally { - // Ensure cleanup - processHelper.destroyProcess(sessionId); - } - } - - public void test_concurrentStartProcess_differentSessionIds() throws Exception { - List cmdList = Arrays.asList("sleep", "0.5"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - final int threadCount = 3; - final Thread[] threads = new Thread[threadCount]; - final String[] sessionIds = new String[threadCount]; - - try { - for (int i = 0; i < threadCount; i++) { - sessionIds[i] = "test_concurrent_" + i; - final String sessionId = sessionIds[i]; - threads[i] = new Thread(() -> { - try { - processHelper.startProcess(sessionId, cmdList, pbCall); - } catch (Exception e) { - // Ignore exceptions for this test - } - }); - } - - // Start all threads simultaneously - for (Thread thread : threads) { - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(3000); - } - - // Verify that processes were created (some may have already completed) - Set runningSessionIds = processHelper.getRunningSessionIdSet(); - // At least some processes should be running or have run - assertTrue(runningSessionIds.size() <= threadCount); - } finally { - // Clean up all processes - for (String sessionId : sessionIds) { - processHelper.destroyProcess(sessionId); - } - } - } - - public void test_concurrentDestroyProcess() throws Exception { - String sessionId = "test_concurrent_destroy"; - List cmdList = Arrays.asList("sleep", "1"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - try { - // Start a process - processHelper.startProcess(sessionId, cmdList, pbCall); - Thread.sleep(50); // Give it time to start - - // Create multiple threads that try to destroy the same process - final int threadCount = 3; - final Thread[] threads = new Thread[threadCount]; - final int[] exitCodes = new int[threadCount]; - - for (int i = 0; i < threadCount; i++) { - final int index = i; - threads[i] = new Thread(() -> { - exitCodes[index] = processHelper.destroyProcess(sessionId); - }); - } - - // Start all threads simultaneously - for (Thread thread : threads) { - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(5000); - } - - // Verify process is no longer running - assertFalse(processHelper.isProcessRunning(sessionId)); - - // Test passes if no exceptions occurred - assertTrue(true); - } finally { - // Ensure cleanup - processHelper.destroyProcess(sessionId); - } - } - - public void test_sendCommand_whileProcessTerminating() throws Exception { - String sessionId = "test_send_terminating"; - List cmdList = Arrays.asList("cat"); // cat reads from stdin - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - try { - // Start a process - JobProcess jobProcess = processHelper.startProcess(sessionId, cmdList, pbCall); - Thread.sleep(50); // Give it time to start - - // Create a thread that destroys the process - Thread destroyThread = new Thread(() -> { - try { - Thread.sleep(50); - processHelper.destroyProcess(sessionId); - } catch (Exception e) { - // Ignore - } - }); - - // Create a thread that sends commands - final Exception[] sendException = new Exception[1]; - Thread sendThread = new Thread(() -> { - for (int i = 0; i < 5; i++) { - try { - processHelper.sendCommand(sessionId, "test command " + i); - Thread.sleep(20); - } catch (Exception e) { - sendException[0] = e; - break; - } - } - }); - - sendThread.start(); - destroyThread.start(); - - sendThread.join(3000); - destroyThread.join(3000); - - // Either the send succeeded or we got an expected exception - if (sendException[0] != null) { - assertTrue(sendException[0] instanceof JobNotFoundException || sendException[0] instanceof JobProcessingException); - } - } finally { - // Ensure cleanup - processHelper.destroyProcess(sessionId); - } - } - - public void test_sendCommand_toDeadProcess() throws Exception { - String sessionId = "test_send_dead"; - List cmdList = Arrays.asList("echo", "test"); // Completes quickly - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - // Start a process that will complete quickly - JobProcess jobProcess = processHelper.startProcess(sessionId, cmdList, pbCall); - - // Wait for process to complete - Thread.sleep(200); - - // Try to send command to dead process - try { - processHelper.sendCommand(sessionId, "test command"); - // If the process is still in the map but dead, we should get an exception - // If it's not in the map, we'll also get an exception - // Either way, the test verifies proper error handling - } catch (JobNotFoundException e) { - // Expected - assertTrue(e.getMessage().contains(sessionId)); - } - - // Clean up - processHelper.destroyProcess(sessionId); - } - - public void test_streamCloseTimeout_configuration() { - // Test that stream close timeout can be configured - processHelper.setStreamCloseTimeout(5); - processHelper.setStreamCloseTimeout(15); - processHelper.setStreamCloseTimeout(30); - - // Verify no exceptions are thrown - assertTrue(true); - } - - public void test_rapidStartAndDestroy() throws Exception { - String sessionId = "test_rapid"; - List cmdList = Arrays.asList("echo", "test"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - // Rapidly start and destroy processes - for (int i = 0; i < 10; i++) { - try { - processHelper.startProcess(sessionId, cmdList, pbCall); - Thread.sleep(10); - processHelper.destroyProcess(sessionId); - } catch (Exception e) { - // Some timing-related exceptions are acceptable - // Just ensure the test doesn't crash - } - } - - // Verify no processes are left running - Thread.sleep(100); - assertFalse(processHelper.isProcessRunning(sessionId)); - } - - public void test_isProcessRunning_withConcurrentModification() throws Exception { - final String sessionId = "test_concurrent_check"; - List cmdList = Arrays.asList("sleep", "0.5"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - try { - // Start a process - processHelper.startProcess(sessionId, cmdList, pbCall); - - // Create threads that check if process is running while another thread destroys it - final boolean[] results = new boolean[50]; - Thread checkThread = new Thread(() -> { - for (int i = 0; i < 50; i++) { - results[i] = processHelper.isProcessRunning(sessionId); - try { - Thread.sleep(5); - } catch (InterruptedException e) { - break; - } - } - }); - - Thread destroyThread = new Thread(() -> { - try { - Thread.sleep(100); - processHelper.destroyProcess(sessionId); - } catch (Exception e) { - // Ignore - } - }); - - checkThread.start(); - destroyThread.start(); - - checkThread.join(3000); - destroyThread.join(3000); - - // Test passes if no exceptions occurred - assertTrue(true); - } finally { - // Ensure cleanup - processHelper.destroyProcess(sessionId); - } - } - - public void test_getRunningSessionIdSet_withConcurrentModification() throws Exception { - List cmdList = Arrays.asList("sleep", "0.5"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - final int processCount = 3; - try { - // Start multiple processes - for (int i = 0; i < processCount; i++) { - processHelper.startProcess("test_session_" + i, cmdList, pbCall); - } - - // Create threads that iterate over session IDs while others are being destroyed - Thread iterateThread = new Thread(() -> { - for (int i = 0; i < 30; i++) { - Set sessionIds = processHelper.getRunningSessionIdSet(); - for (String sessionId : sessionIds) { - // Just iterate - } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - break; - } - } - }); - - Thread destroyThread = new Thread(() -> { - try { - Thread.sleep(50); - for (int i = 0; i < processCount; i++) { - processHelper.destroyProcess("test_session_" + i); - Thread.sleep(20); - } - } catch (Exception e) { - // Ignore - } - }); - - iterateThread.start(); - destroyThread.start(); - - iterateThread.join(3000); - destroyThread.join(3000); - - // Test passes if no ConcurrentModificationException occurred - assertTrue(true); - } finally { - // Ensure cleanup of all processes - for (int i = 0; i < processCount; i++) { - processHelper.destroyProcess("test_session_" + i); - } - } - } - - public void test_multipleCommandsSent_concurrently() throws Exception { - String sessionId = "test_multi_commands"; - List cmdList = Arrays.asList("cat"); // cat reads from stdin - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - try { - // Start a process - processHelper.startProcess(sessionId, cmdList, pbCall); - Thread.sleep(50); - - // Create multiple threads that send commands - final int threadCount = 3; - final Thread[] threads = new Thread[threadCount]; - final Exception[] exceptions = new Exception[threadCount]; - - for (int i = 0; i < threadCount; i++) { - final int index = i; - threads[i] = new Thread(() -> { - try { - for (int j = 0; j < 3; j++) { - processHelper.sendCommand(sessionId, "command " + index + "_" + j); - Thread.sleep(10); - } - } catch (Exception e) { - exceptions[index] = e; - } - }); - } - - // Start all threads - for (Thread thread : threads) { - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(3000); - } - - // Verify no unexpected exceptions occurred - for (Exception e : exceptions) { - if (e != null) { - // JobProcessingException is acceptable if process was destroyed - assertTrue(e instanceof JobProcessingException || e instanceof JobNotFoundException); - } - } - } finally { - // Clean up - processHelper.destroyProcess(sessionId); - } - } - - public void test_startProcess_replacesOldProcess_noDeadlock() throws Exception { - String sessionId = "test_replace_no_deadlock"; - List cmdList1 = Arrays.asList("sleep", "1"); - List cmdList2 = Arrays.asList("sleep", "1"); - Consumer pbCall = pb -> pb.redirectErrorStream(true); - - try { - // Start first process - JobProcess firstProcess = processHelper.startProcess(sessionId, cmdList1, pbCall); - assertNotNull(firstProcess); - Thread.sleep(50); - - // Start second process with same sessionId - should not deadlock - long startTime = System.currentTimeMillis(); - JobProcess secondProcess = processHelper.startProcess(sessionId, cmdList2, pbCall); - long endTime = System.currentTimeMillis(); - - // Should complete quickly (not deadlock) - assertTrue(endTime - startTime < 3000); - assertNotNull(secondProcess); - } finally { - // Clean up - processHelper.destroyProcess(sessionId); - } - } - - public void test_destroyNonExistentProcess_multipleThreads() throws Exception { - final String sessionId = "test_destroy_nonexistent"; - - // Create multiple threads that try to destroy a non-existent process - final int threadCount = 5; - final Thread[] threads = new Thread[threadCount]; - final int[] exitCodes = new int[threadCount]; - - for (int i = 0; i < threadCount; i++) { - final int index = i; - threads[i] = new Thread(() -> { - exitCodes[index] = processHelper.destroyProcess(sessionId); - }); - } - - // Start all threads - for (Thread thread : threads) { - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(1000); - } - - // All should return -1 - for (int exitCode : exitCodes) { - assertEquals(-1, exitCode); - } - } - } \ No newline at end of file