Skip to content

Commit a09edc5

Browse files
committed
thread-safety for pager
1 parent 2ba127f commit a09edc5

File tree

2 files changed

+120
-93
lines changed

2 files changed

+120
-93
lines changed

util/src/main/java/io/kubernetes/client/pager/Pager.java

Lines changed: 74 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,24 @@
1515
import com.squareup.okhttp.Call;
1616
import io.kubernetes.client.ApiClient;
1717
import io.kubernetes.client.ApiException;
18-
import io.kubernetes.client.models.V1ListMeta;
1918
import io.kubernetes.client.util.Reflect;
2019
import io.kubernetes.client.util.exception.ObjectMetaReflectException;
2120
import java.io.IOException;
2221
import java.lang.reflect.Type;
2322
import java.util.Iterator;
2423
import java.util.function.Function;
2524

26-
public class Pager<ApiType, ApiListType> implements Iterable<ApiType>, Iterator<ApiType> {
27-
private String continueToken;
25+
/*
26+
* Pager encapsulates kubernetes limit/continue-based list pagination into an iterator.
27+
* Note that pager is thread-safe.
28+
*/
29+
public class Pager<ApiType, ApiListType> implements Iterable<ApiType> {
2830
private Integer limit;
2931
private ApiClient client;
30-
private Call call;
3132
private Type listType;
3233
private Function<PagerParams, Call> listFunc;
3334

3435
private ApiListType listObjectCurrentPage;
35-
private int offsetCurrentPage;
36-
private int currentPageSize;
3736

3837
/**
3938
* Pagination in kubernetes list call depends on continue and limit variable
@@ -53,88 +52,101 @@ public Pager(
5352
}
5453

5554
/**
56-
* returns false if kubernetes server has exhausted List.
57-
*
58-
* @return
59-
*/
60-
@Override
61-
public boolean hasNext() {
62-
if (call == null) {
63-
// the first time looping over the pager
64-
return Boolean.TRUE;
65-
}
66-
if (continueToken == null && offsetCurrentPage >= currentPageSize) {
67-
return Boolean.FALSE;
68-
}
69-
return Boolean.TRUE;
70-
}
71-
72-
/**
73-
* returns next chunk of List. size of list depends on limit set in constructor.
55+
* Iterator iterator.
7456
*
75-
* @return Object
57+
* @return the iterator
7658
*/
77-
@Override
78-
public ApiType next() {
79-
try {
80-
if (offsetCurrentPage >= currentPageSize) {
81-
call = getNextCall(limit);
82-
listObjectCurrentPage = executeRequest(call);
83-
84-
offsetCurrentPage = 0;
85-
currentPageSize = Reflect.<ApiType>getItems(listObjectCurrentPage).size();
86-
}
87-
return Reflect.<ApiType>getItems(listObjectCurrentPage).get(offsetCurrentPage++);
88-
} catch (ApiException e) {
89-
throw new RuntimeException(e.getResponseBody());
90-
} catch (ObjectMetaReflectException | IOException e) {
91-
throw new RuntimeException(e);
92-
}
93-
}
94-
9559
@Override
9660
public Iterator<ApiType> iterator() {
97-
this.call = null;
98-
return this;
61+
return new PagerIterator();
9962
}
10063

10164
/** returns next list call by setting continue variable and limit */
102-
private Call getNextCall(Integer nextLimit) {
103-
PagerParams params = new PagerParams((nextLimit != null) ? nextLimit : limit);
104-
if (continueToken != null) {
105-
params.continueToken = continueToken;
106-
}
65+
private Call getNextCall(Integer nextLimit, String continueToken) {
66+
PagerParams params = new PagerParams((nextLimit != null) ? nextLimit : limit, continueToken);
10767
return listFunc.apply(params);
10868
}
10969

11070
/** executes the list call and sets the continue variable for next list call */
111-
private ApiListType executeRequest(Call call)
112-
throws IOException, ApiException, ObjectMetaReflectException {
113-
ApiListType data = client.handleResponse(call.execute(), listType);
114-
V1ListMeta listMetaData = Reflect.listMetadata(data);
115-
continueToken = listMetaData.getContinue();
116-
return data;
71+
private ApiListType executeRequest(Call call) throws IOException, ApiException {
72+
return client.handleResponse(call.execute(), listType);
11773
}
11874

11975
public static class PagerParams {
120-
private Integer limit;
121-
private String continueToken;
12276

123-
public PagerParams(Integer limit) {
124-
this.limit = limit;
125-
}
77+
private String continueToken;
78+
private Integer limit;
12679

127-
public PagerParams(Integer limit, String continueToken) {
80+
private PagerParams(Integer limit, String continueToken) {
12881
this.limit = limit;
12982
this.continueToken = continueToken;
13083
}
13184

85+
/**
86+
* Gets get limit.
87+
*
88+
* @return the get limit
89+
*/
13290
public Integer getLimit() {
13391
return limit;
13492
}
13593

94+
/**
95+
* Gets get continue token.
96+
*
97+
* @return the get continue token
98+
*/
13699
public String getContinueToken() {
137100
return continueToken;
138101
}
139102
}
103+
104+
private class PagerIterator implements Iterator<ApiType> {
105+
106+
private boolean started;
107+
private String continueToken;
108+
private Call call;
109+
private int offsetCurrentPage;
110+
private int currentPageSize;
111+
112+
/**
113+
* returns false if kubernetes server has exhausted List.
114+
*
115+
* @return the boolean
116+
*/
117+
@Override
118+
public boolean hasNext() {
119+
if (!started) {
120+
started = true;
121+
return Boolean.TRUE;
122+
}
123+
return !(continueToken == null && offsetCurrentPage >= currentPageSize);
124+
}
125+
126+
/**
127+
* returns next chunk of List. size of list depends on limit set in constructor.
128+
*
129+
* @return the next Object
130+
*/
131+
@Override
132+
public ApiType next() {
133+
try {
134+
if (offsetCurrentPage >= currentPageSize) {
135+
136+
call = getNextCall(limit, continueToken);
137+
138+
listObjectCurrentPage = executeRequest(call);
139+
continueToken = Reflect.listMetadata(listObjectCurrentPage).getContinue();
140+
141+
offsetCurrentPage = 0;
142+
currentPageSize = Reflect.<ApiType>getItems(listObjectCurrentPage).size();
143+
}
144+
return Reflect.<ApiType>getItems(listObjectCurrentPage).get(offsetCurrentPage++);
145+
} catch (ApiException e) {
146+
throw new RuntimeException(e.getResponseBody());
147+
} catch (ObjectMetaReflectException | IOException e) {
148+
throw new RuntimeException(e);
149+
}
150+
}
151+
}
140152
}

util/src/test/java/io/kubernetes/client/pager/PagerTest.java

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
2020
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2121
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
22-
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.*;
2323

2424
import com.github.tomakehurst.wiremock.junit.WireMockRule;
2525
import com.google.common.io.Resources;
@@ -31,7 +31,10 @@
3131
import java.io.IOException;
3232
import java.nio.file.Files;
3333
import java.nio.file.Paths;
34-
import java.util.List;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.TimeUnit;
3538
import org.junit.Before;
3639
import org.junit.Rule;
3740
import org.junit.Test;
@@ -56,7 +59,7 @@ public void setup() throws IOException {
5659
}
5760

5861
@Test
59-
public void testPaginationForNamespaceListWithSuccess() throws IOException {
62+
public void testPaginationForNamespaceListWithSuccessThreadSafely() throws IOException {
6063
String namespaceListPage1Str = new String(Files.readAllBytes(Paths.get(LIST_PAGE1_FILE_PATH)));
6164
String namespaceListPage2Str = new String(Files.readAllBytes(Paths.get(LIST_PAGE2_FILE_PATH)));
6265
CoreV1Api api = new CoreV1Api(client);
@@ -79,6 +82,10 @@ public void testPaginationForNamespaceListWithSuccess() throws IOException {
7982
.withHeader("Content-Type", "application/json")
8083
.withBody(namespaceListPage2Str)));
8184

85+
int threads = 10;
86+
CountDownLatch latch = new CountDownLatch(threads);
87+
ExecutorService service = Executors.newFixedThreadPool(threads);
88+
8289
Pager<V1Namespace, V1NamespaceList> pager =
8390
new Pager<V1Namespace, V1NamespaceList>(
8491
(Pager.PagerParams param) -> {
@@ -103,15 +110,27 @@ public void testPaginationForNamespaceListWithSuccess() throws IOException {
103110
1,
104111
V1NamespaceList.class);
105112

106-
int size = 0;
107-
for (V1Namespace namespace : pager) {
108-
assertEquals("default", namespace.getMetadata().getName());
109-
size++;
113+
for (int i = 0; i < threads; i++) {
114+
service.submit(
115+
() -> {
116+
int size = 0;
117+
for (V1Namespace namespace : pager) {
118+
assertEquals("default", namespace.getMetadata().getName());
119+
size++;
120+
}
121+
assertEquals(2, size);
122+
latch.countDown();
123+
});
124+
}
125+
126+
try {
127+
latch.await(5, TimeUnit.SECONDS);
128+
} catch (InterruptedException e) {
129+
fail("timed out waiting for pager finished");
110130
}
111-
assertEquals(2, size);
112131

113132
verify(
114-
2,
133+
2 * threads,
115134
getRequestedFor(urlPathEqualTo("/api/v1/namespaces"))
116135
.withQueryParam("limit", equalTo("1")));
117136
}
@@ -129,7 +148,7 @@ public void testPaginationForNamespaceListWithBadTokenFailure() throws IOExcepti
129148
.withStatus(400)
130149
.withHeader("Content-Type", "application/json")
131150
.withBody(status400Str)));
132-
Pager pager =
151+
Pager<V1Namespace, V1NamespaceList> pager =
133152
new Pager<V1Namespace, V1NamespaceList>(
134153
(Pager.PagerParams param) -> {
135154
try {
@@ -152,18 +171,16 @@ public void testPaginationForNamespaceListWithBadTokenFailure() throws IOExcepti
152171
client,
153172
1,
154173
V1NamespaceList.class);
155-
while (pager.hasNext()) {
156-
try {
157-
V1NamespaceList list = (V1NamespaceList) pager.next();
158-
List<V1Namespace> items = list.getItems();
159-
assertEquals(1, items.size());
160-
for (V1Namespace namespace : items) {
161-
assertEquals("default", namespace.getMetadata().getName());
162-
}
163-
} catch (Exception e) {
164-
assertEquals(status400Str, e.getMessage());
174+
int count = 0;
175+
try {
176+
for (V1Namespace namespace : pager) {
177+
assertEquals("default", namespace.getMetadata().getName());
178+
count++;
165179
}
180+
} catch (Exception e) {
181+
assertEquals(status400Str, e.getMessage());
166182
}
183+
167184
verify(
168185
getRequestedFor(urlPathEqualTo("/api/v1/namespaces"))
169186
.withQueryParam("limit", equalTo("1")));
@@ -183,7 +200,7 @@ public void testPaginationForNamespaceListWithFieldSelectorFailure() throws IOEx
183200
.withStatus(400)
184201
.withHeader("Content-Type", "application/json")
185202
.withBody(status400Str)));
186-
Pager pager =
203+
Pager<V1Namespace, V1NamespaceList> pager =
187204
new Pager<V1Namespace, V1NamespaceList>(
188205
(Pager.PagerParams param) -> {
189206
try {
@@ -206,18 +223,16 @@ public void testPaginationForNamespaceListWithFieldSelectorFailure() throws IOEx
206223
client,
207224
1,
208225
V1NamespaceList.class);
209-
while (pager.hasNext()) {
210-
try {
211-
V1NamespaceList list = (V1NamespaceList) pager.next();
212-
List<V1Namespace> items = list.getItems();
213-
assertEquals(1, items.size());
214-
for (V1Namespace namespace : items) {
215-
assertEquals("default", namespace.getMetadata().getName());
216-
}
217-
} catch (Exception e) {
218-
assertEquals(status400Str, e.getMessage());
226+
int count = 0;
227+
try {
228+
for (V1Namespace namespace : pager) {
229+
count++;
230+
assertEquals("default", namespace.getMetadata().getName());
219231
}
232+
} catch (Exception e) {
233+
assertEquals(status400Str, e.getMessage());
220234
}
235+
221236
verify(
222237
getRequestedFor(urlPathEqualTo("/api/v1/namespaces"))
223238
.withQueryParam("fieldSelector", equalTo("metadata.name=default"))

0 commit comments

Comments
 (0)