diff --git a/src/jsonid/jsonid.py b/src/jsonid/jsonid.py index 459099c..4d53704 100644 --- a/src/jsonid/jsonid.py +++ b/src/jsonid/jsonid.py @@ -2,12 +2,14 @@ import argparse import asyncio +import base64 +import binascii import json import logging import os import sys import time -from typing import Tuple +from typing import Any, Tuple try: import helpers @@ -36,32 +38,48 @@ logger = logging.getLogger(__name__) +def decode(content: Any, decodeb64: bool): + """Decode the given content stream.""" + data = "" + try: + data = json.loads(content) + except json.decoder.JSONDecodeError: + if not decodeb64: + return False, None + try: + logger.debug("attempting to decode as base64") + data = base64.b64decode(content) + return decode(data, False) + except (binascii.Error, ValueError): + return False, None + return True, data + + @helpers.timeit -async def identify_plaintext_bytestream(path: str) -> Tuple[bool, str]: +async def identify_plaintext_bytestream(path: str, decodeb64: bool) -> Tuple[bool, str]: """Ensure that the file is a palintext bytestream and can be processed as JSON. """ logger.debug("attempting to open: %s", path) - data = "" with open(path, "r", encoding="utf-8") as obj: try: - data = json.loads(obj.read()) - except (json.decoder.JSONDecodeError, UnicodeDecodeError): + content = obj.read() + return decode(content, decodeb64) + except UnicodeDecodeError: return False, None - return True, data -async def identify_json(paths: list[str], binary: bool): +async def identify_json(paths: list[str], binary: bool, decodeb64: bool): """Identify objects""" - print("---") for path in paths: - valid, data = await identify_plaintext_bytestream(path) + valid, data = await identify_plaintext_bytestream(path, decodeb64) if not valid: logger.debug("%s: is not plaintext", path) if binary: logger.warning("report on binary object...") continue if data != "": + print("---") logger.debug("processing: %s", path) res = registry.matcher(data) print(f"file: {path}") @@ -82,20 +100,20 @@ async def create_manifest(path: str) -> list[str]: return paths -async def process_data(path: str, binary: bool): +async def process_data(path: str, binary: bool, decodeb64: bool): """Process all objects at a given path""" logger.debug("processing: %s", path) if not os.path.exists(path): logger.error("path: '%s' does not exist", path) sys.exit(1) if os.path.isfile(path): - await identify_json([path], binary) + await identify_json([path], binary, decodeb64) sys.exit(0) paths = await create_manifest(path) if not paths: logger.info("no files in directory: %s", path) sys.exit(1) - await identify_json(paths, binary) + await identify_json(paths, binary, decodeb64) def main() -> None: @@ -122,6 +140,12 @@ def main() -> None: required=False, action="store_true", ) + parser.add_argument( + "--base64", + help="attempt to decode contents as base64 (prototype)", + required=False, + action="store_true", + ) parser.add_argument( "--registry", help="path to a custom registry to lead into memory replacing the default", @@ -150,6 +174,7 @@ def main() -> None: process_data( path=args.path, binary=args.binary, + decodeb64=args.base64, ) )