Skip to content

Commit 0b0d9eb

Browse files
authored
Merge pull request #379 from databendlabs/fix/rm-multihost
feat: rm support for multihost and node discovery
2 parents 3ea1ad0 + 57a2091 commit 0b0d9eb

File tree

10 files changed

+73
-837
lines changed

10 files changed

+73
-837
lines changed

databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ public final class ConnectionProperties {
2121
public static final ConnectionProperty<String> WAREHOUSE = new Warehouse();
2222
public static final ConnectionProperty<String> SSL_MODE = new SSLMode();
2323
static final ConnectionProperty<String> TENANT = new Tenant();
24+
public static final ConnectionProperty<String> DATABASE = new Database();
25+
// Deprecated multi-host knobs kept for compatibility to avoid hard failures.
2426
public static final ConnectionProperty<Integer> MAX_FAILOVER_RETRY = new MaxFailoverRetry();
2527
public static final ConnectionProperty<String> LOAD_BALANCING_POLICY = new LoadBalancingPolicy();
2628
public static final ConnectionProperty<Boolean> AUTO_DISCOVERY = new AutoDiscovery();
2729
public static final ConnectionProperty<Integer> NODE_DISCOVERY_INTERVAL = new NodeDiscoveryInterval();
2830
public static final ConnectionProperty<Boolean> ENABLE_MOCK = new EnableMock();
29-
public static final ConnectionProperty<String> DATABASE = new Database();
3031
public static final ConnectionProperty<String> ACCESS_TOKEN = new AccessToken();
3132

3233
public static final ConnectionProperty<Integer> CONNECTION_TIMEOUT = new ConnectionTimeout();
@@ -55,7 +56,6 @@ public final class ConnectionProperties {
5556
.add(WAREHOUSE)
5657
.add(SSL_MODE)
5758
.add(TENANT)
58-
.add(LOAD_BALANCING_POLICY)
5959
.add(DATABASE)
6060
.add(ACCESS_TOKEN)
6161
.add(PRESIGNED_URL_DISABLED)
@@ -67,6 +67,7 @@ public final class ConnectionProperties {
6767
.add(MAX_ROWS_PER_PAGE)
6868
.add(SESSION_SETTINGS)
6969
.build();
70+
// Deprecated multi-host properties are intentionally excluded from ALL_PROPERTIES so we can detect user-specified values.
7071
private static final Map<String, String> DEFAULTS;
7172

7273
public static Set<ConnectionProperty<?>> allProperties() {
@@ -178,6 +179,7 @@ public EnableMock() {
178179
}
179180
}
180181

182+
181183
private static class AccessToken
182184
extends AbstractConnectionProperty<String> {
183185
public AccessToken() {

databend-jdbc/src/main/java/com/databend/jdbc/DatabendClientLoadBalancingPolicy.java

Lines changed: 0 additions & 122 deletions
This file was deleted.

databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java

Lines changed: 1 addition & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.File;
2323
import java.io.IOException;
2424
import java.io.InputStream;
25-
import java.net.ConnectException;
2625
import java.net.URI;
2726
import java.nio.charset.StandardCharsets;
2827
import java.sql.Array;
@@ -86,7 +85,6 @@ public class DatabendConnection implements Connection, DatabendConnectionExtensi
8685
private final OkHttpClient httpClient;
8786
private final ConcurrentHashMap<DatabendStatement, Boolean> statements = new ConcurrentHashMap<>();
8887
private final DatabendDriverUri driverUri;
89-
private boolean autoDiscovery;
9088
private final AtomicReference<DatabendSession> session = new AtomicReference<>();
9189

9290
private String routeHint;
@@ -131,8 +129,6 @@ private void initializeFileHandler() {
131129
this.driverUri = uri;
132130
this.schema.set(uri.getDatabase());
133131
this.routeHint = randRouteHint();
134-
// it maybe closed due to unsupported server versioning.
135-
this.autoDiscovery = uri.autoDiscovery();
136132
DatabendSession session = new DatabendSession.Builder().setDatabase(this.getSchema()).setSettings(uri.getSessionSettings()).build();
137133
this.setSession(session);
138134

@@ -454,10 +450,6 @@ public int getHoldability() throws SQLException {
454450
return 0;
455451
}
456452

457-
public int getMaxFailoverRetries() {
458-
return this.driverUri.getMaxFailoverRetry();
459-
}
460-
461453
@Override
462454
@NotImplemented
463455
public void setHoldability(int holdability) throws SQLException {
@@ -650,10 +642,6 @@ boolean copyPurge() {
650642
return this.driverUri.copyPurge();
651643
}
652644

653-
boolean isAutoDiscovery() {
654-
return this.autoDiscovery;
655-
}
656-
657645
String warehouse() {
658646
return this.driverUri.getWarehouse();
659647
}
@@ -734,98 +722,6 @@ private DatabendClient startQueryInternal(String sql, StageAttachment attach) th
734722
ClientSettings settings = sb.build();
735723
return new DatabendClientV1(httpClient, sql, settings, this, lastNodeID);
736724
}
737-
/**
738-
* Retry executing a query in case of connection errors. fail over mechanism is used to retry the query when connect error occur
739-
* It will find next target host based on configured Load balancing Policy.
740-
*
741-
* @param sql The SQL statement to execute.
742-
* @param attach The stage attachment to use for the query.
743-
* @return A DatabendClient instance representing the successful query execution.
744-
* @throws SQLException If the query fails after retrying the specified number of times.
745-
* @see DatabendClientLoadBalancingPolicy
746-
*/
747-
DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws SQLException {
748-
if (this.driverUri.getNodes().getUris().size() == 1 && !this.autoDiscovery) {
749-
return this.startQueryInternal(sql, attach);
750-
}
751-
int maxRetries = getMaxFailoverRetries();
752-
SQLException lastException = null;
753-
String lastHost = null;
754-
755-
for (int attempt = 0; attempt <= maxRetries; attempt++) {
756-
try {
757-
String queryId = UUID.randomUUID().toString().replace("-", "");
758-
String candidateHost = selectHostForQuery(queryId);
759-
if (candidateHost.equals(lastHost)) {
760-
continue;
761-
}
762-
lastHost = candidateHost;
763-
764-
// configure the client settings
765-
ClientSettings.Builder sb = this.makeClientSettings(queryId, candidateHost);
766-
if (attach != null) {
767-
sb.setStageAttachment(attach);
768-
}
769-
ClientSettings settings = sb.build();
770-
771-
logger.log(Level.FINE, "execute query #{0}: SQL: {1} host: {2}",
772-
new Object[]{attempt + 1, sql, settings.getHost()});
773-
774-
// need to retry the auto discovery in case of connection error
775-
if (this.autoDiscovery) {
776-
tryAutoDiscovery(httpClient, settings);
777-
}
778-
779-
return new DatabendClientV1(httpClient, sql, settings, this, lastNodeID);
780-
} catch (Exception e) {
781-
// handle the exception and retry the query
782-
if (shouldRetryException(e) && attempt < maxRetries) {
783-
lastException = wrapException("query failed", sql, e);
784-
try {
785-
// back off retry
786-
Thread.sleep(Math.min(100 * (1 << attempt), 5000));
787-
} catch (InterruptedException ie) {
788-
Thread.currentThread().interrupt();
789-
throw wrapException("query interrupt", sql, ie);
790-
}
791-
} else {
792-
// throw the exception
793-
if (e instanceof SQLException) {
794-
throw (SQLException) e;
795-
} else {
796-
throw wrapException("Query failed,no need to retry", sql, e);
797-
}
798-
}
799-
}
800-
}
801-
802-
throw new SQLException("after" + maxRetries + "times retry and failed: SQL: " + sql, lastException);
803-
}
804-
805-
private boolean shouldRetryException(Exception e) {
806-
Throwable cause = e.getCause();
807-
// connection error
808-
if (cause instanceof ConnectException) {
809-
return true;
810-
}
811-
812-
if (e instanceof IOException) {
813-
return (e.getMessage().contains("unexpected end of stream") ||
814-
e.getMessage().contains("timeout") ||
815-
e.getMessage().contains("connection refused"));
816-
}
817-
818-
if (e instanceof RuntimeException) {
819-
String message = e.getMessage();
820-
return message != null && (
821-
message.contains("520") ||
822-
message.contains("timeout") ||
823-
message.contains("retry")
824-
);
825-
}
826-
827-
return false;
828-
}
829725

830726
private String selectHostForQuery(String queryId) {
831727
String candidateHost = this.driverUri.getUri(queryId).toString();
@@ -844,48 +740,12 @@ private String selectHostForQuery(String queryId) {
844740
return candidateHost;
845741
}
846742

847-
private SQLException wrapException(String prefix, String sql, Exception e) {
848-
String message = prefix + ": SQL: " + sql;
849-
if (e.getMessage() != null) {
850-
message += " - " + e.getMessage();
851-
}
852-
if (e.getCause() != null) {
853-
message += " (Reason: " + e.getCause().getMessage() + ")";
854-
}
855-
return new SQLException(message, e);
856-
}
857-
/**
858-
* Try to auto discovery the databend nodes it will log exceptions when auto discovery failed and not affect real query execution
859-
*
860-
* @param client the http client to query on
861-
* @param settings the client settings to use
862-
*/
863-
void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) {
864-
if (this.autoDiscovery) {
865-
if (this.driverUri.enableMock()) {
866-
settings.getAdditionalHeaders().put("~mock.unsupported.discovery", "true");
867-
}
868-
DatabendNodes nodes = this.driverUri.getNodes();
869-
if (nodes != null && nodes.needDiscovery()) {
870-
try {
871-
nodes.discoverUris(client, settings);
872-
} catch (UnsupportedOperationException e) {
873-
logger.log(Level.WARNING, "Current Query Node do not support auto discovery, close the functionality: " + e.getMessage());
874-
this.autoDiscovery = false;
875-
} catch (Exception e) {
876-
logger.log(Level.FINE, "Error auto discovery: " + " cause: " + e.getCause() + " message: " + e.getMessage());
877-
}
878-
}
879-
}
880-
881-
}
882-
883743
DatabendClient startQuery(String sql) throws SQLException {
884744
return startQuery(sql, null);
885745
}
886746

887747
DatabendClient startQuery(String sql, StageAttachment attach) throws SQLException {
888-
DatabendClient client = startQueryWithFailover(sql, attach);
748+
DatabendClient client = startQueryInternal(sql, attach);
889749
Long timeout = client.getResults().getResultTimeoutSecs();
890750
if (timeout != null && timeout != 0) {
891751
heartbeatManager.onStartQuery(timeout);

0 commit comments

Comments
 (0)