diff --git a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublication.java b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublication.java index 16e74e000..a3cb15ae1 100644 --- a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublication.java @@ -19,6 +19,7 @@ import java.util.UUID; import org.jspecify.annotations.Nullable; +import org.springframework.modulith.events.EventPublication; /** * The event publication entity definition. @@ -33,11 +34,15 @@ class Neo4jEventPublication { public final String listenerId; public final Object event; public final String eventHash; + public final int completionAttempts; public @Nullable Instant completionDate; + public @Nullable Instant lastResubmissionDate; + public EventPublication.Status status; public Neo4jEventPublication(UUID identifier, Instant publicationDate, String listenerId, Object event, - String eventHash, @Nullable Instant completionDate) { + String eventHash, @Nullable Instant completionDate, EventPublication.Status status, int completionAttempts, + @Nullable Instant lastResubmissionDate) { this.identifier = identifier; this.publicationDate = publicationDate; @@ -45,5 +50,8 @@ public Neo4jEventPublication(UUID identifier, Instant publicationDate, String li this.event = event; this.eventHash = eventHash; this.completionDate = completionDate; + this.status = status; + this.lastResubmissionDate = lastResubmissionDate; + this.completionAttempts = completionAttempts; } } diff --git a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java index 91153054f..a8cb006e5 100644 --- a/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java @@ -21,6 +21,7 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +42,7 @@ import org.neo4j.driver.types.TypeSystem; import org.springframework.data.neo4j.core.Neo4jClient; import org.springframework.data.util.Lazy; +import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; @@ -68,12 +70,17 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository { private static final String LISTENER_ID = "listenerId"; private static final String PUBLICATION_DATE = "publicationDate"; private static final String COMPLETION_DATE = "completionDate"; + private static final String STATUS = "status"; + private static final String COMPLETION_ATTEMPTS = "completionAttempts"; + private static final String LAST_RESUBMISSION_DATE = "lastResubmissionDate"; + + // return references + private static final String STATUS_COUNT = "statusCount"; private static final Collection ALL_PROPERTIES = List.of(ID, EVENT_SERIALIZED, EVENT_HASH, EVENT_TYPE, - LISTENER_ID, PUBLICATION_DATE, COMPLETION_DATE); + LISTENER_ID, PUBLICATION_DATE, COMPLETION_DATE, STATUS, COMPLETION_ATTEMPTS, LAST_RESUBMISSION_DATE); - private static final Node EVENT_PUBLICATION_NODE = node("Neo4jEventPublication") - .named("neo4jEventPublication"); + private static final Node EVENT_PUBLICATION_NODE = node("Neo4jEventPublication").named("neo4jEventPublication"); private static final Node EVENT_PUBLICATION_ARCHIVE_NODE = node("Neo4jEventPublicationArchive") .named("neo4jEventPublicationArchive"); @@ -81,38 +88,27 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository { private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = match(EVENT_PUBLICATION_NODE) .where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID))) - .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()) - .returning(EVENT_PUBLICATION_NODE) - .build(); + .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()).returning(EVENT_PUBLICATION_NODE).build(); private static final Statement DELETE_BY_EVENT_AND_LISTENER_ID = match(EVENT_PUBLICATION_NODE) .where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) - .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID))) - .delete(EVENT_PUBLICATION_NODE) + .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID))).delete(EVENT_PUBLICATION_NODE) .build(); private static final Statement DELETE_BY_ID_STATEMENT = match(EVENT_PUBLICATION_NODE) - .where(EVENT_PUBLICATION_NODE.property(ID).in(parameter(ID))) - .delete(EVENT_PUBLICATION_NODE) - .build(); + .where(EVENT_PUBLICATION_NODE.property(ID).in(parameter(ID))).delete(EVENT_PUBLICATION_NODE).build(); private static final Function DELETE_COMPLETED_STATEMENT = node -> match(node) - .where(node.property(COMPLETION_DATE).isNotNull()) - .delete(node) - .build(); + .where(node.property(COMPLETION_DATE).isNotNull()).delete(node).build(); private static final Function DELETE_COMPLETED_BEFORE_STATEMENT = node -> match(node) .where(node.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE))) - .and(node.property(COMPLETION_DATE).isNotNull()) - .delete(node) - .build(); + .and(node.property(COMPLETION_DATE).isNotNull()).delete(node).build(); private static final Statement INCOMPLETE_PUBLISHED_BEFORE_STATEMENT = match(EVENT_PUBLICATION_NODE) .where(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE))) - .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()) - .returning(EVENT_PUBLICATION_NODE) - .orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)) - .build(); + .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()).returning(EVENT_PUBLICATION_NODE) + .orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)).build(); private static final Statement CREATE_STATEMENT = Cypher.create(EVENT_PUBLICATION_NODE) .set(EVENT_PUBLICATION_NODE.property(ID).to(parameter(ID))) @@ -121,28 +117,40 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository { .set(EVENT_PUBLICATION_NODE.property(EVENT_TYPE).to(parameter(EVENT_TYPE))) .set(EVENT_PUBLICATION_NODE.property(LISTENER_ID).to(parameter(LISTENER_ID))) .set(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).to(parameter(PUBLICATION_DATE))) - .set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))) - .build(); + .set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS))).build(); private static final Statement COMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE) .where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID))) .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()) - .set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))) - .build(); - - private static final Lazy COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = Lazy - .of(() -> applyProperties(match(EVENT_PUBLICATION_NODE) - .where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID))) + .set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))).build(); + + private static final Statement COUNT_BY_STATUS_STATEMENT = match(EVENT_PUBLICATION_NODE) + .where(EVENT_PUBLICATION_NODE.property(STATUS).eq(parameter(STATUS))) + .returning(count(EVENT_PUBLICATION_NODE).as(STATUS_COUNT)).build(); + + private static final Statement UPDATE_STATUS_STATEMENT = match(EVENT_PUBLICATION_NODE) + .where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID))) + .and(EVENT_PUBLICATION_NODE.property(STATUS).ne(parameter(STATUS))) + .set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS))).build(); + + private static final Statement RESUBMIT_STATEMENT = match(EVENT_PUBLICATION_NODE) + .where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID))) + .and(EVENT_PUBLICATION_NODE.property(STATUS).ne(literalOf(EventPublication.Status.RESUBMITTED.name()))) + .set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS))) + .set(EVENT_PUBLICATION_NODE.property(COMPLETION_ATTEMPTS) + .to(EVENT_PUBLICATION_NODE.property(COMPLETION_ATTEMPTS).add(literalOf(1)))) + .set(EVENT_PUBLICATION_NODE.property(LAST_RESUBMISSION_DATE).to(parameter(LAST_RESUBMISSION_DATE))).build(); + + private static final Lazy COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = Lazy.of( + () -> applyProperties(match(EVENT_PUBLICATION_NODE).where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID))) .and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE) - .where(EVENT_PUBLICATION_ARCHIVE_NODE.property(ID).eq(parameter(ID))) - .returning(literalTrue()).build()))) + .where(EVENT_PUBLICATION_ARCHIVE_NODE.property(ID).eq(parameter(ID))).returning(literalTrue()).build()))) .with(EVENT_PUBLICATION_NODE))); private static final Lazy COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = Lazy .of(() -> applyProperties( - match(EVENT_PUBLICATION_NODE) - .where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) + match(EVENT_PUBLICATION_NODE).where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) .and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID))) .and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE) .where(EVENT_PUBLICATION_ARCHIVE_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH))) @@ -153,30 +161,22 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository { private static Statement applyProperties(OrderableOngoingReadingAndWithWithoutWhere source) { var operations = ALL_PROPERTIES.stream() - .map(it -> EVENT_PUBLICATION_ARCHIVE_NODE.property(it).to(EVENT_PUBLICATION_NODE.property(it))) - .toList(); + .map(it -> EVENT_PUBLICATION_ARCHIVE_NODE.property(it).to(EVENT_PUBLICATION_NODE.property(it))).toList(); - return source.create(EVENT_PUBLICATION_ARCHIVE_NODE) - .set(operations) - .set(EVENT_PUBLICATION_ARCHIVE_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))) - .build(); + return source.create(EVENT_PUBLICATION_ARCHIVE_NODE).set(operations) + .set(EVENT_PUBLICATION_ARCHIVE_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))).build(); } private static final Function COMPLETE_BY_ID_STATEMENT = node -> match(node) - .where(node.property(ID).eq(parameter(ID))) - .set(node.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))) + .where(node.property(ID).eq(parameter(ID))).set(node.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE))) .build(); private static final ResultStatement INCOMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE) - .where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()) - .returning(EVENT_PUBLICATION_NODE) - .orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)) - .build(); + .where(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()).returning(EVENT_PUBLICATION_NODE) + .orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)).build(); private static final Function ALL_COMPLETED_STATEMENT = node -> match(node) - .where(node.property(COMPLETION_DATE).isNotNull()) - .returning(node) - .orderBy(node.property(PUBLICATION_DATE)) + .where(node.property(COMPLETION_DATE).isNotNull()).returning(node).orderBy(node.property(PUBLICATION_DATE)) .build(); private final Neo4jClient neo4jClient; @@ -203,8 +203,7 @@ private static Statement applyProperties(OrderableOngoingReadingAndWithWithoutWh this.eventSerializer = eventSerializer; this.completionMode = completionMode; - this.completedNode = completionMode == CompletionMode.ARCHIVE - ? EVENT_PUBLICATION_ARCHIVE_NODE + this.completedNode = completionMode == CompletionMode.ARCHIVE ? EVENT_PUBLICATION_ARCHIVE_NODE : EVENT_PUBLICATION_NODE; this.deleteCompletedStatement = DELETE_COMPLETED_STATEMENT.apply(completedNode); @@ -231,14 +230,9 @@ public TargetEventPublication create(TargetEventPublication publication) { var eventHash = DigestUtils.md5DigestAsHex(eventSerialized.getBytes()); neo4jClient.query(renderer.render(CREATE_STATEMENT)) - .bindAll(Map.of( - ID, Values.value(identifier.toString()), - EVENT_SERIALIZED, eventSerialized, - EVENT_HASH, eventHash, - EVENT_TYPE, eventType, - LISTENER_ID, listenerId, - PUBLICATION_DATE, Values.value(publicationDate.atOffset(ZoneOffset.UTC)), - COMPLETION_DATE, Values.NULL)) + .bindAll(Map.of(ID, Values.value(identifier.toString()), EVENT_SERIALIZED, eventSerialized, EVENT_HASH, + eventHash, EVENT_TYPE, eventType, LISTENER_ID, listenerId, PUBLICATION_DATE, + Values.value(publicationDate.atOffset(ZoneOffset.UTC)), STATUS, publication.getStatus().name())) .run(); return publication; @@ -256,31 +250,22 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier, if (completionMode == CompletionMode.DELETE) { - neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID)) - .bind(eventHash).to(EVENT_HASH) - .bind(identifier.getValue()).to(LISTENER_ID) - .run(); + neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID)).bind(eventHash).to(EVENT_HASH) + .bind(identifier.getValue()).to(LISTENER_ID).run(); } else if (completionMode == CompletionMode.ARCHIVE) { - neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT.get())) - .bind(eventHash).to(EVENT_HASH) - .bind(identifier.getValue()).to(LISTENER_ID) - .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE) - .run(); + neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT.get())).bind(eventHash) + .to(EVENT_HASH).bind(identifier.getValue()).to(LISTENER_ID) + .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE).run(); - neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID)) - .bind(eventHash).to(EVENT_HASH) - .bind(identifier.getValue()).to(LISTENER_ID) - .run(); + neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID)).bind(eventHash).to(EVENT_HASH) + .bind(identifier.getValue()).to(LISTENER_ID).run(); } else { - neo4jClient.query(renderer.render(COMPLETE_STATEMENT)) - .bind(eventHash).to(EVENT_HASH) - .bind(identifier.getValue()).to(LISTENER_ID) - .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE) - .run(); + neo4jClient.query(renderer.render(COMPLETE_STATEMENT)).bind(eventHash).to(EVENT_HASH).bind(identifier.getValue()) + .to(LISTENER_ID).bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE).run(); } } @@ -299,18 +284,15 @@ public void markCompleted(UUID identifier, Instant completionDate) { } else if (completionMode == CompletionMode.ARCHIVE) { neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT.get())) - .bind(Values.value(identifier.toString())).to(ID) - .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE) - .run(); + .bind(Values.value(identifier.toString())).to(ID).bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))) + .to(COMPLETION_DATE).run(); deletePublications(List.of(identifier)); } else { - neo4jClient.query(renderer.render(completedByIdStatement)) - .bind(Values.value(identifier.toString())).to(ID) - .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE) - .run(); + neo4jClient.query(renderer.render(completedByIdStatement)).bind(Values.value(identifier.toString())).to(ID) + .bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE).run(); } } @@ -323,10 +305,8 @@ public void markCompleted(UUID identifier, Instant completionDate) { @Transactional(readOnly = true) public List findIncompletePublications() { - return List.copyOf(neo4jClient.query(renderer.render(INCOMPLETE_STATEMENT)) - .fetchAs(TargetEventPublication.class) - .mappedBy(incompleteMapping()) - .all()); + return List.copyOf(neo4jClient.query(renderer.render(INCOMPLETE_STATEMENT)).fetchAs(TargetEventPublication.class) + .mappedBy(incompleteMapping()).all()); } /* @@ -338,10 +318,8 @@ public List findIncompletePublications() { public List findIncompletePublicationsPublishedBefore(Instant instant) { return List.copyOf(neo4jClient.query(renderer.render(INCOMPLETE_PUBLISHED_BEFORE_STATEMENT)) - .bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE) - .fetchAs(TargetEventPublication.class) - .mappedBy(incompleteMapping()) - .all()); + .bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE).fetchAs(TargetEventPublication.class) + .mappedBy(incompleteMapping()).all()); } /* @@ -357,10 +335,8 @@ public Optional findIncompletePublicationsByEventAndTarg var listenerId = targetIdentifier.getValue(); return neo4jClient.query(renderer.render(INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT)) - .bindAll(Map.of(EVENT_HASH, eventHash, LISTENER_ID, listenerId)) - .fetchAs(TargetEventPublication.class) - .mappedBy(incompleteMapping()) - .one(); + .bindAll(Map.of(EVENT_HASH, eventHash, LISTENER_ID, listenerId)).fetchAs(TargetEventPublication.class) + .mappedBy(incompleteMapping()).one(); } /* @@ -371,9 +347,7 @@ public Optional findIncompletePublicationsByEventAndTarg public List findCompletedPublications() { return new ArrayList<>(neo4jClient.query(renderer.render(allCompletedStatement)) - .fetchAs(TargetEventPublication.class) - .mappedBy(completeMapping()) - .all()); + .fetchAs(TargetEventPublication.class).mappedBy(completeMapping()).all()); } /* @@ -384,9 +358,8 @@ public List findCompletedPublications() { @Transactional public void deletePublications(List identifiers) { - neo4jClient.query(renderer.render(DELETE_BY_ID_STATEMENT)) - .bind(identifiers.stream().map(UUID::toString).toList()).to(ID) - .run(); + neo4jClient.query(renderer.render(DELETE_BY_ID_STATEMENT)).bind(identifiers.stream().map(UUID::toString).toList()) + .to(ID).run(); } /* @@ -408,8 +381,65 @@ public void deleteCompletedPublications() { public void deleteCompletedPublicationsBefore(Instant instant) { neo4jClient.query(renderer.render(deleteCompletedBeforeStatement)) - .bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE) - .run(); + .bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE).run(); + } + + @Override + public int countByStatus(EventPublication.Status status) { + return neo4jClient.query(renderer.render(COUNT_BY_STATUS_STATEMENT)).bind(status.name()).to(STATUS) + .fetchAs(Integer.class).one().get(); + } + + @Override + public List findByStatus(EventPublication.Status status) { + return EventPublicationRepository.super.findByStatus(status); + } + + @Override + public List findFailedPublications(FailedCriteria criteria) { + Map parameters = new HashMap<>(); + // in place CypherDSL usage because of conditional + var match = match(EVENT_PUBLICATION_NODE) + .where(EVENT_PUBLICATION_NODE.property(STATUS).eq(literalOf(EventPublication.Status.FAILED.name()))) + .or(EVENT_PUBLICATION_NODE.property(STATUS).isNull() + .and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull())); + var instant = criteria.getPublicationDateReference(); + if (instant != null) { + match = match.and(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE))); + parameters.put(PUBLICATION_DATE, Values.value(criteria.getPublicationDateReference().atOffset(ZoneOffset.UTC))); + } + + var limit = criteria.getMaxItemsToRead(); + Statement statement = null; + if (limit != -1) { + statement = match.returning(EVENT_PUBLICATION_NODE).orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)) + .ascending().limit(limit).build(); + } else { + statement = match.returning(EVENT_PUBLICATION_NODE).orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE)) + .ascending().build(); + } + return List.copyOf(neo4jClient.query(renderer.render(statement)).bindAll(parameters) + .fetchAs(TargetEventPublication.class).mappedBy(incompleteMapping()).all()); + } + + @Override + public void markFailed(UUID identifier) { + neo4jClient.query(renderer.render(UPDATE_STATUS_STATEMENT)).bind(Values.value(identifier.toString())).to(ID) + .bind(EventPublication.Status.FAILED.name()).to(STATUS).run(); + } + + @Override + public void markProcessing(UUID identifier) { + neo4jClient.query(renderer.render(UPDATE_STATUS_STATEMENT)).bind(Values.value(identifier.toString())).to(ID) + .bind(EventPublication.Status.PROCESSING.name()).to(STATUS).run(); + } + + @Override + public boolean markResubmitted(UUID identifier, Instant resubmissionDate) { + var update = neo4jClient.query(renderer.render(RESUBMIT_STATEMENT)).bind(Values.value(identifier.toString())).to(ID) + .bind(EventPublication.Status.RESUBMITTED.name()).to(STATUS) + .bind(Values.value(resubmissionDate.atOffset(ZoneOffset.UTC))).to(LAST_RESUBMISSION_DATE).run(); + return update.counters().propertiesSet() > 1; } private BiFunction incompleteMapping() { @@ -431,12 +461,17 @@ private Neo4jEventPublicationAdapter mapRecordToPublication(TypeSystem typeSyste var eventHash = publicationNode.get(EVENT_HASH).asString(); var eventType = publicationNode.get(EVENT_TYPE).asString(); var completionDate = publicationNode.get(COMPLETION_DATE); + var status = publicationNode.get(STATUS).asString(); + var completionAttempts = publicationNode.get(COMPLETION_ATTEMPTS); + var lastResubmissionDate = publicationNode.get(LAST_RESUBMISSION_DATE); try { var event = eventSerializer.deserialize(eventSerialized, Class.forName(eventType)); - var publication = new Neo4jEventPublication(identifier, publicationDate, listenerId, event, - eventHash, completionDate.isNull() ? null : completionDate.asZonedDateTime().toInstant()); + var publication = new Neo4jEventPublication(identifier, publicationDate, listenerId, event, eventHash, + completionDate.isNull() ? null : completionDate.asZonedDateTime().toInstant(), + EventPublication.Status.valueOf(status), completionAttempts != Values.NULL ? completionAttempts.asInt() : 0, + lastResubmissionDate.isNull() ? null : lastResubmissionDate.asZonedDateTime().toInstant()); return new Neo4jEventPublicationAdapter(publication); diff --git a/spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java b/spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java index 4a37b4b7c..1150a3dcc 100644 --- a/spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java +++ b/spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java @@ -37,6 +37,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.env.Environment; +import org.springframework.modulith.events.EventPublication; +import org.springframework.modulith.events.core.EventPublicationRepository; import org.springframework.modulith.events.core.EventSerializer; import org.springframework.modulith.events.core.PublicationTargetIdentifier; import org.springframework.modulith.events.core.TargetEventPublication; @@ -97,8 +99,7 @@ void createEventPublication() { try (var session = driver.session()) { - var result = session.run("MATCH (p:Neo4jEventPublication) return p") - .single(); + var result = session.run("MATCH (p:Neo4jEventPublication) return p").single(); var neo4jEventPublicationNode = result.get("p").asNode(); @@ -132,8 +133,7 @@ void updateEventPublication() { var now = Instant.now(); repository.markCompleted(event1, now); - assertThat(repository.findIncompletePublications()).hasSize(1) - .element(0) + assertThat(repository.findIncompletePublications()).hasSize(1).element(0) .extracting(TargetEventPublication::getEvent).isEqualTo(event2.getEvent()); } @@ -151,8 +151,7 @@ void findInCompletePastPublications() { var newer = Instant.now().plus(1L, ChronoUnit.MINUTES); var older = Instant.now().minus(1L, ChronoUnit.MINUTES); - assertThat(repository.findIncompletePublicationsPublishedBefore(newer)).hasSize(1) - .element(0) + assertThat(repository.findIncompletePublicationsPublishedBefore(newer)).hasSize(1).element(0) .extracting(TargetEventPublication::getEvent).isEqualTo(event.getEvent()); assertThat(repository.findIncompletePublicationsPublishedBefore(older)).hasSize(0); @@ -171,7 +170,7 @@ void findIncompleteByEventAndTargetIdentifier() { assertThat( repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, event.getTargetIdentifier())) - .isPresent(); + .isPresent(); } @Test @@ -266,11 +265,8 @@ void findsCompletedPublications() { } else { - assertThat(repository.findCompletedPublications()) - .hasSize(1) - .element(0) - .extracting(TargetEventPublication::getEvent) - .isEqualTo(event); + assertThat(repository.findCompletedPublications()).hasSize(1).element(0) + .extracting(TargetEventPublication::getEvent).isEqualTo(event); } } @@ -289,10 +285,98 @@ void marksPublicationAsCompletedById() { } else { - assertThat(repository.findCompletedPublications()) - .extracting(TargetEventPublication::getIdentifier) + assertThat(repository.findCompletedPublications()).extracting(TargetEventPublication::getIdentifier) .containsExactly(publication.getIdentifier()); } + + } + + @Test // GH-1337 + void countsByStatus() { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + assertOneByStatus(EventPublication.Status.PUBLISHED); + + repository.markFailed(publication.getIdentifier()); + assertOneByStatus(EventPublication.Status.FAILED); + + var resubmitted = repository.markResubmitted(publication.getIdentifier(), Instant.now()); + assertThat(resubmitted).isTrue(); + assertOneByStatus(EventPublication.Status.RESUBMITTED); + + } + + @Test // GH-1337 + void looksUpFailedPublication() { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + repository.markFailed(publication.getIdentifier()); + + assertThat(repository.findFailedPublications(EventPublicationRepository.FailedCriteria.ALL)) + .extracting(TargetEventPublication::getIdentifier).containsExactly(publication.getIdentifier()); + } + + @Test // GH-1337 + void claimsResubmissionOnce() { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + repository.markFailed(publication.getIdentifier()); + + var now = Instant.now(); + + assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isTrue(); + assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isFalse(); + } + + @Test // GH-1337 + void marksPublicationAsProcessing() { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + repository.markProcessing(publication.getIdentifier()); + } + + @Test // GH-1337 + void looksUpFailedPublicationInBatch() { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + repository.markFailed(publication.getIdentifier()); + + assertThat(repository.findFailedPublications(EventPublicationRepository.FailedCriteria.ALL.withItemsToRead(10))) + .extracting(TargetEventPublication::getIdentifier).containsExactly(publication.getIdentifier()); + } + + @Test // GH-1337 + void looksUpFailedPublicationWithReferenceDate() throws Exception { + + var event = new TestEvent("first"); + var publication = createPublication(event); + + repository.markFailed(publication.getIdentifier()); + + Thread.sleep(200); + + var criteria = EventPublicationRepository.FailedCriteria.ALL + .withPublicationsPublishedBefore(publication.getPublicationDate().plusMillis(50)); + + assertThat(repository.findFailedPublications(criteria)).extracting(TargetEventPublication::getIdentifier) + .containsExactly(publication.getIdentifier()); + } + + private void assertOneByStatus(EventPublication.Status reference) { + + for (var status : EventPublication.Status.values()) { + assertThat(repository.countByStatus(status)).isEqualTo(status == reference ? 1 : 0); + } } private TargetEventPublication createPublication(Object event) { @@ -318,7 +402,8 @@ class WithDeleteCompletionTest extends TestBase {} @TestPropertySource(properties = CompletionMode.PROPERTY + "=ARCHIVE") class WithArchiveCompletionTest extends TestBase {} - private record TestEvent(String eventId) {} + private record TestEvent(String eventId) { + } @Import({ TestApplication.class }) @Configuration diff --git a/spring-modulith-events/spring-modulith-events-neo4j/src/test/resources/logback.xml b/spring-modulith-events/spring-modulith-events-neo4j/src/test/resources/logback.xml new file mode 100644 index 000000000..1c38c4767 --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-neo4j/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d %5p %40.40c:%4L - %m%n + + + + + + + + + +