@cocktail828

Fixes #447

Motivation

Try to fix issue #447.

Modifications

Compress the payload when it exceeds the maximum message size, then check whether the compressed payload is still larger than the limit.
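
The check described above could look roughly like the following. This is only a minimal sketch, not the code in this PR: `checkSize`, `compress`, `errMessageTooLarge`, and the helper generators are hypothetical names, and zlib from the Go standard library stands in for whichever compression the producer is configured with.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"errors"
	"fmt"
	"math/rand"
)

// errMessageTooLarge is a hypothetical sentinel error for this sketch.
var errMessageTooLarge = errors.New("message size exceeds maxMessageSize")

// compress deflates data with zlib. It stands in for the producer's
// configured compression provider (an assumption of this sketch).
func compress(data []byte) []byte {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	w.Write(data) // writes to a bytes.Buffer do not fail
	w.Close()
	return buf.Bytes()
}

// checkSize rejects a payload only when it is over the limit both
// before and after compression. Thanks to the short-circuit &&,
// payloads already under the limit are never compressed here.
func checkSize(payload []byte, maxSize int) error {
	if len(payload) > maxSize && len(compress(payload)) > maxSize {
		return errMessageTooLarge
	}
	return nil
}

// repeated returns n identical bytes, which compress very well.
func repeated(n int) []byte {
	return bytes.Repeat([]byte("a"), n)
}

// noise returns n pseudo-random bytes, which barely compress at all.
func noise(n int) []byte {
	buf := make([]byte, n)
	rand.New(rand.NewSource(1)).Read(buf)
	return buf
}

func main() {
	const maxSize = 1024
	fmt.Println("compressible 4 KiB accepted:", checkSize(repeated(4096), maxSize) == nil)
	fmt.Println("random 4 KiB accepted:", checkSize(noise(4096), maxSize) == nil)
}
```

Note that this sketch throws the compressed bytes away, so a later stage would have to compress again.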

Contributor

@pgier pgier left a comment

LGTM, just had one in-line suggestion.

@Gleiphir2769
Contributor

update: "a short-circuit and will work here"

Co-authored-by: Paul Gier <[email protected]>

// if msg is too large
if len(payload) > int(p._getConn().GetMaxMessageSize()) {
if len(payload) > int(p._getConn().GetMaxMessageSize()) &&
Member

It seems that this PR compresses the message twice when sending it. The batch builder already does the compression here: https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/commands.go#L251
Could we move the compression from the batch builder to here?

Author

Yep, this modification is just a translation of the implementation in https://github.com/apache/pulsar/blob/782132561ac9fc8430ae3ef12913999e5871d3d2/pulsar-client-cpp/lib/ProducerImpl.cc#L431.
It does indeed compress twice.

// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [PAYLOAD]
// |
// compressed or uncompressed {[METADATA_SIZE_0] [METADATA_0] [PAYLOAD_0]} ...
According to the wire format above, the SDK cannot compress messages one by one; it has to compress the whole batch together.

Another way to avoid compressing twice, I think, is to flush the batch immediately before and after adding a big message to the batchBuilder.
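
To illustrate why compression applies to the batch as a whole, here is a rough sketch of the batched payload layout. It is not the client's actual code: `entry`, `rawBatch`, `compressBatch`, `decompress`, and `roundTrips` are hypothetical names, the metadata is treated as opaque bytes, zlib stands in for the configured compression, and the 4-byte big-endian size prefix is an assumption taken from my reading of the binary protocol docs.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/binary"
	"fmt"
	"io"
)

// entry is a hypothetical stand-in for one message inside a batch.
type entry struct {
	metadata []byte // serialized per-message metadata, opaque in this sketch
	payload  []byte
}

// rawBatch lays the entries out as [METADATA_SIZE][METADATA][PAYLOAD]...
// The size prefix is assumed to be 4 bytes, big-endian.
func rawBatch(entries []entry) []byte {
	var raw bytes.Buffer
	for _, e := range entries {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(e.metadata)))
		raw.Write(size[:])
		raw.Write(e.metadata)
		raw.Write(e.payload)
	}
	return raw.Bytes()
}

// compressBatch compresses the concatenated entries as one unit, so the
// per-entry framing is only visible again after decompression.
func compressBatch(raw []byte) []byte {
	var out bytes.Buffer
	w := zlib.NewWriter(&out)
	w.Write(raw)
	w.Close()
	return out.Bytes()
}

// decompress reverses compressBatch.
func decompress(data []byte) ([]byte, error) {
	r, err := zlib.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// roundTrips reports whether the batch survives compress + decompress.
func roundTrips(entries []entry) bool {
	raw := rawBatch(entries)
	got, err := decompress(compressBatch(raw))
	return err == nil && bytes.Equal(got, raw)
}

func main() {
	batch := []entry{
		{metadata: []byte("m0"), payload: []byte("hello")},
		{metadata: []byte("m1"), payload: []byte("world")},
	}
	fmt.Println("raw batch bytes:", len(rawBatch(batch)))
	fmt.Println("round trip ok:", roundTrips(batch))
}
```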

@RobertIndie
Member

Is it possible to add some tests to verify it?

@cocktail828
Author

Hello, I think this problem may be resolved in #805 . https://github.com/apache/pulsar-client-go/pull/805/files#diff-8adb09af2175be5751840343e14df2220a2154ca4bc8de21157ed112be25b6b9L521-L533

Hi @Gleiphir2769, according to the Java client implementation, the check should also cover 'encryptedPayload', not only the payload in 'ProducerMessage'.
The valid payload is encapsulated with metadata headers and payloads ([metaDataSize][metaData][payload]...) (refer to https://pulsar.apache.org/docs/developing-binary-protocol)
https://github.com/apache/pulsar/blob/24c62c137670fcaff69ae91be30d2b531470ee03/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java#L201

@Gleiphir2769
Contributor


You are right. The Java client checks "encryptedPayload" when batching. But it only checks "encryptedPayload" before sending when batching is enabled, which means it does not check maxMessageSize in sendAsync. I think the check should be implemented in Flush or somewhere similar.
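
A flush-time check along those lines might look like the sketch below. This is an illustration only, not the pulsar-client-go batch builder: the `batchBuilder` type here, its `Add` and `Flush` methods, and `errBatchTooLarge` are all hypothetical, entries are appended without metadata framing, and zlib stands in for the configured compression.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"errors"
	"fmt"
)

// errBatchTooLarge is a hypothetical sentinel error for this sketch.
var errBatchTooLarge = errors.New("compressed batch exceeds maxMessageSize")

// batchBuilder is a simplified, hypothetical builder that only
// accumulates raw payload bytes.
type batchBuilder struct {
	buf bytes.Buffer
}

// Add appends a payload to the pending batch without any size check.
func (b *batchBuilder) Add(payload []byte) {
	b.buf.Write(payload)
}

// Flush compresses the pending batch exactly once, resets the builder,
// and compares the compressed size with maxSize. The limit is enforced
// on the bytes that would actually be sent.
func (b *batchBuilder) Flush(maxSize int) ([]byte, error) {
	var out bytes.Buffer
	w := zlib.NewWriter(&out)
	w.Write(b.buf.Bytes())
	w.Close()
	b.buf.Reset()
	if out.Len() > maxSize {
		return nil, errBatchTooLarge
	}
	return out.Bytes(), nil
}

func main() {
	b := &batchBuilder{}
	b.Add(make([]byte, 8192)) // 8 KiB of zeros compresses well
	data, err := b.Flush(1024)
	fmt.Println("compressed size:", len(data), "error:", err)
}
```

With this shape the payload is compressed once, and the size check sees the compressed bytes rather than the raw ones.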

Development

Successfully merging this pull request may close these issues.

pulsar-client-go appears to report message too large before compressing it.