|
7 | 7 | # See https://aboutcode.org for more information about nexB OSS projects. |
8 | 8 | # |
9 | 9 |
|
| 10 | +from unittest.mock import patch |
| 11 | + |
| 12 | +from django.contrib.auth.models import User |
10 | 13 | from django.db.models import Prefetch |
11 | 14 | from django.urls import reverse |
12 | | -from packageurl import PackageURL |
13 | 15 | from rest_framework import status |
14 | 16 | from rest_framework.test import APIClient |
15 | 17 | from rest_framework.test import APITestCase |
|
19 | 21 | from vulnerabilities.models import Alias |
20 | 22 | from vulnerabilities.models import ApiUser |
21 | 23 | from vulnerabilities.models import Package |
| 24 | +from vulnerabilities.models import PipelineRun |
| 25 | +from vulnerabilities.models import PipelineSchedule |
22 | 26 | from vulnerabilities.models import Vulnerability |
23 | 27 | from vulnerabilities.models import VulnerabilityReference |
24 | 28 | from vulnerabilities.models import Weakness |
@@ -662,3 +666,116 @@ def test_lookup_with_invalid_purl_format(self): |
662 | 666 | self.assertEqual(response.status_code, status.HTTP_200_OK) |
663 | 667 | # No packages or vulnerabilities should be returned |
664 | 668 | self.assertEqual(len(response.data), 0) |
| 669 | + |
| 670 | + |
| 671 | +class PipelineScheduleV2ViewSetTest(APITestCase): |
| 672 | + def setUp(self): |
| 673 | + patcher = patch.object(PipelineSchedule, "create_new_job") |
| 674 | + self.mock_create_new_job = patcher.start() |
| 675 | + self.addCleanup(patcher.stop) |
| 676 | + |
| 677 | + self.mock_create_new_job.return_value = "work-id" |
| 678 | + |
| 679 | + self.schedule1 = PipelineSchedule.objects.create( |
| 680 | + pipeline_id="test_pipeline", |
| 681 | + ) |
| 682 | + self.run1 = PipelineRun.objects.create( |
| 683 | + pipeline=self.schedule1, |
| 684 | + ) |
| 685 | + |
| 686 | + self.admin_user = User.objects.create_superuser( |
| 687 | + username="admin_with_session", |
| 688 | + password="adminpassword", |
| 689 | + |
| 690 | + ) |
| 691 | + |
| 692 | + self.admin_token_only_user = ApiUser.objects.create_api_user( |
| 693 | + username="staff_with_token", |
| 694 | + is_staff=True, |
| 695 | + ) |
| 696 | + self.admin_token_auth = f"Token {self.admin_token_only_user.auth_token.key}" |
| 697 | + |
| 698 | + def test_schedule_list_anon_user_permitted(self): |
| 699 | + response = self.client.get("/api/v2/schedule/") |
| 700 | + self.assertEqual(response.status_code, status.HTTP_200_OK) |
| 701 | + |
| 702 | + def test_schedule_retrieve_anon_user_permitted(self): |
| 703 | + response = self.client.get("/api/v2/schedule/test_pipeline/") |
| 704 | + self.assertEqual(response.status_code, status.HTTP_200_OK) |
| 705 | + |
| 706 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 707 | + def test_create_schedule_anon_user_not_permitted(self, mock_create_new_job): |
| 708 | + mock_create_new_job.return_value = "work-id2" |
| 709 | + |
| 710 | + data = {"pipeline_id": "test_pipeline2"} |
| 711 | + response = self.client.post("/api/v2/schedule/", data, format="json") |
| 712 | + |
| 713 | + self.assertNotEqual(response.status_code, status.HTTP_201_CREATED) |
| 714 | + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 715 | + self.assertEqual(PipelineSchedule.objects.count(), 1) |
| 716 | + |
| 717 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 718 | + def test_create_schedule_with_staff_token_not_permitted(self, mock_create_new_job): |
| 719 | + self.client = APIClient(enforce_csrf_checks=True) |
| 720 | + self.client.credentials(HTTP_AUTHORIZATION=self.admin_token_auth) |
| 721 | + |
| 722 | + mock_create_new_job.return_value = "work-id3" |
| 723 | + |
| 724 | + data = {"pipeline_id": "test_pipeline3"} |
| 725 | + response = self.client.post("/api/v2/schedule/", data, format="json") |
| 726 | + |
| 727 | + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 728 | + self.assertNotEqual(response.status_code, status.HTTP_201_CREATED) |
| 729 | + self.assertEqual(PipelineSchedule.objects.count(), 1) |
| 730 | + |
| 731 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 732 | + def test_create_schedule_with_staff_session_permitted(self, mock_create_new_job): |
| 733 | + mock_create_new_job.return_value = "work-id4" |
| 734 | + self.client.login(username="admin_with_session", password="adminpassword") |
| 735 | + |
| 736 | + data = {"pipeline_id": "test_pipeline3"} |
| 737 | + response = self.client.post("/api/v2/schedule/", data, format="json") |
| 738 | + |
| 739 | + self.assertNotEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 740 | + self.assertEqual(response.status_code, status.HTTP_201_CREATED) |
| 741 | + self.assertEqual(PipelineSchedule.objects.count(), 2) |
| 742 | + |
| 743 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 744 | + def test_schedule_update_anon_user_not_permitted(self, mock_create_new_job): |
| 745 | + mock_create_new_job.return_value = "work-id5" |
| 746 | + |
| 747 | + data = {"run_interval": 2} |
| 748 | + response = self.client.patch("/api/v2/schedule/test_pipeline/", data, format="json") |
| 749 | + self.schedule1.refresh_from_db() |
| 750 | + |
| 751 | + self.assertNotEqual(response.status_code, status.HTTP_200_OK) |
| 752 | + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 753 | + self.assertEqual(self.schedule1.run_interval, 1) |
| 754 | + |
| 755 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 756 | + def test_schedule_update_with_staff_token_not_permitted(self, mock_create_new_job): |
| 757 | + self.client = APIClient(enforce_csrf_checks=True) |
| 758 | + self.client.credentials(HTTP_AUTHORIZATION=self.admin_token_auth) |
| 759 | + |
| 760 | + mock_create_new_job.return_value = "work-id6" |
| 761 | + |
| 762 | + data = {"run_interval": 2} |
| 763 | + response = self.client.patch("/api/v2/schedule/test_pipeline/", data, format="json") |
| 764 | + self.schedule1.refresh_from_db() |
| 765 | + |
| 766 | + self.assertNotEqual(response.status_code, status.HTTP_200_OK) |
| 767 | + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 768 | + self.assertEqual(self.schedule1.run_interval, 1) |
| 769 | + |
| 770 | + @patch("vulnerabilities.models.PipelineSchedule.create_new_job") |
| 771 | + def test_schedule_update_with_staff_session_permitted(self, mock_create_new_job): |
| 772 | + mock_create_new_job.return_value = "work-id7" |
| 773 | + self.client.login(username="admin_with_session", password="adminpassword") |
| 774 | + |
| 775 | + data = {"run_interval": 2} |
| 776 | + response = self.client.patch("/api/v2/schedule/test_pipeline/", data, format="json") |
| 777 | + self.schedule1.refresh_from_db() |
| 778 | + |
| 779 | + self.assertEqual(response.status_code, status.HTTP_200_OK) |
| 780 | + self.assertNotEqual(response.status_code, status.HTTP_403_FORBIDDEN) |
| 781 | + self.assertEqual(self.schedule1.run_interval, 2) |
0 commit comments