Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/main/java/org/codelibs/fess/ds/AbstractDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@
*/
public abstract class AbstractDataStore implements DataStore {

/**
* Default constructor.
*/
public AbstractDataStore() {
// nothing
}

private static final Logger logger = LogManager.getLogger(AbstractDataStore.class);

/**
Expand All @@ -64,8 +57,16 @@ public AbstractDataStore() {

/**
* The flag to check if the data store is alive.
* Volatile to ensure visibility across threads.
*/
protected boolean alive = true;
protected volatile boolean alive = true;

/**
* Default constructor.
* Intentionally empty: the constructor body performs no initialization of its own.
*/
public AbstractDataStore() {
// nothing to do here
}

/**
* Register this data store.
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/org/codelibs/fess/ds/DataStoreFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ public class DataStoreFactory {
* Cached array of available data store names discovered from plugin JAR files.
* This cache is refreshed periodically based on the lastLoadedTime.
*/
protected String[] dataStoreNames = StringUtil.EMPTY_STRINGS;
protected volatile String[] dataStoreNames = StringUtil.EMPTY_STRINGS;

/**
* Timestamp of the last time data store names were loaded from plugin files.
* Used to implement a time-based cache refresh mechanism.
* Volatile to ensure visibility across threads.
*/
protected long lastLoadedTime = 0;
protected volatile long lastLoadedTime = 0;

/**
* Creates a new instance of DataStoreFactory.
Expand Down Expand Up @@ -130,7 +131,7 @@ public DataStore getDataStore(final String name) {
*
* @return array of data store names sorted alphabetically, never null
*/
public String[] getDataStoreNames() {
public synchronized String[] getDataStoreNames() {
final long now = ComponentUtil.getSystemHelper().getCurrentTimeAsLong();
if (now - lastLoadedTime > 60000L) {
final List<String> nameList = loadDataStoreNameList();
Expand All @@ -154,9 +155,18 @@ public String[] getDataStoreNames() {
protected List<String> loadDataStoreNameList() {
final Set<String> nameSet = new HashSet<>();
final File[] jarFiles = ResourceUtil.getPluginJarFiles(PluginHelper.ArtifactType.DATA_STORE.getId());
if (jarFiles == null) {
return nameSet.stream().sorted().collect(Collectors.toList());
}
for (final File jarFile : jarFiles) {
try (FileSystem fs = FileSystems.newFileSystem(jarFile.toPath(), ClassLoader.getSystemClassLoader())) {
final Path xmlPath = fs.getPath("fess_ds++.xml");
if (!Files.exists(xmlPath)) {
if (logger.isDebugEnabled()) {
logger.debug("Configuration file not found in {}", jarFile.getAbsolutePath());
}
continue;
}
try (InputStream is = Files.newInputStream(xmlPath)) {
final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setFeature(org.codelibs.fess.crawler.Constants.FEATURE_SECURE_PROCESSING, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.poi.util.StringUtil;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.fess.Constants;
import org.codelibs.fess.crawler.builder.RequestDataBuilder;
import org.codelibs.fess.crawler.client.CrawlerClient;
Expand Down Expand Up @@ -89,8 +89,11 @@ public class FileListIndexUpdateCallbackImpl implements IndexUpdateCallback {
/** Factory for creating crawler clients to handle different URL schemes. */
protected CrawlerClientFactory crawlerClientFactory;

/** List of URLs to be deleted, cached for batch processing. */
protected List<String> deleteUrlList = new ArrayList<>(100);
/**
* List of URLs to be deleted, cached for batch processing.
* All access is synchronized via indexUpdateCallback lock.
*/
protected List<String> deleteUrlList = new ArrayList<>();

/** Maximum size of the delete URL cache before batch deletion is triggered. */
protected int maxDeleteDocumentCacheSize;
Expand Down Expand Up @@ -585,8 +588,10 @@ public void commit() {
executor.shutdownNow();
}

if (!deleteUrlList.isEmpty()) {
deleteDocuments();
synchronized (indexUpdateCallback) {
if (!deleteUrlList.isEmpty()) {
deleteDocuments();
}
}
indexUpdateCallback.commit();
}
Expand Down
231 changes: 231 additions & 0 deletions src/test/java/org/codelibs/fess/ds/AbstractDataStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,235 @@ public void test_convertValue() {
value = " ";
assertNull(dataStore.convertValue(Constants.DEFAULT_SCRIPT, value, paramMap));
}

// ========== Thread Safety Tests ==========

/**
 * Test that a write to the volatile alive field becomes visible to other threads.
 * One thread sets alive to false and every reader thread must observe the change.
 *
 * <p>Each reader polls the field until it sees {@code false}, bounded by a deadline,
 * rather than taking a fixed number of samples. A fixed sample count makes the result
 * depend on scheduling: the readers could finish all their reads before the writer
 * thread ever runs and never observe the new value.</p>
 *
 * @throws Exception if the test thread is interrupted while waiting for a worker
 */
public void test_aliveField_volatileVisibility() throws Exception {
    // Ensure alive starts as true
    assertTrue(dataStore.alive);

    final int readerThreadCount = 10;
    // Upper bound for every wait in this test, so a regression fails instead of hanging
    final long timeoutMillis = 10000L;
    final Thread[] readerThreads = new Thread[readerThreadCount];
    final boolean[] sawFalse = new boolean[readerThreadCount];
    final java.util.concurrent.CountDownLatch readersStarted = new java.util.concurrent.CountDownLatch(readerThreadCount);

    // Start reader threads that poll the alive field until it flips or the deadline passes
    for (int i = 0; i < readerThreadCount; i++) {
        final int threadIndex = i;
        readerThreads[i] = new Thread(() -> {
            readersStarted.countDown();
            final long deadline = System.nanoTime() + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (System.nanoTime() - deadline < 0) {
                if (!dataStore.alive) {
                    sawFalse[threadIndex] = true;
                    return;
                }
                // Small yield to allow context switching
                Thread.yield();
            }
        });
        readerThreads[i].start();
    }

    // Hold the write back until every reader is running, so the readers really are concurrent with it
    assertTrue("Reader threads did not start in time",
            readersStarted.await(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS));

    // Writer thread sets alive to false
    final Thread writerThread = new Thread(() -> {
        dataStore.alive = false;
    });
    writerThread.start();
    writerThread.join();

    // Wait for all reader threads to complete (each one is bounded by its own deadline)
    for (final Thread thread : readerThreads) {
        thread.join();
    }

    // Verify that alive was changed to false
    assertFalse("alive should be false after writer thread", dataStore.alive);

    // Every reader must have seen the write; join() makes the sawFalse entries safe to read here
    for (int i = 0; i < readerThreadCount; i++) {
        assertTrue("Reader thread " + i + " should have observed alive=false", sawFalse[i]);
    }
}

/**
 * Test that the alive=false state produced by stop() is seen by threads started afterwards.
 *
 * <p>stop() runs on the calling thread first; ten observer threads are then created,
 * started and joined, and each one records the value of alive it read.</p>
 *
 * @throws Exception if the test thread is interrupted while joining an observer
 */
public void test_stop_volatileVisibility() throws Exception {
    final int observerCount = 10;

    assertTrue(dataStore.alive);

    // Stop the data store on the calling thread before any observer is started
    dataStore.stop();

    final boolean[] seen = new boolean[observerCount];
    final Thread[] observers = new Thread[observerCount];

    // Each observer takes a single reading of the alive field
    for (int n = 0; n < observerCount; n++) {
        final int slot = n;
        observers[n] = new Thread(() -> seen[slot] = dataStore.alive);
    }

    for (int n = 0; n < observerCount; n++) {
        observers[n].start();
    }

    for (int n = 0; n < observerCount; n++) {
        observers[n].join();
    }

    // Every observer is expected to have read alive=false
    for (int n = 0; n < observerCount; n++) {
        assertFalse("Thread " + n + " should see alive=false", seen[n]);
    }
}

/**
 * Test that stop() can be invoked from several threads at once.
 * Ten threads each call stop(); none may throw, and alive must be false afterwards.
 *
 * @throws Exception if the test thread is interrupted while joining a worker
 */
public void test_stop_concurrentAccess() throws Exception {
    final int threadCount = 10;
    final Exception[] failures = new Exception[threadCount];
    final Thread[] workers = new Thread[threadCount];

    // Build all workers first; each one records any exception in its own slot
    for (int n = 0; n < threadCount; n++) {
        final int slot = n;
        final Runnable stopTask = () -> {
            try {
                dataStore.stop();
            } catch (Exception e) {
                failures[slot] = e;
            }
        };
        workers[n] = new Thread(stopTask);
    }

    // Launch every worker
    for (int n = 0; n < threadCount; n++) {
        workers[n].start();
    }

    // Block until every worker has finished
    for (int n = 0; n < threadCount; n++) {
        workers[n].join();
    }

    // No worker may have captured an exception
    for (int n = 0; n < threadCount; n++) {
        assertNull("Thread " + n + " threw exception", failures[n]);
    }

    // The data store must end up stopped
    assertFalse("alive should be false after all stop() calls", dataStore.alive);
}

/**
 * Test that several threads can read the alive field while another thread keeps toggling it.
 *
 * <p>Five readers each sample the field 1000 times and tally the true/false readings;
 * one writer inverts the field 100 times. The test passes when no thread threw and
 * every reader accounted for all of its samples.</p>
 *
 * @throws Exception if the test thread is interrupted while joining a worker
 */
public void test_aliveField_concurrentReadWrite() throws Exception {
    dataStore.alive = true;

    final int readerCount = 5;
    final int iterations = 1000;
    final int writerSlot = readerCount; // last slot of the failure array belongs to the writer
    final Exception[] failures = new Exception[readerCount + 1];
    final int[] sawTrue = new int[readerCount];
    final int[] sawFalse = new int[readerCount];
    final Thread[] readers = new Thread[readerCount];

    // Prepare the readers: one read of the field per iteration, counted by outcome
    for (int r = 0; r < readerCount; r++) {
        final int slot = r;
        final Runnable readTask = () -> {
            try {
                for (int sample = 0; sample < iterations; sample++) {
                    final boolean current = dataStore.alive;
                    if (!current) {
                        sawFalse[slot]++;
                    } else {
                        sawTrue[slot]++;
                    }
                }
            } catch (Exception e) {
                failures[slot] = e;
            }
        };
        readers[r] = new Thread(readTask);
    }

    // Prepare the writer: flip the flag repeatedly, yielding between flips
    final Runnable toggleTask = () -> {
        try {
            for (int flip = 0; flip < 100; flip++) {
                dataStore.alive = !dataStore.alive;
                Thread.yield();
            }
        } catch (Exception e) {
            failures[writerSlot] = e;
        }
    };
    final Thread writer = new Thread(toggleTask);

    // Readers are launched first, then the writer
    for (int r = 0; r < readerCount; r++) {
        readers[r].start();
    }
    writer.start();

    // Wait for the readers, then for the writer
    for (int r = 0; r < readerCount; r++) {
        readers[r].join();
    }
    writer.join();

    // Neither the readers nor the writer may have failed
    for (int t = 0; t <= readerCount; t++) {
        assertNull("Thread " + t + " threw exception", failures[t]);
    }

    // Each reader's tallies must add up to the number of samples it took
    for (int r = 0; r < readerCount; r++) {
        assertEquals("Reader " + r + " should complete all iterations", iterations, sawTrue[r] + sawFalse[r]);
    }
}

/**
 * Test that getName() can be called from several threads at the same time.
 * Ten threads each call getName() 100 times; none may throw and each must end up with "Test".
 *
 * @throws Exception if the test thread is interrupted while joining a worker
 */
public void test_getName_concurrentAccess() throws Exception {
    final int threadCount = 10;
    final String[] names = new String[threadCount];
    final Exception[] failures = new Exception[threadCount];
    final Thread[] callers = new Thread[threadCount];

    // Each caller repeatedly fetches the name and keeps the last value it received
    for (int n = 0; n < threadCount; n++) {
        final int slot = n;
        final Runnable nameTask = () -> {
            try {
                int remaining = 100;
                while (remaining-- > 0) {
                    names[slot] = dataStore.getName();
                }
            } catch (Exception e) {
                failures[slot] = e;
            }
        };
        callers[n] = new Thread(nameTask);
    }

    for (int n = 0; n < threadCount; n++) {
        callers[n].start();
    }

    for (int n = 0; n < threadCount; n++) {
        callers[n].join();
    }

    // Per caller: no exception captured, and the expected name returned
    for (int n = 0; n < threadCount; n++) {
        assertNull("Thread " + n + " threw exception", failures[n]);
        assertEquals("Thread " + n + " got wrong name", "Test", names[n]);
    }
}
}
Loading