Skip to content

Commit a376dc7

Browse files
Merge pull request #35 from julianleonard123/materialize-client
Initial commit of Materialize Client example.
2 parents cb9bf97 + 2c0f942 commit a376dc7

File tree

5 files changed

+321
-0
lines changed

5 files changed

+321
-0
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cursor.txt
2+
zed_token.txt
3+
/target/**
4+
.idea/
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>com.authzed</groupId>
8+
<artifactId>materialize-client-example</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>21</maven.compiler.source>
13+
<maven.compiler.target>21</maven.compiler.target>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>com.authzed.api</groupId>
20+
<artifactId>authzed</artifactId>
21+
<version>1.3.1</version>
22+
</dependency>
23+
<dependency>
24+
<groupId>io.grpc</groupId>
25+
<artifactId>grpc-stub</artifactId>
26+
<version>1.72.0</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>io.grpc</groupId>
30+
<artifactId>grpc-api</artifactId>
31+
<version>1.72.0</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>io.grpc</groupId>
35+
<artifactId>grpc-netty-shaded</artifactId>
36+
<version>1.72.0</version>
37+
</dependency>
38+
</dependencies>
39+
</project>
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.authzed;
2+
3+
import com.authzed.api.materialize.v0.Cursor;
4+
import com.authzed.api.materialize.v0.LookupPermissionSetsResponse;
5+
import com.authzed.api.materialize.v0.PermissionSetChange;
6+
import io.grpc.StatusRuntimeException;
7+
import io.grpc.stub.StreamObserver;
8+
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
import static com.authzed.MaterializeClient.printCounts;
14+
import static com.authzed.MaterializeClient.writeCursorToFile;
15+
16+
17+
public class LookupPermissionSetsStreamObserver implements StreamObserver<LookupPermissionSetsResponse> {
18+
private final Cursor[] cursor = {null};
19+
private final CountDownLatch[] pageLatch;
20+
private final int pageLimit;
21+
private final AtomicBoolean lastPage;
22+
private final AtomicInteger recordCount = new AtomicInteger(0);
23+
private final AtomicInteger overallRecordCount = new AtomicInteger(0);
24+
private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0);
25+
private final AtomicInteger setToSetRecordCount = new AtomicInteger(0);
26+
27+
public LookupPermissionSetsStreamObserver(CountDownLatch[] pageLatch, AtomicBoolean lastPage, int pageLimit) {
28+
this.pageLatch = pageLatch;
29+
this.lastPage = lastPage;
30+
this.pageLimit = pageLimit;
31+
}
32+
33+
@Override
34+
public void onNext(LookupPermissionSetsResponse response) {
35+
cursor[0] = response.getCursor();
36+
writeCursorToFile(cursor[0]);
37+
38+
overallRecordCount.incrementAndGet();
39+
recordCount.incrementAndGet();
40+
41+
PermissionSetChange permissionSetChange = response.getChange();
42+
if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED) {
43+
if (permissionSetChange.hasChildMember()) {
44+
memberToSetRecordCount.incrementAndGet();
45+
} else {
46+
setToSetRecordCount.incrementAndGet();
47+
}
48+
}
49+
}
50+
51+
@Override
52+
public void onError(Throwable throwable) {
53+
System.out.printf("Lookup permission sets api error %s%n",
54+
throwable instanceof StatusRuntimeException ?
55+
((StatusRuntimeException) throwable).getStatus() : throwable.getMessage());
56+
if (cursor[0] != null) {
57+
System.out.println("Current cursor: " + cursor[0]);
58+
}
59+
}
60+
61+
@Override
62+
public void onCompleted() {
63+
if (recordCount.get() == this.pageLimit) {
64+
printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount);
65+
recordCount.set(0);
66+
} else {
67+
System.out.printf("Less than full page received, all pages complete. Record count: %s\n", recordCount.get());
68+
printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount);
69+
lastPage.set(true);
70+
}
71+
pageLatch[0].countDown();
72+
}
73+
74+
75+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package com.authzed;
2+
3+
import com.authzed.api.materialize.v0.Cursor;
4+
import com.authzed.api.materialize.v0.LookupPermissionSetsRequest;
5+
import com.authzed.api.materialize.v0.WatchPermissionSetsRequest;
6+
import com.authzed.api.materialize.v0.WatchPermissionSetsServiceGrpc;
7+
import com.authzed.api.v1.ZedToken;
8+
import com.authzed.grpcutil.BearerToken;
9+
import com.google.protobuf.TextFormat;
10+
import io.grpc.ManagedChannel;
11+
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
12+
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
13+
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
14+
15+
import javax.net.ssl.SSLException;
16+
import java.io.FileWriter;
17+
import java.io.IOException;
18+
import java.nio.file.Files;
19+
import java.nio.file.Paths;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
public class MaterializeClient {
25+
26+
private static final String BEARER_TOKEN = "some token";
27+
private static final String SERVER_HOST = "localhost";
28+
private static final int SERVER_PORT = 50054;
29+
private static final String CURSOR_FILE = "cursor.txt";
30+
private static final String ZED_TOKEN_FILE = "zed_token.txt";
31+
private static final int PAGE_LIMIT = 10000;
32+
33+
public static void main(String[] args) throws InterruptedException, SSLException {
34+
35+
WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub = buildWatchPermissionSetsServiceStub();
36+
37+
System.out.println("Starting LookupPermissionSets...");
38+
lookupPermissionSets(stub);
39+
40+
System.out.println("Starting WatchPermissionSets...");
41+
watchPermissionSets(stub);
42+
}
43+
44+
private static void lookupPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) throws InterruptedException {
45+
final CountDownLatch[] pageLatch = {new CountDownLatch(1)};
46+
AtomicBoolean lastPage = new AtomicBoolean(false);
47+
48+
LookupPermissionSetsStreamObserver responseStreamObserver = new LookupPermissionSetsStreamObserver(
49+
pageLatch, lastPage, PAGE_LIMIT);
50+
51+
LookupPermissionSetsRequest request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT)
52+
.build();
53+
54+
// Initial call to LPS, without any cursor.
55+
stub.lookupPermissionSets(request, responseStreamObserver);
56+
pageLatch[0].await();
57+
58+
// Page through results, using cursor to iterate through pages.
59+
while (!lastPage.get()) {
60+
pageLatch[0] = new CountDownLatch(1);
61+
request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT).setOptionalStartingAfterCursor(readCursorFromFile())
62+
.build();
63+
stub.lookupPermissionSets(request, responseStreamObserver);
64+
pageLatch[0].await();
65+
}
66+
}
67+
68+
private static void watchPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) {
69+
70+
WatchPermissionSetsStreamObserver responseStreamObserver = new WatchPermissionSetsStreamObserver();
71+
72+
Cursor cursor = readCursorFromFile();
73+
74+
WatchPermissionSetsRequest.Builder requestBuilder = WatchPermissionSetsRequest.newBuilder();
75+
if (cursor != null) {
76+
requestBuilder.setOptionalStartingAfter(cursor.getToken());
77+
}
78+
WatchPermissionSetsRequest request = requestBuilder.build();
79+
80+
stub.watchPermissionSets(request, responseStreamObserver);
81+
82+
// Keep the main thread alive to maintain the stream
83+
try {
84+
Thread.sleep(Long.MAX_VALUE);
85+
} catch (InterruptedException e) {
86+
System.out.println("Stream interrupted");
87+
}
88+
89+
}
90+
91+
private static WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub buildWatchPermissionSetsServiceStub() throws SSLException {
92+
BearerToken bearerToken = new BearerToken(BEARER_TOKEN);
93+
ManagedChannel channel = NettyChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT)
94+
.sslContext(GrpcSslContexts.forClient()
95+
.trustManager(InsecureTrustManagerFactory.INSTANCE)
96+
.build())
97+
.build();
98+
99+
return WatchPermissionSetsServiceGrpc.newStub(channel)
100+
.withCallCredentials(bearerToken);
101+
}
102+
103+
private static Cursor readCursorFromFile() {
104+
try {
105+
String content = Files.readString(Paths.get(CURSOR_FILE));
106+
Cursor.Builder cursorBuilder = Cursor.newBuilder();
107+
TextFormat.merge(content, cursorBuilder);
108+
return cursorBuilder.build();
109+
} catch (Exception e) {
110+
System.out.println("Could not read or parse cursor.txt file: " + e.getMessage());
111+
return null;
112+
}
113+
}
114+
115+
static void writeCursorToFile(Cursor cursor) {
116+
try (FileWriter writer = new FileWriter(CURSOR_FILE, false)) {
117+
writer.write(cursor.toString());
118+
} catch (IOException e) {
119+
System.err.println("Error writing cursor to file: " + e.getMessage());
120+
}
121+
}
122+
123+
static void writeZedTokenToFile(ZedToken zedToken) {
124+
try (FileWriter writer = new FileWriter(ZED_TOKEN_FILE, false)) {
125+
writer.write(zedToken.toString());
126+
} catch (IOException e) {
127+
System.err.println("Error writing zed token to file: " + e.getMessage());
128+
}
129+
}
130+
131+
private static ZedToken readZedTokenFromFile() {
132+
try {
133+
String content = Files.readString(Paths.get(CURSOR_FILE));
134+
ZedToken.Builder zedTokenBuilder = ZedToken.newBuilder();
135+
TextFormat.merge(content, zedTokenBuilder);
136+
return zedTokenBuilder.build();
137+
} catch (Exception e) {
138+
System.out.println("Could not read or parse zed_token.txt file: " + e.getMessage());
139+
return null;
140+
}
141+
}
142+
143+
static void printCounts(AtomicInteger memberToSetRecordCount, AtomicInteger overallRecordCount, AtomicInteger recordCount, AtomicInteger setToSetRecordCount) {
144+
System.out.printf("RecordCount: %d. OverallRecordCount: %d. MemberToSetCount: %d. SetToSetCount: %d.\n",
145+
recordCount.get(),
146+
overallRecordCount.get(),
147+
memberToSetRecordCount.get(),
148+
setToSetRecordCount.get()
149+
);
150+
}
151+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.authzed;
2+
3+
import com.authzed.api.materialize.v0.WatchPermissionSetsResponse;
4+
import com.authzed.api.materialize.v0.PermissionSetChange;
5+
import com.authzed.api.v1.ZedToken;
6+
import io.grpc.StatusRuntimeException;
7+
import io.grpc.stub.StreamObserver;
8+
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
11+
import static com.authzed.MaterializeClient.printCounts;
12+
import static com.authzed.MaterializeClient.writeZedTokenToFile;
13+
14+
15+
public class WatchPermissionSetsStreamObserver implements StreamObserver<WatchPermissionSetsResponse> {
16+
private final AtomicInteger recordCount = new AtomicInteger(0);
17+
private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0);
18+
private final AtomicInteger setToSetRecordCount = new AtomicInteger(0);
19+
20+
@Override
21+
public void onNext(WatchPermissionSetsResponse response) {
22+
ZedToken zedToken = response.getCompletedRevision();
23+
writeZedTokenToFile(zedToken);
24+
25+
recordCount.incrementAndGet();
26+
27+
PermissionSetChange permissionSetChange = response.getChange();
28+
if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED ||
29+
permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_REMOVED) {
30+
if (permissionSetChange.hasChildMember()) {
31+
memberToSetRecordCount.incrementAndGet();
32+
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
33+
} else {
34+
setToSetRecordCount.incrementAndGet();
35+
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
36+
}
37+
}
38+
}
39+
40+
@Override
41+
public void onError(Throwable throwable) {
42+
System.out.printf("Watch permission sets api error %s%n",
43+
throwable instanceof StatusRuntimeException ?
44+
((StatusRuntimeException) throwable).getStatus() : throwable.getMessage());
45+
}
46+
47+
@Override
48+
public void onCompleted() {
49+
System.out.println("Stream completed");
50+
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
51+
}
52+
}

0 commit comments

Comments
 (0)