Skip to content

Commit 427d290

Browse files
authored
Add logging function to HoptimatorConnection so its clients can retrieve logs by registering hooks (#154)
Clients can retrieve logs at HoptimatorDdlExecutor level by registering hooks.
1 parent 9f1c12e commit 427d290

File tree

4 files changed

+103
-4
lines changed

4 files changed

+103
-4
lines changed

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.linkedin.hoptimator.Database;
44
import com.linkedin.hoptimator.Sink;
55
import com.linkedin.hoptimator.Source;
6+
import com.linkedin.hoptimator.avro.AvroConverter;
67
import com.linkedin.hoptimator.util.ConnectionService;
8+
import com.linkedin.hoptimator.util.DelegatingConnection;
79
import java.sql.PreparedStatement;
810
import java.sql.SQLException;
911
import java.sql.Statement;
@@ -13,17 +15,14 @@
1315
import java.util.Map;
1416
import java.util.Objects;
1517
import java.util.Properties;
18+
import java.util.function.Consumer;
1619
import java.util.stream.Collectors;
17-
1820
import org.apache.avro.Schema;
1921
import org.apache.calcite.jdbc.CalciteConnection;
2022
import org.apache.calcite.jdbc.CalcitePrepare;
2123
import org.apache.calcite.jdbc.CalciteSchema;
2224
import org.apache.calcite.plan.RelOptMaterialization;
2325
import org.apache.calcite.rel.RelNode;
24-
25-
import com.linkedin.hoptimator.avro.AvroConverter;
26-
import com.linkedin.hoptimator.util.DelegatingConnection;
2726
import org.apache.calcite.util.Util;
2827

2928

@@ -33,6 +32,8 @@ public class HoptimatorConnection extends DelegatingConnection {
3332
private final Properties connectionProperties;
3433
private final List<RelOptMaterialization> materializations = new ArrayList<>();
3534

35+
private final List<Consumer<String>> logHooks = new ArrayList<>();
36+
3637
public HoptimatorConnection(CalciteConnection connection, Properties connectionProperties) {
3738
super(connection);
3839
this.connection = connection;
@@ -95,6 +96,21 @@ public void registerMaterialization(List<String> viewPath, String querySql) {
9596
registerMaterialization(viewPath, tableRel, queryRel);
9697
}
9798

99+
/**
100+
* Returns a logger for a client of this connection. The logger logs to both SLF4J and hooks.
101+
*/
102+
HoptimatorConnectionDualLogger getLogger(Class<?> clazz) {
103+
return new HoptimatorConnectionDualLogger(clazz, logHooks);
104+
}
105+
106+
/**
107+
* Adds a log hook to all loggers. The hook will be called for all log messages for all statements executed regardless of thread ownership.
108+
* TODO: Revise to allow hooks to be added per statement.
109+
*/
110+
public void addLogHook(Consumer<String> hook) {
111+
logHooks.add(hook);
112+
}
113+
98114
private void registerMaterialization(List<String> viewPath, RelNode tableRel, RelNode queryRel) {
99115
materializations.add(new RelOptMaterialization(tableRel, queryRel, null, viewPath));
100116
}
@@ -114,4 +130,29 @@ private static String databaseName(CalcitePrepare.Context context, List<String>
114130
}
115131
return ((Database) schema.schema).databaseName();
116132
}
133+
134+
/**
135+
* A logger that logs to both SLF4J logger and registered hooks.
136+
*/
137+
static class HoptimatorConnectionDualLogger {
138+
private final String className;
139+
private final org.slf4j.Logger slf4jLogger;
140+
private final List<Consumer<String>> hooks;
141+
142+
HoptimatorConnectionDualLogger(Class<?> clazz, List<Consumer<String>> hooks) {
143+
this.className = clazz.getSimpleName();
144+
this.slf4jLogger = org.slf4j.LoggerFactory.getLogger(clazz);
145+
this.hooks = hooks;
146+
}
147+
148+
/**
149+
* Log a message with slf4j format at the INFO level.
150+
*/
151+
public void info(String format, Object... arguments) {
152+
slf4jLogger.info(format, arguments);
153+
String msg = org.slf4j.helpers.MessageFormatter.arrayFormat(format, arguments).getMessage();
154+
String msgWithClassName = String.format("[%s] %s", className, msg);
155+
hooks.forEach(hook -> hook.accept(msgWithClassName));
156+
}
157+
}
117158
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,11 @@
7474
public final class HoptimatorDdlExecutor extends ServerDdlExecutor {
7575

7676
private final HoptimatorConnection connection;
77+
private final HoptimatorConnection.HoptimatorConnectionDualLogger logger;
7778

7879
public HoptimatorDdlExecutor(HoptimatorConnection connection) {
7980
this.connection = connection;
81+
this.logger = connection.getLogger(HoptimatorDdlExecutor.class);
8082
}
8183

8284
public static final SqlParserImplFactory PARSER_FACTORY = new SqlParserImplFactory() {
@@ -98,6 +100,7 @@ public DdlExecutor getDdlExecutor() {
98100
/** Executes a {@code CREATE VIEW} command. */
99101
@Override
100102
public void execute(SqlCreateView create, CalcitePrepare.Context context) {
103+
logger.info("Validating statement: {}", create);
101104
try {
102105
ValidationService.validateOrThrow(create);
103106
} catch (SQLException e) {
@@ -133,18 +136,27 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
133136
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
134137
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
135138
View view = new View(viewPath, sql);
139+
logger.info("Validated sql statement. The view is named {} and has path {}",
140+
viewName, viewPath);
136141

137142
Collection<Deployer> deployers = null;
138143
try {
144+
logger.info("Validating view {} with deployers", viewName);
139145
ValidationService.validateOrThrow(viewTable);
140146
deployers = DeploymentService.deployers(view, connection);
141147
ValidationService.validateOrThrow(deployers);
148+
logger.info("Validated view {}", viewName);
142149
if (create.getReplace()) {
150+
logger.info("Deploying update view {}", viewName);
143151
DeploymentService.update(deployers);
144152
} else {
153+
logger.info("Deploying create view {}", viewName);
145154
DeploymentService.create(deployers);
146155
}
156+
logger.info("Deployed view {}", viewName);
147157
schemaPlus.add(viewName, viewTable);
158+
logger.info("Added view {} to schema {}", viewName, schemaPlus.getName());
159+
logger.info("CREATE VIEW {} completed", viewName);
148160
} catch (Exception e) {
149161
if (deployers != null) {
150162
DeploymentService.restore(deployers);
@@ -158,6 +170,7 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
158170
/** Executes a {@code CREATE MATERIALIZED VIEW} command. */
159171
@Override
160172
public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context context) {
173+
logger.info("Validating statement: {}", create);
161174
try {
162175
ValidationService.validateOrThrow(create);
163176
} catch (SQLException e) {
@@ -214,6 +227,9 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
214227
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTable);
215228
RelDataType viewRowType = materializedViewTable.getRowType(typeFactory);
216229

230+
logger.info("Validated sql statement. The view is named {} and has path {}",
231+
viewName, viewPath);
232+
217233
// Support "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
218234
// is "foo-bar" and the sink is just FOO.
219235
String[] viewParts = viewName.split("\\$", 2);
@@ -222,6 +238,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
222238
if (viewParts.length > 1) {
223239
pipelineName = pipelineName + "-" + viewParts[1];
224240
}
241+
logger.info("Pipeline name for view {} is {}", viewName, pipelineName);
225242
Properties connectionProperties = connection.connectionProperties();
226243
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
227244
List<String> sinkPath = new ArrayList<>(schemaPath);
@@ -244,17 +261,23 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
244261

245262
// Need to add the view table to the connection so that the ConnectorService can find it when resolving options.
246263
schemaPlus.add(viewName, materializedViewTable);
264+
logger.info("Added view {} to schema {}", viewName, schemaPlus.getName());
247265
Pipeline pipeline = plan.pipeline(viewName, connection);
248266
MaterializedView hook = new MaterializedView(database, viewPath, sql, pipeline.job().sql(), pipeline);
249267
// TODO support CREATE ... WITH (options...)
250268
ValidationService.validateOrThrow(hook);
251269
deployers = DeploymentService.deployers(hook, connection);
270+
logger.info("Validating view {} with deployers", viewName);
252271
ValidationService.validateOrThrow(deployers);
272+
logger.info("Validated view {}", viewName);
253273
if (create.getReplace()) {
274+
logger.info("Deploying update view {}", viewName);
254275
DeploymentService.update(deployers);
255276
} else {
277+
logger.info("Deploying create view {}", viewName);
256278
DeploymentService.create(deployers);
257279
}
280+
logger.info("Deployed view {}", viewName);
258281
} catch (SQLException | RuntimeException e) {
259282
if (deployers != null) {
260283
DeploymentService.restore(deployers);
@@ -266,6 +289,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
266289
}
267290
throw new DdlException(create, e.getMessage(), e);
268291
}
292+
logger.info("CREATE MATERIALIZED VIEW {} completed", viewName);
269293
}
270294

271295
// N.B. largely copy-pasted from Apache Calcite

hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/JdbcTestBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ protected void sql(String sql) throws SQLException {
3434
}
3535
}
3636

37+
protected List<String> sqlReturnsLogs(String sql) throws SQLException {
38+
var logs = new ArrayList<String>();
39+
((HoptimatorConnection) conn).addLogHook(logs::add);
40+
try (Statement stmt = conn.createStatement()) {
41+
stmt.executeUpdate(sql);
42+
}
43+
return logs;
44+
}
45+
3746
protected void assertQueriesEqual(String q1, String q2) throws SQLException {
3847
assertResultSetsEqual(query(q1), query(q2));
3948
}

hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestBasicSql.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
import java.util.Arrays;
44

5+
import org.junit.jupiter.api.Assertions;
56
import org.junit.jupiter.api.Test;
67

8+
import java.util.List;
9+
710

811
public class TestBasicSql extends JdbcTestBase {
912

@@ -28,4 +31,26 @@ public void insertIntoSelectFrom() throws Exception {
2831
sql("DROP TABLE T1");
2932
sql("DROP TABLE T2");
3033
}
34+
35+
@Test
36+
public void createView() throws Exception {
37+
sql("CREATE TABLE T (X VARCHAR, Y VARCHAR)");
38+
sql("INSERT INTO T VALUES ('one', 'two')");
39+
assertQueriesEqual("SELECT * FROM T", "VALUES ('one', 'two')");
40+
var logs = sqlReturnsLogs("CREATE VIEW V AS SELECT * FROM T");
41+
assertResultSetsEqual(query("SELECT * FROM V"), query("SELECT * FROM T"));
42+
sql("DROP VIEW V");
43+
sql("DROP TABLE T");
44+
45+
var expectedLogs = List.of(
46+
"[HoptimatorDdlExecutor] Validating statement: CREATE VIEW `V` AS\nSELECT *\nFROM `T`",
47+
"[HoptimatorDdlExecutor] Validated sql statement. The view is named V and has path [DEFAULT, V]",
48+
"[HoptimatorDdlExecutor] Validating view V with deployers",
49+
"[HoptimatorDdlExecutor] Validated view V",
50+
"[HoptimatorDdlExecutor] Deploying create view V",
51+
"[HoptimatorDdlExecutor] Deployed view V",
52+
"[HoptimatorDdlExecutor] Added view V to schema DEFAULT",
53+
"[HoptimatorDdlExecutor] CREATE VIEW V completed");
54+
Assertions.assertEquals(expectedLogs, logs);
55+
}
3156
}

0 commit comments

Comments
 (0)