diff --git a/tests/integration-tests/images_factory.py b/tests/integration-tests/images_factory.py
index 0e06dde949..a9a89ecb92 100644
--- a/tests/integration-tests/images_factory.py
+++ b/tests/integration-tests/images_factory.py
@@ -11,11 +11,91 @@
 # See the License for the specific language governing permissions and limitations under the License.
 import json
 import logging
+import subprocess
+import time
+from typing import Optional
 
 import yaml
 from framework.credential_providers import run_pcluster_command
 from utils import kebab_case
 
+# Retry policy for transient pcluster CLI failures.
+MAX_TRANSIENT_RETRIES = 3
+TRANSIENT_RETRY_DELAY_SECONDS = 30
+
+# Lower-case substrings that identify a transient failure in the CLI output:
+# 1. Error message is just the package name (package loading issue)
+# 2. Infrastructure creation failed with only the package name as the exception, with the line break
+#    either real or JSON-escaped
+# 3. Module not found errors (Python environment issues)
+# 4. pyenv command not found (virtual environment activation issues)
+_TRANSIENT_ERROR_PATTERNS = (
+    '"message": "aws-parallelcluster"',
+    "creation failed.\naws-parallelcluster",
+    "creation failed.\\naws-parallelcluster",
+    "modulenotfounderror: no module named 'pcluster",
+    "pyenv: pcluster: command not found",
+)
+
+
+def _is_transient_pcluster_error(stdout: Optional[str] = None, stderr: Optional[str] = None) -> bool:
+    """
+    Check if the output of a pcluster CLI call shows a known transient error.
+
+    These errors are typically caused by package metadata issues, module loading
+    problems, or pyenv environment issues in the CI environment.
+
+    :param stdout: standard output captured from the CLI call, if any.
+    :param stderr: standard error captured from the CLI call, if any.
+    :return: True if either stream contains a known transient pattern (case-insensitive).
+    """
+    # Patterns are lower-case, so normalise the output before matching.
+    combined_output = f"{stdout or ''} {stderr or ''}".lower()
+    return any(pattern in combined_output for pattern in _TRANSIENT_ERROR_PATTERNS)
+
+
+def _run_pcluster_command_with_retry(command, check_success_output=False, **run_kwargs):
+    """
+    Run a pcluster command, retrying while the failure looks transient.
+
+    :param command: command line handed to run_pcluster_command.
+    :param check_success_output: when True, a call that returns normally is also retried if its output
+        matches a transient pattern (an error payload delivered without an exception).
+    :param run_kwargs: extra keyword arguments forwarded to run_pcluster_command.
+    :return: result of the first attempt not classified as transient, or of the last attempt.
+    :raises subprocess.CalledProcessError: on a non-transient failure, or when the last attempt still fails.
+    """
+    # Always run the command at least once, even if the retry budget is set to zero or less.
+    attempts = max(1, MAX_TRANSIENT_RETRIES)
+    for attempt in range(1, attempts + 1):
+        error = None
+        try:
+            result = run_pcluster_command(command, **run_kwargs)
+        except subprocess.CalledProcessError as e:
+            error, result = e, None
+            stdout, stderr = e.stdout, e.stderr
+            transient = _is_transient_pcluster_error(stdout=stdout, stderr=stderr)
+        else:
+            stdout, stderr = result.stdout, getattr(result, "stderr", None)
+            transient = check_success_output and _is_transient_pcluster_error(stdout=stdout, stderr=stderr)
+
+        if transient:
+            logging.warning(
+                "Transient pcluster error detected (attempt %d/%d): stdout=%s, stderr=%s",
+                attempt,
+                attempts,
+                stdout,
+                stderr,
+            )
+            if attempt < attempts:
+                time.sleep(TRANSIENT_RETRY_DELAY_SECONDS)
+                continue
+        if error is not None:
+            # Not transient, or out of retries: surface the original failure.
+            raise error
+        # Success, or a transient-looking response on the last attempt: let the caller handle it.
+        return result
+
 
 class Image:
     """Contain all static and dynamic data related to an image instance."""
@@ -46,7 +126,7 @@ def list_images(**kwargs):
         return response
 
     def build(self, **kwargs):
-        """Build image."""
+        """Build image with retry logic for transient errors."""
         raise_on_error = kwargs.pop("raise_on_error", True)
         log_error = kwargs.pop("log_error", True)
 
@@ -64,7 +144,10 @@ def build(self, **kwargs):
         for k, val in kwargs.items():
             command.extend([f"--{kebab_case(k)}", str(val)])
 
-        result = run_pcluster_command(command, raise_on_error=raise_on_error, log_error=log_error)
+        # Retry before parsing: a transient failure can leave stdout empty or non-JSON.
+        result = _run_pcluster_command_with_retry(
+            command, check_success_output=True, raise_on_error=raise_on_error, log_error=log_error
+        )
         response = json.loads(result.stdout)
         try:
             if response["image"]["imageBuildStatus"] == "BUILD_IN_PROGRESS":
@@ -141,14 +224,13 @@ def get_log_events(self, log_stream_name, **args):
         return response
 
     def get_stack_events(self, **args):
-        """Get image build stack events."""
+        """Get image build stack events with retry logic for transient errors."""
         logging.info("Get image %s build log.", self.image_id)
         command = ["pcluster", "get-image-stack-events", "--region", self.region, "--image-id", self.image_id]
         for k, val in args.items():
             command.extend([f"--{kebab_case(k)}", str(val)])
-        result = run_pcluster_command(command).stdout
-        response = json.loads(result)
-        return response
+        result = _run_pcluster_command_with_retry(command)
+        return json.loads(result.stdout)
 
     def list_log_streams(self):
         """Get image build log streams."""