Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.Serializable;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.github.f4b6a3.uuid.UuidCreator;

Expand All @@ -13,6 +15,15 @@
* Represents the ID of a persistentTask, which is currently not running.
*/
public record TaskId<T extends Serializable>(String name) implements Serializable {

@SuppressWarnings("rawtypes")
private static final Map<String, TaskId> CACHE = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
public static <T extends Serializable> TaskId<T> of(String taskId) {
if (taskId == null || taskId.isBlank()) return null;
return CACHE.computeIfAbsent(taskId, s -> new TaskId<>(s));
}

public TriggerBuilder<T> newTrigger() {
return new TriggerBuilder<>(this);
Expand All @@ -36,11 +47,6 @@ public TriggerRequest<T> newUniqueTrigger(T state) {
.build();
}

public static TaskId<Serializable> of(String taskId) {
if (taskId == null || taskId.isBlank()) return null;
return new TaskId<>(taskId);
}

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public static class TriggerBuilder<T extends Serializable> {
private final TaskId<T> taskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public static TriggerKey of(@Nullable String id, TaskId<? extends Serializable>
return new TriggerKey(id, taskId.name());
}

public TaskId<Serializable> toTaskId() {
public <T extends Serializable> TaskId<T> toTaskId() {
if (taskName == null) return null;
return new TaskId<>(taskName);
return TaskId.of(taskName);
}

/**
* Builds a trigger for the given persistentTask name
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.sterl.spring.persistent_tasks.api.task;

import java.io.Serializable;

import org.springframework.lang.NonNull;
import org.sterl.spring.persistent_tasks.api.TaskId;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;

/**
* Default implementation to use jackson instead java serialization for a task state.
*
* @param <T> the type of the state for the deserialization
*/
@RequiredArgsConstructor
public class JacksonStateSerializer<T extends Serializable> implements StateSerializer<T> {

private final ObjectMapper mapper;
private final Class<T> clazz;

@Override
public byte[] serialize(@NonNull TaskId<T> id, @NonNull T obj)
throws SerializationFailedException{

try {
return mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
throw new SerializationFailedException(obj, e);
}
}

@Override
public T deserialize(@NonNull TaskId<T> id, @NonNull byte[] bytes)
throws DeSerializationFailedException {
try {
return mapper.readValue(bytes, clazz);
} catch (Exception e) {
throw new DeSerializationFailedException(bytes, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.sterl.spring.persistent_tasks.api.task;

import java.io.Serializable;

/**
* /**
* Any task may provide an own serialization class.
*
* @since 2.3.0
* @param <T> the type of the task state
*/
public interface SerializationProvider<T extends Serializable> {
StateSerializer<T> getSerializer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.sterl.spring.persistent_tasks.api.task;

import java.io.Serializable;

import org.springframework.lang.NonNull;
import org.sterl.spring.persistent_tasks.api.TaskId;
import org.sterl.spring.persistent_tasks.exception.SpringPersistentTaskException;

/**
* Interface for a state serialization of task.
*
* @since 2.3.0
* @param <T> the state type
*/
public interface StateSerializer<T extends Serializable> {

byte[] serialize(@NonNull TaskId<T> id, @NonNull T obj) throws SerializationFailedException;

T deserialize(@NonNull TaskId<T> id, @NonNull byte[] bytes) throws DeSerializationFailedException;

class DeSerializationFailedException extends SpringPersistentTaskException {
private static final long serialVersionUID = 1L;

public DeSerializationFailedException(byte[] bytes, Exception e) {
super("Failed to deserialize state of length " + bytes.length, bytes, e);
}
}

class SerializationFailedException extends SpringPersistentTaskException {
private static final long serialVersionUID = 1L;

public SerializationFailedException(Serializable obj, Exception e) {
super("Failed to serialize state " + obj.getClass(), obj, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
package org.sterl.spring.persistent_tasks.config;

import java.io.Serializable;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.sterl.spring.persistent_tasks.EnableSpringPersistentTasks;
import org.sterl.spring.persistent_tasks.api.task.StateSerializer;
import org.sterl.spring.persistent_tasks.trigger.component.DefaultStateSerializer;

import lombok.extern.slf4j.Slf4j;

@EnableScheduling
@EnableAsync
@AutoConfigurationPackage(basePackageClasses = EnableSpringPersistentTasks.class)
@ComponentScan(basePackageClasses = EnableSpringPersistentTasks.class)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Configuration
@Slf4j
public class SpringPersistentTasksConfig {

@Bean
@ConditionalOnMissingBean(DefaultStateSerializer.class)
StateSerializer<Serializable> defaultStateSerializer() {
log.info("Using java serialization for task states.",
DefaultStateSerializer.class.getSimpleName());
return new DefaultStateSerializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.sterl.spring.persistent_tasks.history.api;

import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.sterl.spring.persistent_tasks.api.Trigger;
import org.sterl.spring.persistent_tasks.history.model.CompletedTriggerEntity;
import org.sterl.spring.persistent_tasks.shared.ExtendetConvert;
import org.sterl.spring.persistent_tasks.shared.converter.ToTrigger;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class FromCompletedTriggerEntityConverter implements ExtendetConvert<CompletedTriggerEntity, Trigger> {

private final ToTrigger toTrigger;

@NonNull
@Override
public Trigger convert(@NonNull CompletedTriggerEntity source) {
var result = toTrigger.convert(source);
result.setId(source.getId());
result.setInstanceId(source.getId());
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@

import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.sterl.spring.persistent_tasks.api.Trigger;
import org.sterl.spring.persistent_tasks.api.HistoryTrigger;
import org.sterl.spring.persistent_tasks.history.model.CompletedTriggerEntity;
import org.sterl.spring.persistent_tasks.history.model.HistoryTriggerEntity;
import org.sterl.spring.persistent_tasks.shared.ExtendetConvert;
import org.sterl.spring.persistent_tasks.shared.converter.ToTrigger;

interface HistoryConverter {

enum FromLastTriggerStateEntity implements ExtendetConvert<CompletedTriggerEntity, Trigger> {
INSTANCE;

@NonNull
@Override
public Trigger convert(@NonNull CompletedTriggerEntity source) {
var result = ToTrigger.INSTANCE.convert(source);
result.setId(source.getId());
result.setInstanceId(source.getId());
return result;
}
}

enum ToHistoryTrigger implements ExtendetConvert<HistoryTriggerEntity, HistoryTrigger> {
INSTANCE;

Expand All @@ -41,6 +25,5 @@ public HistoryTrigger convert(@NonNull HistoryTriggerEntity source) {
result.setStatus(source.getStatus());
return result;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.sterl.spring.persistent_tasks.api.HistoryTrigger;
import org.sterl.spring.persistent_tasks.api.TaskStatusHistoryOverview;
import org.sterl.spring.persistent_tasks.api.Trigger;
import org.sterl.spring.persistent_tasks.api.TriggerGroup;
import org.sterl.spring.persistent_tasks.api.HistoryTrigger;
import org.sterl.spring.persistent_tasks.api.TriggerKey;
import org.sterl.spring.persistent_tasks.api.TriggerSearch;
import org.sterl.spring.persistent_tasks.history.HistoryService;
import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.FromLastTriggerStateEntity;
import org.sterl.spring.persistent_tasks.history.api.HistoryConverter.ToHistoryTrigger;

import lombok.RequiredArgsConstructor;
Expand All @@ -31,6 +30,7 @@ public class TriggerHistoryResource {

public static final String PATH_GROUP = "history-grouped";
private final HistoryService historyService;
private final FromCompletedTriggerEntityConverter converter;

@GetMapping("history/instance/{instanceId}")
public PagedModel<HistoryTrigger> listInstances(
Expand All @@ -56,7 +56,7 @@ public PagedModel<Trigger> list(
TriggerSearch search,
@PageableDefault(size = 100) Pageable page) {

return FromLastTriggerStateEntity.INSTANCE.toPage( //
return converter.toPage( //
historyService.searchTriggers(search, page));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package org.sterl.spring.persistent_tasks.shared.converter;

import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.sterl.spring.persistent_tasks.api.Trigger;
import org.sterl.spring.persistent_tasks.shared.ExtendetConvert;
import org.sterl.spring.persistent_tasks.shared.model.HasTrigger;
import org.sterl.spring.persistent_tasks.shared.model.TriggerEntity;
import org.sterl.spring.persistent_tasks.trigger.component.StateSerializer;
import org.sterl.spring.persistent_tasks.trigger.component.StateSerializationComponent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public enum ToTrigger implements ExtendetConvert<HasTrigger, Trigger> {
INSTANCE;

private final static StateSerializer SERIALIZER = new StateSerializer();
@Component
@RequiredArgsConstructor
public class ToTrigger implements ExtendetConvert<HasTrigger, Trigger> {

private final StateSerializationComponent stateSerialization;

@NonNull
@Override
Expand All @@ -33,7 +36,7 @@ public Trigger convert(@NonNull HasTrigger hasData) {
result.setRunningDurationInMs(source.getRunningDurationInMs());
result.setStart(source.getStart());
try {
result.setState(SERIALIZER.deserialize(source.getState()));
result.setState(stateSerialization.deserialize(source));
} catch (Exception e) {
var info = """
Failed to deserialize state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,43 @@
import org.springframework.transaction.support.TransactionTemplate;
import org.sterl.spring.persistent_tasks.api.TaskId;
import org.sterl.spring.persistent_tasks.api.task.PersistentTask;
import org.sterl.spring.persistent_tasks.api.task.SerializationProvider;
import org.sterl.spring.persistent_tasks.api.task.StateSerializer;
import org.sterl.spring.persistent_tasks.task.component.TaskTransactionComponent;
import org.sterl.spring.persistent_tasks.task.repository.TaskRepository;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Service
@RequiredArgsConstructor
@Slf4j
public class TaskService {

private final TaskTransactionComponent taskTransactionComponent;
private final TaskRepository taskRepository;

@SuppressWarnings("rawtypes")
private Map<TaskId, StateSerializer> serializers = new ConcurrentHashMap<>();

// TODO move it to the component
private final Map<PersistentTask<? extends Serializable>, Optional<TransactionTemplate>> cache = new ConcurrentHashMap<>();

public Set<TaskId<? extends Serializable>> findAllTaskIds() {
return this.taskRepository.all();
}

public <T extends Serializable> void registerStateSerializer(
@NonNull TaskId<T> id,
@NonNull StateSerializer<T> serializer) {
log.info("{} brings an own state serializer: {}", id, serializer.getClass().getSimpleName());
serializers.put(id, serializer);
}
@SuppressWarnings("unchecked")
public <T extends Serializable> StateSerializer<T> getStateSerializer(TaskId<T> taskId,
StateSerializer<?> fallBack) {
return serializers.getOrDefault(taskId, fallBack);
}

public <T extends Serializable> Optional<PersistentTask<T>> get(TaskId<T> id) {
return taskRepository.get(id);
Expand Down Expand Up @@ -84,16 +105,25 @@ public <T extends Serializable> TaskId<T> register(String name, PersistentTask<T
/**
* A way to manually register a PersistentTask, usually not needed as spring beans will be added automatically.
*/
@SuppressWarnings("unchecked")
public <T extends Serializable> TaskId<T> register(TaskId<T> id, PersistentTask<T> task) {
taskTransactionComponent.buildOrGetDefaultTransactionTemplate(task);
if (task instanceof SerializationProvider sp) {
registerStateSerializer(id, sp.getSerializer());
}
return taskRepository.addTask(id, task);
}
/**
* A way to manually register a PersistentTask, usually not needed as spring beans will be added automatically.
*/
@SuppressWarnings("unchecked")
public <T extends Serializable> TaskId<T> replace(String name, PersistentTask<T> task) {
var id = (TaskId<T>)TaskId.of(name);
return replace(TaskId.of(name), task);
}

/**
* A way to manually register a PersistentTask, usually not needed as spring beans will be added automatically.
*/
public <T extends Serializable> TaskId<T> replace(TaskId<T> id, PersistentTask<T> task) {
taskRepository.remove(id);
return register(id, task);
}
Expand Down
Loading