|
41 | 41 | import java.util.concurrent.CountDownLatch; |
42 | 42 | import java.util.concurrent.ExecutorService; |
43 | 43 | import java.util.concurrent.Executors; |
44 | | -import java.util.concurrent.Semaphore; |
45 | 44 | import java.util.concurrent.TimeUnit; |
46 | 45 | import java.util.concurrent.atomic.AtomicBoolean; |
47 | 46 | import java.util.concurrent.atomic.AtomicInteger; |
|
58 | 57 | import org.junit.jupiter.api.extension.ExtendWith; |
59 | 58 | import org.junit.jupiter.params.ParameterizedTest; |
60 | 59 | import org.junit.jupiter.params.provider.ValueSource; |
61 | | -import wiremock.org.checkerframework.checker.units.qual.A; |
62 | 60 |
|
63 | 61 | @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) |
64 | 62 | public class StreamProducerTest { |
@@ -90,67 +88,6 @@ void tearDown() { |
90 | 88 | environment.close(); |
91 | 89 | } |
92 | 90 |
|
93 | | - private static AtomicLong rate() { |
94 | | - AtomicLong count = new AtomicLong(); |
95 | | - AtomicLong tick = new AtomicLong(System.nanoTime()); |
96 | | - |
97 | | - Executors.newSingleThreadScheduledExecutor() |
98 | | - .scheduleAtFixedRate( |
99 | | - () -> { |
100 | | - long now = System.nanoTime(); |
101 | | - long before = tick.getAndSet(now); |
102 | | - long elapsed = now - before; |
103 | | - long sent = count.getAndSet(0); |
104 | | - System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); |
105 | | - }, |
106 | | - 1, |
107 | | - 1, |
108 | | - TimeUnit.SECONDS); |
109 | | - return count; |
110 | | - } |
111 | | - |
112 | | - @Test |
113 | | - void test() { |
114 | | - AtomicLong count = rate(); |
115 | | - Producer producer = environment.producerBuilder().stream(stream) |
116 | | - .maxUnconfirmedMessages(10) |
117 | | - .build(); |
118 | | - |
119 | | - while(true) { |
120 | | - producer.send(producer.messageBuilder().build(), s -> { }); |
121 | | - count.incrementAndGet(); |
122 | | - } |
123 | | - |
124 | | - } |
125 | | - |
126 | | - @Test |
127 | | - void client() throws Exception { |
128 | | - int permits = 10; |
129 | | - Semaphore semaphore = new Semaphore(permits); |
130 | | - Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { |
131 | | - @Override |
132 | | - public void handle(byte publisherId, long publishingId) { |
133 | | - semaphore.release(); |
134 | | - } |
135 | | - })); |
136 | | - |
137 | | - byte pubId = (byte) 0; |
138 | | - client.declarePublisher(pubId, null, stream); |
139 | | - |
140 | | - AtomicLong count = rate(); |
141 | | - |
142 | | - List<Message> messages = IntStream.range(0, permits).mapToObj(ignored -> client |
143 | | - .messageBuilder() |
144 | | - .addData("hello".getBytes(StandardCharsets.UTF_8)) |
145 | | - .build()).collect(Collectors.toList()); |
146 | | - while (true) { |
147 | | - semaphore.acquire(permits); |
148 | | - client.publish(pubId, messages); |
149 | | - count.addAndGet(permits); |
150 | | - } |
151 | | - |
152 | | - } |
153 | | - |
154 | 91 | @Test |
155 | 92 | void send() throws Exception { |
156 | 93 | int batchSize = 10; |
|
0 commit comments