Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ public Mutex provideReadLock(CuratorFramework curatorFramework) {

/**
* Factory method for Elasticsearch client.
*
* TIMEOUT CONFIGURATION: This method configures the ES REST client with HTTP-level timeouts:
* - connectTimeout: Time to establish connection (default 6 seconds from EsConfig)
* - socketTimeout: Time to wait for data after connection (default 100 seconds from EsConfig)
*
* IMPORTANT: These are HTTP client timeouts, not Elasticsearch query timeouts. The socketTimeout
* determines how long the client will wait for a response from ES, but it does NOT limit how long
* ES will execute a query. If ES is slow or overloaded, queries can take longer than expected.
*
* POTENTIAL HANG POINT: Individual search/count requests made with this client do not set explicit
* query-level timeouts (SearchSourceBuilder.timeout()). This means slow ES queries will only be
* constrained by the socketTimeout, which may not be sufficient for detecting and handling slow
* queries gracefully. Consider adding explicit query timeouts in the calling code.
*/
public static RestHighLevelClient esClient(WorkflowConfiguration workflowConfiguration) {
EsConfig esConfig = EsConfig.fromProperties(workflowConfiguration.getDownloadSettings(), ES_PREFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public class DownloadEsClient implements Closeable {
/**
* Executes the ElasticSearch query and returns the number of records found. If an error occurs
* 'ERROR_COUNT' is returned.
*
* POTENTIAL HANG POINT: This count request does not set an explicit query-level timeout.
* It relies solely on the HTTP client's socketTimeout (default 100 seconds from EsConfig).
* If Elasticsearch is slow or overloaded, this call could hang for up to socketTimeout duration.
* Consider adding: new CountRequest().timeout(TimeValue) to set an explicit query timeout.
*/
@SneakyThrows
public long getRecordCount(Predicate predicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ public static boolean isSmallDownloadCount(
+ ES_COUNT_MARGIN_ERROR;
}

/**
* Gets the record count for the download.
* If the download already has a total records count set, returns that value.
* Otherwise, queries Elasticsearch to get an accurate count.
*
* POTENTIAL HANG POINT: The call to downloadEsClient.getRecordCount() can hang if ES is slow.
* This is often the "calling ES" log message that appears before a hang, as reported in issue
* for small downloads. The ES count request has no explicit query timeout and relies on the
* HTTP client socketTimeout (default 100 seconds). If ES is overloaded, this can appear as
* a hang at the end of the Spark job initialization.
*/
private long recordCount(Download download) {
// if set, dont recalculate
if (download.getTotalRecords() > 0){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ private Lock getLock() {

/**
* Executes a search and get number of records.
*
* POTENTIAL HANG POINT: This search request does not set an explicit query-level timeout.
* It relies solely on the HTTP client's socketTimeout (default 100 seconds from EsConfig).
* If Elasticsearch is slow or overloaded, this call could hang for up to socketTimeout duration.
* Since this is called during actor initialization (runActors method), a hang here will prevent
* the download from starting and may appear as if the job is stuck "calling ES".
* Consider adding: searchSourceBuilder.timeout(TimeValue) to set an explicit query timeout.
*/
private Long getSearchCount(String query) {
try {
Expand Down Expand Up @@ -290,7 +297,15 @@ private void runActors(Start start) {
*/
public static class Start { }

/** Creates an instance of the download actor/job to be used. */
/**
* Creates an instance of the download actor/job to be used.
*
* THREADING NOTE: The returned Props are used with RoundRobinPool to create multiple actor
* instances that process downloads concurrently. Each actor shares the same SearchHitConverter
* and OccurrenceEsResponseParser instances. The SearchHitConverter.mapTerm() method must be
* thread-safe to avoid ConcurrentModificationException when multiple actors process verbatim
* fields simultaneously.
*/
private Props createDownloadActor() {

DownloadFormat downloadFormat = jobConfiguration.getDownloadFormat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public SearchQueryProcessor(EsResponseParser<T, P> esResponseParser) {
*
* @param downloadFileWork it's used to determine how to page through the results and the search query to be used
* @param resultHandler predicate that process each result, receives as parameter the occurrence key
*
* POTENTIAL HANG POINT: This method makes multiple ES search requests in a loop without explicit
* query-level timeouts. Each search relies on the HTTP client's socketTimeout (default 100 seconds).
* If Elasticsearch becomes slow or overloaded during download processing, each iteration could hang
* for up to socketTimeout duration. For large downloads with many iterations, this compounds the issue.
* Consider adding: searchSourceBuilder.timeout(TimeValue) to set an explicit query timeout.
*/
public void processQuery(DownloadFileWork downloadFileWork, Consumer<T> resultHandler) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,19 @@ protected Optional<String> extractStringValue(Map<String, Object> fields, String
/**
* Re-maps terms to handle Unknown terms. This has to be done because Pipelines preserve Unknown
* terms and do not add the URI for unknown terms.
*
* THREAD SAFETY: TermFactory.findTerm() can modify internal HashMap via addTerm() when creating
* unknown terms. In multi-threaded contexts (e.g., Akka actors with RoundRobinPool), this causes
* ConcurrentModificationException. Synchronization on TERM_FACTORY ensures thread-safe access.
*/
protected static Term mapTerm(String verbatimTerm) {
Term term = TERM_FACTORY.findTerm(verbatimTerm);
if (term instanceof UnknownTerm) {
return UnknownTerm.build(term.simpleName(), false);
synchronized (TERM_FACTORY) {
Term term = TERM_FACTORY.findTerm(verbatimTerm);
if (term instanceof UnknownTerm) {
return UnknownTerm.build(term.simpleName(), false);
}
return term;
}
return term;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ public class EsConfig {
private static final Splitter SPLITTER = Splitter.on(",");

// defaults
private static final int CONNECT_TIMEOUT_DEFAULT = 6000;
private static final int SOCKET_TIMEOUT_DEFAULT = 100000;
// TIMEOUT NOTE: connectTimeout is the time to establish a connection to ES server
private static final int CONNECT_TIMEOUT_DEFAULT = 6000; // 6 seconds

// TIMEOUT NOTE: socketTimeout is the time to wait for data after connection is established.
// This is the timeout for the entire HTTP request/response cycle, NOT the query execution time.
// If an ES query takes longer than this to return results, the request will timeout.
// Default of 100 seconds may not be sufficient for slow queries or overloaded ES clusters.
private static final int SOCKET_TIMEOUT_DEFAULT = 100000; // 100 seconds

private static final int SNIFF_INTERVAL_DEFAULT = -1;
private static final int SNIFF_AFTER_FAILURE_DELAY_DEFAULT = 60000;

Expand Down