Skip to content

Commit 0c7a3b0

Browse files
authored
Merge pull request #78 from accelerated/producer_retry
Producer retry
2 parents f15b59c + 972a008 commit 0c7a3b0

File tree

12 files changed

+539
-45
lines changed

12 files changed

+539
-45
lines changed

include/cppkafka/configuration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
145145
Configuration& set_default_topic_configuration(TopicConfiguration config);
146146

147147
/**
148-
* Returns true iff the given property name has been set
148+
* Returns true if the given property name has been set
149149
*/
150150
bool has_property(const std::string& name) const;
151151

include/cppkafka/cppkafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <cppkafka/macros.h>
4545
#include <cppkafka/message.h>
4646
#include <cppkafka/message_builder.h>
47+
#include <cppkafka/message_internal.h>
4748
#include <cppkafka/metadata.h>
4849
#include <cppkafka/producer.h>
4950
#include <cppkafka/queue.h>

include/cppkafka/message.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
namespace cppkafka {
4444

4545
class MessageTimestamp;
46+
struct Internal;
4647

4748
/**
4849
* \brief Thin wrapper over a rdkafka message handle
@@ -56,6 +57,8 @@ class MessageTimestamp;
5657
*/
5758
class CPPKAFKA_API Message {
5859
public:
60+
friend class MessageInternal;
61+
using InternalPtr = std::shared_ptr<Internal>;
5962
/**
6063
* Constructs a message that won't take ownership of the given pointer
6164
*/
@@ -134,14 +137,13 @@ class CPPKAFKA_API Message {
134137
}
135138

136139
/**
137-
* \brief Gets the private data.
140+
* \brief Gets the private user data.
138141
*
139142
* This should only be used on messages produced by a Producer that were set a private data
140143
* attribute
141144
*/
142145
void* get_user_data() const {
143-
assert(handle_);
144-
return handle_->_private;
146+
return user_data_;
145147
}
146148

147149
/**
@@ -164,17 +166,27 @@ class CPPKAFKA_API Message {
164166
rd_kafka_message_t* get_handle() const {
165167
return handle_.get();
166168
}
169+
170+
/**
171+
* Internal private const data accessor (internal use only)
172+
*/
173+
InternalPtr internal() const {
174+
return internal_;
175+
}
167176
private:
168177
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
169178

170179
struct NonOwningTag { };
171180

172181
Message(rd_kafka_message_t* handle, NonOwningTag);
173182
Message(HandlePtr handle);
183+
Message& load_internal();
174184

175185
HandlePtr handle_;
176186
Buffer payload_;
177187
Buffer key_;
188+
void* user_data_;
189+
InternalPtr internal_;
178190
};
179191

180192
using MessageList = std::vector<Message>;

include/cppkafka/message_builder.h

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ class BasicMessageBuilder {
166166
* Gets the message's user data pointer
167167
*/
168168
void* user_data() const;
169+
170+
/**
171+
* Private data accessor (internal use only)
172+
*/
173+
Message::InternalPtr internal() const;
174+
Concrete& internal(Message::InternalPtr internal);
175+
169176
private:
170177
void construct_buffer(BufferType& lhs, const BufferType& rhs);
171178
Concrete& get_concrete();
@@ -176,11 +183,13 @@ class BasicMessageBuilder {
176183
BufferType payload_;
177184
std::chrono::milliseconds timestamp_{0};
178185
void* user_data_;
186+
Message::InternalPtr internal_;
179187
};
180188

181189
template <typename T, typename C>
182190
BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
183-
: topic_(std::move(topic)) {
191+
: topic_(std::move(topic)),
192+
user_data_(nullptr) {
184193
}
185194

186195
template <typename T, typename C>
@@ -190,16 +199,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
190199
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
191200
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
192201
std::chrono::milliseconds(0)),
193-
user_data_(message.get_user_data())
194-
{
195-
202+
user_data_(message.get_user_data()),
203+
internal_(message.internal()) {
196204
}
197205

198206
template <typename T, typename C>
199207
template <typename U, typename V>
200208
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
201209
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
202-
user_data_(rhs.user_data()) {
210+
user_data_(rhs.user_data()),
211+
internal_(rhs.internal()) {
203212
get_concrete().construct_buffer(key_, rhs.key());
204213
get_concrete().construct_buffer(payload_, rhs.payload());
205214
}
@@ -292,6 +301,17 @@ void* BasicMessageBuilder<T, C>::user_data() const {
292301
return user_data_;
293302
}
294303

304+
template <typename T, typename C>
305+
Message::InternalPtr BasicMessageBuilder<T, C>::internal() const {
306+
return internal_;
307+
}
308+
309+
template <typename T, typename C>
310+
C& BasicMessageBuilder<T, C>::internal(Message::InternalPtr internal) {
311+
internal_ = internal;
312+
return get_concrete();
313+
}
314+
295315
template <typename T, typename C>
296316
void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
297317
lhs = rhs;
@@ -328,6 +348,15 @@ class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
328348
void construct_buffer(Buffer& lhs, const T& rhs) {
329349
lhs = Buffer(rhs);
330350
}
351+
352+
MessageBuilder clone() const {
353+
return std::move(MessageBuilder(topic()).
354+
key(Buffer(key().get_data(), key().get_size())).
355+
payload(Buffer(payload().get_data(), payload().get_size())).
356+
timestamp(timestamp()).
357+
user_data(user_data()).
358+
internal(internal()));
359+
}
331360
};
332361

333362
/**
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) 2017, Matias Fontanini
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without
6+
* modification, are permitted provided that the following conditions are
7+
* met:
8+
*
9+
* * Redistributions of source code must retain the above copyright
10+
* notice, this list of conditions and the following disclaimer.
11+
* * Redistributions in binary form must reproduce the above
12+
* copyright notice, this list of conditions and the following disclaimer
13+
* in the documentation and/or other materials provided with the
14+
* distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27+
*
28+
*/
29+
30+
#ifndef CPPKAFKA_MESSAGE_INTERNAL_H
31+
#define CPPKAFKA_MESSAGE_INTERNAL_H
32+
33+
#include <memory>
34+
35+
namespace cppkafka {
36+
37+
class Message;
38+
39+
class Internal {
40+
public:
41+
virtual ~Internal() = default;
42+
};
43+
using InternalPtr = std::shared_ptr<Internal>;
44+
45+
/**
46+
* \brief Private message data structure
47+
*/
48+
class MessageInternal {
49+
public:
50+
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
51+
static std::unique_ptr<MessageInternal> load(Message& message);
52+
void* get_user_data() const;
53+
InternalPtr get_internal() const;
54+
private:
55+
void* user_data_;
56+
InternalPtr internal_;
57+
};
58+
59+
template <typename BuilderType>
60+
class MessageInternalGuard {
61+
public:
62+
MessageInternalGuard(BuilderType& builder)
63+
: builder_(builder),
64+
user_data_(builder.user_data()) {
65+
if (builder_.internal()) {
66+
// Swap contents with user_data
67+
ptr_.reset(new MessageInternal(user_data_, builder_.internal()));
68+
builder_.user_data(ptr_.get()); //overwrite user data
69+
}
70+
}
71+
~MessageInternalGuard() {
72+
//Restore user data
73+
builder_.user_data(user_data_);
74+
}
75+
void release() {
76+
ptr_.release();
77+
}
78+
private:
79+
BuilderType& builder_;
80+
std::unique_ptr<MessageInternal> ptr_;
81+
void* user_data_;
82+
};
83+
84+
}
85+
86+
#endif //CPPKAFKA_MESSAGE_INTERNAL_H

0 commit comments

Comments
 (0)