Skip to content

Commit e371111

Browse files
authored
Merge pull request #963: Kafka watermarking
2 parents ce539d9 + f6e230d commit e371111

File tree

12 files changed

+129
-129
lines changed

12 files changed

+129
-129
lines changed

beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/BatchLogReadTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Collections;
4343
import java.util.List;
4444
import java.util.UUID;
45+
import java.util.concurrent.atomic.AtomicLong;
4546
import java.util.stream.Collectors;
4647
import java.util.stream.IntStream;
4748
import org.apache.beam.runners.direct.DirectRunner;
@@ -66,6 +67,7 @@
6667
import org.apache.beam.sdk.values.PBegin;
6768
import org.apache.beam.sdk.values.PCollection;
6869
import org.apache.beam.sdk.values.TypeDescriptors;
70+
import org.junit.Ignore;
6971
import org.junit.Test;
7072
import org.junit.runner.RunWith;
7173
import org.junit.runners.Parameterized;
@@ -139,6 +141,7 @@ public void testReadWithThroughputLimitDisabled() {
139141
}
140142

141143
@Test(timeout = 60000)
144+
@Ignore("Unreproducibly flaky with unknown purpose")
142145
public void testReadWithThroughputLimitWait() {
143146
int numElements = 1000;
144147
List<StreamElement> input = createInput(numElements);
@@ -323,14 +326,14 @@ private static ThroughputLimiter getThroughputLimiter() {
323326

324327
private static ThroughputLimiter getThroughputLimiter(int waitInvocations) {
325328
return new ThroughputLimiter() {
326-
long numInvocations = 0L;
329+
final AtomicLong numInvocations = new AtomicLong();
327330

328331
@Override
329332
public Duration getPauseTime(Context context) {
330-
if (++numInvocations > waitInvocations) {
333+
if (numInvocations.incrementAndGet() > waitInvocations) {
331334
return Duration.ofSeconds(5);
332335
}
333-
// on first two invocations return zero
336+
// on first N invocations return zero
334337
return Duration.ZERO;
335338
}
336339

core/src/main/java/cz/o2/proxima/core/transaction/KeyAttributes.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,19 @@ private KeyAttributes() {}
7676
* @param entity the entity descriptor
7777
* @param key the entity key
7878
* @param attributeDescriptor descriptor of wildcard or regular attribute
79-
* @param sequenceId sequence ID of the read attribute
79+
* @param sequentialId sequence ID of the read attribute
8080
*/
8181
public static KeyAttribute ofAttributeDescriptor(
8282
EntityDescriptor entity,
8383
String key,
8484
AttributeDescriptor<?> attributeDescriptor,
85-
long sequenceId) {
85+
long sequentialId) {
8686

8787
Preconditions.checkArgument(
8888
!attributeDescriptor.isWildcard(),
8989
"Please specify attribute suffix for wildcard attributes. Got attribute %s",
9090
attributeDescriptor);
91-
return new KeyAttribute(entity, key, attributeDescriptor, sequenceId, false, null);
91+
return new KeyAttribute(entity, key, attributeDescriptor, sequentialId, false, null);
9292
}
9393

9494
/**
@@ -98,22 +98,22 @@ public static KeyAttribute ofAttributeDescriptor(
9898
* @param entity the entity descriptor
9999
* @param key the entity key
100100
* @param attributeDescriptor descriptor of wildcard or regular attribute
101-
* @param sequenceId sequence ID of the read attribute
101+
* @param sequentialId sequence ID of the read attribute
102102
* @param attributeSuffix a specific attribute suffix when {@code attributeDescriptor} is wildcard
103103
* attribute
104104
*/
105105
public static KeyAttribute ofAttributeDescriptor(
106106
EntityDescriptor entity,
107107
String key,
108108
AttributeDescriptor<?> attributeDescriptor,
109-
long sequenceId,
109+
long sequentialId,
110110
@Nullable String attributeSuffix) {
111111

112112
Preconditions.checkArgument(
113113
!attributeDescriptor.isWildcard() || attributeSuffix != null,
114114
"Please specify attribute suffix for wildcard attributes. Got attribute %s",
115115
attributeDescriptor);
116-
return new KeyAttribute(entity, key, attributeDescriptor, sequenceId, false, attributeSuffix);
116+
return new KeyAttribute(entity, key, attributeDescriptor, sequentialId, false, attributeSuffix);
117117
}
118118

119119
/**
@@ -166,7 +166,7 @@ public static KeyAttribute ofMissingAttribute(
166166
public static KeyAttribute ofStreamElement(StreamElement element) {
167167
Preconditions.checkArgument(
168168
element.hasSequentialId(),
169-
"Elements read with enabled transactions need to use sequenceIds got %s.",
169+
"Elements read with enabled transactions need to use sequentialIds got %s.",
170170
element);
171171
Preconditions.checkArgument(!element.isDeleteWildcard(), "Wildcard deletes not yet supported");
172172
return new KeyAttribute(

direct/core/src/main/java/cz/o2/proxima/direct/core/randomaccess/KeyValue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private KeyValue(
157157
private KeyValue(
158158
EntityDescriptor entityDesc,
159159
AttributeDescriptor<T> attrDesc,
160-
long sequenceId,
160+
long sequentialId,
161161
String key,
162162
String attribute,
163163
RandomOffset offset,
@@ -168,7 +168,7 @@ private KeyValue(
168168
super(
169169
entityDesc,
170170
attrDesc,
171-
sequenceId,
171+
sequentialId,
172172
key,
173173
attribute,
174174
stamp,

direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public class Transaction implements AutoCloseable {
172172
@Getter private final String transactionId;
173173
private boolean commitAttempted = false;
174174
@Getter private State.Flags state;
175-
private long sequenceId = -1L;
175+
private long sequentialId = -1L;
176176
private long stamp = Long.MIN_VALUE;
177177
private final List<CompletableFuture<?>> runningUpdates =
178178
Collections.synchronizedList(new ArrayList<>());
@@ -289,7 +289,7 @@ private CompletableFuture<Response> sendCommitRequest(Collection<StreamElement>
289289
if (state != State.Flags.COMMITTED) {
290290
return CompletableFuture.completedFuture(Response.empty().aborted());
291291
}
292-
return CompletableFuture.completedFuture(Response.empty().duplicate(sequenceId));
292+
return CompletableFuture.completedFuture(Response.empty().duplicate(sequentialId));
293293
}
294294
log.debug("Sending commit request for transformed elements {}", transformed);
295295
return manager.commit(transactionId, transformed);
@@ -333,12 +333,12 @@ private void processArrivedResponse(@Nullable Response response, @Nullable Throw
333333
}
334334
if (response.hasSequenceId()) {
335335
Preconditions.checkState(
336-
sequenceId == -1 || sequenceId == response.getSeqId(),
336+
sequentialId == -1 || sequentialId == response.getSeqId(),
337337
"Updated sequence ID from %s to %s. That is a bug in proxima's transactions.",
338-
sequenceId,
338+
sequentialId,
339339
response.getSeqId());
340-
sequenceId = response.getSeqId();
341-
log.debug("Assigned sequence ID {} for transaction {}", sequenceId, transactionId);
340+
sequentialId = response.getSeqId();
341+
log.debug("Assigned sequence ID {} for transaction {}", sequentialId, transactionId);
342342
}
343343
if (response.hasStamp()) {
344344
Preconditions.checkState(
@@ -428,13 +428,13 @@ private CompletableFuture<Void> applyTransform(
428428
private StreamElement injectSequenceIdAndStamp(StreamElement in) {
429429

430430
Preconditions.checkState(
431-
sequenceId > 0, "Invalid sequence ID %s for %s", sequenceId, transactionId);
431+
sequentialId > 0, "Invalid sequence ID %s for %s", sequentialId, transactionId);
432432
Preconditions.checkArgument(!in.isDeleteWildcard(), "Wildcard deletes not yet supported");
433433

434434
return StreamElement.upsert(
435435
in.getEntityDescriptor(),
436436
in.getAttributeDescriptor(),
437-
sequenceId,
437+
sequentialId,
438438
in.getKey(),
439439
in.getAttribute(),
440440
stamp,

direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,13 +108,13 @@ protected void onCache(StreamElement element, boolean overwrite) {
108108
final Pair<Long, Payload> oldVal;
109109
synchronized (cache) {
110110
oldVal = cache.get(element.getKey(), attrName, Long.MAX_VALUE);
111-
long sequenceId = element.hasSequentialId() ? element.getSequentialId() : 0L;
111+
long sequentialId = element.hasSequentialId() ? element.getSequentialId() : 0L;
112112
updated =
113113
cache.put(
114114
element.getKey(),
115115
attrName,
116116
element.getStamp(),
117-
sequenceId,
117+
sequentialId,
118118
overwrite,
119119
parsed.orElse(null));
120120
}

direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ synchronized boolean put(
198198
String key,
199199
String attribute,
200200
long stamp,
201-
long sequenceId,
201+
long sequentialId,
202202
boolean overwrite,
203203
@Nullable Object value) {
204204

@@ -215,7 +215,7 @@ synchronized boolean put(
215215
final Payload oldPayload = valueMap.get(stamp);
216216
if (overwrite || oldPayload == null || oldPayload.overridable) {
217217
logPayloadUpdateIfNecessary(key, attribute, stamp, value);
218-
Payload newPayload = new Payload(value, sequenceId, !overwrite);
218+
Payload newPayload = new Payload(value, sequentialId, !overwrite);
219219
valueMap.put(stamp, newPayload);
220220
updated.set(!newPayload.equals(oldPayload));
221221
}

direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaAccessor.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.kafka.clients.admin.Config;
4949
import org.apache.kafka.clients.admin.ConfigEntry;
5050
import org.apache.kafka.clients.admin.DescribeConfigsResult;
51-
import org.apache.kafka.clients.consumer.ConsumerConfig;
5251
import org.apache.kafka.common.config.ConfigResource;
5352
import org.apache.kafka.common.config.TopicConfig;
5453

@@ -75,10 +74,6 @@ public class KafkaAccessor extends SerializableAbstractStorage implements DataAc
7574
/** Maximal read speed in bytes per second. */
7675
public static final String MAX_BYTES_PER_SEC = "bytes-per-sec-max";
7776

78-
/** Number of records per poll() */
79-
public static final String MAX_POLL_RECORDS =
80-
KAFKA_CONFIG_PREFIX + ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
81-
8277
/** Auto commit interval in milliseconds. */
8378
public static final String AUTO_COMMIT_INTERVAL_MS = "commit.auto-interval-ms";
8479

@@ -115,8 +110,6 @@ public class KafkaAccessor extends SerializableAbstractStorage implements DataAc
115110

116111
@Getter private long maxBytesPerSec = Long.MAX_VALUE;
117112

118-
@Getter private int maxPollRecords = 500;
119-
120113
@Getter private long autoCommitIntervalMs = Long.MAX_VALUE;
121114

122115
@Getter private long logStaleCommitIntervalMs = 60_000L;
@@ -164,11 +157,6 @@ private void configure(Map<String, Object> cfg) {
164157
.map(v -> Long.valueOf(v.toString()))
165158
.orElse(maxBytesPerSec);
166159

167-
this.maxPollRecords =
168-
Optional.ofNullable(cfg.get(MAX_POLL_RECORDS))
169-
.map(v -> Integer.valueOf(v.toString()))
170-
.orElse(maxPollRecords);
171-
172160
this.autoCommitIntervalMs =
173161
Optional.ofNullable(cfg.get(AUTO_COMMIT_INTERVAL_MS))
174162
.map(v -> Long.parseLong(v.toString()))
@@ -200,7 +188,6 @@ private void configure(Map<String, Object> cfg) {
200188
+ "partitionerClass {}, "
201189
+ "maxBytesPerSec {}, "
202190
+ "watermarkConfiguration {}, "
203-
+ "maxPollRecords {}, "
204191
+ "autoCommitIntervalNs {}, "
205192
+ "logStaleCommitIntervalMs {}, "
206193
+ "serializerClass {},"
@@ -209,7 +196,6 @@ private void configure(Map<String, Object> cfg) {
209196
partitioner.getClass(),
210197
maxBytesPerSec,
211198
watermarkConfiguration,
212-
maxPollRecords,
213199
autoCommitIntervalMs,
214200
logStaleCommitIntervalMs,
215201
serializerClass,

0 commit comments

Comments
 (0)