Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pyinstaller==5.9.0
tox
tox>=4.18.1
pytest
pytest-cov
pytest-flake8
Expand Down
137 changes: 105 additions & 32 deletions src/fosslight_binary/binary_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import yaml
import stat
import re
from fosslight_util.set_log import init_log
import fosslight_util.constant as constant
from fosslight_util.output_format import check_output_formats, write_output_file
Expand All @@ -19,13 +20,16 @@
from ._jar_analysis import analyze_jar_file, merge_binary_list
from fosslight_util.correct import correct_with_yaml
from fosslight_util.oss_item import ScannerItem
from fosslight_util.download import compression_extension
from fosslight_util.write_txt import write_txt_file
import hashlib
import tlsh
from io import open

PKG_NAME = "fosslight_binary"
logger = logging.getLogger(constant.LOGGER_NAME)

_REMOVE_FILE_EXTENSION_SIMPLE = ['ttf', 'otf', 'png', 'gif', 'jpg', 'bmp', 'jpeg']
_REMOVE_FILE_EXTENSION = ['qm', 'xlsx', 'pdf', 'pptx', 'jfif', 'docx', 'doc', 'whl',
'xls', 'xlsm', 'ppt', 'mp4', 'pyc', 'plist', 'dat', 'json', 'js']
_REMOVE_FILE_COMMAND_RESULT = [
Expand All @@ -48,6 +52,7 @@
'Homepage', 'Copyright Text', 'Exclude',
'Comment', 'Vulnerability Link', 'TLSH', 'SHA1']}
HIDE_HEADER = {'TLSH', "SHA1"}
SUPPORT_FORMAT = {'excel': '.xlsx', 'csv': '.csv', 'opossum': '.json', 'yaml': '.yaml', 'text': '.txt'}


def get_checksum_and_tlsh(bin_with_path):
Expand All @@ -70,8 +75,9 @@ def get_checksum_and_tlsh(bin_with_path):
return checksum_value, tlsh_value, error_msg


def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]):
def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[], simple_mode=False):
global _root_path, logger, _start_time
combined_paths_and_files = []

_json_ext = ".json"
_start_time = datetime.now().strftime('%y%m%d_%H%M')
Expand All @@ -83,7 +89,10 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]):
if not path_to_find_bin.endswith(os.path.sep):
_root_path += os.path.sep

success, msg, output_path, output_files, output_extensions = check_output_formats(output_file_name, formats)
success, msg, output_path, output_files, output_extensions = check_output_formats(output_file_name, formats, SUPPORT_FORMAT, True)
print(f"output_path from init : {output_path}")
print(f"output_files from init : {output_files}")
print(f"output_extensions from init : {output_extensions}")

if success:
if output_path == "":
Expand All @@ -93,14 +102,19 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]):

while len(output_files) < len(output_extensions):
output_files.append(None)
for i, output_extension in enumerate(output_extensions):
if output_files[i] is None or output_files[i] == "":
if output_extension == _json_ext:
output_files[i] = f"fosslight_opossum_bin_{_start_time}"
else:
output_files[i] = f"fosslight_report_bin_{_start_time}"

combined_paths_and_files = [os.path.join(output_path, file) for file in output_files]
if simple_mode:
if output_files is None or output_files == "":
combined_paths_and_files.append(os.path.join(output_path, f"binary_list_{_start_time}"))
combined_paths_and_files.append(os.path.join(output_path, f"compressed_list_{_start_time}"))
else:
for i, output_extension in enumerate(output_extensions):
if output_files[i] is None or output_files[i] == "":
if output_extension == _json_ext:
output_files[i] = f"fosslight_opossum_bin_{_start_time}"
else:
output_files[i] = f"fosslight_report_bin_{_start_time}"

combined_paths_and_files = [os.path.join(output_path, file) for file in output_files]
else:
logger.error(f"Format error - {msg}")
sys.exit(1)
Expand All @@ -116,10 +130,11 @@ def init(path_to_find_bin, output_file_name, formats, path_to_exclude=[]):
return _result_log, combined_paths_and_files, output_extensions


def get_file_list(path_to_find, abs_path_to_exclude):
def get_file_list(path_to_find, abs_path_to_exclude, simple_mode=False):
bin_list = []
file_cnt = 0
found_jar = False
compressed_list = []

for root, dirs, files in os.walk(path_to_find):
if os.path.abspath(root) in abs_path_to_exclude:
Expand Down Expand Up @@ -148,6 +163,9 @@ def get_file_list(path_to_find, abs_path_to_exclude):
bin_item.source_name_or_path = bin_with_path.replace(
_root_path, '', 1)

if simple_mode and f".{extension}" in compression_extension:
compressed_list.append(bin_item.bin_name_with_path)

if any(dir_name in dir_path for dir_name in _EXCLUDE_DIR):
bin_item.exclude = True
elif file.lower() in _EXCLUDE_FILE:
Expand All @@ -156,20 +174,44 @@ def get_file_list(path_to_find, abs_path_to_exclude):
bin_item.exclude = True
bin_list.append(bin_item)
file_cnt += 1
return file_cnt, bin_list, found_jar
return file_cnt, bin_list, found_jar, compressed_list


def exclude_bin_for_simple_mode(binary_list):
for bin in binary_list:
if not (re.search(r".*source\.jar", bin.bin_name_with_path.lower()) or bin.exclude):
yield bin.bin_name_with_path


def log_result_msg(results:list, scan_item:ScannerItem):
for success_to_write, writing_msg, result_file in results:
if success_to_write:
if result_file:
logger.info(f"Output file :{result_file}")
else:
logger.warning(f"{writing_msg}")
for row in scan_item.get_cover_comment():
logger.info(row)
else:
logger.error(f"Fail to generate result file.:{writing_msg}")


# def get_result_file(result_reports:list, output_extensions:list, results:list, compressed_list:list, bin_list:list):
# for combined_path_and_file, output_extension in zip(result_reports, output_extensions):
# results.append(write_output_file(combined_path_and_file, output_extension, scan_item, BIN_EXT_HEADER, HIDE_HEADER))
# return results


def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=False,
correct_mode=True, correct_filepath="", path_to_exclude=[]):

_result_log, result_reports, output_extensions = init(
path_to_find_bin, output_dir, formats, path_to_exclude)
path_to_find_bin, output_dir, formats, path_to_exclude, simple_mode)

total_bin_cnt = 0
total_file_cnt = 0
db_loaded_cnt = 0
success_to_write = False
writing_msg = ""
results = []
bin_list = []
base_dir_name = os.path.basename(path_to_find_bin)
Expand All @@ -183,15 +225,22 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F
if not correct_filepath:
correct_filepath = path_to_find_bin
try:
total_file_cnt, file_list, found_jar = get_file_list(path_to_find_bin, abs_path_to_exclude)
return_list = list(return_bin_only(file_list))
total_file_cnt, file_list, found_jar, compressed_list = get_file_list(path_to_find_bin, abs_path_to_exclude, simple_mode)
return_list = list(return_bin_only(file_list, True, simple_mode))
except Exception as ex:
error_occured(error_msg=f"Failed to check whether it is binary or not : {ex}",
result_log=_result_log,
exit=True)
total_bin_cnt = len(return_list)
if simple_mode:
bin_list = [bin.bin_name_with_path for bin in return_list]
bin_list = list(exclude_bin_for_simple_mode(return_list))

# results = get_result_file(result_reports, output_extensions, results, compressed_list, bin_list)

print(f"result_reports: {result_reports}")
print(f"output_extensions: {output_extensions}")

print_simple_mode(result_reports, output_extensions, compressed_list, bin_list)
else:
scan_item = ScannerItem(PKG_NAME, _start_time)
scan_item.set_cover_pathinfo(path_to_find_bin, path_to_exclude)
Expand Down Expand Up @@ -226,17 +275,8 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F

except Exception as ex:
error_occured(error_msg=str(ex), exit=False)

for success_to_write, writing_msg, result_file in results:
if success_to_write:
if result_file:
logger.info(f"Output file :{result_file}")
else:
logger.warning(f"{writing_msg}")
for row in scan_item.get_cover_comment():
logger.info(row)
else:
logger.error(f"Fail to generate result file.:{writing_msg}")

log_result_msg(results, scan_item)

try:
print_result_log(success=True, result_log=_result_log,
Expand All @@ -249,10 +289,10 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F
return success_to_write, scan_item


def return_bin_only(file_list, need_checksum_tlsh=True):
def return_bin_only(file_list, need_checksum_tlsh=True, simple_mode=False):
for file_item in file_list:
try:
if check_binary(file_item.bin_name_with_path):
if check_binary(file_item.bin_name_with_path, simple_mode):
if need_checksum_tlsh:
file_item.checksum, file_item.tlsh, error_msg = get_checksum_and_tlsh(file_item.bin_name_with_path)
if error_msg:
Expand All @@ -264,11 +304,17 @@ def return_bin_only(file_list, need_checksum_tlsh=True):
yield file_item


def check_binary(file_with_path):
def check_binary(file_with_path, simple_mode=False):
is_bin_confirmed = False
file = os.path.basename(file_with_path)
extension = os.path.splitext(file)[1][1:]
if not os.path.islink(file_with_path) and extension.lower() not in _REMOVE_FILE_EXTENSION:

if simple_mode:
remove_file_ext_list = _REMOVE_FILE_EXTENSION + _REMOVE_FILE_EXTENSION_SIMPLE + list(compression_extension)
else:
remove_file_ext_list = _REMOVE_FILE_EXTENSION

if not os.path.islink(file_with_path) and extension.lower() not in remove_file_ext_list:
if stat.S_ISFIFO(os.stat(file_with_path).st_mode):
return False
file_command_result = ""
Expand Down Expand Up @@ -324,3 +370,30 @@ def print_result_log(success=True, result_log={}, file_cnt="", bin_file_cnt="",
logger.info(_str_final_result_log)
except Exception as ex:
logger.warning(f"Error to print final log: {ex}")


def convert_list_to_str(input_list):
output_text = '\n'.join(map(str, input_list))
return output_text


def print_simple_mode(result_reports, output_extensions, compressed_list, bin_list):
print(f"result_reports: {result_reports}")
compressed_list_txt = ""
simplie_mode_bin_list_txt = ""

for item in result_reports:
if 'compressed_list_' in item:
compressed_list_txt = f"{item}.{output_extensions}"
else:
simplie_mode_bin_list_txt = f"{item}.{output_extensions}"


if compressed_list:
success, error = write_txt_file(compressed_list_txt, convert_list_to_str(compressed_list))
if not success:
logger.info(f"Error to write compressed list file for simple mode : {error}")
if bin_list:
success, error = write_txt_file(simplie_mode_bin_list_txt, convert_list_to_str(bin_list))
if not success:
logger.info(f"Error to write binary list file for simple mode : {error}")
2 changes: 1 addition & 1 deletion src/fosslight_binary/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def main():
print_package_version(_PKG_NAME, "FOSSLight Binary Scanner Version:")
sys.exit(0)

if args.simple:
if args.simple: # -s option
simple_mode = True

if args.path: # -p option
Expand Down
Loading