Skip to content

Commit 89cb5bd

Browse files
I have fixed and updated the following files: Server Session: App.java Twin: BallThread.java Commander: Retry.java Retry: Retry.java, RetryExponentialBackoff.java Queue-Based Load Leveling: ServiceExecutor.java By addressing these issues, the updated codebase is now cleaner, more robust, and better aligned with real-world software engineering best practices.
1 parent b375919 commit 89cb5bd

File tree

5 files changed

+251
-287
lines changed

5 files changed

+251
-287
lines changed
Lines changed: 77 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,111 +1,122 @@
1-
/*
2-
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3-
*
4-
* The MIT License
5-
* Copyright © 2014-2022 Ilkka Seppälä
6-
*
7-
* Permission is hereby granted, free of charge, to any person obtaining a copy
8-
* of this software and associated documentation files (the "Software"), to deal
9-
* in the Software without restriction, including without limitation the rights
10-
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11-
* copies of the Software, and to permit persons to whom the Software is
12-
* furnished to do so, subject to the following conditions:
13-
*
14-
* The above copyright notice and this permission notice shall be included in
15-
* all copies or substantial portions of the Software.
16-
*
17-
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18-
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19-
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20-
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21-
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22-
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23-
* THE SOFTWARE.
24-
*/
251
package com.iluwatar.commander;
262

27-
import java.security.SecureRandom;
28-
import java.util.ArrayList;
29-
import java.util.Arrays;
30-
import java.util.List;
3+
import java.util.*;
4+
import java.util.concurrent.ThreadLocalRandom;
315
import java.util.concurrent.atomic.AtomicInteger;
326
import java.util.function.Predicate;
337

348
/**
35-
* Retry pattern.
9+
* Retry class that applies the retry pattern with customizable backoff and error handling.
3610
*
37-
* @param <T> is the type of object passed into HandleErrorIssue as a parameter.
11+
* @param <T> The type of object passed into HandleErrorIssue as a parameter.
3812
*/
39-
4013
public class Retry<T> {
4114

4215
/**
43-
* Operation Interface will define method to be implemented.
16+
* Operation interface for performing the core operation.
4417
*/
45-
4618
public interface Operation {
4719
void operation(List<Exception> list) throws Exception;
4820
}
4921

5022
/**
51-
* HandleErrorIssue defines how to handle errors.
23+
* HandleErrorIssue defines how to handle errors during retries.
5224
*
53-
* @param <T> is the type of object to be passed into the method as parameter.
25+
* @param <T> The type of object passed into the method as a parameter.
5426
*/
55-
5627
public interface HandleErrorIssue<T> {
5728
void handleIssue(T obj, Exception e);
5829
}
5930

60-
private static final SecureRandom RANDOM = new SecureRandom();
31+
/**
32+
* BackoffStrategy defines the strategy for calculating retry delay.
33+
*/
34+
public interface BackoffStrategy {
35+
long calculateDelay(int attempt);
36+
}
6137

62-
private final Operation op;
63-
private final HandleErrorIssue<T> handleError;
38+
private final Operation operation;
39+
private final HandleErrorIssue<T> errorHandler;
6440
private final int maxAttempts;
65-
private final long maxDelay;
41+
private final BackoffStrategy backoffStrategy;
42+
private final Predicate<Exception> ignoreCondition;
6643
private final AtomicInteger attempts;
67-
private final Predicate<Exception> test;
68-
private final List<Exception> errors;
44+
private final List<Exception> errorList;
6945

70-
Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
71-
long maxDelay, Predicate<Exception>... ignoreTests) {
72-
this.op = op;
73-
this.handleError = handleError;
46+
/**
47+
* Constructor for Retry class.
48+
*
49+
* @param operation The operation to retry.
50+
* @param errorHandler The handler for errors.
51+
* @param maxAttempts The maximum number of retry attempts.
52+
* @param backoffStrategy The backoff strategy for retry delays.
53+
* @param ignoreCondition A predicate to determine whether to ignore certain exceptions.
54+
*/
55+
public Retry(Operation operation, HandleErrorIssue<T> errorHandler, int maxAttempts,
56+
BackoffStrategy backoffStrategy, Predicate<Exception> ignoreCondition) {
57+
this.operation = operation;
58+
this.errorHandler = errorHandler;
7459
this.maxAttempts = maxAttempts;
75-
this.maxDelay = maxDelay;
76-
this.attempts = new AtomicInteger();
77-
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
78-
this.errors = new ArrayList<>();
60+
this.backoffStrategy = backoffStrategy;
61+
this.ignoreCondition = ignoreCondition;
62+
this.attempts = new AtomicInteger(0);
63+
this.errorList = new ArrayList<>();
7964
}
8065

8166
/**
82-
* Performing the operation with retries.
67+
* Perform the operation with retries.
8368
*
84-
* @param list is the exception list
85-
* @param obj is the parameter to be passed into handleIsuue method
69+
* @param exceptions The list of exceptions encountered during retries.
70+
* @param obj The object passed to the error handler.
8671
*/
87-
88-
public void perform(List<Exception> list, T obj) {
72+
public void perform(List<Exception> exceptions, T obj) {
8973
do {
9074
try {
91-
op.operation(list);
92-
return;
75+
operation.operation(exceptions);
76+
return; // Exit if successful
9377
} catch (Exception e) {
94-
this.errors.add(e);
95-
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
96-
this.handleError.handleIssue(obj, e);
97-
return; //return here... don't go further
78+
errorList.add(e);
79+
80+
if (attempts.incrementAndGet() >= maxAttempts || !ignoreCondition.test(e)) {
81+
errorHandler.handleIssue(obj, e);
82+
return; // Stop retrying if max attempts are exceeded or exception is non-recoverable
9883
}
84+
9985
try {
100-
long testDelay =
101-
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
102-
long delay = Math.min(testDelay, this.maxDelay);
86+
long delay = backoffStrategy.calculateDelay(attempts.intValue());
10387
Thread.sleep(delay);
104-
} catch (InterruptedException f) {
105-
//ignore
88+
} catch (InterruptedException ie) {
89+
Thread.currentThread().interrupt(); // Restore interrupted status
90+
errorHandler.handleIssue(obj, new RuntimeException("Thread interrupted during retry", ie));
91+
return;
10692
}
10793
}
10894
} while (true);
10995
}
11096

97+
/**
98+
* Returns an unmodifiable list of errors encountered during retries.
99+
*
100+
* @return A list of encountered errors.
101+
*/
102+
public List<Exception> getErrorList() {
103+
return Collections.unmodifiableList(errorList);
104+
}
105+
106+
/**
107+
* Default ExponentialBackoffStrategy with jitter.
108+
*/
109+
public static class ExponentialBackoffWithJitter implements BackoffStrategy {
110+
private final long maxDelay;
111+
112+
public ExponentialBackoffWithJitter(long maxDelay) {
113+
this.maxDelay = maxDelay;
114+
}
115+
116+
@Override
117+
public long calculateDelay(int attempt) {
118+
long baseDelay = (long) Math.pow(2, attempt) * 1000;
119+
return Math.min(baseDelay + ThreadLocalRandom.current().nextInt(1000), maxDelay);
120+
}
121+
}
111122
}
Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,63 @@
1-
/*
2-
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3-
*
4-
* The MIT License
5-
* Copyright © 2014-2022 Ilkka Seppälä
6-
*
7-
* Permission is hereby granted, free of charge, to any person obtaining a copy
8-
* of this software and associated documentation files (the "Software"), to deal
9-
* in the Software without restriction, including without limitation the rights
10-
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11-
* copies of the Software, and to permit persons to whom the Software is
12-
* furnished to do so, subject to the following conditions:
13-
*
14-
* The above copyright notice and this permission notice shall be included in
15-
* all copies or substantial portions of the Software.
16-
*
17-
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18-
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19-
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20-
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21-
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22-
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23-
* THE SOFTWARE.
24-
*/
251
package com.iluwatar.queue.load.leveling;
262

273
import lombok.extern.slf4j.Slf4j;
284

295
/**
30-
* ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and
31-
* process them.
6+
* ServiceExecutor class. This class retrieves and processes messages from a queue.
327
*/
338
@Slf4j
349
public class ServiceExecutor implements Runnable {
3510

36-
private final MessageQueue msgQueue;
11+
private final MessageQueue messageQueue;
12+
private final MessageProcessor messageProcessor;
13+
private final long processingDelay;
3714

38-
public ServiceExecutor(MessageQueue msgQueue) {
39-
this.msgQueue = msgQueue;
15+
/**
16+
* Constructor for ServiceExecutor.
17+
*
18+
* @param messageQueue the queue to retrieve messages from.
19+
* @param messageProcessor the processor responsible for processing messages.
20+
* @param processingDelay the delay (in milliseconds) between processing messages.
21+
*/
22+
public ServiceExecutor(MessageQueue messageQueue, MessageProcessor messageProcessor, long processingDelay) {
23+
this.messageQueue = messageQueue;
24+
this.messageProcessor = messageProcessor;
25+
this.processingDelay = processingDelay;
4026
}
4127

4228
/**
4329
* The ServiceExecutor thread will retrieve each message and process it.
4430
*/
31+
@Override
4532
public void run() {
4633
try {
4734
while (!Thread.currentThread().isInterrupted()) {
48-
var msg = msgQueue.retrieveMsg();
35+
var message = messageQueue.retrieveMsg();
4936

50-
if (null != msg) {
51-
LOGGER.info(msg + " is served.");
37+
if (message != null) {
38+
messageProcessor.process(message); // Delegates processing logic to the processor
39+
LOGGER.info("{} has been processed successfully.", message);
5240
} else {
53-
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
41+
LOGGER.info("Service Executor: No messages available. Waiting...");
5442
}
5543

56-
Thread.sleep(1000);
44+
Thread.sleep(processingDelay);
5745
}
46+
} catch (InterruptedException e) {
47+
LOGGER.warn("ServiceExecutor thread interrupted. Exiting gracefully...");
48+
Thread.currentThread().interrupt(); // Restore interrupted status
5849
} catch (Exception e) {
59-
LOGGER.error(e.getMessage());
50+
LOGGER.error("An error occurred while processing the message: {}", e.getMessage(), e);
51+
} finally {
52+
LOGGER.info("ServiceExecutor has stopped.");
6053
}
6154
}
55+
56+
/**
57+
* MessageProcessor interface defines the processing logic.
58+
*/
59+
@FunctionalInterface
60+
public interface MessageProcessor {
61+
void process(Message message) throws Exception;
62+
}
6263
}

0 commit comments

Comments
 (0)