Skip to content
This repository was archived by the owner on Jul 1, 2022. It is now read-only.

Commit 74adce4

Browse files
yhparkyurishkuro
authored andcommitted
Fix ThriftSender max span size check (#670)
* Fix ThriftSender max span size check Signed-off-by: Yeonghoon Park <[email protected]> * Refactor getMaxSpanSize() and getMaxBatchBytes() on ThriftSender/ThriftSenderBase Signed-off-by: Yeonghoon Park <[email protected]> * Keep track of span buffer size in ThriftSender Signed-off-by: Yeonghoon Park <[email protected]> * fix reset buffer size on ThriftSender, add test Signed-off-by: Yeonghoon Park <[email protected]>
1 parent 4bde959 commit 74adce4

File tree

4 files changed

+85
-23
lines changed

4 files changed

+85
-23
lines changed

jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSender.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public abstract class ThriftSender extends ThriftSenderBase implements Sender {
2929

3030
private Process process;
3131
private int processBytesSize;
32-
private int byteBufferSize;
32+
private int spanBytesSize;
3333

3434
@ToString.Exclude private List<io.jaegertracing.thriftjava.Span> spanBuffer;
3535

@@ -49,7 +49,6 @@ public int append(JaegerSpan span) throws SenderException {
4949
process = new Process(span.getTracer().getServiceName());
5050
process.setTags(JaegerThriftSpanConverter.buildTags(span.getTracer().tags()));
5151
processBytesSize = calculateProcessSize(process);
52-
byteBufferSize += processBytesSize;
5352
}
5453

5554
io.jaegertracing.thriftjava.Span thriftSpan = JaegerThriftSpanConverter.convertSpan(span);
@@ -59,10 +58,10 @@ public int append(JaegerSpan span) throws SenderException {
5958
spanSize, getMaxSpanBytes()), null, 1);
6059
}
6160

62-
byteBufferSize += spanSize;
63-
if (byteBufferSize <= getMaxSpanBytes()) {
61+
spanBytesSize += spanSize;
62+
if (spanBytesSize <= getMaxSpanBytes()) {
6463
spanBuffer.add(thriftSpan);
65-
if (byteBufferSize < getMaxSpanBytes()) {
64+
if (spanBytesSize < getMaxSpanBytes()) {
6665
return 0;
6766
}
6867
return flush();
@@ -77,7 +76,7 @@ public int append(JaegerSpan span) throws SenderException {
7776
}
7877

7978
spanBuffer.add(thriftSpan);
80-
byteBufferSize = processBytesSize + spanSize;
79+
spanBytesSize = spanSize;
8180
return n;
8281
}
8382

@@ -97,6 +96,10 @@ protected int calculateSpanSize(io.jaegertracing.thriftjava.Span span) throws Se
9796
}
9897
}
9998

99+
protected int getMaxSpanBytes() {
100+
return getMaxBatchBytes() - processBytesSize;
101+
}
102+
100103
public abstract void send(Process process, List<io.jaegertracing.thriftjava.Span> spans) throws SenderException;
101104

102105
@Override
@@ -112,7 +115,7 @@ public int flush() throws SenderException {
112115
throw new SenderException("Failed to flush spans.", e, n);
113116
} finally {
114117
spanBuffer.clear();
115-
byteBufferSize = processBytesSize;
118+
spanBytesSize = 0;
116119
}
117120
return n;
118121
}

jaeger-thrift/src/main/java/io/jaegertracing/thrift/internal/senders/ThriftSenderBase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public enum ProtocolType {
3737

3838
protected final TProtocolFactory protocolFactory;
3939
private final TSerializer serializer;
40-
private final int maxSpanBytes;
40+
private final int maxBatchBytes;
4141

4242
@ToString.Exclude private AutoExpandingBufferWriteTransport memoryTransport;
4343

@@ -63,13 +63,13 @@ public ThriftSenderBase(ProtocolType protocolType, int maxPacketSize) {
6363
maxPacketSize = ThriftUdpTransport.MAX_PACKET_SIZE;
6464
}
6565

66-
maxSpanBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
66+
maxBatchBytes = maxPacketSize - EMIT_BATCH_OVERHEAD;
6767
memoryTransport = new AutoExpandingBufferWriteTransport(maxPacketSize, 2);
6868
serializer = new TSerializer(protocolFactory);
6969
}
7070

71-
protected int getMaxSpanBytes() {
72-
return maxSpanBytes;
71+
protected int getMaxBatchBytes() {
72+
return maxBatchBytes;
7373
}
7474

7575
protected byte[] serialize(TBase<?,?> thriftBase) throws Exception {

jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/ThriftSenderTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@ public void send(Process process, List<Span> spans) throws SenderException {
4949
sender.calculateSpanSize(null);
5050
}
5151

52+
@Test(expected = SenderException.class)
53+
public void appendFail() throws Exception {
54+
int size = 0;
55+
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {
56+
@Override
57+
public void send(Process process, List<Span> spans) throws SenderException {
58+
throw new SenderException("", null, spans.size());
59+
}
60+
};
61+
62+
JaegerTracer tracer = new JaegerTracer.Builder("failure").build();
63+
sender.append(tracer.buildSpan("flush-fail").start());
64+
sender.flush();
65+
}
66+
5267
@Test(expected = SenderException.class)
5368
public void flushFail() throws Exception {
5469
ThriftSender sender = new ThriftSender(ProtocolType.Compact, 0) {

jaeger-thrift/src/test/java/io/jaegertracing/thrift/internal/senders/UdpSenderTest.java

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,30 @@ public void tearDown() throws Exception {
8484
reporter.close();
8585
}
8686

87-
@Test(expected = SenderException.class)
88-
public void testAppendSpanTooLarge() throws Exception {
87+
/**
88+
* create a mock span with target size
89+
*/
90+
private JaegerSpan buildSpanWithSize(int targetSpanSize) throws Exception {
8991
JaegerSpan jaegerSpan = tracer.buildSpan("raza").start();
90-
String msg = "";
91-
for (int i = 0; i < 10001; i++) {
92-
msg += ".";
93-
jaegerSpan.log(msg);
94-
}
92+
jaegerSpan.setTag("mock", "");
9593

96-
try {
97-
sender.append(jaegerSpan);
98-
} catch (SenderException e) {
99-
assertEquals(e.getDroppedSpanCount(), 1);
100-
throw e;
94+
// contains mock span tag with empty string value, which is later used to match target span size
95+
io.jaegertracing.thriftjava.Span emptySpan = JaegerThriftSpanConverter.convertSpan(jaegerSpan);
96+
97+
int emptySpanSize = sender.getSize(emptySpan);
98+
int freePacketSizeLeft = targetSpanSize - emptySpanSize;
99+
100+
// each "0" takes 1 byte in UTF-8
101+
String mockStr = String.join("", Collections.nCopies(freePacketSizeLeft, "0"));
102+
jaegerSpan.setTag("mock", mockStr);
103+
104+
// subtract some chars since the actual encoding takes a few extra bytes
105+
while (targetSpanSize != sender.getSize(JaegerThriftSpanConverter.convertSpan(jaegerSpan))) {
106+
mockStr = mockStr.substring(1);
107+
jaegerSpan.setTag("mock", mockStr);
101108
}
109+
110+
return jaegerSpan;
102111
}
103112

104113
@Test
@@ -131,6 +140,41 @@ public void testAppend() throws Exception {
131140
assertEquals(expectedNumSpans, result);
132141
}
133142

143+
@Test
144+
public void testAppendMaxSize() throws Exception {
145+
Process process = new Process(tracer.getServiceName())
146+
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));
147+
148+
int processSize = sender.getSize(process);
149+
150+
JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize);
151+
152+
int result = sender.append(jaegerSpan);
153+
154+
assertEquals(1, result);
155+
156+
// test if the buffer is reinitialized correctly
157+
result = sender.append(jaegerSpan);
158+
159+
assertEquals(1, result);
160+
}
161+
162+
@Test(expected = SenderException.class)
163+
public void testAppendSpanTooLarge() throws Exception {
164+
Process process = new Process(tracer.getServiceName())
165+
.setTags(JaegerThriftSpanConverter.buildTags(tracer.tags()));
166+
int processSize = sender.getSize(process);
167+
168+
JaegerSpan jaegerSpan = buildSpanWithSize(maxPacketSize - UdpSender.EMIT_BATCH_OVERHEAD - processSize + 1);
169+
170+
try {
171+
sender.append(jaegerSpan);
172+
} catch (SenderException e) {
173+
assertEquals(e.getDroppedSpanCount(), 1);
174+
throw e;
175+
}
176+
}
177+
134178
@Test
135179
public void testFlushSendsSpan() throws Exception {
136180
int timeout = 50; // in milliseconds

0 commit comments

Comments
 (0)