Skip to content

Commit 21fb671

Browse files
authored
[fix][test] Improve the declare exchange test (#1150)
1 parent 1c6ab82 commit 21fb671

File tree

1 file changed

+30
-5
lines changed

1 file changed

+30
-5
lines changed

tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import java.io.IOException;
2727
import java.util.HashMap;
2828
import java.util.Map;
29+
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
31+
import org.awaitility.Awaitility;
3032
import org.testng.annotations.Test;
3133

3234
/**
@@ -121,24 +123,47 @@ private void doTestExchangeDeclaredWithEnumerationEquivalent(Channel channel)
121123
for (BuiltinExchangeType exchangeType : BuiltinExchangeType.values()) {
122124
channel.exchangeDeclare(NAME, exchangeType);
123125
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
124-
channel.exchangeDelete(NAME);
126+
deleteExchangeWithRetry();
125127

126128
channel.exchangeDeclare(NAME, exchangeType, false);
127129
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
128-
channel.exchangeDelete(NAME);
130+
deleteExchangeWithRetry();
129131

130132
channel.exchangeDeclare(NAME, exchangeType, false, false, null);
131133
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
132-
channel.exchangeDelete(NAME);
134+
deleteExchangeWithRetry();
133135

134136
channel.exchangeDeclare(NAME, exchangeType, false, false, false, null);
135137
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
136-
channel.exchangeDelete(NAME);
138+
deleteExchangeWithRetry();
137139

138140
channel.exchangeDeclareNoWait(NAME, exchangeType, false,
139141
false, false, null);
140142
// no check, this one is asynchronous
141-
channel.exchangeDelete(NAME);
143+
deleteExchangeWithRetry();
142144
}
143145
}
146+
147+
public void deleteExchangeWithRetry() throws IOException {
148+
// the replicator cursor of exchange is created in async way,
149+
// delete the exchange may fail due to non-empty directory error
150+
//
151+
// KeeperErrorCode = Directory not empty for
152+
// /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test
153+
//
154+
// - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test
155+
// - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test/__amqp_replicator__exchange_test
156+
Awaitility.await()
157+
.atMost(5, TimeUnit.SECONDS)
158+
.pollInterval(100, TimeUnit.MILLISECONDS)
159+
.until(() -> {
160+
try {
161+
channel.exchangeDelete(NAME);
162+
} catch (Exception e) {
163+
return false;
164+
}
165+
return true;
166+
});
167+
}
168+
144169
}

0 commit comments

Comments
 (0)