Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions topic/src/main/java/tech/ydb/topic/write/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ public class Message {
private byte[] data;
private final Long seqNo;
private final Instant createTimestamp;
private List<MetadataItem> metadataItems;
private final List<MetadataItem> metadataItems;

private Message(Builder builder) {
this.data = builder.data;
this.seqNo = builder.seqNo;
this.createTimestamp = builder.createTimestamp != null ? builder.createTimestamp : Instant.now();
this.metadataItems = builder.metadataItems;
this.metadataItems = builder.metadataItems != null ? builder.metadataItems : new ArrayList<>();
}

private Message(byte[] data) {
this.data = data;
this.seqNo = null;
this.createTimestamp = Instant.now();
this.metadataItems = new ArrayList<>();
}

public static Message of(byte[] data) {
Expand All @@ -42,6 +43,7 @@ public byte[] getData() {
return data;
}

@Deprecated
public void setData(byte[] data) {
this.data = data;
}
Expand All @@ -50,10 +52,12 @@ public Long getSeqNo() {
return seqNo;
}

@Nonnull
public Instant getCreateTimestamp() {
return createTimestamp;
}

@Nonnull
public List<MetadataItem> getMetadataItems() {
return metadataItems;
}
Expand Down
118 changes: 77 additions & 41 deletions topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java
Original file line number Diff line number Diff line change
@@ -1,78 +1,114 @@
package tech.ydb.topic.write.impl;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import com.google.protobuf.UnsafeByteOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.settings.SendSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.WriteAck;

public class EnqueuedMessage {
private final Message message;

// use logger from WriterImpl
private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);

private Long seqNo;
private byte[] bytes;
private final long originLength;
private final Instant createdAt;
private final List<MetadataItem> items;

private final CompletableFuture<WriteAck> future = new CompletableFuture<>();
private final AtomicBoolean isCompressed = new AtomicBoolean();
private final AtomicBoolean isProcessingFailed = new AtomicBoolean();
private final long uncompressedSizeBytes;
private final YdbTransaction transaction;
private long compressedSizeBytes;
private Long seqNo;

public EnqueuedMessage(Message message, SendSettings sendSettings) {
this.message = message;
this.uncompressedSizeBytes = message.getData().length;
this.transaction = sendSettings != null ? sendSettings.getTransaction() : null;
}
private volatile boolean isReady = false;
private volatile IOException comporessError = null;

public Message getMessage() {
return message;
}
public EnqueuedMessage(Message message, SendSettings sendSettings, boolean noCompression) {
this.bytes = message.getData();
this.createdAt = message.getCreateTimestamp();
this.items = message.getMetadataItems();
this.seqNo = message.getSeqNo();

public CompletableFuture<WriteAck> getFuture() {
return future;
}

public boolean isCompressed() {
return isCompressed.get();
this.originLength = bytes.length;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originalSize?

this.transaction = sendSettings != null ? sendSettings.getTransaction() : null;
this.isReady = noCompression;
}

public void setCompressed(boolean compressed) {
this.isCompressed.set(compressed);
public boolean isReady() {
return isReady;
}

public boolean isProcessingFailed() {
return isProcessingFailed.get();
public long getOriginLength() {
return originLength;
}

public void setProcessingFailed(boolean processingFailed) {
isProcessingFailed.set(processingFailed);
}

public long getUncompressedSizeBytes() {
return uncompressedSizeBytes;
public long getLength() {
return bytes.length;
}

public long getCompressedSizeBytes() {
return compressedSizeBytes;
public IOException getCompressError() {
return comporessError;
}

public void setCompressedSizeBytes(long compressedSizeBytes) {
this.compressedSizeBytes = compressedSizeBytes;
public void encode(String id, Codec codec) {
logger.trace("[{}] Started encoding message", id);

try {
bytes = Encoder.encode(codec, bytes);
isReady = true;
logger.trace("[{}] Successfully finished encoding message", id);
} catch (IOException ex) {
logger.error("[{}] Exception while encoding message: ", id, ex);
isReady = true;
future.completeExceptionally(ex);
}
}

public long getSizeBytes() {
return isCompressed() ? getCompressedSizeBytes() : getUncompressedSizeBytes();
public CompletableFuture<WriteAck> getFuture() {
return future;
}

public Long getSeqNo() {
return seqNo;
}

public void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}

public YdbTransaction getTransaction() {
return transaction;
}

long updateSeqNo(long lastSeqNo) {
if (seqNo == null) {
seqNo = lastSeqNo + 1;
return seqNo;
}
return Math.max(lastSeqNo, seqNo);
}

YdbTopic.StreamWriteMessage.WriteRequest.MessageData toMessageData() {
return YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
.setSeqNo(seqNo)
.setData(UnsafeByteOperations.unsafeWrap(bytes))
.setCreatedAt(ProtobufUtils.instantToProto(createdAt))
.setUncompressedSize(originLength)
.addAllMetadataItems(items.stream().map(it -> YdbTopic.MetadataItem.newBuilder()
.setKey(it.getKey())
.setValue(UnsafeByteOperations.unsafeWrap(it.getValue()))
.build()
).collect(Collectors.toList()))
.build();
}
}
37 changes: 6 additions & 31 deletions topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;

import com.google.protobuf.ByteString;
import org.slf4j.Logger;
Expand All @@ -13,7 +12,6 @@
import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.ProtoUtils;

Expand Down Expand Up @@ -146,46 +144,23 @@ public void tryAddMessageToRequest(EnqueuedMessage message) {
}
currentTransaction = message.getTransaction();
}
long messageSeqNo = message.getSeqNo() == null
? (message.getMessage().getSeqNo() == null ? ++seqNo : message.getMessage().getSeqNo())
: message.getSeqNo();
if (message.getSeqNo() == null) {
message.setSeqNo(messageSeqNo);
}

YdbTopic.StreamWriteMessage.WriteRequest.MessageData.Builder messageDataBuilder =
YdbTopic.StreamWriteMessage.WriteRequest.MessageData.newBuilder()
.setSeqNo(messageSeqNo)
.setData(ByteString.copyFrom(message.getMessage().getData()))
.setCreatedAt(ProtobufUtils.instantToProto(message.getMessage().getCreateTimestamp()))
.setUncompressedSize(message.getUncompressedSizeBytes());

List<MetadataItem> metadataItems = message.getMessage().getMetadataItems();
if (metadataItems != null && !metadataItems.isEmpty()) {
messageDataBuilder.addAllMetadataItems(metadataItems
.stream()
.map(metadataItem -> YdbTopic.MetadataItem.newBuilder()
.setKey(metadataItem.getKey())
.setValue(ByteString.copyFrom(metadataItem.getValue()))
.build())
.collect(Collectors.toList()));
}

YdbTopic.StreamWriteMessage.WriteRequest.MessageData messageData = messageDataBuilder.build();
seqNo = message.updateSeqNo(seqNo);

long sizeWithCurrentMessage = getCurrentRequestSize() + messageData.getSerializedSize() + messageOverheadBytes;
YdbTopic.StreamWriteMessage.WriteRequest.MessageData pb = message.toMessageData();
long sizeWithCurrentMessage = getCurrentRequestSize() + pb.getSerializedSize() + messageOverheadBytes;
if (sizeWithCurrentMessage <= MAX_GRPC_MESSAGE_SIZE) {
addMessage(messageData);
addMessage(pb);
} else {
if (messageCount > 0) {
logger.debug("Adding next message to the same request would lead to grpc request size overflow. " +
"Sending previous {} messages...", messageCount);
sendWriteRequest();
reset();
addMessage(messageData);
addMessage(pb);
} else {
logger.error("A single message is larger than grpc size limit. Sending it anyway...");
addMessage(messageData);
addMessage(pb);
sendWriteRequest();
reset();
}
Expand Down
Loading