Skip to content

Commit f5d1454

Browse files
committed
add features for cli
1 parent 2c09395 commit f5d1454

File tree

4 files changed

+112
-43
lines changed

4 files changed

+112
-43
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.datastax.astra.sdk.streaming;
2+
3+
import java.util.function.Supplier;
4+
5+
import org.apache.pulsar.client.admin.PulsarAdmin;
6+
import org.apache.pulsar.client.api.AuthenticationFactory;
7+
import org.apache.pulsar.client.api.PulsarClientException;
8+
9+
import com.datastax.astra.sdk.streaming.domain.Tenant;
10+
11+
/**
12+
* Delegate PulsarClient to a sub class.
13+
*
14+
* @author Cedrick LUNVEN (@clunven)
15+
*/
16+
public class PulsarAdminProvider implements Supplier<PulsarAdmin>{
17+
18+
/* Use as singleton. */
19+
private PulsarAdmin pulsarAdmin;
20+
21+
/**
22+
* Default constructor.
23+
*
24+
* @param tenant
25+
* parent tenant
26+
*/
27+
public PulsarAdminProvider(Tenant tenant) {
28+
try {
29+
pulsarAdmin = PulsarAdmin.builder()
30+
.allowTlsInsecureConnection(false)
31+
.enableTlsHostnameVerification(true)
32+
.useKeyStoreTls(false)
33+
.tlsTrustStoreType("JKS")
34+
.tlsTrustStorePath("")
35+
.tlsTrustStorePassword("")
36+
.serviceHttpUrl(tenant.getWebServiceUrl())
37+
.authentication(AuthenticationFactory.token(tenant.getPulsarToken()))
38+
.build();
39+
} catch (PulsarClientException e) {
40+
throw new IllegalArgumentException("Cannot use Pulsar admin", e);
41+
}
42+
}
43+
44+
/** {@inheritDoc} */
45+
@Override
46+
public PulsarAdmin get() {
47+
return pulsarAdmin;
48+
}
49+
50+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.datastax.astra.sdk.streaming;
2+
3+
import java.util.function.Supplier;
4+
5+
import org.apache.pulsar.client.api.AuthenticationFactory;
6+
import org.apache.pulsar.client.api.PulsarClient;
7+
import org.apache.pulsar.client.api.PulsarClientException;
8+
9+
import com.datastax.astra.sdk.streaming.domain.Tenant;
10+
11+
/**
12+
* Delegate PulsarClient to a sub class.
13+
*
14+
* @author Cedrick LUNVEN (@clunven)
15+
*/
16+
public class PulsarClientProvider implements Supplier<PulsarClient>{
17+
18+
/* Use as singleton. */
19+
private PulsarClient pulsarClient;
20+
21+
/**
22+
* Default constructor.
23+
*
24+
* @param tenant
25+
* parent tenant
26+
*/
27+
public PulsarClientProvider(Tenant tenant) {
28+
try {
29+
pulsarClient = PulsarClient.builder()
30+
.serviceUrl(tenant.getBrokerServiceUrl())
31+
.authentication(AuthenticationFactory.token(tenant.getPulsarToken()))
32+
.build();
33+
} catch (PulsarClientException e) {
34+
throw new IllegalArgumentException("Cannot connect to pulsar", e);
35+
}
36+
}
37+
38+
/** {@inheritDoc} */
39+
@Override
40+
public PulsarClient get() {
41+
return pulsarClient;
42+
}
43+
44+
}

astra-sdk/src/main/java/com/datastax/astra/sdk/streaming/TenantClient.java

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88
import java.util.Optional;
99
import java.util.stream.Stream;
1010

11-
import org.apache.pulsar.client.admin.PulsarAdmin;
12-
import org.apache.pulsar.client.api.AuthenticationFactory;
13-
import org.apache.pulsar.client.api.PulsarClient;
14-
import org.apache.pulsar.client.api.PulsarClientException;
15-
1611
import com.datastax.astra.sdk.streaming.domain.CreateTenant;
1712
import com.datastax.astra.sdk.streaming.domain.Tenant;
1813
import com.datastax.astra.sdk.streaming.domain.TenantLimit;
@@ -32,11 +27,11 @@ public class TenantClient {
3227
/** Streaming client. */
3328
private final StreamingClient streamClient;
3429

35-
/** we woudl like to use client and admin as singletong for a tenant. */
36-
private PulsarClient pulsarClient;
37-
38-
/** we woudl like to use client and admin as singletong for a tenant. */
39-
private PulsarAdmin pulsarAdmin;
30+
/** Pulsar Client wrapper. */
31+
private PulsarClientProvider pulsarClientProvider;
32+
33+
/** Pulsar Admin wrapper. */
34+
private PulsarAdminProvider pulsarAdminProvider;
4035

4136
/** Syntax sugar. */
4237
private HttpApisClient http = HttpApisClient.getInstance();
@@ -122,31 +117,24 @@ public Stream<TenantLimit> limits() {
122117
}
123118

124119
// ---------------------------------
125-
// ---- PulsarClient ----
120+
// ---- PulsarClient ----
126121
// ---------------------------------
127122

128123
/**
129-
* Create a client.
124+
* Accessing pulsarClient.
130125
*
131126
* @return
132-
* pulsar client.
127+
* pulsar client provider
133128
*/
134-
public PulsarClient pulsarClient() {
135-
if (pulsarClient == null) {
129+
public PulsarClientProvider pulsarClient() {
130+
if (pulsarClientProvider ==null) {
136131
Optional<Tenant> tenant = find();
137132
if (!tenant.isPresent()) {
138133
throw new IllegalArgumentException("Tenant " + tenantId + " cannot be found");
139134
}
140-
try {
141-
pulsarClient = PulsarClient.builder()
142-
.serviceUrl(tenant.get().getBrokerServiceUrl())
143-
.authentication(AuthenticationFactory.token(tenant.get().getPulsarToken()))
144-
.build();
145-
} catch (PulsarClientException e) {
146-
throw new IllegalArgumentException("Cannot connect to pulsar", e);
147-
}
135+
pulsarClientProvider = new PulsarClientProvider(tenant.get());
148136
}
149-
return pulsarClient;
137+
return pulsarClientProvider;
150138
}
151139

152140
// ---------------------------------
@@ -159,28 +147,15 @@ public PulsarClient pulsarClient() {
159147
* @return
160148
* pulsar admin
161149
*/
162-
public PulsarAdmin pulsarAdmin() {
163-
if (pulsarAdmin == null) {
150+
public PulsarAdminProvider pulsarAdmin() {
151+
if (pulsarAdminProvider ==null) {
164152
Optional<Tenant> tenant = find();
165153
if (!tenant.isPresent()) {
166154
throw new IllegalArgumentException("Tenant " + tenantId + " cannot be found");
167155
}
168-
try {
169-
pulsarAdmin = PulsarAdmin.builder()
170-
.allowTlsInsecureConnection(false)
171-
.enableTlsHostnameVerification(true)
172-
.useKeyStoreTls(false)
173-
.tlsTrustStoreType("JKS")
174-
.tlsTrustStorePath("")
175-
.tlsTrustStorePassword("")
176-
.serviceHttpUrl(tenant.get().getWebServiceUrl())
177-
.authentication(AuthenticationFactory.token(tenant.get().getPulsarToken()))
178-
.build();
179-
} catch (PulsarClientException e) {
180-
throw new IllegalArgumentException("Cannot use Pulsar admin", e);
181-
}
156+
pulsarAdminProvider = new PulsarAdminProvider(tenant.get());
182157
}
183-
return pulsarAdmin;
158+
return pulsarAdminProvider;
184159
}
185160

186161
// ---------------------------------

astra-sdk/src/test/java/com/datastax/astra/sdk/devops/ApiDevopsStreamingAstraTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void should_create_tenant() throws InterruptedException {
9494
// When
9595
Assert.assertFalse(sc.tenant(tmpTenant).exist());
9696
LOGGER.info("Tenant " + tmpTenant + " does not exist");
97-
sc.createTenant( new CreateTenant(tmpTenant, "cedrick.lunven@datastax.com"));
97+
sc.createTenant( new CreateTenant(tmpTenant, "astra-cli@datastax.com"));
9898
Thread.sleep(1000);
9999
Assert.assertTrue(sc.tenant(tmpTenant).exist());
100100
LOGGER.info("Tenant " + tmpTenant + " now exist");
@@ -109,7 +109,7 @@ public void should_work_withTenant() throws Exception {
109109

110110
try(PulsarAdmin admin = astraClient.apiDevopsStreaming()
111111
.tenant(tmpTenant)
112-
.pulsarAdmin()) {
112+
.pulsarAdmin().get()) {
113113
Assert.assertTrue(admin
114114
.namespaces()
115115
.getNamespaces(tmpTenant).size() > 0);

0 commit comments

Comments
 (0)