Skip to content

Commit e81e54b

Browse files
authored
Merge pull request #43 from PlaytikaOSS/feature/assignment-data
Introduce AssignmentData with memberId field
2 parents 076ed20 + 8147669 commit e81e54b

File tree

11 files changed

+70
-57
lines changed

11 files changed

+70
-57
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.playtika.shepherd.common;
2+
3+
public record AssignmentData(
4+
long populationVersion,
5+
String memberId, int generation, boolean isLeader) {
6+
}

common/src/main/java/com/playtika/shepherd/common/PastureListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public interface PastureListener<Breed> {
1212
/**
1313
* Invoked when new subpopulation assigned to this pasture
1414
*/
15-
void assigned(List<Breed> population, long version, int generation, boolean isLeader);
15+
void assigned(List<Breed> population, AssignmentData populationAssignment);
1616

1717
/**
1818
* Invoked on first phase of rebalance

kafka-example/src/test/java/com/playtika/shepherd/JoinPasture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public static void main(String[] args) {
3636

3737

3838
Pasture<String> skyNet = kafkaPushFarm.addBreedingPasture("SkyNet", String.class,
39-
(population, version, generation, isLeader) -> {
40-
logger.info("Assigned leader={} version={} [{}]", isLeader, version, population);
39+
(population, assignmentData) -> {
40+
logger.info("Assigned assignmentData={} [{}]", assignmentData, population);
4141
});
4242

4343

kafka-functional-tests/src/test/java/com/playtika/shepherd/KafkaPullFarmTest.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.playtika.shepherd;
22

3+
import com.playtika.shepherd.common.AssignmentData;
34
import com.playtika.shepherd.common.pull.Herd;
45
import com.playtika.shepherd.common.pull.Pasture;
56
import com.playtika.shepherd.common.pull.Shepherd;
@@ -38,8 +39,8 @@ public void shouldBalanceStaticHerd() {
3839

3940
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
4041

41-
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
42-
logPopulation(1, population, version, isLeader);
42+
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, assignmentData) -> {
43+
logPopulation(1, population, assignmentData);
4344
cows1.set(population);
4445
});
4546

@@ -52,8 +53,8 @@ public void shouldBalanceStaticHerd() {
5253

5354
//setup another pasture
5455
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
55-
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
56-
logPopulation(2, population, version, isLeader);
56+
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, assignmentData) -> {
57+
logPopulation(2, population, assignmentData);
5758
cows2.set(population);
5859
});
5960
pasture2.start();
@@ -65,8 +66,8 @@ public void shouldBalanceStaticHerd() {
6566

6667
//setup third pasture
6768
AtomicReference<List<ByteBuffer>> cows3 = new AtomicReference<>(List.of());
68-
Pasture pasture3 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
69-
logPopulation(3, population, version, isLeader);
69+
Pasture pasture3 = kafkaRanch.addPasture(herd, (population, assignmentData) -> {
70+
logPopulation(3, population, assignmentData);
7071
cows3.set(population);
7172
});
7273
pasture3.start();
@@ -113,20 +114,20 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
113114

114115
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
115116
AtomicLong version1 = new AtomicLong();
116-
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
117-
logPopulation(1, population, version, isLeader);
117+
Pasture pasture1 = kafkaRanch.addPasture(herd, (population, assignmentData) -> {
118+
logPopulation(1, population, assignmentData);
118119
cows1.set(population);
119-
version1.set(version);
120+
version1.set(assignmentData.populationVersion());
120121
});
121122
populationGlobal.set(new Herd.Population<>(new ByteBuffer[]{cow1, cow2}, ver1));
122123
pasture1.start();
123124

124125
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
125126
AtomicLong version2 = new AtomicLong();
126-
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, version, generation, isLeader) -> {
127-
logPopulation(2, population, version, isLeader);
127+
Pasture pasture2 = kafkaRanch.addPasture(herd, (population, assignmentData) -> {
128+
logPopulation(2, population, assignmentData);
128129
cows2.set(population);
129-
version2.set(version);
130+
version2.set(assignmentData.populationVersion());
130131
});
131132
pasture2.start();
132133

@@ -206,7 +207,7 @@ public void shouldBalanceBreedingStaticHerd() {
206207
AtomicReference<List<BlackSheep>> subHerd1 = new AtomicReference<>(List.of());
207208

208209
Pasture pasture1 = kafkaRanch.addBreedingPasture(herd, BlackSheep.class,
209-
(population, version, generation, isLeader) -> {
210+
(population, assignmentData) -> {
210211
logger.info("Assigned sheep1 [{}]", population);
211212
subHerd1.set(population);
212213
});
@@ -219,7 +220,7 @@ public void shouldBalanceBreedingStaticHerd() {
219220
//setup another pasture
220221
AtomicReference<List<BlackSheep>> subHerd2 = new AtomicReference<>(List.of());
221222
Pasture pasture2 = kafkaRanch.addBreedingPasture(herd, BlackSheep.class,
222-
(population, version, generation, isLeader) -> {
223+
(population, assignmentData) -> {
223224
logger.info("Assigned sheep2 [{}]", population);
224225
subHerd2.set(population);
225226
});
@@ -232,7 +233,7 @@ public void shouldBalanceBreedingStaticHerd() {
232233

233234
//setup third pasture
234235
AtomicReference<List<BlackSheep>> subHerd3 = new AtomicReference<>(List.of());
235-
Pasture pasture3 = kafkaRanch.addBreedingPasture(herd, BlackSheep.class, (population, version, generation, isLeader) -> {
236+
Pasture pasture3 = kafkaRanch.addBreedingPasture(herd, BlackSheep.class, (population, assignmentData) -> {
236237
logger.info("Assigned cows3 [{}]", population);
237238
subHerd3.set(population);
238239
});
@@ -253,8 +254,8 @@ public void shouldBalanceBreedingStaticHerd() {
253254
});
254255
}
255256

256-
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
257-
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
257+
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, AssignmentData assignmentData) {
258+
logger.info("Assigned to pasture{} assignmentData={} [{}]", pastureIndex, assignmentData, toBytes(population));
258259
}
259260

260261
public static class TestHerd<Breed> implements Herd<Breed> {

kafka-functional-tests/src/test/java/com/playtika/shepherd/KafkaPushFarmTest.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.playtika.shepherd;
22

3+
import com.playtika.shepherd.common.AssignmentData;
34
import com.playtika.shepherd.common.push.Pasture;
45
import com.playtika.shepherd.common.push.Shepherd;
56
import org.junit.jupiter.api.Test;
@@ -37,8 +38,8 @@ public void shouldBalanceStaticHerd() {
3738
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
3839

3940
String herdName = "push-static-herd";
40-
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
41-
logPopulation(1, population, version, isLeader);
41+
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
42+
logPopulation(1, population, assignmentData);
4243
cows1.set(population);
4344
});
4445

@@ -51,8 +52,8 @@ public void shouldBalanceStaticHerd() {
5152

5253
//setup another pasture
5354
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
54-
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
55-
logPopulation(2, population, version, isLeader);
55+
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
56+
logPopulation(2, population, assignmentData);
5657
cows2.set(population);
5758
});
5859
pasture2.getShepherd().setPopulation(cows, -1);
@@ -65,8 +66,8 @@ public void shouldBalanceStaticHerd() {
6566

6667
//setup third pasture
6768
AtomicReference<List<ByteBuffer>> cows3 = new AtomicReference<>(List.of());
68-
Pasture<ByteBuffer> pasture3 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
69-
logPopulation(3, population, version, isLeader);
69+
Pasture<ByteBuffer> pasture3 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
70+
logPopulation(3, population, assignmentData);
7071
cows3.set(population);
7172
});
7273
pasture3.getShepherd().setPopulation(cows, -1);
@@ -113,20 +114,20 @@ private void shouldBalanceDynamicHerd(boolean versioned) {
113114
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
114115
AtomicLong version1 = new AtomicLong();
115116
String herdName = versioned ? "push-dynamic-group-versioned" : "push-dynamic-group";
116-
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
117-
logPopulation(1, population, version, isLeader);
117+
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
118+
logPopulation(1, population, assignmentData);
118119
cows1.set(population);
119-
version1.set(version);
120+
version1.set(assignmentData.populationVersion());
120121
});
121122
pasture1.getShepherd().setPopulation(cows, ver1);
122123
pasture1.start();
123124

124125
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
125126
AtomicLong version2 = new AtomicLong();
126-
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
127-
logPopulation(2, population, version, isLeader);
127+
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
128+
logPopulation(2, population, assignmentData);
128129
cows2.set(population);
129-
version2.set(version);
130+
version2.set(assignmentData.populationVersion());
130131
});
131132
pasture2.getShepherd().setPopulation(cows, ver1);
132133
pasture2.start();
@@ -212,16 +213,16 @@ public void shouldBalanceDynamicConcurrentSequenceHerd() {
212213

213214
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
214215
String herdName = "push-random-group";
215-
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
216-
logPopulation(1, population, version, isLeader);
216+
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
217+
logPopulation(1, population, assignmentData);
217218
cows1.set(population);
218219
});
219220
pasture1.getShepherd().setPopulation(new ByteBuffer[]{}, 0);
220221
pasture1.start();
221222

222223
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
223-
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
224-
logPopulation(2, population, version, isLeader);
224+
Pasture<ByteBuffer> pasture2 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
225+
logPopulation(2, population, assignmentData);
225226
cows2.set(population);
226227
});
227228
pasture2.getShepherd().setPopulation(new ByteBuffer[]{}, 0);
@@ -274,8 +275,7 @@ public void shouldBalanceBreedingStaticHerd() {
274275
AtomicReference<List<BlackSheep>> subHerd1 = new AtomicReference<>(List.of());
275276

276277
String herdName = "push-static-breeding-herd";
277-
Pasture<BlackSheep> pasture1 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class,
278-
(population, version, generation, isLeader) -> {
278+
Pasture<BlackSheep> pasture1 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class, (population, assignmentData) -> {
279279
logger.info("Assigned sheep1 [{}]", population);
280280
subHerd1.set(population);
281281
});
@@ -289,8 +289,7 @@ public void shouldBalanceBreedingStaticHerd() {
289289

290290
//setup another pasture
291291
AtomicReference<List<BlackSheep>> subHerd2 = new AtomicReference<>(List.of());
292-
Pasture<BlackSheep> pasture2 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class,
293-
(population, version, generation, isLeader) -> {
292+
Pasture<BlackSheep> pasture2 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class, (population, assignmentData) -> {
294293
logger.info("Assigned sheep2 [{}]", population);
295294
subHerd2.set(population);
296295
});
@@ -304,7 +303,7 @@ public void shouldBalanceBreedingStaticHerd() {
304303

305304
//setup third pasture
306305
AtomicReference<List<BlackSheep>> subHerd3 = new AtomicReference<>(List.of());
307-
Pasture<BlackSheep> pasture3 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class, (population, version, generation, isLeader) -> {
306+
Pasture<BlackSheep> pasture3 = kafkaRanch.addBreedingPasture(herdName, BlackSheep.class, (population, assignmentData) -> {
308307
logger.info("Assigned cows3 [{}]", population);
309308
subHerd3.set(population);
310309
});
@@ -326,8 +325,8 @@ public void shouldBalanceBreedingStaticHerd() {
326325
});
327326
}
328327

329-
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, long version, boolean isLeader) {
330-
logger.info("Assigned to pasture{} leader={} version={} [{}]", pastureIndex, isLeader, version, toBytes(population));
328+
private static void logPopulation(int pastureIndex, List<ByteBuffer> population, AssignmentData assignmentData) {
329+
logger.info("Assigned to pasture{} assignmentData={} [{}]", pastureIndex, assignmentData, toBytes(population));
331330
}
332331

333332
static class BlackSheep {

kafka-functional-tests/src/test/java/com/playtika/shepherd/KafkaToxiFarmTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void shouldRestoreBalanceForStaticHerd() throws IOException {
123123
AtomicReference<List<ByteBuffer>> cows1 = new AtomicReference<>(List.of());
124124

125125
String herdName = "static-herd";
126-
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
126+
Pasture<ByteBuffer> pasture1 = kafkaRanch.addPasture(herdName, (population, assignmentData) -> {
127127
logger.info("Assigned cows1 [{}]", toBytes(population));
128128
cows1.set(population);
129129
});
@@ -138,7 +138,7 @@ public void shouldRestoreBalanceForStaticHerd() throws IOException {
138138
//setup toxi pasture
139139
KafkaPushFarm kafkaToxiRanch = new KafkaPushFarm(getToxiBootstrapServers(), TEST_PROPERTIES);
140140
AtomicReference<List<ByteBuffer>> cows2 = new AtomicReference<>(List.of());
141-
Pasture<ByteBuffer> pasture2 = kafkaToxiRanch.addPasture(herdName, (population, version, generation, isLeader) -> {
141+
Pasture<ByteBuffer> pasture2 = kafkaToxiRanch.addPasture(herdName, (population, assignmentData) -> {
142142
logger.info("Assigned cows2 [{}]", toBytes(population));
143143
cows2.set(population);
144144
});

kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.playtika.shepherd.inernal;
22

33
import com.playtika.shepherd.BasicKafkaTest;
4+
import com.playtika.shepherd.common.AssignmentData;
45
import com.playtika.shepherd.common.PastureListener;
56
import org.apache.kafka.common.message.JoinGroupResponseData;
67
import org.junit.jupiter.api.Test;
@@ -50,7 +51,7 @@ public void reset() {
5051
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
5152
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
5253
@Override
53-
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
54+
public void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
5455
logger.info("Assigned cows1 [{}]", toBytes(population));
5556
cows1.addAll(population);
5657
}
@@ -80,7 +81,7 @@ public void cleanup() {
8081
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
8182
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
8283
@Override
83-
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
84+
public void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
8485
logger.info("Assigned cows2 [{}]", toBytes(population));
8586
cows2.addAll(population);
8687
}
@@ -142,7 +143,7 @@ public void reset() {
142143
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
143144
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {
144145
@Override
145-
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
146+
public void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
146147
logger.info("Assigned cows1 [{}]", toBytes(population));
147148
cows1.addAll(population);
148149
}
@@ -166,7 +167,7 @@ public void cleanup() {
166167
LinkedBlockingQueue<ByteBuffer> cows2 = new LinkedBlockingQueue<>();
167168
PastureListener<ByteBuffer> rebalanceListener2 = new PastureListener<>() {
168169
@Override
169-
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
170+
public void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
170171
logger.info("Assigned cows2 [{}]", toBytes(population));
171172
cows2.addAll(population);
172173
}

kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.playtika.shepherd;
22

3+
import com.playtika.shepherd.common.AssignmentData;
34
import com.playtika.shepherd.common.PastureListener;
45
import com.playtika.shepherd.common.pull.Farm;
56
import com.playtika.shepherd.common.pull.Herd;
@@ -106,8 +107,8 @@ public void rebalanceHerd() {
106107
}
107108

108109
@Override
109-
public void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
110-
pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
110+
public void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
111+
pastureListener.assigned(serDe.deserialize(population), assignmentData);
111112
}
112113

113114
@Override

kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.playtika.shepherd;
22

33
import com.playtika.shepherd.common.PastureListener;
4+
import com.playtika.shepherd.common.AssignmentData;
45
import com.playtika.shepherd.common.push.Farm;
56
import com.playtika.shepherd.common.push.Pasture;
67
import com.playtika.shepherd.common.push.Shepherd;
@@ -134,9 +135,9 @@ public synchronized void reset() {
134135
}
135136

136137
@Override
137-
public synchronized void assigned(List<ByteBuffer> population, long version, int generation, boolean isLeader) {
138-
this.pastureListener.assigned(serDe.deserialize(population), version, generation, isLeader);
139-
this.assignedVersion = version;
138+
public synchronized void assigned(List<ByteBuffer> population, AssignmentData assignmentData) {
139+
this.pastureListener.assigned(serDe.deserialize(population), assignmentData);
140+
this.assignedVersion = assignmentData.populationVersion();
140141
}
141142

142143
@Override

0 commit comments

Comments
 (0)