Skip to content

Commit 64ec0f0

Browse files
authored
Create IterablePublisher to convert an Iterable to a Publisher (#5030)
* Implement the functionality to convert an Iterable to a Publisher * Use SimplePublisher in IterablePublisher * Remove unncessary lock
1 parent ba56f43 commit 64ec0f0

File tree

7 files changed

+252
-1
lines changed

7 files changed

+252
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Support creating an `SdkPublisher` from an `Iterable`"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
3131
import software.amazon.awssdk.utils.async.FilteringSubscriber;
3232
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
33+
import software.amazon.awssdk.utils.async.IterablePublisher;
3334
import software.amazon.awssdk.utils.async.LimitingSubscriber;
3435
import software.amazon.awssdk.utils.async.SequentialSubscriber;
3536
import software.amazon.awssdk.utils.internal.MappingSubscriber;
@@ -51,6 +52,17 @@ static <T> SdkPublisher<T> adapt(Publisher<T> toAdapt) {
5152
return toAdapt::subscribe;
5253
}
5354

55+
/**
56+
* Create an {@link SdkPublisher} from an {@link Iterable}.
57+
*
58+
* @param iterable {@link Iterable} to adapt.
59+
* @param <T> Type of object being published.
60+
* @return SdkPublisher
61+
*/
62+
static <T> SdkPublisher<T> fromIterable(Iterable<T> iterable) {
63+
return adapt(new IterablePublisher<>(iterable));
64+
}
65+
5466
/**
5567
* Filters published events to just those that are instances of the given class. This changes the type of
5668
* publisher to the type specified in the {@link Class}.

utils/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@
120120
<scope>test</scope>
121121
<version>${slf4j.version}</version>
122122
</dependency>
123+
<dependency>
124+
<groupId>commons-lang</groupId>
125+
<artifactId>commons-lang</artifactId>
126+
<scope>test</scope>
127+
</dependency>
123128
</dependencies>
124129

125130
<build>
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import java.util.Iterator;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscriber;
21+
import software.amazon.awssdk.annotations.SdkProtectedApi;
22+
import software.amazon.awssdk.utils.Validate;
23+
24+
@SdkProtectedApi
25+
public class IterablePublisher<T> implements Publisher<T> {
26+
private final Iterable<T> iterable;
27+
28+
public IterablePublisher(Iterable<T> iterable) {
29+
this.iterable = Validate.paramNotNull(iterable, "iterable");
30+
}
31+
32+
@Override
33+
public void subscribe(Subscriber<? super T> subscriber) {
34+
Iterator<T> iterator = iterable.iterator();
35+
SimplePublisher<T> publisher = new SimplePublisher<>();
36+
37+
// Prime the simple publisher with 1 event. More will be sent as these complete.
38+
sendEvent(iterator, publisher);
39+
40+
publisher.subscribe(subscriber);
41+
}
42+
43+
private void sendEvent(Iterator<T> iterator, SimplePublisher<T> publisher) {
44+
try {
45+
if (!iterator.hasNext()) {
46+
publisher.complete();
47+
return;
48+
}
49+
50+
T next = iterator.next();
51+
if (next == null) {
52+
publisher.error(new IllegalArgumentException("Iterable returned null"));
53+
return;
54+
}
55+
56+
publisher.send(next).whenComplete((v, t) -> {
57+
if (t != null) {
58+
publisher.error(t);
59+
} else {
60+
sendEvent(iterator, publisher);
61+
}
62+
});
63+
} catch (Throwable e) {
64+
publisher.error(e);
65+
}
66+
}
67+
}

utils/src/main/java/software/amazon/awssdk/utils/async/SimplePublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private void doProcessQueue() {
278278

279279
OnErrorQueueEntry<T> onErrorEntry = (OnErrorQueueEntry<T>) entry;
280280
failureMessage.trySet(() -> new IllegalStateException("onError() was already invoked.",
281-
onErrorEntry.failure));
281+
onErrorEntry.failure));
282282
log.trace(() -> "Calling onError() with " + onErrorEntry.failure, onErrorEntry.failure);
283283
subscriber.onError(onErrorEntry.failure);
284284
break;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import java.util.Iterator;
19+
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.tck.PublisherVerification;
21+
import org.reactivestreams.tck.TestEnvironment;
22+
23+
public class IterablePublisherTckTest extends PublisherVerification<Long> {
24+
25+
26+
public IterablePublisherTckTest() {
27+
super(new TestEnvironment());
28+
}
29+
30+
@Override
31+
public Publisher<Long> createPublisher(long elements) {
32+
Iterable<Long> iterable = () -> new Iterator<Long>() {
33+
private long count;
34+
@Override
35+
public boolean hasNext() {
36+
if (count == elements) {
37+
return false;
38+
}
39+
40+
return true;
41+
}
42+
43+
@Override
44+
public Long next() {
45+
count++;
46+
return count;
47+
}
48+
};
49+
return new IterablePublisher<>(iterable);
50+
}
51+
52+
@Override
53+
public Publisher<Long> createFailedPublisher() {
54+
return null;
55+
}
56+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.IntStream;
27+
import org.apache.commons.lang.RandomStringUtils;
28+
import org.junit.jupiter.api.Test;
29+
import org.reactivestreams.Subscriber;
30+
import org.reactivestreams.Subscription;
31+
32+
public class IterablePublisherTest {
33+
34+
@Test
35+
void nullIterable_throwException() {
36+
assertThatThrownBy(() -> new IterablePublisher<>(null)).isInstanceOf(NullPointerException.class);
37+
}
38+
39+
@Test
40+
void emptyIterable_shouldComplete() {
41+
TestSubscriber testSubscriber = new TestSubscriber();
42+
IterablePublisher<String> iterablePublisher = new IterablePublisher<>(new ArrayList<>());
43+
iterablePublisher.subscribe(testSubscriber);
44+
assertThat(testSubscriber.onCompleteInvoked).isTrue();
45+
assertThat(testSubscriber.onNextInvoked).isFalse();
46+
assertThat(testSubscriber.onErrorInvoked).isFalse();
47+
}
48+
49+
@Test
50+
void iterableReturnNull_shouldInvokeOnError() {
51+
TestSubscriber testSubscriber = new TestSubscriber();
52+
IterablePublisher<String> iterablePublisher = new IterablePublisher<>(Arrays.asList("foo", null));
53+
iterablePublisher.subscribe(testSubscriber);
54+
assertThat(testSubscriber.onCompleteInvoked).isFalse();
55+
assertThat(testSubscriber.results).contains("foo");
56+
assertThat(testSubscriber.onErrorInvoked).isTrue();
57+
assertThat(testSubscriber.throwable).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("returned null");
58+
}
59+
60+
@Test
61+
void happyCase_shouldSendAllEvents() {
62+
TestSubscriber testSubscriber = new TestSubscriber();
63+
List<String> strings = IntStream.range(0, 100).mapToObj(i -> RandomStringUtils.random(i)).collect(Collectors.toList());
64+
65+
IterablePublisher<String> iterablePublisher = new IterablePublisher<>(strings);
66+
iterablePublisher.subscribe(testSubscriber);
67+
assertThat(testSubscriber.onCompleteInvoked).isTrue();
68+
assertThat(testSubscriber.results).hasSameElementsAs(strings);
69+
assertThat(testSubscriber.onErrorInvoked).isFalse();
70+
}
71+
72+
private static class TestSubscriber implements Subscriber<String> {
73+
private Subscription subscription;
74+
private List<String> results = new ArrayList<>();
75+
private boolean onNextInvoked;
76+
private boolean onErrorInvoked;
77+
private boolean onCompleteInvoked;
78+
private Throwable throwable;
79+
80+
@Override
81+
public void onSubscribe(Subscription s) {
82+
this.subscription = s;
83+
s.request(1);
84+
}
85+
86+
@Override
87+
public void onNext(String s) {
88+
onNextInvoked = true;
89+
results.add(s);
90+
subscription.request(1);
91+
}
92+
93+
@Override
94+
public void onError(Throwable t) {
95+
onErrorInvoked = true;
96+
throwable = t;
97+
}
98+
99+
@Override
100+
public void onComplete() {
101+
onCompleteInvoked = true;
102+
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)