Skip to content

Commit d5304a2

Browse files
qyryqGazizonoki
authored andcommitted
Add TMessage::GetBrokenData method (#23656)
1 parent c806cb1 commit d5304a2

File tree

5 files changed

+124
-6
lines changed

5 files changed

+124
-6
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
66e1273f270f166cf9a11288b0b2310f55e6de82
1+
af4a59730e62ce87813dd141cab505ff94c465d1

include/ydb-cpp-sdk/client/topic/read_events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ struct TReadSessionEvent {
102102
virtual ~TMessageBase() = default;
103103

104104
virtual const std::string& GetData() const;
105+
virtual const std::string& GetBrokenData() const;
105106

106107
virtual void Commit() = 0;
107108

@@ -144,6 +145,8 @@ struct TReadSessionEvent {
144145
//! User data.
145146
//! Throws decompressor exception if decompression failed.
146147
const std::string& GetData() const override;
148+
//! Throws exception if decompression succeeded.
149+
const std::string& GetBrokenData() const override;
147150

148151
//! Commits single message.
149152
void Commit() override;

src/client/topic/impl/read_session_event.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ const std::string& TMessageBase::GetData() const {
8888
return Data;
8989
}
9090

91+
const std::string& TMessageBase::GetBrokenData() const {
92+
return Data;
93+
}
94+
9195
uint64_t TMessageBase::GetOffset() const {
9296
return Information.Offset;
9397
}
@@ -175,6 +179,13 @@ const std::string& TMessage::GetData() const {
175179
return TMessageBase::GetData();
176180
}
177181

182+
const std::string& TMessage::GetBrokenData() const {
183+
if (DecompressionException) {
184+
return TMessageBase::GetData();
185+
}
186+
ythrow yexception() << "Can not get broken data after successful decompression";
187+
}
188+
178189
bool TMessage::HasException() const {
179190
return DecompressionException != nullptr;
180191
}

src/client/topic/impl/read_session_impl.ipp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1362,7 +1362,6 @@ inline void TSingleClusterReadSessionImpl<false>::StopPartitionSessionImpl(
13621362

13631363
if (graceful) {
13641364
auto committedOffset = partitionStream->GetMaxCommittedOffset();
1365-
LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1422 TStopPartitionSessionEvent");
13661365
pushRes = EventsQueue->PushEvent(
13671366
partitionStream,
13681367
// TODO(qyryq) Is it safe to use GetMaxCommittedOffset here instead of StopPartitionSessionRequest.commmitted_offset?
@@ -1375,7 +1374,6 @@ inline void TSingleClusterReadSessionImpl<false>::StopPartitionSessionImpl(
13751374
released.set_partition_session_id(partitionStream->GetAssignId());
13761375
WriteToProcessorImpl(std::move(req));
13771376
PartitionStreams.erase(partitionSessionId);
1378-
LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1435 TPartitionSessionClosedEvent");
13791377
pushRes = EventsQueue->PushEvent(
13801378
partitionStream,
13811379
TReadSessionEvent::TPartitionSessionClosedEvent(partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
@@ -3064,16 +3062,15 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
30643062
data.set_data(TStringType{decompressed});
30653063
}
30663064
}
3067-
3068-
DecompressedSize += data.data().size();
30693065
} catch (...) {
30703066
parent->PutDecompressionError(std::current_exception(), messages.Batch, i);
3071-
data.clear_data(); // Free memory, because we don't count it.
30723067

30733068
if (auto session = parent->CbContext->LockShared()) {
30743069
session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage();
30753070
}
30763071
}
3072+
3073+
DecompressedSize += data.data().size();
30773074
}
30783075
}
30793076

tests/integration/topic/basic_usage.cpp

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,113 @@ TEST_F(BasicUsage, TEST_NAME(TWriteSession_WriteEncoded)) {
747747
}
748748
}
749749

750+
TEST_F(BasicUsage, TEST_NAME(TWriteSession_WriteEncoded_Broken)) {
751+
// Write a broken compressed message.
752+
// GetData should throw an exception.
753+
// GetBrokenData should return the broken data.
754+
755+
// Write a correct compressed message.
756+
// GetData should return the correct data.
757+
// GetBrokenData should throw an exception.
758+
759+
auto driver = MakeDriver();
760+
761+
TTopicClient client(driver);
762+
763+
auto settings = TWriteSessionSettings()
764+
.Path(GetTopicPath())
765+
.MessageGroupId(TEST_MESSAGE_GROUP_ID);
766+
767+
auto writer = client.CreateWriteSession(settings);
768+
std::string brokenPacked = "some broken data";
769+
770+
{
771+
auto event = *writer->GetEvent(true);
772+
ASSERT_TRUE(std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event));
773+
writer->WriteEncoded(
774+
std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken),
775+
TWriteMessage::CompressedMessage(brokenPacked, ECodec::GZIP, 100)
776+
);
777+
}
778+
779+
std::string correctMessage = "message";
780+
TString packed;
781+
{
782+
TStringOutput so(packed);
783+
TZLibCompress oss(&so, ZLib::GZip, 6);
784+
oss << correctMessage;
785+
}
786+
{
787+
auto event = *writer->GetEvent(true);
788+
ASSERT_TRUE(std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event));
789+
writer->WriteEncoded(
790+
std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken),
791+
TWriteMessage::CompressedMessage(packed, ECodec::GZIP, correctMessage.size())
792+
);
793+
}
794+
795+
std::uint32_t acks = 0;
796+
while (acks < 2) {
797+
auto event = *writer->GetEvent(true);
798+
if (auto e = std::get_if<TWriteSessionEvent::TAcksEvent>(&event)) {
799+
acks += e->Acks.size();
800+
} else {
801+
continue;
802+
}
803+
}
804+
805+
ASSERT_EQ(acks, 2u);
806+
807+
auto readSettings = TReadSessionSettings()
808+
.ConsumerName(GetConsumerName())
809+
.AppendTopics(GetTopicPath())
810+
// .DirectRead(EnableDirectRead)
811+
;
812+
std::shared_ptr<IReadSession> readSession = client.CreateReadSession(readSettings);
813+
std::uint32_t readMessageCount = 0;
814+
while (readMessageCount < 2) {
815+
std::cerr << "Get event on client\n";
816+
auto event = *readSession->GetEvent(true);
817+
std::visit(TOverloaded {
818+
[&](TReadSessionEvent::TDataReceivedEvent& event) {
819+
for (auto& message: event.GetMessages()) {
820+
if (readMessageCount == 0) {
821+
ASSERT_THROW(message.GetData(), std::exception);
822+
std::string data = message.GetBrokenData();
823+
ASSERT_TRUE(brokenPacked == data);
824+
} else {
825+
ASSERT_THROW(message.GetBrokenData(), std::exception);
826+
std::string data = message.GetData();
827+
ASSERT_TRUE(correctMessage == data);
828+
}
829+
++readMessageCount;
830+
}
831+
},
832+
[&](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) {
833+
FAIL();
834+
},
835+
[&](TReadSessionEvent::TStartPartitionSessionEvent& event) {
836+
event.Confirm();
837+
},
838+
[&](TReadSessionEvent::TStopPartitionSessionEvent& event) {
839+
event.Confirm();
840+
},
841+
[&](TReadSessionEvent::TEndPartitionSessionEvent& event) {
842+
event.Confirm();
843+
},
844+
[&](TReadSessionEvent::TPartitionSessionStatusEvent&) {
845+
FAIL() << "Test does not support lock sessions yet";
846+
},
847+
[&](TReadSessionEvent::TPartitionSessionClosedEvent&) {
848+
FAIL() << "Test does not support lock sessions yet";
849+
},
850+
[&](TSessionClosedEvent&) {
851+
FAIL() << "Session closed";
852+
}
853+
}, event);
854+
}
855+
}
856+
750857
namespace {
751858
enum class EExpectedTestResult {
752859
SUCCESS,

0 commit comments

Comments
 (0)