-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Closed
Description
Module
Cassandra
Testcontainers version
1.20.2
Using the latest Testcontainers version?
Yes
Host OS
Linux
Host Arch
x86_64
Docker version
Client:
Version: 28.1.1+1
API version: 1.48
Go version: go1.23.8
Git commit: 068a01e
Built: Fri Jun 13 16:11:30 2025
OS/Arch: linux/amd64
Context: default
Server:
Engine:
Version: 28.1.1+1
API version: 1.49 (minimum version 1.24)
Go version: go1.23.8
Git commit: 01f442b
Built: Fri Jun 13 16:12:14 2025
OS/Arch: linux/amd64
Experimental: false
containerd:
Version: v1.7.27
GitCommit: 05044ec0a9a75232cad458027ca83437aae3f4da
runc:
Version: 1.2.6
GitCommit:
docker-init:
Version: 0.19.0
GitCommit: de40ad0What happened?
When starting 2 Cassandra containers to form a cluster, the second container fails to start with this exception:
ERROR [main] 2025-06-24 07:59:37,931 CassandraDaemon.java:911 - Fatal configuration error
org.apache.cassandra.exceptions.ConfigurationException: Bootstrapping to existing token 0 is not allowed (decommission/removenode the old node first).
at org.apache.cassandra.dht.BootStrapper.getSpecifiedTokens(BootStrapper.java:200)
at org.apache.cassandra.dht.BootStrapper.getBootstrapTokens(BootStrapper.java:168)
at org.apache.cassandra.service.StorageService.prepareForBootstrap(StorageService.java:1679)
at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:1058)
at org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:1020)
at org.apache.cassandra.service.StorageService.initServer(StorageService.java:802)
at org.apache.cassandra.service.StorageService.initServer(StorageService.java:732)
at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:420)
at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:765)
at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:889)
Other tests done:
- The provided code works with a single Cassandra container
- adding 2 containers to a cluster using the same Cassandra image and the same environment (docker version etc...) works with simple docker (outside of testContainers) using:
docker network create cassnet
docker run -d --name cassandra1 --network cassnet cassandra:4.0.8
sleep 60
docker run -d --name cassandra2 --network cassnet -e CASSANDRA_SEEDS="cassandra1" cassandra:4.0.8
In the provided code :
- I have set a waiting strategy for the second node to wait for the availability of the first (tried the cassandra wait strategy and the log based one waiting for "Startup complete" message)
- I ensured the containers were not reused between tests
- I created two temp directories bound to /var/lib/cassandra in the containers to avoid data sharing as it looks like a data collision.
- I tried specifying the CASSANDRA_CLUSTER_NAME env variable for the same cluster name in both containers
It looks like a data collision (second node trying to read the same tokens), is this supposed to be supported ?
Relevant log output
Additional Information
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.cassandra;
import org.apache.flink.connector.testframe.TestResource;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
/**
* Junit test environment that contains everything needed at the test suite level: testContainer
* setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup).
*/
@Testcontainers
public class CassandraTestEnvironment implements TestResource {
private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
private static final int CQL_PORT = 9042;
private static final int READ_TIMEOUT_MILLIS = 36000;
// flushing mem table to SS tables is an asynchronous operation that may take a while
private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
public static final String KEYSPACE = "flink";
private static final String CREATE_KEYSPACE_QUERY =
"CREATE KEYSPACE "
+ KEYSPACE
+ " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
public static final String SPLITS_TABLE = "flinksplits";
private static final String CREATE_SPLITS_TABLE_QUERY =
"CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);";
private static final String INSERT_INTO_FLINK_SPLITS =
"INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
private static final int NB_SPLITS_RECORDS = 1000;
@Container private final CassandraContainer cassandraContainer1;
@Container private final CassandraContainer cassandraContainer2;
boolean insertTestDataForSplitSizeTests;
private Cluster cluster;
private Session session;
private ClusterBuilder builderForReading;
private ClusterBuilder builderForWriting;
public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
Network network = Network.newNetwork();
cassandraContainer1 = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
final Path cassandra1Data;
try {
cassandra1Data = Files.createTempDirectory("cassandra1-data");
cassandraContainer1.withFileSystemBind(
cassandra1Data.toAbsolutePath().toString(), "/var/lib/cassandra");
} catch (IOException e) {
throw new RuntimeException(e);
}
cassandraContainer1
.withNetwork(network)
.withNetworkAliases("cassandra1")
.withReuse(false)
.withEnv("CASSANDRA_CLUSTER_NAME", "my-cluster");
// more generous timeouts
addJavaOpts(
cassandraContainer1,
"-Dcassandra.request_timeout_in_ms=30000",
"-Dcassandra.read_request_timeout_in_ms=15000",
"-Dcassandra.write_request_timeout_in_ms=6000");
cassandraContainer2 = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
try {
final Path cassandra2Data = Files.createTempDirectory("cassandra2-data");
cassandraContainer2.withFileSystemBind(
cassandra2Data.toAbsolutePath().toString(), "/var/lib/cassandra");
} catch (IOException e) {
throw new RuntimeException(e);
}
cassandraContainer2
.withNetwork(network)
.withNetworkAliases("cassandra2")
.withReuse(false)
.withEnv("CASSANDRA_SEEDS", "cassandra1")
.withEnv("CASSANDRA_CLUSTER_NAME", "my-cluster");
// more generous timeouts
addJavaOpts(
cassandraContainer2,
"-Dcassandra.request_timeout_in_ms=30000",
"-Dcassandra.read_request_timeout_in_ms=15000",
"-Dcassandra.write_request_timeout_in_ms=6000");
}
@Override
public void startUp() throws Exception {
startEnv();
}
@Override
public void tearDown() throws Exception {
stopEnv();
}
private static void addJavaOpts(GenericContainer<?> container, String... opts) {
String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " "));
}
private void startEnv() throws Exception {
// configure container start to wait until cassandra is ready to receive queries
WaitStrategy waitStrategy =
new CassandraQueryWaitStrategy().withStartupTimeout(Duration.ofSeconds(60));
cassandraContainer1.waitingFor(waitStrategy);
cassandraContainer2.waitingFor(waitStrategy);
// start with retrials
cassandraContainer1.start();
cassandraContainer2.start();
cassandraContainer1.followOutput(
new Slf4jLogConsumer(LOG),
OutputFrame.OutputType.END,
OutputFrame.OutputType.STDERR,
OutputFrame.OutputType.STDOUT);
cassandraContainer2.followOutput(
new Slf4jLogConsumer(LOG),
OutputFrame.OutputType.END,
OutputFrame.OutputType.STDERR,
OutputFrame.OutputType.STDOUT);
cluster = cassandraContainer1.getCluster();
// ConsistencyLevel.ONE is the minimum level for reading
builderForReading =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ONE,
cassandraContainer1.getHost(),
cassandraContainer1.getMappedPort(CQL_PORT));
// Lower consistency level ANY is only available for writing.
builderForWriting =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ANY,
cassandraContainer1.getHost(),
cassandraContainer1.getMappedPort(CQL_PORT));
session = cluster.connect();
executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
// create a dedicated table for split size tests (to avoid having to flush with each test)
if (insertTestDataForSplitSizeTests) {
insertTestDataForSplitSizeTests();
}
}
private void insertTestDataForSplitSizeTests() throws Exception {
executeRequestWithTimeout(CREATE_SPLITS_TABLE_QUERY);
for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i));
}
flushMemTables(SPLITS_TABLE);
}
private void stopEnv() {
if (session != null) {
session.close();
}
if (cluster != null) {
cluster.close();
}
cassandraContainer1.stop();
cassandraContainer2.stop();
}
private ClusterBuilder createBuilderWithConsistencyLevel(
ConsistencyLevel consistencyLevel, String host, int port) {
return new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
.withQueryOptions(
new QueryOptions()
.setConsistencyLevel(consistencyLevel)
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
.withSocketOptions(
new SocketOptions()
// default timeout x 3
.setConnectTimeoutMillis(15000)
// default timeout x3 and higher than
// request_timeout_in_ms at the cluster level
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
.build();
}
};
}
/**
* Force the flush of cassandra memTables to SSTables in order to update size_estimates. It is
* needed for the tests because we just inserted records, we need to force cassandra to update
* size_estimates system table.
*/
void flushMemTables(String table) throws Exception {
cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table);
cassandraContainer2.execInContainer("nodetool", "flush", KEYSPACE, table);
Thread.sleep(FLUSH_MEMTABLES_DELAY);
}
public ResultSet executeRequestWithTimeout(String query) {
return session.execute(
new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS));
}
public ClusterBuilder getBuilderForReading() {
return builderForReading;
}
public ClusterBuilder getBuilderForWriting() {
return builderForWriting;
}
public Session getSession() {
return session;
}
}