Skip to content

Commit 4853757

Browse files
authored
Add Schema Registry to Provide Table Metadata (#263)
1 parent a7acef1 commit 4853757

File tree

15 files changed

+1374
-2
lines changed

15 files changed

+1374
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
package org.hypertrace.core.documentstore.postgres;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertNotSame;
7+
import static org.junit.jupiter.api.Assertions.assertSame;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
import com.typesafe.config.Config;
11+
import com.typesafe.config.ConfigFactory;
12+
import java.sql.Connection;
13+
import java.sql.PreparedStatement;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
import java.util.Optional;
17+
import org.hypertrace.core.documentstore.DatastoreProvider;
18+
import org.hypertrace.core.documentstore.commons.SchemaRegistry;
19+
import org.hypertrace.core.documentstore.expression.impl.DataType;
20+
import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
21+
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
22+
import org.junit.jupiter.api.AfterAll;
23+
import org.junit.jupiter.api.BeforeAll;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.testcontainers.containers.GenericContainer;
27+
import org.testcontainers.containers.wait.strategy.Wait;
28+
import org.testcontainers.junit.jupiter.Testcontainers;
29+
import org.testcontainers.utility.DockerImageName;
30+
31+
@Testcontainers
32+
class PostgresLazyilyLoadedSchemaRegistryIntegrationTest {
33+
34+
private static final String TABLE_NAME = "myTestFlat";
35+
36+
private static GenericContainer<?> postgres;
37+
private static PostgresDatastore datastore;
38+
private static SchemaRegistry<PostgresColumnMetadata> registry;
39+
40+
@BeforeAll
41+
static void init() throws Exception {
42+
postgres =
43+
new GenericContainer<>(DockerImageName.parse("postgres:13.1"))
44+
.withEnv("POSTGRES_PASSWORD", "postgres")
45+
.withEnv("POSTGRES_USER", "postgres")
46+
.withExposedPorts(5432)
47+
.waitingFor(Wait.forListeningPort());
48+
postgres.start();
49+
50+
String connectionUrl =
51+
String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432));
52+
53+
Map<String, String> postgresConfig = new HashMap<>();
54+
postgresConfig.put("url", connectionUrl);
55+
postgresConfig.put("user", "postgres");
56+
postgresConfig.put("password", "postgres");
57+
Config config = ConfigFactory.parseMap(postgresConfig);
58+
59+
datastore = (PostgresDatastore) DatastoreProvider.getDatastore("Postgres", config);
60+
61+
createFlatTable();
62+
63+
registry = datastore.getSchemaRegistry();
64+
}
65+
66+
private static void createFlatTable() throws Exception {
67+
String createTableSQL =
68+
String.format(
69+
"CREATE TABLE IF NOT EXISTS \"%s\" ("
70+
+ "\"_id\" INTEGER PRIMARY KEY,"
71+
+ "\"item\" TEXT,"
72+
+ "\"price\" INTEGER,"
73+
+ "\"quantity\" BIGINT,"
74+
+ "\"rating\" REAL,"
75+
+ "\"score\" DOUBLE PRECISION,"
76+
+ "\"date\" TIMESTAMPTZ,"
77+
+ "\"created_date\" DATE,"
78+
+ "\"in_stock\" BOOLEAN,"
79+
+ "\"tags\" TEXT[],"
80+
+ "\"props\" JSONB"
81+
+ ");",
82+
TABLE_NAME);
83+
84+
try (Connection connection = datastore.getPostgresClient();
85+
PreparedStatement statement = connection.prepareStatement(createTableSQL)) {
86+
statement.execute();
87+
System.out.println("Created flat table: " + TABLE_NAME);
88+
}
89+
}
90+
91+
@BeforeEach
92+
void setUp() {
93+
registry.invalidate(TABLE_NAME);
94+
}
95+
96+
@AfterAll
97+
static void shutdown() {
98+
if (postgres != null) {
99+
postgres.stop();
100+
}
101+
}
102+
103+
@Test
104+
void getSchemaReturnsAllColumns() {
105+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
106+
107+
assertNotNull(schema);
108+
assertEquals(11, schema.size());
109+
assertTrue(schema.containsKey("_id"));
110+
assertTrue(schema.containsKey("item"));
111+
assertTrue(schema.containsKey("price"));
112+
assertTrue(schema.containsKey("quantity"));
113+
assertTrue(schema.containsKey("rating"));
114+
assertTrue(schema.containsKey("score"));
115+
assertTrue(schema.containsKey("date"));
116+
assertTrue(schema.containsKey("created_date"));
117+
assertTrue(schema.containsKey("in_stock"));
118+
assertTrue(schema.containsKey("tags"));
119+
assertTrue(schema.containsKey("props"));
120+
}
121+
122+
@Test
123+
void getSchemaReturnsCorrectIntegerMapping() {
124+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
125+
126+
PostgresColumnMetadata idMeta = schema.get("_id");
127+
assertEquals("_id", idMeta.getName());
128+
assertEquals(DataType.INTEGER, idMeta.getCanonicalType());
129+
assertEquals(PostgresDataType.INTEGER, idMeta.getPostgresType());
130+
assertFalse(idMeta.isNullable());
131+
132+
PostgresColumnMetadata priceMeta = schema.get("price");
133+
assertEquals(DataType.INTEGER, priceMeta.getCanonicalType());
134+
assertEquals(PostgresDataType.INTEGER, priceMeta.getPostgresType());
135+
assertTrue(priceMeta.isNullable());
136+
}
137+
138+
@Test
139+
void getSchemaReturnsCorrectBigintMapping() {
140+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
141+
142+
PostgresColumnMetadata quantityMeta = schema.get("quantity");
143+
assertEquals(DataType.LONG, quantityMeta.getCanonicalType());
144+
assertEquals(PostgresDataType.BIGINT, quantityMeta.getPostgresType());
145+
}
146+
147+
@Test
148+
void getSchemaReturnsCorrectFloatMapping() {
149+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
150+
151+
PostgresColumnMetadata ratingMeta = schema.get("rating");
152+
assertEquals(DataType.FLOAT, ratingMeta.getCanonicalType());
153+
assertEquals(PostgresDataType.REAL, ratingMeta.getPostgresType());
154+
}
155+
156+
@Test
157+
void getSchemaReturnsCorrectDoubleMapping() {
158+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
159+
160+
PostgresColumnMetadata scoreMeta = schema.get("score");
161+
assertEquals(DataType.DOUBLE, scoreMeta.getCanonicalType());
162+
assertEquals(PostgresDataType.DOUBLE_PRECISION, scoreMeta.getPostgresType());
163+
}
164+
165+
@Test
166+
void getSchemaReturnsCorrectTextMapping() {
167+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
168+
169+
PostgresColumnMetadata itemMeta = schema.get("item");
170+
assertEquals(DataType.STRING, itemMeta.getCanonicalType());
171+
assertEquals(PostgresDataType.TEXT, itemMeta.getPostgresType());
172+
}
173+
174+
@Test
175+
void getSchemaReturnsCorrectBooleanMapping() {
176+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
177+
178+
PostgresColumnMetadata inStockMeta = schema.get("in_stock");
179+
assertEquals(DataType.BOOLEAN, inStockMeta.getCanonicalType());
180+
assertEquals(PostgresDataType.BOOLEAN, inStockMeta.getPostgresType());
181+
}
182+
183+
@Test
184+
void getSchemaReturnsCorrectTimestamptzMapping() {
185+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
186+
187+
PostgresColumnMetadata dateMeta = schema.get("date");
188+
assertEquals(DataType.TIMESTAMPTZ, dateMeta.getCanonicalType());
189+
assertEquals(PostgresDataType.TIMESTAMPTZ, dateMeta.getPostgresType());
190+
}
191+
192+
@Test
193+
void getSchemaReturnsCorrectDateMapping() {
194+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
195+
196+
PostgresColumnMetadata createdDateMeta = schema.get("created_date");
197+
assertEquals(DataType.DATE, createdDateMeta.getCanonicalType());
198+
assertEquals(PostgresDataType.DATE, createdDateMeta.getPostgresType());
199+
}
200+
201+
@Test
202+
void getSchemaReturnsCorrectJsonbMapping() {
203+
Map<String, PostgresColumnMetadata> schema = registry.getSchema(TABLE_NAME);
204+
205+
PostgresColumnMetadata propsMeta = schema.get("props");
206+
assertEquals(DataType.JSON, propsMeta.getCanonicalType());
207+
assertEquals(PostgresDataType.JSONB, propsMeta.getPostgresType());
208+
}
209+
210+
@Test
211+
void getColumnOrRefreshReturnsExistingColumn() {
212+
Optional<PostgresColumnMetadata> result = registry.getColumnOrRefresh(TABLE_NAME, "item");
213+
214+
assertTrue(result.isPresent());
215+
assertEquals("item", result.get().getName());
216+
assertEquals(DataType.STRING, result.get().getCanonicalType());
217+
}
218+
219+
@Test
220+
void getColumnOrRefreshReturnsEmptyForNonExistentColumn() {
221+
Optional<PostgresColumnMetadata> result =
222+
registry.getColumnOrRefresh(TABLE_NAME, "nonexistent_column");
223+
224+
assertFalse(result.isPresent());
225+
}
226+
227+
@Test
228+
void getColumnOrRefreshFindsNewlyAddedColumnAfterInvalidation() throws Exception {
229+
// First, verify the new column doesn't exist
230+
Optional<PostgresColumnMetadata> before = registry.getColumnOrRefresh(TABLE_NAME, "new_column");
231+
assertFalse(before.isPresent());
232+
233+
// Add a new column to the table
234+
try (Connection connection = datastore.getPostgresClient();
235+
PreparedStatement statement =
236+
connection.prepareStatement(
237+
String.format("ALTER TABLE \"%s\" ADD COLUMN \"new_column\" TEXT", TABLE_NAME))) {
238+
statement.execute();
239+
}
240+
241+
// Invalidate cache to force reload
242+
registry.invalidate(TABLE_NAME);
243+
244+
// Now the registry should find the new column after reload
245+
Optional<PostgresColumnMetadata> after = registry.getColumnOrRefresh(TABLE_NAME, "new_column");
246+
assertTrue(after.isPresent());
247+
assertEquals("new_column", after.get().getName());
248+
assertEquals(DataType.STRING, after.get().getCanonicalType());
249+
250+
// Cleanup: drop the column
251+
try (Connection connection = datastore.getPostgresClient();
252+
PreparedStatement statement =
253+
connection.prepareStatement(
254+
String.format("ALTER TABLE \"%s\" DROP COLUMN \"new_column\"", TABLE_NAME))) {
255+
statement.execute();
256+
}
257+
}
258+
259+
@Test
260+
void cacheReturnsSameInstanceOnSubsequentCalls() {
261+
Map<String, PostgresColumnMetadata> schema1 = registry.getSchema(TABLE_NAME);
262+
Map<String, PostgresColumnMetadata> schema2 = registry.getSchema(TABLE_NAME);
263+
264+
// Should be the same cached instance
265+
assertSame(schema1, schema2);
266+
}
267+
268+
@Test
269+
void invalidateCausesReload() {
270+
Map<String, PostgresColumnMetadata> schema1 = registry.getSchema(TABLE_NAME);
271+
272+
registry.invalidate(TABLE_NAME);
273+
274+
Map<String, PostgresColumnMetadata> schema2 = registry.getSchema(TABLE_NAME);
275+
276+
// Should be different instances after invalidation
277+
assertNotSame(schema1, schema2);
278+
// But same content
279+
assertEquals(schema1.keySet(), schema2.keySet());
280+
}
281+
}

document-store/src/main/java/org/hypertrace/core/documentstore/Datastore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.util.Map;
44
import java.util.Set;
5+
import org.hypertrace.core.documentstore.commons.ColumnMetadata;
6+
import org.hypertrace.core.documentstore.commons.SchemaRegistry;
57
import org.hypertrace.core.documentstore.metric.DocStoreMetricProvider;
68

79
public interface Datastore {
@@ -19,6 +21,10 @@ public interface Datastore {
1921
@SuppressWarnings("unused")
2022
DocStoreMetricProvider getDocStoreMetricProvider();
2123

24+
default <T extends ColumnMetadata> SchemaRegistry<T> getSchemaRegistry() {
25+
return null;
26+
}
27+
2228
void close();
2329

2430
/**
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.hypertrace.core.documentstore.commons;
2+
3+
import org.hypertrace.core.documentstore.expression.impl.DataType;
4+
5+
public interface ColumnMetadata {
6+
7+
/**
8+
* @return the col name
9+
*/
10+
String getName();
11+
12+
/**
13+
* @return the col's canonical type, as defined here: {@link DataType}
14+
*/
15+
DataType getCanonicalType();
16+
17+
/**
18+
* @return whether this column can be set to NULL
19+
*/
20+
boolean isNullable();
21+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.hypertrace.core.documentstore.commons;
2+
3+
import java.util.Map;
4+
import java.util.Optional;
5+
6+
/**
7+
* SchemaRegistry is an interface for a registry of schemas. This interface does not places any
8+
* restrictions on how schemas are loaded. They can be loaded at bootstrap and cached, or loaded
9+
* lazily, or via any other method.
10+
*
11+
* @param <T> the type of metadata for a particular column in the registry
12+
*/
13+
public interface SchemaRegistry<T extends ColumnMetadata> {
14+
15+
/**
16+
* Returns the schema for a particular table. If the schema is not available for that table, it
17+
* returns null (note that some implementations may fetch the schema if this happens. The
18+
* interface does not make it mandatory).
19+
*
20+
* @param tableName the table for which schema has to be fetched
21+
* @return a map of column name to their metadata
22+
*/
23+
Map<String, T> getSchema(String tableName);
24+
25+
/**
26+
* Invalidates the current schema of the table that the schema registry is holding
27+
*
28+
* @param tableName the table name
29+
*/
30+
void invalidate(String tableName);
31+
32+
/**
33+
* Returns the metadata of a col from the registry. If the metadata is not found, an
34+
* implementation might fetch it from the source synchronously.
35+
*
36+
* @param tableName the table name
37+
* @param colName the col name
38+
* @return optional of the col metadata.
39+
*/
40+
Optional<T> getColumnOrRefresh(String tableName, String colName);
41+
}

document-store/src/main/java/org/hypertrace/core/documentstore/expression/impl/DataType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public enum DataType {
2222
FLOAT,
2323
DOUBLE,
2424
BOOLEAN,
25+
JSON,
2526
// timestamp with time-zone information. For example: 2004-10-19 10:23:54+02.
2627
// For more info, see: https://www.postgresql.org/docs/current/datatype-datetime.html
2728
TIMESTAMPTZ,

0 commit comments

Comments
 (0)