diff --git a/requirements-dev.txt b/requirements-dev.txt index 5d3baed..5bf918b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,5 @@ pyinstaller==5.9.0 -tox +tox>=4.18.1 pytest pytest-cov pytest-flake8 diff --git a/src/fosslight_binary/binary_analysis.py b/src/fosslight_binary/binary_analysis.py index d60165f..e140518 100755 --- a/src/fosslight_binary/binary_analysis.py +++ b/src/fosslight_binary/binary_analysis.py @@ -11,6 +11,7 @@ import logging import yaml import stat +import re from fosslight_util.set_log import init_log import fosslight_util.constant as constant from fosslight_util.output_format import check_output_formats, write_output_file @@ -19,6 +20,8 @@ from ._jar_analysis import analyze_jar_file, merge_binary_list from fosslight_util.correct import correct_with_yaml from fosslight_util.oss_item import ScannerItem +from fosslight_util.download import compression_extension +from fosslight_util.write_txt import write_txt_file import hashlib import tlsh from io import open @@ -26,6 +29,7 @@ PKG_NAME = "fosslight_binary" logger = logging.getLogger(constant.LOGGER_NAME) +_REMOVE_FILE_EXTENSION_SIMPLE = ['ttf', 'otf', 'png', 'gif', 'jpg', 'bmp', 'jpeg'] _REMOVE_FILE_EXTENSION = ['qm', 'xlsx', 'pdf', 'pptx', 'jfif', 'docx', 'doc', 'whl', 'xls', 'xlsm', 'ppt', 'mp4', 'pyc', 'plist', 'dat', 'json', 'js'] _REMOVE_FILE_COMMAND_RESULT = [ @@ -48,6 +52,7 @@ 'Homepage', 'Copyright Text', 'Exclude', 'Comment', 'Vulnerability Link', 'TLSH', 'SHA1']} HIDE_HEADER = {'TLSH', "SHA1"} +SUPPORT_FORMAT = {'excel': '.xlsx', 'csv': '.csv', 'opossum': '.json', 'yaml': '.yaml', 'text': '.txt'} def get_checksum_and_tlsh(bin_with_path): @@ -70,8 +75,9 @@ def get_checksum_and_tlsh(bin_with_path): return checksum_value, tlsh_value, error_msg -def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]): +def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[], simple_mode=False): global _root_path, logger, _start_time + combined_paths_and_files = [] _json_ext = ".json" _start_time = datetime.now().strftime('%y%m%d_%H%M') @@ -83,7 +89,10 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]): if not path_to_find_bin.endswith(os.path.sep): _root_path += os.path.sep - success, msg, output_path, output_files, output_extensions = check_output_formats(output_file_name, formats) + success, msg, output_path, output_files, output_extensions = check_output_formats(output_file_name, formats, SUPPORT_FORMAT, True) + print(f"output_path from init : {output_path}") + print(f"output_files from init : {output_files}") + print(f"output_extensions from init : {output_extensions}") if success: if output_path == "": @@ -93,14 +102,19 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]): while len(output_files) < len(output_extensions): output_files.append(None) - for i, output_extension in enumerate(output_extensions): - if output_files[i] is None or output_files[i] == "": - if output_extension == _json_ext: - output_files[i] = f"fosslight_opossum_bin_{_start_time}" - else: - output_files[i] = f"fosslight_report_bin_{_start_time}" - - combined_paths_and_files = [os.path.join(output_path, file) for file in output_files] + if simple_mode: + if output_files is None or output_files == "": + combined_paths_and_files.append(os.path.join(output_path, f"binary_list_{_start_time}")) + combined_paths_and_files.append(os.path.join(output_path, f"compressed_list_{_start_time}")) + else: + for i, output_extension in enumerate(output_extensions): + if output_files[i] is None or output_files[i] == "": + if output_extension == _json_ext: + output_files[i] = f"fosslight_opossum_bin_{_start_time}" + else: + output_files[i] = f"fosslight_report_bin_{_start_time}" + + combined_paths_and_files = [os.path.join(output_path, file) for file in output_files] else: logger.error(f"Format error - {msg}") sys.exit(1) @@ -116,10 +130,11 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]): return _result_log, combined_paths_and_files, output_extensions -def get_file_list(path_to_find, abs_path_to_exclude): +def get_file_list(path_to_find, abs_path_to_exclude, simple_mode=False): bin_list = [] file_cnt = 0 found_jar = False + compressed_list = [] for root, dirs, files in os.walk(path_to_find): if os.path.abspath(root) in abs_path_to_exclude: @@ -148,6 +163,9 @@ def get_file_list(path_to_find, abs_path_to_exclude): bin_item.source_name_or_path = bin_with_path.replace( _root_path, '', 1) + if simple_mode and f".{extension}" in compression_extension: + compressed_list.append(bin_item.bin_name_with_path) + if any(dir_name in dir_path for dir_name in _EXCLUDE_DIR): bin_item.exclude = True elif file.lower() in _EXCLUDE_FILE: @@ -156,20 +174,44 @@ def get_file_list(path_to_find, abs_path_to_exclude): bin_item.exclude = True bin_list.append(bin_item) file_cnt += 1 - return file_cnt, bin_list, found_jar + return file_cnt, bin_list, found_jar, compressed_list + + +def exclude_bin_for_simple_mode(binary_list): + for bin in binary_list: + if not (re.search(r".*source\.jar", bin.bin_name_with_path.lower()) or bin.exclude): + yield bin.bin_name_with_path + + +def log_result_msg(results:list, scan_item:ScannerItem): + for success_to_write, writing_msg, result_file in results: + if success_to_write: + if result_file: + logger.info(f"Output file :{result_file}") + else: + logger.warning(f"{writing_msg}") + for row in scan_item.get_cover_comment(): + logger.info(row) + else: + logger.error(f"Fail to generate result file.:{writing_msg}") + + +# def get_result_file(result_reports:list, output_extensions:list, results:list, compressed_list:list, bin_list:list): +# for combined_path_and_file, output_extension in zip(result_reports, output_extensions): +# results.append(write_output_file(combined_path_and_file, output_extension, scan_item, BIN_EXT_HEADER, HIDE_HEADER)) +# return results def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=False, correct_mode=True, correct_filepath="", path_to_exclude=[]): _result_log, result_reports, output_extensions = init( - path_to_find_bin, output_dir, formats, path_to_exclude) + path_to_find_bin, output_dir, formats, path_to_exclude, simple_mode) total_bin_cnt = 0 total_file_cnt = 0 db_loaded_cnt = 0 success_to_write = False - writing_msg = "" results = [] bin_list = [] base_dir_name = os.path.basename(path_to_find_bin) @@ -183,15 +225,22 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F if not correct_filepath: correct_filepath = path_to_find_bin try: - total_file_cnt, file_list, found_jar = get_file_list(path_to_find_bin, abs_path_to_exclude) - return_list = list(return_bin_only(file_list)) + total_file_cnt, file_list, found_jar, compressed_list = get_file_list(path_to_find_bin, abs_path_to_exclude, simple_mode) + return_list = list(return_bin_only(file_list, True, simple_mode)) except Exception as ex: error_occured(error_msg=f"Failed to check whether it is binary or not : {ex}", result_log=_result_log, exit=True) total_bin_cnt = len(return_list) if simple_mode: - bin_list = [bin.bin_name_with_path for bin in return_list] + bin_list = list(exclude_bin_for_simple_mode(return_list)) + + # results = get_result_file(result_reports, output_extensions, results, compressed_list, bin_list) + + print(f"result_reports: {result_reports}") + print(f"output_extensions: {output_extensions}") + + print_simple_mode(result_reports, output_extensions, compressed_list, bin_list) else: scan_item = ScannerItem(PKG_NAME, _start_time) scan_item.set_cover_pathinfo(path_to_find_bin, path_to_exclude) @@ -226,17 +275,8 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F except Exception as ex: error_occured(error_msg=str(ex), exit=False) - - for success_to_write, writing_msg, result_file in results: - if success_to_write: - if result_file: - logger.info(f"Output file :{result_file}") - else: - logger.warning(f"{writing_msg}") - for row in scan_item.get_cover_comment(): - logger.info(row) - else: - logger.error(f"Fail to generate result file.:{writing_msg}") + + log_result_msg(results, scan_item) try: print_result_log(success=True, result_log=_result_log, @@ -249,10 +289,10 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F return success_to_write, scan_item -def return_bin_only(file_list, need_checksum_tlsh=True): +def return_bin_only(file_list, need_checksum_tlsh=True, simple_mode=False): for file_item in file_list: try: - if check_binary(file_item.bin_name_with_path): + if check_binary(file_item.bin_name_with_path, simple_mode): if need_checksum_tlsh: file_item.checksum, file_item.tlsh, error_msg = get_checksum_and_tlsh(file_item.bin_name_with_path) if error_msg: @@ -264,11 +304,17 @@ def return_bin_only(file_list, need_checksum_tlsh=True): yield file_item -def check_binary(file_with_path): +def check_binary(file_with_path, simple_mode=False): is_bin_confirmed = False file = os.path.basename(file_with_path) extension = os.path.splitext(file)[1][1:] - if not os.path.islink(file_with_path) and extension.lower() not in _REMOVE_FILE_EXTENSION: + + if simple_mode: + remove_file_ext_list = _REMOVE_FILE_EXTENSION + _REMOVE_FILE_EXTENSION_SIMPLE + list(compression_extension) + else: + remove_file_ext_list = _REMOVE_FILE_EXTENSION + + if not os.path.islink(file_with_path) and extension.lower() not in remove_file_ext_list: if stat.S_ISFIFO(os.stat(file_with_path).st_mode): return False file_command_result = "" @@ -324,3 +370,30 @@ def print_result_log(success=True, result_log={}, file_cnt="", bin_file_cnt="", logger.info(_str_final_result_log) except Exception as ex: logger.warning(f"Error to print final log: {ex}") + + +def convert_list_to_str(input_list): + output_text = '\n'.join(map(str, input_list)) + return output_text + + +def print_simple_mode(result_reports, output_extensions, compressed_list, bin_list): + print(f"result_reports: {result_reports}") + compressed_list_txt = "" + simplie_mode_bin_list_txt = "" + + for item in result_reports: + if 'compressed_list_' in item: + compressed_list_txt = f"{item}.{output_extensions}" + else: + simplie_mode_bin_list_txt = f"{item}.{output_extensions}" + + + if compressed_list: + success, error = write_txt_file(compressed_list_txt, convert_list_to_str(compressed_list)) + if not success: + logger.info(f"Error to write compressed list file for simple mode : {error}") + if bin_list: + success, error = write_txt_file(simplie_mode_bin_list_txt, convert_list_to_str(bin_list)) + if not success: + logger.info(f"Error to write binary list file for simple mode : {error}") diff --git a/src/fosslight_binary/cli.py b/src/fosslight_binary/cli.py index 2086d34..eb77143 100644 --- a/src/fosslight_binary/cli.py +++ b/src/fosslight_binary/cli.py @@ -49,7 +49,7 @@ def main(): print_package_version(_PKG_NAME, "FOSSLight Binary Scanner Version:") sys.exit(0) - if args.simple: + if args.simple: # -s option simple_mode = True if args.path: # -p option