Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/Pulsar.Client/Common/DTO.fs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type internal Metadata =
EncryptionAlgo: string
OrderingKey: byte[]
ReplicatedFrom: string
ProducerName: string
}

type MessageKey =
Expand Down Expand Up @@ -264,7 +265,7 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
properties: IReadOnlyDictionary<string, string>, encryptionCtx: EncryptionContext option,
schemaVersion: byte[], sequenceId: SequenceId, orderingKey: byte[], publishTime: TimeStamp,
eventTime: Nullable<TimeStamp>,
redeliveryCount: int32, replicatedFrom: string,
redeliveryCount: int32, replicatedFrom: string, producerName: string,
getValue: unit -> 'T) =
/// Get the unique message ID associated with this message.
member this.MessageId = messageId
Expand Down Expand Up @@ -293,26 +294,27 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
member this.RedeliveryCount = redeliveryCount
/// Get name of cluster, from which the message is replicated.
member this.ReplicatedFrom = replicatedFrom

/// Get name of producer of the message
member this.ProducerName = producerName
/// Get the de-serialized value of the message, according the configured Schema.
member this.GetValue() =
getValue()

member internal this.WithMessageId messageId =
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
/// Get a new instance of the message with updated data
member this.WithData data =
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
/// Get a new instance of the message with updated key
member this.WithKey (key, hasBase64EncodedKey) =
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
/// Get a new instance of the message with updated properties
member this.WithProperties properties =
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)

type Messages<'T> internal(maxNumberOfMessages: int, maxSizeOfMessages: int64) =

Expand Down
1 change: 1 addition & 0 deletions src/Pulsar.Client/Internal/ClientCnx.fs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
EncryptionAlgo = messageMetadata.EncryptionAlgo
OrderingKey = messageMetadata.OrderingKey
ReplicatedFrom = messageMetadata.ReplicatedFrom
ProducerName = messageMetadata.ProducerName
}

{
Expand Down
17 changes: 9 additions & 8 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
ackRequests.Clear()

let getNewIndividualMsgIdWithPartition messageId =
{ messageId with Type = MessageIdType.Single; Partition = partitionIndex; TopicName = %"" }
{ messageId with Type = MessageIdType.Single; Partition = partitionIndex; TopicName = topicName.CompleteTopicName }

let processPossibleToDLQ (messageId : MessageId) =
let acknowledge = trySendAcknowledge Individual EmptyProperties None
Expand Down Expand Up @@ -685,6 +685,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
rawMessage.Metadata.EventTime,
rawMessage.RedeliveryCount,
rawMessage.Metadata.ReplicatedFrom,
rawMessage.Metadata.ProducerName,
getValue
)
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then
Expand Down Expand Up @@ -1336,13 +1337,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
elif rawMessage.AckSet.Count > 0 && not rawMessage.AckSet[i] then
skippedMessages <- skippedMessages + 1
else
let messageId =
{
rawMessage.MessageId with
Partition = partitionIndex
Type = Batch(%i, acker)
TopicName = %""
}
let messageId = {
rawMessage.MessageId with
Partition = partitionIndex
Type = Batch(%i, acker)
TopicName = topicName.CompleteTopicName
}
let msgKey = singleMessageMetadata.PartitionKey
let getValue () =
keyValueProcessor
Expand Down Expand Up @@ -1374,6 +1374,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
eventTime,
rawMessage.RedeliveryCount,
rawMessage.Metadata.ReplicatedFrom,
rawMessage.Metadata.ProducerName,
getValue
)
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then
Expand Down
10 changes: 8 additions & 2 deletions tests/IntegrationTests/Basic.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ let tests =

testList "Basic" [

testTask "Sent messageId should be equal to received messageId" {
testTask "Sent message/messageId should be equal to received message/messageId" {

Log.Debug("Started Sent messageId should be equal to received messageId")
let client = getClient()
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
let topicName = "persistent://public/default/topic-" + Guid.NewGuid().ToString("N")

let! (producer1 : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.ProducerName("producer1")
.CreateAsync()

let! (producer2 : IProducer<byte[]>) =
client.NewProducer()
.Topic(topicName)
.ProducerName("producer2")
.EnableBatching(false)
.CreateAsync()

Expand All @@ -52,9 +54,13 @@ let tests =

Expect.isTrue "" (msg1Id = msg1.MessageId)
Expect.equal "" [| 0uy |] <| msg1.GetValue()
Expect.equal "Message ID topic name should match" topicName (string msg1.MessageId.TopicName)
Expect.equal "Message producer name should match" "producer1" (string msg1.ProducerName)

Expect.isTrue "" (msg2Id = msg2.MessageId)
Expect.equal "" [| 1uy |] <| msg2.GetValue()
Expect.equal "Message ID topic name should match" topicName (string msg2.MessageId.TopicName)
Expect.equal "Message producer name should match" "producer2" (string msg2.ProducerName)

Log.Debug("Finished Sent messageId should be equal to received messageId")
}
Expand Down
1 change: 1 addition & 0 deletions tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ let tests =
EventTime = Nullable()
OrderingKey = [||]
ReplicatedFrom = ""
ProducerName = ""
}

let testRawMessage =
Expand Down
Loading