Skip to content

Commit 9254122

Browse files
authored
Timestamp Configs for FlatPostgresCollection (#274)
* WIP * Update config keys * Added more coverage * Remove FlatStoreConstants.java * Revert "Remove FlatStoreConstants.java" This reverts commit e86721a. * Added more coverage * Added FlatPostgresCollectionTest * WIP * Fix failing tests * Trigger CI * WIP * Fix failing test cases * WIP * Added more UTs for coverage * Added more UTs for coverage * Spotless * Fix conflicts
1 parent 683b702 commit 9254122

File tree

11 files changed

+705
-7
lines changed

11 files changed

+705
-7
lines changed

document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.sql.Connection;
1818
import java.sql.PreparedStatement;
1919
import java.sql.ResultSet;
20+
import java.sql.Timestamp;
2021
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.HashMap;
@@ -96,6 +97,12 @@ public static void init() throws IOException {
9697
postgresConfig.put("url", postgresConnectionUrl);
9798
postgresConfig.put("user", "postgres");
9899
postgresConfig.put("password", "postgres");
100+
postgresConfig.put(
101+
"postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.created",
102+
"createdTime");
103+
postgresConfig.put(
104+
"postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.lastUpdated",
105+
"lastUpdateTime");
99106

100107
postgresDatastore =
101108
DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig));
@@ -126,7 +133,9 @@ private static void createFlatCollectionSchema() {
126133
+ "\"big_number\" BIGINT,"
127134
+ "\"rating\" REAL,"
128135
+ "\"created_date\" DATE,"
129-
+ "\"weight\" DOUBLE PRECISION"
136+
+ "\"weight\" DOUBLE PRECISION,"
137+
+ "\"createdTime\" BIGINT,"
138+
+ "\"lastUpdateTime\" TIMESTAMP WITH TIME ZONE"
130139
+ ");",
131140
FLAT_COLLECTION_NAME);
132141

@@ -3830,4 +3839,209 @@ void testDrop() {
38303839
assertThrows(UnsupportedOperationException.class, () -> flatCollection.drop());
38313840
}
38323841
}
3842+
3843+
@Nested
3844+
@DisplayName("Timestamp Auto-Population Tests")
3845+
class TimestampTests {
3846+
3847+
@Test
3848+
@DisplayName(
3849+
"Should auto-populate createdTime (BIGINT) and lastUpdateTime (TIMESTAMPTZ) on create")
3850+
void testTimestampsOnCreate() throws Exception {
3851+
long beforeCreate = System.currentTimeMillis();
3852+
3853+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
3854+
objectNode.put("id", "ts-test-1");
3855+
objectNode.put("item", "TimestampTestItem");
3856+
objectNode.put("price", 100);
3857+
Document document = new JSONDocument(objectNode);
3858+
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-1");
3859+
3860+
CreateResult result = flatCollection.create(key, document);
3861+
assertTrue(result.isSucceed());
3862+
3863+
long afterCreate = System.currentTimeMillis();
3864+
3865+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
3866+
try (Connection conn = pgDatastore.getPostgresClient();
3867+
PreparedStatement ps =
3868+
conn.prepareStatement(
3869+
String.format(
3870+
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
3871+
FLAT_COLLECTION_NAME, key));
3872+
ResultSet rs = ps.executeQuery()) {
3873+
assertTrue(rs.next());
3874+
3875+
long createdTime = rs.getLong("createdTime");
3876+
assertFalse(rs.wasNull(), "createdTime should not be NULL");
3877+
assertTrue(
3878+
createdTime >= beforeCreate && createdTime <= afterCreate,
3879+
"createdTime should be within test execution window");
3880+
3881+
Timestamp lastUpdateTime = rs.getTimestamp("lastUpdateTime");
3882+
assertNotNull(lastUpdateTime, "lastUpdateTime should not be NULL");
3883+
assertTrue(
3884+
lastUpdateTime.getTime() >= beforeCreate && lastUpdateTime.getTime() <= afterCreate,
3885+
"lastUpdateTime should be within test execution window");
3886+
}
3887+
}
3888+
3889+
@Test
3890+
@DisplayName("Should preserve createdTime and update lastUpdateTime on upsert")
3891+
void testTimestampsOnUpsert() throws Exception {
3892+
// First create
3893+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
3894+
objectNode.put("id", "ts-test-2");
3895+
objectNode.put("item", "UpsertTimestampTest");
3896+
objectNode.put("price", 100);
3897+
Document document = new JSONDocument(objectNode);
3898+
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-2");
3899+
3900+
flatCollection.create(key, document);
3901+
3902+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
3903+
long originalCreatedTime;
3904+
long originalLastUpdateTime;
3905+
try (Connection conn = pgDatastore.getPostgresClient();
3906+
PreparedStatement ps =
3907+
conn.prepareStatement(
3908+
String.format(
3909+
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
3910+
FLAT_COLLECTION_NAME, key.toString()));
3911+
ResultSet rs = ps.executeQuery()) {
3912+
assertTrue(rs.next());
3913+
originalCreatedTime = rs.getLong("createdTime");
3914+
originalLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime();
3915+
}
3916+
3917+
// Wait a bit to ensure time difference
3918+
Thread.sleep(50);
3919+
3920+
// Upsert (update existing)
3921+
long beforeUpsert = System.currentTimeMillis();
3922+
objectNode.put("price", 200);
3923+
Document updatedDoc = new JSONDocument(objectNode);
3924+
flatCollection.createOrReplace(key, updatedDoc);
3925+
long afterUpsert = System.currentTimeMillis();
3926+
3927+
try (Connection conn = pgDatastore.getPostgresClient();
3928+
PreparedStatement ps =
3929+
conn.prepareStatement(
3930+
String.format(
3931+
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
3932+
FLAT_COLLECTION_NAME, key.toString()));
3933+
ResultSet rs = ps.executeQuery()) {
3934+
assertTrue(rs.next());
3935+
3936+
long newCreatedTime = rs.getLong("createdTime");
3937+
assertEquals(
3938+
originalCreatedTime, newCreatedTime, "createdTime should be preserved on upsert");
3939+
3940+
long newLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime();
3941+
assertTrue(newLastUpdateTime > originalLastUpdateTime, "lastUpdateTime should be updated");
3942+
assertTrue(
3943+
newLastUpdateTime >= beforeUpsert && newLastUpdateTime <= afterUpsert,
3944+
"lastUpdateTime should be within upsert execution window");
3945+
}
3946+
}
3947+
3948+
@Test
3949+
@DisplayName(
3950+
"Should not throw exception when timestampFields config is missing - cols remain NULL")
3951+
void testNoExceptionWhenTimestampConfigMissing() throws Exception {
3952+
// Create a collection WITHOUT timestampFields config
3953+
String postgresConnectionUrl =
3954+
String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432));
3955+
3956+
Map<String, String> configWithoutTimestamps = new HashMap<>();
3957+
configWithoutTimestamps.put("url", postgresConnectionUrl);
3958+
configWithoutTimestamps.put("user", "postgres");
3959+
configWithoutTimestamps.put("password", "postgres");
3960+
// Note: NO customParams.timestampFields config
3961+
3962+
Datastore datastoreWithoutTimestamps =
3963+
DatastoreProvider.getDatastore(
3964+
"Postgres", ConfigFactory.parseMap(configWithoutTimestamps));
3965+
Collection collectionWithoutTimestamps =
3966+
datastoreWithoutTimestamps.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT);
3967+
3968+
// Create a document - should NOT throw exception
3969+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
3970+
objectNode.put("id", "ts-test-no-config");
3971+
objectNode.put("item", "NoTimestampConfigTest");
3972+
objectNode.put("price", 100);
3973+
Document document = new JSONDocument(objectNode);
3974+
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-no-config");
3975+
3976+
CreateResult result = collectionWithoutTimestamps.create(key, document);
3977+
assertTrue(result.isSucceed());
3978+
3979+
// Verify timestamp columns are NULL
3980+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
3981+
try (Connection conn = pgDatastore.getPostgresClient();
3982+
PreparedStatement ps =
3983+
conn.prepareStatement(
3984+
String.format(
3985+
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
3986+
FLAT_COLLECTION_NAME, key));
3987+
ResultSet rs = ps.executeQuery()) {
3988+
assertTrue(rs.next());
3989+
3990+
assertNull(
3991+
rs.getObject("createdTime"), "createdTime should be NULL when config is missing");
3992+
assertNull(
3993+
rs.getObject("lastUpdateTime"), "lastUpdateTime should be NULL when config is missing");
3994+
}
3995+
}
3996+
3997+
@Test
3998+
@DisplayName(
3999+
"Should not throw exception when timestampFields config is invalid JSON - cols remain NULL")
4000+
void testNoExceptionWhenTimestampConfigInvalidJson() throws Exception {
4001+
// Create a collection with INVALID JSON in timestampFields config
4002+
String postgresConnectionUrl =
4003+
String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432));
4004+
4005+
Map<String, String> configWithInvalidJson = new HashMap<>();
4006+
configWithInvalidJson.put("url", postgresConnectionUrl);
4007+
configWithInvalidJson.put("user", "postgres");
4008+
configWithInvalidJson.put("password", "postgres");
4009+
// Invalid JSON - missing quotes, malformed
4010+
configWithInvalidJson.put("customParams.timestampFields", "not valid json {{{");
4011+
4012+
Datastore datastoreWithInvalidConfig =
4013+
DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(configWithInvalidJson));
4014+
Collection collectionWithInvalidConfig =
4015+
datastoreWithInvalidConfig.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT);
4016+
4017+
// Create a document - should NOT throw exception
4018+
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
4019+
objectNode.put("id", "ts-test-invalid-json");
4020+
objectNode.put("item", "InvalidJsonConfigTest");
4021+
objectNode.put("price", 100);
4022+
Document document = new JSONDocument(objectNode);
4023+
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-invalid-json");
4024+
4025+
CreateResult result = collectionWithInvalidConfig.create(key, document);
4026+
assertTrue(result.isSucceed());
4027+
4028+
// Verify timestamp columns are NULL (config parsing failed gracefully)
4029+
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
4030+
try (Connection conn = pgDatastore.getPostgresClient();
4031+
PreparedStatement ps =
4032+
conn.prepareStatement(
4033+
String.format(
4034+
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
4035+
FLAT_COLLECTION_NAME, key.toString()));
4036+
ResultSet rs = ps.executeQuery()) {
4037+
assertTrue(rs.next());
4038+
4039+
rs.getLong("createdTime");
4040+
assertTrue(rs.wasNull(), "createdTime should be NULL when config JSON is invalid");
4041+
4042+
rs.getTimestamp("lastUpdateTime");
4043+
assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config JSON is invalid");
4044+
}
4045+
}
4046+
}
38334047
}

document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public DatastoreConfig convert(final Config config) {
9494
connectionConfig.applicationName(),
9595
connectionConfig.connectionPoolConfig(),
9696
connectionConfig.queryTimeout(),
97-
connectionConfig.customParameters()) {
97+
connectionConfig.customParameters(),
98+
connectionConfig.collectionConfigs()) {
9899
@Override
99100
public String toConnectionString() {
100101
return config.hasPath("url")

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,21 @@ public static class ConnectionConfigBuilder {
7575
String applicationName = DEFAULT_APP_NAME;
7676
String replicaSet;
7777
Map<String, String> customParameters = new HashMap<>();
78+
Map<String, org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig>
79+
collectionConfigs = new HashMap<>();
7880

7981
public ConnectionConfigBuilder customParameter(String key, String value) {
8082
this.customParameters.put(key, value);
8183
return this;
8284
}
8385

86+
public ConnectionConfigBuilder collectionConfig(
87+
String collectionName,
88+
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig config) {
89+
this.collectionConfigs.put(collectionName, config);
90+
return this;
91+
}
92+
8493
ConnectionPoolConfig connectionPoolConfig;
8594
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
8695
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
@@ -127,7 +136,8 @@ public ConnectionConfig build() {
127136
applicationName,
128137
connectionPoolConfig,
129138
queryTimeout,
130-
customParameters);
139+
customParameters,
140+
collectionConfigs);
131141
}
132142

133143
throw new IllegalArgumentException("Unsupported database type: " + type);

document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder;
1111
import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder;
1212
import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder;
13+
import org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig;
1314
import org.hypertrace.core.documentstore.model.options.DataFreshness;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617

1718
@Value
1819
public class TypesafeConfigDatastoreConfigExtractor {
20+
1921
private static final Logger LOGGER =
2022
LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class);
2123
private static final String DEFAULT_HOST_KEY = "host";
@@ -35,6 +37,10 @@ public class TypesafeConfigDatastoreConfigExtractor {
3537
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";
3638
private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout";
3739
private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams";
40+
private static final String DEFAULT_COLLECTION_CONFIGS_PREFIX = "collectionConfigs";
41+
private static final String TIMESTAMP_FIELDS_KEY = "timestampFields";
42+
private static final String TIMESTAMP_CREATED_KEY = "created";
43+
private static final String TIMESTAMP_LAST_UPDATED_KEY = "lastUpdated";
3844

3945
@NonNull Config config;
4046
DatastoreConfigBuilder datastoreConfigBuilder;
@@ -82,7 +88,8 @@ private TypesafeConfigDatastoreConfigExtractor(
8288
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
8389
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY)
8490
.connectionTimeoutKey(DEFAULT_CONNECTION_TIMEOUT_KEY)
85-
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX);
91+
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX)
92+
.collectionConfigsKey(dataStoreType + "." + DEFAULT_COLLECTION_CONFIGS_PREFIX);
8693
}
8794

8895
public static TypesafeConfigDatastoreConfigExtractor from(
@@ -253,6 +260,53 @@ public TypesafeConfigDatastoreConfigExtractor connectionTimeoutKey(@NonNull fina
253260
return this;
254261
}
255262

263+
public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull final String key) {
264+
if (!config.hasPath(key)) {
265+
return this;
266+
}
267+
268+
try {
269+
Config collectionsConfig = config.getConfig(key);
270+
collectionsConfig
271+
.root()
272+
.keySet()
273+
.forEach(
274+
collectionName -> {
275+
Config collectionConfig = collectionsConfig.getConfig(collectionName);
276+
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig
277+
.CollectionConfigBuilder
278+
builder =
279+
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig
280+
.builder();
281+
282+
if (collectionConfig.hasPath(TIMESTAMP_FIELDS_KEY)) {
283+
builder.timestampFields(
284+
parseTimestampFieldsConfig(collectionConfig.getConfig(TIMESTAMP_FIELDS_KEY)));
285+
}
286+
287+
connectionConfigBuilder.collectionConfig(collectionName, builder.build());
288+
});
289+
} catch (Exception e) {
290+
LOGGER.warn(
291+
"Collection configs key '{}' exists but could not be parsed: {}", key, e.getMessage());
292+
}
293+
294+
return this;
295+
}
296+
297+
private TimestampFieldsConfig parseTimestampFieldsConfig(Config timestampConfig) {
298+
TimestampFieldsConfig.TimestampFieldsConfigBuilder tsBuilder = TimestampFieldsConfig.builder();
299+
300+
if (timestampConfig.hasPath(TIMESTAMP_CREATED_KEY)) {
301+
tsBuilder.created(timestampConfig.getString(TIMESTAMP_CREATED_KEY));
302+
}
303+
if (timestampConfig.hasPath(TIMESTAMP_LAST_UPDATED_KEY)) {
304+
tsBuilder.lastUpdated(timestampConfig.getString(TIMESTAMP_LAST_UPDATED_KEY));
305+
}
306+
307+
return tsBuilder.build();
308+
}
309+
256310
public DatastoreConfig extract() {
257311
if (connectionConfigBuilder.endpoints().isEmpty()
258312
&& !Endpoint.builder().build().equals(endpointBuilder.build())) {
@@ -265,6 +319,7 @@ public DatastoreConfig extract() {
265319
.connectionPoolConfig(connectionPoolConfigBuilder.build())
266320
.credentials(connectionCredentialsBuilder.build())
267321
.customParameters(connectionConfigBuilder.customParameters())
322+
.collectionConfigs(connectionConfigBuilder.collectionConfigs())
268323
.build())
269324
.build();
270325
}

0 commit comments

Comments
 (0)