Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.hypertrace.core.documentstore;

import static java.util.Collections.emptyList;
import static org.hypertrace.core.documentstore.model.config.ConnectionConfig.DEFAULT_CONNECTION_TIMEOUT;
import static org.hypertrace.core.documentstore.model.config.ConnectionConfig.DEFAULT_QUERY_TIMEOUT;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.Collections;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.DatabaseType;
Expand Down Expand Up @@ -35,7 +36,8 @@ public DatastoreConfig convert(final Config config) {
null,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20),
DEFAULT_QUERY_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT,
Collections.emptyMap()) {
public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
public class ConnectionConfig {
private static final String DEFAULT_APP_NAME = "document-store";

public static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(10);
public static final Duration DEFAULT_QUERY_TIMEOUT = Duration.ofMinutes(20);

@Singular @NonNull List<@NonNull Endpoint> endpoints;
@NonNull String database;
@Nullable ConnectionCredentials credentials;
@NonNull AggregatePipelineMode aggregationPipelineMode;
@NonNull DataFreshness dataFreshness;
@NonNull Duration queryTimeout;
@NonNull Duration connectionTimeout;
@NonNull Map<String, String> customParameters;

public ConnectionConfig(
Expand All @@ -50,7 +54,8 @@ public ConnectionConfig(
credentials,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20),
DEFAULT_QUERY_TIMEOUT,
DEFAULT_CONNECTION_TIMEOUT,
customParameters != null ? customParameters : Collections.emptyMap());
}

Expand Down Expand Up @@ -79,7 +84,8 @@ public ConnectionConfigBuilder customParameter(String key, String value) {
ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
Duration queryTimeout = Duration.ofMinutes(20);
Duration queryTimeout = DEFAULT_QUERY_TIMEOUT;
Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;

public ConnectionConfigBuilder type(final DatabaseType type) {
this.type = type;
Expand Down Expand Up @@ -110,6 +116,7 @@ public ConnectionConfig build() {
aggregationPipelineMode,
dataFreshness,
queryTimeout,
connectionTimeout,
customParameters);

case POSTGRES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TypesafeConfigDatastoreConfigExtractor {
private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness";
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";
private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout";
private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams";

@NonNull Config config;
Expand Down Expand Up @@ -80,6 +81,7 @@ private TypesafeConfigDatastoreConfigExtractor(
.aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY)
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY)
.connectionTimeoutKey(DEFAULT_CONNECTION_TIMEOUT_KEY)
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX);
}

Expand Down Expand Up @@ -244,6 +246,13 @@ public TypesafeConfigDatastoreConfigExtractor queryTimeoutKey(@NonNull final Str
return this;
}

public TypesafeConfigDatastoreConfigExtractor connectionTimeoutKey(@NonNull final String key) {
if (config.hasPath(key)) {
connectionConfigBuilder.connectionTimeout(config.getDuration(key));
}
return this;
}

public DatastoreConfig extract() {
if (connectionConfigBuilder.endpoints().isEmpty()
&& !Endpoint.builder().build().equals(endpointBuilder.build())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.SocketSettings;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -55,6 +56,7 @@ public MongoConnectionConfig(
@NonNull final AggregatePipelineMode aggregationPipelineMode,
@NonNull final DataFreshness dataFreshness,
@NonNull final Duration queryTimeout,
@NonNull final Duration connectionTimeout,
@NonNull final Map<String, String> customParameters) {
super(
ensureAtLeastOneEndpoint(endpoints),
Expand All @@ -63,6 +65,7 @@ public MongoConnectionConfig(
aggregationPipelineMode,
dataFreshness,
queryTimeout,
connectionTimeout,
customParameters);
this.applicationName = applicationName;
this.replicaSetName = replicaSetName;
Expand All @@ -79,6 +82,7 @@ public MongoClientSettings toSettings() {
applyClusterSettings(settingsBuilder);
applyConnectionPoolSettings(settingsBuilder);
applyCredentialSettings(settingsBuilder);
applyConnectionTimeoutSettings(settingsBuilder);

return settingsBuilder.build();
}
Expand Down Expand Up @@ -174,4 +178,12 @@ private void applyCredentialSettings(final Builder settingsBuilder) {
settingsBuilder.credential(credential);
}
}

private void applyConnectionTimeoutSettings(final Builder settingsBuilder) {
SocketSettings socketSettings =
SocketSettings.builder()
.connectTimeout(connectionTimeout().toMillis(), TimeUnit.MILLISECONDS)
.build();
settingsBuilder.applyToSocketSettings(builder -> builder.applySettings(socketSettings));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TypesafeConfigDatastoreConfigExtractorTest {
private static final String AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
private static final String DATA_FRESHNESS_KEY = "dataFreshness";
private static final String QUERY_TIMEOUT_KEY = "queryTimeout";
private static final String CONNECTION_TIMEOUT_KEY = "connectionTimeout";

private static final String host = "red.planet";
private static final String host1 = "RED_PLANET";
Expand All @@ -50,6 +51,7 @@ class TypesafeConfigDatastoreConfigExtractorTest {
private static final AggregatePipelineMode aggregatePipelineMode = SORT_OPTIMIZED_IF_POSSIBLE;
private static final DataFreshness dataFreshness = NEAR_REALTIME_FRESHNESS;
private static final Duration queryTimeout = Duration.ofSeconds(45);
private static final Duration connectionTimeout = Duration.ofSeconds(30);

@SuppressWarnings("ConstantConditions")
@Test
Expand Down Expand Up @@ -109,6 +111,7 @@ void testBuildMongo() {
.aggregationPipelineMode(AGGREGATION_PIPELINE_MODE_KEY)
.dataFreshnessKey(DATA_FRESHNESS_KEY)
.queryTimeoutKey(QUERY_TIMEOUT_KEY)
.connectionTimeoutKey(CONNECTION_TIMEOUT_KEY)
.extract()
.connectionConfig();
final ConnectionConfig expected =
Expand All @@ -134,6 +137,7 @@ void testBuildMongo() {
.dataFreshness(dataFreshness)
.aggregationPipelineMode(aggregatePipelineMode)
.queryTimeout(queryTimeout)
.connectionTimeout(connectionTimeout)
.build();

assertEquals(expected, config);
Expand Down Expand Up @@ -363,6 +367,7 @@ private Config buildConfigMap() {
entry(CONNECTION_SURRENDER_TIMEOUT_KEY, surrenderTimeout),
entry(AGGREGATION_PIPELINE_MODE_KEY, SORT_OPTIMIZED_IF_POSSIBLE.name()),
entry(DATA_FRESHNESS_KEY, NEAR_REALTIME_FRESHNESS.name()),
entry(CONNECTION_TIMEOUT_KEY, connectionTimeout),
entry(QUERY_TIMEOUT_KEY, queryTimeout)));
}

Expand Down
Loading