Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions test/integration/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager

from qiskit_ibm_runtime.exceptions import (
IBMRuntimeError,
RuntimeInvalidStateError,
RuntimeJobNotFound,
)
Expand Down Expand Up @@ -90,29 +91,35 @@ def test_cancel_job_done(self, service):
@run_integration_test
def test_delete_job(self, service):
"""Test deleting a job."""
self.skipTest("Deleting jobs not supported on IBM Quantum Platform.")
sub_tests = ["DONE"]
for status in sub_tests:
with self.subTest(status=status):
job = self._run_program(service)
wait_for_status(job, status)
service.delete_job(job.job_id())
try:
service.delete_job(job.job_id())
except IBMRuntimeError as ex:
if "403 Client Error" in ex.message or "401 Client Error" in ex.message:
self.skipTest("Credentials do not have delete job privileges")
with self.assertRaises(RuntimeJobNotFound):
service.job(job.job_id())

@run_integration_test
@production_only
def test_delete_job_queued(self, service):
"""Test deleting a queued job."""
self.skipTest("Deleting jobs not supported on IBM Quantum Platform.")
real_device_name = get_real_device(service)
real_device = service.backend(real_device_name)
pm = generate_preset_pass_manager(optimization_level=1, target=real_device.target)
isa_circuit = pm.run([bell()])
_ = self._run_program(service, circuits=isa_circuit, backend=real_device_name)
job = self._run_program(service, circuits=isa_circuit, backend=real_device_name)
wait_for_status(job, "QUEUED")
service.delete_job(job.job_id())
try:
service.delete_job(job.job_id())
except IBMRuntimeError as ex:
if "403 Client Error" in ex.message or "401 Client Error" in ex.message:
self.skipTest("Credentials do not have delete job privileges")
with self.assertRaises(RuntimeJobNotFound):
service.job(job.job_id())

Expand Down