Skip to content

Commit b9a793f

Browse files
[PECOBLR-642] Add header to metadata commands in SEA mode (databricks#1034)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> Add header to metadata commands in SEA mode ## Testing <!-- Describe how the changes have been tested--> Unit + manual testing ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. --> NO_CHANGELOG=true --------- Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 00f64c4 commit b9a793f

File tree

5 files changed

+195
-5
lines changed

5 files changed

+195
-5
lines changed

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,4 +1077,9 @@ public TelemetryLogLevel getTelemetryLogLevel() {
10771077
return TelemetryLogLevel.parse(
10781078
getParameter(DatabricksJdbcUrlParams.TELEMETRY_LOG_LEVEL), TelemetryLogLevel.DEBUG);
10791079
}
1080+
1081+
@Override
1082+
public boolean isSeaSyncMetadataEnabled() {
1083+
return getParameter(DatabricksJdbcUrlParams.ENABLE_SEA_SYNC_METADATA).equals("1");
1084+
}
10801085
}

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,4 +390,10 @@ public interface IDatabricksConnectionContext {
390390

391391
/* Returns whether metric view metadata is enabled */
392392
boolean getEnableMetricViewMetadata();
393+
394+
/**
395+
* Returns whether the x-databricks-sea-can-run-fully-sync header should be enabled for
396+
* synchronous metadata requests in SEA mode
397+
*/
398+
boolean isSeaSyncMetadataEnabled();
393399
}

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,12 @@ public enum DatabricksJdbcUrlParams {
167167
IGNORE_TRANSACTIONS("IgnoreTransactions", "Ignore transaction-related method calls", "0"),
168168
ENABLE_METRIC_VIEW_METADATA("EnableMetricViewMetadata", "Enable metric view metadata", "0"),
169169
ENABLE_MULTIPLE_CATALOG_SUPPORT(
170-
"enableMultipleCatalogSupport", "Enable multiple catalog support", "1");
170+
"enableMultipleCatalogSupport", "Enable multiple catalog support", "1"),
171+
ENABLE_SEA_SYNC_METADATA(
172+
"EnableSeaSyncMetadata",
173+
"Enable x-databricks-sea-can-run-fully-sync header for synchronous metadata requests in SEA mode",
174+
"1");
175+
171176
private final String paramName;
172177
private final String defaultValue;
173178
private final String description;

src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public DatabricksResultSet executeStatement(
192192
ExecuteStatementResponse response;
193193
try {
194194
Request req = new Request(Request.POST, STATEMENT_PATH, apiClient.serialize(request));
195-
req.withHeaders(getHeaders("executeStatement"));
195+
req.withHeaders(getHeaders("executeStatement", statementType, false));
196196
response = apiClient.execute(req, ExecuteStatementResponse.class);
197197
} catch (IOException e) {
198198
String errorMessage = "Error while processing the execute statement request";
@@ -299,9 +299,10 @@ public DatabricksResultSet executeStatementAsync(
299299
session,
300300
parentStatement);
301301
DatabricksThreadContextHolder.setSessionId(session.getSessionId());
302+
StatementType statementType = StatementType.SQL;
302303
ExecuteStatementRequest request =
303304
getRequest(
304-
StatementType.SQL,
305+
statementType,
305306
sql,
306307
((Warehouse) computeResource).getWarehouseId(),
307308
session,
@@ -311,7 +312,7 @@ public DatabricksResultSet executeStatementAsync(
311312
ExecuteStatementResponse response;
312313
try {
313314
Request req = new Request(Request.POST, STATEMENT_PATH, apiClient.serialize(request));
314-
req.withHeaders(getHeaders("executeStatement"));
315+
req.withHeaders(getHeaders("executeStatement", statementType, true));
315316
response = apiClient.execute(req, ExecuteStatementResponse.class);
316317
} catch (IOException e) {
317318
String errorMessage = "Error while processing the execute statement async request";
@@ -335,7 +336,7 @@ public DatabricksResultSet executeStatementAsync(
335336
typedStatementId,
336337
response.getResult(),
337338
response.getManifest(),
338-
StatementType.SQL,
339+
statementType,
339340
session,
340341
parentStatement);
341342
}
@@ -457,18 +458,58 @@ private boolean useCloudFetchForResult(StatementType statementType) {
457458
}
458459

459460
private Map<String, String> getHeaders(String method) {
461+
return getHeaders(method, null, false);
462+
}
463+
464+
private Map<String, String> getHeaders(
465+
String method, StatementType statementType, boolean isAsync) {
460466
Map<String, String> headers = new HashMap<>(JSON_HTTP_HEADERS);
461467
if (connectionContext.isRequestTracingEnabled()) {
462468
String traceHeader = TracingUtil.getTraceHeader();
463469
LOGGER.debug("Tracing header for method {}: [{}]", method, traceHeader);
464470
headers.put(TracingUtil.TRACE_HEADER, traceHeader);
465471
}
466472

473+
// Add SEA sync metadata header when appropriate
474+
if (shouldAddSeaSyncMetadataHeader(statementType, isAsync)) {
475+
headers.put("x-databricks-sea-can-run-fully-sync", "true");
476+
LOGGER.debug(
477+
"Adding x-databricks-sea-can-run-fully-sync header for synchronous metadata request");
478+
}
479+
467480
// Overriding with URL defined headers
468481
headers.putAll(this.connectionContext.getCustomHeaders());
469482
return headers;
470483
}
471484

485+
/**
486+
* Determines whether the x-databricks-sea-can-run-fully-sync header should be added to the
487+
* request.
488+
*
489+
* <p>This header is only added when all of the following conditions are met:
490+
*
491+
* <ul>
492+
* <li>The EnableSeaSyncMetadata URL parameter is enabled (default: true)
493+
* <li>The statement type is METADATA
494+
* <li>The execution mode is synchronous (not async)
495+
* </ul>
496+
*
497+
* <p>The header signals to the server that the metadata operation can be executed fully
498+
* synchronously in SEA mode, enabling optimized execution paths.
499+
*
500+
* <p>Note: This client is only used for SEA mode, so no client type check is needed.
501+
*
502+
* @param statementType the type of statement being executed (e.g., METADATA, QUERY, SQL)
503+
* @param isAsync true if the statement is being executed asynchronously, false for synchronous
504+
* execution
505+
* @return true if the header should be added, false otherwise
506+
*/
507+
private boolean shouldAddSeaSyncMetadataHeader(StatementType statementType, boolean isAsync) {
508+
return connectionContext.isSeaSyncMetadataEnabled()
509+
&& statementType == StatementType.METADATA
510+
&& !isAsync;
511+
}
512+
472513
private ExecuteStatementRequest getRequest(
473514
StatementType statementType,
474515
String sql,

src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,4 +589,137 @@ private StatementParameterListItem getParam(String type, String value, int ordin
589589
.setType(type)
590590
.setValue(value);
591591
}
592+
593+
@Test
594+
public void testSeaSyncMetadataHeaderIsAdded() throws Exception {
595+
// Test that x-databricks-sea-can-run-fully-sync header is added for SEA + METADATA + sync
596+
setupClientMocks(true, false);
597+
IDatabricksConnectionContext connectionContext =
598+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
599+
DatabricksSdkClient databricksSdkClient =
600+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
601+
DatabricksConnection connection =
602+
new DatabricksConnection(connectionContext, databricksSdkClient);
603+
connection.open();
604+
DatabricksStatement statement = new DatabricksStatement(connection);
605+
606+
// Execute a metadata request (synchronous)
607+
databricksSdkClient.executeStatement(
608+
"SHOW CATALOGS",
609+
warehouse,
610+
new HashMap<>(),
611+
StatementType.METADATA,
612+
connection.getSession(),
613+
statement);
614+
615+
// Verify that the request was made with the correct header
616+
verify(apiClient, atLeastOnce())
617+
.execute(
618+
argThat(
619+
req -> {
620+
Map<String, String> headers = req.getHeaders();
621+
return headers != null
622+
&& "true".equals(headers.get("x-databricks-sea-can-run-fully-sync"));
623+
}),
624+
eq(ExecuteStatementResponse.class));
625+
}
626+
627+
@Test
628+
public void testSeaSyncMetadataHeaderNotAddedForAsyncExecution() throws Exception {
629+
// Test that header is NOT added for async execution
630+
setupClientMocks(false, true);
631+
IDatabricksConnectionContext connectionContext =
632+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
633+
DatabricksSdkClient databricksSdkClient =
634+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
635+
DatabricksConnection connection =
636+
new DatabricksConnection(connectionContext, databricksSdkClient);
637+
connection.open();
638+
DatabricksStatement statement = new DatabricksStatement(connection);
639+
640+
// Execute an async statement
641+
databricksSdkClient.executeStatementAsync(
642+
"SELECT * FROM table", warehouse, new HashMap<>(), connection.getSession(), statement);
643+
644+
// Verify that the request was made WITHOUT the header
645+
verify(apiClient)
646+
.execute(
647+
argThat(
648+
req -> {
649+
Map<String, String> headers = req.getHeaders();
650+
return headers == null
651+
|| !headers.containsKey("x-databricks-sea-can-run-fully-sync");
652+
}),
653+
eq(ExecuteStatementResponse.class));
654+
}
655+
656+
@Test
657+
public void testSeaSyncMetadataHeaderNotAddedForQueryType() throws Exception {
658+
// Test that header is NOT added for non-METADATA statement types
659+
setupClientMocks(true, false);
660+
IDatabricksConnectionContext connectionContext =
661+
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
662+
DatabricksSdkClient databricksSdkClient =
663+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
664+
DatabricksConnection connection =
665+
new DatabricksConnection(connectionContext, databricksSdkClient);
666+
connection.open();
667+
DatabricksStatement statement = new DatabricksStatement(connection);
668+
669+
// Execute a regular query (not metadata)
670+
databricksSdkClient.executeStatement(
671+
"SELECT * FROM table",
672+
warehouse,
673+
new HashMap<>(),
674+
StatementType.QUERY,
675+
connection.getSession(),
676+
statement);
677+
678+
// Verify that the request was made WITHOUT the header
679+
verify(apiClient, atLeastOnce())
680+
.execute(
681+
argThat(
682+
req -> {
683+
Map<String, String> headers = req.getHeaders();
684+
return headers == null
685+
|| !headers.containsKey("x-databricks-sea-can-run-fully-sync");
686+
}),
687+
eq(ExecuteStatementResponse.class));
688+
}
689+
690+
@Test
691+
public void testSeaSyncMetadataHeaderNotAddedWhenDisabled() throws Exception {
692+
// Test that header is NOT added when the URL parameter is disabled
693+
setupClientMocks(true, false);
694+
String urlWithDisabledFlag =
695+
"jdbc:databricks://sample-host.18.azuredatabricks.net:4423/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/99999999;EnableSeaSyncMetadata=0;";
696+
IDatabricksConnectionContext connectionContext =
697+
DatabricksConnectionContext.parse(urlWithDisabledFlag, new Properties());
698+
DatabricksSdkClient databricksSdkClient =
699+
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
700+
DatabricksConnection connection =
701+
new DatabricksConnection(connectionContext, databricksSdkClient);
702+
connection.open();
703+
DatabricksStatement statement = new DatabricksStatement(connection);
704+
705+
// Execute a metadata request (should NOT add header because flag is disabled)
706+
databricksSdkClient.executeStatement(
707+
"SHOW CATALOGS",
708+
warehouse,
709+
new HashMap<>(),
710+
StatementType.METADATA,
711+
connection.getSession(),
712+
statement);
713+
714+
// Verify that the request was made WITHOUT the header
715+
verify(apiClient, atLeastOnce())
716+
.execute(
717+
argThat(
718+
req -> {
719+
Map<String, String> headers = req.getHeaders();
720+
return headers == null
721+
|| !headers.containsKey("x-databricks-sea-can-run-fully-sync");
722+
}),
723+
eq(ExecuteStatementResponse.class));
724+
}
592725
}

0 commit comments

Comments
 (0)