Skip to content

Commit 4c208b4

Browse files
committed
DeleteJobRequest and GetJobRequest API's
1 parent 591e58d commit 4c208b4

File tree

4 files changed

+102
-11
lines changed

4 files changed

+102
-11
lines changed

sdk/src/main/java/io/dapr/client/DaprClient.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -666,14 +666,6 @@ Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, Li
666666
*/
667667
Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request);
668668

669-
/**
670-
* ScheduleJobAlpha1 creates and schedules a job.
671-
*
672-
* @param <T> The type of the data for the job.
673-
* @param job job to be scheduled
674-
* @return a Mono plan of type Void.
675-
*/
676-
<T> Mono<Void> scheduleJobAlpha1(Job<T> job);
677669

678670
/**
679671
* Returns a newly created gRPC stub with proper interceptors and channel for gRPC proxy invocation.

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.dapr.client;
1515

16+
import com.fasterxml.jackson.databind.ObjectMapper;
1617
import com.google.common.base.Strings;
1718
import com.google.protobuf.Any;
1819
import com.google.protobuf.ByteString;
@@ -59,6 +60,7 @@
5960
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
6061
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
6162
import io.dapr.client.resiliency.ResiliencyOptions;
63+
import io.dapr.config.Properties;
6264
import io.dapr.exceptions.DaprException;
6365
import io.dapr.internal.exceptions.DaprHttpException;
6466
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
@@ -92,6 +94,7 @@
9294
import reactor.util.retry.Retry;
9395

9496
import java.io.IOException;
97+
import java.nio.charset.Charset;
9598
import java.time.Duration;
9699
import java.util.ArrayList;
97100
import java.util.Arrays;
@@ -140,6 +143,10 @@ public class DaprClientImpl extends AbstractDaprClient {
140143
private final DaprHttp httpClient;
141144

142145
private final DaprClientGrpcInterceptors grpcInterceptors;
146+
147+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
148+
149+
private static final Charset CHARSET = Properties.STRING_CHARSET.get();
143150

144151
/**
145152
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
@@ -1313,10 +1320,15 @@ public <T> Mono<Void> scheduleJobAlpha1(Job<T> job) {
13131320

13141321
DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder()
13151322
.setName(name);
1316-
if (data instanceof Message) {
1317-
jobBuilder.setData(Any.pack((Message)job.getData()));
1323+
if (data instanceof String) {
1324+
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom((String) data, CHARSET)));
1325+
} else if (data instanceof byte[]) {
1326+
String base64 = OBJECT_MAPPER.writeValueAsString(data);
1327+
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(base64, CHARSET)));
13181328
} else {
1319-
jobBuilder.setData(Any.newBuilder().setValue(ByteString.copyFrom(this.objectSerializer.serialize(data))));
1329+
return Mono.error(() -> {
1330+
throw new IllegalArgumentException("Job data value must be String or byte[]");
1331+
});
13201332
}
13211333
if (job.getSchedule() != null && !job.getSchedule().trim().isEmpty()) {
13221334
jobBuilder.setSchedule(job.getSchedule());
@@ -1342,6 +1354,46 @@ public <T> Mono<Void> scheduleJobAlpha1(Job<T> job) {
13421354
return DaprException.wrapMono(ex);
13431355
}
13441356
}
1357+
1358+
@SuppressWarnings("unchecked")
1359+
@Override
1360+
public <T> Mono<Job<T>> getJobAlpha1(String name, Class<T> clazz) {
1361+
try {
1362+
if (name == null || name.trim().isEmpty()) {
1363+
throw new IllegalArgumentException("Job name cannot be null or empty");
1364+
}
1365+
return this.<DaprProtos.GetJobResponse>createMono(
1366+
it -> intercept(null, asyncStub).getJobAlpha1(DaprProtos.GetJobRequest.newBuilder().setName(name).build(), it))
1367+
.map(it -> {
1368+
DaprProtos.Job _job = it.getJob();
1369+
T data = null;
1370+
if (clazz.isInstance(String.class)) {
1371+
data = (T)_job.getData().toByteString().toString(CHARSET);
1372+
} else if (clazz.isInstance(byte[].class)) {
1373+
data = (T) _job.getData().toByteArray();
1374+
} else {
1375+
throw new IllegalArgumentException("Job data type must be String or byte[]");
1376+
}
1377+
return new Job<>(_job.getName(), _job.getSchedule(), _job.getRepeats(), _job.getDueTime(), _job.getTtl(), data);
1378+
});
1379+
} catch (Exception ex) {
1380+
return DaprException.wrapMono(ex);
1381+
}
1382+
}
1383+
1384+
@Override
1385+
public Mono<Void> deleteJobAlpha1(String name) {
1386+
try {
1387+
if (name == null || name.trim().isEmpty()) {
1388+
throw new IllegalArgumentException("Job name cannot be null or empty");
1389+
}
1390+
return this.<DaprProtos.DeleteJobResponse>createMono(
1391+
it -> intercept(null, asyncStub).deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder().setName(name).build(), it))
1392+
.then();
1393+
} catch (Exception ex) {
1394+
return DaprException.wrapMono(ex);
1395+
}
1396+
}
13451397

13461398
/**
13471399
* Build a new Configuration Item from provided parameter.

sdk/src/main/java/io/dapr/client/DaprPreviewClient.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.dapr.client.domain.BulkPublishRequest;
1818
import io.dapr.client.domain.BulkPublishResponse;
1919
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
20+
import io.dapr.client.domain.Job;
2021
import io.dapr.client.domain.LockRequest;
2122
import io.dapr.client.domain.QueryStateRequest;
2223
import io.dapr.client.domain.QueryStateResponse;
@@ -268,4 +269,30 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
268269
*/
269270
<T> Subscription subscribeToEvents(
270271
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);
272+
273+
/**
274+
* ScheduleJobAlpha1 creates and schedules a job.
275+
*
276+
* @param <T> The type of the data for the job.
277+
* @param job job to be scheduled
278+
* @return a Mono plan of type Void.
279+
*/
280+
<T> Mono<Void> scheduleJobAlpha1(Job<T> job);
281+
282+
/**
283+
* GetJobAlpha1 retrieve Job by name.
284+
*
285+
* @param <T> The type of the data for the job.
286+
* @param name name of the job
287+
* @return a Mono of Job
288+
*/
289+
<T> Mono<Job<T>> getJobAlpha1(String name, Class<T> clazz);
290+
291+
/**
292+
* Delete a Job.
293+
*
294+
* @param name name of the job
295+
* @return
296+
*/
297+
Mono<Void> deleteJobAlpha1(String name);
271298
}

sdk/src/main/java/io/dapr/client/domain/Job.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,26 @@ public Job(String name, T data) {
3232
this.data = data;
3333
}
3434

35+
/**
36+
* Constructor for Job
37+
*
38+
* @param name name of the job to create
39+
* @param schedule schedule for the job
40+
* @param repeats jobs with fixed repeat counts (accounting for Actor Reminders).
41+
* @param dueTime sets time at which or time interval before the callback is invoked for the first time.
42+
* @param ttl Time To Live to allow for auto deletes (accounting for Actor Reminders).
43+
* @param data Job data
44+
*/
45+
public Job(String name, String schedule, Integer repeats, String dueTime, String ttl, T data) {
46+
super();
47+
this.name = name;
48+
this.schedule = schedule;
49+
this.repeats = repeats;
50+
this.dueTime = dueTime;
51+
this.ttl = ttl;
52+
this.data = data;
53+
}
54+
3555
public String getSchedule() {
3656
return schedule;
3757
}

0 commit comments

Comments
 (0)