Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 10c27eb

Browse files
authored
feat: optimistic concurrency control loop for IAM (#1162)
Adds functions to DatabaseAdminClient and InstanceAdminClient that run a optimistic concurrency control (OCC) loop for IAM updates. That makes the customer code a bit easier to write and less error prone. This is analogous to the Commit() loops for the spanner::Client.
1 parent 4c7f5df commit 10c27eb

12 files changed

+558
-65
lines changed

google/cloud/spanner/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ add_library(
117117
database_admin_connection.h
118118
date.cc
119119
date.h
120+
iam_updater.h
120121
instance.cc
121122
instance.h
122123
instance_admin_client.cc

google/cloud/spanner/database_admin_client.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,54 @@ StatusOr<google::iam::v1::Policy> DatabaseAdminClient::SetIamPolicy(
6464
return conn_->SetIamPolicy({std::move(db), std::move(policy)});
6565
}
6666

67+
StatusOr<google::iam::v1::Policy> DatabaseAdminClient::SetIamPolicy(
68+
Database const& db, IamUpdater const& updater) {
69+
auto const rerun_maximum_duration = std::chrono::minutes(15);
70+
auto default_rerun_policy =
71+
LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone();
72+
73+
auto const backoff_initial_delay = std::chrono::milliseconds(1000);
74+
auto const backoff_maximum_delay = std::chrono::minutes(5);
75+
auto const backoff_scaling = 2.0;
76+
auto default_backoff_policy =
77+
ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay,
78+
backoff_scaling)
79+
.clone();
80+
81+
return SetIamPolicy(db, updater, std::move(default_rerun_policy),
82+
std::move(default_backoff_policy));
83+
}
84+
85+
StatusOr<google::iam::v1::Policy> DatabaseAdminClient::SetIamPolicy(
86+
Database const& db, IamUpdater const& updater,
87+
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
88+
std::unique_ptr<BackoffPolicy> backoff_policy) {
89+
using RerunnablePolicy = internal::SafeTransactionRerun;
90+
91+
Status last_status;
92+
do {
93+
auto current_policy = GetIamPolicy(db);
94+
if (!current_policy) {
95+
last_status = std::move(current_policy).status();
96+
} else {
97+
auto etag = current_policy->etag();
98+
auto desired = updater(*current_policy);
99+
if (!desired.has_value()) {
100+
return current_policy;
101+
}
102+
desired->set_etag(std::move(etag));
103+
auto result = SetIamPolicy(db, *std::move(desired));
104+
if (RerunnablePolicy::IsOk(result.status())) {
105+
return result;
106+
}
107+
last_status = std::move(result).status();
108+
}
109+
if (!rerun_policy->OnFailure(last_status)) break;
110+
std::this_thread::sleep_for(backoff_policy->OnCompletion());
111+
} while (!rerun_policy->IsExhausted());
112+
return last_status;
113+
}
114+
67115
StatusOr<google::iam::v1::TestIamPermissionsResponse>
68116
DatabaseAdminClient::TestIamPermissions(Database db,
69117
std::vector<std::string> permissions) {

google/cloud/spanner/database_admin_client.h

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/spanner/connection_options.h"
1919
#include "google/cloud/spanner/database.h"
2020
#include "google/cloud/spanner/database_admin_connection.h"
21+
#include "google/cloud/spanner/iam_updater.h"
2122
#include "google/cloud/spanner/instance.h"
2223
#include "google/cloud/future.h"
2324
#include "google/cloud/status_or.h"
@@ -231,9 +232,6 @@ class DatabaseAdminClient {
231232
* Therefore, the underlying RPCs are only retried if the field is set, and
232233
* the function returns the first RPC error in any other case.
233234
*
234-
* @par Example
235-
* @snippet samples.cc add-database-reader-on-database
236-
*
237235
* @see The [Cloud Spanner
238236
* documentation](https://cloud.google.com/spanner/docs/iam) for a
239237
* description of the roles and permissions supported by Cloud Spanner.
@@ -244,6 +242,50 @@ class DatabaseAdminClient {
244242
StatusOr<google::iam::v1::Policy> SetIamPolicy(
245243
Database db, google::iam::v1::Policy policy);
246244

245+
/**
246+
* Updates the IAM policy for an instance using an optimistic concurrency
247+
* control loop.
248+
*
249+
* This function repeatedly reads the current IAM policy in @p db, and then
250+
* calls the @p updater with the this policy. The @p updater returns an empty
251+
* optional if no changes are required, or it returns the new desired value
252+
* for the IAM policy. This function then updates the policy.
253+
*
254+
* Updating an IAM policy can fail with retryable errors or can be aborted
255+
* because there were simultaneous changes the to IAM policy. In these cases
256+
* this function reruns the loop until it succeeds.
257+
*
258+
* The function returns the final IAM policy, or an error if the rerun policy
259+
* for the underlying connection has expired.
260+
*
261+
* @par Idempotency
262+
* This function always sets the `etag` field on the policy, so the underlying
263+
* RPCs are retried automatically.
264+
*
265+
* @par Example
266+
* @snippet samples.cc add-database-reader-on-database
267+
*
268+
* @param db the identifier for the database where you want to change the IAM
269+
* policy.
270+
* @param updater a callback to modify the policy. Return an unset optional
271+
* to indicate that no changes to the policy are needed.
272+
*/
273+
StatusOr<google::iam::v1::Policy> SetIamPolicy(Database const& db,
274+
IamUpdater const& updater);
275+
276+
/**
277+
* @copydoc SetIamPolicy(Database const&,IamUpdater const&)
278+
*
279+
* @param rerun_policy controls for how long (or how many times) the updater
280+
* will be rerun after the IAM policy update aborts.
281+
* @param backoff_policy controls how long `SetIamPolicy` waits between
282+
* reruns.
283+
*/
284+
StatusOr<google::iam::v1::Policy> SetIamPolicy(
285+
Database const& db, IamUpdater const& updater,
286+
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
287+
std::unique_ptr<BackoffPolicy> backoff_policy);
288+
247289
/**
248290
* Get the subset of the permissions the caller has on the given database.
249291
*

google/cloud/spanner/database_admin_client_test.cc

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ namespace {
2525

2626
using ::google::cloud::spanner_mocks::MockDatabaseAdminConnection;
2727
using ::testing::_;
28+
using ::testing::AtLeast;
2829
using ::testing::ElementsAre;
30+
using ::testing::HasSubstr;
2931
namespace gcsa = ::google::spanner::admin::database::v1;
3032

3133
/// @test Verify DatabaseAdminClient uses CreateDatabase() correctly.
@@ -186,10 +188,130 @@ TEST(DatabaseAdminClientTest, SetIamPolicy) {
186188
return p.policy;
187189
});
188190
DatabaseAdminClient client(std::move(mock));
189-
auto response = client.SetIamPolicy(expected_db, {});
191+
auto response = client.SetIamPolicy(expected_db, google::iam::v1::Policy{});
190192
EXPECT_STATUS_OK(response);
191193
}
192194

195+
TEST(DatabaseAdminClientTest, SetIamPolicyOccGetFailure) {
196+
Database const db("test-project", "test-instance", "test-database");
197+
auto mock = std::make_shared<MockDatabaseAdminConnection>();
198+
EXPECT_CALL(*mock, GetIamPolicy(_))
199+
.WillOnce([&db](DatabaseAdminConnection::GetIamPolicyParams const& p) {
200+
EXPECT_EQ(db, p.database);
201+
return Status(StatusCode::kPermissionDenied, "uh-oh");
202+
});
203+
204+
DatabaseAdminClient client(mock);
205+
auto actual = client.SetIamPolicy(db, [](google::iam::v1::Policy const&) {
206+
return optional<google::iam::v1::Policy>{};
207+
});
208+
EXPECT_EQ(StatusCode::kPermissionDenied, actual.status().code());
209+
}
210+
211+
TEST(DatabaseAdminClientTest, SetIamPolicyOccNoUpdates) {
212+
Database const db("test-project", "test-instance", "test-database");
213+
auto mock = std::make_shared<MockDatabaseAdminConnection>();
214+
EXPECT_CALL(*mock, GetIamPolicy(_))
215+
.WillOnce([&db](DatabaseAdminConnection::GetIamPolicyParams const& p) {
216+
EXPECT_EQ(db, p.database);
217+
google::iam::v1::Policy r;
218+
r.set_etag("test-etag");
219+
return r;
220+
});
221+
EXPECT_CALL(*mock, SetIamPolicy(_)).Times(0);
222+
223+
DatabaseAdminClient client(mock);
224+
auto actual = client.SetIamPolicy(db, [](google::iam::v1::Policy const& p) {
225+
EXPECT_EQ("test-etag", p.etag());
226+
return optional<google::iam::v1::Policy>{};
227+
});
228+
ASSERT_STATUS_OK(actual);
229+
EXPECT_EQ("test-etag", actual->etag());
230+
}
231+
232+
std::unique_ptr<TransactionRerunPolicy> RerunPolicyForTesting() {
233+
return LimitedErrorCountTransactionRerunPolicy(/*maximum_failures=*/3)
234+
.clone();
235+
}
236+
237+
std::unique_ptr<BackoffPolicy> BackoffPolicyForTesting() {
238+
return ExponentialBackoffPolicy(
239+
/*initial_delay=*/std::chrono::microseconds(1),
240+
/*maximum_delay=*/std::chrono::microseconds(1), /*scaling=*/2.0)
241+
.clone();
242+
}
243+
244+
TEST(DatabaseAdminClientTest, SetIamPolicyOccRetryAborted) {
245+
Database const db("test-project", "test-instance", "test-database");
246+
auto mock = std::make_shared<MockDatabaseAdminConnection>();
247+
EXPECT_CALL(*mock, GetIamPolicy(_))
248+
.WillOnce([&db](DatabaseAdminConnection::GetIamPolicyParams const& p) {
249+
EXPECT_EQ(db, p.database);
250+
google::iam::v1::Policy r;
251+
r.set_etag("test-etag-1");
252+
return r;
253+
})
254+
.WillOnce([&db](DatabaseAdminConnection::GetIamPolicyParams const& p) {
255+
EXPECT_EQ(db, p.database);
256+
google::iam::v1::Policy r;
257+
r.set_etag("test-etag-2");
258+
return r;
259+
});
260+
EXPECT_CALL(*mock, SetIamPolicy(_))
261+
.WillOnce([&db](DatabaseAdminConnection::SetIamPolicyParams const& p) {
262+
EXPECT_EQ(db, p.database);
263+
EXPECT_EQ("test-etag-1", p.policy.etag());
264+
return Status(StatusCode::kAborted, "aborted");
265+
})
266+
.WillOnce([&db](DatabaseAdminConnection::SetIamPolicyParams const& p) {
267+
EXPECT_EQ(db, p.database);
268+
EXPECT_EQ("test-etag-2", p.policy.etag());
269+
google::iam::v1::Policy r;
270+
r.set_etag("test-etag-3");
271+
return r;
272+
});
273+
274+
DatabaseAdminClient client(mock);
275+
int counter = 0;
276+
auto actual = client.SetIamPolicy(
277+
db,
278+
[&counter](google::iam::v1::Policy p) {
279+
EXPECT_EQ("test-etag-" + std::to_string(++counter), p.etag());
280+
return p;
281+
},
282+
RerunPolicyForTesting(), BackoffPolicyForTesting());
283+
ASSERT_STATUS_OK(actual);
284+
EXPECT_EQ("test-etag-3", actual->etag());
285+
}
286+
287+
TEST(DatabaseAdminClientTest, SetIamPolicyOccRetryAbortedTooManyFailures) {
288+
Database const db("test-project", "test-instance", "test-database");
289+
auto mock = std::make_shared<MockDatabaseAdminConnection>();
290+
EXPECT_CALL(*mock, GetIamPolicy(_))
291+
.WillRepeatedly(
292+
[&db](DatabaseAdminConnection::GetIamPolicyParams const& p) {
293+
EXPECT_EQ(db, p.database);
294+
google::iam::v1::Policy r;
295+
r.set_etag("test-etag-1");
296+
return r;
297+
});
298+
EXPECT_CALL(*mock, SetIamPolicy(_))
299+
.Times(AtLeast(2))
300+
.WillRepeatedly(
301+
[&db](DatabaseAdminConnection::SetIamPolicyParams const& p) {
302+
EXPECT_EQ(db, p.database);
303+
EXPECT_EQ("test-etag-1", p.policy.etag());
304+
return Status(StatusCode::kAborted, "test-msg");
305+
});
306+
307+
DatabaseAdminClient client(mock);
308+
auto actual = client.SetIamPolicy(
309+
db, [](google::iam::v1::Policy p) { return p; }, RerunPolicyForTesting(),
310+
BackoffPolicyForTesting());
311+
EXPECT_EQ(StatusCode::kAborted, actual.status().code());
312+
EXPECT_THAT(actual.status().message(), HasSubstr("test-msg"));
313+
}
314+
193315
/// @test Verify DatabaseAdminClient uses TestIamPermissions() correctly.
194316
TEST(DatabaseAdminClientTest, TestIamPermissions) {
195317
auto mock = std::make_shared<MockDatabaseAdminConnection>();

google/cloud/spanner/iam_updater.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_IAM_UPDATER_H_
16+
#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_IAM_UPDATER_H_
17+
18+
#include "google/cloud/optional.h"
19+
#include <google/iam/v1/policy.pb.h>
20+
#include <functional>
21+
22+
namespace google {
23+
namespace cloud {
24+
namespace spanner {
25+
inline namespace SPANNER_CLIENT_NS {
26+
27+
using IamUpdater =
28+
std::function<optional<google::iam::v1::Policy>(google::iam::v1::Policy)>;
29+
30+
} // namespace SPANNER_CLIENT_NS
31+
} // namespace spanner
32+
} // namespace cloud
33+
} // namespace google
34+
35+
#endif // GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_IAM_UPDATER_H_

google/cloud/spanner/instance_admin_client.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,54 @@ StatusOr<google::iam::v1::Policy> InstanceAdminClient::SetIamPolicy(
6767
return conn_->SetIamPolicy({in.FullName(), std::move(policy)});
6868
}
6969

70+
StatusOr<google::iam::v1::Policy> InstanceAdminClient::SetIamPolicy(
71+
Instance const& in, IamUpdater const& updater) {
72+
auto const rerun_maximum_duration = std::chrono::minutes(15);
73+
auto default_rerun_policy =
74+
LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone();
75+
76+
auto const backoff_initial_delay = std::chrono::milliseconds(1000);
77+
auto const backoff_maximum_delay = std::chrono::minutes(5);
78+
auto const backoff_scaling = 2.0;
79+
auto default_backoff_policy =
80+
ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay,
81+
backoff_scaling)
82+
.clone();
83+
84+
return SetIamPolicy(in, updater, std::move(default_rerun_policy),
85+
std::move(default_backoff_policy));
86+
}
87+
88+
StatusOr<google::iam::v1::Policy> InstanceAdminClient::SetIamPolicy(
89+
Instance const& in, IamUpdater const& updater,
90+
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
91+
std::unique_ptr<BackoffPolicy> backoff_policy) {
92+
using RerunnablePolicy = internal::SafeTransactionRerun;
93+
94+
Status last_status;
95+
do {
96+
auto current_policy = GetIamPolicy(in);
97+
if (!current_policy) {
98+
last_status = std::move(current_policy).status();
99+
} else {
100+
auto etag = current_policy->etag();
101+
auto desired = updater(*current_policy);
102+
if (!desired.has_value()) {
103+
return current_policy;
104+
}
105+
desired->set_etag(std::move(etag));
106+
auto result = SetIamPolicy(in, *std::move(desired));
107+
if (RerunnablePolicy::IsOk(result.status())) {
108+
return result;
109+
}
110+
last_status = std::move(result).status();
111+
}
112+
if (!rerun_policy->OnFailure(last_status)) break;
113+
std::this_thread::sleep_for(backoff_policy->OnCompletion());
114+
} while (!rerun_policy->IsExhausted());
115+
return last_status;
116+
}
117+
70118
StatusOr<google::iam::v1::TestIamPermissionsResponse>
71119
InstanceAdminClient::TestIamPermissions(Instance const& in,
72120
std::vector<std::string> permissions) {

0 commit comments

Comments
 (0)