Skip to content

Commit ba07878

Browse files
authored
pq: activate stringref extension for more-compact PQ representation (#17849)
* pq: activate stringref extension for more-compact PQ representation When serializing a non-primitive value, CBOR encodes a two-element tuple containing the class name and the class-specific serialized value, which results in a significant amount of overhead in the form of frequently-repeated strings. Jackson CBOR supports the stringref extension, which allows it to avoid repeating the actual bytes of a string; instead, it keeps track of the strings it has encountered and _references_ those strings by the index at which they occur. For example, the first `org.jruby.RubyString` looks like: ~~~ 74 # text(20) 6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString" ~~~ While each subsequent string looks like: ~~~ d8 19 # tag(25) 05 # unsigned(5) ~~~ Enabling this extension allows us to save: - ~18 bytes from each `org.jruby.RubyString` - ~23 bytes from each `org.logstash.ConvertedMap` - ~24 bytes from each `org.logstash.ConvertedList` - ...etc. The CBOR implementation in Jackson _appears_ to support reading stringrefs regardless of whether this feature is enabled for serializing, which means that this change is not a rollback-barrier. * tests: adjust event sizes for DLQ boundaries with CBOR stringref extension With the CBOR stringref extension enabled, we add a 3-byte overhead to each event to activate the extension, and eliminate 24 bytes of overhead for each event's secondary instances of `org.logstash.ConvertedMap`. Since the events under test have exactly two instances of `org.logstash.ConvertedMap`, this is a net reduction of 21 bytes of overhead. This changes the specifically-constructed events to have the intended lengths to test their specific edge-cases. * compat: add test for deserializing stringref enabled and disabled
1 parent ef97a87 commit ba07878

File tree

7 files changed

+234
-11
lines changed

7 files changed

+234
-11
lines changed

logstash-core/src/main/java/org/logstash/ObjectMappers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public static StreamReadConstraintsUtil getConfiguredStreamReadConstraints() {
116116

117117
public static final ObjectMapper CBOR_MAPPER = new ObjectMapper(
118118
new CBORFactory().configure(CBORGenerator.Feature.WRITE_MINIMAL_INTS, false)
119+
.configure(CBORGenerator.Feature.STRINGREF, true)
119120
).registerModules(RUBY_SERIALIZERS, CBOR_DESERIALIZERS)
120121
.activateDefaultTyping(TYPE_VALIDATOR, ObjectMapper.DefaultTyping.NON_FINAL);
121122

logstash-core/src/test/java/org/logstash/EventTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import org.junit.Test;
2828

2929
import java.io.IOException;
30+
import java.io.InputStream;
3031
import java.math.BigDecimal;
3132
import java.math.BigInteger;
33+
import java.nio.charset.StandardCharsets;
3234
import java.util.ArrayList;
3335
import java.util.Arrays;
3436
import java.util.Collection;
3537
import java.util.Collections;
3638
import java.util.HashMap;
39+
import java.util.HexFormat;
3740
import java.util.List;
3841
import java.util.Map;
3942

@@ -42,6 +45,7 @@
4245
import static org.hamcrest.MatcherAssert.assertThat;
4346
import static org.junit.Assert.assertEquals;
4447
import static org.junit.Assert.assertFalse;
48+
import static org.junit.Assert.assertNotNull;
4549
import static org.junit.Assert.assertNull;
4650
import static org.junit.Assert.assertTrue;
4751

@@ -143,6 +147,24 @@ public void bigNumsBinaryRoundtrip() throws Exception {
143147
assertEquals(bd, deserialized.getField("bd"));
144148
}
145149

150+
@Test
151+
public void deserializeStringrefExtensionEnabled() throws Exception {
152+
byte[] stringrefCBOR = loadAnnotatedCBORFixture("stringref-enabled.annotated-cbor.txt");
153+
Event event = Event.deserialize(stringrefCBOR);
154+
event.getField("[event][original]");
155+
assertEquals("stringref", event.getField("test"));
156+
assertEquals(true, event.getField("[extension][enabled]"));
157+
}
158+
159+
@Test
160+
public void deserializeStringrefExtensionDisabled() throws Exception {
161+
byte[] stringrefCBOR = loadAnnotatedCBORFixture("stringref-disabled.annotated-cbor.txt");
162+
Event event = Event.deserialize(stringrefCBOR);
163+
event.getField("[event][original]");
164+
assertEquals("stringref", event.getField("test"));
165+
assertEquals(true, event.getField("[extension][enabled]"));
166+
}
167+
146168
@Test
147169
public void testBareToJson() throws Exception {
148170
Event e = new Event();
@@ -565,4 +587,19 @@ public void allowTopLevelTagsListOfStrings() {
565587
assertNull(event.getField(Event.TAGS_FAILURE));
566588
assertEquals(event.getField("[tags]"), List.of("foo", "bar"));
567589
}
590+
591+
static byte[] loadAnnotatedCBORFixture(String name) throws IOException {
592+
try (InputStream resourceAsStream = EventTest.class.getResourceAsStream(name)) {
593+
assertNotNull(resourceAsStream);
594+
595+
String annotated = new String(resourceAsStream.readAllBytes(), StandardCharsets.UTF_8);
596+
// annotated CBOR: strip #-initiated line comments, then strip whitespace to get hex
597+
String hexBytes = annotated.replaceAll("#.*(\\n|$)", "").replaceAll("\\s", "");
598+
599+
// result should be even number of hex digits
600+
assert hexBytes.matches("(?i:[0-9a-f]{2})*");
601+
602+
return HexFormat.of().parseHex(hexBytes);
603+
}
604+
}
568605
}

logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class DeadLetterQueueReaderTest {
7272
private Path dir;
7373
private int defaultDlqSize = 100_000_000; // 100mb
7474

75-
private static final int PAD_FOR_BLOCK_SIZE_EVENT = 32490;
75+
private static final int PAD_FOR_BLOCK_SIZE_EVENT = 32511;
7676

7777
@Rule
7878
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -240,7 +240,7 @@ private void validateEntries(Path firstLog, int startEntry, int endEntry, int st
240240
// This test tests for a single event that ends on a block boundary
241241
@Test
242242
public void testBlockBoundary() throws Exception {
243-
final int PAD_FOR_BLOCK_SIZE_EVENT = 32490;
243+
final int PAD_FOR_BLOCK_SIZE_EVENT = 32511;
244244
Event event = createEventWithConstantSerializationOverhead();
245245
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
246246
Arrays.fill(field, 'e');
@@ -267,7 +267,7 @@ public void testBlockBoundary() throws Exception {
267267
@Test
268268
public void testBlockBoundaryMultiple() throws Exception {
269269
Event event = createEventWithConstantSerializationOverhead();
270-
char[] field = new char[7929];
270+
char[] field = new char[7950];
271271
Arrays.fill(field, 'x');
272272
event.setField("message", new String(field));
273273
long startTime = System.currentTimeMillis();
@@ -935,7 +935,7 @@ private int prepareFilledSegmentFiles(int segments) throws IOException {
935935

936936
private int prepareFilledSegmentFiles(int segments, long start) throws IOException {
937937
final Event event = createEventWithConstantSerializationOverhead(Collections.emptyMap());
938-
event.setField("message", generateMessageContent(32479));
938+
event.setField("message", generateMessageContent(32500));
939939

940940
DLQEntry entry = new DLQEntry(event, "", "", String.format("%05d", 1), constantSerializationLengthTimestamp(start));
941941
assertEquals("Serialized dlq entry + header MUST be 32Kb (size of a block)", BLOCK_SIZE, entry.serialize().length + 13);

logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterAgeRetentionTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void setUp() throws Exception {
105105
@Test
106106
public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegments() throws IOException {
107107
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
108-
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
108+
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
109109

110110
final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2022-02-22T10:20:30.00Z"), ZoneId.of("Europe/Rome"));
111111
final ForwardableClock fakeClock = new ForwardableClock(pointInTimeFixedClock);
@@ -160,7 +160,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Forwar
160160
@Test
161161
public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() throws IOException {
162162
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
163-
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
163+
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
164164

165165
long startTime = fakeClock.instant().toEpochMilli();
166166
int messageSize = 0;
@@ -203,7 +203,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments()
203203
@Test
204204
public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws IOException {
205205
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
206-
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
206+
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
207207

208208
long startTime = fakeClock.instant().toEpochMilli();
209209
int messageSize = 0;

logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueWriterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ static String generateASCIIMessageContent(int size, byte fillChar) {
251251
@Test
252252
public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsEnabled() throws IOException {
253253
Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
254-
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
254+
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
255255
long startTime = System.currentTimeMillis();
256256

257257
int messageSize = 0;
@@ -488,7 +488,7 @@ public void testReadTimestampOfLastEventInSegmentWithDeletedSegment() throws IOE
488488
@Test
489489
public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, InterruptedException {
490490
Event blockAlmostFullEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
491-
int serializationHeader = 286;
491+
int serializationHeader = 265;
492492
int notEnoughHeaderSpace = 5;
493493
blockAlmostFullEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(BLOCK_SIZE - serializationHeader - RECORD_HEADER_SIZE + notEnoughHeaderSpace));
494494

@@ -501,7 +501,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I
501501
// enqueue a record with size smaller than BLOCK_SIZE
502502
DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis()));
503503
assertEquals("Serialized plus header must not leave enough space for another record header ",
504-
entry.serialize().length, BLOCK_SIZE - RECORD_HEADER_SIZE - notEnoughHeaderSpace);
504+
BLOCK_SIZE - RECORD_HEADER_SIZE - notEnoughHeaderSpace, entry.serialize().length);
505505
writeManager.writeEntry(entry);
506506

507507
// enqueue a record bigger than BLOCK_SIZE
@@ -512,7 +512,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I
512512

513513
// fill the queue to push out the segment with the 2 previous events
514514
Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
515-
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
515+
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
516516
try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter
517517
.newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1))
518518
.storageType(QueueStorageType.DROP_NEWER)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
9f # array(*)
2+
71 # text(17)
3+
6a6176612e7574696c2e486173684d6170 # "java.util.HashMap"
4+
bf # map(*)
5+
64 # text(4)
6+
44415441 # "DATA"
7+
9f # array(*)
8+
78 19 # text(25)
9+
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
10+
7665727465644d6170 # "vertedMap"
11+
bf # map(*)
12+
64 # text(4)
13+
686f7374 # "host"
14+
9f # array(*)
15+
78 19 # text(25)
16+
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
17+
7665727465644d6170 # "vertedMap"
18+
bf # map(*)
19+
68 # text(8)
20+
686f73746e616d65 # "hostname"
21+
9f # array(*)
22+
74 # text(20)
23+
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
24+
67 # text(7)
25+
70657268617073 # "perhaps"
26+
ff # break
27+
ff # break
28+
ff # break
29+
65 # text(5)
30+
6576656e74 # "event"
31+
9f # array(*)
32+
78 19 # text(25)
33+
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
34+
7665727465644d6170 # "vertedMap"
35+
bf # map(*)
36+
68 # text(8)
37+
6f726967696e616c # "original"
38+
9f # array(*)
39+
74 # text(20)
40+
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
41+
78 32 # text(50)
42+
7b2274657374223a22737472696e6772 # "{\"test\":\"stringr"
43+
6566222c22657874656e73696f6e223a # "ef\",\"extension\":"
44+
7b22656e61626c6564223a747275657d # "{\"enabled\":true}"
45+
7d0a # "}\n"
46+
ff # break
47+
ff # break
48+
ff # break
49+
68 # text(8)
50+
4076657273696f6e # "@version"
51+
61 # text(1)
52+
31 # "1"
53+
69 # text(9)
54+
657874656e73696f6e # "extension"
55+
9f # array(*)
56+
78 19 # text(25)
57+
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
58+
7665727465644d6170 # "vertedMap"
59+
bf # map(*)
60+
67 # text(7)
61+
656e61626c6564 # "enabled"
62+
f5 # true, simple(21)
63+
ff # break
64+
ff # break
65+
6a # text(10)
66+
4074696d657374616d70 # "@timestamp"
67+
9f # array(*)
68+
76 # text(22)
69+
6f72672e6c6f6773746173682e54696d657374616d70 # "org.logstash.Timestamp"
70+
78 1b # text(27)
71+
323032352d30372d32385431363a3432 # "2025-07-28T16:42"
72+
3a32342e3432313434365a # ":24.421446Z"
73+
ff # break
74+
64 # text(4)
75+
74657374 # "test"
76+
9f # array(*)
77+
74 # text(20)
78+
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
79+
69 # text(9)
80+
737472696e67726566 # "stringref"
81+
ff # break
82+
ff # break
83+
ff # break
84+
64 # text(4)
85+
4d455441 # "META"
86+
9f # array(*)
87+
78 19 # text(25)
88+
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
89+
7665727465644d6170 # "vertedMap"
90+
bf # map(*)
91+
ff # break
92+
ff # break
93+
ff # break
94+
ff # break

0 commit comments

Comments
 (0)