Skip to content

Commit 39d3bcb

Browse files
authored
Add initial savepoint infos and partition counts (#48)
1 parent d3d3069 commit 39d3bcb

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

src/main/scala/com/scylladb/migrator/Migrator.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package com.scylladb.migrator
33
import java.nio.charset.StandardCharsets
44
import java.nio.file.{ Files, Paths }
55
import java.util.concurrent.{ ScheduledThreadPoolExecutor, TimeUnit }
6-
76
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
7+
import com.datastax.spark.connector.rdd.partitioner.{ CassandraPartition, CqlTokenRange }
88
import com.datastax.spark.connector.rdd.partitioner.dht.Token
99
import com.datastax.spark.connector.writer._
1010
import com.scylladb.migrator.config._
@@ -75,6 +75,35 @@ object Migrator {
7575
Some(tokenRangeAccumulator)
7676
}
7777

78+
log.info(
79+
"We need to transfer: " + sourceDF.dataFrame.rdd.getNumPartitions + " partitions in total")
80+
81+
val partitions = sourceDF.dataFrame.rdd.partitions
82+
val cassandraPartitions = partitions.map(p => { p.asInstanceOf[CassandraPartition[_, _]] })
83+
var allTokenRanges = Set[(Token[_], Token[_])]()
84+
cassandraPartitions.foreach(p => {
85+
p.tokenRanges
86+
.asInstanceOf[Vector[CqlTokenRange[_, _]]]
87+
.foreach(tr => {
88+
val range =
89+
Set((tr.range.start.asInstanceOf[Token[_]], tr.range.end.asInstanceOf[Token[_]]))
90+
allTokenRanges = allTokenRanges ++ range
91+
})
92+
93+
})
94+
95+
log.info("All token ranges extracted from partitions size:" + allTokenRanges.size)
96+
97+
if (migratorConfig.skipTokenRanges != None) {
98+
log.info(
99+
"Savepoints array defined, size of the array: " + migratorConfig.skipTokenRanges.size)
100+
101+
val diff = allTokenRanges.diff(migratorConfig.skipTokenRanges)
102+
log.info("Diff ... total diff of full ranges to savepoints is: " + diff.size)
103+
log.debug("Dump of the missing tokens: ")
104+
log.debug(diff)
105+
}
106+
78107
log.info("Starting write...")
79108

80109
try {

0 commit comments

Comments
 (0)