|
38 | 38 |
|
39 | 39 | @SuppressWarnings({ "java:S2925", "java:S5778" }) |
40 | 40 | class RingBufferBlockingQueueTest { |
41 | | - |
| 41 | + |
42 | 42 | @Test |
43 | 43 | void testSuccess() { |
44 | 44 | final ExecutorService producer = Executors.newSingleThreadScheduledExecutor(); |
45 | | - |
| 45 | + |
46 | 46 | final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(); |
47 | | - |
| 47 | + |
48 | 48 | final List<RequestEntry<Integer>> requestEntriesOut = new LinkedList<>(); |
49 | | - |
| 49 | + |
50 | 50 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
51 | | - |
| 51 | + |
52 | 52 | producer.submit(() -> { |
53 | 53 | IntStream.range(0, 100_000).forEach(value -> { |
54 | 54 | ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(value).build()); |
55 | | - System.err.println("write: " + ringBlockingQueue.writeSequence()); |
56 | 55 | }); |
57 | 56 | }); |
58 | | - |
| 57 | + |
59 | 58 | consumer.scheduleAtFixedRate(() -> { |
60 | 59 | while (!ringBlockingQueue.isEmpty()) { |
61 | 60 | final List<RequestEntry<Integer>> requestEntries = new LinkedList<>(); |
62 | | - |
| 61 | + |
63 | 62 | while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) { |
64 | 63 | final RequestEntry<Integer> take = ringBlockingQueue.take(); |
65 | | - System.err.println("read: " + ringBlockingQueue.readSequence()); |
66 | 64 | requestEntries.add(take); |
67 | 65 | } |
68 | | - |
| 66 | + |
69 | 67 | requestEntriesOut.addAll(requestEntries); |
70 | 68 | } |
71 | 69 | }, 0, 100L, TimeUnit.MILLISECONDS); |
72 | | - |
| 70 | + |
73 | 71 | await().pollInterval(5, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).until(() -> { |
74 | 72 | return (ringBlockingQueue.writeSequence() == 99_999) && (ringBlockingQueue.readSequence() == 100_000); |
75 | 73 | }); |
76 | | - |
| 74 | + |
77 | 75 | producer.shutdown(); |
78 | 76 | consumer.shutdown(); |
79 | | - |
| 77 | + |
80 | 78 | assertThat(ringBlockingQueue.size(), is(0)); |
81 | 79 | assertThat(ringBlockingQueue.isEmpty(), is(true)); |
82 | | - |
| 80 | + |
83 | 81 | assertThat(requestEntriesOut, hasSize(100_000)); |
84 | 82 | requestEntriesOut.sort((a, b) -> a.getValue() - b.getValue()); |
85 | | - |
| 83 | + |
86 | 84 | for (int i = 0; i < 100_000; i++) { |
87 | 85 | assertThat(requestEntriesOut.get(i).getValue(), is(i)); |
88 | 86 | } |
89 | 87 | } |
90 | | - |
| 88 | + |
91 | 89 | @Test |
92 | 90 | void testSuccessWhenIsEmpty() { |
93 | 91 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
94 | | - |
| 92 | + |
95 | 93 | final ExecutorService producer = Executors.newSingleThreadExecutor(); |
96 | | - |
| 94 | + |
97 | 95 | final ExecutorService consumer = Executors.newSingleThreadExecutor(); |
98 | | - |
| 96 | + |
99 | 97 | consumer.submit(() -> { |
100 | 98 | assertThat(ringBlockingQueue.take().getValue(), is(0)); |
101 | 99 | assertThat(ringBlockingQueue.take().getValue(), is(1)); |
102 | 100 | }); |
103 | | - |
| 101 | + |
104 | 102 | await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); |
105 | | - |
| 103 | + |
106 | 104 | producer.submit(() -> { |
107 | 105 | ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(0).build()); |
108 | 106 | ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(1).build()); |
109 | 107 | }); |
110 | | - |
| 108 | + |
111 | 109 | await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); |
112 | 110 | producer.shutdownNow(); |
113 | | - |
| 111 | + |
114 | 112 | await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); |
115 | 113 | consumer.shutdownNow(); |
116 | | - |
| 114 | + |
117 | 115 | assertThat(ringBlockingQueue.isEmpty(), is(true)); |
118 | 116 | } |
119 | | - |
| 117 | + |
120 | 118 | @Test |
121 | 119 | void testSuccessWhenIsFull() { |
122 | 120 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(1); |
123 | | - |
| 121 | + |
124 | 122 | final ExecutorService producer = Executors.newSingleThreadExecutor(); |
125 | | - |
| 123 | + |
126 | 124 | final ExecutorService consumer = Executors.newSingleThreadExecutor(); |
127 | | - |
| 125 | + |
128 | 126 | producer.submit(() -> { |
129 | 127 | ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(0).build()); |
130 | 128 | ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(1).build()); |
131 | 129 | }); |
132 | | - |
| 130 | + |
133 | 131 | await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); |
134 | | - |
| 132 | + |
135 | 133 | consumer.submit(() -> { |
136 | 134 | assertThat(ringBlockingQueue.take().getValue(), is(0)); |
137 | 135 | assertThat(ringBlockingQueue.take().getValue(), is(1)); |
138 | 136 | }); |
139 | | - |
| 137 | + |
140 | 138 | await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); |
141 | 139 | producer.shutdownNow(); |
142 | | - |
| 140 | + |
143 | 141 | await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); |
144 | 142 | consumer.shutdownNow(); |
145 | | - |
| 143 | + |
146 | 144 | assertThat(ringBlockingQueue.isEmpty(), is(true)); |
147 | 145 | } |
148 | | - |
| 146 | + |
149 | 147 | @Test |
150 | 148 | void testFailOffer() { |
151 | 149 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
152 | 150 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.<Integer>builder().withValue(0).build())); |
153 | 151 | } |
154 | | - |
| 152 | + |
155 | 153 | @Test |
156 | 154 | void testFailOfferWithParams() { |
157 | 155 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
158 | 156 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.<Integer>builder().withValue(0).build(), 1, TimeUnit.MILLISECONDS)); |
159 | 157 | } |
160 | | - |
| 158 | + |
161 | 159 | @Test |
162 | 160 | void testFailPoll() { |
163 | 161 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
164 | 162 | assertThrows(UnsupportedOperationException.class, ringBlockingQueue::poll); |
165 | 163 | } |
166 | | - |
| 164 | + |
167 | 165 | @Test |
168 | 166 | void testFailPollWithParams() { |
169 | 167 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
170 | 168 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.poll(1, TimeUnit.MILLISECONDS)); |
171 | 169 | } |
172 | | - |
| 170 | + |
173 | 171 | @Test |
174 | 172 | void testFailIterator() { |
175 | 173 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
176 | 174 | assertThrows(UnsupportedOperationException.class, ringBlockingQueue::iterator); |
177 | 175 | } |
178 | | - |
| 176 | + |
179 | 177 | @Test |
180 | 178 | void testFailAdd() { |
181 | 179 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
182 | 180 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.add(RequestEntry.<Integer>builder().withValue(0).build())); |
183 | 181 | } |
184 | | - |
| 182 | + |
185 | 183 | @Test |
186 | 184 | void testFailRemainingCapacity() { |
187 | 185 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
188 | 186 | assertThrows(UnsupportedOperationException.class, ringBlockingQueue::remainingCapacity); |
189 | 187 | } |
190 | | - |
| 188 | + |
191 | 189 | @Test |
192 | 190 | void testFailDrainTo() { |
193 | 191 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
194 | 192 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList())); |
195 | 193 | } |
196 | | - |
| 194 | + |
197 | 195 | @Test |
198 | 196 | void testFailDrainToWithParams() { |
199 | 197 | final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(); |
200 | 198 | assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList(), 1)); |
201 | 199 | } |
202 | | - |
| 200 | + |
203 | 201 | } |
0 commit comments