Skip to content

Commit 35e5942

Browse files
authored
Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)" (#20398)
This reverts commit f13a22a. Reviewers: Chia-Ping Tsai <[email protected]>, Eduwer Camacaro <[email protected]>, Mickael Maison <[email protected]>,
1 parent 2dbed66 commit 35e5942

28 files changed

+76
-96
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void close() {
213213
private ValueAndTimestamp<KeyValue<? extends K1, ? extends V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
214214
return ValueAndTimestamp.make(
215215
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
216-
valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp()
216+
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
217217
);
218218
}
219219
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
8484

8585
@Override
8686
public <KIn, VIn> void forward(final KIn key, final VIn value) {
87-
forward(new Record<>(key, value, recordContext().timestamp(), headers()));
87+
forward(new Record<>(key, value, timestamp(), headers()));
8888
}
8989

9090
/**

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
120120
final Record<Object, Object> toProcess = new Record<>(
121121
deserialized.key(),
122122
deserialized.value(),
123-
processorContext.recordContext().timestamp(),
124-
processorContext.recordContext().headers()
123+
processorContext.timestamp(),
124+
processorContext.headers()
125125
);
126126
((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
127127
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public <K, V> void forward(final K key,
190190
final Record<K, V> toForward = new Record<>(
191191
key,
192192
value,
193-
recordContext.timestamp(),
193+
timestamp(),
194194
headers()
195195
);
196196
forward(toForward);
@@ -204,7 +204,7 @@ public <K, V> void forward(final K key,
204204
final Record<K, V> toForward = new Record<>(
205205
key,
206206
value,
207-
toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(),
207+
toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
208208
headers()
209209
);
210210
forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
250250
// old API processors wouldn't see the timestamps or headers of upstream
251251
// new API processors. But then again, from the perspective of those old-API
252252
// processors, even consulting the timestamp or headers when the record context
253-
// is undefined is itself not well-defined. Plus, I don't think we need to worry
253+
// is undefined is itself not well defined. Plus, I don't think we need to worry
254254
// too much about heterogeneous applications, in which the upstream processor is
255255
// implementing the new API and the downstream one is implementing the old API.
256256
// So, this seems like a fine compromise for now.
257-
if (recordContext != null && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) {
257+
if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
258258
recordContext = new ProcessorRecordContext(
259259
record.timestamp(),
260260
recordContext.offset(),

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,13 @@ public void process(final Record<KIn, VIn> record) {
209209
// (instead of `RuntimeException`) to work well with those languages
210210
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
211211
null, // only required to pass for DeserializationExceptionHandler
212-
internalProcessorContext.recordContext().topic(),
213-
internalProcessorContext.recordContext().partition(),
214-
internalProcessorContext.recordContext().offset(),
215-
internalProcessorContext.recordContext().headers(),
212+
internalProcessorContext.topic(),
213+
internalProcessorContext.partition(),
214+
internalProcessorContext.offset(),
215+
internalProcessorContext.headers(),
216216
internalProcessorContext.currentNode().name(),
217217
internalProcessorContext.taskId(),
218-
internalProcessorContext.recordContext().timestamp(),
218+
internalProcessorContext.timestamp(),
219219
internalProcessorContext.recordContext().sourceRawKey(),
220220
internalProcessorContext.recordContext().sourceRawValue()
221221
);

streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ public void process(final Record<KIn, VIn> record) {
8585
final ProcessorRecordContext contextForExtraction =
8686
new ProcessorRecordContext(
8787
timestamp,
88-
context.recordContext().offset(),
89-
context.recordContext().partition(),
90-
context.recordContext().topic(),
88+
context.offset(),
89+
context.partition(),
90+
context.topic(),
9191
record.headers()
9292
);
9393

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -866,8 +866,8 @@ private void doProcess(final long wallClockTime) {
866866
final Record<Object, Object> toProcess = new Record<>(
867867
record.key(),
868868
record.value(),
869-
processorContext.recordContext().timestamp(),
870-
processorContext.recordContext().headers()
869+
processorContext.timestamp(),
870+
processorContext.headers()
871871
);
872872
maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
873873

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,12 @@ private void putInternal(final Bytes key,
272272
key,
273273
new LRUCacheEntry(
274274
value,
275-
internalContext.recordContext().headers(),
275+
internalContext.headers(),
276276
true,
277-
internalContext.recordContext().offset(),
278-
internalContext.recordContext().timestamp(),
279-
internalContext.recordContext().partition(),
280-
internalContext.recordContext().topic(),
277+
internalContext.offset(),
278+
internalContext.timestamp(),
279+
internalContext.partition(),
280+
internalContext.topic(),
281281
internalContext.recordContext().sourceRawKey(),
282282
internalContext.recordContext().sourceRawValue()
283283
)

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ public void put(final Windowed<Bytes> key, final byte[] value) {
135135
final LRUCacheEntry entry =
136136
new LRUCacheEntry(
137137
value,
138-
internalContext.recordContext().headers(),
138+
internalContext.headers(),
139139
true,
140-
internalContext.recordContext().offset(),
141-
internalContext.recordContext().timestamp(),
142-
internalContext.recordContext().partition(),
143-
internalContext.recordContext().topic(),
140+
internalContext.offset(),
141+
internalContext.timestamp(),
142+
internalContext.partition(),
143+
internalContext.topic(),
144144
internalContext.recordContext().sourceRawKey(),
145145
internalContext.recordContext().sourceRawValue()
146146
);

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,12 @@ public synchronized void put(final Bytes key,
153153
final LRUCacheEntry entry =
154154
new LRUCacheEntry(
155155
value,
156-
internalContext.recordContext().headers(),
156+
internalContext.headers(),
157157
true,
158-
internalContext.recordContext().offset(),
159-
internalContext.recordContext().timestamp(),
160-
internalContext.recordContext().partition(),
161-
internalContext.recordContext().topic(),
158+
internalContext.offset(),
159+
internalContext.timestamp(),
160+
internalContext.partition(),
161+
internalContext.topic(),
162162
internalContext.recordContext().sourceRawKey(),
163163
internalContext.recordContext().sourceRawValue()
164164
);

0 commit comments

Comments
 (0)