Skip to content

Commit 9642af5

Browse files
amascia-ggagateau-gg
authored andcommitted
feat: added iac models and modified GGClient
1 parent 396b568 commit 9642af5

File tree

5 files changed

+278
-1
lines changed

5 files changed

+278
-1
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ verify_ssl = true
77
marshmallow = ">=3.5"
88
pygitguardian = { editable = true, path = "." }
99
requests = ">=2"
10+
marshmallow-dataclass = ">=8.5.8,<8.6.0"
1011

1112
[dev-packages]
1213
black = "==22.3.0"

pygitguardian/client.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import os
12
import platform
3+
import tarfile
24
import urllib.parse
5+
from io import BytesIO
6+
from pathlib import Path
37
from typing import Any, Dict, List, Optional, Union, cast
48

9+
import click
10+
import requests
511
from requests import Response, Session, codes
612

713
from .config import (
@@ -10,6 +16,12 @@
1016
DEFAULT_TIMEOUT,
1117
MULTI_DOCUMENT_LIMIT,
1218
)
19+
from .iac_models import (
20+
IaCScanParameters,
21+
IaCScanParametersSchema,
22+
IaCScanResult,
23+
IaCScanResultSchema,
24+
)
1325
from .models import (
1426
Detail,
1527
Document,
@@ -20,6 +32,10 @@
2032
)
2133

2234

35+
# max files size to create a tar from
36+
MAX_TAR_CONTENT_SIZE = 30 * 1024 * 1024
37+
38+
2339
class Versions:
2440
app_version: Optional[str] = None
2541
secrets_engine_version: Optional[str] = None
@@ -57,6 +73,27 @@ def is_ok(resp: Response) -> bool:
5773
)
5874

5975

76+
def _create_tar(root_path: Path, filenames: List[str]) -> bytes:
77+
"""
78+
:param root_path: the root_path from which the tar is created
79+
:param files: the files which need to be added to the tar, filenames should be the paths relative to the root_path
80+
:return: a bytes object containing the tar.gz created from the files, with paths relative to root_path
81+
"""
82+
tar_stream = BytesIO()
83+
current_dir_size = 0
84+
with tarfile.open(fileobj=tar_stream, mode="w:gz") as tar:
85+
for filename in filenames:
86+
full_path = root_path / filename
87+
current_dir_size += os.path.getsize(full_path)
88+
if current_dir_size > MAX_TAR_CONTENT_SIZE:
89+
raise click.ClickException(
90+
f"The total size of the files processed exceeds {MAX_TAR_CONTENT_SIZE / (1024 * 1024):.0f}MB, "
91+
f"please try again with less files"
92+
)
93+
tar.add(full_path, filename)
94+
return tar_stream.getvalue()
95+
96+
6097
class GGClient:
6198
_version = "undefined"
6299
session: Session
@@ -197,7 +234,7 @@ def get(
197234
def post(
198235
self,
199236
endpoint: str,
200-
data: Optional[str] = None,
237+
data: Optional[Dict[str, str]] = None,
201238
version: str = DEFAULT_API_VERSION,
202239
extra_headers: Optional[Dict[str, str]] = None,
203240
**kwargs: Any,
@@ -347,3 +384,38 @@ def quota_overview(
347384
obj.status_code = resp.status_code
348385

349386
return obj
387+
388+
# For IaC Scans
389+
def iac_directory_scan(
390+
self,
391+
directory: Path,
392+
filenames: List[str],
393+
scan_parameters: IaCScanParameters,
394+
extra_headers: Optional[Dict[str, str]] = None,
395+
) -> Union[Detail, IaCScanResult]:
396+
397+
tar = _create_tar(directory, filenames)
398+
result: Union[Detail, IaCScanResult]
399+
try:
400+
resp = self.post(
401+
endpoint="iac_scan",
402+
extra_headers=extra_headers,
403+
files={
404+
"directory": tar,
405+
},
406+
data={
407+
"scan_parameters": IaCScanParametersSchema().dumps(scan_parameters),
408+
},
409+
)
410+
except requests.exceptions.ReadTimeout:
411+
result = Detail("The request timed out.")
412+
result.status_code = 504
413+
else:
414+
if is_ok(resp):
415+
result = IaCScanResultSchema().load(resp.json())
416+
else:
417+
result = load_detail(resp)
418+
419+
result.status_code = resp.status_code
420+
421+
return result

pygitguardian/iac_models.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from dataclasses import dataclass, field
2+
from typing import List, Optional
3+
4+
import marshmallow_dataclass
5+
6+
from pygitguardian.models import Base, BaseSchema
7+
8+
9+
@dataclass
10+
class IaCVulnerability(Base):
11+
policy: str
12+
policy_id: str
13+
line_end: int
14+
line_start: int
15+
description: str
16+
documentation_url: str
17+
component: str = ""
18+
severity: str = ""
19+
20+
21+
IaCVulnerabilitySchema = marshmallow_dataclass.class_schema(
22+
IaCVulnerability, BaseSchema
23+
)
24+
25+
26+
@dataclass
27+
class IaCFileResult(Base):
28+
filename: str
29+
incidents: List[IaCVulnerability]
30+
31+
32+
IaCFileResultSchema = marshmallow_dataclass.class_schema(IaCFileResult, BaseSchema)
33+
34+
35+
@dataclass
36+
class IaCScanParameters(Base):
37+
ignored_policies: List[str] = field(default_factory=list)
38+
minimum_severity: Optional[str] = None
39+
40+
41+
IaCScanParametersSchema = marshmallow_dataclass.class_schema(
42+
IaCScanParameters, BaseSchema
43+
)
44+
45+
46+
@dataclass
47+
class IaCScanResult(Base):
48+
id: str = ""
49+
type: str = ""
50+
iac_engine_version: str = ""
51+
entities_with_incidents: List[IaCFileResult] = field(default_factory=list)
52+
53+
54+
IaCScanResultSchema = marshmallow_dataclass.class_schema(IaCScanResult, BaseSchema)

tests/test_iac_models.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import pytest
2+
3+
from pygitguardian.iac_models import (
4+
IaCFileResult,
5+
IaCFileResultSchema,
6+
IaCScanParameters,
7+
IaCScanParametersSchema,
8+
IaCScanResult,
9+
IaCScanResultSchema,
10+
IaCVulnerability,
11+
IaCVulnerabilitySchema,
12+
)
13+
14+
15+
class TestModel:
16+
@pytest.mark.parametrize(
17+
"schema_klass, expected_klass, instance_data",
18+
[
19+
(
20+
IaCScanResultSchema,
21+
IaCScanResult,
22+
{
23+
"id": "myid",
24+
"type": "type",
25+
"iac_engine_version": "version",
26+
"entities_with_incidents": [
27+
{
28+
"filename": "filename",
29+
"incidents": [
30+
{
31+
"policy": "mypolicy,",
32+
"policy_id": "mypolicyid",
33+
"line_end": 0,
34+
"line_start": 0,
35+
"description": "mydescription",
36+
"documentation_url": "mydoc",
37+
"component": "mycomponent",
38+
"severity": "myseverity",
39+
"some_extra_field": "extra",
40+
}
41+
],
42+
}
43+
],
44+
},
45+
),
46+
(
47+
IaCScanParametersSchema,
48+
IaCScanParameters,
49+
{"ignored_policies": ["pol1", "pol2"], "minimum_severity": "LOW"},
50+
),
51+
(
52+
IaCVulnerabilitySchema,
53+
IaCVulnerability,
54+
{
55+
"policy": "mypolicy,",
56+
"policy_id": "mypolicyid",
57+
"line_end": 0,
58+
"line_start": 0,
59+
"description": "mydescription",
60+
"documentation_url": "mydoc",
61+
"component": "mycomponent",
62+
"severity": "myseverity",
63+
"some_extra_field": "extra",
64+
},
65+
),
66+
(
67+
IaCFileResultSchema,
68+
IaCFileResult,
69+
{
70+
"filename": "filename",
71+
"incidents": [
72+
{
73+
"policy": "mypolicy,",
74+
"policy_id": "mypolicyid",
75+
"line_end": 0,
76+
"line_start": 0,
77+
"description": "mydescription",
78+
"documentation_url": "mydoc",
79+
"component": "mycomponent",
80+
"severity": "myseverity",
81+
"some_extra_field": "extra",
82+
}
83+
],
84+
},
85+
),
86+
],
87+
)
88+
def test_schema_loads(self, schema_klass, expected_klass, instance_data):
89+
"""
90+
GIVEN the right kwargs and an extra field in dict format
91+
WHEN loading using the schema
92+
THEN the extra field is not taken into account
93+
AND the result should be an instance of the expected class
94+
"""
95+
schema = schema_klass()
96+
97+
data = {**instance_data, "field": "extra"}
98+
99+
obj = schema.load(data)
100+
assert isinstance(obj, expected_klass)

tests/test_utils.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import tarfile
2+
from io import BytesIO
3+
from unittest import mock
4+
5+
import click
6+
import pytest
7+
8+
from pygitguardian.client import _create_tar
9+
10+
11+
def test_create_tar(tmp_path):
12+
"""
13+
GIVEN a list of filenames, representing paths relative to the tmp directory
14+
WHEN the _create_tar method is called
15+
THEN a bytes object is outputted, representing a tar of the files, with paths relative to the tmp directory
16+
"""
17+
file1_name = "file1.txt"
18+
dir_path = "my_test_dir"
19+
file2_name = f"{dir_path}/file2.txt"
20+
file1_content = "My first document"
21+
file2_content = "My second document"
22+
23+
(tmp_path / file1_name).write_text(file1_content)
24+
(tmp_path / dir_path).mkdir(parents=True, exist_ok=True)
25+
(tmp_path / file2_name).write_text(file2_content)
26+
27+
tar_stream = _create_tar(tmp_path, [file1_name, file2_name])
28+
29+
# Create tar archive from bytes stream and write it in the tmp_path/output directory
30+
(tmp_path / "output").mkdir(exist_ok=True)
31+
with tarfile.open(fileobj=BytesIO(tar_stream), mode="r:gz") as tar:
32+
tar.extractall(tmp_path / "output")
33+
34+
assert file1_content == (tmp_path / f"output/{file1_name}").read_text()
35+
assert file2_content == (tmp_path / f"output/{file2_name}").read_text()
36+
37+
38+
def test_create_tar_cannot_exceed_max_tar_content_size(tmp_path):
39+
with mock.patch("os.path.getsize", return_value=16 * 1024 * 1024):
40+
file1_name = "file1.txt"
41+
file2_name = "file2.txt"
42+
43+
(tmp_path / file1_name).write_text("")
44+
(tmp_path / file2_name).write_text("")
45+
46+
with pytest.raises(
47+
click.ClickException,
48+
match=r"The total size of the files processed exceeds \d+MB, please try again with less files",
49+
):
50+
_create_tar(tmp_path, [file1_name, file2_name])

0 commit comments

Comments
 (0)