Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.beam.it.gcp.datagenerator.DataGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -43,7 +42,6 @@
@Category(TemplateLoadTest.class)
@TemplateLoadTest(SpannerToSourceDb.class)
@RunWith(JUnit4.class)
@Ignore("Disabling incorrect LT. b/446480838")
public class SpannerToCassandraSourceLT extends SpannerToCassandraLTBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToCassandraSourceLT.class);
Expand All @@ -68,8 +66,7 @@ public void setup() throws IOException {
artifactBucket,
gcsResourceManager
.uploadArtifact(
"input/schema.json",
Resources.getResource(dataGeneratorSchemaResource).getPath())
SCHEMA_FILE_NAME, Resources.getResource(dataGeneratorSchemaResource).getPath())
.name());
jobInfo =
launchDataflowJob(
Expand All @@ -78,7 +75,8 @@ public void setup() throws IOException {
maxWorkers,
null,
CASSANDRA_SOURCE_TYPE,
SOURCE_SHARDS_FILE_NAME);
SOURCE_SHARDS_FILE_NAME,
null);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public void setup() throws IOException, InterruptedException {
artifactBucket,
gcsResourceManager
.uploadArtifact(
"input/schema.json",
Resources.getResource(dataGeneratorSchemaResource).getPath())
SCHEMA_FILE_NAME, Resources.getResource(dataGeneratorSchemaResource).getPath())
.name());

createMySQLSchema(jdbcResourceManagers);
Expand All @@ -89,7 +88,8 @@ public void setup() throws IOException, InterruptedException {
maxWorkers,
customTransformation,
MYSQL_SOURCE_TYPE,
SOURCE_SHARDS_FILE_NAME);
SOURCE_SHARDS_FILE_NAME,
SESSION_FILE_NAME);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public void setup() throws IOException {
artifactBucket,
gcsResourceManager
.uploadArtifact(
"input/schema.json",
Resources.getResource(dataGeneratorSchemaResource).getPath())
SCHEMA_FILE_NAME, Resources.getResource(dataGeneratorSchemaResource).getPath())
.name());

createMySQLSchema(jdbcResourceManagers);
Expand All @@ -84,7 +83,8 @@ public void setup() throws IOException {
maxWorkers,
null,
MYSQL_SOURCE_TYPE,
SOURCE_SHARDS_FILE_NAME);
SOURCE_SHARDS_FILE_NAME,
SESSION_FILE_NAME);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class SpannerToSourceDbLTBase extends TemplateLoadTestBase {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDbLTBase.class);
public static final String SOURCE_SHARDS_FILE_NAME = "input/shard.json";
public static final String SESSION_FILE_NAME = "input/session.json";
public static final String SCHEMA_FILE_NAME = "input/schema.json";
private static final String TEMPLATE_SPEC_PATH =
MoreObjects.firstNonNull(
TestProperties.specPath(), "gs://dataflow-templates/latest/flex/Spanner_to_SourceDb");
Expand All @@ -76,7 +78,7 @@ public void setupResourceManagers(
GcsResourceManager.builder(artifactBucket, getClass().getSimpleName(), CREDENTIALS).build();

gcsResourceManager.uploadArtifact(
"input/session.json", Resources.getResource(sessionFileResource).getPath());
SESSION_FILE_NAME, Resources.getResource(sessionFileResource).getPath());

pubsubResourceManager = setUpPubSubResourceManager();
subscriptionName =
Expand Down Expand Up @@ -193,16 +195,19 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
int maxWorkers,
CustomTransformation customTransformation,
String sourceType,
String shardFileName)
String shardFileName,
String sessionFileName)
throws IOException {
// default parameters

Map<String, String> params =
new HashMap<>() {
{
put(
"sessionFilePath",
getGcsPath(artifactBucket, "input/session.json", gcsResourceManager));
if (sessionFileName != null) {
put(
"sessionFilePath",
getGcsPath(artifactBucket, sessionFileName, gcsResourceManager));
}
put("instanceId", spannerResourceManager.getInstanceId());
put("databaseId", spannerResourceManager.getDatabaseId());
put("spannerProjectId", project);
Expand Down
Loading