Skip to content

Commit 611bdba

Browse files
authored
refactor: refactor all TargetSystems (#385)
1 parent 3a93af6 commit 611bdba

File tree

10 files changed

+95
-224
lines changed

10 files changed

+95
-224
lines changed

core/flamingock-core/src/main/java/io/flamingock/internal/core/targets/TransactionalTargetSystem.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,14 @@ public abstract class TransactionalTargetSystem<HOLDER extends TransactionalTarg
4141
extends AbstractTargetSystem<HOLDER>
4242
implements ContextInitializable {
4343

44+
protected String onGoingTasksRepositoryName = "flamingockOnGoingTasks";
4445
protected boolean autoCreate = true;
46+
protected TargetSystemAuditMarker markerRepository;
4547

4648
public TransactionalTargetSystem(String id) {
4749
super(id);
4850
}
4951

50-
public HOLDER withAutoCreate(boolean autoCreate) {
51-
this.autoCreate = autoCreate;
52-
return getSelf();
53-
}
54-
55-
public boolean isAutoCreate() {
56-
return autoCreate;
57-
}
58-
5952
public boolean hasMarker() {
6053
TargetSystemAuditMarker onGoingTaskStatusRepository = getOnGoingTaskStatusRepository();
6154
return onGoingTaskStatusRepository != null && !(onGoingTaskStatusRepository instanceof NoOpTargetSystemAuditMarker);
@@ -90,7 +83,9 @@ public final <T> T applyChangeTransactional(Function<ExecutionRuntime, T> change
9083
*
9184
* @return the audit marker instance
9285
*/
93-
abstract public TargetSystemAuditMarker getOnGoingTaskStatusRepository();
86+
public TargetSystemAuditMarker getOnGoingTaskStatusRepository() {
87+
return markerRepository;
88+
}
9489

9590
/**
9691
* Returns the transaction wrapper for this target system.

core/target-systems/couchbase-target-system/src/main/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystem.java

Lines changed: 8 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,51 +21,25 @@
2121
import com.couchbase.client.java.transactions.TransactionAttemptContext;
2222
import io.flamingock.internal.common.core.context.ContextResolver;
2323
import io.flamingock.internal.core.builder.FlamingockEdition;
24-
import io.flamingock.internal.core.store.audit.community.CommunityPersistenceConstants;
2524
import io.flamingock.internal.core.transaction.TransactionManager;
2625
import io.flamingock.internal.core.targets.TransactionalTargetSystem;
2726
import io.flamingock.internal.core.targets.mark.NoOpTargetSystemAuditMarker;
28-
import io.flamingock.internal.core.targets.mark.TargetSystemAuditMarker;
2927
import io.flamingock.internal.core.transaction.TransactionWrapper;
3028
import io.flamingock.internal.common.core.error.FlamingockException;
3129

3230
public class CouchbaseTargetSystem extends TransactionalTargetSystem<CouchbaseTargetSystem> {
3331

34-
private static final String COUCHBASE_SCOPE_NAME_PROPERTY = "couchbase.scopeName";
35-
private static final String COUCHBASE_ON_GOING_TASKS_REPOSITORY_NAME_PROPERTY = "couchbase.onGoingTasksRepositoryName";
36-
37-
private TargetSystemAuditMarker taskStatusRepository;
38-
private CouchbaseTxWrapper txWrapper;
3932
private Cluster cluster;
4033
private Bucket bucket;
4134
private String bucketName;
35+
private String scopeName = CollectionIdentifier.DEFAULT_SCOPE;
36+
37+
private CouchbaseTxWrapper txWrapper;
4238

4339
public CouchbaseTargetSystem(String id, Cluster cluster, String bucketName) {
4440
super(id);
4541
this.cluster = cluster;
4642
this.bucketName = bucketName;
47-
this.validate();
48-
this.bucket = cluster.bucket(bucketName);
49-
this.autoCreate = true;
50-
targetSystemContext.addDependency(cluster);
51-
targetSystemContext.addDependency(bucket);
52-
targetSystemContext.setProperty("autoCreate", true);
53-
}
54-
55-
public CouchbaseTargetSystem withScopeName(String scopeName) {
56-
targetSystemContext.setProperty(COUCHBASE_SCOPE_NAME_PROPERTY, scopeName);
57-
return this;
58-
}
59-
60-
public CouchbaseTargetSystem withOnGoingTasksRepositoryName(String onGoingTasksRepositoryName) {
61-
targetSystemContext.setProperty(COUCHBASE_ON_GOING_TASKS_REPOSITORY_NAME_PROPERTY, onGoingTasksRepositoryName);
62-
return this;
63-
}
64-
65-
public CouchbaseTargetSystem withAutoCreate(boolean autoCreate) {
66-
this.autoCreate = autoCreate;
67-
targetSystemContext.setProperty("autoCreate", autoCreate);
68-
return this;
6943
}
7044

7145
public Cluster getCluster() {
@@ -83,27 +57,16 @@ public TransactionManager<TransactionAttemptContext> getTxManager() {
8357
@Override
8458
public void initialize(ContextResolver baseContext) {
8559
this.validate();
86-
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class)
87-
.orElse(FlamingockEdition.CLOUD);
88-
89-
String scopeName = targetSystemContext.getPropertyAs(COUCHBASE_SCOPE_NAME_PROPERTY, String.class)
90-
.orElseGet(() -> baseContext.getPropertyAs(COUCHBASE_SCOPE_NAME_PROPERTY, String.class)
91-
.orElse(CollectionIdentifier.DEFAULT_SCOPE));
60+
targetSystemContext.addDependency(cluster);
61+
bucket = cluster.bucket(bucketName);
62+
targetSystemContext.addDependency(bucket);
9263

93-
String onGoingTasksRepositoryName = targetSystemContext.getPropertyAs(COUCHBASE_ON_GOING_TASKS_REPOSITORY_NAME_PROPERTY, String.class)
94-
.orElseGet(() -> baseContext.getPropertyAs(COUCHBASE_ON_GOING_TASKS_REPOSITORY_NAME_PROPERTY, String.class)
95-
.orElse(CommunityPersistenceConstants.DEFAULT_ON_GOING_TASKS_STORE_NAME));
9664

9765
TransactionManager<TransactionAttemptContext> txManager = new TransactionManager<>(null); //TODO: update as needed
9866
txWrapper = new CouchbaseTxWrapper(cluster, txManager);
9967

100-
taskStatusRepository = edition == FlamingockEdition.COMMUNITY
101-
? new NoOpTargetSystemAuditMarker(this.getId())
102-
: CouchbaseTargetSystemAuditMarker.builder(cluster, bucket, txManager)
103-
.withScopeName(scopeName)
104-
.withCollectionName(onGoingTasksRepositoryName)
105-
.withAutoCreate(autoCreate)
106-
.build();
68+
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
69+
markerRepository = new NoOpTargetSystemAuditMarker(this.getId());
10770
}
10871

10972
private void validate() {
@@ -113,22 +76,13 @@ private void validate() {
11376
if (bucketName == null || bucketName.trim().isEmpty()) {
11477
throw new FlamingockException("The 'bucketName' property is required.");
11578
}
116-
Bucket testBucket = cluster.bucket(bucketName);
117-
if (testBucket == null) {
118-
throw new FlamingockException("The 'bucketName' property is invalid. The cluster does not contain a bucket named '%s'", bucketName);
119-
}
12079
}
12180

12281
@Override
12382
protected CouchbaseTargetSystem getSelf() {
12483
return this;
12584
}
12685

127-
@Override
128-
public TargetSystemAuditMarker getOnGoingTaskStatusRepository() {
129-
return taskStatusRepository;
130-
}
131-
13286
@Override
13387
public TransactionWrapper getTxWrapper() {
13488
return txWrapper;

core/target-systems/couchbase-target-system/src/test/java/io/flamingock/targetsystem/couchbase/CouchbaseTargetSystemTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,7 @@ void happyPath() {
172172
));
173173

174174

175-
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME)
176-
.withScopeName(SCOPE_NAME);
175+
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME);
177176
flamingockBuilder
178177
.addTargetSystem(couchbaseTargetSystem)
179178
.build()
@@ -184,12 +183,14 @@ void happyPath() {
184183

185184
// check clients changes
186185
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 1);
186+
//TODO add when cloud added
187187
// check ongoing status
188-
couchbaseTestHelper.checkOngoingTask(ongoingCount -> ongoingCount == 0);
188+
// couchbaseTestHelper.checkOngoingTask(ongoingCount -> ongoingCount == 0);
189189
}
190190
}
191191

192192
@Test
193+
@Disabled("adapt when adding cloud support")
193194
@DisplayName("Should rollback the ongoing deletion when a task fails")
194195
void failedTasks() {
195196
String executionId = "execution-1";
@@ -223,8 +224,7 @@ void failedTasks() {
223224
new Trio<>(UnhappyInsertClientsChange.class, Collections.singletonList(Bucket.class))
224225
));
225226

226-
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME)
227-
.withScopeName(SCOPE_NAME);
227+
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME);
228228

229229
Runner runner = flamingockBuilder
230230
.addTargetSystem(couchbaseTargetSystem)
@@ -237,8 +237,10 @@ void failedTasks() {
237237

238238
// check clients changes
239239
couchbaseTestHelper.checkCount(bucket.scope(SCOPE_NAME).collection(CLIENTS_COLLECTION), 0);
240+
241+
//TODO when cloud enabled
240242
// check ongoing status
241-
couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
243+
// couchbaseTestHelper.checkEmptyTargetSystemAuditMarker();
242244
}
243245
}
244246

@@ -279,8 +281,7 @@ void shouldSendOngoingTaskInExecutionPlan() {
279281
new Trio<>(HappyInsertClientsChange.class, Collections.singletonList(Bucket.class))
280282
));
281283

282-
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME)
283-
.withScopeName(SCOPE_NAME);
284+
TargetSystem couchbaseTargetSystem = new CouchbaseTargetSystem("couchbase-ts", cluster, BUCKET_NAME);
284285

285286
flamingockBuilder
286287
.addTargetSystem(couchbaseTargetSystem)

core/target-systems/dynamodb-target-system/src/main/java/io/flamingock/targetsystem/dynamodb/DynamoDBTargetSystem.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,24 @@
1616
package io.flamingock.targetsystem.dynamodb;
1717

1818
import io.flamingock.internal.common.core.context.ContextResolver;
19+
import io.flamingock.internal.common.core.error.FlamingockException;
1920
import io.flamingock.internal.core.builder.FlamingockEdition;
2021
import io.flamingock.internal.core.transaction.TransactionManager;
2122
import io.flamingock.internal.core.targets.mark.NoOpTargetSystemAuditMarker;
22-
import io.flamingock.internal.core.targets.mark.TargetSystemAuditMarker;
2323
import io.flamingock.internal.core.targets.TransactionalTargetSystem;
2424
import io.flamingock.internal.core.transaction.TransactionWrapper;
2525
import software.amazon.awssdk.enhanced.dynamodb.model.TransactWriteItemsEnhancedRequest;
2626
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2727

2828
public class DynamoDBTargetSystem extends TransactionalTargetSystem<DynamoDBTargetSystem> {
29-
private static final String FLAMINGOCK_ON_GOING_TASKS = "flamingockOnGoingTasks";
3029

31-
private TargetSystemAuditMarker taskStatusRepository;
32-
private DynamoDBTxWrapper txWrapper;
3330
private DynamoDbClient client;
3431

32+
private DynamoDBTxWrapper txWrapper;
33+
3534
public DynamoDBTargetSystem(String id, DynamoDbClient dynamoDBClient) {
3635
super(id);
3736
this.client = dynamoDBClient;
38-
this.autoCreate = true;
39-
targetSystemContext.addDependency(dynamoDBClient);
40-
targetSystemContext.setProperty("autoCreate", true);
41-
}
42-
43-
public DynamoDBTargetSystem withAutoCreate(boolean autoCreate) {
44-
this.autoCreate = autoCreate;
45-
targetSystemContext.setProperty("autoCreate", autoCreate);
46-
return this;
4737
}
4838

4939
public DynamoDbClient getClient() {
@@ -56,28 +46,28 @@ public TransactionManager<TransactWriteItemsEnhancedRequest.Builder> getTxManage
5646

5747
@Override
5848
public void initialize(ContextResolver baseContext) {
49+
this.validate();
50+
targetSystemContext.addDependency(client);
51+
5952
FlamingockEdition edition = baseContext.getDependencyValue(FlamingockEdition.class)
6053
.orElse(FlamingockEdition.CLOUD);
6154

6255
TransactionManager<TransactWriteItemsEnhancedRequest.Builder> txManager = new TransactionManager<>(TransactWriteItemsEnhancedRequest::builder);
6356
txWrapper = new DynamoDBTxWrapper(client, txManager);
6457

65-
taskStatusRepository = edition == FlamingockEdition.COMMUNITY
66-
? new NoOpTargetSystemAuditMarker(this.getId())
67-
: DynamoDBTargetSystemAuditMarker.builder(client, txManager)
68-
.setTableName(FLAMINGOCK_ON_GOING_TASKS)
69-
.withAutoCreate(autoCreate)
70-
.build();
58+
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
59+
markerRepository = new NoOpTargetSystemAuditMarker(this.getId());
7160
}
7261

73-
@Override
74-
protected DynamoDBTargetSystem getSelf() {
75-
return this;
62+
private void validate() {
63+
if (client == null) {
64+
throw new FlamingockException("The 'DynamoDbClient' instance is required.");
65+
}
7666
}
7767

7868
@Override
79-
public TargetSystemAuditMarker getOnGoingTaskStatusRepository() {
80-
return taskStatusRepository;
69+
protected DynamoDBTargetSystem getSelf() {
70+
return this;
8171
}
8272

8373
@Override

core/target-systems/dynamodb-target-system/src/test/java/io/flamingock/targetsystem/dynamodb/DynamoDBCloudTargetSystemTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,15 @@ void happyPath() {
182182
.build()
183183
.table(UserEntity.tableName, TableSchema.fromBean(UserEntity.class)),
184184
1);
185+
186+
//TODO when cloud enabled
185187
// check ongoing status
186-
dynamoDBTestHelper.checkOngoingTask(ongoingCount -> ongoingCount == 0);
188+
// dynamoDBTestHelper.checkOngoingTask(ongoingCount -> ongoingCount == 0);
187189
}
188190
}
189191

190192
@Test
193+
@Disabled("adapt when adding cloud support")
191194
@DisplayName("Should rollback the ongoing deletion when a task fails")
192195
void failedTasks() {
193196
String executionId = "execution-1";

core/target-systems/mongodb-springdata-target-system/src/main/java/io/flamingock/targetsystem/mongodb/springdata/MongoDBSpringDataTargetSystem.java

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,57 +21,40 @@
2121
import io.flamingock.internal.common.core.context.ContextResolver;
2222
import io.flamingock.internal.common.core.error.FlamingockException;
2323
import io.flamingock.internal.core.targets.mark.NoOpTargetSystemAuditMarker;
24-
import io.flamingock.internal.core.targets.mark.TargetSystemAuditMarker;
2524
import io.flamingock.internal.core.targets.TransactionalTargetSystem;
2625
import io.flamingock.internal.core.transaction.TransactionWrapper;
2726
import org.springframework.data.mongodb.core.MongoTemplate;
2827

2928

3029
public class MongoDBSpringDataTargetSystem extends TransactionalTargetSystem<MongoDBSpringDataTargetSystem> {
3130

32-
private final WriteConcern DEFAULT_WRITE_CONCERN = WriteConcern.MAJORITY.withJournal(true);
33-
private final ReadConcern DEFAULT_READ_CONCERN = ReadConcern.MAJORITY;
34-
private final ReadPreference DEFAULT_READ_PREFERENCE = ReadPreference.primary();
35-
36-
private TargetSystemAuditMarker taskStatusRepository;
31+
private MongoTemplate mongoTemplate;
32+
private WriteConcern writeConcern = WriteConcern.MAJORITY.withJournal(true);
33+
private ReadConcern readConcern = ReadConcern.MAJORITY;
34+
private ReadPreference readPreference = ReadPreference.primary();
3735

3836
private MongoDBSpringDataTxWrapper txWrapper;
39-
private MongoTemplate mongoTemplate;
40-
private WriteConcern writeConcern = DEFAULT_WRITE_CONCERN;
41-
private ReadConcern readConcern = DEFAULT_READ_CONCERN;
42-
private ReadPreference readPreference = DEFAULT_READ_PREFERENCE;
43-
// private boolean autoCreate = true;
4437

4538
public MongoDBSpringDataTargetSystem(String id, MongoTemplate mongoTemplate) {
4639
super(id);
4740
this.mongoTemplate = mongoTemplate;
48-
targetSystemContext.addDependency(mongoTemplate);
4941
}
5042

5143
public MongoDBSpringDataTargetSystem withReadConcern(ReadConcern readConcern) {
5244
this.readConcern = readConcern;
53-
targetSystemContext.addDependency(readConcern);
5445
return this;
5546
}
5647

5748
public MongoDBSpringDataTargetSystem withReadPreference(ReadPreference readPreference) {
5849
this.readPreference = readPreference;
59-
targetSystemContext.addDependency(readPreference);
6050
return this;
6151
}
6252

6353
public MongoDBSpringDataTargetSystem withWriteConcern(WriteConcern writeConcern) {
6454
this.writeConcern = writeConcern;
65-
targetSystemContext.addDependency(writeConcern);
6655
return this;
6756
}
6857

69-
// public MongoDBSpringDataTargetSystem withAutoCreate(boolean autoCreate) {
70-
// this.autoCreate = autoCreate;
71-
// targetSystemContext.setProperty("autoCreate", autoCreate);
72-
// return this;
73-
// }
74-
7558
public MongoTemplate getMongoTemplate() {
7659
return mongoTemplate;
7760
}
@@ -91,6 +74,7 @@ public ReadPreference getReadPreference() {
9174
@Override
9275
public void initialize(ContextResolver baseContext) {
9376
this.validate();
77+
targetSystemContext.addDependency(mongoTemplate);
9478

9579
txWrapper = MongoDBSpringDataTxWrapper.builder()
9680
.mongoTemplate(mongoTemplate)
@@ -99,8 +83,8 @@ public void initialize(ContextResolver baseContext) {
9983
.writeConcern(writeConcern)
10084
.build();
10185

102-
//TODO create MongoDBSpringDataOnGoingTaskStatusRepository for cloud edition
103-
taskStatusRepository = new NoOpTargetSystemAuditMarker(this.getId());
86+
//TODO: inject marker repository based on edition(baseContext.getDependencyValue(FlamingockEdition.class))
87+
markerRepository = new NoOpTargetSystemAuditMarker(this.getId());
10488
}
10589

10690
private void validate() {
@@ -124,11 +108,6 @@ protected MongoDBSpringDataTargetSystem getSelf() {
124108
return this;
125109
}
126110

127-
@Override
128-
public TargetSystemAuditMarker getOnGoingTaskStatusRepository() {
129-
return taskStatusRepository;
130-
}
131-
132111
@Override
133112
public TransactionWrapper getTxWrapper() {
134113
return txWrapper;

0 commit comments

Comments
 (0)