Skip to content

Commit f660c2c

Browse files
authored
add transactional id support for acls (#637)
1 parent 06dcf2c commit f660c2c

File tree

3 files changed

+204
-1
lines changed

3 files changed

+204
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- [Enhancement] Allow native Kafka customization poll time.
1313
- [Enhancement] Roll out experimental jruby support.
1414
- [Enhancement] Run all specs on each of the platforms with and without precompilation.
15+
- [Enhancement] Support transactional id in the ACL API.
1516
- [Fix] Fix issue where post-closed producer C topics refs would not be cleaned.
1617
- [Fix] Fiber causes Segmentation Fault.
1718
- [Change] Move to trusted-publishers and remove signing since no longer needed.

lib/rdkafka/bindings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_
511511
RD_KAFKA_RESOURCE_TOPIC = 2
512512
RD_KAFKA_RESOURCE_GROUP = 3
513513
RD_KAFKA_RESOURCE_BROKER = 4
514+
RD_KAFKA_RESOURCE_TRANSACTIONAL_ID = 5
514515

515516
# rd_kafka_ResourcePatternType_t - https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7320
516517

spec/rdkafka/admin_spec.rb

Lines changed: 202 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@
513513
end
514514
end
515515

516-
describe "#ACL tests" do
516+
describe "#ACL tests for topic resource" do
517517
let(:non_existing_resource_name) {"non-existing-topic"}
518518
before do
519519
#create topic for testing acl
@@ -615,6 +615,207 @@
615615
end
616616
end
617617

618+
describe "#ACL tests for transactional_id" do
619+
let(:transactional_id_resource_name) {"test-transactional-id"}
620+
let(:non_existing_transactional_id) {"non-existing-transactional-id"}
621+
let(:transactional_id_resource_type) { Rdkafka::Bindings::RD_KAFKA_RESOURCE_TRANSACTIONAL_ID }
622+
let(:transactional_id_resource_pattern_type) { Rdkafka::Bindings::RD_KAFKA_RESOURCE_PATTERN_LITERAL }
623+
let(:transactional_id_principal) { "User:test-user" }
624+
let(:transactional_id_host) { "*" }
625+
let(:transactional_id_operation) { Rdkafka::Bindings::RD_KAFKA_ACL_OPERATION_WRITE }
626+
let(:transactional_id_permission_type) { Rdkafka::Bindings::RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW }
627+
628+
after do
629+
# Clean up any ACLs that might have been created during tests
630+
begin
631+
delete_acl_handle = admin.delete_acl(
632+
resource_type: transactional_id_resource_type,
633+
resource_name: nil,
634+
resource_pattern_type: transactional_id_resource_pattern_type,
635+
principal: transactional_id_principal,
636+
host: transactional_id_host,
637+
operation: transactional_id_operation,
638+
permission_type: transactional_id_permission_type
639+
)
640+
delete_acl_handle.wait(max_wait_timeout: 15.0)
641+
rescue
642+
# Ignore cleanup errors
643+
end
644+
end
645+
646+
describe "#create_acl" do
647+
it "creates acl for a transactional_id" do
648+
create_acl_handle = admin.create_acl(
649+
resource_type: transactional_id_resource_type,
650+
resource_name: transactional_id_resource_name,
651+
resource_pattern_type: transactional_id_resource_pattern_type,
652+
principal: transactional_id_principal,
653+
host: transactional_id_host,
654+
operation: transactional_id_operation,
655+
permission_type: transactional_id_permission_type
656+
)
657+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
658+
expect(create_acl_report.rdkafka_response).to eq(0)
659+
expect(create_acl_report.rdkafka_response_string).to eq("")
660+
end
661+
662+
it "creates acl for a non-existing transactional_id" do
663+
# ACL creation for transactional_ids that don't exist will still get created successfully
664+
create_acl_handle = admin.create_acl(
665+
resource_type: transactional_id_resource_type,
666+
resource_name: non_existing_transactional_id,
667+
resource_pattern_type: transactional_id_resource_pattern_type,
668+
principal: transactional_id_principal,
669+
host: transactional_id_host,
670+
operation: transactional_id_operation,
671+
permission_type: transactional_id_permission_type
672+
)
673+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
674+
expect(create_acl_report.rdkafka_response).to eq(0)
675+
expect(create_acl_report.rdkafka_response_string).to eq("")
676+
677+
# Clean up the ACL that was created for the non-existing transactional_id
678+
delete_acl_handle = admin.delete_acl(
679+
resource_type: transactional_id_resource_type,
680+
resource_name: non_existing_transactional_id,
681+
resource_pattern_type: transactional_id_resource_pattern_type,
682+
principal: transactional_id_principal,
683+
host: transactional_id_host,
684+
operation: transactional_id_operation,
685+
permission_type: transactional_id_permission_type
686+
)
687+
delete_acl_report = delete_acl_handle.wait(max_wait_timeout: 15.0)
688+
expect(delete_acl_handle[:response]).to eq(0)
689+
expect(delete_acl_report.deleted_acls.size).to eq(1)
690+
end
691+
end
692+
693+
describe "#describe_acl" do
694+
it "describes acl of a transactional_id that does not exist" do
695+
describe_acl_handle = admin.describe_acl(
696+
resource_type: transactional_id_resource_type,
697+
resource_name: non_existing_transactional_id,
698+
resource_pattern_type: transactional_id_resource_pattern_type,
699+
principal: transactional_id_principal,
700+
host: transactional_id_host,
701+
operation: transactional_id_operation,
702+
permission_type: transactional_id_permission_type
703+
)
704+
describe_acl_report = describe_acl_handle.wait(max_wait_timeout: 15.0)
705+
expect(describe_acl_handle[:response]).to eq(0)
706+
expect(describe_acl_report.acls.size).to eq(0)
707+
end
708+
709+
it "creates acls and describes the newly created transactional_id acls" do
710+
# Create first ACL
711+
create_acl_handle = admin.create_acl(
712+
resource_type: transactional_id_resource_type,
713+
resource_name: "test_transactional_id_1",
714+
resource_pattern_type: transactional_id_resource_pattern_type,
715+
principal: transactional_id_principal,
716+
host: transactional_id_host,
717+
operation: transactional_id_operation,
718+
permission_type: transactional_id_permission_type
719+
)
720+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
721+
expect(create_acl_report.rdkafka_response).to eq(0)
722+
expect(create_acl_report.rdkafka_response_string).to eq("")
723+
724+
# Create second ACL
725+
create_acl_handle = admin.create_acl(
726+
resource_type: transactional_id_resource_type,
727+
resource_name: "test_transactional_id_2",
728+
resource_pattern_type: transactional_id_resource_pattern_type,
729+
principal: transactional_id_principal,
730+
host: transactional_id_host,
731+
operation: transactional_id_operation,
732+
permission_type: transactional_id_permission_type
733+
)
734+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
735+
expect(create_acl_report.rdkafka_response).to eq(0)
736+
expect(create_acl_report.rdkafka_response_string).to eq("")
737+
738+
# Since we create and immediately check, this is slow on loaded CIs, hence we wait
739+
sleep(2)
740+
741+
# Describe ACLs - filter by transactional_id resource type
742+
describe_acl_handle = admin.describe_acl(
743+
resource_type: transactional_id_resource_type,
744+
resource_name: nil,
745+
resource_pattern_type: Rdkafka::Bindings::RD_KAFKA_RESOURCE_PATTERN_ANY,
746+
principal: transactional_id_principal,
747+
host: transactional_id_host,
748+
operation: transactional_id_operation,
749+
permission_type: transactional_id_permission_type
750+
)
751+
describe_acl_report = describe_acl_handle.wait(max_wait_timeout: 15.0)
752+
expect(describe_acl_handle[:response]).to eq(0)
753+
expect(describe_acl_report.acls.length).to eq(2)
754+
end
755+
end
756+
757+
describe "#delete_acl" do
758+
it "deletes acl of a transactional_id that does not exist" do
759+
delete_acl_handle = admin.delete_acl(
760+
resource_type: transactional_id_resource_type,
761+
resource_name: non_existing_transactional_id,
762+
resource_pattern_type: transactional_id_resource_pattern_type,
763+
principal: transactional_id_principal,
764+
host: transactional_id_host,
765+
operation: transactional_id_operation,
766+
permission_type: transactional_id_permission_type
767+
)
768+
delete_acl_report = delete_acl_handle.wait(max_wait_timeout: 15.0)
769+
expect(delete_acl_handle[:response]).to eq(0)
770+
expect(delete_acl_report.deleted_acls.size).to eq(0)
771+
end
772+
773+
it "creates transactional_id acls and deletes the newly created acls" do
774+
# Create first ACL
775+
create_acl_handle = admin.create_acl(
776+
resource_type: transactional_id_resource_type,
777+
resource_name: "test_transactional_id_1",
778+
resource_pattern_type: transactional_id_resource_pattern_type,
779+
principal: transactional_id_principal,
780+
host: transactional_id_host,
781+
operation: transactional_id_operation,
782+
permission_type: transactional_id_permission_type
783+
)
784+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
785+
expect(create_acl_report.rdkafka_response).to eq(0)
786+
expect(create_acl_report.rdkafka_response_string).to eq("")
787+
788+
# Create second ACL
789+
create_acl_handle = admin.create_acl(
790+
resource_type: transactional_id_resource_type,
791+
resource_name: "test_transactional_id_2",
792+
resource_pattern_type: transactional_id_resource_pattern_type,
793+
principal: transactional_id_principal,
794+
host: transactional_id_host,
795+
operation: transactional_id_operation,
796+
permission_type: transactional_id_permission_type
797+
)
798+
create_acl_report = create_acl_handle.wait(max_wait_timeout: 15.0)
799+
expect(create_acl_report.rdkafka_response).to eq(0)
800+
expect(create_acl_report.rdkafka_response_string).to eq("")
801+
802+
# Delete ACLs - resource_name nil to delete all ACLs with any resource name and matching all other filters
803+
delete_acl_handle = admin.delete_acl(
804+
resource_type: transactional_id_resource_type,
805+
resource_name: nil,
806+
resource_pattern_type: transactional_id_resource_pattern_type,
807+
principal: transactional_id_principal,
808+
host: transactional_id_host,
809+
operation: transactional_id_operation,
810+
permission_type: transactional_id_permission_type
811+
)
812+
delete_acl_report = delete_acl_handle.wait(max_wait_timeout: 15.0)
813+
expect(delete_acl_handle[:response]).to eq(0)
814+
expect(delete_acl_report.deleted_acls.length).to eq(2)
815+
end
816+
end
817+
end
818+
618819
describe('Group tests') do
619820
describe "#delete_group" do
620821
describe("with an existing group") do

0 commit comments

Comments
 (0)