|
9 | 9 | import re
|
10 | 10 | import shutil
|
11 | 11 | import sys
|
| 12 | +import tarfile |
12 | 13 | import tempfile
|
13 | 14 |
|
| 15 | +import zstandard |
14 | 16 | from rpmfile.cli import main as rpmextract
|
15 | 17 |
|
16 | 18 | from cve_bin_tool.async_utils import (
|
@@ -47,6 +49,8 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
|
47 | 49 | self.extract_file_deb: {".deb", ".ipk"},
|
48 | 50 | self.extract_file_cab: {".cab"},
|
49 | 51 | self.extract_file_apk: {".apk"},
|
| 52 | + self.extract_file_zst: {".zst"}, |
| 53 | + self.extract_file_pkg: {".pkg"}, |
50 | 54 | self.extract_file_zip: {
|
51 | 55 | ".exe",
|
52 | 56 | ".zip",
|
@@ -109,6 +113,40 @@ async def extract_file_rpm(self, filename, extraction_path):
|
109 | 113 | return 1
|
110 | 114 | return 0
|
111 | 115 |
|
| 116 | + async def extract_file_zst(self, filename: str, extraction_path: str) -> int: |
| 117 | + """Extract zstd compressed files""" |
| 118 | + |
| 119 | + dctx = zstandard.ZstdDecompressor() |
| 120 | + with ErrorHandler(mode=ErrorMode.Ignore) as e: |
| 121 | + with open(filename, "rb") as f: |
| 122 | + with dctx.stream_reader(f) as reader: |
| 123 | + with tarfile.open(mode="r|", fileobj=reader) as tar: |
| 124 | + tar.extractall(extraction_path) |
| 125 | + tar.close() |
| 126 | + return e.exit_code |
| 127 | + |
| 128 | + async def extract_file_pkg(self, filename: str, extraction_path: str) -> int: |
| 129 | + """Extract pkg files""" |
| 130 | + |
| 131 | + # Tarfile wasn't used here because it can't open [.pkg] files directy |
| 132 | + # and failed to manage distinct compression types in differnet versions of FreeBSD packages. |
| 133 | + # Reference: https://github.com/intel/cve-bin-tool/pull/1580#discussion_r829346602 |
| 134 | + if await aio_inpath("tar"): |
| 135 | + stdout, stderr, return_code = await aio_run_command( |
| 136 | + ["tar", "xf", filename, "-C", extraction_path] |
| 137 | + ) |
| 138 | + if (stderr or not stdout) and return_code != 0: |
| 139 | + return 1 |
| 140 | + return 0 |
| 141 | + if await aio_inpath("7z"): |
| 142 | + stdout, stderr, _ = await aio_run_command( |
| 143 | + ["7z", "x", filename, "-o", extraction_path] |
| 144 | + ) |
| 145 | + if stderr or not stdout: |
| 146 | + return 1 |
| 147 | + return 0 |
| 148 | + return 1 |
| 149 | + |
112 | 150 | async def extract_file_deb(self, filename, extraction_path):
|
113 | 151 | """Extract debian packages"""
|
114 | 152 | if not await aio_inpath("ar"):
|
|
0 commit comments