Skip to content

Commit e64faa7

Browse files
authored
feat(make_idempotent): support making check_and_set request idempotent in pegasus_write_service (#2239)
#2197 Implement higher-level APIs `make_idempotent()` and `put()` on `pegasus_write_service` for `check_and_set` requests. Both of them internally call the APIs provided by `pegasus_write_service::impl`. Two metrics are also introduced to measure how long it takes to make `incr` and `check_and_set` requests idempotent.
1 parent 6a40395 commit e64faa7

File tree

2 files changed

+80
-37
lines changed

2 files changed

+80
-37
lines changed

src/server/pegasus_write_service.cpp

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,24 @@ METRIC_DEFINE_counter(replica,
8181
dsn::metric_unit::kRequests,
8282
"The number of CHECK_AND_MUTATE requests");
8383

84+
METRIC_DEFINE_percentile_int64(replica,
85+
make_incr_idempotent_latency_ns,
86+
dsn::metric_unit::kNanoSeconds,
87+
"The duration that an incr request is made idempotent, "
88+
"including reading the current value from storage engine, "
89+
"increasing it by a given amount and translating the incr "
90+
"request into the single-put request. Only used for the "
91+
"primary replicas");
92+
93+
METRIC_DEFINE_percentile_int64(replica,
94+
make_check_and_set_idempotent_latency_ns,
95+
dsn::metric_unit::kNanoSeconds,
96+
"The duration that a check_and_set request is made "
97+
"idempotent, including reading the check value from "
98+
"storage engine, validating the check conditions and "
99+
"translating the check_and_set request into the single-put "
100+
"request. Only used for the primary replicas");
101+
84102
METRIC_DEFINE_percentile_int64(replica,
85103
put_latency_ns,
86104
dsn::metric_unit::kNanoSeconds,
@@ -152,7 +170,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
152170
_server(server),
153171
_impl(new impl(server)),
154172
_batch_start_time(0),
155-
_make_incr_idempotent_duration_ns(0),
156173
_cu_calculator(server->_cu_calculator.get()),
157174
METRIC_VAR_INIT_replica(put_requests),
158175
METRIC_VAR_INIT_replica(multi_put_requests),
@@ -161,6 +178,8 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
161178
METRIC_VAR_INIT_replica(incr_requests),
162179
METRIC_VAR_INIT_replica(check_and_set_requests),
163180
METRIC_VAR_INIT_replica(check_and_mutate_requests),
181+
METRIC_VAR_INIT_replica(make_incr_idempotent_latency_ns),
182+
METRIC_VAR_INIT_replica(make_check_and_set_idempotent_latency_ns),
164183
METRIC_VAR_INIT_replica(put_latency_ns),
165184
METRIC_VAR_INIT_replica(multi_put_latency_ns),
166185
METRIC_VAR_INIT_replica(remove_latency_ns),
@@ -173,7 +192,8 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
173192
METRIC_VAR_INIT_replica(dup_lagging_writes),
174193
_put_batch_size(0),
175194
_remove_batch_size(0),
176-
_incr_batch_size(0)
195+
_incr_batch_size(0),
196+
_check_and_set_batch_size(0)
177197
{
178198
}
179199

@@ -217,22 +237,16 @@ int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
217237
dsn::apps::incr_response &err_resp,
218238
dsn::apps::update_request &update)
219239
{
220-
const uint64_t start_time = dsn_now_ns();
221-
222-
const int err = _impl->make_idempotent(req, err_resp, update);
223-
224-
// Calculate the duration that an incr request is translated into an idempotent put request.
225-
_make_incr_idempotent_duration_ns = dsn_now_ns() - start_time;
240+
METRIC_VAR_AUTO_LATENCY(make_incr_idempotent_latency_ns);
226241

227-
return err;
242+
return _impl->make_idempotent(req, err_resp, update);
228243
}
229244

230245
int pegasus_write_service::put(const db_write_context &ctx,
231246
const dsn::apps::update_request &update,
232247
dsn::apps::incr_response &resp)
233248
{
234-
// The total latency should also include the duration of the translation.
235-
METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() - _make_incr_idempotent_duration_ns);
249+
METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
236250
METRIC_VAR_INCREMENT(incr_requests);
237251

238252
const int err = _impl->put(ctx, update, resp);
@@ -260,6 +274,33 @@ int pegasus_write_service::incr(int64_t decree,
260274
return err;
261275
}
262276

277+
int pegasus_write_service::make_idempotent(const dsn::apps::check_and_set_request &req,
278+
dsn::apps::check_and_set_response &err_resp,
279+
dsn::apps::update_request &update)
280+
{
281+
METRIC_VAR_AUTO_LATENCY(make_check_and_set_idempotent_latency_ns);
282+
283+
return _impl->make_idempotent(req, err_resp, update);
284+
}
285+
286+
int pegasus_write_service::put(const db_write_context &ctx,
287+
const dsn::apps::update_request &update,
288+
const dsn::apps::check_and_set_request &req,
289+
dsn::apps::check_and_set_response &resp)
290+
{
291+
METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
292+
METRIC_VAR_INCREMENT(check_and_set_requests);
293+
294+
const int err = _impl->put(ctx, update, resp);
295+
296+
if (_server->is_primary()) {
297+
_cu_calculator->add_check_and_set_cu(
298+
resp.error, req.hash_key, req.check_sort_key, req.set_sort_key, req.set_value);
299+
}
300+
301+
return err;
302+
}
303+
263304
int pegasus_write_service::check_and_set(int64_t decree,
264305
const dsn::apps::check_and_set_request &update,
265306
dsn::apps::check_and_set_response &resp)
@@ -325,10 +366,12 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
325366
// request (i.e. the atomic write).
326367
if (update.type == dsn::apps::update_type::UT_INCR) {
327368
++_incr_batch_size;
369+
} else if (update.type == dsn::apps::update_type::UT_CHECK_AND_SET) {
370+
++_check_and_set_batch_size;
328371
}
329372
}
330373

331-
int err = _impl->batch_put(ctx, update, resp);
374+
const int err = _impl->batch_put(ctx, update, resp);
332375

333376
if (_server->is_primary()) {
334377
_cu_calculator->add_put_cu(resp.error, update.key, update.value);
@@ -344,7 +387,7 @@ int pegasus_write_service::batch_remove(int64_t decree,
344387
CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after batch_prepare");
345388

346389
++_remove_batch_size;
347-
int err = _impl->batch_remove(decree, key, resp);
390+
const int err = _impl->batch_remove(decree, key, resp);
348391

349392
if (_server->is_primary()) {
350393
_cu_calculator->add_remove_cu(resp.error, key);
@@ -357,7 +400,7 @@ int pegasus_write_service::batch_commit(int64_t decree)
357400
{
358401
CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after batch_prepare");
359402

360-
int err = _impl->batch_commit(decree);
403+
const int err = _impl->batch_commit(decree);
361404
batch_finish();
362405
return err;
363406
}
@@ -389,10 +432,10 @@ void pegasus_write_service::batch_finish()
389432
UPDATE_WRITE_BATCH_METRICS(put);
390433
UPDATE_WRITE_BATCH_METRICS(remove);
391434

392-
// Since the duration of translation is unknown for both possible situations where these
393-
// put requests are actually translated from atomic requests (see comments in batch_put()),
394-
// there's no need to add `_make_incr_idempotent_duration_ns` to the total latency.
435+
// These put requests are translated from atomic requests. See comments in batch_put()
436+
// for the two possible situations where we are now.
395437
UPDATE_WRITE_BATCH_METRICS(incr);
438+
UPDATE_WRITE_BATCH_METRICS(check_and_set);
396439

397440
_batch_start_time = 0;
398441

src/server/pegasus_write_service.h

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,19 @@ class pegasus_write_service : dsn::replication::replica_base
151151
// Write a non-idempotent INCR record.
152152
int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp);
153153

154+
// Translate a CHECK_AND_SET request into an idempotent PUT request. Only called by
155+
// primary replicas.
156+
int make_idempotent(const dsn::apps::check_and_set_request &req,
157+
dsn::apps::check_and_set_response &err_resp,
158+
dsn::apps::update_request &update);
159+
160+
// Write an idempotent CHECK_AND_SET record (i.e. a PUT record) and reply to the client
161+
// with a CHECK_AND_SET response. Only called by primary replicas.
162+
int put(const db_write_context &ctx,
163+
const dsn::apps::update_request &update,
164+
const dsn::apps::check_and_set_request &req,
165+
dsn::apps::check_and_set_response &resp);
166+
154167
// Write CHECK_AND_SET record.
155168
int check_and_set(int64_t decree,
156169
const dsn::apps::check_and_set_request &update,
@@ -217,26 +230,6 @@ class pegasus_write_service : dsn::replication::replica_base
217230

218231
uint64_t _batch_start_time;
219232

220-
// Only used for primary replica to calculate the duration that an incr request from
221-
// the client is translated into an idempotent put request before appended to plog,
222-
// including reading the current value from RocksDB and incrementing it by a given
223-
// amount.
224-
//
225-
// This variable is defined as per-replica rather than per-request, for the reason
226-
// that the current design for implementing idempotence is to make sure there is only
227-
// one atomic request being processed in the write pipeline for each replica. This
228-
// pipeline consists of the following stages:
229-
// (1) read the current value from RocksDB and build the idempotent request based on
230-
// it;
231-
// (2) append the corresponding mutation to plog;
232-
// (3) broadcast the prepare requests;
233-
// (4) apply the result for atomic operation back to RocksDB ultimately.
234-
// For a request, this variable will be set in stage (1) and read in stage (4); since
235-
// there is only one request in the pipeline, this variable is guaranteed not to be
236-
// set for another request before stage (4) is finished. Therefore, it is safe to
237-
// define this variable as per-replica.
238-
uint64_t _make_incr_idempotent_duration_ns;
239-
240233
capacity_unit_calculator *_cu_calculator;
241234

242235
METRIC_VAR_DECLARE_counter(put_requests);
@@ -247,6 +240,9 @@ class pegasus_write_service : dsn::replication::replica_base
247240
METRIC_VAR_DECLARE_counter(check_and_set_requests);
248241
METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
249242

243+
METRIC_VAR_DECLARE_percentile_int64(make_incr_idempotent_latency_ns);
244+
METRIC_VAR_DECLARE_percentile_int64(make_check_and_set_idempotent_latency_ns);
245+
250246
METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
251247
METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
252248
METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
@@ -269,6 +265,10 @@ class pegasus_write_service : dsn::replication::replica_base
269265
// in batch applied into RocksDB for metrics.
270266
uint32_t _incr_batch_size;
271267

268+
// Measure the size of check_and_set requests (with each translated into an idempotent put
269+
// request) in batch applied into RocksDB for metrics.
270+
uint32_t _check_and_set_batch_size;
271+
272272
// TODO(wutao1): add metrics for failed rpc.
273273
};
274274

0 commit comments

Comments
 (0)