Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion cycode/cli/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging
from pathlib import Path
from typing import Annotated, Optional

import typer
from typer.completion import install_callback, show_callback

from cycode import __version__
from cycode.cli.apps import ai_remediation, auth, configure, ignore, report, scan, status
from cycode.cli.cli_types import OutputTypeOption
from cycode.cli.cli_types import ExportTypeOption, OutputTypeOption
from cycode.cli.consts import CLI_CONTEXT_SETTINGS
from cycode.cli.printers import ConsolePrinter
from cycode.cli.user_settings.configuration_manager import ConfigurationManager
from cycode.cli.utils.progress_bar import SCAN_PROGRESS_BAR_SECTIONS, get_progress_bar
from cycode.cli.utils.sentry import add_breadcrumb, init_sentry
Expand Down Expand Up @@ -44,7 +46,14 @@ def check_latest_version_on_close(ctx: typer.Context) -> None:
version_checker.check_and_notify_update(current_version=__version__, use_cache=should_use_cache)


def export_if_needed_on_close(ctx: typer.Context) -> None:
printer = ctx.obj.get('console_printer')
if printer.is_recording:
printer.export()


_COMPLETION_RICH_HELP_PANEL = 'Completion options'
_EXPORT_RICH_HELP_PANEL = 'Export options'


@app.callback()
Expand All @@ -64,6 +73,27 @@ def app_callback(
Optional[str],
typer.Option(hidden=True, help='Characteristic JSON object that lets servers identify the application.'),
] = None,
export_type: Annotated[
ExportTypeOption,
typer.Option(
'--export-type',
case_sensitive=False,
help='Specify the export type. '
'HTML and SVG will export terminal output and rely on --output option. '
'JSON always exports JSON.',
rich_help_panel=_EXPORT_RICH_HELP_PANEL,
),
] = ExportTypeOption.JSON,
export_file: Annotated[
Optional[Path],
typer.Option(
'--export-file',
help='Export file. Path to the file where the export will be saved. ',
dir_okay=False,
writable=True,
rich_help_panel=_EXPORT_RICH_HELP_PANEL,
),
] = None,
_: Annotated[
Optional[bool],
typer.Option(
Expand Down Expand Up @@ -104,6 +134,11 @@ def app_callback(

ctx.obj['progress_bar'] = get_progress_bar(hidden=no_progress_meter, sections=SCAN_PROGRESS_BAR_SECTIONS)

ctx.obj['export_type'] = export_type
ctx.obj['export_file'] = export_file
ctx.obj['console_printer'] = ConsolePrinter(ctx)
ctx.call_on_close(lambda: export_if_needed_on_close(ctx))

if user_agent:
user_agent_option = UserAgentOptionScheme().loads(user_agent)
CycodeClientBase.enrich_user_agent(user_agent_option.user_agent_suffix)
Expand Down
3 changes: 1 addition & 2 deletions cycode/cli/apps/ai_remediation/apply_fix.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
from patch_ng import fromstring

from cycode.cli.models import CliResult
from cycode.cli.printers import ConsolePrinter


def apply_fix(ctx: typer.Context, diff: str, is_fix_available: bool) -> None:
printer = ConsolePrinter(ctx)
printer = ctx.obj.get('console_printer')
if not is_fix_available:
printer.print_result(CliResult(success=False, message='Fix is not available for this violation'))
return
Expand Down
3 changes: 1 addition & 2 deletions cycode/cli/apps/ai_remediation/print_remediation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

from cycode.cli.console import console
from cycode.cli.models import CliResult
from cycode.cli.printers import ConsolePrinter


def print_remediation(ctx: typer.Context, remediation_markdown: str, is_fix_available: bool) -> None:
printer = ConsolePrinter(ctx)
printer = ctx.obj.get('console_printer')
if printer.is_json_printer:
data = {'remediation': remediation_markdown, 'is_fix_available': is_fix_available}
printer.print_result(CliResult(success=True, message='Remediation fetched successfully', data=data))
Expand Down
4 changes: 2 additions & 2 deletions cycode/cli/apps/auth/auth_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from cycode.cli.exceptions.handle_auth_errors import handle_auth_exception
from cycode.cli.logger import logger
from cycode.cli.models import CliResult
from cycode.cli.printers import ConsolePrinter
from cycode.cli.utils.sentry import add_breadcrumb


def auth_command(ctx: typer.Context) -> None:
"""Authenticates your machine."""
add_breadcrumb('auth')
printer = ctx.obj.get('console_printer')

if ctx.invoked_subcommand is not None:
# if it is a subcommand, do nothing
Expand All @@ -23,6 +23,6 @@ def auth_command(ctx: typer.Context) -> None:
auth_manager.authenticate()

result = CliResult(success=True, message='Successfully logged into cycode')
ConsolePrinter(ctx).print_result(result)
printer.print_result(result)
except Exception as err:
handle_auth_exception(ctx, err)
5 changes: 3 additions & 2 deletions cycode/cli/apps/auth/auth_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

from cycode.cli.apps.auth.models import AuthInfo
from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError, RequestHttpError
from cycode.cli.printers import ConsolePrinter
from cycode.cli.user_settings.credentials_manager import CredentialsManager
from cycode.cli.utils.jwt_utils import get_user_and_tenant_ids_from_access_token
from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient


def get_authorization_info(ctx: Optional[typer.Context] = None) -> Optional[AuthInfo]:
printer = ctx.obj.get('console_printer')

client_id, client_secret = CredentialsManager().get_credentials()
if not client_id or not client_secret:
return None
Expand All @@ -24,6 +25,6 @@ def get_authorization_info(ctx: Optional[typer.Context] = None) -> Optional[Auth
return AuthInfo(user_id=user_id, tenant_id=tenant_id)
except (RequestHttpError, HttpUnauthorizedError):
if ctx:
ConsolePrinter(ctx).print_exception()
printer.print_exception()

return None
3 changes: 1 addition & 2 deletions cycode/cli/apps/auth/check_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

from cycode.cli.apps.auth.auth_common import get_authorization_info
from cycode.cli.models import CliResult
from cycode.cli.printers import ConsolePrinter
from cycode.cli.utils.sentry import add_breadcrumb


def check_command(ctx: typer.Context) -> None:
"""Checks that your machine is associating the CLI with your Cycode account."""
add_breadcrumb('check')

printer = ConsolePrinter(ctx)
printer = ctx.obj.get('console_printer')
auth_info = get_authorization_info(ctx)
if auth_info is None:
printer.print_result(CliResult(success=False, message='Cycode authentication failed'))
Expand Down
6 changes: 3 additions & 3 deletions cycode/cli/apps/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from cycode.cli.files_collector.sca.sca_code_scanner import perform_pre_scan_documents_actions
from cycode.cli.files_collector.zip_documents import zip_documents
from cycode.cli.models import CliError, Document, DocumentDetections, LocalScanResult
from cycode.cli.printers import ConsolePrinter
from cycode.cli.utils import scan_utils
from cycode.cli.utils.git_proxy import git_proxy
from cycode.cli.utils.path_utils import get_path_by_os
Expand Down Expand Up @@ -304,10 +303,11 @@ def scan_documents(
) -> None:
scan_type = ctx.obj['scan_type']
progress_bar = ctx.obj['progress_bar']
printer = ctx.obj.get('console_printer')

if not documents_to_scan:
progress_bar.stop()
ConsolePrinter(ctx).print_error(
printer.print_error(
CliError(
code='no_relevant_files',
message='Error: The scan could not be completed - relevant files to scan are not found. '
Expand Down Expand Up @@ -569,7 +569,7 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No
def print_results(
ctx: typer.Context, local_scan_results: List[LocalScanResult], errors: Optional[Dict[str, 'CliError']] = None
) -> None:
printer = ConsolePrinter(ctx)
printer = ctx.obj.get('console_printer')
printer.print_scan_results(local_scan_results, errors)


Expand Down
6 changes: 6 additions & 0 deletions cycode/cli/cli_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class OutputTypeOption(str, Enum):
TABLE = 'table'


class ExportTypeOption(str, Enum):
JSON = 'json'
HTML = 'html'
SVG = 'svg'


class ScanTypeOption(str, Enum):
SECRET = consts.SECRET_SCAN_TYPE
SCA = consts.SCA_SCAN_TYPE
Expand Down
8 changes: 4 additions & 4 deletions cycode/cli/exceptions/handle_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import typer

from cycode.cli.models import CliError, CliErrors
from cycode.cli.printers import ConsolePrinter
from cycode.cli.utils.sentry import capture_exception


def handle_errors(
ctx: typer.Context, err: BaseException, cli_errors: CliErrors, *, return_exception: bool = False
) -> Optional['CliError']:
ConsolePrinter(ctx).print_exception(err)
printer = ctx.obj.get('console_printer')
printer.print_exception(err)

if type(err) in cli_errors:
error = cli_errors[type(err)].enrich(additional_message=str(err))
Expand All @@ -22,7 +22,7 @@ def handle_errors(
if return_exception:
return error

ConsolePrinter(ctx).print_error(error)
printer.print_error(error)
return None

if isinstance(err, click.ClickException):
Expand All @@ -34,5 +34,5 @@ def handle_errors(
if return_exception:
return unknown_error

ConsolePrinter(ctx).print_error(unknown_error)
printer.print_error(unknown_error)
raise typer.Exit(1)
105 changes: 84 additions & 21 deletions cycode/cli/printers/console_printer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import io
from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Type

import typer
from rich.console import Console

from cycode.cli import consts
from cycode.cli.cli_types import ExportTypeOption
from cycode.cli.console import console, console_err
from cycode.cli.exceptions.custom_exceptions import CycodeError
from cycode.cli.models import CliError, CliResult
from cycode.cli.printers.json_printer import JsonPrinter
Expand All @@ -27,57 +32,115 @@ class ConsolePrinter:
'rich_sca': ScaTablePrinter,
}

def __init__(self, ctx: typer.Context) -> None:
def __init__(
self,
ctx: typer.Context,
console_override: Optional['Console'] = None,
console_err_override: Optional['Console'] = None,
output_type_override: Optional[str] = None,
) -> None:
self.ctx = ctx
self.console = console_override or console
self.console_err = console_err_override or console_err

self.scan_type = self.ctx.obj.get('scan_type')
self.output_type = self.ctx.obj.get('output')
self.output_type = output_type_override or self.ctx.obj.get('output')
self.aggregation_report_url = self.ctx.obj.get('aggregation_report_url')

self._printer_class = self._AVAILABLE_PRINTERS.get(self.output_type)
if self._printer_class is None:
raise CycodeError(f'"{self.output_type}" output type is not supported.')
self.printer = self._get_scan_printer()

def print_scan_results(
self,
local_scan_results: List['LocalScanResult'],
errors: Optional[Dict[str, 'CliError']] = None,
) -> None:
printer = self._get_scan_printer()
printer.print_scan_results(local_scan_results, errors)
self.console_record = None

self.export_type = self.ctx.obj.get('export_type')
self.export_file = self.ctx.obj.get('export_file')
if console_override is None and self.export_type and self.export_file:
self.console_record = ConsolePrinter(
ctx,
console_override=Console(record=True, file=io.StringIO()),
console_err_override=Console(stderr=True, record=True, file=io.StringIO()),
output_type_override='json' if self.export_type == 'json' else self.output_type,
)

def _get_scan_printer(self) -> 'PrinterBase':
printer_class = self._printer_class
printer_class = self._AVAILABLE_PRINTERS.get(self.output_type)

composite_printer = self._AVAILABLE_PRINTERS.get(f'{self.output_type}_{self.scan_type}')
if composite_printer:
printer_class = composite_printer

return printer_class(self.ctx)
if not printer_class:
raise CycodeError(f'"{self.output_type}" output type is not supported.')

return printer_class(self.ctx, self.console, self.console_err)

def print_scan_results(
self,
local_scan_results: List['LocalScanResult'],
errors: Optional[Dict[str, 'CliError']] = None,
) -> None:
if self.console_record:
self.console_record.print_scan_results(local_scan_results, errors)
self.printer.print_scan_results(local_scan_results, errors)

def print_result(self, result: CliResult) -> None:
self._printer_class(self.ctx).print_result(result)
if self.console_record:
self.console_record.print_result(result)
self.printer.print_result(result)

def print_error(self, error: CliError) -> None:
self._printer_class(self.ctx).print_error(error)
if self.console_record:
self.console_record.print_error(error)
self.printer.print_error(error)

def print_exception(self, e: Optional[BaseException] = None, force_print: bool = False) -> None:
"""Print traceback message in stderr if verbose mode is set."""
if force_print or self.ctx.obj.get('verbose', False):
self._printer_class(self.ctx).print_exception(e)
if self.console_record:
self.console_record.print_exception(e)
self.printer.print_exception(e)

def export(self) -> None:
if self.console_record is None:
raise CycodeError('Console recording was not enabled. Cannot export.')

if not self.export_file.suffix:
# resolve file extension based on the export type if not provided in the file name
self.export_file = self.export_file.with_suffix(f'.{self.export_type.lower()}')

if self.export_type is ExportTypeOption.HTML:
self.console_record.console.save_html(self.export_file)
elif self.export_type is ExportTypeOption.SVG:
self.console_record.console.save_svg(self.export_file, title=consts.APP_NAME)
elif self.export_type is ExportTypeOption.JSON:
with open(self.export_file, 'w', encoding='UTF-8') as f:
self.console_record.console.file.seek(0)
f.write(self.console_record.console.file.read())
else:
raise CycodeError(f'Export type "{self.export_type}" is not supported.')

export_format_msg = f'{self.export_type.upper()} format'
if self.export_type in {ExportTypeOption.HTML, ExportTypeOption.SVG}:
export_format_msg += f' with {self.output_type.upper()} output type'

clickable_path = f'[link=file://{self.export_file}]{self.export_file}[/link]'
self.console.print(f'[b green]Cycode CLI output exported to {clickable_path} in {export_format_msg}[/]')

@property
def is_recording(self) -> bool:
return self.console_record is not None

@property
def is_json_printer(self) -> bool:
return self._printer_class == JsonPrinter
return isinstance(self.printer, JsonPrinter)

@property
def is_table_printer(self) -> bool:
return self._printer_class == TablePrinter
return isinstance(self.printer, TablePrinter)

@property
def is_text_printer(self) -> bool:
return self._printer_class == TextPrinter
return isinstance(self.printer, TextPrinter)

@property
def is_rich_printer(self) -> bool:
return self._printer_class == RichPrinter
return isinstance(self.printer, RichPrinter)
Loading