Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions services/echo_console/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
#include "services/network/HttpClientImpl.hpp"
#include "services/network/WebSocketClientConnectionObserver.hpp"
#include "services/tracer/GlobalTracer.hpp"

#ifdef PeekMessage
#undef PeekMessage
#endif

#include "services/util/EchoPolicyDiffieHellman.hpp"
#include "services/util/EchoPolicySymmetricKey.hpp"
#include "services/util/SesameCobs.hpp"
Expand Down
1 change: 1 addition & 0 deletions services/util/Sesame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace services
virtual void SendMessageStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer) = 0;
virtual void MessageSent(std::size_t encodedSize) = 0;
virtual void ReceivedMessage(infra::SharedPtr<infra::StreamReaderWithRewinding>&& reader, std::size_t encodedSize) = 0;
virtual void PeekMessage(infra::StreamReaderWithRewinding& reader, std::size_t encodedSize) = 0;
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new API name PeekMessage collides with the common Windows macro PeekMessage (as evidenced by the local #undef workaround added in services/echo_console/Main.cpp). This is likely to break other translation units depending on include order. Prefer renaming the API to a less collision-prone identifier (e.g., PeekReceivedMessage, InspectIncomingMessage, OnMessagePeek) rather than relying on per-file #undef fixes.

Suggested change
virtual void PeekMessage(infra::StreamReaderWithRewinding& reader, std::size_t encodedSize) = 0;
virtual void PeekReceivedMessage(infra::StreamReaderWithRewinding& reader, std::size_t encodedSize) = 0;

Copilot uses AI. Check for mistakes.
};

class SesameEncoded
Expand Down
96 changes: 95 additions & 1 deletion services/util/SesameCobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ namespace services
receivedMessage.clear();
receiveSizeEncoded = 0;
currentMessageSize = 0;
nextPeekOverhead = 1;
peekOverheadPositionIsPseudo = true;
receivedPeekMessage.clear();
receiveSizePeekEncoded = 0;
currentPeekMessageSize = 0;
Comment on lines 51 to +58
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receivePeekIndex is used to track how far peeking progressed (see DataReceived()), but it is not reset in Stop(). After stop/start cycles (or reusing the instance), peeking can start at a stale index and skip data. Reset receivePeekIndex (and any other peek cursor state) alongside the other peek-related fields in Stop().

Copilot uses AI. Check for mistakes.
receivedDataReader.OnAllocatable([]() {});
sendReqestedSize.reset();

Expand All @@ -75,13 +80,25 @@ namespace services
{
receiving = true;

auto& reader = hal::BufferedSerialCommunicationObserver::Subject().Reader();

while (true)
{
auto data = reader.PeekContiguousRange(receivePeekIndex);
if (data.empty())
break;
receivePeekIndex += data.size();
ReceivedPeekData(data);
}

while (receivedDataReader.Allocatable())
{
auto& reader = hal::BufferedSerialCommunicationObserver::Subject().Reader();
auto data = reader.ExtractContiguousRange(std::numeric_limits<uint32_t>::max());
if (data.empty())
break;
auto dataSize = data.size();
ReceivedData(data);
receivePeekIndex -= dataSize - data.size();
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation dataSize - data.size() is unclear without context. Consider extracting this into a named variable like consumedBytes or adding a comment explaining that this represents the amount of data consumed from the peek buffer.

Suggested change
receivePeekIndex -= dataSize - data.size();
auto consumedBytes = dataSize - data.size();
receivePeekIndex -= consumedBytes;

Copilot uses AI. Check for mistakes.
reader.Rewind(reader.ConstructSaveMarker() - data.size());
hal::BufferedSerialCommunicationObserver::Subject().AckReceived();
}
Expand Down Expand Up @@ -173,6 +190,83 @@ namespace services
}
}

void SesameCobs::ReceivedPeekData(infra::ConstByteRange& data)
{
while (!data.empty())
{
if (nextPeekOverhead == 1)
ExtractPeekOverhead(data);
else
ExtractPeekData(data);
}
}

void SesameCobs::ExtractPeekOverhead(infra::ConstByteRange& data)
{
nextPeekOverhead = data.front();

if (nextPeekOverhead == 0)
PeekMessageBoundary(data);
else
{
++receiveSizePeekEncoded;
data.pop_front();
if (!peekOverheadPositionIsPseudo)
ReceivedPeekPayload(infra::MakeByteRange(messageDelimiter));
peekOverheadPositionIsPseudo = nextPeekOverhead == 255;
}
}

void SesameCobs::ExtractPeekData(infra::ConstByteRange& data)
{
infra::ConstByteRange preDelimiter;
infra::ConstByteRange postDelimiter;
std::tie(preDelimiter, postDelimiter) = infra::FindAndSplit(infra::Head(data, nextPeekOverhead - 1), messageDelimiter);

if (!preDelimiter.empty())
ForwardPeekData(preDelimiter, data);

if (!postDelimiter.empty())
PeekMessageBoundary(data);
}

void SesameCobs::ForwardPeekData(infra::ConstByteRange contents, infra::ConstByteRange& data)
{
ReceivedPeekPayload(contents);
data.pop_front(contents.size());
receiveSizePeekEncoded += contents.size();
nextPeekOverhead -= static_cast<uint8_t>(contents.size());
}

void SesameCobs::PeekMessageBoundary(infra::ConstByteRange& data)
{
++receiveSizePeekEncoded;
FinishPeekMessage();
data.pop_front();
nextPeekOverhead = 1;
peekOverheadPositionIsPseudo = true;
}

void SesameCobs::ReceivedPeekPayload(infra::ConstByteRange data)
{
auto fittingData = infra::Head(data, receivedPeekMessage.max_size() - receivedPeekMessage.size());
receivedPeekMessage.insert(receivedPeekMessage.end(), fittingData.begin(), fittingData.end());
currentPeekMessageSize += data.size();
}

void SesameCobs::FinishPeekMessage()
{
auto messageSize = currentPeekMessageSize;
currentPeekMessageSize = 0;

if (messageSize != 0)
{
infra::LimitedStreamReaderWithRewinding::WithInput<infra::BoundedDequeInputStreamReader> reader(std::in_place, receivedPeekMessage, messageSize);
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The receivedPeekMessage deque has a max size of 3 bytes (line 93), but messageSize from currentPeekMessageSize can be larger than 3 (line 254 shows it accumulates all data.size()). This creates a mismatch where the reader is constructed with a size that exceeds the actual data available in the deque, which could lead to reading invalid data or undefined behavior.

Copilot uses AI. Check for mistakes.
GetObserver().PeekMessage(reader, std::exchange(receiveSizePeekEncoded, 0));
receivedPeekMessage.clear();
Comment on lines +264 to +266
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receivedPeekMessage is capped to 3 bytes, but messageSize tracks the full decoded payload size. Constructing a reader with messageSize greater than the underlying stored bytes can misreport availability and can cause peek handlers to attempt to read beyond what is actually buffered. Consider constructing the peek reader with the actual buffered size (e.g., receivedPeekMessage.size()) or increasing the peek buffer to guarantee messageSize bytes are present.

Copilot uses AI. Check for mistakes.
}
}

void SesameCobs::CheckReadyToSendUserData()
{
if (!sendingUserData && sendReqestedSize != std::nullopt)
Expand Down
14 changes: 14 additions & 0 deletions services/util/SesameCobs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ namespace services
void ReceivedPayload(infra::ConstByteRange data);
void FinishMessage();

void ReceivedPeekData(infra::ConstByteRange& data);
void ExtractPeekOverhead(infra::ConstByteRange& data);
void ExtractPeekData(infra::ConstByteRange& data);
void ForwardPeekData(infra::ConstByteRange contents, infra::ConstByteRange& data);
void PeekMessageBoundary(infra::ConstByteRange& data);
void ReceivedPeekPayload(infra::ConstByteRange data);
void FinishPeekMessage();

void CheckReadyToSendUserData();
void SendSerialData(const infra::ConstByteRange data, const infra::Function<void()>& onSendDataDone);
void SendStreamFilled();
Expand All @@ -82,6 +90,12 @@ namespace services
infra::BoundedDeque<uint8_t>& receivedMessage;
std::size_t receiveSizeEncoded = 0;
std::size_t currentMessageSize = 0;
infra::BoundedDeque<uint8_t>::WithMaxSize<3> receivedPeekMessage;
std::size_t receivePeekIndex = 0;
uint8_t nextPeekOverhead = 1;
std::size_t receiveSizePeekEncoded = 0;
bool peekOverheadPositionIsPseudo = true;
std::size_t currentPeekMessageSize = 0;
infra::NotifyingSharedOptional<infra::LimitedStreamReaderWithRewinding::WithInput<infra::BoundedDequeInputStreamReader>> receivedDataReader;

infra::BoundedVector<uint8_t>& sendStorage;
Expand Down
55 changes: 39 additions & 16 deletions services/util/SesameWindowed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace services
switch (stream.Extract<Operation>())
{
case Operation::init:
discardUntilInit = false;
otherAvailableWindow = stream.Extract<infra::LittleEndian<uint16_t>>();
ReceivedInit(otherAvailableWindow);
sendInitResponse = true;
Expand All @@ -77,12 +78,7 @@ namespace services
break;
case Operation::releaseWindow:
if (initialized)
{
releasedWindow += encodedSize;
auto oldOtherAvailableWindow = otherAvailableWindow;
otherAvailableWindow += stream.Extract<infra::LittleEndian<uint16_t>>();
ReceivedReleaseWindow(oldOtherAvailableWindow, otherAvailableWindow);
}
break;
case Operation::message:
if (initialized)
Expand All @@ -96,6 +92,35 @@ namespace services
SetNextState();
}

void SesameWindowed::PeekMessage(infra::StreamReaderWithRewinding& reader, std::size_t encodedSize)
{
infra::DataInputStream::WithErrorPolicy stream(reader, infra::noFail);
switch (stream.Extract<Operation>())
{
case Operation::init:
{
// No need to send any data until the init is processed.
// Discard anything while normal processing continues.
discardUntilInit = true;
break;
}
case Operation::initResponse:
break;
case Operation::releaseWindow:
if (initialized)
{
auto oldOtherAvailableWindow = otherAvailableWindow;
otherAvailableWindow += stream.Extract<infra::LittleEndian<uint16_t>>();
ReceivedReleaseWindow(oldOtherAvailableWindow, otherAvailableWindow);
}
break;
case Operation::message:
break;
}

SetNextState();
}

void SesameWindowed::ReceivedInitialize()
{
maxUsableBufferSize = otherAvailableWindow;
Expand Down Expand Up @@ -125,9 +150,9 @@ namespace services
if (receivedMessageReader == nullptr)
state.Emplace<StateSendingInitResponse>(*this).Request();
}
else if (requestedSendMessageSize != std::nullopt && SesameEncodedObserver::Subject().MessageSize(*requestedSendMessageSize + 1) + releaseWindowSize <= otherAvailableWindow)
else if (requestedSendMessageSize != std::nullopt && (discardUntilInit || SesameEncodedObserver::Subject().MessageSize(*requestedSendMessageSize + 1) + releaseWindowSize <= otherAvailableWindow))
state.Emplace<StateSendingMessage>(*this).Request();
else if (releasedWindow > releaseWindowSize && releaseWindowSize <= otherAvailableWindow)
else if (!discardUntilInit && releasedWindow > releaseWindowSize && releaseWindowSize <= otherAvailableWindow)
state.Emplace<StateSendingReleaseWindow>(*this).Request();
else
state.Emplace<StateOperational>(*this);
Expand Down Expand Up @@ -239,7 +264,13 @@ namespace services

void SesameWindowed::StateSendingMessage::Request()
{
communication.SesameEncodedObserver::Subject().RequestSendMessage(requestedSize);
if (communication.discardUntilInit)
{
communication.requestedSendMessageSize.reset();
MessageSent(0);
}
else
communication.SesameEncodedObserver::Subject().RequestSendMessage(requestedSize);
}

void SesameWindowed::StateSendingMessage::SendMessageStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer)
Expand All @@ -252,14 +283,6 @@ namespace services
communication.GetObserver().SendMessageStreamAvailable(std::move(writer));
}

void SesameWindowed::StateSendingMessage::MessageSent(std::size_t encodedSize)
{
communication.otherAvailableWindow -= encodedSize;

communication.sending = false;
communication.SetNextState();
}

SesameWindowed::StateSendingReleaseWindow::StateSendingReleaseWindow(SesameWindowed& communication)
: State(communication)
{
Expand Down
3 changes: 2 additions & 1 deletion services/util/SesameWindowed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace services
void SendMessageStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer) override;
void MessageSent(std::size_t encodedSize) override;
void ReceivedMessage(infra::SharedPtr<infra::StreamReaderWithRewinding>&& reader, std::size_t encodedSize) override;
void PeekMessage(infra::StreamReaderWithRewinding& reader, std::size_t encodedSize) override;

private:
void ReceivedInitialize();
Expand Down Expand Up @@ -141,7 +142,6 @@ namespace services

void Request() override;
void SendMessageStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer) override;
void MessageSent(std::size_t encodedSize) override;

private:
std::size_t requestedSize;
Expand All @@ -161,6 +161,7 @@ namespace services
const uint16_t ownBufferSize;
const uint16_t releaseWindowSize;
bool initialized = false;
bool discardUntilInit = false;
infra::SharedPtr<infra::StreamReaderWithRewinding> receivedMessageReader;
infra::AccessedBySharedPtr readerAccess;
uint16_t otherAvailableWindow{ 0 };
Expand Down
Loading
Loading