Skip to content

Commit 025b7e4

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Reducing memory usage when deserializing frames into commands.
1 parent 4a7a93b commit 025b7e4

16 files changed

+516
-174
lines changed

projects/client/Apigen/src/apigen/Apigen.cs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public static int GetInt(XmlNode n0, string path)
122122
/// <returns>renamed string</returns>
123123
private static string xmlStringMapper(string xmlString)
124124
{
125-
switch(xmlString)
125+
switch (xmlString)
126126
{
127127
case "no-wait":
128128
return "nowait";
@@ -445,6 +445,7 @@ public void EmitPrelude()
445445
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
446446
//---------------------------------------------------------------------------
447447
448+
using System;
448449
using System.Collections.Generic;
449450
using System.Text;
450451
@@ -840,44 +841,41 @@ public void EmitClassMethodImplementations(AmqpClass c)
840841

841842
public void EmitMethodArgumentReader()
842843
{
843-
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(Util.NetworkBinaryReader reader)");
844+
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(Memory<byte> memory)");
844845
EmitLine(" {");
845-
EmitLine(" ushort classId = reader.ReadUInt16();");
846-
EmitLine(" ushort methodId = reader.ReadUInt16();");
847-
EmitLine(" Client.Impl.MethodBase result = null;");
846+
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory);");
847+
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory, 2);");
848+
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
849+
EmitLine(" if(result != null)");
850+
EmitLine(" {");
851+
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(memory.Slice(4)));");
852+
EmitLine(" return result;");
853+
EmitLine(" }");
848854
EmitLine("");
855+
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
856+
EmitLine(" }");
857+
EmitLine("");
858+
EmitLine(" internal Client.Impl.MethodBase DecodeMethodFrom(ushort classId, ushort methodId)");
859+
EmitLine(" {");
849860
EmitLine(" switch ((classId << 16) | methodId)");
850861
EmitLine(" {");
851862
foreach (AmqpClass c in m_classes)
852863
{
853864
foreach (AmqpMethod m in c.m_Methods)
854865
{
855-
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}:");
856-
EmitLine(" {");
857-
EmitLine($" result = new Impl.{MangleMethodClass(c, m)}();");
858-
EmitLine($" break;");
859-
EmitLine(" }");
866+
EmitLine($" case (ClassConstants.{MangleConstant(c.Name)} << 16) | {MangleConstant(c.Name)}MethodConstants.{MangleConstant(m.Name)}: return new Impl.{MangleMethodClass(c, m)}();");
860867
}
861868
}
862-
EmitLine(" default: break;");
869+
EmitLine(" default: return null;");
863870
EmitLine(" }");
864-
EmitLine("");
865-
EmitLine(" if(result != null)");
866-
EmitLine(" {");
867-
EmitLine(" result.ReadArgumentsFrom(new Client.Impl.MethodArgumentReader(reader));");
868-
EmitLine(" return result;");
869-
EmitLine(" }");
870-
EmitLine("");
871-
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, methodId);");
872871
EmitLine(" }");
872+
EmitLine("");
873873
}
874874

875875
public void EmitContentHeaderReader()
876876
{
877-
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(Util.NetworkBinaryReader reader)");
877+
EmitLine(" internal override Client.Impl.ContentHeaderBase DecodeContentHeaderFrom(ushort classId)");
878878
EmitLine(" {");
879-
EmitLine(" ushort classId = reader.ReadUInt16();");
880-
EmitLine("");
881879
EmitLine(" switch (classId)");
882880
EmitLine(" {");
883881
foreach (AmqpClass c in m_classes)
@@ -887,9 +885,8 @@ public void EmitContentHeaderReader()
887885
EmitLine($" case {c.Index}: return new {MangleClass(c.Name)}Properties();");
888886
}
889887
}
890-
EmitLine(" default: break;");
888+
EmitLine(" default: throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
891889
EmitLine(" }");
892-
EmitLine(" throw new Client.Impl.UnknownClassOrMethodException(classId, 0);");
893890
EmitLine(" }");
894891
}
895892

projects/client/RabbitMQ.Client/src/client/impl/CommandAssembler.cs

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
4142
using System.IO;
4243

4344
using RabbitMQ.Client.Framing.Impl;
@@ -76,54 +77,51 @@ public Command HandleFrame(InboundFrame f)
7677
switch (m_state)
7778
{
7879
case AssemblyState.ExpectingMethod:
79-
{
80-
if (!f.IsMethod())
8180
{
82-
throw new UnexpectedFrameException(f);
81+
if (!f.IsMethod())
82+
{
83+
throw new UnexpectedFrameException(f);
84+
}
85+
m_method = m_protocol.DecodeMethodFrom(f.Payload.AsMemory(0, f.PayloadSize));
86+
m_state = m_method.HasContent
87+
? AssemblyState.ExpectingContentHeader
88+
: AssemblyState.Complete;
89+
return CompletedCommand();
8390
}
84-
m_method = m_protocol.DecodeMethodFrom(f.GetReader());
85-
m_state = m_method.HasContent
86-
? AssemblyState.ExpectingContentHeader
87-
: AssemblyState.Complete;
88-
return CompletedCommand();
89-
}
9091
case AssemblyState.ExpectingContentHeader:
91-
{
92-
if (!f.IsHeader())
9392
{
94-
throw new UnexpectedFrameException(f);
93+
if (!f.IsHeader())
94+
{
95+
throw new UnexpectedFrameException(f);
96+
}
97+
Memory<byte> memory = f.Payload.AsMemory(0, f.PayloadSize);
98+
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(memory));
99+
ulong totalBodyBytes = m_header.ReadFrom(memory.Slice(2));
100+
if (totalBodyBytes > MaxArrayOfBytesSize)
101+
{
102+
throw new UnexpectedFrameException(f);
103+
}
104+
m_remainingBodyBytes = (int)totalBodyBytes;
105+
m_body = new byte[m_remainingBodyBytes];
106+
m_bodyStream = new MemoryStream(m_body, true);
107+
UpdateContentBodyState();
108+
return CompletedCommand();
95109
}
96-
NetworkBinaryReader reader = f.GetReader();
97-
m_header = m_protocol.DecodeContentHeaderFrom(reader);
98-
ulong totalBodyBytes = m_header.ReadFrom(reader);
99-
if (totalBodyBytes > MaxArrayOfBytesSize)
100-
{
101-
throw new UnexpectedFrameException(f);
102-
}
103-
m_remainingBodyBytes = (int)totalBodyBytes;
104-
m_body = new byte[m_remainingBodyBytes];
105-
m_bodyStream = new MemoryStream(m_body, true);
106-
UpdateContentBodyState();
107-
return CompletedCommand();
108-
}
109110
case AssemblyState.ExpectingContentBody:
110-
{
111-
if (!f.IsBody())
112-
{
113-
throw new UnexpectedFrameException(f);
114-
}
115-
if (f.Payload.Length > m_remainingBodyBytes)
116111
{
117-
throw new MalformedFrameException
118-
(string.Format("Overlong content body received - {0} bytes remaining, {1} bytes received",
119-
m_remainingBodyBytes,
120-
f.Payload.Length));
112+
if (!f.IsBody())
113+
{
114+
throw new UnexpectedFrameException(f);
115+
}
116+
if (f.PayloadSize > m_remainingBodyBytes)
117+
{
118+
throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.PayloadSize} bytes received");
119+
}
120+
m_bodyStream.Write(f.Payload, 0, f.PayloadSize);
121+
m_remainingBodyBytes -= f.PayloadSize;
122+
UpdateContentBodyState();
123+
return CompletedCommand();
121124
}
122-
m_bodyStream.Write(f.Payload, 0, f.Payload.Length);
123-
m_remainingBodyBytes -= f.Payload.Length;
124-
UpdateContentBodyState();
125-
return CompletedCommand();
126-
}
127125
case AssemblyState.Complete:
128126
default:
129127
return null;

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -562,52 +562,53 @@ public void MainLoop()
562562

563563
public void MainLoopIteration()
564564
{
565-
InboundFrame frame = _frameHandler.ReadFrame();
566-
567-
NotifyHeartbeatListener();
568-
// We have received an actual frame.
569-
if (frame.IsHeartbeat())
565+
using (InboundFrame frame = _frameHandler.ReadFrame())
570566
{
571-
// Ignore it: we've already just reset the heartbeat
572-
// latch.
573-
return;
574-
}
567+
NotifyHeartbeatListener();
568+
// We have received an actual frame.
569+
if (frame.IsHeartbeat())
570+
{
571+
// Ignore it: we've already just reset the heartbeat
572+
// latch.
573+
return;
574+
}
575575

576-
if (frame.Channel == 0)
577-
{
578-
// In theory, we could get non-connection.close-ok
579-
// frames here while we're quiescing (m_closeReason !=
580-
// null). In practice, there's a limited number of
581-
// things the server can ask of us on channel 0 -
582-
// essentially, just connection.close. That, combined
583-
// with the restrictions on pipelining, mean that
584-
// we're OK here to handle channel 0 traffic in a
585-
// quiescing situation, even though technically we
586-
// should be ignoring everything except
587-
// connection.close-ok.
588-
_session0.HandleFrame(frame);
589-
}
590-
else
591-
{
592-
// If we're still m_running, but have a m_closeReason,
593-
// then we must be quiescing, which means any inbound
594-
// frames for non-zero channels (and any inbound
595-
// commands on channel zero that aren't
596-
// Connection.CloseOk) must be discarded.
597-
if (_closeReason == null)
576+
if (frame.Channel == 0)
598577
{
599-
// No close reason, not quiescing the
600-
// connection. Handle the frame. (Of course, the
601-
// Session itself may be quiescing this particular
602-
// channel, but that's none of our concern.)
603-
ISession session = _sessionManager.Lookup(frame.Channel);
604-
if (session == null)
605-
{
606-
throw new ChannelErrorException(frame.Channel);
607-
}
608-
else
578+
// In theory, we could get non-connection.close-ok
579+
// frames here while we're quiescing (m_closeReason !=
580+
// null). In practice, there's a limited number of
581+
// things the server can ask of us on channel 0 -
582+
// essentially, just connection.close. That, combined
583+
// with the restrictions on pipelining, mean that
584+
// we're OK here to handle channel 0 traffic in a
585+
// quiescing situation, even though technically we
586+
// should be ignoring everything except
587+
// connection.close-ok.
588+
_session0.HandleFrame(frame);
589+
}
590+
else
591+
{
592+
// If we're still m_running, but have a m_closeReason,
593+
// then we must be quiescing, which means any inbound
594+
// frames for non-zero channels (and any inbound
595+
// commands on channel zero that aren't
596+
// Connection.CloseOk) must be discarded.
597+
if (_closeReason == null)
609598
{
610-
session.HandleFrame(frame);
599+
// No close reason, not quiescing the
600+
// connection. Handle the frame. (Of course, the
601+
// Session itself may be quiescing this particular
602+
// channel, but that's none of our concern.)
603+
ISession session = _sessionManager.Lookup(frame.Channel);
604+
if (session == null)
605+
{
606+
throw new ChannelErrorException(frame.Channel);
607+
}
608+
else
609+
{
610+
session.HandleFrame(frame);
611+
}
611612
}
612613
}
613614
}

projects/client/RabbitMQ.Client/src/client/impl/ContentHeaderBase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public virtual object Clone()
6767
///<summary>
6868
/// Fill this instance from the given byte buffer stream.
6969
///</summary>
70-
internal ulong ReadFrom(NetworkBinaryReader reader)
70+
internal ulong ReadFrom(Memory<byte> memory)
7171
{
72-
reader.ReadUInt16(); // weight - not currently used
73-
ulong bodySize = reader.ReadUInt64();
74-
ReadPropertiesFrom(new ContentHeaderPropertyReader(reader));
72+
// Skipping the first two bytes since they arent used (weight - not currently used)
73+
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2));
74+
ReadPropertiesFrom(new ContentHeaderPropertyReader(memory.Slice(10)));
7575
return bodySize;
7676
}
7777

0 commit comments

Comments
 (0)