20
20
import io .kubernetes .client .openapi .models .V1ObjectMeta ;
21
21
import io .kubernetes .client .util .CallGeneratorParams ;
22
22
import io .kubernetes .client .util .Watchable ;
23
+ import java .io .IOException ;
23
24
import java .net .ConnectException ;
24
25
import java .time .Duration ;
25
26
import java .util .List ;
@@ -88,24 +89,30 @@ public void run() {
88
89
}
89
90
while (true ) {
90
91
if (!isActive .get ()) {
91
- if (watch != null ) {
92
- watch .close ();
93
- return ;
94
- }
92
+ closeWatch ();
93
+ return ;
95
94
}
96
95
97
96
try {
98
97
if (log .isDebugEnabled ()) {
99
98
log .debug (
100
99
"{}#Start watch with resource version {}" , apiTypeClass , lastSyncResourceVersion );
101
100
}
102
- watch =
101
+ Watchable < ApiType > newWatch =
103
102
listerWatcher .watch (
104
103
new CallGeneratorParams (
105
104
Boolean .TRUE ,
106
105
lastSyncResourceVersion ,
107
106
Long .valueOf (Duration .ofMinutes (5 ).toMillis ()).intValue ()));
108
- watchHandler (watch );
107
+
108
+ synchronized (this ) {
109
+ if (!isActive .get ()) {
110
+ newWatch .close ();
111
+ continue ;
112
+ }
113
+ watch = newWatch ;
114
+ }
115
+ watchHandler (newWatch );
109
116
} catch (Throwable t ) {
110
117
if (isConnectException (t )) {
111
118
// If this is "connection refused" error, it means that most likely
@@ -132,10 +139,7 @@ public void run() {
132
139
this .exceptionHandler .accept (apiTypeClass , t );
133
140
return ;
134
141
} finally {
135
- if (watch != null ) {
136
- watch .close ();
137
- watch = null ;
138
- }
142
+ closeWatch ();
139
143
}
140
144
}
141
145
} catch (Throwable t ) {
@@ -144,7 +148,19 @@ public void run() {
144
148
}
145
149
146
150
public void stop () {
147
- isActive .set (false );
151
+ try {
152
+ isActive .set (false );
153
+ closeWatch ();
154
+ } catch (Throwable t ) {
155
+ this .exceptionHandler .accept (apiTypeClass , t );
156
+ }
157
+ }
158
+
159
+ private synchronized void closeWatch () throws IOException {
160
+ if (watch != null ) {
161
+ watch .close ();
162
+ watch = null ;
163
+ }
148
164
}
149
165
150
166
private void syncWith (List <? extends KubernetesObject > items , String resourceVersion ) {
0 commit comments