// NOTE(review): this file is a scraped unified-diff fragment, not compilable
// C++. Each line carries embedded old/new line numbers ("NN-" = removed line,
// "NN+" = added line, "NNNN" = unchanged context); those tokens are preserved
// verbatim below — only review comments are added.
//
// Context hunk: the replicate_batcher constructor (unchanged by this diff) —
// stores the owning consensus pointer and sizes the byte-counting
// back-pressure semaphore from cache_size.
@@ -32,9 +32,9 @@ replicate_batcher::replicate_batcher(consensus* ptr, size_t cache_size)
3232 : _ptr(ptr)
3333 , _max_batch_size_sem(cache_size) {}
3434
// replicate(): the diff threads a new std::optional<model::term_id>
// expected_term parameter through the public entry point and forwards it to
// do_cache(). Visible behavior is otherwise unchanged: consume/cache the
// reader's batches, take _lock, flush(), and complete the caller through the
// cached item's promise.
35- ss::future<result<replicate_result>>
36- replicate_batcher::replicate ( model::record_batch_reader&& r) {
37- return do_cache (std::move (r)).then ([this ](item_ptr i) {
35+ ss::future<result<replicate_result>> replicate_batcher::replicate (
36+ std::optional<model::term_id> expected_term, model::record_batch_reader&& r) {
37+ return do_cache (expected_term, std::move (r)).then ([this ](item_ptr i) {
// Unchanged context: serialize flush() behind _lock, then hand the caller the
// future tied to this item's promise (resolved later by flush()).
3838 return _lock.with ([this ] { return flush (); }).then ([i] {
3939 return i->_promise .get_future ();
4040 });
// NOTE(review): the function's closing braces (old lines 41+) fall outside
// this hunk and are not visible here.
// stop() hunk (tail only — the function's head is outside this hunk): the
// change deletes the shared _data_cache member (batch data now travels with
// each cached item), so stop() clears only _item_cache after failing pending
// promises; the context line above is the tail of that error message.
@@ -51,13 +51,14 @@ ss::future<> replicate_batcher::stop() {
5151 " finish replicating pending entries" ));
5252 }
5353 _item_cache.clear ();
54- _data_cache.clear ();
5554 });
5655}
// do_cache(): signature gains the expected_term parameter. The reader is
// still fully consumed into memory (no timeout), the total payload size in
// bytes is computed, and both expected_term and the batches are forwarded to
// do_cache_with_backpressure().
57- ss::future<replicate_batcher::item_ptr>
58- replicate_batcher::do_cache (model::record_batch_reader&& r) {
56+
57+ ss::future<replicate_batcher::item_ptr> replicate_batcher::do_cache (
58+ std::optional<model::term_id> expected_term, model::record_batch_reader&& r) {
5959 return model::consume_reader_to_memory (std::move (r), model::no_timeout)
60- .then ([this ](ss::circular_buffer<model::record_batch> batches) {
60+ .then ([this ,
61+ expected_term](ss::circular_buffer<model::record_batch> batches) {
// NOTE(review): the local buffer `data` below appears unused in every visible
// line of this lambda — candidate for removal; confirm against the full file.
6162 ss::circular_buffer<model::record_batch> data;
6263 size_t bytes = std::accumulate (
6364 batches.cbegin (),
// NOTE(review): hunk boundary — accumulate()'s remaining arguments (the end
// iterator and the size_t initial value, old lines 64-65) are not visible.
@@ -66,27 +67,31 @@ replicate_batcher::do_cache(model::record_batch_reader&& r) {
6667 [](size_t sum, const model::record_batch& b) {
6768 return sum + b.size_bytes ();
6869 });
69- return do_cache_with_backpressure (std::move (batches), bytes);
70+ return do_cache_with_backpressure (
71+ expected_term, std::move (batches), bytes);
7072 });
7173}
7274
// do_cache_with_backpressure(): waits for `bytes` units of the
// _max_batch_size_sem semaphore (back-pressure on total cached payload), then
// builds the cached item. The diff hoists item creation to the top of the
// continuation so batches are stored on the item itself (i->data) rather
// than the removed shared _data_cache, and records expected_term on the item
// for the later term check in flush().
7375ss::future<replicate_batcher::item_ptr>
7476replicate_batcher::do_cache_with_backpressure (
75- ss::circular_buffer<model::record_batch> batches, size_t bytes) {
77+ std::optional<model::term_id> expected_term,
78+ ss::circular_buffer<model::record_batch> batches,
79+ size_t bytes) {
7680 return ss::get_units (_max_batch_size_sem, bytes)
77- .then ([this , batches = std::move (batches)](
81+ .then ([this , expected_term, batches = std::move (batches)](
7882
7983 ss::semaphore_units<> u) mutable {
8084 size_t record_count = 0 ;
85+ auto i = ss::make_lw_shared<item>();
8186 for (auto & b : batches) {
8287 record_count += b.record_count ();
// Batches owned by this shard are moved; others are deep-copied —
// presumably because a record_batch must not be moved across shards
// (NOTE(review): confirm against model::record_batch ownership rules).
8388 if (b.header ().ctx .owner_shard == ss::this_shard_id ()) {
84- _data_cache .push_back (std::move (b));
89+ i-> data .push_back (std::move (b));
8590 } else {
86- _data_cache .push_back (b.copy ());
91+ i-> data .push_back (b.copy ());
8792 }
8893 }
89- auto i = ss::make_lw_shared<item>() ;
94+ i-> expected_term = expected_term ;
9095 i->record_count = record_count;
9196 i->units = std::move (u);
9297 _item_cache.emplace_back (i);
// NOTE(review): hunk boundary — old lines 93-94 (likely `return i;` and the
// continuation's closing `});`) are not visible here.
@@ -95,20 +100,14 @@ replicate_batcher::do_cache_with_backpressure(
95100 }
96101
// flush(): the heart of the change. Previously flush() stamped ALL pending
// batch data with the current term unconditionally; now, under _op_lock,
// each cached item is admitted only if it carries no expected_term or its
// expected_term equals the leader's current term. Stale items fail fast with
// errc::not_leader instead of being replicated under the wrong term (the
// truncation failure mentioned in the preserved context comments). Admitted
// items have their batches stamped with the current term and gathered into a
// single append_entries_request.
97102ss::future<> replicate_batcher::flush () {
98- auto notifications = std::exchange (_item_cache, {});
99- auto data = std::exchange (_data_cache, {});
100- if (notifications.empty ()) {
103+ auto item_cache = std::exchange (_item_cache, {});
104+ if (item_cache.empty ()) {
101105 return ss::now ();
102106 }
// All work runs inside the consensus background gate so shutdown waits for
// in-flight flushes; the separate `data` capture disappears because data now
// lives on each item.
103107 return ss::with_gate (
104- _ptr->_bg ,
105- [this ,
106- data = std::move (data),
107- notifications = std::move (notifications)]() mutable {
108+ _ptr->_bg , [this , item_cache = std::move (item_cache)]() mutable {
108109 return _ptr->_op_lock .get_units ().then (
109- [this ,
110- data = std::move (data),
111- notifications = std::move (notifications)](
110+ [this , item_cache = std::move (item_cache)](
112111 ss::semaphore_units<> u) mutable {
113112 // we have to check if we are the leader
114113 // it is critical as term could have been updated already by
// NOTE(review): hunk boundary — old lines 115-116 (continuation of this
// comment block) are not visible here.
@@ -117,17 +116,35 @@ ss::future<> replicate_batcher::flush() {
117116 // this problem caused truncation failure.
118117
// Not the leader any more: fail every pending item immediately.
119118 if (!_ptr->is_leader ()) {
120- for (auto & n : notifications ) {
119+ for (auto & n : item_cache ) {
121120 n->_promise .set_value (errc::not_leader);
122121 }
123122 return ss::make_ready_future<>();
124123 }
125124
126125 auto meta = _ptr->meta ();
127126 auto const term = model::term_id (meta.term );
// New admission loop: an item whose expected_term is unset, or matches the
// current term, contributes its (term-stamped) batches and joins the
// notification list; a mismatched item is rejected with errc::not_leader.
128- for (auto & b : data) {
129- b.set_term (term);
127+ ss::circular_buffer<model::record_batch> data;
128+ std::vector<item_ptr> notifications;
129+
130+ for (auto & n : item_cache) {
131+ if (
132+ !n->expected_term .has_value ()
133+ || n->expected_term .value () == term) {
134+ for (auto & b : n->data ) {
135+ b.set_term (term);
136+ data.push_back (std::move (b));
137+ }
138+ notifications.push_back (std::move (n));
139+ } else {
140+ n->_promise .set_value (errc::not_leader);
141+ }
142+ }
143+
// Everything was rejected: nothing to replicate this round.
144+ if (notifications.empty ()) {
145+ return ss::now ();
130146 }
147+
131148 auto seqs = _ptr->next_followers_request_seq ();
132149 append_entries_request req (
133150 _ptr->_self ,
// NOTE(review): hunk boundary — old lines 134-140 (the remaining
// append_entries_request arguments and the replication dispatch that
// eventually resolves `notifications`) are not visible here.
@@ -141,6 +158,7 @@ ss::future<> replicate_batcher::flush() {
141158 });
142159 });
143160}
161+
144162static void propagate_result (
145163 result<replicate_result> r,
146164 std::vector<replicate_batcher::item_ptr>& notifications) {
0 commit comments