Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@
<dep.datasketches-memory.version>2.2.0</dep.datasketches-memory.version>
<dep.datasketches-java.version>5.0.1</dep.datasketches-java.version>

<dep.cassandra-java-driver.version>4.19.2</dep.cassandra-java-driver.version>

<release.autoPublish>true</release.autoPublish>
</properties>

Expand Down Expand Up @@ -2688,9 +2690,22 @@
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.11.5</version>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-core</artifactId>
<version>${dep.cassandra-java-driver.version}</version>
</dependency>

<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>${dep.cassandra-java-driver.version}</version>
</dependency>

<dependency>
<groupId>at.yawk.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.10.1</version>
<scope>runtime</scope>
</dependency>

<dependency>
Expand Down
25 changes: 23 additions & 2 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@

<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-query-builder</artifactId>
</dependency>

<!-- LZ4 compression library required by Cassandra driver 4.x at runtime -->
<!-- https://github.com/apache/cassandra-java-driver/blob/4.19.2/core/src/main/resources/reference.conf#L1117 -->
<dependency>
<groupId>at.yawk.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
Expand All @@ -39,6 +52,11 @@
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand Down Expand Up @@ -248,6 +266,9 @@
<ignoredNonTestScopedDependencies>
<ignoredNonTestScopedDependency>com.fasterxml.jackson.core:jackson-databind</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>com.google.code.findbugs:jsr305</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down
111 changes: 111 additions & 0 deletions presto-cassandra/src/main/java/com/facebook/presto/cassandra/TimestampCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cassandra;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;

/**
* Custom codec for converting between Cassandra TIMESTAMP and java.sql.Timestamp.
* Driver 4.x removed built-in support for java.sql.Timestamp, only supporting java.time.Instant.
* This codec bridges the gap for legacy code that uses java.sql.Timestamp.
*/
public class TimestampCodec
        implements TypeCodec<Timestamp>
{
    public static final TimestampCodec INSTANCE = new TimestampCodec();

    private TimestampCodec()
    {
        // Singleton: use INSTANCE
    }

    @Nonnull
    @Override
    public GenericType<Timestamp> getJavaType()
    {
        return GenericType.of(Timestamp.class);
    }

    @Nonnull
    @Override
    public DataType getCqlType()
    {
        return DataTypes.TIMESTAMP;
    }

    @Nullable
    @Override
    public ByteBuffer encode(@Nullable Timestamp value, @Nonnull ProtocolVersion protocolVersion)
    {
        if (value == null) {
            return null;
        }
        // The TIMESTAMP wire format is a signed 8-byte integer: milliseconds since the Unix epoch.
        long millisSinceEpoch = value.toInstant().toEpochMilli();
        ByteBuffer buffer = ByteBuffer.allocate(8);
        // Absolute put: the returned buffer's position stays at 0 with 8 bytes remaining.
        buffer.putLong(0, millisSinceEpoch);
        return buffer;
    }

    @Nullable
    @Override
    public Timestamp decode(@Nullable ByteBuffer bytes, @Nonnull ProtocolVersion protocolVersion)
    {
        if (bytes == null || bytes.remaining() == 0) {
            return null;
        }
        // Reject malformed payloads instead of silently reading 8 bytes from a buffer of another size.
        if (bytes.remaining() != 8) {
            throw new IllegalArgumentException("Invalid 64-bit timestamp value, expected 8 bytes but got " + bytes.remaining());
        }
        // Absolute read: do not consume the buffer, since the driver may re-read it.
        long millisSinceEpoch = bytes.getLong(bytes.position());
        return new Timestamp(millisSinceEpoch);
    }

    @Nonnull
    @Override
    public String format(@Nullable Timestamp value)
    {
        if (value == null) {
            return "NULL";
        }
        // TypeCodec.format must produce a valid CQL literal; CQL timestamp literals are single-quoted.
        return "'" + value.toInstant() + "'";
    }

    @Nullable
    @Override
    public Timestamp parse(@Nullable String value)
    {
        if (value == null || value.isEmpty() || value.equalsIgnoreCase("NULL")) {
            return null;
        }
        // Accept both a bare ISO-8601 instant and a single-quoted CQL literal (the inverse of format()).
        String literal = value;
        if (literal.length() >= 2 && literal.charAt(0) == '\'' && literal.charAt(literal.length() - 1) == '\'') {
            literal = literal.substring(1, literal.length() - 1);
        }
        try {
            return Timestamp.from(Instant.parse(literal));
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse timestamp value: " + value, e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,99 @@
*/
package com.facebook.presto.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;

import javax.annotation.Nonnull;

import java.util.concurrent.ThreadLocalRandom;

public class BackoffRetryPolicy
implements RetryPolicy
{
public static final BackoffRetryPolicy INSTANCE = new BackoffRetryPolicy();
private static final int MAX_RETRIES = 10;

private BackoffRetryPolicy() {}

@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel consistencyLevel, int requiredReplica, int aliveReplica, int retries)
public RetryDecision onReadTimeout(
@Nonnull Request request,
@Nonnull ConsistencyLevel cl,
int blockFor,
int received,
boolean dataPresent,
int retryCount)
{
if (retries >= 10) {
return RetryDecision.rethrow();
// Delegate to default behavior for read timeouts
return RetryDecision.RETHROW;
}

@Override
public RetryDecision onWriteTimeout(
@Nonnull Request request,
@Nonnull ConsistencyLevel cl,
@Nonnull WriteType writeType,
int blockFor,
int received,
int retryCount)
{
// Delegate to default behavior for write timeouts
return RetryDecision.RETHROW;
}

@Override
public RetryDecision onUnavailable(
@Nonnull Request request,
@Nonnull ConsistencyLevel cl,
int required,
int alive,
int retryCount)
{
if (retryCount >= MAX_RETRIES) {
return RetryDecision.RETHROW;
}

try {
// Exponential backoff with jitter
int jitter = ThreadLocalRandom.current().nextInt(100);
int delay = (100 * (retries + 1)) + jitter;
int delay = (100 * (retryCount + 1)) + jitter;
Thread.sleep(delay);
return RetryDecision.retry(consistencyLevel);
return RetryDecision.RETRY_SAME;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return RetryDecision.rethrow();
return RetryDecision.RETHROW;
}
}

@Override
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry)
public RetryDecision onRequestAborted(
@Nonnull Request request,
@Nonnull Throwable error,
int retryCount)
{
return DefaultRetryPolicy.INSTANCE.onReadTimeout(statement, cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry);
// Try next host on request aborted
return RetryDecision.RETRY_NEXT;
}

@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry)
public RetryDecision onErrorResponse(
@Nonnull Request request,
@Nonnull CoordinatorException error,
int retryCount)
{
return DefaultRetryPolicy.INSTANCE.onWriteTimeout(statement, cl, writeType, requiredAcks, receivedAcks, nbRetry);
// Try next host on coordinator errors
return RetryDecision.RETRY_NEXT;
}

@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry)
public void close()
{
return RetryDecision.tryNextHost(cl);
// No resources to clean up
}

@Override
public void init(Cluster cluster) {}

@Override
public void close() {}
}
Loading
Loading