Skip to content

Commit c671ac0

Browse files
authored
[feat] Support KeyValue Schema. (#22)
### Motivation C++ client Support KeyValue Schema. For key and value schema, just only support `AVRO` and `JSON` type(consistent with java client). This PR has been reviewed in pulsar repo: apache/pulsar#17125 ### Modifications - A new constructor is added in `SchemaInfo` to combine key and value schemas. - Add a new `KeyValue` class, to help users merge and parse key and value data.
1 parent 1ea58e1 commit c671ac0

23 files changed

+894
-25
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ apache-pulsar-client-cpp-*.tar.gz
4545
/examples/SampleConsumerListener
4646
/examples/SampleConsumerListenerCApi
4747
/examples/SampleReaderCApi
48+
/examples/SampleKeyValueSchemaConsumer
49+
/examples/SampleKeyValueSchemaProducer
4850
/examples/SampleFileLogger
4951
/tests/main
5052
/tests/pulsar-tests
@@ -98,4 +100,4 @@ vcpkg_installed/
98100
*.rej
99101
.tests-container-id.txt
100102
Testing
101-
.test-token.txt
103+
.test-token.txt

examples/CMakeLists.txt

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,36 @@ set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
6060
set(SAMPLE_READER_C_SOURCES
6161
SampleReaderCApi.c
6262
)
63+
set(SAMPLE_KEY_VALUE_SCHEMA_CONSUMER
64+
SampleKeyValueSchemaConsumer.cc
65+
)
66+
67+
set(SAMPLE_KEY_VALUE_SCHEMA_PRODUCER
68+
SampleKeyValueSchemaProducer.cc
69+
)
6370

64-
add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
65-
add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
66-
add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
67-
add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
68-
add_executable(SampleFileLogger ${SAMPLE_FILE_LOGGER_SOURCES})
69-
add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
70-
add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
71-
add_executable(SampleAsyncConsumerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
72-
add_executable(SampleConsumerListenerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
73-
add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
71+
add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
72+
add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
73+
add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
74+
add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
75+
add_executable(SampleFileLogger ${SAMPLE_FILE_LOGGER_SOURCES})
76+
add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
77+
add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
78+
add_executable(SampleAsyncConsumerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
79+
add_executable(SampleConsumerListenerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
80+
add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
81+
add_executable(SampleKeyValueSchemaConsumer ${SAMPLE_KEY_VALUE_SCHEMA_CONSUMER})
82+
add_executable(SampleKeyValueSchemaProducer ${SAMPLE_KEY_VALUE_SCHEMA_PRODUCER})
7483

75-
target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS} pulsarShared)
76-
target_link_libraries(SampleConsumer ${CLIENT_LIBS} pulsarShared)
77-
target_link_libraries(SampleConsumerListener ${CLIENT_LIBS} pulsarShared)
78-
target_link_libraries(SampleProducer ${CLIENT_LIBS} pulsarShared)
79-
target_link_libraries(SampleFileLogger ${CLIENT_LIBS} pulsarShared)
80-
target_link_libraries(SampleProducerCApi ${CLIENT_LIBS} pulsarShared)
81-
target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS} pulsarShared)
82-
target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS} pulsarShared)
83-
target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
84-
target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShared)
84+
target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS} pulsarShared)
85+
target_link_libraries(SampleConsumer ${CLIENT_LIBS} pulsarShared)
86+
target_link_libraries(SampleConsumerListener ${CLIENT_LIBS} pulsarShared)
87+
target_link_libraries(SampleProducer ${CLIENT_LIBS} pulsarShared)
88+
target_link_libraries(SampleFileLogger ${CLIENT_LIBS} pulsarShared)
89+
target_link_libraries(SampleProducerCApi ${CLIENT_LIBS} pulsarShared)
90+
target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS} pulsarShared)
91+
target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS} pulsarShared)
92+
target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
93+
target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShared)
94+
target_link_libraries(SampleKeyValueSchemaConsumer ${CLIENT_LIBS} pulsarShared)
95+
target_link_libraries(SampleKeyValueSchemaProducer ${CLIENT_LIBS} pulsarShared)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <lib/LogUtils.h>
20+
#include <pulsar/Client.h>
21+
22+
#include <iostream>
23+
24+
DECLARE_LOG_OBJECT()
25+
26+
using namespace pulsar;
27+
28+
int main() {
29+
Client client("pulsar://localhost:6650");
30+
31+
std::string jsonSchema =
32+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
33+
34+
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
35+
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
36+
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
37+
ConsumerConfiguration consumerConfiguration;
38+
consumerConfiguration.setSchema(keyValueSchema);
39+
40+
Consumer consumer;
41+
Result result = client.subscribe("persistent://public/default/kv-schema", "consumer-1",
42+
consumerConfiguration, consumer);
43+
if (result != ResultOk) {
44+
LOG_ERROR("Failed to subscribe: " << result);
45+
return -1;
46+
}
47+
48+
LOG_INFO("Start receive message.")
49+
50+
Message msg;
51+
while (true) {
52+
consumer.receive(msg);
53+
LOG_INFO("Received: " << msg << " with payload '" << msg.getDataAsString() << "'");
54+
LOG_INFO("Received: " << msg << " with partitionKey '" << msg.getPartitionKey() << "'");
55+
KeyValue keyValue = msg.getKeyValueData();
56+
LOG_INFO("Received: " << msg << " with key '" << keyValue.getKey() << "'");
57+
LOG_INFO("Received: " << msg << " with value '" << keyValue.getValueAsString() << "'");
58+
consumer.acknowledge(msg);
59+
}
60+
61+
client.close();
62+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <lib/LogUtils.h>
20+
#include <pulsar/Client.h>
21+
22+
#include <iostream>
23+
#include <thread>
24+
25+
DECLARE_LOG_OBJECT()
26+
27+
using namespace pulsar;
28+
29+
int main() {
30+
Client client("pulsar://localhost:6650");
31+
32+
std::string jsonSchema =
33+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
34+
35+
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
36+
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
37+
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
38+
LOG_INFO("KeyValue schema content: " << keyValueSchema.getSchema());
39+
40+
ProducerConfiguration producerConfiguration;
41+
producerConfiguration.setSchema(keyValueSchema);
42+
43+
Producer producer;
44+
Result result =
45+
client.createProducer("persistent://public/default/kv-schema", producerConfiguration, producer);
46+
if (result != ResultOk) {
47+
LOG_ERROR("Error creating producer: " << result);
48+
return -1;
49+
}
50+
51+
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
52+
53+
KeyValue keyValue(std::move(jsonData), std::move(jsonData));
54+
55+
Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
56+
result = producer.send(msg);
57+
if (result == ResultOk) {
58+
LOG_INFO("send message ok");
59+
}
60+
client.close();
61+
}

include/pulsar/KeyValue.h

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef KEY_VALUE_HPP_
20+
#define KEY_VALUE_HPP_
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "Schema.h"
26+
#include "defines.h"
27+
28+
namespace pulsar {
29+
30+
class KeyValueImpl;
31+
32+
/**
33+
* Use to when the user uses key value schema.
34+
*/
35+
class PULSAR_PUBLIC KeyValue {
36+
public:
37+
/**
38+
* Constructor key value, according to keyValueEncodingType, whether key and value be encoded together.
39+
*
40+
* @param key key data.
41+
* @param value value data.
42+
* @param keyValueEncodingType key value encoding type.
43+
*/
44+
KeyValue(std::string &&key, std::string &&value);
45+
46+
/**
47+
* Get the key of KeyValue.
48+
*
49+
* @return character stream for key
50+
*/
51+
std::string getKey() const;
52+
53+
/**
54+
* Get the value of the KeyValue.
55+
*
56+
*
57+
* @return the pointer to the KeyValue value
58+
*/
59+
const void *getValue() const;
60+
61+
/**
62+
* Get the value length of the keyValue.
63+
*
64+
* @return the length of the KeyValue value
65+
*/
66+
size_t getValueLength() const;
67+
68+
/**
69+
* Get string representation of the KeyValue value.
70+
*
71+
* @return the string representation of the KeyValue value
72+
*/
73+
std::string getValueAsString() const;
74+
75+
private:
76+
typedef std::shared_ptr<KeyValueImpl> KeyValueImplPtr;
77+
KeyValue(KeyValueImplPtr keyValueImplPtr);
78+
KeyValueImplPtr impl_;
79+
friend class Message;
80+
friend class MessageBuilder;
81+
};
82+
} // namespace pulsar
83+
84+
#endif /* KEY_VALUE_HPP_ */

include/pulsar/Message.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <memory>
2626
#include <string>
2727

28+
#include "KeyValue.h"
2829
#include "MessageId.h"
2930

3031
namespace pulsar {
@@ -92,6 +93,13 @@ class PULSAR_PUBLIC Message {
9293
*/
9394
std::string getDataAsString() const;
9495

96+
/**
97+
* Get key value message.
98+
*
99+
* @return key value message.
100+
*/
101+
KeyValue getKeyValueData() const;
102+
95103
/**
96104
* Get the unique message ID associated with this message.
97105
*

include/pulsar/MessageBuilder.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#ifndef MESSAGE_BUILDER_H
2020
#define MESSAGE_BUILDER_H
2121

22+
#include <pulsar/KeyValue.h>
2223
#include <pulsar/Message.h>
2324
#include <pulsar/defines.h>
2425

@@ -60,6 +61,13 @@ class PULSAR_PUBLIC MessageBuilder {
6061
*/
6162
MessageBuilder& setContent(std::string&& data);
6263

64+
/**
65+
* Set the key value content of the message
66+
*
67+
* @param data the content of the key value.
68+
*/
69+
MessageBuilder& setContent(const KeyValue& data);
70+
6371
/**
6472
* Set content of the message to a buffer already allocated by the caller. No copies of
6573
* this buffer will be made. The caller is responsible to ensure the memory buffer is

include/pulsar/Schema.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,27 @@
2727

2828
namespace pulsar {
2929

30+
/**
31+
* Encoding types of supported KeyValueSchema for Pulsar messages.
32+
*/
33+
enum class KeyValueEncodingType
34+
{
35+
/**
36+
* Key is stored as message key, while value is stored as message payload.
37+
*/
38+
SEPARATED,
39+
40+
/**
41+
* Key and value are stored as message payload.
42+
*/
43+
INLINE
44+
};
45+
46+
// Return string representation of result code
47+
PULSAR_PUBLIC const char *strEncodingType(pulsar::KeyValueEncodingType encodingType);
48+
49+
PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string encodingTypeStr);
50+
3051
enum SchemaType
3152
{
3253
/**
@@ -143,6 +164,14 @@ class PULSAR_PUBLIC SchemaInfo {
143164
SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
144165
const StringMap &properties = StringMap());
145166

167+
/**
168+
* @param keySchema the key schema.
169+
* @param valueSchema the value schema.
170+
* @param keyValueEncodingType Encoding types of supported KeyValueSchema for Pulsar messages.
171+
*/
172+
SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
173+
const KeyValueEncodingType &keyValueEncodingType = KeyValueEncodingType::INLINE);
174+
146175
/**
147176
* @return the schema type
148177
*/
@@ -166,8 +195,11 @@ class PULSAR_PUBLIC SchemaInfo {
166195
private:
167196
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
168197
SchemaInfoImplPtr impl_;
198+
static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
169199
};
170200

171201
} // namespace pulsar
172202

173203
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType);
204+
205+
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::KeyValueEncodingType encodingType);

lib/Commands.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static inline bool isBuiltInSchema(SchemaType schemaType) {
6969
case AVRO:
7070
case PROTOBUF:
7171
case PROTOBUF_NATIVE:
72+
case KEY_VALUE:
7273
return true;
7374

7475
default:
@@ -90,6 +91,8 @@ static inline proto::Schema_Type getSchemaType(SchemaType type) {
9091
return proto::Schema_Type_Avro;
9192
case PROTOBUF_NATIVE:
9293
return proto::Schema_Type_ProtobufNative;
94+
case KEY_VALUE:
95+
return proto::Schema_Type_KeyValue;
9396
default:
9497
return proto::Schema_Type_None;
9598
}

0 commit comments

Comments
 (0)