11import io
2- import os
2+ from typing import List
33
44import pytest
55from nisystemlink .clients .artifact import ArtifactClient
6+ from nisystemlink .clients .artifact .models ._upload_artifact_response import (
7+ UploadArtifactResponse ,
8+ )
69from nisystemlink .clients .core ._http_configuration import HttpConfiguration
710
811
@@ -12,35 +15,62 @@ def client(enterprise_config: HttpConfiguration) -> ArtifactClient:
1215 return ArtifactClient (enterprise_config )
1316
1417
18+ @pytest .fixture
19+ def create_artifact (client : ArtifactClient ):
20+ """Fixture to return a factory that creates artifact."""
21+ created_artifact_ids : List [str ] = []
22+
23+ def _create_artifact (
24+ content : bytes = b"test content" ,
25+ cleanup : bool = True ,
26+ workspace : str = "2300760d-38c4-48a1-9acb-800260812337" ,
27+ ):
28+ # Used the main-test default workspace since the client for creating a workspace has not been added yet
29+ artifact_stream = io .BytesIO (content )
30+ response = client .upload_artifact (workspace = workspace , artifact = artifact_stream )
31+ if cleanup :
32+ created_artifact_ids .append (response .id )
33+
34+ return response
35+
36+ yield _create_artifact
37+
38+ for artifact_id in created_artifact_ids :
39+ client .delete_artifact (artifact_id )
40+
41+
1542@pytest .mark .integration
1643@pytest .mark .enterprise
1744class TestArtifact :
1845
19- def test__upload_artifact__artifact_uploaded (self , client : ArtifactClient ):
20- workspace = os .getenv ("SYSTEMLINK_WORKSPACE_ID" )
46+ def test__upload_artifact__artifact_uploaded (
47+ self , client : ArtifactClient , create_artifact
48+ ):
49+ upload_response : UploadArtifactResponse = create_artifact ()
2150
22- if workspace is not None :
23- artifact_stream = io . BytesIO ( b"test content" )
51+ assert upload_response is not None
52+ assert upload_response . id is not None
2453
25- response = client .upload_artifact (
26- workspace = workspace , artifact = artifact_stream
27- )
54+ def test__download_artifact__artifact_downloaded (
55+ self , client : ArtifactClient , create_artifact
56+ ):
57+ artifact_content = b"test content"
2858
29- assert response is not None
30- assert response .id is not None
59+ upload_response : UploadArtifactResponse = create_artifact (
60+ content = artifact_content
61+ )
62+ artifact_id = upload_response .id
63+ download_response = client .download_artifact (artifact_id )
3164
32- def test__download_artifact__artifact_downloaded ( self , client : ArtifactClient ):
33- workspace = os . getenv ( "SYSTEMLINK_WORKSPACE_ID" )
65+ assert download_response is not None
66+ assert download_response . read () == artifact_content
3467
35- if workspace is not None :
36- artifact_content = b"test content"
37- artifact_stream = io .BytesIO (artifact_content )
68+ def test__delete_artifact__artifact_deleted (
69+ self , client : ArtifactClient , create_artifact
70+ ):
71+ upload_response : UploadArtifactResponse = create_artifact (cleanup = False )
72+ artifact_id = upload_response .id
3873
39- upload_response = client .upload_artifact (
40- workspace = workspace , artifact = artifact_stream
41- )
42- artifact_id = upload_response .id
43- download_response = client .download_artifact (artifact_id )
74+ delete_response = client .delete_artifact (artifact_id )
4475
45- assert download_response is not None
46- assert download_response .read () == artifact_content
76+ assert delete_response is None
0 commit comments