Skip to content

Commit fe9bcd1

Browse files
committed
facts: add server.Group based on getent and tests
1 parent 308e0df commit fe9bcd1

File tree

4 files changed

+94
-1
lines changed

4 files changed

+94
-1
lines changed

pyinfra/facts/server.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import shutil
66
from datetime import datetime
77
from tempfile import mkdtemp
8-
from typing import Dict, List, Optional, Union
8+
from typing import Dict, Iterable, List, Optional, Union
99

1010
from dateutil.parser import parse as parse_date
1111
from distro import distro
@@ -357,6 +357,57 @@ def process(self, output):
357357
return sysctls
358358

359359

360+
class GroupInfo(TypedDict):
361+
name: str
362+
password: str
363+
gid: int
364+
user_list: list[str]
365+
366+
367+
def _group_info_from_group_str(info: str) -> GroupInfo:
368+
"""
369+
Parses an entry from /etc/group or a similar source, e.g.
370+
'plugdev:x:46:sysadmin,user2' into a GroupInfo dict object
371+
"""
372+
373+
fields = info.split(":")
374+
375+
if len(fields) != 4:
376+
raise ValueError(f"Error parsing group '{info}', expected exactly 4 fields separated by :")
377+
378+
return {
379+
"name": fields[0],
380+
"password": fields[1],
381+
"gid": int(fields[2]),
382+
"user_list": fields[3].split(","),
383+
}
384+
385+
386+
class Group(FactBase[GroupInfo]):
387+
"""
388+
Returns information on a specific group on the system.
389+
"""
390+
391+
def command(self, group):
392+
# FIXME: the '|| true' ensures 'process' is called, even if
393+
# getent was unable to find information on the group
394+
# There must be a better way to do this !
395+
# e.g. allow facts 'process' method access to the process
396+
# return code ?
397+
return f"getent group {group} || true"
398+
399+
default = None
400+
401+
def process(self, output: Iterable[str]) -> str:
402+
group_string = next(iter(output), None)
403+
404+
if group_string is None:
405+
# This will happen if the group was simply not found
406+
return None
407+
408+
return _group_info_from_group_str(group_string)
409+
410+
360411
class Groups(FactBase[List[str]]):
361412
"""
362413
Returns a list of groups on the system.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"arg": "plugdev",
3+
"command": "getent group plugdev || true",
4+
"requires_command": "getent",
5+
"output": [
6+
"plugdev:x:46:sysadmin,myuser,abc"
7+
],
8+
"fact": {
9+
"name": "plugdev",
10+
"password": "x",
11+
"gid": 46,
12+
"user_list": ["sysadmin", "myuser", "abc"]
13+
}
14+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"arg": "doesnotexist",
3+
"command": "getent group doesnotexist || true",
4+
"requires_command": "getent",
5+
"output": [],
6+
"fact": null
7+
}

tests/test_facts_utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import pytest
2+
3+
from pyinfra.facts.server import _group_info_from_group_str
4+
5+
6+
def test__group_info_from_group_str() -> None:
7+
test_str = "plugdev:x:46:sysadmin,user2"
8+
group_info = _group_info_from_group_str(test_str)
9+
10+
assert group_info["name"] == "plugdev"
11+
assert group_info["password"] == "x"
12+
assert group_info["gid"] == 46
13+
assert group_info["user_list"] == ["sysadmin", "user2"]
14+
15+
16+
def test__group_info_from_group_str_empty() -> None:
17+
with pytest.raises(ValueError):
18+
_group_info_from_group_str("")
19+
20+
with pytest.raises(ValueError):
21+
_group_info_from_group_str("a:b:")

0 commit comments

Comments
 (0)