|
57 | 57 | import org.apache.rocketmq.remoting.protocol.RemotingCommand; |
58 | 58 | import org.apache.rocketmq.remoting.protocol.RequestCode; |
59 | 59 | import org.apache.rocketmq.remoting.protocol.RequestHeaderRegistry; |
| 60 | +import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; |
| 61 | +import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; |
60 | 62 | import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; |
61 | 63 | import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; |
62 | 64 | import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; |
@@ -508,6 +510,66 @@ public void buildRemoting() { |
508 | 510 | Assert.assertEquals("User:rocketmq", result.get(0).getSubject().getSubjectKey()); |
509 | 511 | Assert.assertEquals("Cluster:DefaultCluster", result.get(0).getResource().getResourceKey()); |
510 | 512 | Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.UPDATE))); |
| 513 | + |
| 514 | + LockBatchRequestBody lockBatchRequestBody = new LockBatchRequestBody(); |
| 515 | + lockBatchRequestBody.setConsumerGroup("group"); |
| 516 | + java.util.Set<org.apache.rocketmq.common.message.MessageQueue> lockMqSet = new java.util.HashSet<>(); |
| 517 | + |
| 518 | + lockMqSet.add(new org.apache.rocketmq.common.message.MessageQueue("topic", "broker-a", 0)); |
| 519 | + // retry topic, should be skipped |
| 520 | + lockMqSet.add(new org.apache.rocketmq.common.message.MessageQueue("%RETRY%group", "broker-a", 1)); |
| 521 | + lockBatchRequestBody.setMqSet(lockMqSet); |
| 522 | + |
| 523 | + request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); |
| 524 | + request.setBody(JSON.toJSONBytes(lockBatchRequestBody)); |
| 525 | + request.setVersion(441); |
| 526 | + request.addExtField("AccessKey", "rocketmq"); |
| 527 | + request.makeCustomHeaderToNet(); |
| 528 | + |
| 529 | + result = builder.build(channelHandlerContext, request); |
| 530 | + Assert.assertEquals(2, result.size()); |
| 531 | + |
| 532 | + Assert.assertEquals("User:rocketmq", getContext(result, ResourceType.GROUP).getSubject().getSubjectKey()); |
| 533 | + Assert.assertEquals("Group:group", getContext(result, ResourceType.GROUP).getResource().getResourceKey()); |
| 534 | + Assert.assertTrue(getContext(result, ResourceType.GROUP).getActions().containsAll(Arrays.asList(Action.SUB))); |
| 535 | + |
| 536 | + Assert.assertEquals("User:rocketmq", getContext(result, ResourceType.TOPIC).getSubject().getSubjectKey()); |
| 537 | + Assert.assertEquals("Topic:topic", getContext(result, ResourceType.TOPIC).getResource().getResourceKey()); |
| 538 | + Assert.assertTrue(getContext(result, ResourceType.TOPIC).getActions().containsAll(Arrays.asList(Action.SUB))); |
| 539 | + |
| 540 | + Assert.assertEquals("192.168.0.1", getContext(result, ResourceType.TOPIC).getSourceIp()); |
| 541 | + Assert.assertEquals("channel-id", getContext(result, ResourceType.TOPIC).getChannelId()); |
| 542 | + Assert.assertEquals(String.valueOf(RequestCode.LOCK_BATCH_MQ), getContext(result, ResourceType.TOPIC).getRpcCode()); |
| 543 | + |
| 544 | + UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody(); |
| 545 | + unlockBatchRequestBody.setConsumerGroup("group"); |
| 546 | + java.util.Set<org.apache.rocketmq.common.message.MessageQueue> unlockMqSet = new java.util.HashSet<>(); |
| 547 | + unlockMqSet.add(new org.apache.rocketmq.common.message.MessageQueue("topic", "broker-a", 0)); |
| 548 | + // retry topic, should be skipped |
| 549 | + unlockMqSet.add(new org.apache.rocketmq.common.message.MessageQueue("%RETRY%group", "broker-a", 1)); |
| 550 | + unlockBatchRequestBody.setMqSet(unlockMqSet); |
| 551 | + |
| 552 | + request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); |
| 553 | + request.setBody(JSON.toJSONBytes(unlockBatchRequestBody)); |
| 554 | + request.setVersion(441); |
| 555 | + request.addExtField("AccessKey", "rocketmq"); |
| 556 | + request.makeCustomHeaderToNet(); |
| 557 | + |
| 558 | + result = builder.build(channelHandlerContext, request); |
| 559 | + Assert.assertEquals(2, result.size()); |
| 560 | + |
| 561 | + Assert.assertEquals("User:rocketmq", getContext(result, ResourceType.GROUP).getSubject().getSubjectKey()); |
| 562 | + Assert.assertEquals("Group:group", getContext(result, ResourceType.GROUP).getResource().getResourceKey()); |
| 563 | + Assert.assertTrue(getContext(result, ResourceType.GROUP).getActions().containsAll(Arrays.asList(Action.SUB))); |
| 564 | + |
| 565 | + Assert.assertEquals("User:rocketmq", getContext(result, ResourceType.TOPIC).getSubject().getSubjectKey()); |
| 566 | + Assert.assertEquals("Topic:topic", getContext(result, ResourceType.TOPIC).getResource().getResourceKey()); |
| 567 | + Assert.assertTrue(getContext(result, ResourceType.TOPIC).getActions().containsAll(Arrays.asList(Action.SUB))); |
| 568 | + |
| 569 | + Assert.assertEquals("192.168.0.1", getContext(result, ResourceType.TOPIC).getSourceIp()); |
| 570 | + Assert.assertEquals("channel-id", getContext(result, ResourceType.TOPIC).getChannelId()); |
| 571 | + Assert.assertEquals(String.valueOf(RequestCode.UNLOCK_BATCH_MQ), getContext(result, ResourceType.TOPIC).getRpcCode()); |
| 572 | + |
511 | 573 | } |
512 | 574 |
|
513 | 575 | private DefaultAuthorizationContext getContext(List<DefaultAuthorizationContext> contexts, |
|
0 commit comments