-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat(plugin-cassandra): Upgrade to Cassandra Java Driver 4.x #27029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
msmygit
wants to merge
7
commits into
prestodb:master
Choose a base branch
from
msmygit:attempt2
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Contributor
Reviewer's GuideUpgrades the Presto Cassandra connector from the DataStax 3.x driver to the Apache Cassandra Java Driver 4.x, refactoring session/metadata APIs, query building, type mappings, retries, and configuration to align with the new driver while adding Astra/secure-connect support and updating tests and dependencies. Class diagram for updated Cassandra session and client wiringclassDiagram
class CassandraClientModule {
+createCassandraSession(ConnectorId connectorId, CassandraClientConfig config, JsonCodec_List_ExtraColumnMetadata extraColumnMetadataCodec) CassandraSession
}
class CassandraSession {
<<interface>>
+String PRESTO_COMMENT_METADATA
+String getCassandraVersion()
+String getPartitioner()
+Set~TokenRange~ getTokenRanges()
+Set~Node~ getReplicas(String caseSensitiveSchemaName, TokenRange tokenRange)
+Set~Node~ getReplicas(String caseSensitiveSchemaName, ByteBuffer partitionKey)
+String getCaseSensitiveSchemaName(String caseInsensitiveSchemaName)
+List~String~ getCaseSensitiveSchemaNames()
+List~String~ getCaseSensitiveTableNames(String caseSensitiveSchemaName)
+CassandraTable getTable(SchemaTableName schemaTableName)
+List~SizeEstimate~ getSizeEstimates(String keyspaceName, String tableName)
+PreparedStatement prepare(String statement)
+ResultSet execute(String cql, Object values)
+ResultSet execute(Statement statement)
}
class NativeCassandraSession {
-String connectorId
-JsonCodec_List_ExtraColumnMetadata extraColumnMetadataCodec
-ReopeningSession reopeningSession
-Supplier~CqlSession~ session
-Duration noHostAvailableRetryTimeout
-static boolean caseSensitiveNameMatchingEnabled
+NativeCassandraSession(String connectorId, JsonCodec_List_ExtraColumnMetadata extraColumnMetadataCodec, ReopeningSession reopeningSession, Duration noHostAvailableRetryTimeout, boolean caseSensitiveNameMatchingEnabled)
+String getCassandraVersion()
+String getPartitioner()
+Set~TokenRange~ getTokenRanges()
+Set~Node~ getReplicas(String caseSensitiveSchemaName, TokenRange tokenRange)
+Set~Node~ getReplicas(String caseSensitiveSchemaName, ByteBuffer partitionKey)
+String getCaseSensitiveSchemaName(String caseSensitiveSchemaName)
+List~String~ getCaseSensitiveSchemaNames()
+List~String~ getCaseSensitiveTableNames(String caseSensitiveSchemaName)
+CassandraTable getTable(SchemaTableName schemaTableName)
+boolean isMaterializedView(SchemaTableName schemaTableName)
+List~SizeEstimate~ getSizeEstimates(String keyspaceName, String tableName)
+PreparedStatement prepare(String statement)
+ResultSet execute(String cql, Object values)
+ResultSet execute(Statement statement)
-T executeWithSession(SessionCallable sessionCallable)
}
class ReopeningSession {
-CqlSession delegate
-boolean closed
-Supplier~CqlSession~ sessionSupplier
+ReopeningSession(Supplier~CqlSession~ sessionSupplier)
+synchronized CqlSession get()
+synchronized CqlSession getSession()
+synchronized void close()
+synchronized boolean isClosed()
}
class CassandraClientConfig {
-DefaultConsistencyLevel consistencyLevel
-int fetchSize
-List~String~ contactPoints
-int nativeProtocolPort
-boolean allowDropTable
-String username
-String password
-Duration clientReadTimeout
-Duration clientConnectTimeout
-Integer clientSoLinger
-RetryPolicyType retryPolicy
-boolean useDCAware
-String dcAwareLocalDC
-int dcAwareUsedHostsPerRemoteDc
-boolean dcAwareAllowRemoteDCsForLocal
-boolean useTokenAware
-boolean tokenAwareShuffleReplicas
-Duration noHostAvailableRetryTimeout
-int speculativeExecutionLimit
-Duration speculativeExecutionDelay
-boolean tlsEnabled
-File truststorePath
-String truststorePassword
-File keystorePath
-String keystorePassword
-File secureConnectBundle
-boolean caseSensitiveNameMatchingEnabled
+DefaultConsistencyLevel getConsistencyLevel()
+CassandraClientConfig setConsistencyLevel(DefaultConsistencyLevel level)
+List~String~ getContactPoints()
+CassandraClientConfig setContactPoints(String commaSeparatedList)
+Optional~File~ getSecureConnectBundle()
+CassandraClientConfig setSecureConnectBundle(File secureConnectBundle)
+Duration getNoHostAvailableRetryTimeout()
+boolean isTlsEnabled()
+String getDcAwareLocalDC()
+boolean isUseDCAware()
+boolean isUseTokenAware()
+RetryPolicyType getRetryPolicy()
+int getSpeculativeExecutionLimit()
+Duration getSpeculativeExecutionDelay()
}
class RetryPolicyType {
<<enum>>
+DEFAULT
+BACKOFF
+DOWNGRADING_CONSISTENCY
+FALLTHROUGH
-Class_retryPolicy_ policyClass
+Class_retryPolicy_ getPolicyClass()
}
class BackoffRetryPolicy {
<<implements RetryPolicy>>
-static int MAX_RETRIES
+RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor, int received, boolean dataPresent, int retryCount)
+RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType, int blockFor, int received, int retryCount)
+RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive, int retryCount)
+RetryDecision onRequestAborted(Request request, Throwable error, int retryCount)
+RetryDecision onErrorResponse(Request request, CoordinatorException error, int retryCount)
+void close()
}
CassandraClientModule ..> CassandraClientConfig : uses
CassandraClientModule ..> ReopeningSession : creates
CassandraClientModule ..> CqlSession : builds
CassandraClientModule ..> RetryPolicyType : reads
CassandraClientModule ..> DefaultLoadBalancingPolicy : configures
NativeCassandraSession ..> CqlSession : uses
NativeCassandraSession ..> ReopeningSession : holds
NativeCassandraSession ..|> CassandraSession
ReopeningSession ..> CqlSession : wraps
CassandraClientConfig ..> RetryPolicyType
RetryPolicyType ..> RetryPolicy : policyClass
BackoffRetryPolicy ..|> RetryPolicy
Class diagram for updated Cassandra type mapping and value handlingclassDiagram
class CassandraType {
<<enum>>
ASCII
BIGINT
BLOB
BOOLEAN
COUNTER
CUSTOM
DATE
DECIMAL
DOUBLE
FLOAT
INET
INT
LIST
MAP
SET
SMALLINT
TEXT
TIMESTAMP
TIMEUUID
TINYINT
TUPLE
UUID
VARCHAR
VARINT
-Type nativeType
-Class_java_javaType
+int getTypeArgumentSize()
+static CassandraType getCassandraType(DataType dataType)
+static NullableValue getColumnValue(Row row, int position, CassandraType cassandraType, List~CassandraType~ typeArguments)
+static NullableValue getColumnValueForPartitionKey(Row row, int position, CassandraType cassandraType, List~CassandraType~ typeArguments)
+static String getColumnValueForCql(Row row, int position, CassandraType cassandraType, List~CassandraType~ typeArguments)
+Object getJavaValue(Object nativeValue)
+static CassandraType toCassandraType(Type type)
-static String bytesToHex(ByteBuffer buffer)
}
class NativeCassandraSession {
-CassandraColumnHandle buildColumnHandle(TableMetadata tableMetadata, ColumnMetadata columnMeta, boolean partitionKey, boolean clusteringKey, int ordinalPosition, boolean hidden) CassandraColumnHandle
}
class CassandraPageSink {
-CassandraSession cassandraSession
-PreparedStatement insert
-List~Type~ columnTypes
-boolean generateUUID
+CassandraPageSink(CassandraSession cassandraSession, String schemaName, String tableName, List~String~ columnNames, List~Type~ columnTypes, boolean generateUUID)
+Collection~Slice~ finish()
+void appendPage(Page page)
-Object toCassandraObject(Type type, Block block, int position)
}
class CassandraRecordCursor {
-List~FullCassandraType~ fullCassandraTypes
-ResultSet rs
-Row currentRow
-long count
-Iterator_Row_ iterator
+CassandraRecordCursor(CassandraSession cassandraSession, List~FullCassandraType~ fullCassandraTypes, String cql)
+boolean advanceNextPosition()
+double getDouble(int i)
+long getLong(int i)
}
CassandraType ..> DataType : maps
CassandraType ..> Row : reads
CassandraType ..> NullableValue : returns
CassandraType ..> ByteBuffer : uses
CassandraType ..> Instant : uses
CassandraType ..> LocalDate : uses
NativeCassandraSession ..> CassandraType : uses
CassandraPageSink ..> CassandraType : via Type mapping
CassandraRecordCursor ..> CassandraType : via FullCassandraType
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
d43e8ab to
3f521cc
Compare
fb5735d to
38a3f11
Compare
6cadce7 to
a87b630
Compare
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Upgrade to latest Cassandra Java Driver 4.x
Motivation and Context
Cassandra Java Driver
3.xversions has reached EOL and is mostly not receiving any fixes or features. DataStax has already donated it to the ASF and is spearheading the4.xseries.Resolves #26852 #26762
Impact
This will get presto to a more stable and future-forward and supported non-EOL Cassandra Java driver.
Test Plan
All relevant tests are now updated to leverage the latest Cassandra Java driver flavor
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
Summary by Sourcery
Upgrade the Cassandra connector to use the Cassandra Java Driver 4.x and adapt the connector’s session, metadata, type handling, and configuration to the new driver APIs, including support for DataStax Astra secure connect bundles.
New Features:
Enhancements:
Build: