Skip to content

Commit eabe900

Browse files
feat: Implement internal CursorClient which will be used by kafka shim. (#252)
1 parent 4111f09 commit eabe900

File tree

5 files changed

+583
-0
lines changed

5 files changed

+583
-0
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.gax.core.BackgroundResource;
20+
import com.google.cloud.pubsublite.CloudRegion;
21+
import com.google.cloud.pubsublite.Offset;
22+
import com.google.cloud.pubsublite.Partition;
23+
import com.google.cloud.pubsublite.SubscriptionPath;
24+
import io.grpc.StatusException;
25+
import java.util.Map;
26+
27+
public interface CursorClient extends BackgroundResource {
28+
29+
static CursorClient create(CursorClientSettings settings) throws StatusException {
30+
return settings.instantiate();
31+
}
32+
33+
/** The Google Cloud region this client operates on. */
34+
CloudRegion region();
35+
36+
/**
37+
* List the cursors for a given subscription.
38+
*
39+
* @param path The subscription to list cursors for.
40+
* @return A future holding the map of Partition to Offset of the cursors.
41+
*/
42+
ApiFuture<Map<Partition, Offset>> listPartitionCursors(SubscriptionPath path);
43+
44+
/**
45+
* Commit a single cursor.
46+
*
47+
* @param path The subscription to commit a cursor for.
48+
* @param partition The partition to commit a cursor for.
49+
* @param offset The offset to commit.
50+
* @return A future for the operation's completion.
51+
*/
52+
ApiFuture<Void> commitCursor(SubscriptionPath path, Partition partition, Offset offset);
53+
54+
/**
55+
* Tear down this admin client.
56+
*
57+
* @throws StatusException on a failure to properly terminate.
58+
*/
59+
@Override
60+
void close() throws StatusException;
61+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal;
17+
18+
import com.google.api.core.ApiFuture;
19+
import com.google.api.gax.core.BackgroundResource;
20+
import com.google.api.gax.core.ExecutorAsBackgroundResource;
21+
import com.google.api.gax.retrying.RetrySettings;
22+
import com.google.api.gax.retrying.RetryingExecutor;
23+
import com.google.cloud.pubsublite.CloudRegion;
24+
import com.google.cloud.pubsublite.Offset;
25+
import com.google.cloud.pubsublite.Partition;
26+
import com.google.cloud.pubsublite.SubscriptionPath;
27+
import com.google.cloud.pubsublite.proto.CommitCursorRequest;
28+
import com.google.cloud.pubsublite.proto.CommitCursorResponse;
29+
import com.google.cloud.pubsublite.proto.Cursor;
30+
import com.google.cloud.pubsublite.proto.CursorServiceGrpc.CursorServiceBlockingStub;
31+
import com.google.cloud.pubsublite.proto.ListPartitionCursorsRequest;
32+
import com.google.cloud.pubsublite.proto.ListPartitionCursorsResponse;
33+
import com.google.cloud.pubsublite.proto.PartitionCursor;
34+
import com.google.common.collect.ImmutableMap;
35+
import io.grpc.StatusException;
36+
import java.util.Map;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.TimeUnit;
40+
41+
public class CursorClientImpl implements BackgroundResource, CursorClient {
42+
private final ExecutorAsBackgroundResource executorResource;
43+
private final CloudRegion region;
44+
private final CursorServiceBlockingStub stub;
45+
private final RetryingExecutor<Map<Partition, Offset>> listRetryingExecutor;
46+
private final RetryingExecutor<Void> voidRetryingExecutor;
47+
48+
public CursorClientImpl(
49+
CloudRegion region, CursorServiceBlockingStub stub, RetrySettings retrySettings) {
50+
this(
51+
region,
52+
stub,
53+
retrySettings,
54+
// TODO: Consider allowing tuning in the future.
55+
Executors.newScheduledThreadPool(6));
56+
}
57+
58+
private CursorClientImpl(
59+
CloudRegion region,
60+
CursorServiceBlockingStub stub,
61+
RetrySettings retrySettings,
62+
ScheduledExecutorService executor) {
63+
this.executorResource = new ExecutorAsBackgroundResource(executor);
64+
this.region = region;
65+
this.stub = stub;
66+
this.listRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
67+
this.voidRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
68+
}
69+
70+
@Override
71+
public CloudRegion region() {
72+
return region;
73+
}
74+
75+
// BackgroundResource implementation.
76+
@Override
77+
public void shutdown() {
78+
executorResource.shutdown();
79+
}
80+
81+
@Override
82+
public boolean isShutdown() {
83+
return executorResource.isShutdown();
84+
}
85+
86+
@Override
87+
public boolean isTerminated() {
88+
return executorResource.isTerminated();
89+
}
90+
91+
@Override
92+
public void shutdownNow() {
93+
executorResource.shutdownNow();
94+
}
95+
96+
@Override
97+
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
98+
return executorResource.awaitTermination(duration, unit);
99+
}
100+
101+
@Override
102+
public void close() throws StatusException {
103+
try {
104+
executorResource.close();
105+
} catch (Exception e) {
106+
throw ExtractStatus.toCanonical(e);
107+
}
108+
}
109+
110+
// CursorClient Implementation
111+
@Override
112+
public ApiFuture<Map<Partition, Offset>> listPartitionCursors(SubscriptionPath path) {
113+
return RetryingExecutorUtil.runWithRetries(
114+
() -> {
115+
ListPartitionCursorsResponse response =
116+
stub.listPartitionCursors(
117+
ListPartitionCursorsRequest.newBuilder().setParent(path.toString()).build());
118+
ImmutableMap.Builder<Partition, Offset> resultBuilder = ImmutableMap.builder();
119+
for (PartitionCursor partitionCursor : response.getPartitionCursorsList()) {
120+
resultBuilder.put(
121+
Partition.of(partitionCursor.getPartition()),
122+
Offset.of(partitionCursor.getCursor().getOffset()));
123+
}
124+
return resultBuilder.build();
125+
},
126+
listRetryingExecutor);
127+
}
128+
129+
@Override
130+
public ApiFuture<Void> commitCursor(SubscriptionPath path, Partition partition, Offset offset) {
131+
return RetryingExecutorUtil.runWithRetries(
132+
() -> {
133+
CommitCursorResponse unusedResponse =
134+
stub.commitCursor(
135+
CommitCursorRequest.newBuilder()
136+
.setSubscription(path.toString())
137+
.setPartition(partition.value())
138+
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
139+
.build());
140+
return null;
141+
},
142+
voidRetryingExecutor);
143+
}
144+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.internal;
17+
18+
import com.google.api.gax.retrying.RetrySettings;
19+
import com.google.auto.value.AutoValue;
20+
import com.google.cloud.pubsublite.CloudRegion;
21+
import com.google.cloud.pubsublite.Constants;
22+
import com.google.cloud.pubsublite.Stubs;
23+
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
24+
import com.google.cloud.pubsublite.proto.CursorServiceGrpc.CursorServiceBlockingStub;
25+
import io.grpc.StatusException;
26+
import java.util.Optional;
27+
28+
@AutoValue
29+
public abstract class CursorClientSettings {
30+
31+
// Required parameters.
32+
abstract CloudRegion region();
33+
34+
// Optional parameters.
35+
abstract RetrySettings retrySettings();
36+
37+
abstract Optional<CursorServiceBlockingStub> stub();
38+
39+
public static Builder newBuilder() {
40+
return new AutoValue_CursorClientSettings.Builder()
41+
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS);
42+
}
43+
44+
@AutoValue.Builder
45+
public abstract static class Builder {
46+
47+
// Required parameters.
48+
public abstract Builder setRegion(CloudRegion region);
49+
50+
public abstract Builder setRetrySettings(RetrySettings retrySettings);
51+
52+
// Optional parameters.
53+
public abstract Builder setStub(CursorServiceBlockingStub stub);
54+
55+
public abstract CursorClientSettings build();
56+
}
57+
58+
CursorClient instantiate() throws StatusException {
59+
CursorServiceBlockingStub stub;
60+
if (stub().isPresent()) {
61+
stub = stub().get();
62+
} else {
63+
stub = Stubs.defaultStub(region(), CursorServiceGrpc::newBlockingStub);
64+
}
65+
return new CursorClientImpl(region(), stub, retrySettings());
66+
}
67+
}

0 commit comments

Comments
 (0)