Commit 675f6c3

SPARKC-651 move to GitHub Actions (#1312)
* SPARKC-651 fix BucketingRangeIndexSpec
* SPARKC-651 move to GitHub Actions
* SPARKC-651 use JAVA_HOME if CCM_JAVA_HOME is missing
* SPARKC-651 add waitForNode method with configurable timeout: ccm waits only for 30 seconds; the new method allows a configurable timeout, which may be useful for slower environments.
* SPARKC-651 add --skip-wait-other-notice: DSE's wait-other-notice causes the start process to hang for 10 minutes and eventually fail the build. Don't wait for other nodes to notice the newly joining node.
* SPARKC-651 add job matrix: 2.5.x will be tested against the C*/DSE versions listed in main.yml.
1 parent ad26072 commit 675f6c3

File tree

8 files changed: 157 additions, 65 deletions


.github/workflows/main.yml

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+name: CI
+
+on:
+  # Triggers the workflow on push or pull request events but only for the master branch
+  push:
+    branches: [ b2.5, b3.0, master]
+  pull_request:
+    branches: [ b2.5, b3.0, master, SPARKC-651 ]
+
+  workflow_dispatch:
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        scala: [2.11.12]
+        db-version: [6.8.9, 3.11.9, 4.0-beta4]
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: ccm pip installation
+        uses: BSFishy/pip-action@v1
+        with:
+          packages: git+git://github.com/riptano/ccm.git@435f3210e16d0b648fbf33d6390d5ab4c9e630d4
+
+      - name: Setup Scala
+        uses: olafurpg/setup-scala@v10
+        with:
+          java-version: "[email protected]"
+
+      - name: sbt tests
+        env:
+          TEST_PARALLEL_TASKS: 1
+          CCM_CASSANDRA_VERSION: ${{ matrix.db-version }}
+          PUBLISH_VERSION: test
+        run: sbt/sbt ++${{ matrix.scala }} test it:test

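The matrix above expands to one build job per (scala, db-version) pair, so this configuration yields three jobs, each exporting its db-version as CCM_CASSANDRA_VERSION. Below is a minimal sketch of that expansion; the object and value names are illustrative and not part of the commit.

object MatrixExpansionSketch extends App {
  // One CI job per combination of the two matrix axes defined in main.yml.
  val scalaVersions = Seq("2.11.12")
  val dbVersions    = Seq("6.8.9", "3.11.9", "4.0-beta4")

  for (scalaV <- scalaVersions; db <- dbVersions)
    println(s"job: scala=$scalaV, CCM_CASSANDRA_VERSION=$db, PUBLISH_VERSION=test")
}
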
.travis.yml

Lines changed: 0 additions & 44 deletions
This file was deleted.

connector/src/test/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndexSpec.scala

Lines changed: 6 additions & 2 deletions
@@ -116,15 +116,16 @@ class BucketingRangeIndexSpec extends FlatSpec with ScalaCheckPropertyChecks wit
   private type LV = Long

   private val longTokenFactory = Murmur3TokenFactory
-
   private val LongFullRange = TokenRange[LV, LT](
     longTokenFactory.minToken,
     longTokenFactory.minToken,
     Set.empty,
     Murmur3TokenFactory)

+  "Murmur3Bucketing" should "map all tokens to a single wrapping range" in {
+    implicit val tokenOrdering: Ordering[LT] = longTokenFactory.tokenOrdering
+    implicit val tokenBucketing: MonotonicBucketing[LT] = longTokenFactory.tokenBucketing

-  "Murmur3Bucketing" should " map all tokens to a single wrapping range" in {
     val singleRangeBucketing =
       new BucketingRangeIndex[LTR, LT](Seq(TokenRangeWithPartitionIndex(LongFullRange, 0)))

@@ -148,6 +149,9 @@ class BucketingRangeIndexSpec extends FlatSpec with ScalaCheckPropertyChecks wit


   "RandomBucketing" should " map all tokens to a single wrapping range" in {
+    implicit val tokenOrdering: Ordering[BT] = bigTokenFactory.tokenOrdering
+    implicit val tokenBucketing: MonotonicBucketing[BT] = bigTokenFactory.tokenBucketing
+
     val singleRangeBucketing =
       new BucketingRangeIndex[BITR, BT](Seq(TokenRangeWithPartitionIndex(bigFullRange, 0)))

project/Publishing.scala

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ import scala.sys.process._
 object Publishing extends sbt.librarymanagement.DependencyBuilders {

   val Version: String = {
-    sys.props.get("publish.version").getOrElse("git describe --tags" !!).stripLineEnd.stripPrefix("v")
+    sys.props.get("publish.version").orElse(sys.env.get("PUBLISH_VERSION")).getOrElse("git describe --tags" !!).stripLineEnd.stripPrefix("v")
   }

   val altReleaseDeploymentRepository = sys.props.get("publish.repository.name")

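A minimal, self-contained sketch of the version resolution order introduced above: the publish.version system property wins, then the PUBLISH_VERSION environment variable (set to "test" by the workflow), and only then the git describe fallback. The object name is illustrative; the fallback assumes git and at least one tag are available.

import scala.sys.process._

object PublishVersionSketch extends App {
  // Precedence: -Dpublish.version, then PUBLISH_VERSION env var, then `git describe --tags`.
  val Version: String =
    sys.props.get("publish.version")
      .orElse(sys.env.get("PUBLISH_VERSION"))
      .getOrElse("git describe --tags".!!)
      .stripLineEnd
      .stripPrefix("v")

  println(s"resolved publish version: $Version")
}
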
project/Testing.scala

Lines changed: 19 additions & 9 deletions
@@ -1,9 +1,10 @@
 import java.lang.management.ManagementFactory
-
 import com.sun.management.OperatingSystemMXBean
 import sbt.Tests._
 import sbt.{ForkOptions, TestDefinition}

+import scala.util.Try
+
 object Testing {

   private def interfacesImplementingFixture(c: Class[_], fixture: Class[_]): Seq[Class[_]] = {
@@ -42,16 +43,25 @@ object Testing {
     }
   }

+  /**
+    * Guesses if the given version is DSE or not.
+    * Internally it checks if the major version component is ge 6 which should suffice for a long time.
+    * CCM_IS_DSE env setting takes precedence over this guess.
+    */
+  private def isDse(version: Option[String]): Boolean = {
+    version.flatMap(v => Try(v.split('.').head.toInt >= 6).toOption).getOrElse(false)
+  }
+
   def getCCMJvmOptions = {
-    val ccmCassVersion = sys.env.get("CCM_CASSANDRA_VERSION").map(version => s"-Dccm.version=$version")
-    val ccmCassVersion2 = sys.env.get("CCM_CASSANDRA_VERSION").map(version => s"-Dcassandra.version=$version")
-    val dseInVersion = if (sys.env.get("CCM_CASSANDRA_VERSION").contains("dse")) Some(true) else None
-    val ccmDse = sys.env.get("CCM_IS_DSE").orElse(dseInVersion).map(isDSE => s"-Dccm.dse=$isDSE")
-    val ccmDse2 = sys.env.get("CCM_IS_DSE").orElse(dseInVersion).map(isDSE => s"-Ddse=$isDSE")
+    val dbVersion = sys.env.get("CCM_CASSANDRA_VERSION")
+    val ccmCassVersion = dbVersion.map(version => s"-Dccm.version=$version")
+    val ccmCassVersion2 = dbVersion.map(version => s"-Dcassandra.version=$version")
+    val ccmDse = sys.env.get("CCM_IS_DSE").map(_.toLowerCase == "true").orElse(Some(isDse(dbVersion)))
+      .map(isDSE => s"-Dccm.dse=$isDSE")
     val cassandraDirectory = sys.env.get("CCM_INSTALL_DIR").map(dir => s"-Dcassandra.directory=$dir")
-    val ccmJava = sys.env.get("CCM_JAVA_HOME").map(dir => s"-Dccm.java.home=$dir")
-    val ccmPath = sys.env.get("CCM_JAVA_HOME").map(dir => s"-Dccm.path=$dir/bin")
-    val options = Seq(ccmCassVersion, ccmDse, ccmCassVersion2, ccmDse2, cassandraDirectory, ccmJava, ccmPath)
+    val ccmJava = sys.env.get("CCM_JAVA_HOME").orElse(sys.env.get("JAVA_HOME")).map(dir => s"-Dccm.java.home=$dir")
+    val ccmPath = sys.env.get("CCM_JAVA_HOME").orElse(sys.env.get("JAVA_HOME")).map(dir => s"-Dccm.path=$dir/bin")
+    val options = Seq(ccmCassVersion, ccmDse, ccmCassVersion2, cassandraDirectory, ccmJava, ccmPath)
     options
   }

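A minimal sketch of the new DSE detection in getCCMJvmOptions: when CCM_IS_DSE is set it decides the flag, otherwise a major version of 6 or higher is assumed to be DSE. Object and helper names below are illustrative.

import scala.util.Try

object DseGuessSketch extends App {
  // Same guess as the isDse helper above: major version component >= 6 means DSE.
  private def isDse(version: Option[String]): Boolean =
    version.flatMap(v => Try(v.split('.').head.toInt >= 6).toOption).getOrElse(false)

  // CCM_IS_DSE takes precedence over the version-based guess.
  def ccmDseFlag(env: Map[String, String]): Boolean =
    env.get("CCM_IS_DSE").map(_.toLowerCase == "true")
      .getOrElse(isDse(env.get("CCM_CASSANDRA_VERSION")))

  println(ccmDseFlag(Map("CCM_CASSANDRA_VERSION" -> "6.8.9")))                          // true, guessed from the version
  println(ccmDseFlag(Map("CCM_CASSANDRA_VERSION" -> "3.11.9")))                         // false
  println(ccmDseFlag(Map("CCM_CASSANDRA_VERSION" -> "6.8.9", "CCM_IS_DSE" -> "false"))) // false, explicit override wins
}
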
test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmConfig.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ case class CcmConfig(
   version: Version = Version.parse(System.getProperty("ccm.version", "3.11.6")),
   installDirectory: Option[String] = Option(System.getProperty("ccm.directory")),
   installBranch: Option[String] = Option(System.getProperty("ccm.branch")),
-  dseEnabled: Boolean = Option(System.getProperty("ccm.dse")).isDefined,
+  dseEnabled: Boolean = Option(System.getProperty("ccm.dse")).exists(_.toLowerCase == "true"),
   mode: ClusterMode = ClusterModes.fromEnvVar) {

   def withSsl(keystorePath: String, keystorePassword: String): CcmConfig = {

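The change above is behavioral, not cosmetic: previously merely defining -Dccm.dse (even as "false") enabled DSE mode, while now only an explicit "true" does. A small sketch of the difference; the names are illustrative.

object DseFlagSketch extends App {
  def oldDseEnabled(prop: Option[String]): Boolean = prop.isDefined                       // old behaviour
  def newDseEnabled(prop: Option[String]): Boolean = prop.exists(_.toLowerCase == "true") // committed behaviour

  println(oldDseEnabled(Some("false")))  // true: DSE mode enabled even when asked not to
  println(newDseEnabled(Some("false")))  // false: fixed
  println(newDseEnabled(Some("true")))   // true
  println(newDseEnabled(None))           // false
}
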
test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/ClusterModeExecutor.scala

Lines changed: 46 additions & 2 deletions
@@ -1,10 +1,12 @@
 package com.datastax.spark.connector.ccm.mode

-import java.nio.file.Path
-
 import com.datastax.spark.connector.ccm.{CcmBridge, CcmConfig}
 import org.apache.commons.exec.CommandLine

+import java.nio.file.{Path, Paths}
+import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
+import scala.util.control.NonFatal
+
 private[ccm] trait ClusterModeExecutor {

   protected val config: CcmConfig
@@ -32,4 +34,46 @@ private[ccm] trait ClusterModeExecutor {
     CcmBridge.execute(cli)
   }

+  def getLastRepositoryLogLines(linesCount: Int): Seq[String] = synchronized {
+    val log = Paths.get(
+      sys.props.get("user.home").get,
+      ".ccm",
+      "repository",
+      "ccm-repository.log").toString
+
+    getLastLogLines(log, linesCount)
+  }
+
+  def getLastLogLines(path: String, linesCount: Int): Seq[String] = synchronized {
+    val command = s"tail -$linesCount $path"
+    CcmBridge.execute(CommandLine.parse(command))
+  }
+
+  /**
+    * Waits for the node to become alive. The first check is performed after the first interval.
+    */
+  def waitForNode(nodeNo: Int, timeout: FiniteDuration, interval: Duration = 5.seconds): Boolean = {
+    val deadline = timeout.fromNow
+    while (!deadline.isOverdue()) {
+      Thread.sleep(interval.toMillis)
+      if (isAlive(nodeNo, interval)) {
+        return true
+      }
+    }
+    false
+  }
+
+  private def isAlive(nodeNo: Int, timeout: Duration): Boolean = {
+    import java.net.Socket
+    val address = config.addressOfNode(nodeNo)
+    val socket = new Socket
+    try {
+      socket.connect(address, timeout.toMillis.toInt)
+      socket.close()
+      true
+    } catch {
+      case NonFatal(_) =>
+        false
+    }
+  }
 }

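waitForNode above is a deadline/interval polling loop around a socket connect to the node's address, with the first check deferred by one interval. A self-contained sketch of the same pattern, using a stand-in probe instead of a real node; all names here are illustrative.

import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}

object WaitForConditionSketch extends App {
  // Poll `probe` every `interval` until it succeeds or `timeout` elapses.
  def waitUntil(probe: => Boolean, timeout: FiniteDuration, interval: Duration = 5.seconds): Boolean = {
    val deadline = timeout.fromNow
    while (!deadline.isOverdue()) {
      Thread.sleep(interval.toMillis)
      if (probe) return true
    }
    false
  }

  // Stand-in probe: becomes true roughly 3 seconds after start.
  val becomesTrueAt = System.nanoTime() + 3.seconds.toNanos
  println(waitUntil(System.nanoTime() > becomesTrueAt, timeout = 10.seconds, interval = 1.second))
}
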
test-support/src/main/scala/com/datastax/spark/connector/ccm/mode/StandardModeExecutor.scala

Lines changed: 46 additions & 6 deletions
@@ -1,22 +1,53 @@
 package com.datastax.spark.connector.ccm.mode

-import java.io.File
+import java.io.{File, FileFilter}
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.atomic.AtomicBoolean
-
 import com.datastax.oss.driver.api.core.Version
 import com.datastax.spark.connector.ccm.CcmConfig
 import com.datastax.spark.connector.ccm.CcmConfig.V6_8_5
 import org.slf4j.{Logger, LoggerFactory}

+import scala.concurrent.duration.DurationInt
+import scala.util.Try
+import scala.util.control.NonFatal
+
 private[mode] trait DefaultExecutor extends ClusterModeExecutor {
   private val logger: Logger = LoggerFactory.getLogger(classOf[StandardModeExecutor])

   private val created = new AtomicBoolean()

+  private def waitForNode(nodeNo: Int): Unit = {
+    logger.info(s"Waiting for node $nodeNo to become alive...")
+    if (!waitForNode(nodeNo, 2.minutes)) {
+      throw new IllegalStateException(s"Timeouted on waiting for node $nodeNo")
+    }
+    logger.info(s"Node $nodeNo is alive")
+  }
+
+  private def logStdErr(nodeNo: Int): Unit = {
+    Try {
+      val linesCount = 1000
+      val logsDir = new File(s"${dir}/ccm_1/node${nodeNo}/logs/")
+
+      if (logsDir.exists() && logsDir.isDirectory) {
+        val stdErrFile = logsDir.listFiles().filter(_.getName.endsWith("stderr.log")).head
+        logger.error(s"Start command failed, here is the last $linesCount lines of startup-stderr file: \n" +
+          getLastLogLines(stdErrFile.getAbsolutePath, linesCount).mkString("\n"))
+      }
+    }
+  }
+
   override def start(nodeNo: Int): Unit = {
     val formattedJvmArgs = config.jvmArgs.map(arg => s" --jvm_arg=$arg").mkString(" ")
-    execute(s"node$nodeNo", "start", formattedJvmArgs + "--wait-for-binary-proto")
+    try {
+      execute(s"node$nodeNo", "start", formattedJvmArgs + "-v", "--skip-wait-other-notice")
+      waitForNode(nodeNo)
+    } catch {
+      case NonFatal(e) =>
+        logStdErr(nodeNo)
+        throw e
+    }
   }

   private def eventually[T](hint: String = "", f: =>T ): T = {
@@ -39,13 +70,15 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
   /**
     * Remove this once C* 4.0.0 is released.
     *
-    * This is a workaround that allows running it:test against 4.0.0-beta1. This version of C* is published as
-    * 4.0-beta1 which breaks versioning convention used in integration tests.
+    * This is a workaround that allows running it:test against 4.0.0-betaX and 4.0.0-rcX. These C* versions are
+    * published as 4.0-betaX and 4.0-rcX, lack of patch version breaks versioning convention used in integration tests.
     */
   private def adjustCassandraBetaVersion(version: String): String = {
     val beta = "4.0.0-beta(\\d+)".r
+    val rc = "4.0.0-rc(\\d+)".r
     version match {
       case beta(betaNo) => s"4.0-beta$betaNo"
+      case rc(rcNo) => s"4.0-rc$rcNo"
       case other => other
     }
   }
@@ -74,7 +107,14 @@ private[mode] trait DefaultExecutor extends ClusterModeExecutor {
       eventually(f = Files.exists(repositoryDir.resolve("bin")))
     }

-    execute( createArgs: _*)
+    try {
+      execute(createArgs: _*)
+    } catch {
+      case NonFatal(e) =>
+        Try(logger.error("Create command failed, here is the last 500 lines of ccm repository log: \n" +
+          getLastRepositoryLogLines(500).mkString("\n")))
+        throw e
+    }

     eventually("Checking to make sure repository was correctly expanded", {
       Files.exists(repositoryDir.resolve("bin"))

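A standalone sketch of the beta/rc mapping described in the comment above: 4.0.0-betaX and 4.0.0-rcX are rewritten to the 4.0-betaX and 4.0-rcX names the artifacts are actually published under, and anything else passes through unchanged. The object name is illustrative.

object BetaVersionSketch extends App {
  // Mirrors adjustCassandraBetaVersion: drops the patch component for 4.0 beta/rc builds.
  def adjustCassandraBetaVersion(version: String): String = {
    val beta = "4.0.0-beta(\\d+)".r
    val rc   = "4.0.0-rc(\\d+)".r
    version match {
      case beta(betaNo) => s"4.0-beta$betaNo"
      case rc(rcNo)     => s"4.0-rc$rcNo"
      case other        => other
    }
  }

  Seq("4.0.0-beta4", "4.0.0-rc1", "3.11.9").foreach(v => println(s"$v -> ${adjustCassandraBetaVersion(v)}"))
}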