Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.mongodb.MongoClientSettings;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.Collections;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.DatabaseType;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
Expand Down Expand Up @@ -34,7 +35,8 @@ public DatastoreConfig convert(final Config config) {
null,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20)) {
Duration.ofMinutes(20),
Collections.emptyMap()) {
public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder()
Expand Down Expand Up @@ -88,7 +90,8 @@ public DatastoreConfig convert(final Config config) {
connectionConfig.database(),
connectionConfig.credentials(),
connectionConfig.applicationName(),
connectionConfig.connectionPoolConfig()) {
connectionConfig.connectionPoolConfig(),
connectionConfig.customParameters()) {
@Override
public String toConnectionString() {
return config.hasPath("url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -34,18 +37,21 @@
@NonNull AggregatePipelineMode aggregationPipelineMode;
@NonNull DataFreshness dataFreshness;
@NonNull Duration queryTimeout;
@NonNull Map<String, String> customParameters;

public ConnectionConfig(
@NonNull List<@NonNull Endpoint> endpoints,
@NonNull String database,
@Nullable ConnectionCredentials credentials) {
@Nullable ConnectionCredentials credentials,
Map<String, String> customParameters) {
this(
endpoints,
database,
credentials,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20));
Duration.ofMinutes(20),
customParameters != null ? customParameters : Collections.emptyMap());
}

public static ConnectionConfigBuilder builder() {
Expand All @@ -63,6 +69,13 @@
ConnectionCredentials credentials;
String applicationName = DEFAULT_APP_NAME;
String replicaSet;
Map<String, String> customParameters = new HashMap<>();

public ConnectionConfigBuilder customParameter(String key, String value) {
this.customParameters.put(key, value);
return this;

Check warning on line 76 in document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java#L75-L76

Added lines #L75 - L76 were not covered by tests
}

ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
Expand Down Expand Up @@ -96,15 +109,17 @@
connectionPoolConfig,
aggregationPipelineMode,
dataFreshness,
queryTimeout);
queryTimeout,
customParameters);

case POSTGRES:
return new PostgresConnectionConfig(
unmodifiableList(endpoints),
database,
credentials,
applicationName,
connectionPoolConfig);
connectionPoolConfig,
customParameters);
}

throw new IllegalArgumentException("Unsupported database type: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder;
import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder;
import org.hypertrace.core.documentstore.model.options.DataFreshness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Value
public class TypesafeConfigDatastoreConfigExtractor {
private static final Logger LOGGER =
LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class);
private static final String DEFAULT_HOST_KEY = "host";
private static final String DEFAULT_PORT_KEY = "port";
private static final String DEFAULT_ENDPOINTS_KEY = "endpoints";
Expand All @@ -29,6 +33,7 @@
private static final String DEFAULT_AGGREGATION_PIPELINE_MODE_KEY = "aggregationPipelineMode";
private static final String DEFAULT_DATA_FRESHNESS_KEY = "dataFreshness";
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";
private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams";

@NonNull Config config;
DatastoreConfigBuilder datastoreConfigBuilder;
Expand Down Expand Up @@ -74,7 +79,8 @@
.poolConnectionSurrenderTimeoutKey(DEFAULT_CONNECTION_IDLE_TIME_KEY)
.aggregationPipelineMode(DEFAULT_AGGREGATION_PIPELINE_MODE_KEY)
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY);
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY)
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX);
}

public static TypesafeConfigDatastoreConfigExtractor from(
Expand Down Expand Up @@ -169,6 +175,27 @@
return this;
}

public TypesafeConfigDatastoreConfigExtractor customParametersKey(@NonNull final String key) {
if (config.hasPath(key)) {
try {
// Try to extract parameters as an object (Config)
Config paramConfig = config.getConfig(key);
paramConfig
.entrySet()
.forEach(

Check warning on line 185 in document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java#L182-L185

Added lines #L182 - L185 were not covered by tests
entry -> {
connectionConfigBuilder.customParameter(
entry.getKey(), paramConfig.getString(entry.getKey()));
});
} catch (Exception e) {

Check warning on line 190 in document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java#L187-L190

Added lines #L187 - L190 were not covered by tests
// If not a Config object, log warning
LOGGER.warn("Custom parameters key '{}' exists but is not a config object", key);
}

Check warning on line 193 in document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java

View check run for this annotation

Codecov / codecov/patch

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java#L192-L193

Added lines #L192 - L193 were not covered by tests
}

return this;
}

public TypesafeConfigDatastoreConfigExtractor poolMaxConnectionsKey(@NonNull final String key) {
if (config.hasPath(key)) {
connectionPoolConfigBuilder.maxConnections(config.getInt(key));
Expand Down Expand Up @@ -228,6 +255,7 @@
connectionConfigBuilder
.connectionPoolConfig(connectionPoolConfigBuilder.build())
.credentials(connectionCredentialsBuilder.build())
.customParameters(connectionConfigBuilder.customParameters())
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -53,14 +54,16 @@ public MongoConnectionConfig(
@Nullable final ConnectionPoolConfig connectionPoolConfig,
@NonNull final AggregatePipelineMode aggregationPipelineMode,
@NonNull final DataFreshness dataFreshness,
@NonNull final Duration queryTimeout) {
@NonNull final Duration queryTimeout,
@NonNull final Map<String, String> customParameters) {
super(
ensureAtLeastOneEndpoint(endpoints),
getDatabaseOrDefault(database),
getCredentialsOrDefault(credentials, database),
aggregationPipelineMode,
dataFreshness,
queryTimeout);
queryTimeout,
customParameters);
this.applicationName = applicationName;
this.replicaSetName = replicaSetName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -45,11 +46,13 @@ public PostgresConnectionConfig(
@Nullable final String database,
@Nullable final ConnectionCredentials credentials,
@NonNull final String applicationName,
@Nullable final ConnectionPoolConfig connectionPoolConfig) {
@Nullable final ConnectionPoolConfig connectionPoolConfig,
@NonNull final Map<String, String> customParameters) {
super(
ensureSingleEndpoint(endpoints),
getDatabaseOrDefault(database),
getCredentialsOrDefault(credentials));
getCredentialsOrDefault(credentials),
customParameters);
this.applicationName = applicationName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresDefaults;
Expand Down Expand Up @@ -51,6 +52,10 @@ public Connection getPooledConnection() throws SQLException {
return connectionPool.getConnection();
}

public Map<String, String> getCustomParameters() {
return connectionConfig.customParameters();
}

public void close() {
if (connection != null) {
try {
Expand Down
Loading