Skip to content

Commit 20d69cc

Browse files
authored
PIP-432: Add isEncrypted field to EncryptionContext (#324) (#325)
* PIP-432: Add isEncrypted field to EncryptionContext * Add doc for EncryptionContext.IsEncrypted (cherry picked from commit 1eae9d1)
1 parent 84e536a commit 20d69cc

File tree

3 files changed

+43
-14
lines changed

3 files changed

+43
-14
lines changed

src/Pulsar.Client/Common/DTO.fs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,11 @@ type EncryptionContext =
242242
CompressionType: CompressionType
243243
UncompressedMessageSize: int
244244
BatchSize: Nullable<int>
245+
// Indicates whether the message payload remains encrypted (true) or has been successfully decrypted (false)
246+
IsEncrypted: bool
245247
}
246248
with
247-
static member internal FromMetadata(metadata: Metadata) =
249+
static member internal FromMetadata(metadata: Metadata, isEncrypted: bool) =
248250
if metadata.EncryptionKeys.Length > 0 then
249251
{
250252
Keys = metadata.EncryptionKeys
@@ -253,6 +255,7 @@ type EncryptionContext =
253255
CompressionType = metadata.CompressionType
254256
UncompressedMessageSize = metadata.UncompressedMessageSize
255257
BatchSize = if metadata.HasNumMessagesInBatch then Nullable(metadata.NumMessages) else Nullable()
258+
IsEncrypted = isEncrypted
256259
} |> Some
257260
else
258261
None

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
661661
trackMessage rawMessage.MessageId
662662
None
663663

664-
let handleSingleMessagePayload (rawMessage: RawMessage) msgId payload hasWaitingChannel hasWaitingBatchChannel schemaDecodeFunction =
664+
let handleSingleMessagePayload (rawMessage: RawMessage) msgId payload hasWaitingChannel hasWaitingBatchChannel isMessageUndecryptable schemaDecodeFunction =
665665
if duringSeek.IsSome || (isSameEntry(rawMessage.MessageId) && isPriorEntryIndex(rawMessage.MessageId.EntryId)) then
666666
// We need to discard entries that were prior to startMessageId
667667
Log.Logger.LogInformation("{0} Ignoring message from before the startMessageId: {1}", prefix, startMessageId)
@@ -677,7 +677,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
677677
%msgKey,
678678
rawMessage.IsKeyBase64Encoded,
679679
rawMessage.Properties,
680-
EncryptionContext.FromMetadata rawMessage.Metadata,
680+
EncryptionContext.FromMetadata(rawMessage.Metadata, isEncrypted = isMessageUndecryptable),
681681
getSchemaVersionBytes rawMessage.Metadata.SchemaVersion,
682682
rawMessage.Metadata.SequenceId,
683683
rawMessage.Metadata.OrderingKey,
@@ -715,15 +715,15 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
715715
if isChunkedMessage then
716716
match processMessageChunk rawMessage msgId with
717717
| Some (chunkedPayload, msgIdWithChunk) ->
718-
handleSingleMessagePayload rawMessage msgIdWithChunk chunkedPayload hasWaitingChannel hasWaitingBatchChannel schemaDecodeFunction
718+
handleSingleMessagePayload rawMessage msgIdWithChunk chunkedPayload hasWaitingChannel hasWaitingBatchChannel isMessageUndecryptable schemaDecodeFunction
719719
| None ->
720720
()
721721
else
722-
handleSingleMessagePayload rawMessage msgId rawMessage.Payload hasWaitingChannel hasWaitingBatchChannel schemaDecodeFunction
722+
handleSingleMessagePayload rawMessage msgId rawMessage.Payload hasWaitingChannel hasWaitingBatchChannel isMessageUndecryptable schemaDecodeFunction
723723
elif rawMessage.Metadata.NumMessages > 0 then
724724
// handle batch message enqueuing; uncompressed payload has all messages in batch
725725
try
726-
this.ReceiveIndividualMessagesFromBatch rawMessage schemaDecodeFunction
726+
this.ReceiveIndividualMessagesFromBatch rawMessage schemaDecodeFunction isMessageUndecryptable
727727
with ex ->
728728
Log.Logger.LogError(ex, "{0} Batch reading exception {1}", prefix, msgId)
729729
raise <| BatchDeserializationException "Batch reading exception"
@@ -1315,8 +1315,8 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13151315
do startStatTimer()
13161316
do startChunkTimer()
13171317

1318-
abstract member ReceiveIndividualMessagesFromBatch: RawMessage -> (byte [] -> 'T) -> unit
1319-
default this.ReceiveIndividualMessagesFromBatch (rawMessage: RawMessage) schemaDecodeFunction =
1318+
abstract member ReceiveIndividualMessagesFromBatch: RawMessage -> (byte [] -> 'T) -> bool -> unit
1319+
default this.ReceiveIndividualMessagesFromBatch (rawMessage: RawMessage) schemaDecodeFunction isMessageUndecryptable =
13201320
let batchSize = rawMessage.Metadata.NumMessages
13211321
let acker = BatchMessageAcker(batchSize)
13221322
let mutable skippedMessages = 0
@@ -1366,7 +1366,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13661366
%msgKey,
13671367
singleMessageMetadata.PartitionKeyB64Encoded,
13681368
properties,
1369-
EncryptionContext.FromMetadata rawMessage.Metadata,
1369+
EncryptionContext.FromMetadata(rawMessage.Metadata, isEncrypted = isMessageUndecryptable),
13701370
getSchemaVersionBytes rawMessage.Metadata.SchemaVersion,
13711371
%(int64 singleMessageMetadata.SequenceId),
13721372
singleMessageMetadata.OrderingKey,
@@ -1704,7 +1704,7 @@ and internal ZeroQueueConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T
17041704
if this.Waiters.Count > 0 then
17051705
this.SendFlowPermits this.Waiters.Count
17061706

1707-
override this.ReceiveIndividualMessagesFromBatch (_: RawMessage) _ =
1707+
override this.ReceiveIndividualMessagesFromBatch (_: RawMessage) _ _ =
17081708
Log.Logger.LogError("{0} Closing consumer due to unsupported received batch-message with zero receiver queue size", prefix)
17091709
let _ = postAndAsyncReply this.Mb ConsumerMessage.Close
17101710
let exn =

tests/IntegrationTests/MessageCrypto.fs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,32 @@ type Consumer2KeyReader() =
9595
{ Key = Encoding.UTF8.GetBytes(privateKeysConsumer2.Item keyName); Metadata = null }
9696

9797

98+
let consumeMessagesAndCheckEncryption (consumer: IConsumer<byte[]>) number consumerName =
99+
task {
100+
for i in 1..number do
101+
let! message = consumer.ReceiveAsync()
102+
let received = Encoding.UTF8.GetString(message.Data)
103+
Log.Debug("{0} received {1}", consumerName, received)
104+
105+
// Check EncryptionContext
106+
match message.EncryptionContext with
107+
| Some context ->
108+
Log.Debug("{0} message {1} EncryptionContext.IsEncrypted = {2}", consumerName, i, context.IsEncrypted)
109+
if context.IsEncrypted then
110+
failwith $"Message {i} should not be encrypted, but IsEncrypted = true"
111+
| None ->
112+
failwith $"Message {i} should have EncryptionContext but it is None"
113+
114+
do! consumer.AcknowledgeAsync(message.MessageId)
115+
Log.Debug("{0} acknowledged {1}", consumerName, received)
116+
let expected = "Message #" + string i
117+
if received.StartsWith(expected) |> not then
118+
failwith $"Incorrect message expected {expected} received {received} consumer {consumerName}"
119+
120+
Log.Debug("{0} consumed {1} messages, all EncryptionContext.IsEncrypted = false", consumerName, number)
121+
}
122+
123+
98124
[<Tests>]
99125
let tests =
100126
testList "MessageCrypto" [
@@ -129,7 +155,7 @@ let tests =
129155
let consumerTask =
130156
Task.Run(fun () ->
131157
task {
132-
do! consumeMessages consumer numberOfMessages consumerName
158+
do! consumeMessagesAndCheckEncryption consumer numberOfMessages consumerName
133159
} :> Task)
134160

135161
do! Task.WhenAll(producerTask, consumerTask)
@@ -169,7 +195,7 @@ let tests =
169195
let consumer1Task =
170196
Task.Run(fun () ->
171197
task {
172-
do! consumeMessages consumer1 numberOfMessages consumerName
198+
do! consumeMessagesAndCheckEncryption consumer1 numberOfMessages consumerName
173199
} :> Task)
174200

175201
do! Task.WhenAll(producerTask, consumer1Task)
@@ -196,7 +222,7 @@ let tests =
196222
let consumer2Task =
197223
Task.Run(fun () ->
198224
task {
199-
do! consumeMessages consumer2 numberOfMessages consumerName
225+
do! consumeMessagesAndCheckEncryption consumer2 numberOfMessages consumerName
200226
} :> Task)
201227

202228
do! Task.WhenAll(producerTask2, consumer2Task)
@@ -242,7 +268,7 @@ let tests =
242268
Expect.isNonEmpty context.Param ""
243269
Expect.isNonEmpty context.Keys ""
244270
Expect.equal context.CompressionType compressionType ""
245-
271+
Expect.equal context.IsEncrypted true "Message should remain encrypted when decryption fails"
246272
do! Task.Delay 100
247273
Log.Debug("Ended Encryption send message and consume on fail")
248274
}

0 commit comments

Comments
 (0)