Skip to content

Commit 3297466

Browse files
authored
Merge pull request #26 from PlaytikaOSS/feature/long-version
Switch version from int to long.
2 parents ddc9832 + 6e96611 commit 3297466

File tree

15 files changed

+53
-96
lines changed

15 files changed

+53
-96
lines changed

common/src/main/java/com/playtika/shepherd/common/PastureListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public interface PastureListener<Breed> {
1212
/**
1313
* Invoked when new subpopulation assigned to this pasture
1414
*/
15-
void assigned(List<Breed> population, int version, int generation, boolean isLeader);
15+
void assigned(List<Breed> population, long version, int generation, boolean isLeader);
1616

1717
/**
1818
* Invoked on first phase of rebalance

common/src/main/java/com/playtika/shepherd/common/pull/Herd.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ public interface Herd<Breed> {
1212

1313
void reset();
1414

15-
record Population<Breed>(Breed[] population, int version) {
15+
record Population<Breed>(Breed[] population, long version) {
1616
}
1717
}

common/src/main/java/com/playtika/shepherd/common/push/Shepherd.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ public interface Shepherd<Breed> {
1313
* @return true if it will cause rebalance, false if population will be ignored
1414
*/
1515

16-
boolean setPopulation(Breed[] population, int version);
16+
boolean setPopulation(Breed[] population, long version);
1717

1818
}

kafka-functional-tests/src/test/java/com/playtika/shepherd/KafkaPullFarmTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import java.nio.ByteBuffer;
1111
import java.time.Duration;
1212
import java.util.List;
13-
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.atomic.AtomicLong;
1414
import java.util.concurrent.atomic.AtomicReference;
1515
import java.util.stream.Stream;
1616

@@ -112,7 +112,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
112112
int ver1 = nextVersion(0, versioned);
113113

114114
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
115-
AtomicInteger version1 = new AtomicInteger();
115+
AtomicLong version1 = new AtomicLong();
116116
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
117117
logPopulation(1, population, version, isLeader);
118118
cows1.set(population);
@@ -122,7 +122,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
122122
pasture1.start();
123123

124124
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
125-
AtomicInteger version2 = new AtomicInteger();
125+
AtomicLong version2 = new AtomicLong();
126126
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
127127
logPopulation(2, population, version, isLeader);
128128
cows2.set(population);
@@ -253,7 +253,7 @@ public void shouldBalanceBreedingStaticHerd() {
253253
});
254254
}
255255

256-
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, int version, boolean isLeader) {
256+
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
257257
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
258258
}
259259

kafka-functional-tests/src/test/java/com/playtika/shepherd/KafkaPushFarmTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import java.util.List;
1212
import java.util.Objects;
1313
import java.util.Random;
14-
import java.util.concurrent.atomic.AtomicInteger;
14+
import java.util.concurrent.atomic.AtomicLong;
1515
import java.util.concurrent.atomic.AtomicReference;
1616
import java.util.stream.Stream;
1717

@@ -111,7 +111,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
111111
int ver1 = nextVersion(0, versioned);
112112

113113
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
114-
AtomicInteger version1 = new AtomicInteger();
114+
AtomicLong version1 = new AtomicLong();
115115
String herdName = versioned ? "push-dynamic-group-versioned" : "push-dynamic-group";
116116
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
117117
logPopulation(1, population, version, isLeader);
@@ -122,7 +122,7 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
122122
pasture1.start();
123123

124124
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
125-
AtomicInteger version2 = new AtomicInteger();
125+
AtomicLong version2 = new AtomicLong();
126126
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
127127
logPopulation(2, population, version, isLeader);
128128
cows2.set(population);
@@ -326,7 +326,7 @@ public void shouldBalanceBreedingStaticHerd() {
326326
});
327327
}
328328

329-
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, int version, boolean isLeader) {
329+
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
330330
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
331331
}
332332

kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
import java.nio.ByteBuffer;
1010
import java.time.Duration;
1111
import java.util.List;
12-
import java.util.Set;
13-
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.CopyOnWriteArrayList;
1413
import java.util.concurrent.LinkedBlockingQueue;
1514
import java.util.concurrent.atomic.AtomicInteger;
1615
import java.util.stream.Stream;
@@ -37,7 +36,7 @@ public void shouldBalanceStaticHerd() {
3736
Herd herd = checked(new Herd() {
3837
@Override
3938
public Population getPopulation() {
40-
return new Population(Set.of(cow1, cow2), -1);
39+
return new Population(List.of(cow1, cow2), -1);
4140
}
4241

4342
@Override
@@ -49,7 +48,7 @@ public void reset() {
4948
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
5049
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
5150
@Override
52-
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
51+
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
5352
logger.info("Assigned cows1 [{}]", toBytes(population));
5453
cows1.addAll(population);
5554
}
@@ -78,7 +77,7 @@ public void cleanup() {
7877
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
7978
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
8079
@Override
81-
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
80+
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
8281
logger.info("Assigned cows2 [{}]", toBytes(population));
8382
cows2.addAll(population);
8483
}
@@ -119,8 +118,7 @@ public void shouldBalanceDynamicHerd() {
119118

120119
ByteBuffer cow1 = ByteBuffer.wrap(new byte[]{1});
121120
ByteBuffer cow2 = ByteBuffer.wrap(new byte[]{0});
122-
Set<ByteBuffer> population = ConcurrentHashMap.newKeySet();
123-
population.addAll(List.of(cow1, cow2));
121+
List<ByteBuffer> population = new CopyOnWriteArrayList<>(List.of(cow1, cow2));
124122
AtomicInteger version = new AtomicInteger(1);
125123

126124
Herd herd = checked(new Herd() {
@@ -137,7 +135,7 @@ public void reset() {
137135
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
138136
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
139137
@Override
140-
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
138+
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
141139
logger.info("Assigned cows1 [{}]", toBytes(population));
142140
cows1.addAll(population);
143141
}
@@ -161,7 +159,7 @@ public void cleanup() {
161159
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
162160
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
163161
@Override
164-
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
162+
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
165163
logger.info("Assigned cows2 [{}]", toBytes(population));
166164
cows2.addAll(population);
167165
}

kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import java.nio.ByteBuffer;
1414
import java.time.Duration;
15-
import java.util.HashSet;
1615
import java.util.List;
1716
import java.util.Map;
1817

@@ -76,7 +75,7 @@ static class PullHerd<Breed> implements com.playtika.shepherd.inernal.Herd, Past
7675
@Override
7776
public Population getPopulation() {
7877
Herd.Population<Breed> population = herd.getPopulation();
79-
return new Population(new HashSet<>(serDe.serialize(List.of(population.population()))), population.version());
78+
return new Population(serDe.serialize(List.of(population.population())), population.version());
8079
}
8180

8281
@Override
@@ -107,7 +106,7 @@ public void rebalanceHerd() {
107106
}
108107

109108
@Override
110-
public void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
109+
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
111110
pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
112111
}
113112

kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.playtika.shepherd;
22

3+
import com.playtika.shepherd.common.PastureListener;
34
import com.playtika.shepherd.common.push.Farm;
45
import com.playtika.shepherd.common.push.Pasture;
5-
import com.playtika.shepherd.common.PastureListener;
66
import com.playtika.shepherd.common.push.Shepherd;
77
import com.playtika.shepherd.inernal.Herd;
88
import com.playtika.shepherd.inernal.PastureShepherd;
@@ -13,10 +13,8 @@
1313
import java.nio.ByteBuffer;
1414
import java.time.Duration;
1515
import java.util.Arrays;
16-
import java.util.HashSet;
1716
import java.util.List;
1817
import java.util.Map;
19-
import java.util.Set;
2018
import java.util.function.Supplier;
2119

2220
import static com.playtika.shepherd.inernal.CheckedHerd.checked;
@@ -83,21 +81,21 @@ static final class PushHerd<Breed> implements Herd, Pasture<Breed>, Shepherd<Bre
8381
private Population snapshot;
8482
private Population latest;
8583

86-
private int assignedVersion = Integer.MIN_VALUE;
84+
private long assignedVersion = Long.MIN_VALUE;
8785

8886
PushHerd(PastureListener<Breed> pastureListener, SerDe<Breed> serDe) {
8987
this.pastureListener = pastureListener;
9088
this.serDe = serDe;
9189
}
9290

9391
@Override
94-
public synchronized boolean setPopulation(Breed[] population, int version) {
92+
public synchronized boolean setPopulation(Breed[] population, long version) {
9593
//Ignore outdated non-static version
9694
if(version >=0 && version <= assignedVersion){
9795
return false;
9896
}
9997

100-
Supplier<Set<ByteBuffer>> latest = memoize(() -> new HashSet<>(serDe.serialize(Arrays.asList(population))));
98+
Supplier<List<ByteBuffer>> latest = memoize(() -> serDe.serialize(Arrays.asList(population)));
10199
if(this.snapshot == null
102100
|| version >= 0 && version > this.snapshot.getVersion()
103101
|| version < 0 && !this.snapshot.getSheep().equals(latest.get())){
@@ -136,7 +134,7 @@ public synchronized void reset() {
136134
}
137135

138136
@Override
139-
public synchronized void assigned(List<ByteBuffer> population, int version, int generation, boolean isLeader) {
137+
public synchronized void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
140138
this.pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
141139
this.assignedVersion = version;
142140
}

kafka/src/main/java/com/playtika/shepherd/inernal/Assignment.java

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,6 @@
22

33
import java.nio.ByteBuffer;
44
import java.util.List;
5-
import java.util.Objects;
6-
7-
public class Assignment {
8-
9-
private final String leader;
10-
private final List<ByteBuffer> assigned;
11-
private final int version;
12-
13-
public Assignment(String leader, int version, List<ByteBuffer> assigned) {
14-
this.leader = leader;
15-
this.assigned = assigned;
16-
this.version = version;
17-
}
18-
19-
public String getLeader() {
20-
return leader;
21-
}
22-
23-
public List<ByteBuffer> getAssigned() {
24-
return assigned;
25-
}
26-
27-
public int getVersion() {
28-
return version;
29-
}
30-
31-
@Override
32-
public boolean equals(Object obj){
33-
Assignment assignment = (Assignment) obj;
34-
return Objects.equals(leader, assignment.leader)
35-
&& Objects.equals(assigned, assignment.assigned);
36-
}
375

6+
public record Assignment(String leader, long version, List<ByteBuffer> assigned) {
387
}

kafka/src/main/java/com/playtika/shepherd/inernal/Assignor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55
import java.nio.ByteBuffer;
66
import java.util.List;
77
import java.util.Map;
8-
import java.util.Set;
98

109
public interface Assignor {
1110
Map<String, ByteBuffer> performAssignment(
12-
String leaderId, String protocol, Set<ByteBuffer> population, int version,
11+
String leaderId, String protocol, List<ByteBuffer> population, long version,
1312
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
1413
}

0 commit comments

Comments
 (0)