Skip to content

Commit da0c207

Browse files
authored
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (1/N) (#21465)
This PR adds `TimestampedSegmentWithHeaders`, `TimestampedSegmentsWithHeaders`, and the corresponding unit tests for the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
1 parent a45d36c commit da0c207

File tree

6 files changed

+571
-0
lines changed

6 files changed

+571
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
/**
20+
* A RocksDB-backed segmented bytes store with timestamp and headers support.
21+
* <p>
22+
* This store uses {@link TimestampedSegmentsWithHeaders} to manage segments,
23+
* where each segment is a {@link TimestampedSegmentWithHeaders} that extends
24+
* {@link RocksDBTimestampedStoreWithHeaders}. This provides automatic dual-CF
25+
* migration support from timestamp-only format to timestamp+headers format.
26+
*/
27+
public class RocksDBTimestampedSegmentedBytesStoreWithHeaders extends AbstractRocksDBSegmentedBytesStore<TimestampedSegmentWithHeaders> {
28+
29+
RocksDBTimestampedSegmentedBytesStoreWithHeaders(final String name,
30+
final String metricsScope,
31+
final long retention,
32+
final long segmentInterval,
33+
final KeySchema keySchema) {
34+
super(name, retention, keySchema, new TimestampedSegmentsWithHeaders(name, metricsScope, retention, segmentInterval));
35+
}
36+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.utils.Bytes;
20+
import org.apache.kafka.common.utils.Utils;
21+
import org.apache.kafka.streams.query.Position;
22+
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
23+
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
29+
/**
30+
* A segment that stores timestamped key-value pairs with headers.
31+
* <p>
32+
* This segment extends {@link RocksDBTimestampedStoreWithHeaders} to provide
33+
* header-aware storage with dual-column-family migration support from
34+
* timestamp-only format to timestamp+headers format.
35+
*/
36+
class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
37+
implements Comparable<TimestampedSegmentWithHeaders>, Segment {
38+
39+
public final long id;
40+
41+
TimestampedSegmentWithHeaders(final String segmentName,
42+
final String windowName,
43+
final long id,
44+
final Position position,
45+
final RocksDBMetricsRecorder metricsRecorder) {
46+
super(segmentName, windowName, metricsRecorder);
47+
this.id = id;
48+
this.position = position;
49+
}
50+
51+
@Override
52+
public void destroy() throws IOException {
53+
Utils.delete(dbDir);
54+
}
55+
56+
@Override
57+
public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
58+
throw new UnsupportedOperationException();
59+
}
60+
61+
@Override
62+
public int compareTo(final TimestampedSegmentWithHeaders segment) {
63+
return Long.compare(id, segment.id);
64+
}
65+
66+
@Override
67+
public void openDB(final Map<String, Object> configs, final File stateDir) {
68+
super.openDB(configs, stateDir);
69+
// skip the registering step
70+
}
71+
72+
@Override
73+
public String toString() {
74+
return "TimestampedSegmentWithHeaders(id=" + id + ", name=" + name() + ")";
75+
}
76+
77+
@Override
78+
public boolean equals(final Object obj) {
79+
if (obj == null || getClass() != obj.getClass()) {
80+
return false;
81+
}
82+
final TimestampedSegmentWithHeaders segment = (TimestampedSegmentWithHeaders) obj;
83+
return id == segment.id;
84+
}
85+
86+
@Override
87+
public int hashCode() {
88+
return Objects.hash(id);
89+
}
90+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.streams.processor.StateStoreContext;
20+
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
21+
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
22+
23+
/**
24+
* Manages the {@link TimestampedSegmentWithHeaders}s that are used by the {@link RocksDBTimestampedSegmentedBytesStoreWithHeaders}.
25+
*/
26+
class TimestampedSegmentsWithHeaders extends AbstractSegments<TimestampedSegmentWithHeaders> {
27+
28+
private final RocksDBMetricsRecorder metricsRecorder;
29+
30+
TimestampedSegmentsWithHeaders(final String name,
31+
final String metricsScope,
32+
final long retentionPeriod,
33+
final long segmentInterval) {
34+
super(name, retentionPeriod, segmentInterval);
35+
metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
36+
}
37+
38+
@Override
39+
public TimestampedSegmentWithHeaders getOrCreateSegment(final long segmentId,
40+
final StateStoreContext context) {
41+
if (segments.containsKey(segmentId)) {
42+
return segments.get(segmentId);
43+
} else {
44+
final TimestampedSegmentWithHeaders newSegment =
45+
new TimestampedSegmentWithHeaders(segmentName(segmentId), name, segmentId, position, metricsRecorder);
46+
47+
if (segments.put(segmentId, newSegment) != null) {
48+
throw new IllegalStateException("TimestampedSegmentWithHeaders already exists. Possible concurrent access.");
49+
}
50+
51+
newSegment.openDB(context.appConfigs(), context.stateDir());
52+
return newSegment;
53+
}
54+
}
55+
56+
@Override
57+
public TimestampedSegmentWithHeaders getOrCreateSegmentIfLive(final long segmentId,
58+
final StateStoreContext context,
59+
final long streamTime) {
60+
final TimestampedSegmentWithHeaders segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
61+
cleanupExpiredSegments(streamTime);
62+
return segment;
63+
}
64+
65+
@Override
66+
public void openExisting(final StateStoreContext context, final long streamTime) {
67+
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
68+
super.openExisting(context, streamTime);
69+
}
70+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
public class RocksDBTimestampedSegmentedBytesStoreWithHeadersTest
20+
extends AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegmentWithHeaders> {
21+
22+
private static final String METRICS_SCOPE = "metrics-scope";
23+
24+
RocksDBTimestampedSegmentedBytesStoreWithHeaders getBytesStore() {
25+
return new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
26+
storeName,
27+
METRICS_SCOPE,
28+
retention,
29+
segmentInterval,
30+
schema
31+
);
32+
}
33+
34+
@Override
35+
TimestampedSegmentsWithHeaders newSegments() {
36+
return new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, retention, segmentInterval);
37+
}
38+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
import org.apache.kafka.common.metrics.Metrics;
20+
import org.apache.kafka.common.utils.MockTime;
21+
import org.apache.kafka.streams.processor.TaskId;
22+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
23+
import org.apache.kafka.streams.query.Position;
24+
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
25+
import org.apache.kafka.test.TestUtils;
26+
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.mockito.junit.jupiter.MockitoExtension;
31+
import org.mockito.junit.jupiter.MockitoSettings;
32+
import org.mockito.quality.Strictness;
33+
34+
import java.io.File;
35+
import java.util.HashSet;
36+
import java.util.Set;
37+
38+
import static org.apache.kafka.common.utils.Utils.mkEntry;
39+
import static org.apache.kafka.common.utils.Utils.mkMap;
40+
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
41+
import static org.junit.jupiter.api.Assertions.assertEquals;
42+
import static org.junit.jupiter.api.Assertions.assertFalse;
43+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
44+
import static org.junit.jupiter.api.Assertions.assertTrue;
45+
46+
@ExtendWith(MockitoExtension.class)
47+
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
48+
public class TimestampedSegmentWithHeadersTest {
49+
50+
private final RocksDBMetricsRecorder metricsRecorder =
51+
new RocksDBMetricsRecorder("metrics-scope", "store-name");
52+
53+
@BeforeEach
54+
public void setUp() {
55+
metricsRecorder.init(
56+
new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
57+
new TaskId(0, 0)
58+
);
59+
}
60+
61+
@Test
62+
public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
63+
final TimestampedSegmentWithHeaders segment =
64+
new TimestampedSegmentWithHeaders("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
65+
final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
66+
final File directory = new File(directoryPath);
67+
68+
segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
69+
70+
assertTrue(new File(directoryPath, "window").exists());
71+
assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
72+
assertTrue(new File(directoryPath + File.separator + "window", "segment").list().length > 0);
73+
segment.destroy();
74+
assertFalse(new File(directoryPath + File.separator + "window", "segment").exists());
75+
assertTrue(new File(directoryPath, "window").exists());
76+
77+
segment.close();
78+
}
79+
80+
@Test
81+
public void shouldBeEqualIfIdIsEqual() {
82+
final TimestampedSegmentWithHeaders segment =
83+
new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
84+
final TimestampedSegmentWithHeaders segmentSameId =
85+
new TimestampedSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
86+
final TimestampedSegmentWithHeaders segmentDifferentId =
87+
new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
88+
89+
assertEquals(segment, segment);
90+
assertEquals(segment, segmentSameId);
91+
assertNotEquals(segment, segmentDifferentId);
92+
assertNotEquals(segment, null);
93+
assertNotEquals(segment, "anyName");
94+
95+
segment.close();
96+
segmentSameId.close();
97+
segmentDifferentId.close();
98+
}
99+
100+
@Test
101+
public void shouldHashOnSegmentIdOnly() {
102+
final TimestampedSegmentWithHeaders segment =
103+
new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
104+
final TimestampedSegmentWithHeaders segmentSameId =
105+
new TimestampedSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
106+
final TimestampedSegmentWithHeaders segmentDifferentId =
107+
new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
108+
109+
final Set<TimestampedSegmentWithHeaders> set = new HashSet<>();
110+
assertTrue(set.add(segment));
111+
assertFalse(set.add(segmentSameId));
112+
assertTrue(set.add(segmentDifferentId));
113+
114+
segment.close();
115+
segmentSameId.close();
116+
segmentDifferentId.close();
117+
}
118+
119+
@Test
120+
public void shouldCompareSegmentIdOnly() {
121+
final TimestampedSegmentWithHeaders segment1 =
122+
new TimestampedSegmentWithHeaders("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
123+
final TimestampedSegmentWithHeaders segment2 =
124+
new TimestampedSegmentWithHeaders("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
125+
final TimestampedSegmentWithHeaders segment3 =
126+
new TimestampedSegmentWithHeaders("c", "A", 0L, Position.emptyPosition(), metricsRecorder);
127+
128+
assertEquals(0, segment1.compareTo(segment1));
129+
assertEquals(-1, segment1.compareTo(segment2));
130+
assertEquals(1, segment2.compareTo(segment1));
131+
assertEquals(1, segment1.compareTo(segment3));
132+
assertEquals(-1, segment3.compareTo(segment1));
133+
assertEquals(1, segment2.compareTo(segment3));
134+
assertEquals(-1, segment3.compareTo(segment2));
135+
136+
segment1.close();
137+
segment2.close();
138+
segment3.close();
139+
}
140+
}

0 commit comments

Comments
 (0)