Skip to content

Commit 549d99f

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Using recycled memory buffers when serializing frames.
1 parent 4069854 commit 549d99f

19 files changed

+734
-213
lines changed

projects/client/Apigen/src/apigen/Apigen.cs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ public void EmitPrelude()
452452
using RabbitMQ.Client;
453453
using RabbitMQ.Client.Exceptions;
454454
using RabbitMQ.Client.Framing.Impl;
455+
using RabbitMQ.Client.Impl;
455456
";
456457
EmitLine(prelude);
457458
}
@@ -682,6 +683,48 @@ public void EmitClassProperties(AmqpClass c)
682683
}
683684
EmitLine(" }");
684685
EmitLine("");
686+
EmitLine(" public override int GetRequiredPayloadBufferSize()");
687+
EmitLine(" {");
688+
EmitLine(" int bufferSize = 0;");
689+
EmitLine(" int fieldCount = 0;");
690+
foreach (AmqpField f in c.m_Fields)
691+
{
692+
switch (MapDomain(f.Domain))
693+
{
694+
case "byte":
695+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize++; }} // _{MangleMethod(f.Name)} in bytes");
696+
break;
697+
case "string":
698+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
699+
break;
700+
case "byte[]":
701+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4 + _{MangleMethod(f.Name)}.Length; }} // _{MangleMethod(f.Name)} in bytes");
702+
break;
703+
case "ushort":
704+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 2; }} // _{MangleMethod(f.Name)} in bytes");
705+
break;
706+
case "uint":
707+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 4; }} // _{MangleMethod(f.Name)} in bytes");
708+
break;
709+
case "ulong":
710+
case "AmqpTimestamp":
711+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += 8; }} // _{MangleMethod(f.Name)} in bytes");
712+
break;
713+
case "bool":
714+
// TODO: implement if used, not used anywhere yet
715+
break;
716+
case "IDictionary<string, object>":
717+
EmitLine($" if (_{MangleMethod(f.Name)}_present) {{ fieldCount++; bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); }} // _{MangleMethod(f.Name)} in bytes");
718+
break;
719+
default:
720+
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
721+
}
722+
}
723+
724+
EmitLine($" bufferSize += Math.Max((int)Math.Ceiling(fieldCount / 15.0), 1) * 2; // number of presence fields in bytes");
725+
EmitLine(" return bufferSize;");
726+
EmitLine(" }");
727+
EmitLine("");
685728
EmitLine(" public override void AppendPropertyDebugStringTo(StringBuilder sb)");
686729
EmitLine(" {");
687730
EmitLine(" sb.Append(\"(\");");
@@ -814,6 +857,51 @@ public void EmitClassMethodImplementations(AmqpClass c)
814857
}
815858
EmitLine(" }");
816859
EmitLine("");
860+
EmitLine(" public override int GetRequiredBufferSize()");
861+
EmitLine(" {");
862+
EmitLine(" int bufferSize = 0;");
863+
int bitCount = 0;
864+
foreach (AmqpField f in m.m_Fields)
865+
{
866+
switch (MapDomain(f.Domain))
867+
{
868+
case "byte":
869+
EmitLine($" bufferSize++; // _{MangleMethod(f.Name)} in bytes");
870+
break;
871+
case "string":
872+
EmitLine($" bufferSize += 1 + Encoding.UTF8.GetByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
873+
break;
874+
case "byte[]":
875+
EmitLine($" bufferSize += 4 + _{MangleMethod(f.Name)}.Length; // _{MangleMethod(f.Name)} in bytes");
876+
break;
877+
case "ushort":
878+
EmitLine($" bufferSize += 2; // _{MangleMethod(f.Name)} in bytes");
879+
break;
880+
case "uint":
881+
EmitLine($" bufferSize += 4; // _{MangleMethod(f.Name)} in bytes");
882+
break;
883+
case "ulong":
884+
case "AmqpTimestamp":
885+
EmitLine($" bufferSize += 8; // _{MangleMethod(f.Name)} in bytes");
886+
break;
887+
case "bool":
888+
bitCount++;
889+
break;
890+
case "IDictionary<string, object>":
891+
EmitLine($" bufferSize += WireFormatting.GetTableByteCount(_{MangleMethod(f.Name)}); // _{MangleMethod(f.Name)} in bytes");
892+
break;
893+
default:
894+
throw new ArgumentOutOfRangeException($"Can't handle size calculations for type = {f.Domain};");
895+
}
896+
}
897+
898+
if (bitCount > 0)
899+
{
900+
EmitLine($" bufferSize += {Math.Ceiling(bitCount / 8.0)}; // number of bit fields in bytes");
901+
}
902+
EmitLine(" return bufferSize;");
903+
EmitLine(" }");
904+
EmitLine("");
817905
EmitLine(" public override void AppendArgumentDebugStringTo(StringBuilder sb)");
818906
EmitLine(" {");
819907
EmitLine(" sb.Append(\"(\");");

projects/client/RabbitMQ.Client/src/client/content/BytesWireFormatting.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public static void WriteSingle(NetworkBinaryWriter writer, float value)
150150

151151
public static void WriteString(NetworkBinaryWriter writer, string value)
152152
{
153-
int maxLength = Encoding.UTF8.GetMaxByteCount(value.Length);
153+
int maxLength = Encoding.UTF8.GetByteCount(value);
154154
byte[] bytes = ArrayPool<byte>.Shared.Rent(maxLength);
155155
try
156156
{

projects/client/RabbitMQ.Client/src/client/content/StreamWireFormatting.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public static void WriteString(NetworkBinaryWriter writer, string value)
435435

436436
public static void WriteUntypedString(NetworkBinaryWriter writer, string value)
437437
{
438-
int maxLength = Encoding.UTF8.GetMaxByteCount(value.Length);
438+
int maxLength = Encoding.UTF8.GetByteCount(value);
439439
byte[] bytes = ArrayPool<byte>.Shared.Rent(maxLength + 1);
440440
try
441441
{

projects/client/RabbitMQ.Client/src/client/impl/BasicProperties.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ public PublicationAddress ReplyToAddress
274274
/// </summary>
275275
public abstract bool IsUserIdPresent();
276276

277+
public abstract override int GetRequiredPayloadBufferSize();
278+
277279
public override object Clone()
278280
{
279281
var clone = MemberwiseClone() as BasicProperties;

projects/client/RabbitMQ.Client/src/client/impl/Command.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,8 @@
4040

4141
using System;
4242
using System.Collections.Generic;
43-
using System.IO;
4443

4544
using RabbitMQ.Client.Framing.Impl;
46-
using RabbitMQ.Util;
4745

4846
namespace RabbitMQ.Client.Impl
4947
{
@@ -82,10 +80,9 @@ internal Command(MethodBase method, ContentHeaderBase header, byte[] body)
8280
public static void CheckEmptyFrameSize()
8381
{
8482
var f = new EmptyOutboundFrame();
85-
var stream = new MemoryStream();
86-
var writer = new NetworkBinaryWriter(stream);
87-
f.WriteTo(writer);
88-
long actualLength = stream.Length;
83+
byte[] b = new byte[f.GetMinimumBufferSize()];
84+
f.WriteTo(b);
85+
long actualLength = f.ByteCount;
8986

9087
if (EmptyFrameSize != actualLength)
9188
{

projects/client/RabbitMQ.Client/src/client/impl/ContentHeaderBase.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,21 @@ internal ulong ReadFrom(Memory<byte> memory)
8080

8181
private const ushort ZERO = 0;
8282

83-
internal void WriteTo(NetworkBinaryWriter writer, ulong bodySize)
83+
internal int WriteTo(Memory<byte> memory, ulong bodySize)
8484
{
85-
writer.Write(ZERO); // weight - not currently used
86-
writer.Write(bodySize);
87-
WritePropertiesTo(new ContentHeaderPropertyWriter(writer));
85+
NetworkOrderSerializer.WriteUInt16(memory, ZERO); // Weight - not used
86+
NetworkOrderSerializer.WriteUInt64(memory.Slice(2), bodySize);
87+
88+
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
89+
WritePropertiesTo(writer);
90+
return 10 + writer.Offset;
91+
}
92+
public int GetRequiredBufferSize()
93+
{
94+
// The first 10 bytes are the Weight (2 bytes) + body size (8 bytes)
95+
return 10 + GetRequiredPayloadBufferSize();
8896
}
97+
98+
public abstract int GetRequiredPayloadBufferSize();
8999
}
90100
}

projects/client/RabbitMQ.Client/src/client/impl/ContentHeaderPropertyWriter.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
4142
using System.Collections.Generic;
4243

4344
using RabbitMQ.Util;
@@ -48,16 +49,16 @@ class ContentHeaderPropertyWriter
4849
{
4950
protected int m_bitCount;
5051
protected ushort m_flagWord;
52+
public int Offset { get; private set; } = 0;
53+
public Memory<byte> Memory { get; private set; }
5154

52-
public ContentHeaderPropertyWriter(NetworkBinaryWriter writer)
55+
public ContentHeaderPropertyWriter(Memory<byte> memory)
5356
{
54-
BaseWriter = writer;
57+
Memory = memory;
5558
m_flagWord = 0;
5659
m_bitCount = 0;
5760
}
5861

59-
public NetworkBinaryWriter BaseWriter { get; private set; }
60-
6162
public void FinishPresence()
6263
{
6364
EmitFlagWord(false);
@@ -70,22 +71,22 @@ public void WriteBit(bool bit)
7071

7172
public void WriteLong(uint val)
7273
{
73-
WireFormatting.WriteLong(BaseWriter, val);
74+
Offset += WireFormatting.WriteLong(Memory.Slice(Offset), val);
7475
}
7576

7677
public void WriteLonglong(ulong val)
7778
{
78-
WireFormatting.WriteLonglong(BaseWriter, val);
79+
Offset += WireFormatting.WriteLonglong(Memory.Slice(Offset), val);
7980
}
8081

8182
public void WriteLongstr(byte[] val)
8283
{
83-
WireFormatting.WriteLongstr(BaseWriter, val);
84+
Offset += WireFormatting.WriteLongstr(Memory.Slice(Offset), val);
8485
}
8586

8687
public void WriteOctet(byte val)
8788
{
88-
WireFormatting.WriteOctet(BaseWriter, val);
89+
Memory.Slice(Offset++).Span[0] = val;
8990
}
9091

9192
public void WritePresence(bool present)
@@ -105,27 +106,28 @@ public void WritePresence(bool present)
105106

106107
public void WriteShort(ushort val)
107108
{
108-
WireFormatting.WriteShort(BaseWriter, val);
109+
Offset += WireFormatting.WriteShort(Memory.Slice(Offset), val);
109110
}
110111

111112
public void WriteShortstr(string val)
112113
{
113-
WireFormatting.WriteShortstr(BaseWriter, val);
114+
Offset += WireFormatting.WriteShortstr(Memory.Slice(Offset), val);
114115
}
115116

116117
public void WriteTable(IDictionary<string, object> val)
117118
{
118-
WireFormatting.WriteTable(BaseWriter, val);
119+
Offset += WireFormatting.WriteTable(Memory.Slice(Offset), val);
119120
}
120121

121122
public void WriteTimestamp(AmqpTimestamp val)
122123
{
123-
WireFormatting.WriteTimestamp(BaseWriter, val);
124+
Offset += WireFormatting.WriteTimestamp(Memory.Slice(Offset), val);
124125
}
125126

126127
private void EmitFlagWord(bool continuationBit)
127128
{
128-
BaseWriter.Write((ushort)(continuationBit ? (m_flagWord | 1) : m_flagWord));
129+
NetworkOrderSerializer.WriteUInt16(Memory.Slice(Offset), (ushort)(continuationBit ? (m_flagWord | 1) : m_flagWord));
130+
Offset += 2;
129131
m_flagWord = 0;
130132
m_bitCount = 0;
131133
}

0 commit comments

Comments
 (0)