From 3b0bd7fb911c1c3a27dbfda65f69d1774ae057ab Mon Sep 17 00:00:00 2001 From: Dan Lavu Date: Thu, 15 Jan 2026 12:31:06 -0500 Subject: [PATCH] removing netgroup intg test Reviewed-by: Scott Poore (cherry picked from commit ab7a7f438bebdce156a68890a3367250514f4ae6) --- src/tests/intg/Makefile.am | 1 - src/tests/intg/test_netgroup.py | 539 -------------------------------- 2 files changed, 540 deletions(-) delete mode 100644 src/tests/intg/test_netgroup.py diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index 0ed438e001b..021e15bd9a8 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -15,7 +15,6 @@ dist_noinst_DATA = \ ldap_ent.py \ util.py \ test_memory_cache.py \ - test_netgroup.py \ files_ops.py \ kdc.py \ krb5utils.py \ diff --git a/src/tests/intg/test_netgroup.py b/src/tests/intg/test_netgroup.py deleted file mode 100644 index 1cea42f746a..00000000000 --- a/src/tests/intg/test_netgroup.py +++ /dev/null @@ -1,539 +0,0 @@ -# -# Netgroup integration test -# -# Copyright (c) 2016 Red Hat, Inc. -# Author: Petr Cech -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# - -import os -import stat -import signal -import subprocess -import time -import ldap -import ldap.modlist -import pytest - -import config -import ds_openldap -import ldap_ent -from util import unindent -from sssd_nss import NssReturnCode -from sssd_netgroup import get_sssd_netgroups - -LDAP_BASE_DN = "dc=example,dc=com" - - -@pytest.fixture(scope="module") -def ds_inst(request): - """LDAP server instance fixture""" - ds_inst = ds_openldap.DSOpenLDAP( - config.PREFIX, 10389, LDAP_BASE_DN, - "cn=admin", "Secret123" - ) - - try: - ds_inst.setup() - except Exception: - ds_inst.teardown() - raise - request.addfinalizer(ds_inst.teardown) - return ds_inst - - -@pytest.fixture(scope="module") -def ldap_conn(request, ds_inst): - """LDAP server connection fixture""" - ldap_conn = ds_inst.bind() - ldap_conn.ds_inst = ds_inst - request.addfinalizer(ldap_conn.unbind_s) - return ldap_conn - - -def create_ldap_entries(ldap_conn, ent_list=None): - """Add LDAP entries from ent_list""" - if ent_list is not None: - for entry in ent_list: - ldap_conn.add_s(entry[0], entry[1]) - - -def cleanup_ldap_entries(ldap_conn, ent_list=None): - """Remove LDAP entries added by create_ldap_entries""" - if ent_list is None: - for ou in ("Users", "Groups", "Netgroups", "Services", "Policies"): - for entry in ldap_conn.search_s(f"ou={ou}," - f"{ldap_conn.ds_inst.base_dn}", - ldap.SCOPE_ONELEVEL, - attrlist=[]): - ldap_conn.delete_s(entry[0]) - else: - for entry in ent_list: - ldap_conn.delete_s(entry[0]) - - -def create_ldap_cleanup(request, ldap_conn, ent_list=None): - """Add teardown for removing all user/group LDAP entries""" - request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) - - -def create_ldap_fixture(request, ldap_conn, ent_list=None): - """Add LDAP entries and add teardown for removing them""" - create_ldap_entries(ldap_conn, ent_list) - create_ldap_cleanup(request, ldap_conn, ent_list) - - -SCHEMA_RFC2307_BIS = "rfc2307bis" - - -def format_basic_conf(ldap_conn, schema): - """Format a basic SSSD configuration""" - schema_conf = "ldap_schema = " + schema + "\n" - schema_conf += "ldap_group_object_class = groupOfNames\n" - return unindent("""\ - [sssd] - domains = LDAP - services = nss - disable_netlink = true - - [nss] - - [domain/LDAP] - {schema_conf} - id_provider = ldap - auth_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - ldap_netgroup_search_base = ou=Netgroups,{ldap_conn.ds_inst.base_dn} - ldap_id_use_start_tls = false - """).format(**locals()) - - -def create_conf_file(contents): - """Create sssd.conf with specified contents""" - conf = open(config.CONF_PATH, "w") - conf.write(contents) - conf.close() - os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) - - -def cleanup_conf_file(): - """Remove sssd.conf, if it exists""" - if os.path.lexists(config.CONF_PATH): - os.unlink(config.CONF_PATH) - - -def create_conf_cleanup(request): - """Add teardown for removing sssd.conf""" - request.addfinalizer(cleanup_conf_file) - - -def create_conf_fixture(request, contents): - """ - Create sssd.conf with specified contents and add teardown for removing it - """ - create_conf_file(contents) - create_conf_cleanup(request) - - -def create_sssd_process(): - """Start the SSSD process""" - if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: - raise Exception("sssd start failed") - - -def get_sssd_pid(): - pid_file = open(config.PIDFILE_PATH, "r") - pid = int(pid_file.read()) - return pid - - -def cleanup_sssd_process(): - """Stop the SSSD process and remove its state""" - try: - pid = get_sssd_pid() - os.kill(pid, signal.SIGTERM) - while True: - try: - os.kill(pid, signal.SIGCONT) - except OSError: - break - time.sleep(1) - except OSError: - pass - for path in os.listdir(config.DB_PATH): - os.unlink(config.DB_PATH + "/" + path) - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - - -def create_sssd_cleanup(request): - """Add teardown for stopping SSSD and removing its state""" - request.addfinalizer(cleanup_sssd_process) - - -def simulate_offline(): - pid = get_sssd_pid() - os.kill(pid, signal.SIGUSR1) - - -def create_sssd_fixture(request): - """Start SSSD and add teardown for stopping it and removing its state""" - create_sssd_process() - create_sssd_cleanup(request) - - -@pytest.fixture -def add_empty_netgroup(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_netgroup("empty_netgroup") - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_add_empty_netgroup(add_empty_netgroup): - """ - Adding empty netgroup. - """ - - res, _, netgroups = get_sssd_netgroups("empty_netgroup") - assert res == NssReturnCode.SUCCESS - assert netgroups == [] - - -@pytest.fixture -def add_tripled_netgroup(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_netgroup("tripled_netgroup", ["(host,user,domain)"]) - - ent_list.add_netgroup("adv_tripled_netgroup", ["(host1,user1,domain1)", - "(host2,user2,domain2)"]) - - ent_list.add_netgroup("tripled_netgroup_no_domain", ["(host,user,)"]) - - ent_list.add_netgroup("tripled_netgroup_no_user", ["(host,,domain)"]) - - ent_list.add_netgroup("tripled_netgroup_no_host", ["(,user,domain)"]) - - ent_list.add_netgroup("tripled_netgroup_none", ["(,,)"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_add_tripled_netgroup(add_tripled_netgroup): - """ - Adding netgroup with triplet. - """ - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("host", "user", "domain")] - - res, _, netgrps = get_sssd_netgroups("adv_tripled_netgroup") - assert res == NssReturnCode.SUCCESS - assert sorted(netgrps) == sorted([("host1", "user1", "domain1"), - ("host2", "user2", "domain2")]) - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup_no_domain") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("host", "user", "")] - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup_no_user") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("host", "", "domain")] - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup_no_host") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("", "user", "domain")] - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup_none") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("", "", "")] - - -@pytest.fixture -def add_mixed_netgroup(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_netgroup("mixed_netgroup1") - ent_list.add_netgroup("mixed_netgroup2", members=["mixed_netgroup1"]) - - ent_list.add_netgroup("mixed_netgroup3", ["(host1,user1,domain1)"]) - ent_list.add_netgroup("mixed_netgroup4", - ["(host2,user2,domain2)", "(host3,user3,domain3)"]) - - ent_list.add_netgroup("mixed_netgroup5", - ["(host4,user4,domain4)"], - ["mixed_netgroup1"]) - ent_list.add_netgroup("mixed_netgroup6", - ["(host5,user5,domain5)"], - ["mixed_netgroup2"]) - - ent_list.add_netgroup("mixed_netgroup7", members=["mixed_netgroup3"]) - ent_list.add_netgroup("mixed_netgroup8", - members=["mixed_netgroup3", "mixed_netgroup4"]) - - ent_list.add_netgroup("mixed_netgroup9", - ["(host6,user6,domain6)"], - ["mixed_netgroup3", "mixed_netgroup4"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_add_mixed_netgroup(add_mixed_netgroup): - """ - Adding many netgroups of different type. - """ - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup1") - assert res == NssReturnCode.SUCCESS - assert netgroups == [] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup2") - assert res == NssReturnCode.SUCCESS - assert netgroups == [] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup3") - assert res == NssReturnCode.SUCCESS - assert netgroups == [("host1", "user1", "domain1")] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup4") - assert res == NssReturnCode.SUCCESS - assert sorted(netgroups) == sorted([("host2", "user2", "domain2"), - ("host3", "user3", "domain3")]) - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup5") - assert res == NssReturnCode.SUCCESS - assert netgroups == [("host4", "user4", "domain4")] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup6") - assert res == NssReturnCode.SUCCESS - assert netgroups == [("host5", "user5", "domain5")] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup7") - assert res == NssReturnCode.SUCCESS - assert netgroups == [("host1", "user1", "domain1")] - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup8") - assert res == NssReturnCode.SUCCESS - assert sorted(netgroups) == sorted([("host1", "user1", "domain1"), - ("host2", "user2", "domain2"), - ("host3", "user3", "domain3")]) - - res, _, netgroups = get_sssd_netgroups("mixed_netgroup9") - assert res == NssReturnCode.SUCCESS - assert sorted(netgroups) == sorted([("host1", "user1", "domain1"), - ("host2", "user2", "domain2"), - ("host3", "user3", "domain3"), - ("host6", "user6", "domain6")]) - - -@pytest.fixture -def remove_step_by_step(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_netgroup("rm_empty_netgroup1", ["(host1,user1,domain1)"]) - ent_list.add_netgroup("rm_empty_netgroup2", - ["(host2,user2,domain2)"], - ["rm_empty_netgroup1"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return ent_list - - -def test_remove_step_by_step(remove_step_by_step, ldap_conn): - """ - Removing netgroups step by step. - """ - - ent_list = remove_step_by_step - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup1") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host1', 'user1', 'domain1')] - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup2") - assert res == NssReturnCode.SUCCESS - assert sorted(netgroups) == sorted([('host1', 'user1', 'domain1'), - ('host2', 'user2', 'domain2')]) - - # removing of rm_empty_netgroup1 - ldap_conn.delete_s(ent_list[0][0]) - ent_list.remove(ent_list[0]) - - if subprocess.call(["sss_cache", "-N"]) != 0: - raise Exception("sssd_cache failed") - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup1") - assert res == NssReturnCode.NOTFOUND - assert netgroups == [] - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup2") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host2', 'user2', 'domain2')] - - # removing of rm_empty_netgroup2 - ldap_conn.delete_s(ent_list[0][0]) - ent_list.remove(ent_list[0]) - - if subprocess.call(["sss_cache", "-N"]) != 0: - raise Exception("sssd_cache failed") - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup1") - assert res == NssReturnCode.NOTFOUND - assert netgroups == [] - - res, _, netgroups = get_sssd_netgroups("rm_empty_netgroup2") - assert res == NssReturnCode.NOTFOUND - assert netgroups == [] - - -@pytest.fixture -def removing_nested_netgroups(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_netgroup("t2841_netgroup1", ["(host1,user1,domain1)"]) - ent_list.add_netgroup("t2841_netgroup2", ["(host2,user2,domain2)"]) - ent_list.add_netgroup("t2841_netgroup3", - members=["t2841_netgroup1", "t2841_netgroup2"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_removing_nested_netgroups(removing_nested_netgroups, ldap_conn): - """ - Regression test for ticket 2841. - https://fedorahosted.org/sssd/ticket/2841 - """ - - netgrp_dn = 'cn=t2841_netgroup3,ou=Netgroups,' + ldap_conn.ds_inst.base_dn - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup1") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host1', 'user1', 'domain1')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup2") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host2', 'user2', 'domain2')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup3") - assert res == NssReturnCode.SUCCESS - assert sorted(netgroups) == sorted([('host1', 'user1', 'domain1'), - ('host2', 'user2', 'domain2')]) - - # removing of t2841_netgroup1 from t2841_netgroup3 - old = {'memberNisNetgroup': [b"t2841_netgroup1", b"t2841_netgroup2"]} - new = {'memberNisNetgroup': [b"t2841_netgroup2"]} - - ldif = ldap.modlist.modifyModlist(old, new) - ldap_conn.modify_s(netgrp_dn, ldif) - - if subprocess.call(["sss_cache", "-N"]) != 0: - raise Exception("sssd_cache failed") - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup1") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host1', 'user1', 'domain1')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup2") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host2', 'user2', 'domain2')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup3") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host2', 'user2', 'domain2')] - - # removing of t2841_netgroup2 from t2841_netgroup3 - old = {'memberNisNetgroup': [b"t2841_netgroup2"]} - new = {'memberNisNetgroup': []} - - ldif = ldap.modlist.modifyModlist(old, new) - ldap_conn.modify_s(netgrp_dn, ldif) - - if subprocess.call(["sss_cache", "-N"]) != 0: - raise Exception("sssd_cache failed") - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup1") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host1', 'user1', 'domain1')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup2") - assert res == NssReturnCode.SUCCESS - assert netgroups == [('host2', 'user2', 'domain2')] - - res, _, netgroups = get_sssd_netgroups("t2841_netgroup3") - assert res == NssReturnCode.SUCCESS - assert netgroups == [] - - -def test_offline_netgroups(add_tripled_netgroup): - res, _, netgrps = get_sssd_netgroups("tripled_netgroup") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("host", "user", "domain")] - - subprocess.check_call(["sss_cache", "-N"]) - - simulate_offline() - - res, _, netgrps = get_sssd_netgroups("tripled_netgroup") - assert res == NssReturnCode.SUCCESS - assert netgrps == [("host", "user", "domain")] - - -@pytest.fixture -def add_thread_test_netgroup(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - triple_list = [] - for i in range(1, 999): - triple_list.append("(host1,user" + str(i) + ",domain1)") - ent_list.add_netgroup("ng1", triple_list) - - triple_list = [] - for i in range(1, 999): - triple_list.append("(host2,user" + str(i) + ",domain2)") - ent_list.add_netgroup("ng2", triple_list) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_innetgr_with_threads(add_thread_test_netgroup): - - subprocess.check_call(["sss_netgroup_thread_test"])