Skip to content

Commit f97135b

Browse files
committed
Fixed an issue where PushStream push that was in recovery does not shut down.
Signed-off-by: Phil Hunt <phil.hunt@independentid.com>
1 parent 44140ac commit f97135b

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

i2scim-signals/src/main/java/com/independentid/signals/PushStream.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.io.IOException;
1818
import java.security.Key;
1919
import java.security.PublicKey;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
public class PushStream {
2223
private final static Logger logger = LoggerFactory.getLogger(PushStream.class);
@@ -42,6 +43,8 @@ public class PushStream {
4243
@JsonIgnore
4344
CloseableHttpClient client = HttpClients.createDefault();
4445

46+
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
47+
4548
public String toString() {
4649
if (endpointUrl == null || endpointUrl.isEmpty())
4750
return "<undefined>";
@@ -59,7 +62,7 @@ public String toString() {
5962
}
6063

6164
public boolean pushEvent(SecurityEventToken event) {
62-
if (this.errorState)
65+
if (this.errorState || this.shuttingDown.get())
6366
return false;
6467
if (this.aud != null)
6568
event.setAud(this.aud);
@@ -68,7 +71,7 @@ public boolean pushEvent(SecurityEventToken event) {
6871
if (this.endpointUrl.equals("NONE")) {
6972
logger.error("Push endpoint is not yet set. Waiting...");
7073
int i = 0;
71-
while (this.endpointUrl.equals("NONE")) {
74+
while (this.endpointUrl.equals("NONE") && !this.shuttingDown.get()) {
7275
i++;
7376
try {
7477
Thread.sleep(1000);
@@ -82,6 +85,10 @@ public boolean pushEvent(SecurityEventToken event) {
8285
i = 0;
8386
}
8487
}
88+
if (this.shuttingDown.get()) {
89+
logger.info("Push stream shutting down, aborting event push");
90+
return false;
91+
}
8592
logger.info("SET Push endpoint set to: " + this.endpointUrl);
8693
}
8794

@@ -97,7 +104,7 @@ public boolean pushEvent(SecurityEventToken event) {
97104
int attempt = 0;
98105
long delay = this.initialDelay;
99106

100-
while (attempt <= this.maxRetries) {
107+
while (attempt <= this.maxRetries && !this.shuttingDown.get()) {
101108
try {
102109
if (attempt > 0)
103110
logger.info("Pushing event to " + this.endpointUrl + " (Attempt " + (attempt + 1) + ")");
@@ -135,6 +142,10 @@ public boolean pushEvent(SecurityEventToken event) {
135142
}
136143
}
137144
} catch (IOException e) {
145+
if (this.shuttingDown.get()) {
146+
logger.info("Push stream shutting down, aborting event push");
147+
return false;
148+
}
138149
logger.warn("Communications error while pushing event (attempt " + (attempt + 1) + "): " + e.getMessage());
139150
}
140151

@@ -145,6 +156,11 @@ public boolean pushEvent(SecurityEventToken event) {
145156
break;
146157
}
147158

159+
if (this.shuttingDown.get()) {
160+
logger.info("Push stream shutting down, aborting retry");
161+
return false;
162+
}
163+
148164
try {
149165
logger.info("Retrying in " + delay + "ms...");
150166
Thread.sleep(delay);
@@ -159,6 +175,9 @@ public boolean pushEvent(SecurityEventToken event) {
159175
}
160176

161177
public void Close() throws IOException {
178+
logger.info("Closing push stream...");
179+
this.shuttingDown.set(true);
180+
162181
if (this.client != null)
163182
this.client.close();
164183
}

0 commit comments

Comments
 (0)