Skip to content

Commit 3c50029

Browse files
authored
Merge pull request #42 from PlaytikaOSS/feature/pasture-logs
Removed CheckedHerd and improve logs in PastureCoordinator
2 parents 822dc43 + ce7aefe commit 3c50029

File tree

5 files changed

+38
-52
lines changed

5 files changed

+38
-52
lines changed

kafka-functional-tests/src/test/java/com/playtika/shepherd/inernal/ShepherdTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.concurrent.atomic.AtomicInteger;
1616
import java.util.stream.Stream;
1717

18-
import static com.playtika.shepherd.inernal.CheckedHerd.checked;
1918
import static com.playtika.shepherd.inernal.utils.BytesUtils.toBytes;
2019
import static java.time.Duration.ofSeconds;
2120
import static org.assertj.core.api.Assertions.assertThat;
@@ -35,7 +34,7 @@ public void shouldBalanceStaticHerd() {
3534
ByteBuffer cow2 = ByteBuffer.wrap(new byte[]{0});
3635
AtomicInteger pasturesCountParameter = new AtomicInteger();
3736

38-
Herd herd = checked(new Herd() {
37+
Herd herd = new Herd() {
3938
@Override
4039
public Population getPopulation(List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
4140
pasturesCountParameter.set(allMemberMetadata.size());
@@ -45,7 +44,7 @@ public Population getPopulation(List<JoinGroupResponseData.JoinGroupResponseMemb
4544
@Override
4645
public void reset() {
4746
}
48-
});
47+
};
4948

5049

5150
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
@@ -128,7 +127,7 @@ public void shouldBalanceDynamicHerd() {
128127
AtomicInteger version = new AtomicInteger(1);
129128
AtomicInteger pasturesCountParameter = new AtomicInteger();
130129

131-
Herd herd = checked(new Herd() {
130+
Herd herd = new Herd() {
132131
@Override
133132
public Population getPopulation(List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
134133
pasturesCountParameter.set(allMemberMetadata.size());
@@ -138,7 +137,7 @@ public Population getPopulation(List<JoinGroupResponseData.JoinGroupResponseMemb
138137
@Override
139138
public void reset() {
140139
}
141-
});
140+
};
142141

143142
LinkedBlockingQueue<ByteBuffer> cows1 = new LinkedBlockingQueue<>();
144143
PastureListener<ByteBuffer> rebalanceListener1 = new PastureListener<>() {

kafka/src/main/java/com/playtika/shepherd/KafkaPullFarm.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.List;
1717
import java.util.Map;
1818

19-
import static com.playtika.shepherd.inernal.CheckedHerd.checked;
2019
import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER;
2120
import static com.playtika.shepherd.serde.SerDeUtils.getSerDe;
2221

@@ -52,7 +51,7 @@ private <Breed> Pasture addBreedingPasture(Herd<Breed> herd, SerDe<Breed> serDe
5251
.setGroupId(herd.getName())
5352
.setProperties(properties)
5453
.setRebalanceListener(pullHerd)
55-
.setHerd(checked(pullHerd))
54+
.setHerd(pullHerd)
5655
.build();
5756

5857
pullHerd.setPastureShepherd(pastureShepherd);

kafka/src/main/java/com/playtika/shepherd/KafkaPushFarm.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.Map;
1919
import java.util.function.Supplier;
2020

21-
import static com.playtika.shepherd.inernal.CheckedHerd.checked;
2221
import static com.playtika.shepherd.inernal.utils.CacheUtils.memoize;
2322
import static com.playtika.shepherd.serde.SerDeUtils.BYTE_BUFFER_DE_SER;
2423
import static com.playtika.shepherd.serde.SerDeUtils.getSerDe;
@@ -56,7 +55,7 @@ private <Breed> Pasture<Breed> addBreedingPasture(String herdName, SerDe<Breed>
5655
.setGroupId(herdName)
5756
.setProperties(properties)
5857
.setRebalanceListener(pushHerd)
59-
.setHerd(checked(pushHerd))
58+
.setHerd(pushHerd)
6059
.build();
6160

6261
pushHerd.setPastureShepherd(pastureShepherd);
Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +0,0 @@
1-
package com.playtika.shepherd.inernal;
2-
3-
import org.apache.kafka.common.message.JoinGroupResponseData;
4-
5-
import java.util.List;
6-
7-
public class CheckedHerd implements Herd {
8-
9-
private final Herd herd;
10-
private boolean requested = false;
11-
12-
public static Herd checked(Herd herd){
13-
return new CheckedHerd(herd);
14-
}
15-
16-
private CheckedHerd(Herd herd) {
17-
this.herd = herd;
18-
}
19-
20-
@Override
21-
public Population getPopulation(List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata){
22-
if(requested){
23-
throw new IllegalStateException("Should be called only once on rebalance");
24-
}
25-
try {
26-
return herd.getPopulation(allMemberMetadata);
27-
} finally {
28-
requested = true;
29-
}
30-
}
31-
32-
@Override
33-
public void reset() {
34-
herd.reset();
35-
requested = false;
36-
}
37-
}

kafka/src/main/java/com/playtika/shepherd/inernal/PastureCoordinator.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434
import java.util.List;
3535
import java.util.Map;
36+
import java.util.stream.Collectors;
3637

3738
import static com.playtika.shepherd.inernal.ProtocolHelper.decompress;
3839
import static com.playtika.shepherd.inernal.ProtocolHelper.deserializeAssignment;
@@ -176,14 +177,39 @@ protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
176177
if (skipAssignment)
177178
throw new IllegalStateException("Can't skip assignment because static membership is not supported.");
178179

179-
Population population = herd.getPopulation(allMemberMetadata);
180-
leaderElected = leaderId;
180+
try {
181+
Population population = herd.getPopulation(allMemberMetadata);
182+
leaderElected = leaderId;
183+
184+
if (logger.isDebugEnabled()) {
185+
logger.debug("""
186+
Will assign population of size=[{}] among members count=[{}],
187+
members=[{}],
188+
population=[{}]""",
189+
population.getSheep().size(), allMemberMetadata.size(),
190+
collectMemberIds(allMemberMetadata),
191+
toBytes(population.getSheep()));
192+
} else {
193+
logger.info("""
194+
Will assign population of size=[{}] among members count=[{}],
195+
members=[{}]""",
196+
population.getSheep().size(), allMemberMetadata.size(),
197+
collectMemberIds(allMemberMetadata));
198+
}
181199

182-
logger.info("Will rebalance population: [{}]", toBytes(population.getSheep()));
200+
return assignor.performAssignment(leaderId, protocol,
201+
population.getSheep(), population.getVersion(),
202+
allMemberMetadata);
203+
} catch (Throwable t) {
204+
logger.error("Failed to assign population. Will return empty assignment.\n" +
205+
"members count=[{}], members=[{}]",
206+
allMemberMetadata.size(), collectMemberIds(allMemberMetadata));
207+
return Map.of();
208+
}
209+
}
183210

184-
return assignor.performAssignment(leaderId, protocol,
185-
population.getSheep(), population.getVersion(),
186-
allMemberMetadata);
211+
private static String collectMemberIds(List<JoinGroupResponseMember> allMemberMetadata) {
212+
return allMemberMetadata.stream().map(JoinGroupResponseMember::memberId).collect(Collectors.joining(", "));
187213
}
188214

189215
@Override

0 commit comments

Comments
 (0)