Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions rqueue-core/src/main/java/com/github/sonus21/rqueue/core/Job.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright (c) 2021-2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

Expand All @@ -27,10 +27,10 @@
/**
* On each execution Rqueue creates a job to track it's status and execution progress.
*
* <p>A job belongs to a single message poll, each message listener call creates an execution {@link
* com.github.sonus21.rqueue.models.db.Execution} and that has a detail for specific execution.
* Overall job status can be found using this job interface. This object is available via {@link
* org.springframework.messaging.handler.annotation.Header} in listener method.
* <p>A job belongs to a single message poll, each message listener call creates an execution
* {@link com.github.sonus21.rqueue.models.db.Execution} and that has a detail for specific
* execution. Overall job status can be found using this job interface. This object is available via
* {@link org.springframework.messaging.handler.annotation.Header} in listener method.
*/
public interface Job {

Expand Down Expand Up @@ -81,7 +81,7 @@ public interface Job {
* return zero value
*
* @return remaining duration that this job can take, otherwise other listener will consume this
* message
* message
*/
Duration getVisibilityTimeout();

Expand Down Expand Up @@ -181,8 +181,8 @@ public interface Job {
* Release this job back to the queue, the released job would be available for re-execution after
* the duration time.
*
* @param status job status
* @param why why do want to release this job
* @param status job status
* @param why why do want to release this job
* @param duration any positive duration
*/
void release(JobStatus status, Serializable why, Duration duration);
Expand All @@ -191,15 +191,15 @@ public interface Job {
* Release this job back to queue, this job available for execution after one second.
*
* @param status what should be the job status
* @param why why do you want to delete this job
* @param why why do you want to delete this job
*/
void release(JobStatus status, Serializable why);

/**
* Delete this job
*
* @param status what should be the job status
* @param why why do you want to delete this job
* @param why why do you want to delete this job
*/
void delete(JobStatus status, Serializable why);

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright (c) 2020-2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import java.util.List;
import org.springframework.messaging.converter.MessageConverter;
Expand All @@ -43,7 +44,7 @@ public interface RqueueMessageManager {
* Delete all message for the given that has some priority like high,medium and low
*
* @param queueName queue name
* @param priority the priority for the queue
* @param priority the priority for the queue
* @return fail/success
*/
default boolean deleteAllMessages(String queueName, String priority) {
Expand All @@ -65,7 +66,7 @@ default boolean deleteAllMessages(String queueName, String priority) {
* method {@link #getAllMessages(String)}
*
* @param queueName queue name to be query for
* @param priority the priority of the queue
* @param priority the priority of the queue
* @return list of enqueued messages.
*/
default List<Object> getAllMessages(String queueName, String priority) {
Expand All @@ -77,7 +78,7 @@ default List<Object> getAllMessages(String queueName, String priority) {
* consumption message has a fixed lifetime.
*
* @param queueName queue name on which message was enqueued
* @param id message id
* @param id message id
* @return the enqueued message, it could be null if message is not found or it's deleted.
* @see RqueueConfig
*/
Expand All @@ -88,8 +89,8 @@ default List<Object> getAllMessages(String queueName, String priority) {
* priority queue.
*
* @param queueName queue name on which message was enqueued
* @param priority the priority of the queue
* @param id message id
* @param priority the priority of the queue
* @param id message id
* @return the enqueued message, it could be null if message is not found or it's deleted.
*/
default Object getMessage(String queueName, String priority, String id) {
Expand All @@ -101,17 +102,18 @@ default Object getMessage(String queueName, String priority, String id) {
* returns true/false.
*
* @param queueName queue name on which message was enqueued
* @param id message id
* @param id message id
* @return whether the message exist or not
*/
boolean exist(String queueName, String id);

/**
* Extension to the method {@link #exist(String, String)}, that checks message for priority queue.
* Extension to the method {@link #exist(String, String)}, that checks message for priority
* queue.
*
* @param queueName queue name on which message was enqueued
* @param priority priority of the given queue
* @param id message id
* @param priority priority of the given queue
* @param id message id
* @return whether the message exist or not
*/
default boolean exist(String queueName, String priority, String id) {
Expand All @@ -122,7 +124,7 @@ default boolean exist(String queueName, String priority, String id) {
* Extension to the method {@link #getMessage(String, String)}, this returns internal message.
*
* @param queueName queue name on which message was enqueued
* @param id message id
* @param id message id
* @return the enqueued message
*/
RqueueMessage getRqueueMessage(String queueName, String id);
Expand All @@ -131,8 +133,8 @@ default boolean exist(String queueName, String priority, String id) {
* Extension to the method {@link #getRqueueMessage(String, String)}
*
* @param queueName queue name on which message was enqueued
* @param priority the priority of the queue
* @param id message id
* @param priority the priority of the queue
* @param id message id
* @return the enqueued message
*/
default RqueueMessage getRqueueMessage(String queueName, String priority, String id) {
Expand All @@ -151,7 +153,7 @@ default RqueueMessage getRqueueMessage(String queueName, String priority, String
* Extension to the method {@link #getAllRqueueMessage(String)}
*
* @param queueName queue name on which message was enqueued
* @param priority the priority of the queue
* @param priority the priority of the queue
* @return the enqueued message
*/
default List<RqueueMessage> getAllRqueueMessage(String queueName, String priority) {
Expand All @@ -171,7 +173,7 @@ default List<RqueueMessage> getAllRqueueMessage(String queueName, String priorit
* Delete a message that's enqueued to a queue with some priority
*
* @param queueName queue on which message was enqueued
* @param priority priority of the message like high/low/medium
* @param priority priority of the message like high/low/medium
* @param messageId messageId corresponding to this message
* @return success/failure
*/
Expand All @@ -185,4 +187,27 @@ default boolean deleteMessage(String queueName, String priority, String messageI
* @return message converter that's used for message (de)serialization
*/
MessageConverter getMessageConverter();

/**
* Move messages from Dead Letter queue to the destination queue. This push the messages at the
* FRONT of destination queue, so that it can be reprocessed as soon as possible.
*
* @param deadLetterQueueName dead letter queue name
* @param queueName queue name
* @param maxMessages number of messages to be moved by default move
* {@link Constants#MAX_MESSAGES} messages
* @return success or failure.
*/
boolean moveMessageFromDeadLetterToQueue(
String deadLetterQueueName, String queueName, Integer maxMessages);

/**
* A shortcut to the method {@link #moveMessageFromDeadLetterToQueue(String, String, Integer)}
*
* @param deadLetterQueueName dead letter queue name
* @param queueName queue name
* @return success or failure
*/
boolean moveMessageFromDeadLetterToQueue(String deadLetterQueueName, String queueName);

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Copyright 2022 Sonu Kumar
* Copyright (c) 2020-2025 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

Expand All @@ -29,6 +29,7 @@
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
Expand Down Expand Up @@ -66,13 +67,13 @@ abstract class BaseMessageSender {
}

protected Object storeMessageMetadata(
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive) {
RqueueMessage rqueueMessage, Long delayInMillis, boolean reactive, boolean isUnique) {
MessageMetadata messageMetadata = new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
Duration duration = rqueueConfig.getMessageDurability(delayInMillis);
if (reactive) {
return rqueueMessageMetadataService.saveReactive(messageMetadata, duration);
return rqueueMessageMetadataService.saveReactive(messageMetadata, duration, isUnique);
} else {
rqueueMessageMetadataService.save(messageMetadata, duration);
rqueueMessageMetadataService.save(messageMetadata, duration, isUnique);
}
return null;
}
Expand Down Expand Up @@ -109,7 +110,8 @@ protected String pushMessage(
String messageId,
Object message,
Integer retryCount,
Long delayInMilliSecs) {
Long delayInMilliSecs,
boolean isUnique) {
QueueDetail queueDetail = EndpointRegistry.get(queueName);
RqueueMessage rqueueMessage =
buildMessage(
Expand All @@ -121,17 +123,26 @@ protected String pushMessage(
delayInMilliSecs,
messageHeaders);
try {
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false, isUnique);
enqueue(queueDetail, rqueueMessage, delayInMilliSecs, false);
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false);
} catch (DuplicateMessageException e) {
log.warn(
"Duplicate message enqueue attempted queue: {}, messageId: {}",
queueName,
rqueueMessage.getId());
return null;
} catch (Exception e) {
log.error("Queue: {} Message {} could not be pushed {}", queueName, rqueueMessage, e);
log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage.getId(), e);
return null;
}
return rqueueMessage.getId();
}

protected String pushPeriodicMessage(
String queueName, String messageId, Object message, long periodInMilliSeconds) {
String queueName,
String messageId,
Object message,
long periodInMilliSeconds) {
QueueDetail queueDetail = EndpointRegistry.get(queueName);
RqueueMessage rqueueMessage =
buildPeriodicMessage(
Expand All @@ -143,13 +154,13 @@ protected String pushPeriodicMessage(
periodInMilliSeconds,
messageHeaders);
try {
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false, false);
enqueue(queueDetail, rqueueMessage, periodInMilliSeconds, false);
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false);
return rqueueMessage.getId();
} catch (Exception e) {
log.error("Queue: {} Message {} could not be pushed {}", queueName, rqueueMessage, e);
log.error("Queue: {} Message {} could not be pushed", queueName, rqueueMessage, e);
return null;
}
return rqueueMessage.getId();
}

protected Object deleteAllMessages(QueueDetail queueDetail) {
Expand Down
Loading