Skip to content

Commit 2d321d5

Browse files
committed
Stream limit
1 parent 809a6fd commit 2d321d5

File tree

2 files changed

+89
-71
lines changed

2 files changed

+89
-71
lines changed
Lines changed: 87 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,104 @@
11
package com.github.kuangcp.stream;
22

3-
import static org.junit.Assert.assertArrayEquals;
4-
import static org.junit.Assert.assertFalse;
5-
import static org.junit.Assert.assertTrue;
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.junit.Test;
65

76
import java.util.Arrays;
87
import java.util.Iterator;
98
import java.util.List;
109
import java.util.Objects;
10+
import java.util.Random;
1111
import java.util.stream.Collectors;
1212
import java.util.stream.Stream;
1313
import java.util.stream.StreamSupport;
14-
import org.junit.Test;
14+
15+
import static org.junit.Assert.assertArrayEquals;
16+
import static org.junit.Assert.assertFalse;
17+
import static org.junit.Assert.assertTrue;
1518

1619
/**
1720
* @author <a href="https://github.com/kuangcp">Kuangcp</a> on 2020-01-08 16:41
1821
*/
22+
@Slf4j
1923
public class CreateStreamTest {
2024

21-
@Test
22-
public void testOfWithNull() {
23-
List<String> list = Stream.of(null, "test", "name").filter(Objects::nonNull)
24-
.collect(Collectors.toList());
25-
System.out.println(list);
26-
}
27-
28-
/**
29-
* 判断是否无限流, 理论上是无法实现的,但是这里可以估计
30-
*/
31-
@Test
32-
public void testJudgeFinite() {
33-
assertFalse(isFinite(Stream.iterate(1, x -> x)));
34-
assertTrue(isFinite(Stream.of(1)));
35-
assertTrue(isFinite(Stream.of(1, 2, 3, 4).limit(1)));
36-
}
37-
38-
<T> boolean isFinite(Stream<T> stream) {
39-
return !Objects.equals(stream.spliterator().estimateSize(), Long.MAX_VALUE);
40-
}
41-
42-
@Test
43-
public void testZip() throws Exception {
44-
45-
Integer[] result = new Integer[5];
46-
Stream<Integer> zip = zip(Stream.of(1, 2, 4, 5).parallel(), Stream.of(2, 3,10).parallel());
47-
zip.collect(Collectors.toList()).toArray(result);
48-
49-
List<Integer> excepts = Arrays.asList(1, 2, 2, 3, 4, 10, 5);
50-
Integer[] exceptArray = new Integer[5];
51-
excepts.toArray(exceptArray);
52-
assertArrayEquals(result, exceptArray);
53-
}
54-
55-
<T> Stream<T> zip(Stream<T> left, Stream<T> right) {
56-
Iterator<T> leftIterator = left.iterator();
57-
Iterator<T> rightIterator = right.iterator();
58-
59-
Iterator<T> iterator = new Iterator<T>() {
60-
private boolean left = false;
61-
62-
@Override
63-
public boolean hasNext() {
64-
return leftIterator.hasNext() || rightIterator.hasNext();
65-
}
66-
67-
@Override
68-
public T next() {
69-
left = !left;
70-
if (left && !leftIterator.hasNext()) {
71-
return rightIterator.next();
72-
}
73-
if (!left && !rightIterator.hasNext()) {
74-
return leftIterator.next();
75-
}
76-
return left
77-
? leftIterator.next()
78-
: rightIterator.next();
79-
}
80-
};
81-
82-
Iterable<T> iterable = () -> iterator;
83-
boolean parallel = left.isParallel() || right.isParallel();
84-
return StreamSupport.stream(iterable.spliterator(), parallel);
85-
}
25+
@Test
26+
public void testOfWithNull() {
27+
List<String> list = Stream.of(null, "test", "name").filter(Objects::nonNull)
28+
.collect(Collectors.toList());
29+
System.out.println(list);
30+
}
31+
32+
/**
33+
* 判断是否无限流, 理论上是无法实现的,但是这里可以估计
34+
*/
35+
@Test
36+
public void testJudgeFinite() {
37+
assertFalse(isFinite(Stream.iterate(1, x -> x)));
38+
assertTrue(isFinite(Stream.of(1)));
39+
assertTrue(isFinite(Stream.of(1, 2, 3, 4).limit(1)));
40+
}
41+
42+
@Test
43+
public void testLimitBlocked() throws Exception {
44+
// 从一个集合中随机选取一部分数据出来。 limit的目标值大于集合大小时,就会阻塞
45+
List<String> ids = Arrays.asList("1", "2", "3");
46+
int size = ids.size();
47+
List<String> pick = new Random()
48+
.ints(0, size)
49+
.distinct()
50+
.limit(5)
51+
.mapToObj(ids::get)
52+
.collect(Collectors.toList());
53+
log.info("pick={}", pick);
54+
}
55+
56+
<T> boolean isFinite(Stream<T> stream) {
57+
return !Objects.equals(stream.spliterator().estimateSize(), Long.MAX_VALUE);
58+
}
59+
60+
@Test
61+
public void testZip() throws Exception {
62+
63+
Integer[] result = new Integer[5];
64+
Stream<Integer> zip = zip(Stream.of(1, 2, 4, 5).parallel(), Stream.of(2, 3, 10).parallel());
65+
zip.collect(Collectors.toList()).toArray(result);
66+
67+
List<Integer> excepts = Arrays.asList(1, 2, 2, 3, 4, 10, 5);
68+
Integer[] exceptArray = new Integer[5];
69+
excepts.toArray(exceptArray);
70+
assertArrayEquals(result, exceptArray);
71+
}
72+
73+
<T> Stream<T> zip(Stream<T> left, Stream<T> right) {
74+
Iterator<T> leftIterator = left.iterator();
75+
Iterator<T> rightIterator = right.iterator();
76+
77+
Iterator<T> iterator = new Iterator<T>() {
78+
private boolean left = false;
79+
80+
@Override
81+
public boolean hasNext() {
82+
return leftIterator.hasNext() || rightIterator.hasNext();
83+
}
84+
85+
@Override
86+
public T next() {
87+
left = !left;
88+
if (left && !leftIterator.hasNext()) {
89+
return rightIterator.next();
90+
}
91+
if (!left && !rightIterator.hasNext()) {
92+
return leftIterator.next();
93+
}
94+
return left
95+
? leftIterator.next()
96+
: rightIterator.next();
97+
}
98+
};
99+
100+
Iterable<T> iterable = () -> iterator;
101+
boolean parallel = left.isParallel() || right.isParallel();
102+
return StreamSupport.stream(iterable.spliterator(), parallel);
103+
}
86104
}

java8/src/test/java/com/github/kuangcp/stream/bug/StreamGenericTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public void testList() throws Exception {
2222
.map(v -> (Integer) v)
2323
// 即使是硬编码转换类型 同样会只推断为Object类型
2424
.map(v -> Integer.parseInt(v.toString()))
25-
// 甚至换成任意对象值,都不符合Optional.map方法的签名(泛型推断),永远推断为Object
26-
.map(v -> new HashMap<>())
25+
// 甚至换成任意对象值,都不符合Optional.map方法的签名(泛型推断),result 永远推断为Object
26+
// .map(v -> new HashMap<>())
2727
.orElse(null);
2828

2929
// 只能在lambda表达式结束后做强转

0 commit comments

Comments
 (0)