diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8a296b8..b533f99 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,8 +8,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- # Once we move to Python 3.x, this can be tested against multiple Python versions. eg. [3.6, 3.7, 3.8, 3.9]
- python-version: [ 2.7 ]
+ # This allows the pipeline to be run against multiple Python versions, e.g. [3.6, 3.7, 3.8, 3.9]. This results
+ # in linting and unit tests running for all listed versions, as well as the creation of packages and wheels
+ # when a tag is created in Git.
+ python-version: [ 3.7, 3.8, 3.9, "3.10", "3.11" ]
steps:
# Get the code from the repository to be packaged
@@ -78,6 +80,8 @@ jobs:
- name: Build Objects
if: startsWith(github.ref, 'refs/tags')
run: python setup.py sdist bdist_wheel
+ env:
+ TAG_VERSION: "${GITHUB_REF#refs/*/}"
# Ensure the objects were packaged correctly and there wasn't an issue with
# the compilation or packaging process.
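
The `TAG_VERSION` value leans on shell-style parameter expansion to strip the `refs/tags/` (or `refs/heads/`) prefix from `GITHUB_REF`. As a rough illustration of what that expansion yields, here is a minimal Python sketch; the ref strings are hypothetical examples, not values taken from this repository:

```python
def strip_ref_prefix(github_ref: str) -> str:
    """Mimic ${GITHUB_REF#refs/*/}: drop the shortest leading 'refs/<segment>/' match."""
    parts = github_ref.split("/", 2)
    if github_ref.startswith("refs/") and len(parts) == 3:
        return parts[2]
    return github_ref

assert strip_ref_prefix("refs/tags/v1.0.0") == "v1.0.0"   # tag build
assert strip_ref_prefix("refs/heads/main") == "main"      # branch build
```
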
diff --git a/.gitignore b/.gitignore
index f0c3ffc..5d4906d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,8 @@
# Other
/output
/log
+
+# PyTest and Coverage
/case
/.pytest_cache
/htmlcov
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 030f003..7de3aa5 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -17,6 +17,8 @@ stages:
- Test
- Build
+image: python:3.9
+
before_script:
- pip config set global.index-url ${PIP_URL}
- pip config set global.trusted-host ${ARTIFACTORY_HOST_NAME}
diff --git a/README.md b/README.md
index a85bb83..f38a8e8 100644
--- a/README.md
+++ b/README.md
@@ -298,6 +298,5 @@ TODO:
- [ ] Add additional logging messages to the master schema entries skipped in signature generation.
- [ ] Integrate in the SQLite Forensic Corpus into tests.
- [ ] Look into updating terminology for versioning to timelining.
-- [ ] Update code for compatibility with Python 3.
- [ ] Create PyUnit tests.
- [ ] Create a GUI.
diff --git a/docs/source/conf.py b/docs/source/conf.py
index fbb079f..cc063e1 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -22,7 +22,7 @@
author = 'Department of Defense Cyber Crime Center (DC3)'
# The full version, including alpha/beta/rc tags
-release = '0.2.0'
+release = '1.0.0'
# -- General configuration ---------------------------------------------------
master_doc = 'index'
diff --git a/docs/source/index.rst b/docs/source/index.rst
index d0111d3..5d29efb 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -7,6 +7,12 @@ the associated journal files. If they are not in the same directory as the speci
their location will need to be specified in the command. SQLite carving will not be done by default. See the full
command line options to enable carving.
+SQLite Dissect is primarily developed and maintained by the `Department of Defense Cyber Crime Center (DC3) `_
+with support from open-source contributors. The tool is hosted on `GitHub `_.
+
+All released versions of the tool can be downloaded from the `GitHub releases page `_.
+
+
.. toctree::
:maxdepth: 2
diff --git a/docs/source/sqlite_dissect/getting_started.rst b/docs/source/sqlite_dissect/getting_started.rst
index b7c1246..3a44eab 100644
--- a/docs/source/sqlite_dissect/getting_started.rst
+++ b/docs/source/sqlite_dissect/getting_started.rst
@@ -3,11 +3,14 @@ Getting Started
System Requirements
+++++++++++++++++++
-SQLite Dissect depends on Python 2.7, with support for Python 3.x expected soon. It has been tested on Windows, OSX, and
-Linux (Ubuntu) platforms.
+SQLite Dissect depends on Python 3.6+, with automated tests run against versions 3.6, 3.7, 3.8, 3.9, and 3.10. It has
+been tested on Windows, OSX, and Linux (Ubuntu) platforms.
-To try to limit the need for dependencies, only one package is required for SQLite Dissect, which the `openpxl `_ package
-that is used for exporting the results into Excel format.
+SQLite Dissect versions up to and including 0.1.0 support Python 2.7 and can be downloaded from the
+`GitHub releases page `_.
+
+To try to limit the need for dependencies, only one package is required for SQLite Dissect, which is the
+`openpyxl `_ package that is used for exporting the results into Excel format.
Installation
+++++++++++++++++++
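
For context on the openpyxl dependency mentioned above, it is the package that produces the Excel (XLSX) exports. A minimal, self-contained sketch of the kind of call an XLSX export relies on; the sheet name, rows, and output path are made up for illustration:

```python
from openpyxl import Workbook

workbook = Workbook()
worksheet = workbook.active
worksheet.title = "example"                    # hypothetical sheet name
worksheet.append(["column_a", "column_b"])     # header row
worksheet.append([1, "value"])                 # one data row
workbook.save("example-output.xlsx")           # hypothetical output path
```
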
diff --git a/setup.py b/setup.py
index 8fc0004..30c4d80 100644
--- a/setup.py
+++ b/setup.py
@@ -47,14 +47,18 @@
"sqlite_dissect.carving",
"sqlite_dissect.export"],
classifiers=[
- "Programming Language :: Python :: 2",
- "Programming Language :: Python :: 2.7"
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10"
],
entry_points={
'console_scripts': ['sqlite_dissect=sqlite_dissect.entrypoint:cli'],
},
install_requires=[
- "openpyxl==2.6.4",
+ "openpyxl==3.0.9",
"ConfigArgParse"
],
zip_safe=False
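
The classifier list above is informational metadata on PyPI; it does not by itself prevent installation under an unsupported interpreter. A `python_requires` bound is the usual companion setting for that, shown here only as a hypothetical excerpt (it is not part of this diff):

```python
# Hypothetical excerpt only -- python_requires is not added by this change.
from setuptools import setup

setup(
    name="sqlite_dissect",
    python_requires=">=3.6",
    install_requires=["openpyxl==3.0.9", "ConfigArgParse"],
)
```
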
diff --git a/sqlite_dissect/_version.py b/sqlite_dissect/_version.py
index 8ab63c0..46f3a6b 100644
--- a/sqlite_dissect/_version.py
+++ b/sqlite_dissect/_version.py
@@ -4,4 +4,4 @@
This script identifies the version of the SQLite Dissect library.
"""
-__version__ = "0.2.0"
+__version__ = "1.0.0"
diff --git a/sqlite_dissect/carving/carved_cell.py b/sqlite_dissect/carving/carved_cell.py
index 3cbb263..b4f0ae3 100644
--- a/sqlite_dissect/carving/carved_cell.py
+++ b/sqlite_dissect/carving/carved_cell.py
@@ -1,23 +1,18 @@
from struct import unpack
from warnings import warn
-from sqlite_dissect.carving.utilities import calculate_body_content_size
-from sqlite_dissect.carving.utilities import calculate_serial_type_definition_content_length_min_max
-from sqlite_dissect.carving.utilities import decode_varint_in_reverse
-from sqlite_dissect.carving.utilities import get_content_size
-from sqlite_dissect.constants import BLOB_SIGNATURE_IDENTIFIER
-from sqlite_dissect.constants import CELL_LOCATION
-from sqlite_dissect.constants import FILE_TYPE
-from sqlite_dissect.constants import TEXT_SIGNATURE_IDENTIFIER
-from sqlite_dissect.exception import CellCarvingError
-from sqlite_dissect.exception import InvalidVarIntError
+
+from sqlite_dissect.carving.utilities import (
+ calculate_body_content_size,
+ calculate_serial_type_definition_content_length_min_max,
+ decode_varint_in_reverse, get_content_size)
+from sqlite_dissect.constants import (BLOB_SIGNATURE_IDENTIFIER, CELL_LOCATION,
+ FILE_TYPE, TEXT_SIGNATURE_IDENTIFIER)
+from sqlite_dissect.exception import CellCarvingError, InvalidVarIntError
from sqlite_dissect.file.database.page import BTreeCell
-from sqlite_dissect.file.database.payload import Payload
-from sqlite_dissect.file.database.payload import RecordColumn
-from sqlite_dissect.utilities import decode_varint
-from sqlite_dissect.utilities import encode_varint
-from sqlite_dissect.utilities import get_md5_hash
-from sqlite_dissect.utilities import get_record_content
-from sqlite_dissect.utilities import get_serial_type_signature
+from sqlite_dissect.file.database.payload import Payload, RecordColumn
+from sqlite_dissect.utilities import (decode_varint, encode_varint,
+ get_md5_hash, get_record_content,
+ get_serial_type_signature)
"""
@@ -221,7 +216,7 @@ def __init__(self, location, data, serial_type_definition_start_offset, serial_t
self.truncated_beginning = False
self.truncated_ending = False
- record_column_md5_hash_strings = [""] * self.number_of_columns
+ record_column_md5_hash_strings = [b""] * self.number_of_columns
column_index = 0
body_byte_size = 0
@@ -273,7 +268,7 @@ def __init__(self, location, data, serial_type_definition_start_offset, serial_t
self.serial_type_signature += str(get_serial_type_signature(first_serial_type))
- record_column_md5_hash_strings[column_index] = ""
+ record_column_md5_hash_strings[column_index] = b""
self.serial_type_definition_size += first_serial_type_varint_length
@@ -439,7 +434,7 @@ def __init__(self, location, data, serial_type_definition_start_offset, serial_t
self.serial_type_signature += str(get_serial_type_signature(first_serial_type))
- record_column_md5_hash_strings[column_index] = ""
+ record_column_md5_hash_strings[column_index] = b""
self.serial_type_definition_size += first_serial_type_varint_length
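
The placeholders switching from "" to b"" keep the per-column values in the bytes domain, presumably so they can be combined with raw record bytes and hashed; in Python 3, hashlib only accepts bytes and mixing str with bytes raises errors. A small illustration of the failure mode being avoided:

```python
import hashlib

print(hashlib.md5(b"").hexdigest())   # 'd41d8cd98f00b204e9800998ecf9b6e0'

try:
    hashlib.md5("")                   # str is rejected in Python 3
except TypeError as error:
    print(error)                      # strings must be encoded before hashing
```
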
diff --git a/sqlite_dissect/carving/carver.py b/sqlite_dissect/carving/carver.py
index 14ef80b..2a845a4 100644
--- a/sqlite_dissect/carving/carver.py
+++ b/sqlite_dissect/carving/carver.py
@@ -3,12 +3,8 @@
from warnings import warn
from sqlite_dissect.carving.carved_cell import CarvedBTreeCell
from sqlite_dissect.carving.utilities import generate_signature_regex
-from sqlite_dissect.constants import BLOB_SIGNATURE_IDENTIFIER
-from sqlite_dissect.constants import CELL_LOCATION
-from sqlite_dissect.constants import LOGGER_NAME
-from sqlite_dissect.constants import TEXT_SIGNATURE_IDENTIFIER
-from sqlite_dissect.exception import CarvingError
-from sqlite_dissect.exception import CellCarvingError
+from sqlite_dissect.constants import BLOB_SIGNATURE_IDENTIFIER, CELL_LOCATION, LOGGER_NAME, TEXT_SIGNATURE_IDENTIFIER
+from sqlite_dissect.exception import CarvingError, CellCarvingError
"""
@@ -83,7 +79,7 @@ def carve_freeblocks(version, source, freeblocks, signature):
"carving freeblocks with signatures: {}. Signatures starting with variable length serial " \
"types are not fully implemented and may result in carving false positives."
log_message = log_message.format(first_column_serial_types, simplified_signature)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Retrieve and compile the serial type definition signature pattern
@@ -186,7 +182,7 @@ def carve_freeblocks(version, source, freeblocks, signature):
serial_type_definition_start_offset,
serial_type_definition_end_offset, cutoff_offset,
number_of_columns, signature.name, signature.table_name)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Return the cells carved from the freeblocks
@@ -406,7 +402,7 @@ def carve_unallocated_space(version, source, page_number, unallocated_space_star
serial_type_definition_start_offset,
serial_type_definition_end_offset, cutoff_offset,
number_of_columns, signature.name, signature.table_name)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
"""
@@ -586,7 +582,7 @@ def carve_unallocated_space(version, source, page_number, unallocated_space_star
partial_serial_type_definition_end_offset,
partial_cutoff_offset, number_of_columns, signature.name,
signature.table_name)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Return the cells carved from the freeblocks
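
The blanket rename from logger.warn() to logger.warning() reflects that warn() is only a deprecated alias in Python 3's logging module; warning() is the documented call:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("sqlite_dissect_example")   # hypothetical logger name

logger.warning("preferred spelling of the call")
# logger.warn("...") still runs on many Python 3 versions but emits a DeprecationWarning.
```
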
diff --git a/sqlite_dissect/carving/signature.py b/sqlite_dissect/carving/signature.py
index 812b449..316b06b 100644
--- a/sqlite_dissect/carving/signature.py
+++ b/sqlite_dissect/carving/signature.py
@@ -2,7 +2,6 @@
from abc import abstractmethod
from copy import copy
from logging import getLogger
-from re import sub
from warnings import warn
from sqlite_dissect.carving.utilities import get_content_size
from sqlite_dissect.constants import LOGGER_NAME
@@ -84,7 +83,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(master_schema_entry.root_page_number,
master_schema_entry.row_type, master_schema_entry.name,
master_schema_entry.table_name, master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
if master_schema_entry.internal_schema_object:
@@ -94,7 +93,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(master_schema_entry.root_page_number,
master_schema_entry.row_type, master_schema_entry.name,
master_schema_entry.table_name, master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
log_message = "Creating signature for master schema entry with name: {} table name: {} row type: {} and " \
@@ -258,7 +257,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
"""
# Iterate through each of the records
- for cell_md5_hex_digest, record in records.iteritems():
+ for cell_md5_hex_digest, record in records.items():
"""
@@ -294,7 +293,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
total_table_row_signature_count = 0
# Iterate through the table row signatures and set the total rows and increment the count
- for serial_type_signature, table_row_signature in self.table_row_signatures.iteritems():
+ for serial_type_signature, table_row_signature in self.table_row_signatures.items():
table_row_signature.number_of_rows = self.unique_records
total_table_row_signature_count += table_row_signature.count
@@ -422,7 +421,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
table_row_columns = {}
# Iterate through the table row signatures and create the table row columns dictionary
- for table_row_md5_hex_digest, table_row_signature in self.table_row_signatures.iteritems():
+ for table_row_md5_hex_digest, table_row_signature in self.table_row_signatures.items():
# Iterate through all of the column signatures in the current table row signature
for column_index in range(len(table_row_signature.column_signatures)):
@@ -434,7 +433,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
table_row_columns[column_index] = [table_row_signature.column_signatures[column_index]]
# Iterate through the table row columns and create the table column signatures
- for table_row_column_index, table_row_column_serial_type_array in table_row_columns.iteritems():
+ for table_row_column_index, table_row_column_serial_type_array in table_row_columns.items():
column_name = column_definitions[table_row_column_index].column_name
self.table_column_signatures.append(TableColumnSignature(table_row_column_index, column_name,
table_row_column_serial_type_array))
@@ -541,7 +540,7 @@ def stringify(self, padding="", print_table_row_signatures=True, print_schema_co
signature_string = signature_string.format(schema_column_signature.stringify("\t"))
string += signature_string
if print_table_row_signatures:
- for table_row_md5_hex_digest, table_row_signature in self.table_row_signatures.iteritems():
+ for table_row_md5_hex_digest, table_row_signature in self.table_row_signatures.items():
signature_string = "\n" + padding + "Table Row Signature:\n{}"
signature_string = signature_string.format(table_row_signature.stringify("\t", print_column_signatures))
string += signature_string
@@ -820,10 +819,10 @@ def __init__(self, column_definition):
raise SignatureError(log_message)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Derived Data Type Name: {}\n" \
@@ -913,14 +912,14 @@ def __init__(self, index, name, column_signatures):
self._logger.error(log_message)
raise SignatureError(log_message)
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
column_signature.number_of_rows = self.count
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_column_signatures=True):
string = padding + "Index: {}\n" \
@@ -936,14 +935,14 @@ def stringify(self, padding="", print_column_signatures=True):
self.simplified_signature,
len(self.column_signatures))
if print_column_signatures:
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
string += "\n" + padding + "Column Signature:\n{}".format(column_signature.stringify(padding + "\t"))
return string
@property
def focused_probabilistic_signature(self):
focused_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
if isinstance(column_signature, ColumnVariableLengthSignature):
for serial_type in column_signature.variable_length_serial_types:
serial_type_probability = column_signature.get_variable_length_serial_type_probability(serial_type)
@@ -961,7 +960,7 @@ def focused_probabilistic_signature(self):
@property
def focused_signature(self):
focused_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
if isinstance(column_signature, ColumnVariableLengthSignature):
focused_signatures.extend(column_signature.variable_length_serial_types.keys())
elif isinstance(column_signature, ColumnFixedLengthSignature):
@@ -977,14 +976,14 @@ def focused_signature(self):
@property
def simplified_probabilistic_signature(self):
simplified_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
simplified_signatures.append((column_signature.serial_type, column_signature.probability))
return sorted(simplified_signatures, key=lambda x: x[0])
@property
def simplified_signature(self):
simplified_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
simplified_signatures.append(column_signature.serial_type)
return sorted(simplified_signatures, key=int)
@@ -1098,10 +1097,10 @@ def __init__(self, column_definitions, record):
raise SignatureError(log_message)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_column_signatures=True):
string = padding + "Record Serial Type Signature: {}\n" \
@@ -1119,14 +1118,14 @@ def stringify(self, padding="", print_column_signatures=True):
self.simplified_signature,
len(self.column_signatures))
if print_column_signatures:
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
string += "\n" + padding + "Column Signature:\n{}".format(column_signature.stringify(padding + "\t"))
return string
@property
def focused_signature(self):
focused_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
if isinstance(column_signature, ColumnVariableLengthSignature):
focused_signatures.append(sorted(column_signature.variable_length_serial_types.keys(), key=int))
elif isinstance(column_signature, ColumnFixedLengthSignature):
@@ -1165,7 +1164,7 @@ def number_of_rows(self, number_of_rows):
self._number_of_rows = number_of_rows
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
column_signature.number_of_rows = number_of_rows
@property
@@ -1188,7 +1187,7 @@ def probability(self):
@property
def simplified_signature(self):
simplified_signatures = []
- for column_signature_index, column_signature in self.column_signatures.iteritems():
+ for column_signature_index, column_signature in self.column_signatures.items():
simplified_signatures.append([column_signature.serial_type])
return simplified_signatures
@@ -1311,10 +1310,10 @@ def __init__(self, index, name, serial_type, count=1):
raise ValueError(log_message)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Index: {}\n" \
@@ -1519,7 +1518,7 @@ def update(self, serial_type, count=None, variable_length_serial_types=None):
self.count += count
- for variable_length_serial_type, variable_length_serial_type_count in variable_length_serial_types.iteritems():
+ for variable_length_serial_type, variable_length_serial_type_count in variable_length_serial_types.items():
if variable_length_serial_type in self.variable_length_serial_types:
self.variable_length_serial_types[variable_length_serial_type] += variable_length_serial_type_count
else:
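
The repeated iteritems() to items() substitutions in this file follow from dict.iteritems() being removed in Python 3, where items() already returns a lazy view rather than a copied list. A small illustration with made-up signature counts:

```python
counts = {"serial_type_1": 2, "serial_type_9": 5}   # hypothetical per-serial-type counts

# Python 2 offered counts.iteritems() as the lazy variant; Python 3 removed it.
for serial_type, count in counts.items():           # items() is a view object in Python 3
    print(serial_type, count)
```
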
diff --git a/sqlite_dissect/carving/utilities.py b/sqlite_dissect/carving/utilities.py
index 78a3481..4ab2688 100644
--- a/sqlite_dissect/carving/utilities.py
+++ b/sqlite_dissect/carving/utilities.py
@@ -1,11 +1,9 @@
-from binascii import hexlify
-from binascii import unhexlify
+from binascii import hexlify, unhexlify
from logging import getLogger
-from sqlite_dissect.constants import BLOB_SIGNATURE_IDENTIFIER
-from sqlite_dissect.constants import LOGGER_NAME
-from sqlite_dissect.constants import TEXT_SIGNATURE_IDENTIFIER
-from sqlite_dissect.exception import CarvingError
-from sqlite_dissect.exception import InvalidVarIntError
+
+from sqlite_dissect.constants import (BLOB_SIGNATURE_IDENTIFIER, LOGGER_NAME,
+ TEXT_SIGNATURE_IDENTIFIER)
+from sqlite_dissect.exception import CarvingError, InvalidVarIntError
from sqlite_dissect.utilities import decode_varint
"""
@@ -26,10 +24,9 @@
"""
-def decode_varint_in_reverse(byte_array, offset, max_varint_length=9):
+def decode_varint_in_reverse(byte_array: bytearray, offset: int, max_varint_length=9):
"""
-
This function will move backwards through a byte array trying to decode a varint in reverse. A InvalidVarIntError
will be raised if a varint is not found by this algorithm used in this function. The calling logic should check
for this case in case it is encountered which is likely in the context of carving.
@@ -57,7 +54,6 @@ def decode_varint_in_reverse(byte_array, offset, max_varint_length=9):
algorithm in this function. This error is not logged as an error but rather a
debug statement since it is very likely to occur during carving and should be handled
appropriately.
-
"""
if offset > len(byte_array):
@@ -180,9 +176,9 @@ def generate_regex_for_simplified_serial_type(simplified_serial_type):
"""
if simplified_serial_type == -2:
- return "(?:[\x0C-\x7F]|[\x80-\xFF]{1,7}[\x00-\x7F])"
+ return b"(?:[\x0C-\x7F]|[\x80-\xFF]{1,7}[\x00-\x7F])"
elif simplified_serial_type == -1:
- return "(?:[\x0D-\x7F]|[\x80-\xFF]{1,7}[\x00-\x7F])"
+ return b"(?:[\x0D-\x7F]|[\x80-\xFF]{1,7}[\x00-\x7F])"
elif 0 <= simplified_serial_type <= 9:
return unhexlify("0{}".format(simplified_serial_type))
else:
@@ -217,7 +213,7 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
"""
- regex = ""
+ regex = b""
if skip_first_serial_type:
signature = signature[1:]
@@ -242,9 +238,9 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
"""
- basic_serial_type_regex = ""
- blob_regex = ""
- text_regex = ""
+ basic_serial_type_regex = b""
+ blob_regex = b""
+ text_regex = b""
for column_serial_type in column_serial_type_array:
if column_serial_type == -1:
@@ -257,7 +253,7 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
if blob_regex or text_regex:
if basic_serial_type_regex:
- basic_serial_type_regex = "[{}]".format(basic_serial_type_regex)
+ basic_serial_type_regex = b"[%b]" % basic_serial_type_regex
if blob_regex and not text_regex:
@@ -269,7 +265,7 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
getLogger(LOGGER_NAME).error(log_message)
raise CarvingError(log_message)
- regex += "(?:{}|{})".format(basic_serial_type_regex, blob_regex)
+ regex += b"(?:%b|%b)" % (basic_serial_type_regex, blob_regex)
elif not blob_regex and text_regex:
@@ -282,15 +278,15 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
getLogger(LOGGER_NAME).error(log_message)
raise CarvingError(log_message)
- regex += "(?:{}|{})".format(basic_serial_type_regex, text_regex)
+ regex += b"(?:%b|%b)" % (basic_serial_type_regex, text_regex)
elif blob_regex and text_regex:
- var_length_regex = blob_regex + "|" + text_regex
+ var_length_regex = blob_regex + b"|" + text_regex
if basic_serial_type_regex:
- regex += "(?:{}|{})".format(basic_serial_type_regex, var_length_regex)
+ regex += b"(?:%b|%b)" % (basic_serial_type_regex, var_length_regex)
else:
- regex += "(?:{})".format(var_length_regex)
+ regex += b"(?:%b)" % var_length_regex
else:
log_message = "No appropriate regular expressions were found for basic serial type, blob, or " \
@@ -317,7 +313,7 @@ def generate_signature_regex(signature, skip_first_serial_type=False):
getLogger(LOGGER_NAME).error(log_message)
raise CarvingError(log_message)
- regex += "[{}]".format(basic_serial_type_regex)
+ regex += b"[%b]" % basic_serial_type_regex
else:
@@ -374,11 +370,11 @@ def get_content_size(serial_type):
# A BLOB that is (N-12)/2 bytes in length
elif serial_type >= 12 and serial_type % 2 == 0:
- return (serial_type - 12) / 2
+ return (serial_type - 12) // 2
- # A string in the database encoding and is (N-13)/2 bytes in length. The nul terminator is omitted
+ # A string in the database encoding and is (N-13)/2 bytes in length. The null terminator is omitted
elif serial_type >= 13 and serial_type % 2 == 1:
- return (serial_type - 13) / 2
+ return int((serial_type - 13) // 2)
else:
log_message = "Invalid serial type: {}."
diff --git a/sqlite_dissect/constants.py b/sqlite_dissect/constants.py
index 02f904f..eac1940 100644
--- a/sqlite_dissect/constants.py
+++ b/sqlite_dissect/constants.py
@@ -1,4 +1,9 @@
-from collections import MutableMapping
+# https://docs.python.org/3.9/library/collections.html
+try:
+ from collections.abc import MutableMapping
+except ImportError:
+ from collections import MutableMapping
+
from logging import getLogger
from re import compile
from sys import maxunicode
@@ -79,7 +84,7 @@ def __len__(self):
LOCK_BYTE_PAGE_END_OFFSET = 1073742336
SQLITE_DATABASE_HEADER_LENGTH = 100
-MAGIC_HEADER_STRING = "SQLite format 3\000"
+MAGIC_HEADER_STRING = b'SQLite format 3\000'
MAGIC_HEADER_STRING_ENCODING = UTF_8
MAXIMUM_PAGE_SIZE_INDICATOR = 1
MINIMUM_PAGE_SIZE_LIMIT = 512
@@ -155,7 +160,7 @@ def __len__(self):
TEXT_SIGNATURE_IDENTIFIER = -2
ZERO_BYTE = b'\x00'
-ALL_ZEROS_REGEX = "^0*$"
+ALL_ZEROS_REGEX = b"^0*$"
SQLITE_MASTER_SCHEMA_ROOT_PAGE = 1
MASTER_SCHEMA_COLUMN = Enum({"TYPE": 0, "NAME": 1, "TABLE_NAME": 2, "ROOT_PAGE": 3, "SQL": 4})
@@ -240,8 +245,8 @@ def __len__(self):
ROLLBACK_JOURNAL_ALL_CONTENT_UNTIL_END_OF_FILE = -1
ROLLBACK_JOURNAL_POSTFIX = "-journal"
ROLLBACK_JOURNAL_HEADER_LENGTH = 28
-ROLLBACK_JOURNAL_HEADER_HEX_STRING = 'd9d505f920a163d7'
-ROLLBACK_JOURNAL_HEADER_ALL_CONTENT = 'ffffffff'
+ROLLBACK_JOURNAL_HEADER_HEX_STRING = b'\xd9\xd5\x05\xf9\x20\xa1\x63\xd7'
+ROLLBACK_JOURNAL_HEADER_ALL_CONTENT = b'\xff\xff\xff\xff'
BASE_VERSION_NUMBER = 0
COMMIT_RECORD_BASE_VERSION_NUMBER = BASE_VERSION_NUMBER + 1
@@ -293,5 +298,5 @@ def __len__(self):
(0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
(0x10FFFE, 0x10FFFF)])
-_illegal_xml_ranges = ["%s-%s" % (unichr(low), unichr(high)) for (low, high) in _illegal_xml_characters]
+_illegal_xml_ranges = ["%s-%s" % (chr(low), chr(high)) for (low, high) in _illegal_xml_characters]
ILLEGAL_XML_CHARACTER_PATTERN = compile(u'[%s]' % u''.join(_illegal_xml_ranges))
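
With the header constants now defined as bytes, they compare directly against data read from a file opened in binary mode. A minimal sketch, assuming a hypothetical database path:

```python
MAGIC_HEADER_STRING = b"SQLite format 3\000"

with open("example.db", "rb") as database_file:   # hypothetical path
    header = database_file.read(16)               # the SQLite magic header is 16 bytes long
    print(header == MAGIC_HEADER_STRING)          # True for a well-formed SQLite database
```
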
diff --git a/sqlite_dissect/entrypoint.py b/sqlite_dissect/entrypoint.py
index 7a92ea6..2bf9579 100644
--- a/sqlite_dissect/entrypoint.py
+++ b/sqlite_dissect/entrypoint.py
@@ -2,12 +2,7 @@
import warnings
from logging import CRITICAL, ERROR, WARNING, INFO, DEBUG, basicConfig, getLogger, StreamHandler, FileHandler
from os import path
-from os.path import basename
-from os.path import join
-from os.path import exists
-from os.path import getsize
-from os.path import normpath
-from os.path import sep
+from os.path import basename, abspath, join, exists, getsize, normpath, sep
from time import time
from warnings import warn
from sqlite_dissect.carving.rollback_journal_carver import RollBackJournalCarver
@@ -45,7 +40,7 @@
"""
-def main(arguments, sqlite_file_path, export_sub_paths=False):
+def main(arguments, sqlite_file_path: str, export_sub_paths=False):
"""
The primary entrypoint for the SQLite Dissect carving utility.
@@ -85,8 +80,8 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
datefmt='%d %b %Y %H:%M:%S',
filename=arguments.log_file if arguments.log_file else None)
- logger.debug("Setup logging using the log level: {}.".format(logging_level))
- logger.info("Using options: {}".format(arguments))
+ logger.debug(f"Setup logging using the log level: {logging_level}.")
+ logger.info(f"Using options: {arguments}")
case = CaseExporter(logger)
case.start_datetime = datetime.now()
@@ -121,7 +116,7 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
# Setup the export type
export_types = [EXPORT_TYPES.TEXT]
if arguments.export and len(export_types) > 0:
- export_types = map(str.upper, arguments.export)
+ export_types = list(map(str.upper, arguments.export))
# Setup the strict format checking
strict_format_checking = True
@@ -141,7 +136,7 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
output_directory = None
if arguments.directory:
if not exists(arguments.directory):
- raise SqliteError("Unable to find output directory: {}.".format(arguments.directory))
+ raise SqliteError(f"Unable to find output directory: {arguments.directory}.")
output_directory = arguments.directory
# Determine if there are sub-paths being configured for exports
if export_sub_paths:
@@ -152,23 +147,23 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
else:
raise IOError("Unable to create the new sub-directory: {}", join(output_directory, subpath))
- logger.debug("Determined export type to be {} with file prefix: {} and output directory: {}"
- .format(', '.join(export_types), file_prefix, output_directory))
+ logger.debug(
+ f"Determined export type to be {export_types} with file prefix: {file_prefix} and output directory: {output_directory}")
# Obtain the SQLite file
if not exists(sqlite_file_path):
- raise SqliteError("Unable to find SQLite file: {}.".format(sqlite_file_path))
+ raise SqliteError(f"Unable to find SQLite file: {sqlite_file_path}.")
"""
-
+
If the file is a zero length file, we set a flag indicating it and check to make sure there are no associated wal
or journal files before just exiting out stating that the file was empty. If a (non-zero length) wal or journal
file is found, an exception will be thrown. However, if the no-journal option is specified, the journal files will
not be checked, and the program will exit.
-
+
Note: It is currently believed that there cannot be a zero length SQLite database file with a wal or journal file.
That is why an exception is thrown here but needs to be investigated to make sure.
-
+
"""
# See if the SQLite file is zero-length
@@ -182,11 +177,11 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
if not arguments.no_journal:
if arguments.wal:
if not exists(arguments.wal):
- raise SqliteError("Unable to find wal file: {}.".format(arguments.wal))
+ raise SqliteError(f"Unable to find wal file: {arguments.wal}.")
wal_file_name = arguments.wal
elif arguments.rollback_journal:
if not exists(arguments.rollback_journal):
- raise SqliteError("Unable to find rollback journal file: {}.".format(arguments.rollback_journal))
+ raise SqliteError(f"Unable to find rollback journal file: {arguments.rollback_journal}.")
rollback_journal_file_name = arguments.rollback_journal
else:
if exists(sqlite_file_path + WAL_FILE_POSTFIX):
@@ -217,64 +212,65 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
if wal_file_name and not zero_length_wal_file:
"""
-
+
Here we throw an exception if we find a wal file with content with no content in the original SQLite file.
It is not certain this use case can occur and investigation needs to be done to make certain. There have
been scenarios where there will be a database header with no schema or content in a database file with a
WAL file that has all the schema entries and content but this is handled differently.
-
+
"""
- raise SqliteError(
- "Found a zero length SQLite file with a wal file: {}. Unable to parse.".format(arguments.wal))
+ raise SqliteError(f"Found a zero length SQLite file with a wal file: {arguments.wal}. Unable to parse.")
elif zero_length_wal_file:
- print("File: {} with wal file: {} has no content. Nothing to parse."
- .format(sqlite_file_path, wal_file_name))
+ logger.error(f"File: {sqlite_file_path} with wal file: {wal_file_name} has no content. Nothing to parse.")
exit(0)
elif rollback_journal_file_name and not zero_length_rollback_journal_file:
"""
-
+
Here we will only have a rollback journal file. Currently, since we need to have the database file to parse
signatures from, we cannot solely carve on the journal file alone.
-
+
"""
- raise SqliteError("Found a zero length SQLite file with a rollback journal file: {}. Unable to parse."
- .format(arguments.rollback_journal))
+ raise SqliteError(
+ f"Found a zero length SQLite file with a rollback journal file: {arguments.rollback_journal}. "
+ f"Unable to parse.")
elif zero_length_rollback_journal_file:
- print("File: {} with rollback journal file: {} has no content. Nothing to parse."
- .format(sqlite_file_path, rollback_journal_file_name))
+ logger.error(
+ f"File: {sqlite_file_path} with rollback journal file: {rollback_journal_file_name} has no content. "
+ f"Nothing to parse.")
exit(0)
else:
- print("File: {} has no content. Nothing to parse.".format(sqlite_file_path))
+ logger.error("File: {} has no content. Nothing to parse.".format(sqlite_file_path))
exit(0)
# Make sure that both of the journal files are not found
if rollback_journal_file_name and wal_file_name:
"""
-
+
Since the arguments have you specify the journal file in a way that you can only set the wal or rollback journal
file name, this case can only occur from finding both of the files on the file system for both wal and rollback
journal when there is no journal options specified. Since the SQLite database cannot be set to use both wal and
journal files in the same running, we determine this to be an error and throw and exception up.
-
+
There may be a case where the mode was changed at some point and there is a single SQLite file with one or more
journal files in combination of rollback journal and WAL files. More research would have to take place in this
scenario and also take into the account of this actually occurring since in most cases it is set statically
by the application SQLite database owner.
-
+
"""
- raise SqliteError("Found both a rollback journal: {} and wal file: {}. Only one journal file should exist. "
- "Unable to parse.".format(arguments.rollback_journal, arguments.wal))
+ raise SqliteError(
+ f"Found both a rollback journal: {arguments.rollback_journal} and wal file: {arguments.wal}. "
+ f"Only one journal file should exist. Unable to parse.")
# Print a message parsing is starting and log the start time for reporting at the end on amount of time to run
- print("\nParsing: {}...".format(sqlite_file_path))
+ logger.info(f"\nParsing: {sqlite_file_path}...")
start_time = time()
# Create the database and wal/rollback journal file (if existent)
@@ -294,20 +290,24 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
# Check if the header info was asked for
if arguments.header:
# Print the header info of the database
- print("\nDatabase header information:\n{}".format(database.database_header.stringify(padding="\t")))
- print("Continuing to parse...")
+ str_header = database.database_header.stringify(padding="\t")
+ logger.debug(f"\nDatabase header information:\n{str_header}")
+ # Print to stdout if the "text" output option was selected
+ if not export_types or EXPORT_TYPES.TEXT in export_types:
+ print(f"\nDatabase header information:\n{str_header}")
+ logger.debug("Continuing to parse...")
# Check if the master schema was asked for
if arguments.schema:
# print the master schema of the database
- print("\nDatabase Master Schema:\n{}".format(stringify_master_schema_version(database)))
- print("Continuing to parse...")
+ logger.debug(f"\nDatabase Master Schema:\n{stringify_master_schema_version(database)}")
+ logger.debug("Continuing to parse...")
# Check if the schema history was asked for
if arguments.schema_history:
# print the master schema version history
- print("\nVersion History of Master Schemas:\n{}".format(stringify_master_schema_versions(version_history)))
- print("Continuing to parse...")
+ logger.debug(f"\nVersion History of Master Schemas:\n{stringify_master_schema_versions(version_history)}")
+ logger.debug("Continuing to parse...")
# Get the signature options
print_signatures = arguments.signatures
@@ -320,7 +320,7 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
if not carve and carve_freelists:
log_message = "The carve option was not set but the carve_freelists option was. Disabling carve_freelists. " \
"Please specify the carve option to enable."
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Specific tables to be carved
@@ -331,8 +331,8 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
if rollback_journal_exempted_tables and specified_tables_to_carve:
for table in rollback_journal_exempted_tables:
if table in specified_tables_to_carve:
- print("Table: {} found in both exempted and specified tables. Please update the arguments correctly."
- .format(table))
+ logger.error(f"Table: {table} found in both exempted and specified tables. Please update the "
+ f"arguments correctly.")
exit(0)
# See if we need to generate signatures
@@ -358,7 +358,7 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
Note: This is not allowing "without rowid" or virtual tables until further testing is done.
(Virtual tables tend to have a root page number of 0 with no data stored in the main table. Further
investigation is needed.)
-
+
Note: Table internal schema objects will not be accounted for. These are tables that start with "sqlite_"
and are used for internal use to SQLite itself. These have never known to produce any forensic
pertinent data.
@@ -368,25 +368,25 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
if isinstance(master_schema_entry, OrdinaryTableRow):
if master_schema_entry.without_row_id:
- log_message = "A `without row_id` table was found: {} and will not have a signature generated " \
- "for carving since it is not supported yet.".format(master_schema_entry.table_name)
+ log_message = f"A `without row_id` table was found: {master_schema_entry.table_name} and will not" \
+ " have a signature generated for carving since it is not supported yet."
logger.info(log_message)
continue
if master_schema_entry.internal_schema_object:
- log_message = "A `internal schema` table was found: {} and will not have a signature generated " \
- "for carving since it is not supported yet.".format(master_schema_entry.table_name)
+ log_message = f"A `internal schema` table was found: {master_schema_entry.table_name} and will " \
+ f"not have a signature generated for carving since it is not supported yet."
logger.info(log_message)
continue
signatures[master_schema_entry.name] = Signature(version_history, master_schema_entry)
if print_signatures:
- print("\nSignature:\n{}".format(signatures[master_schema_entry.name]
- .stringify("\t", False, False, False)))
+ printable_signature = signatures[master_schema_entry.name].stringify("\t", False, False, False)
+ logger.debug(f"\nSignature:\n{printable_signature}")
"""
-
+
Note: Master schema entries (schema) are all pulled from the base version (the SQLite database file). Currently,
the master schema entries are taken from the base version. Even though schema additions are handled in the
WAL file for existing tables, tables added in the WAL have not been accounted for yet.
@@ -455,32 +455,35 @@ def main(arguments, sqlite_file_path, export_sub_paths=False):
# The export type was not found (this should not occur due to the checking of argparse)
if not exported:
- raise SqliteError("Invalid option for export type: {}.".format(', '.join(export_types)))
+ raise SqliteError(f"Invalid option for export type: {(', '.join(export_types))}.")
# Carve the rollback journal if found and carving is not specified
if rollback_journal_file and not carve:
- print("Rollback journal file found: {}. Rollback journal file parsing is under development and "
- "currently only supports carving. Please rerun with the --carve option for this output.")
+ logger.warning(f"Rollback journal file found: {rollback_journal_file}. Rollback journal file parsing is under "
+ f"development and currently only supports carving. Please rerun with the --carve option for this"
+ f" output.")
# Carve the rollback journal if found and carving is specified
if rollback_journal_file and carve:
if not output_directory:
- print("Rollback journal file found: {}. Rollback journal file carving is under development and "
- "currently only outputs to CSV. Due to this, the output directory needs to be specified. Please"
- "rerun with a output directory specified in order for this to complete.")
+ logger.error(f"Rollback journal file found: {rollback_journal_file}. Rollback journal file carving is "
+ f"under development and currently only outputs to CSV. Due to this, the output directory "
+ f"needs to be specified. Please rerun with a output directory specified in order for this to "
+ f"complete.")
else:
- print("Carving rollback journal file: {}. Rollback journal file carving is under development and "
- "currently only outputs to CSV. Any export type specified will be overridden for this.")
+ logger.error(f"Carving rollback journal file: {rollback_journal_file}. Rollback journal file carving is "
+ f"under development and currently only outputs to CSV. Any export type specified will be "
+ f"overridden for this.")
carve_rollback_journal(output_directory, rollback_journal_file, rollback_journal_file_name,
specified_tables_to_carve, rollback_journal_exempted_tables,
version_history, signatures, logger)
- print("Finished in {} seconds.".format(round(time() - start_time, 2)))
+ logger.info(f"Finished in {round(time() - start_time, 2)} seconds.")
def print_text(output_directory, file_prefix, carve, carve_freelists, specified_tables_to_carve,
@@ -497,8 +500,8 @@ def print_text(output_directory, file_prefix, carve, carve_freelists, specified_
text_file_name = file_prefix + file_postfix
# Export all index and table histories to a text file while supplying signature to carve with
- print("\nExporting history as text to {}{}{}...".format(output_directory, sep, text_file_name))
- logger.debug("Exporting history as text to {}{}{}.".format(output_directory, sep, text_file_name))
+ print(f"\nExporting history as text to {output_directory}{sep}{text_file_name}...")
+ logger.debug(f"Exporting history as text to {output_directory}{sep}{text_file_name}.")
with CommitTextExporter(output_directory, text_file_name) as commit_text_exporter:
@@ -519,10 +522,10 @@ def print_text(output_directory, file_prefix, carve, carve_freelists, specified_
if not signature and master_schema_entry.row_type is MASTER_SCHEMA_ROW_TYPE.TABLE \
and not master_schema_entry.without_row_id \
and not master_schema_entry.internal_schema_object:
- print("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
- logger.error("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
+ print(f"Unable to find signature for: {master_schema_entry.name}. This table will not be "
+ f"carved.")
+ logger.error(f"Unable to find signature for: {master_schema_entry.name}. This table will "
+ f"not be carved.")
if signature:
version_history_parser = VersionHistoryParser(version_history, master_schema_entry, None, None,
@@ -543,7 +546,7 @@ def print_text(output_directory, file_prefix, carve, carve_freelists, specified_
else:
# Export all index and table histories to csv files while supplying signature to carve with
- logger.debug("Exporting history to {} as text.".format("console"))
+ logger.debug("Exporting history to console as text.")
for master_schema_entry in version_history.versions[BASE_VERSION_NUMBER].master_schema.master_schema_entries:
@@ -560,10 +563,10 @@ def print_text(output_directory, file_prefix, carve, carve_freelists, specified_
if not signature and master_schema_entry.row_type is MASTER_SCHEMA_ROW_TYPE.TABLE \
and not master_schema_entry.without_row_id \
and not master_schema_entry.internal_schema_object:
- print("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
- logger.error("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
+ print(f"Unable to find signature for: {master_schema_entry.name}. This table will not be "
+ f"carved.")
+ logger.error(f"Unable to find signature for: {master_schema_entry.name}. This table will not "
+ f"be carved.")
if signature:
version_history_parser = VersionHistoryParser(version_history, master_schema_entry, None, None,
@@ -585,8 +588,7 @@ def print_text(output_directory, file_prefix, carve, carve_freelists, specified_
def print_csv(output_directory, file_prefix, carve, carve_freelists, specified_tables_to_carve,
version_history, signatures, logger):
# Export all index and table histories to csv files while supplying signature to carve with
- print("\nExporting history as CSV to {}...".format(output_directory))
- logger.debug("Exporting history to {} as CSV.".format(output_directory))
+ logger.info(f"Exporting history to {output_directory} as CSV.")
commit_csv_exporter = CommitCsvExporter(output_directory, file_prefix)
@@ -605,10 +607,8 @@ def print_csv(output_directory, file_prefix, carve, carve_freelists, specified_t
if not signature and master_schema_entry.row_type is MASTER_SCHEMA_ROW_TYPE.TABLE \
and not master_schema_entry.without_row_id \
and not master_schema_entry.internal_schema_object:
- print("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
- logger.error("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
+ logger.error(f"Unable to find signature for: {master_schema_entry.name}. This table will not be "
+ f"carved.")
if signature:
version_history_parser = VersionHistoryParser(version_history, master_schema_entry, None, None,
@@ -628,7 +628,6 @@ def print_sqlite(output_directory, file_prefix, carve, carve_freelists,
file_postfix = "-sqlite-dissect.db3"
sqlite_file_name = file_prefix + file_postfix
- print("\nExporting history as SQLite to {}{}{}...".format(output_directory, sep, sqlite_file_name))
logger.debug("Exporting history as SQLite to {}{}{}.".format(output_directory, sep, sqlite_file_name))
with CommitSqliteExporter(output_directory, sqlite_file_name) as commit_sqlite_exporter:
@@ -648,8 +647,6 @@ def print_sqlite(output_directory, file_prefix, carve, carve_freelists,
if not signature and master_schema_entry.row_type is MASTER_SCHEMA_ROW_TYPE.TABLE \
and not master_schema_entry.without_row_id \
and not master_schema_entry.internal_schema_object:
- print("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
logger.error("Unable to find signature for: {}. This table will not be carved."
.format(master_schema_entry.name))
@@ -671,7 +668,6 @@ def print_xlsx(output_directory, file_prefix, carve, carve_freelists, specified_
xlsx_file_name = file_prefix + file_postfix
# Export all index and table histories to a xlsx workbook while supplying signature to carve with
- print("\nExporting history as XLSX to {}{}{}...".format(output_directory, sep, xlsx_file_name))
logger.debug("Exporting history as XLSX to {}{}{}.".format(output_directory, sep, xlsx_file_name))
with CommitXlsxExporter(output_directory, xlsx_file_name) as commit_xlsx_exporter:
@@ -691,8 +687,6 @@ def print_xlsx(output_directory, file_prefix, carve, carve_freelists, specified_
if not signature and master_schema_entry.row_type is MASTER_SCHEMA_ROW_TYPE.TABLE \
and not master_schema_entry.without_row_id \
and not master_schema_entry.internal_schema_object:
- print("Unable to find signature for: {}. This table will not be carved."
- .format(master_schema_entry.name))
logger.error("Unable to find signature for: {}. This table will not be carved."
.format(master_schema_entry.name))
@@ -723,7 +717,6 @@ def carve_rollback_journal(output_directory, rollback_journal_file, rollback_jou
"""
csv_prefix_rollback_journal_file_name = basename(normpath(rollback_journal_file_name))
- print("Exporting rollback journal carvings as CSV to {}...".format(output_directory))
logger.debug("Exporting rollback journal carvings as csv to output directory: {}.".format(output_directory))
commit_csv_exporter = CommitCsvExporter(output_directory, csv_prefix_rollback_journal_file_name)
@@ -763,21 +756,15 @@ def carve_rollback_journal(output_directory, rollback_journal_file, rollback_jou
commit_csv_exporter.write_commit(master_schema_entry, commit)
else:
- print("Unable to find signature for: {}. This table will not be carved from the rollback journal."
- .format(master_schema_entry.name))
logger.error("Unable to find signature for: {}. This table will not be carved from the "
"rollback journal.".format(master_schema_entry.name))
def cli():
- """
- Serves as a primary CLI entrypoint to parse the arguments from the CLI and call the main() function to parse the
- arguments and process the SQLite files into the specified outputs.
- """
# Determine if a directory has been passed instead of a file, in which case, find all
args = parse_args()
if args.sqlite_path is not None:
- sqlite_files = get_sqlite_files(args.sqlite_path)
+ sqlite_files = get_sqlite_files(abspath(args.sqlite_path))
# Ensure there is at least one SQLite file
if len(sqlite_files) > 0:
for sqlite_file in sqlite_files:
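
The list(map(str.upper, ...)) wrapper earlier in main() matters because map() returns a one-shot iterator in Python 3, and export_types is consulted again later (for membership checks and for joining into messages). Roughly:

```python
export_types = map(str.upper, ["text", "csv"])
print(list(export_types))        # ['TEXT', 'CSV']
print(list(export_types))        # [] -- the map iterator is already exhausted

export_types = list(map(str.upper, ["text", "csv"]))
print("TEXT" in export_types)    # True, and the list can be re-checked or joined later
```
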
diff --git a/sqlite_dissect/export/case_export.py b/sqlite_dissect/export/case_export.py
index 23ce9a8..1fc0826 100644
--- a/sqlite_dissect/export/case_export.py
+++ b/sqlite_dissect/export/case_export.py
@@ -8,6 +8,7 @@
import uuid
from datetime import datetime
from os import path
+from typing import Optional
from sqlite_dissect._version import __version__
from sqlite_dissect.utilities import hash_file
@@ -27,13 +28,13 @@ class CaseExporter(object):
# Define the formatted logger that is provided by the main.py execution path
logger = None
- result_guids = []
+ result_guids: list = []
configuration_guid = ""
# Defines the initial structure for the CASE export. This will be supplemented with various methods that get called
# from the main.py execution path.
- case = {
+ case: dict = {
"@context": {
"case-investigation": "https://ontology.caseontology.org/case/investigation/",
"kb": "http://example.org/kb/",
@@ -59,7 +60,7 @@ def __init__(self, logger):
self.logger = logger
self.configuration_guid = ("kb:configuration-" + str(uuid.uuid4()))
- def register_options(self, options):
+ def register_options(self, options: list):
"""
Adds the command line options provided as the configuration values provided and outputting them in the schema
defined in the uco-configuration namespace.
@@ -92,7 +93,7 @@ def register_options(self, options):
# Add the configuration object to the in progress CASE object
self.case['@graph'].append(configuration)
- def add_observable_file(self, filepath, filetype=None):
+ def add_observable_file(self, filepath: str, filetype: str = None) -> Optional[str]:
"""
Adds the file specified in the provided filepath as an ObservableObject in the CASE export. This method handles
calculation of filesize, extension, MD5 hash, SHA1 hash, and other metadata expected in the Observable TTL spec.
@@ -201,8 +202,9 @@ def add_observable_file(self, filepath, filetype=None):
return guid
else:
self.logger.critical('Attempting to add invalid filepath to CASE Observable export: {}'.format(filepath))
+ return None
- def link_observable_relationship(self, source_guid, target_guid, relationship):
+ def link_observable_relationship(self, source_guid: str, target_guid: str, relationship: str) -> None:
self.case['@graph'].append({
"@id": ("kb:export-artifact-relationship-" + str(uuid.uuid4())),
"@type": "uco-observable:ObservableRelationship",
@@ -219,7 +221,7 @@ def link_observable_relationship(self, source_guid, target_guid, relationship):
"uco-core:isDirectional": True
})
- def add_export_artifacts(self, export_paths=None):
+ def add_export_artifacts(self, export_paths: list = None):
"""
Loops through the list of provided export artifact paths and adds them as observables and links them to the
original observable artifact
@@ -233,7 +235,7 @@ def add_export_artifacts(self, export_paths=None):
# Add the export result GUID to the list to be extracted
self.result_guids.append(export_guid)
- def generate_provenance_record(self, description, guids):
+ def generate_provenance_record(self, description: str, guids: list) -> Optional[str]:
"""
Generates a provenance record for the tool and returns the GUID for the new object
"""
@@ -254,7 +256,7 @@ def generate_provenance_record(self, description, guids):
else:
return None
- def generate_header(self):
+ def generate_header(self) -> str:
"""
Generates the header for the tool and returns the GUID for the ObservableRelationships
"""
@@ -293,7 +295,7 @@ def generate_header(self):
return tool_guid
- def generate_investigation_action(self, source_guids, tool_guid):
+ def generate_investigation_action(self, source_guids: list, tool_guid: str):
"""
Builds the investigative action object as defined in the CASE ontology. This also takes in the start and end
datetimes from the analysis.
@@ -327,7 +329,7 @@ def generate_investigation_action(self, source_guids, tool_guid):
}
self.case['@graph'].append(action)
- def export_case_file(self, export_path='output/case.json'):
+ def export_case_file(self, export_path: str = 'output/case.json'):
"""
Exports the built CASE object to the path specified in the export_path parameter.
"""
diff --git a/sqlite_dissect/export/csv_export.py b/sqlite_dissect/export/csv_export.py
index dc82fe2..0a91665 100644
--- a/sqlite_dissect/export/csv_export.py
+++ b/sqlite_dissect/export/csv_export.py
@@ -6,7 +6,6 @@
from os.path import basename
from os.path import normpath
from os.path import sep
-from re import sub
from sqlite_dissect.constants import ILLEGAL_XML_CHARACTER_PATTERN
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.constants import MASTER_SCHEMA_ROW_TYPE
@@ -52,7 +51,7 @@ def write_version(csv_file_name, export_directory, version, master_schema_entry_
if master_schema_entry.root_page_number:
fixed_file_name = basename(normpath(csv_file_name))
- fixed_master_schema_name = sub(" ", "_", master_schema_entry.name)
+ fixed_master_schema_name = master_schema_entry.name.replace(" ", "_")
csv_file_name = export_directory + sep + fixed_file_name + "-" + fixed_master_schema_name + ".csv"
logger.info("Writing CSV file: {}.".format(csv_file_name))
@@ -119,7 +118,7 @@ def write_version(csv_file_name, export_directory, version, master_schema_entry_
log_message = log_message.format(master_schema_entry.row_type, master_schema_entry.name,
master_schema_entry.table_name, master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
@staticmethod
@@ -231,15 +230,13 @@ def _write_b_tree_index_leaf_records(csv_writer, version, master_schema_entry, b
serial_type = record_column.serial_type
text_affinity = True if serial_type >= 13 and serial_type % 2 == 1 else False
value = record_column.value
- if value is None:
- pass
- elif isinstance(value, (bytearray, str)):
+ if isinstance(value, (bytearray, str)):
value = value.decode(version.database_text_encoding, "replace") if text_affinity else str(value)
try:
value.encode(UTF_8)
except UnicodeDecodeError:
value = value.decode(UTF_8, "replace")
- value = ILLEGAL_XML_CHARACTER_PATTERN.sub(" ", value)
+ value = ILLEGAL_XML_CHARACTER_PATTERN.sub(" ", value)
if value.startswith("="):
value = ' ' + value
cell_record_column_values.append(value)
@@ -370,15 +367,13 @@ def _write_b_tree_table_leaf_records(csv_writer, version, master_schema_entry, b
serial_type = record_column.serial_type
text_affinity = True if serial_type >= 13 and serial_type % 2 == 1 else False
value = record_column.value
- if value is None:
- pass
- elif isinstance(value, (bytearray, str)):
+ if isinstance(value, (bytearray, str)):
value = value.decode(version.database_text_encoding, "replace") if text_affinity else str(value)
try:
value = value.encode(UTF_8)
except UnicodeDecodeError:
value = value.decode(UTF_8, "replace").encode(UTF_8)
value = ILLEGAL_XML_CHARACTER_PATTERN.sub(" ", value)
if value.startswith("="):
value = ' ' + value
value = str(value)
@@ -416,15 +411,13 @@ def _write_b_tree_table_master_schema_carved_records(csv_writer, version, carved
serial_type = record_column.serial_type
text_affinity = True if serial_type >= 13 and serial_type % 2 == 1 else False
value = record_column.value
- if value is None:
- pass
- elif isinstance(value, (bytearray, str)):
+ if isinstance(value, (bytearray, str)):
value = value.decode(version.database_text_encoding, "replace") if text_affinity else str(value)
try:
value = value.encode(UTF_8)
except UnicodeDecodeError:
value = value.decode(UTF_8, "replace").encode(UTF_8)
value = ILLEGAL_XML_CHARACTER_PATTERN.sub(" ", value)
if value.startswith("="):
value = ' ' + value
value = str(value)
@@ -483,7 +476,7 @@ def write_commit(self, master_schema_entry, commit):
if not csv_file_name:
mode = "wb"
- commit_name = sub(" ", "_", commit.name)
+ commit_name = commit.name.replace(" ", "_")
csv_file_name = os.path.join(self._export_directory, (self._file_name_prefix + "-" + commit_name + ".csv"))
self._csv_file_names[commit.name] = csv_file_name
write_headers = True
@@ -563,7 +556,7 @@ def write_commit(self, master_schema_entry, commit):
log_message = "Invalid commit page type: {} found for csv export on master " \
"schema entry name: {} while writing to csv file name: {}."
log_message = log_message.format(commit.page_type, commit.name, csv_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
@staticmethod
@@ -649,15 +642,13 @@ def _write_cells(csv_writer, file_type, database_text_encoding, page_type, cells
serial_type = record_column.serial_type
text_affinity = True if serial_type >= 13 and serial_type % 2 == 1 else False
value = record_column.value
- if value is None:
- pass
- elif isinstance(value, (bytearray, str)):
+ if isinstance(value, (bytearray, str)):
value = value.decode(database_text_encoding, "replace") if text_affinity else str(value)
try:
value = value.encode(UTF_8)
except UnicodeDecodeError:
value = value.decode(UTF_8, "replace").encode(UTF_8)
value = ILLEGAL_XML_CHARACTER_PATTERN.sub(" ", value)
if value.startswith("="):
value = ' ' + value
value = str(value)
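
For reference, the value handling repeated across the CSV writers above follows one pipeline: decode bytes using the database text encoding, scrub characters the output cannot carry, and pad any value that starts with "=" so spreadsheet software will not evaluate it as a formula. The standalone sketch below (outside the patch) is illustrative only; the compiled regular expression is a stand-in for ILLEGAL_XML_CHARACTER_PATTERN from sqlite_dissect/constants.py, and the UTF-8 default is an assumed example encoding.

from re import compile as re_compile

# Stand-in for ILLEGAL_XML_CHARACTER_PATTERN (assumed definition, control characters only)
illegal_xml_character_pattern = re_compile("[\x00-\x08\x0b\x0c\x0e-\x1f]")

def sanitize(value, database_text_encoding="utf-8", text_affinity=True):
    # Decode carved bytes the way the export code does, replacing undecodable bytes
    if isinstance(value, (bytearray, bytes)):
        value = value.decode(database_text_encoding, "replace") if text_affinity else str(value)
    if isinstance(value, str):
        # Remove characters that are illegal in the XML/CSV output
        value = illegal_xml_character_pattern.sub(" ", value)
        # Guard against spreadsheet formula injection
        if value.startswith("="):
            value = " " + value
    return value

assert sanitize(bytearray(b"=SUM(A1:A9)\x07")) == " =SUM(A1:A9) "
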
diff --git a/sqlite_dissect/export/sqlite_export.py b/sqlite_dissect/export/sqlite_export.py
index 3d120f5..304ced7 100644
--- a/sqlite_dissect/export/sqlite_export.py
+++ b/sqlite_dissect/export/sqlite_export.py
@@ -2,7 +2,6 @@
from os import rename
from os.path import exists
from os.path import sep
-from re import sub
from sqlite3 import connect
from sqlite3 import sqlite_version
from sqlite3 import version
@@ -168,7 +167,7 @@ def write_commit(self, master_schema_entry, commit):
"found for sqlite export on master schema entry name: {} page type: {} " \
"while writing to sqlite file name: {}."
log_message = log_message.format(len(cells), commit.name, commit.page_type, self._sqlite_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
number_of_columns = len(cells[0].payload.record_columns)
@@ -177,7 +176,7 @@ def write_commit(self, master_schema_entry, commit):
index_column_headers.append("Column {}".format(i))
column_headers.extend(index_column_headers)
- column_headers = [sub(" ", "_", column_header).lower() for column_header in column_headers]
+ column_headers = [column_header.replace(" ", "_").lower() for column_header in column_headers]
elif commit.page_type == PAGE_TYPE.B_TREE_TABLE_LEAF:
@@ -194,7 +193,7 @@ def write_commit(self, master_schema_entry, commit):
updated_column_headers = []
for column_header in column_headers:
- updated_column_header_name = "sd_" + sub(" ", "_", column_header).lower()
+ updated_column_header_name = "sd_" + column_header.replace(" ", "_").lower()
while updated_column_header_name in column_definitions:
updated_column_header_name = "sd_" + updated_column_header_name
updated_column_headers.append(updated_column_header_name)
@@ -207,7 +206,7 @@ def write_commit(self, master_schema_entry, commit):
log_message = "Invalid commit page type: {} found for sqlite export on master " \
"schema entry name: {} while writing to sqlite file name: {}."
log_message = log_message.format(commit.page_type, commit.name, self._sqlite_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
create_table_statement = "CREATE TABLE {} ({})"
@@ -266,7 +265,7 @@ def write_commit(self, master_schema_entry, commit):
log_message = "Invalid commit page type: {} found for sqlite export on master " \
"schema entry name: {} while writing to sqlite file name: {}."
log_message = log_message.format(commit.page_type, commit.name, self._sqlite_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
"""
@@ -298,14 +297,14 @@ def _write_cells(connection, table_name, column_count, file_type,
algorithm internal to SQLite to slightly change. Despite this, we make the following modifications in
order to best ensure data integrity when writing the data back to the SQLite file:
1.) If the value is a bytearray, the value is interpreted as a blob object. In order to write this
- back correctly, we set it to buffer(value) in order to write it back to the SQLite database as
+ back correctly, we set it to memoryview(value) in order to write it back to the SQLite database as
a blob object. Before we write it back, we make sure that the object does not have text affinity,
or if it does we decode it in the database text encoding before writing it.
2.) If the value is a string, we encode it using UTF-8. If this fails, that means it had characters
not supported by the unicode encoding which caused it to fail. Since we are writing back carved
records that may have invalid characters in strings due to parts being overwritten or false
positives, this can occur a lot. Therefore, if the unicode encoding fails, we do the same
- as above for blob objects and create a buffer(value) blob object and write that back to the
+ as above for blob objects and create a memoryview(value) blob object and write that back to the
database in order to maintain the original data. Therefore, in some tables, depending on the
data parsed or strings retrieved may be stored in either a string (text) or blob storage class.
3.) If the value does not fall in one of the above use cases, we leave it as is and write it back to the
@@ -348,19 +347,17 @@ def _write_cells(connection, table_name, column_count, file_type,
text_affinity = True if serial_type >= 13 and serial_type % 2 == 1 else False
value = record_column.value
- if value is None:
- pass
- elif isinstance(value, bytearray):
+ if isinstance(value, bytearray):
if text_affinity:
value = value.decode(database_text_encoding, "replace")
else:
- value = buffer(value)
+ value = memoryview(value)
elif isinstance(value, str):
try:
if text_affinity:
value = value.decode(database_text_encoding, "replace")
else:
- value = buffer(value)
+ value = memoryview(value)
except UnicodeDecodeError:
"""
@@ -374,7 +371,7 @@ def _write_cells(connection, table_name, column_count, file_type,
"""
- value = buffer(value)
+ value = memoryview(value)
cell_record_column_values.append(value)
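
As a reference for the docstring above, here is a minimal self-contained sketch (outside the patch) of the write-back behaviour it describes: wrapping carved bytes in memoryview(), the Python 3 replacement for the removed buffer(), makes sqlite3 bind the value with BLOB affinity, so byte sequences that are not valid text survive the round trip unchanged. The table and column names here are hypothetical.

import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE example (data BLOB)")

# Carved bytes that are not valid UTF-8 text
carved_value = bytearray(b"\xff\xfe\x00carved bytes\x80")

# Binding a memoryview stores the raw bytes as a BLOB instead of attempting a text conversion
connection.execute("INSERT INTO example (data) VALUES (?)", (memoryview(carved_value),))
connection.commit()

stored = connection.execute("SELECT data FROM example").fetchone()[0]
assert bytes(stored) == bytes(carved_value)
connection.close()
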
diff --git a/sqlite_dissect/export/text_export.py b/sqlite_dissect/export/text_export.py
index ba53927..bd842dd 100644
--- a/sqlite_dissect/export/text_export.py
+++ b/sqlite_dissect/export/text_export.py
@@ -25,10 +25,8 @@ class CommitConsoleExporter(object):
@staticmethod
def write_header(master_schema_entry, page_type):
- header = "\nMaster schema entry: {} row type: {} on page type: {} with sql: {}."
- header = header.format(master_schema_entry.name, master_schema_entry.row_type,
- page_type, master_schema_entry.sql)
- print(header)
+ print(f"\nMaster schema entry: {master_schema_entry.name} row type: {master_schema_entry.row_type} on page "
+ f"type: {page_type} with sql: {page_type}.")
@staticmethod
def write_commit(commit):
@@ -87,7 +85,7 @@ def write_commit(commit):
log_message = "Invalid commit page type: {} found for text export on master " \
"schema entry name: {} while writing to sqlite file name: {}."
log_message = log_message.format(commit.page_type, commit.name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
@staticmethod
@@ -144,7 +142,6 @@ def __enter__(self):
# Check if the file exists and if it does rename it
if exists(self._text_file_name):
-
# Generate a uuid to append to the file name
new_file_name_for_existing_file = self._text_file_name + "-" + str(uuid4())
@@ -224,7 +221,7 @@ def write_commit(self, commit):
log_message = "Invalid commit page type: {} found for text export on master " \
"schema entry name: {}."
log_message = log_message.format(commit.page_type, commit.name, self._text_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
@staticmethod
diff --git a/sqlite_dissect/export/xlsx_export.py b/sqlite_dissect/export/xlsx_export.py
index d8c9c8f..c05ee62 100644
--- a/sqlite_dissect/export/xlsx_export.py
+++ b/sqlite_dissect/export/xlsx_export.py
@@ -147,7 +147,7 @@ def write_commit(self, master_schema_entry, commit):
"which is greater than the 31 allowed characters while writing to xlsx file name: {}."
log_message = log_message.format(commit.name, commit.page_type, len(commit.name),
self._xlsx_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
sheet = self._sheets[sheet_name] if sheet_name in self._sheets else None
@@ -229,7 +229,7 @@ def write_commit(self, master_schema_entry, commit):
log_message = "Invalid commit page type: {} found for xlsx export on master " \
"schema entry name: {} while writing to xlsx file name: {}."
log_message = log_message.format(commit.page_type, commit.name, self._xlsx_file_name)
- logger.warn(log_message)
+ logger.warning(log_message)
raise ExportError(log_message)
@staticmethod
diff --git a/sqlite_dissect/file/database/database.py b/sqlite_dissect/file/database/database.py
index f440394..6249a07 100644
--- a/sqlite_dissect/file/database/database.py
+++ b/sqlite_dissect/file/database/database.py
@@ -1,11 +1,11 @@
from copy import copy
from warnings import warn
-from sqlite_dissect.constants import BASE_VERSION_NUMBER
-from sqlite_dissect.constants import FILE_TYPE
-from sqlite_dissect.constants import FIRST_FREELIST_TRUNK_PAGE_INDEX
-from sqlite_dissect.constants import FIRST_FREELIST_TRUNK_PARENT_PAGE_NUMBER
-from sqlite_dissect.constants import SQLITE_3_7_0_VERSION_NUMBER
-from sqlite_dissect.constants import SQLITE_MASTER_SCHEMA_ROOT_PAGE
+
+from sqlite_dissect.constants import (BASE_VERSION_NUMBER, FILE_TYPE,
+ FIRST_FREELIST_TRUNK_PAGE_INDEX,
+ FIRST_FREELIST_TRUNK_PARENT_PAGE_NUMBER,
+ SQLITE_3_7_0_VERSION_NUMBER,
+ SQLITE_MASTER_SCHEMA_ROOT_PAGE)
from sqlite_dissect.exception import DatabaseParsingError
from sqlite_dissect.file.database.page import FreelistTrunkPage
from sqlite_dissect.file.database.utilities import create_pointer_map_pages
@@ -98,7 +98,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
raise DatabaseParsingError(log_message)
# Calculate the number of pages from the file size and page size
- self.database_size_in_pages = self.file_handle.file_size / self.page_size
+ self.database_size_in_pages = self.file_handle.file_size // self.page_size
# The database header size in pages is set and the version valid for number does not equal the change counter
elif self.database_header.version_valid_for_number != self.database_header.file_change_counter:
@@ -111,7 +111,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
"""
# Calculate the number of pages from the file size and page size
- self.database_size_in_pages = self.file_handle.file_size / self.page_size
+ self.database_size_in_pages = self.file_handle.file_size // self.page_size
log_message = "Database header for version: {} specifies a database size in pages of {} but version " \
"valid for number: {} does not equal the file change counter: {} for sqlite " \
@@ -121,7 +121,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
self.database_header.file_change_counter,
self.database_header.sqlite_version_number,
self.database_size_in_pages)
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
# The database header size in pages is set and the version valid for number does equals the change counter
@@ -145,7 +145,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
"""
- calculated_size_in_pages = self.file_handle.file_size / self.page_size
+ calculated_size_in_pages = self.file_handle.file_size // self.page_size
if self.database_header.database_size_in_pages != calculated_size_in_pages:
@@ -157,7 +157,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
"pages will remain unchanged but possibly erroneous use cases may occur when parsing."
log_message = log_message.format(self.version_number, self.database_header.database_size_in_pages,
calculated_size_in_pages, self.database_header.sqlite_version_number)
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
else:
@@ -174,7 +174,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
"""
- self.updated_page_numbers = [page_index + 1 for page_index in range(self.database_size_in_pages)]
+ self.updated_page_numbers = [page_index + 1 for page_index in range(int(self.database_size_in_pages))]
self.page_version_index = dict(map(lambda x: [x, self.version_number], self.updated_page_numbers))
self._logger.debug("Updated page numbers initialized as: {} in version: {}.".format(self.updated_page_numbers,
diff --git a/sqlite_dissect/file/database/header.py b/sqlite_dissect/file/database/header.py
index 728e5f0..d0c7277 100644
--- a/sqlite_dissect/file/database/header.py
+++ b/sqlite_dissect/file/database/header.py
@@ -2,7 +2,6 @@
from binascii import hexlify
from logging import getLogger
from re import compile
-from re import sub
from struct import error
from struct import unpack
from warnings import warn
@@ -12,7 +11,6 @@
from sqlite_dissect.constants import LEAF_PAYLOAD_FRACTION
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.constants import MAGIC_HEADER_STRING
-from sqlite_dissect.constants import MAGIC_HEADER_STRING_ENCODING
from sqlite_dissect.constants import MASTER_PAGE_HEX_ID
from sqlite_dissect.constants import MAXIMUM_EMBEDDED_PAYLOAD_FRACTION
from sqlite_dissect.constants import MAXIMUM_PAGE_SIZE
@@ -68,7 +66,7 @@ def __init__(self, database_header_byte_array):
logger.error("Failed to retrieve the magic header.")
raise
- if self.magic_header_string != MAGIC_HEADER_STRING.decode(MAGIC_HEADER_STRING_ENCODING):
+ if self.magic_header_string != MAGIC_HEADER_STRING:
log_message = "The magic header string is invalid."
logger.error(log_message)
raise HeaderParsingError(log_message)
@@ -211,7 +209,7 @@ def __init__(self, database_header_byte_array):
"""
log_message = "Schema format number and database text encoding are 0 indicating no schema or data."
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
else:
@@ -260,7 +258,7 @@ def __init__(self, database_header_byte_array):
self.reserved_for_expansion = database_header_byte_array[72:92]
pattern = compile(RESERVED_FOR_EXPANSION_REGEX)
- reserved_for_expansion_hex = hexlify(self.reserved_for_expansion)
+ reserved_for_expansion_hex = hexlify(self.reserved_for_expansion).decode()
if not pattern.match(reserved_for_expansion_hex):
log_message = "Header space reserved for expansion is not zero: {}.".format(reserved_for_expansion_hex)
logger.error(log_message)
@@ -356,10 +354,10 @@ def __init__(self, page, header_length):
self.md5_hex_digest = get_md5_hash(page[self.offset:self.header_length])
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Contains SQLite Database Header: {}\n" \
diff --git a/sqlite_dissect/file/database/page.py b/sqlite_dissect/file/database/page.py
index 9cafa35..d915e62 100644
--- a/sqlite_dissect/file/database/page.py
+++ b/sqlite_dissect/file/database/page.py
@@ -1,57 +1,48 @@
from abc import ABCMeta
from binascii import hexlify
from logging import getLogger
-from re import sub
from struct import unpack
from warnings import warn
-from sqlite_dissect.constants import CELL_LOCATION
-from sqlite_dissect.constants import CELL_MODULE
-from sqlite_dissect.constants import CELL_POINTER_BYTE_LENGTH
-from sqlite_dissect.constants import CELL_SOURCE
-from sqlite_dissect.constants import FIRST_OVERFLOW_PAGE_INDEX
-from sqlite_dissect.constants import FIRST_OVERFLOW_PAGE_NUMBER_LENGTH
-from sqlite_dissect.constants import FIRST_OVERFLOW_PARENT_PAGE_NUMBER
-from sqlite_dissect.constants import FREEBLOCK_BYTE_LENGTH
-from sqlite_dissect.constants import FREELIST_HEADER_LENGTH
-from sqlite_dissect.constants import FREELIST_LEAF_PAGE_NUMBER_LENGTH
-from sqlite_dissect.constants import FREELIST_NEXT_TRUNK_PAGE_LENGTH
-from sqlite_dissect.constants import INDEX_INTERIOR_CELL_CLASS
-from sqlite_dissect.constants import INDEX_INTERIOR_PAGE_HEX_ID
-from sqlite_dissect.constants import INDEX_LEAF_CELL_CLASS
-from sqlite_dissect.constants import INDEX_LEAF_PAGE_HEX_ID
-from sqlite_dissect.constants import INTERIOR_PAGE_HEADER_CLASS
-from sqlite_dissect.constants import LEAF_PAGE_HEADER_CLASS
-from sqlite_dissect.constants import LEFT_CHILD_POINTER_BYTE_LENGTH
-from sqlite_dissect.constants import LOGGER_NAME
-from sqlite_dissect.constants import MASTER_PAGE_HEX_ID
-from sqlite_dissect.constants import NEXT_FREEBLOCK_OFFSET_LENGTH
-from sqlite_dissect.constants import OVERFLOW_HEADER_LENGTH
-from sqlite_dissect.constants import PAGE_FRAGMENT_LIMIT
-from sqlite_dissect.constants import PAGE_HEADER_MODULE
-from sqlite_dissect.constants import PAGE_TYPE
-from sqlite_dissect.constants import PAGE_TYPE_LENGTH
-from sqlite_dissect.constants import POINTER_MAP_B_TREE_NON_ROOT_PAGE_TYPE
-from sqlite_dissect.constants import POINTER_MAP_B_TREE_ROOT_PAGE_TYPE
-from sqlite_dissect.constants import POINTER_MAP_ENTRY_LENGTH
-from sqlite_dissect.constants import POINTER_MAP_FREELIST_PAGE_TYPE
-from sqlite_dissect.constants import POINTER_MAP_OVERFLOW_FIRST_PAGE_TYPE
-from sqlite_dissect.constants import POINTER_MAP_OVERFLOW_FOLLOWING_PAGE_TYPE
-from sqlite_dissect.constants import POINTER_MAP_PAGE_TYPES
-from sqlite_dissect.constants import SQLITE_DATABASE_HEADER_LENGTH
-from sqlite_dissect.constants import SQLITE_MASTER_SCHEMA_ROOT_PAGE
-from sqlite_dissect.constants import TABLE_INTERIOR_CELL_CLASS
-from sqlite_dissect.constants import TABLE_INTERIOR_PAGE_HEX_ID
-from sqlite_dissect.constants import TABLE_LEAF_CELL_CLASS
-from sqlite_dissect.constants import TABLE_LEAF_PAGE_HEX_ID
-from sqlite_dissect.constants import ZERO_BYTE
-from sqlite_dissect.exception import BTreePageParsingError
-from sqlite_dissect.exception import CellParsingError
-from sqlite_dissect.exception import PageParsingError
-from sqlite_dissect.file.database.payload import decode_varint
-from sqlite_dissect.file.database.payload import Record
-from sqlite_dissect.utilities import calculate_expected_overflow
-from sqlite_dissect.utilities import get_class_instance
-from sqlite_dissect.utilities import get_md5_hash
+
+from sqlite_dissect.constants import (CELL_LOCATION, CELL_MODULE,
+ CELL_POINTER_BYTE_LENGTH, CELL_SOURCE,
+ FIRST_OVERFLOW_PAGE_INDEX,
+ FIRST_OVERFLOW_PAGE_NUMBER_LENGTH,
+ FIRST_OVERFLOW_PARENT_PAGE_NUMBER,
+ FREEBLOCK_BYTE_LENGTH,
+ FREELIST_HEADER_LENGTH,
+ FREELIST_LEAF_PAGE_NUMBER_LENGTH,
+ FREELIST_NEXT_TRUNK_PAGE_LENGTH,
+ INDEX_INTERIOR_CELL_CLASS,
+ INDEX_INTERIOR_PAGE_HEX_ID,
+ INDEX_LEAF_CELL_CLASS,
+ INDEX_LEAF_PAGE_HEX_ID,
+ INTERIOR_PAGE_HEADER_CLASS,
+ LEAF_PAGE_HEADER_CLASS,
+ LEFT_CHILD_POINTER_BYTE_LENGTH,
+ LOGGER_NAME, MASTER_PAGE_HEX_ID,
+ NEXT_FREEBLOCK_OFFSET_LENGTH,
+ OVERFLOW_HEADER_LENGTH,
+ PAGE_FRAGMENT_LIMIT, PAGE_HEADER_MODULE,
+ PAGE_TYPE, PAGE_TYPE_LENGTH,
+ POINTER_MAP_B_TREE_NON_ROOT_PAGE_TYPE,
+ POINTER_MAP_B_TREE_ROOT_PAGE_TYPE,
+ POINTER_MAP_ENTRY_LENGTH,
+ POINTER_MAP_FREELIST_PAGE_TYPE,
+ POINTER_MAP_OVERFLOW_FIRST_PAGE_TYPE,
+ POINTER_MAP_OVERFLOW_FOLLOWING_PAGE_TYPE,
+ POINTER_MAP_PAGE_TYPES,
+ SQLITE_DATABASE_HEADER_LENGTH,
+ SQLITE_MASTER_SCHEMA_ROOT_PAGE,
+ TABLE_INTERIOR_CELL_CLASS,
+ TABLE_INTERIOR_PAGE_HEX_ID,
+ TABLE_LEAF_CELL_CLASS,
+ TABLE_LEAF_PAGE_HEX_ID, ZERO_BYTE)
+from sqlite_dissect.exception import (BTreePageParsingError, CellParsingError,
+ PageParsingError)
+from sqlite_dissect.file.database.payload import Record, decode_varint
+from sqlite_dissect.utilities import (calculate_expected_overflow,
+ get_class_instance, get_md5_hash)
"""
@@ -112,10 +103,10 @@ def __init__(self, version_interface, number):
self.unallocated_space_end_offset = None
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Version Number: {}\n" \
@@ -302,7 +293,7 @@ def __init__(self, version_interface, number, parent_freelist_trunk_page_number,
FREELIST_HEADER_LENGTH])[0]
self.freelist_leaf_page_numbers = []
self.freelist_leaf_pages = []
- for index in range(self.number_of_leaf_page_pointers):
+ for index in range(int(self.number_of_leaf_page_pointers)):
start_offset = index * FREELIST_LEAF_PAGE_NUMBER_LENGTH + FREELIST_HEADER_LENGTH
end_offset = start_offset + FREELIST_LEAF_PAGE_NUMBER_LENGTH
freelist_leaf_page_number = unpack(b">I", page[start_offset:end_offset])[0]
@@ -424,7 +415,7 @@ def __init__(self, version_interface, number, number_of_entries):
self.md5_hex_digest = get_md5_hash(page)
self.pointer_map_entries = []
- for index in range(self.number_of_entries):
+ for index in range(int(self.number_of_entries)):
offset = index * POINTER_MAP_ENTRY_LENGTH
@@ -551,10 +542,10 @@ def __init__(self, index, offset, page_number, page_type, parent_page_number, md
self.md5_hex_digest = md5_hex_digest
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Index: {}\n" \
@@ -582,10 +573,10 @@ def __init__(self, version_interface, number, header_class_name, cell_class_name
page = self._version_interface.get_page_data(self.number)
self.page_type = None
- self.hex_type = page[0]
+ self.hex_type = chr(page[0]).encode()
if self.hex_type == MASTER_PAGE_HEX_ID:
- master_page_hex_type = page[SQLITE_DATABASE_HEADER_LENGTH]
+ master_page_hex_type = chr(page[SQLITE_DATABASE_HEADER_LENGTH]).encode()
if master_page_hex_type == TABLE_INTERIOR_PAGE_HEX_ID:
self.page_type = PAGE_TYPE.B_TREE_TABLE_INTERIOR
elif master_page_hex_type == TABLE_LEAF_PAGE_HEX_ID:
@@ -593,7 +584,7 @@ def __init__(self, version_interface, number, header_class_name, cell_class_name
else:
log_message = "Page hex type for master page is: {} and not a table interior or table leaf page as " \
"expected in b-tree page: {} in page version: {} for version: {}."
- log_message = log_message.format(hexlify(master_page_hex_type), self.number,
+ log_message = log_message.format(master_page_hex_type.hex(), self.number,
self.page_version_number, self.version_number)
self._logger.error(log_message)
raise BTreePageParsingError(log_message)
@@ -609,7 +600,7 @@ def __init__(self, version_interface, number, header_class_name, cell_class_name
else:
log_message = "Page hex type: {} is not a valid b-tree page type for b-tree page: {} in page version: {} " \
"for version: {}."
- log_message = log_message.format(hexlify(self.hex_type), self.number, self.page_version_number,
+ log_message = log_message.format(self.hex_type.hex(), self.number, self.page_version_number,
self.version_number)
self._logger.error(log_message)
raise BTreePageParsingError(log_message)
@@ -657,7 +648,7 @@ def __init__(self, version_interface, number, header_class_name, cell_class_name
self.cells = []
self.calculated_cell_total_byte_size = 0
- for cell_index in range(self.header.number_of_cells_on_page):
+ for cell_index in range(int(self.header.number_of_cells_on_page)):
cell_start_offset = cell_pointer_array_offset + cell_index * CELL_POINTER_BYTE_LENGTH
cell_end_offset = cell_start_offset + CELL_POINTER_BYTE_LENGTH
cell_offset = unpack(b">H", page[cell_start_offset:cell_end_offset])[0]
@@ -907,10 +898,10 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
self.md5_hex_digest = None
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Version Number: {}\n" \
@@ -1058,7 +1049,7 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
self.bytes_on_first_page = p
if p > u - 35:
- m = (((u - 12) * 32) / 255) - 23
+ m = (((u - 12) * 32) // 255) - 23
self.bytes_on_first_page = m + ((p - m) % (u - 4))
if self.bytes_on_first_page > u - 35:
self.bytes_on_first_page = m
@@ -1240,7 +1231,7 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
u = self._page_size
p = self.payload_byte_size
- x = (((u - 12) * 64) / 255) - 23
+ x = (((u - 12) * 64) // 255) - 23
"""
@@ -1274,7 +1265,7 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
self.bytes_on_first_page = p
if p > x:
- m = (((u - 12) * 32) / 255) - 23
+ m = (((u - 12) * 32) // 255) - 23
self.bytes_on_first_page = m + ((p - m) % (u - 4))
if self.bytes_on_first_page > x:
self.bytes_on_first_page = m
@@ -1483,7 +1474,7 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
u = self._page_size
p = self.payload_byte_size
- x = (((u - 12) * 64) / 255) - 23
+ x = (((u - 12) * 64) // 255) - 23
"""
@@ -1517,7 +1508,7 @@ def __init__(self, version_interface, page_version_number, file_offset, page_num
self.bytes_on_first_page = p
if p > x:
- m = (((u - 12) * 32) / 255) - 23
+ m = (((u - 12) * 32) // 255) - 23
self.bytes_on_first_page = m + ((p - m) % (u - 4))
if self.bytes_on_first_page > x:
self.bytes_on_first_page = m
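
The expressions converted to floor division above correspond to the cell-overflow thresholds in the SQLite file format (usable page size U, payload size P, threshold X, minimum first-page payload M). A worked example with a hypothetical 4096-byte usable page and a 5000-byte payload, mirroring the logic in the surrounding code:

u = 4096                             # usable page size (hypothetical)
p = 5000                             # payload byte size (hypothetical)
x = (((u - 12) * 64) // 255) - 23    # threshold above which the payload spills to overflow pages
m = (((u - 12) * 32) // 255) - 23    # minimum payload kept on the b-tree page

bytes_on_first_page = p
if p > x:
    k = m + ((p - m) % (u - 4))
    bytes_on_first_page = k if k <= x else m

assert (x, m, bytes_on_first_page) == (1002, 489, 908)
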
diff --git a/sqlite_dissect/file/database/payload.py b/sqlite_dissect/file/database/payload.py
index 4a71e65..290690f 100644
--- a/sqlite_dissect/file/database/payload.py
+++ b/sqlite_dissect/file/database/payload.py
@@ -1,7 +1,6 @@
from abc import ABCMeta
from binascii import hexlify
from logging import getLogger
-from re import sub
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.exception import RecordParsingError
from sqlite_dissect.utilities import decode_varint
@@ -51,10 +50,10 @@ def __init__(self):
self.serial_type_signature = ""
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_record_columns=True):
string = padding + "Start Offset: {}\n" \
@@ -201,10 +200,10 @@ def __init__(self, index, serial_type, serial_type_varint_length, content_size,
self.md5_hex_digest = md5_hex_digest
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Index: {}\n" \
diff --git a/sqlite_dissect/file/file_handle.py b/sqlite_dissect/file/file_handle.py
index 7791ceb..da7407e 100644
--- a/sqlite_dissect/file/file_handle.py
+++ b/sqlite_dissect/file/file_handle.py
@@ -1,6 +1,5 @@
import os
from logging import getLogger
-from re import sub
from warnings import warn
from sqlite_dissect.constants import FILE_TYPE
from sqlite_dissect.constants import LOCK_BYTE_PAGE_START_OFFSET
@@ -68,7 +67,8 @@ def __init__(self, file_type, file_identifier, database_text_encoding=None, file
self.file_externally_controlled = False
self._database_text_encoding = database_text_encoding
- if isinstance(file_identifier, basestring):
+ if isinstance(file_identifier, (str, bytes)):
"""
@@ -178,10 +178,10 @@ def __init__(self, file_type, file_identifier, database_text_encoding=None, file
raise ValueError(log_message)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_header=True):
string = padding + "File Type: {}\n" \
@@ -221,7 +221,7 @@ def close(self):
if self.file_externally_controlled:
log_message = "Ignored request to close externally controlled file."
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
else:
diff --git a/sqlite_dissect/file/header.py b/sqlite_dissect/file/header.py
index e68471e..e1a0b08 100644
--- a/sqlite_dissect/file/header.py
+++ b/sqlite_dissect/file/header.py
@@ -1,7 +1,6 @@
from abc import ABCMeta
from abc import abstractmethod
from logging import getLogger
-from re import sub
from sqlite_dissect.constants import LOGGER_NAME
"""
@@ -30,10 +29,10 @@ def __init__(self):
self.md5_hex_digest = None
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
@abstractmethod
def stringify(self, padding=""):
diff --git a/sqlite_dissect/file/journal/header.py b/sqlite_dissect/file/journal/header.py
index e093c8b..8019d37 100644
--- a/sqlite_dissect/file/journal/header.py
+++ b/sqlite_dissect/file/journal/header.py
@@ -1,7 +1,6 @@
from binascii import hexlify
from logging import getLogger
from struct import unpack
-from re import sub
from warnings import warn
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.constants import ROLLBACK_JOURNAL_ALL_CONTENT_UNTIL_END_OF_FILE
@@ -40,8 +39,7 @@ def __init__(self, rollback_journal_header_byte_array):
self.header_string = rollback_journal_header_byte_array[0:8]
- if self.header_string != ROLLBACK_JOURNAL_HEADER_HEX_STRING.decode("hex"):
-
+ if self.header_string != ROLLBACK_JOURNAL_HEADER_HEX_STRING:
"""
Instead of throwing an error here, a warning is thrown instead. This is due to the fact that the header
@@ -52,12 +50,12 @@ def __init__(self, rollback_journal_header_byte_array):
"""
log_message = "The header string is invalid."
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
self.page_count = unpack(b">I", rollback_journal_header_byte_array[8:12])[0]
- if rollback_journal_header_byte_array[8:12] == ROLLBACK_JOURNAL_HEADER_ALL_CONTENT.decode("hex"):
+ if rollback_journal_header_byte_array[8:12] == ROLLBACK_JOURNAL_HEADER_ALL_CONTENT:
self.page_count = ROLLBACK_JOURNAL_ALL_CONTENT_UNTIL_END_OF_FILE
self.random_nonce_for_checksum = unpack(b">I", rollback_journal_header_byte_array[12:16])[0]
@@ -85,14 +83,8 @@ def stringify(self, padding=""):
class RollbackJournalPageRecordHeader(object):
- def __init__(self):
- pass
-
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
-
- def stringify(self, padding=""):
- pass
+ return self.stringify().replace('\t', '').replace('\n', ' ')
diff --git a/sqlite_dissect/file/journal/jounal.py b/sqlite_dissect/file/journal/jounal.py
index 97aa12a..3535b39 100644
--- a/sqlite_dissect/file/journal/jounal.py
+++ b/sqlite_dissect/file/journal/jounal.py
@@ -1,4 +1,3 @@
-from re import sub
from sqlite_dissect.constants import FILE_TYPE
from sqlite_dissect.file.file_handle import FileHandle
@@ -21,10 +20,10 @@ def __init__(self, file_identifier, file_size=None):
self.file_handle = FileHandle(FILE_TYPE.ROLLBACK_JOURNAL, file_identifier, file_size=file_size)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "File Handle:\n{}"
diff --git a/sqlite_dissect/file/schema/column.py b/sqlite_dissect/file/schema/column.py
index 99f4ac7..f73f2c3 100644
--- a/sqlite_dissect/file/schema/column.py
+++ b/sqlite_dissect/file/schema/column.py
@@ -430,7 +430,7 @@ def _get_data_type(derived_data_type):
derived_data_type = sub("\(.*\)$", "", derived_data_type)
# Replace spaces with underscores
- derived_data_type = sub(" ", "_", derived_data_type)
+ derived_data_type = derived_data_type.replace(" ", "_")
for data_type in DATA_TYPE:
@@ -558,10 +558,10 @@ def _is_column_constraint_preface(segment):
return False
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_column_constraints=True):
string = padding + "Column Text: {}\n" \
@@ -593,10 +593,10 @@ def __init__(self, index, constraint):
self.constraint = constraint
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Index: {}\n" \
diff --git a/sqlite_dissect/file/schema/master.py b/sqlite_dissect/file/schema/master.py
index 8b9e5d0..828dba3 100644
--- a/sqlite_dissect/file/schema/master.py
+++ b/sqlite_dissect/file/schema/master.py
@@ -56,7 +56,6 @@
class MasterSchema(object):
-
MasterSchemaEntryData = namedtuple("MasterSchemaEntryData",
"record_columns row_type sql b_tree_table_leaf_page_number cell")
@@ -291,10 +290,10 @@ def __init__(self, version_interface, root_page):
self.master_schema_page_numbers = [master_schema_page.number for master_schema_page in self.master_schema_pages]
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_master_schema_root_page=True,
print_master_schema_entries=True, print_b_tree_root_pages=True):
@@ -422,7 +421,7 @@ def _parse_table_interior(b_tree_table_interior_page, database_text_encoding):
logger.error(log_message)
raise MasterSchemaParsingError(log_message)
- for row_type, row_type_data in returned_master_schema_entry_data.iteritems():
+ for row_type, row_type_data in returned_master_schema_entry_data.items():
if row_type in master_schema_entry_data:
master_schema_entry_data[row_type].extend(row_type_data)
else:
@@ -470,7 +469,6 @@ def _parse_table_leaf(b_tree_table_leaf_page, database_text_encoding):
class MasterSchemaRow(object):
-
__metaclass__ = ABCMeta
@abstractmethod
@@ -589,15 +587,15 @@ def __init__(self, version_interface, b_tree_table_leaf_page_number, b_tree_tabl
"""
- master_schema_entry_identifier_string = "{}{}{}{}".format(self.row_id, self.row_type, self.name,
+ master_schema_entry_identifier_string = "{}{}{}{}{}".format(self.row_id, self.row_type, self.name,
self.table_name, self.sql)
self.md5_hash_identifier = get_md5_hash(master_schema_entry_identifier_string)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_record_columns=True):
string = padding + "Version Number: {}\n" \
@@ -629,7 +627,7 @@ def stringify(self, padding="", print_record_columns=True):
for comment in self.comments:
string += "\n" + padding + "Comment: {}".format(comment)
if print_record_columns:
- for index, record_column in self.record_columns.iteritems():
+ for index, record_column in self.record_columns.items():
string += "\n" \
+ padding + "Record Column {}:\n{}:".format(index, record_column.stringify(padding + "\t"))
return string
@@ -677,7 +675,7 @@ def _get_master_schema_row_name_and_remaining_sql(row_type, name, sql, remaining
if remaining_sql_command[0] == "[":
# The table name or index name is surrounded by brackets
- match_object = match("^\[(.*?)\]", remaining_sql_command)
+ match_object = match(r"^\[(.*?)\]", remaining_sql_command)
if not match_object:
log_message = "No bracket match found for {} name in sql for {} row name: {} and sql: {}."
@@ -784,7 +782,6 @@ def _get_master_schema_row_name_and_remaining_sql(row_type, name, sql, remaining
# Check to make sure the full comment indicators were found for "--" and "/*"
if (character == '-' and remaining_sql_command[index + 1] != '-') or \
(character == '/' and remaining_sql_command[index + 1] != '*'):
-
log_message = "Comment indicator '{}' found followed by an invalid secondary comment " \
"indicator: {} found in {} name in sql for {} row name: {} and sql: {}."
log_message = log_message.format(character, remaining_sql_command[index + 1],
@@ -1417,7 +1414,7 @@ class for parsing. This was decided to be the best way to associate comments ba
Note: When the check is done on the definition, we check the next character is not one of the
allowed characters in a column name to make sure the constraint preface is not the
beginning of a longer column name where it is not actually a constraint preface
- (example: primaryEmail). The "\w" regular expression when no LOCALE and UNICODE flags
+ (example: primaryEmail). The r'\w' regular expression when no LOCALE and UNICODE flags
are set will be equivalent to the set: [a-zA-Z0-9_].
"""
@@ -1426,7 +1423,7 @@ class for parsing. This was decided to be the best way to associate comments ba
if definition[:len(table_constraint_preface)].upper() == table_constraint_preface:
if not (len(table_constraint_preface) + 1 <= len(definition)
- and match("\w", definition[len(table_constraint_preface)])):
+ and match(r"\w", definition[len(table_constraint_preface)])):
# We have found a table constraint here and make sure this is not the first definition
if definition_index == 0:
@@ -1625,7 +1622,7 @@ class for parsing. This was decided to be the best way to associate comments ba
log_message = "A table specified without a row id was found in table row with name: {} and sql: {}. " \
"This use case is not fully implemented."
log_message = log_message.format(self.name, self.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
def stringify(self, padding="", print_record_columns=True,
@@ -1861,7 +1858,7 @@ def __init__(self, version, b_tree_table_leaf_page_number, b_tree_table_leaf_cel
log_message = "Virtual table name: {} was found with module name: {} and sql: {}. Virtual table modules are " \
"not fully implemented."
log_message = log_message.format(self.name, self.module_name, self.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
"""
@@ -1992,7 +1989,7 @@ def __init__(self, version_interface, b_tree_table_leaf_page_number,
if table_row.without_row_id:
log_message = "Index row with name: {} and table name: {} was found to rely on a table without a row id."
log_message = log_message.format(self.name, self.table_name)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
"""
@@ -2037,7 +2034,7 @@ def __init__(self, version_interface, b_tree_table_leaf_page_number,
log_message = "A index internal schema object found in index row with name: {} " \
"and sql: {}. This is not fully implemented and may cause issues with index pages."
log_message = log_message.format(self.name, self.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
else:
@@ -2278,7 +2275,7 @@ def __init__(self, version_interface, b_tree_table_leaf_page_number,
log_message = "A index specified as a partial index was found in index row with name: {} " \
"and sql: {}. This use case is not fully implemented."
log_message = log_message.format(self.name, self.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
def stringify(self, padding="", print_record_columns=True):
@@ -2297,7 +2294,6 @@ class ViewRow(MasterSchemaRow):
def __init__(self, version_interface, b_tree_table_leaf_page_number,
b_tree_table_leaf_cell, record_columns, tables):
-
super(ViewRow, self).__init__(version_interface, b_tree_table_leaf_page_number,
b_tree_table_leaf_cell, record_columns)
@@ -2314,7 +2310,6 @@ class TriggerRow(MasterSchemaRow):
def __init__(self, version_interface, b_tree_table_leaf_page_number,
b_tree_table_leaf_cell, record_columns, tables, views):
-
super(TriggerRow, self).__init__(version_interface, b_tree_table_leaf_page_number,
b_tree_table_leaf_cell, record_columns)
diff --git a/sqlite_dissect/file/schema/table.py b/sqlite_dissect/file/schema/table.py
index ba5b1c1..7cd5199 100644
--- a/sqlite_dissect/file/schema/table.py
+++ b/sqlite_dissect/file/schema/table.py
@@ -1,5 +1,4 @@
from logging import getLogger
-from re import sub
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.exception import MasterSchemaRowParsingError
@@ -34,10 +33,10 @@ def __init__(self, index, constraint, comments=None):
self.comments = [comment.strip() for comment in comments] if comments else []
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Index: {}\n" \
diff --git a/sqlite_dissect/file/utilities.py b/sqlite_dissect/file/utilities.py
index 5c38524..bb01999 100644
--- a/sqlite_dissect/file/utilities.py
+++ b/sqlite_dissect/file/utilities.py
@@ -13,8 +13,8 @@
def validate_page_version_history(version_history):
- for version_number, version in version_history.versions.iteritems():
- for page_number, page in version.pages.iteritems():
+ for version_number, version in version_history.versions.items():
+ for page_number, page in version.pages.items():
if page.page_version_number != version.page_version_index[page.number]:
return False
if page.version_number != version.version_number:
diff --git a/sqlite_dissect/file/version.py b/sqlite_dissect/file/version.py
index 684caf3..03733d5 100644
--- a/sqlite_dissect/file/version.py
+++ b/sqlite_dissect/file/version.py
@@ -2,7 +2,6 @@
from abc import abstractmethod
from binascii import hexlify
from logging import getLogger
-from re import sub
from sqlite_dissect.constants import INDEX_INTERIOR_PAGE_HEX_ID
from sqlite_dissect.constants import INDEX_LEAF_PAGE_HEX_ID
from sqlite_dissect.constants import LOGGER_NAME
@@ -139,10 +138,10 @@ def __init__(self, file_handle, version_number, store_in_memory, strict_format_c
self.updated_b_tree_page_numbers = None
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_pages=True, print_schema=True):
string = padding + "File Type: {}\n" \
@@ -182,7 +181,7 @@ def stringify(self, padding="", print_pages=True, print_schema=True):
self.pointer_map_pages_modified,
self.updated_b_tree_page_numbers)
if print_pages:
- for page in self.pages.itervalues():
+ for page in self.pages.values():
string += "\n" + padding + "Page:\n{}".format(page.stringify(padding + "\t"))
if print_schema:
string += "\n" \
@@ -268,7 +267,7 @@ def pages(self):
self._logger.error(log_message)
raise VersionParsingError(log_message)
- for page_number in [page_index + 1 for page_index in range(self.database_size_in_pages)]:
+ for page_number in [page_index + 1 for page_index in range(int(self.database_size_in_pages))]:
if page_number not in pages:
log_message = "Page number: {} was not found in the pages: {} for version: {}."
log_message = log_message.format(page_number, pages.keys(), self.version_number)
diff --git a/sqlite_dissect/file/version_parser.py b/sqlite_dissect/file/version_parser.py
index c2e23ae..eabdbda 100644
--- a/sqlite_dissect/file/version_parser.py
+++ b/sqlite_dissect/file/version_parser.py
@@ -1,6 +1,5 @@
from abc import ABCMeta
from logging import getLogger
-from re import sub
from warnings import warn
from sqlite_dissect.constants import BASE_VERSION_NUMBER
from sqlite_dissect.constants import LOGGER_NAME
@@ -139,7 +138,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(master_schema_entry.row_type, master_schema_entry.root_page_number,
master_schema_entry.name, master_schema_entry.table_name,
master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Set the page type and update it as appropriate
@@ -151,7 +150,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(master_schema_entry.root_page_number,
master_schema_entry.row_type, master_schema_entry.name,
master_schema_entry.table_name, master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
elif isinstance(master_schema_entry, OrdinaryTableRow) and master_schema_entry.without_row_id:
@@ -161,7 +160,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(master_schema_entry.root_page_number,
master_schema_entry.row_type, master_schema_entry.name,
master_schema_entry.table_name, master_schema_entry.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
self.page_type = PAGE_TYPE.B_TREE_INDEX_LEAF
@@ -277,7 +276,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(version_number, self.name, self.table_name, self.row_type,
self.sql, self.parser_starting_version_number,
self.parser_ending_version_number)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
if starting_version_number is None and ending_version_number is None:
@@ -287,7 +286,7 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
log_message = log_message.format(self.parser_starting_version_number,
self.parser_ending_version_number, self.name, self.table_name,
self.row_type, self.sql)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
self.parser_starting_version_number = starting_version_number
@@ -307,10 +306,10 @@ def __init__(self, version_history, master_schema_entry, version_number=None, en
"""
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Row Type: {}\n" \
diff --git a/sqlite_dissect/file/wal/commit_record.py b/sqlite_dissect/file/wal/commit_record.py
index b3f1051..5b2c813 100644
--- a/sqlite_dissect/file/wal/commit_record.py
+++ b/sqlite_dissect/file/wal/commit_record.py
@@ -100,7 +100,7 @@ def __init__(self, version_number, database, write_ahead_log, frames, page_frame
self._database = database
- for page_version_number in page_version_index.itervalues():
+ for page_version_number in page_version_index.values():
if page_version_number >= version_number:
log_message = "Page version number: {} is greater than the commit record specified version: {}."
log_message = log_message.format(page_version_number, version_number)
@@ -181,7 +181,7 @@ def __init__(self, version_number, database, write_ahead_log, frames, page_frame
self.frames[frame.header.page_number] = frame
# Set the updated page numbers derived from this commit records frame keys
- self.updated_page_numbers = copy(self.frames.keys())
+ self.updated_page_numbers = copy(list(self.frames.keys()))
log_message = "Commit Record Version: {} has the updated page numbers: {}."
log_message = log_message.format(self.version_number, self.updated_page_numbers)
@@ -226,7 +226,7 @@ def __init__(self, version_number, database, write_ahead_log, frames, page_frame
"when parsing."
log_message = log_message.format(len(self.page_version_index), self.database_size_in_pages,
self.version_number, self.page_version_index)
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
"""
@@ -332,7 +332,7 @@ def __init__(self, version_number, database, write_ahead_log, frames, page_frame
log_message = "The sqlite database root page was found in version: {} in the updated pages: {} when " \
"both the database header and the root b-tree page were not modified."
log_message = log_message.format(self.version_number, self.updated_page_numbers)
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
if not self.master_schema_modified:
@@ -416,7 +416,7 @@ def __init__(self, version_number, database, write_ahead_log, frames, page_frame
"committed page size is {}. Possibly erroneous use cases may occur when parsing."
log_message = log_message.format(self.version_number, last_database_header.database_size_in_pages,
self.committed_page_size)
- self._logger.warn(log_message)
+ self._logger.warning(log_message)
warn(log_message, RuntimeWarning)
if self.master_schema_modified:
@@ -648,7 +648,7 @@ def stringify(self, padding="", print_pages=True, print_schema=True, print_frame
string += "\n" + padding + "Database Header Differences:"
# Parse the database header differences
- for field, difference in self.database_header_differences.iteritems():
+ for field, difference in self.database_header_differences.items():
difference_string = "\n" + padding + "\t" + "Field: {} changed from previous Value: {} to new Value: {}"
string += difference_string.format(field, difference[0], difference[1])
diff --git a/sqlite_dissect/file/wal/frame.py b/sqlite_dissect/file/wal/frame.py
index 77acbdd..abc5a2a 100644
--- a/sqlite_dissect/file/wal/frame.py
+++ b/sqlite_dissect/file/wal/frame.py
@@ -1,6 +1,5 @@
from binascii import hexlify
from logging import getLogger
-from re import sub
from sqlite_dissect.constants import FILE_TYPE
from sqlite_dissect.constants import LOGGER_NAME
from sqlite_dissect.constants import MASTER_PAGE_HEX_ID
@@ -74,10 +73,10 @@ def __init__(self, file_handle, frame_index, commit_record_number):
self.contains_sqlite_database_header = True
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Frame Index: {}\n" \
diff --git a/sqlite_dissect/file/wal/header.py b/sqlite_dissect/file/wal/header.py
index 7acdd34..22a6df4 100644
--- a/sqlite_dissect/file/wal/header.py
+++ b/sqlite_dissect/file/wal/header.py
@@ -1,5 +1,4 @@
from logging import getLogger
-from re import sub
from struct import unpack
from warnings import warn
from sqlite_dissect.constants import LOGGER_NAME
@@ -66,7 +65,7 @@ def __init__(self, wal_header_byte_array):
if self.checkpoint_sequence_number != 0:
log_message = "Checkpoint sequence number is {} instead of 0 and may cause inconsistencies in wal parsing."
log_message = log_message.format(self.checkpoint_sequence_number)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
self.salt_1 = unpack(b">I", wal_header_byte_array[16:20])[0]
@@ -119,10 +118,10 @@ def __init__(self, wal_frame_header_byte_array):
self.md5_hex_digest = get_md5_hash(wal_frame_header_byte_array)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Page Number: {}\n" \
diff --git a/sqlite_dissect/file/wal/wal.py b/sqlite_dissect/file/wal/wal.py
index 80a0158..511a336 100644
--- a/sqlite_dissect/file/wal/wal.py
+++ b/sqlite_dissect/file/wal/wal.py
@@ -1,5 +1,4 @@
from logging import getLogger
-from re import sub
from warnings import warn
from sqlite_dissect.constants import FILE_TYPE
from sqlite_dissect.constants import LOGGER_NAME
@@ -44,7 +43,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
frame_size = (WAL_FRAME_HEADER_LENGTH + self.file_handle.header.page_size)
- self.number_of_frames = (self.file_handle.file_size - WAL_HEADER_LENGTH) / frame_size
+ self.number_of_frames = (self.file_handle.file_size - WAL_HEADER_LENGTH) // frame_size
valid_frame_array = []
invalid_frame_array = []
@@ -78,7 +77,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
# Initialize the dictionary
self.invalid_frame_indices = {}
- for frame_index in range(self.number_of_frames):
+ for frame_index in range(int(self.number_of_frames)):
frame = WriteAheadLogFrame(self.file_handle, frame_index, commit_record_number)
@@ -175,7 +174,7 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
log_message = "The wal file contains {} invalid frames. Invalid frames are currently skipped and not " \
"implemented which may cause loss in possible carved data at this time until implemented."
log_message = log_message.format(len(self.invalid_frames))
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
self.last_frame_commit_record = None
@@ -212,10 +211,10 @@ def __init__(self, file_identifier, store_in_memory=False, file_size=None, stric
raise NotImplementedError(log_message)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_frames=True):
string = padding + "File Handle:\n{}"
@@ -232,9 +231,9 @@ def stringify(self, padding="", print_frames=True):
self.invalid_frame_indices,
self.last_frame_commit_record.frame_index + 1)
if print_frames:
- for frame in self.frames.itervalues():
+ for frame in self.frames.values():
string += "\n" + padding + "Frame:\n{}".format(frame.stringify(padding + "\t"))
if print_frames and self.invalid_frames:
- for invalid_frame in self.invalid_frames.itervalues():
+ for invalid_frame in self.invalid_frames.values():
string += "\n" + padding + "Invalid Frame:\n{}".format(invalid_frame.stringify(padding + "\t"))
return string
diff --git a/sqlite_dissect/file/wal_index/header.py b/sqlite_dissect/file/wal_index/header.py
index a7e3eee..fb75f25 100644
--- a/sqlite_dissect/file/wal_index/header.py
+++ b/sqlite_dissect/file/wal_index/header.py
@@ -1,6 +1,5 @@
from binascii import hexlify
from logging import getLogger
-from re import sub
from struct import unpack
from sqlite_dissect.constants import ENDIANNESS
from sqlite_dissect.constants import LOGGER_NAME
@@ -235,10 +234,10 @@ def __init__(self, wal_index_checkpoint_info_byte_array, endianness):
self.md5_hex_digest = get_md5_hash(wal_index_checkpoint_info_byte_array)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding=""):
string = padding + "Endianness: {}\n" \
diff --git a/sqlite_dissect/file/wal_index/wal_index.py b/sqlite_dissect/file/wal_index/wal_index.py
index 66642de..c5cf578 100644
--- a/sqlite_dissect/file/wal_index/wal_index.py
+++ b/sqlite_dissect/file/wal_index/wal_index.py
@@ -1,9 +1,8 @@
from logging import getLogger
-from re import sub
from struct import unpack
-from sqlite_dissect.constants import FILE_TYPE
-from sqlite_dissect.constants import LOGGER_NAME
-from sqlite_dissect.constants import WAL_INDEX_HEADER_LENGTH
+
+from sqlite_dissect.constants import (FILE_TYPE, LOGGER_NAME,
+ WAL_INDEX_HEADER_LENGTH)
from sqlite_dissect.file.file_handle import FileHandle
"""
@@ -29,7 +28,7 @@ def __init__(self, file_name, file_size=None):
zero = False
start = WAL_INDEX_HEADER_LENGTH
while not zero:
- i = (start - WAL_INDEX_HEADER_LENGTH) / 4
+ i = (start - WAL_INDEX_HEADER_LENGTH) // 4
data = unpack(b"> 64:
- return None
- elif value >> 56:
- encoded_string.insert(0, value & 0xFF)
- value >>= 8
- else:
- encoded_string.insert(0, value & 0x7F)
- value >>= 7
- while value != 0:
- byte_to_insert = (value & 0x7F) | 0x80
- encoded_string.insert(0, byte_to_insert)
- value >>= 7
- return encoded_string
-'''
varint_tuples = [
(0x10, encode_varint(0x10)),
@@ -50,7 +25,7 @@ def encode_varint(value):
@pytest.mark.parametrize('value, encoded_value', varint_tuples)
def test_decode_varint_in_reverse(value, encoded_value):
with pytest.raises(ValueError):
- decode_varint_in_reverse(bytearray('0'*9), 11)
+ decode_varint_in_reverse(bytearray(b'0'*9), 11)
assert decode_varint_in_reverse(encoded_value, len(encoded_value))[0] == value
@@ -83,7 +58,7 @@ def test_generate_regex_for_simplified_serial_type():
# hardcoded values for -2 and -1
# hex string for 0-9
# CarvingError for anything else
- assert generate_regex_for_simplified_serial_type(4) == "\x04"
+ assert generate_regex_for_simplified_serial_type(4) == b"\x04"
with pytest.raises(CarvingError):
generate_regex_for_simplified_serial_type(-10)
diff --git a/sqlite_dissect/tests/file_utilities_test.py b/sqlite_dissect/tests/file_utilities_test.py
index 7db58f6..8d515f6 100644
--- a/sqlite_dissect/tests/file_utilities_test.py
+++ b/sqlite_dissect/tests/file_utilities_test.py
@@ -33,8 +33,8 @@ def test_validate_page_version_history(change, expected_result):
else:
modified = False
- for version_number, version in version_history.versions.iteritems():
- for page_number, page in version.pages.iteritems():
+ for version_number, version in version_history.versions.items():
+ for page_number, page in version.pages.items():
# modifies first page version number
if change == 1:
page.page_version_number += 1
diff --git a/sqlite_dissect/tests/nist_assertions.py b/sqlite_dissect/tests/nist_assertions.py
index f383951..7e8fafa 100644
--- a/sqlite_dissect/tests/nist_assertions.py
+++ b/sqlite_dissect/tests/nist_assertions.py
@@ -1,5 +1,4 @@
from os.path import exists, basename
-from hashlib import md5
def row_equals(row1, row2):
@@ -40,9 +39,9 @@ def assert_file_exists(file_path):
# SFT-CA-03
def assert_correct_page_size(reported_size, correct_size):
- assert reported_size == correct_size, "The program reports an incorrect page size!\nCorrect page size: %d\n" \
- "Reported page size: %d" % (correct_size, reported_size)
-
+ assert reported_size == correct_size, "The program reports an incorrect page size!\n" \
+ f"Correct page size: {correct_size}\n" \
+ f"Reported page size: {reported_size}"
# SFT-CA-04
# SFT-CA-05
@@ -73,7 +72,7 @@ def assert_correct_num_pages(reported_num, correct_num):
def assert_correct_encoding(reported_enc, correct_enc):
assert reported_enc.upper() == correct_enc.upper(), "The program reports and incorrect database text encoding!\n" \
"Correct encoding: %s\nReported encoding: %s" % (
- correct_enc, reported_enc)
+ correct_enc, reported_enc)
# SFT-CA-08
@@ -127,7 +126,7 @@ def assert_correct_rows(reported_rows, correct_rows):
# SFT-CA-13
def assert_correct_source(reported_source, accepted_sources, element):
assert reported_source in accepted_sources, "The program reports an invalid file source!\n Element: %s\n" \
- "Reported source: %s" % (element, reported_source)
+ "Reported source: %s" % (element, reported_source)
# NIST SFT-AO:
diff --git a/sqlite_dissect/tests/nist_test.py b/sqlite_dissect/tests/nist_test.py
index 88af02c..5f5ca09 100644
--- a/sqlite_dissect/tests/nist_test.py
+++ b/sqlite_dissect/tests/nist_test.py
@@ -5,9 +5,14 @@
from sqlite_dissect.entrypoint import main
import io
import sys
-import os
import pytest
+
+from contextlib import redirect_stdout
+from hashlib import md5
+from io import StringIO
from sqlite_dissect.constants import FILE_TYPE
+from sqlite_dissect.tests import nist_assertions
from sqlite_dissect.tests.utilities import db_file, parse_csv
from sqlite_dissect.utilities import get_sqlite_files, parse_args
@@ -30,46 +35,47 @@ def test_header_reporting(db_file):
db_filepath = str(db_file[0].resolve())
hash_before_parsing = get_md5_hash(db_filepath)
- parser_output = io.BytesIO()
- sys.stdout = parser_output
args = parse_args([db_filepath, '--header'])
sqlite_files = get_sqlite_files(args.sqlite_path)
- main(args, sqlite_files[0], len(sqlite_files) > 1)
-
- reported_page_size = None
- reported_journal_mode_read = None
- reported_journal_mode_write = None
- reported_num_pages = None
- reported_encoding = None
- for line in parser_output.getvalue().splitlines():
- if "FILE FORMAT WRITE VERSION" in line.upper():
- reported_journal_mode_write = line.split(': ')[1].strip()
- elif "FILE FORMAT READ VERSION" in line.upper():
- reported_journal_mode_read = line.split(': ')[1].strip()
- elif "PAGE SIZE" in line.upper():
- reported_page_size = int(line.split(': ')[1].strip())
- elif "DATABASE SIZE IN PAGES" in line.upper():
- reported_num_pages = int(line.split(': ')[1].strip())
- elif "DATABASE TEXT ENCODING" in line.upper():
- reported_encoding = line.split(': ')[1].strip()
-
- actual_database = sqlite3.connect(db_filepath)
- db_cursor = actual_database.cursor()
-
- actual_page_size = fetch_pragma(db_cursor, 'page_size')
- actual_journal_mode = fetch_pragma(db_cursor, 'journal_mode')
- actual_num_pages = fetch_pragma(db_cursor, 'page_count')
- actual_encoding = fetch_pragma(db_cursor, 'encoding')
- hash_after_parsing = get_md5_hash(db_filepath)
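+    # Capture the tool's stdout so the reported header fields can be parsed below.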
+ with redirect_stdout(io.StringIO()) as output:
+ main(args, sqlite_files[0], len(sqlite_files) > 1)
- nist_assertions.assert_md5_equals(hash_before_parsing, hash_after_parsing, db_file[0].name)
- nist_assertions.assert_file_exists(db_filepath)
- nist_assertions.assert_correct_page_size(reported_page_size, actual_page_size)
- nist_assertions.assert_correct_journal_mode(reported_journal_mode_read, actual_journal_mode, 'r')
- nist_assertions.assert_correct_journal_mode(reported_journal_mode_write, actual_journal_mode, 'w')
- nist_assertions.assert_correct_num_pages(reported_num_pages, actual_num_pages)
- nist_assertions.assert_correct_encoding(reported_encoding, actual_encoding)
+ reported_page_size = None
+ reported_journal_mode_read = None
+ reported_journal_mode_write = None
+ reported_num_pages = None
+ reported_encoding = None
+
+ for line in output.getvalue().splitlines():
+ if "FILE FORMAT WRITE VERSION" in line.upper():
+ reported_journal_mode_write = line.split(': ')[1].strip()
+ elif "FILE FORMAT READ VERSION" in line.upper():
+ reported_journal_mode_read = line.split(': ')[1].strip()
+ elif "PAGE SIZE" in line.upper():
+ reported_page_size = int(line.split(': ')[1].strip())
+ elif "DATABASE SIZE IN PAGES" in line.upper():
+ reported_num_pages = int(line.split(': ')[1].strip())
+ elif "DATABASE TEXT ENCODING" in line.upper():
+ reported_encoding = line.split(': ')[1].strip()
+
+ actual_database = sqlite3.connect(db_filepath)
+ db_cursor = actual_database.cursor()
+
+ actual_page_size = fetch_pragma(db_cursor, 'page_size')
+ actual_journal_mode = fetch_pragma(db_cursor, 'journal_mode')
+ actual_num_pages = fetch_pragma(db_cursor, 'page_count')
+ actual_encoding = fetch_pragma(db_cursor, 'encoding')
+
+ hash_after_parsing = get_md5_hash(db_filepath)
+
+ nist_assertions.assert_md5_equals(hash_before_parsing, hash_after_parsing, db_file[0].name)
+ nist_assertions.assert_file_exists(db_filepath)
+ nist_assertions.assert_correct_page_size(reported_page_size, actual_page_size)
+ nist_assertions.assert_correct_journal_mode(reported_journal_mode_read, actual_journal_mode, 'r')
+ nist_assertions.assert_correct_journal_mode(reported_journal_mode_write, actual_journal_mode, 'w')
+ nist_assertions.assert_correct_num_pages(reported_num_pages, actual_num_pages)
+ nist_assertions.assert_correct_encoding(reported_encoding, actual_encoding)
# SFT-02
@@ -114,33 +120,33 @@ def test_schema_reporting(db_file, tmp_path):
current_table = None
row_count = 0
- actual_database = sqlite3.connect(db_filepath)
- db_cursor = actual_database.cursor()
- db_cursor.execute("SELECT tbl_name, sql FROM sqlite_master WHERE type='table'")
+ actual_database = sqlite3.connect(db_filepath)
+ db_cursor = actual_database.cursor()
+ db_cursor.execute("SELECT tbl_name, sql FROM sqlite_master WHERE type='table'")
- actual_tables = []
- actual_columns = {}
- actual_num_rows = {}
- for table in db_cursor.fetchall():
- actual_tables.append(table[0])
- actual_columns[table[0]] = []
+ actual_tables = []
+ actual_columns = {}
+ actual_num_rows = {}
+ for table in db_cursor.fetchall():
+ actual_tables.append(table[0])
+ actual_columns[table[0]] = []
- columns = table[1][table[1].find("(")+1:table[1].find(")")]
- for column in columns.split(","):
- actual_columns[table[0]].append(column.strip().split()[0])
+ columns = table[1][table[1].find("(")+1:table[1].find(")")]
+ for column in columns.split(","):
+ actual_columns[table[0]].append(column.strip().split()[0])
- db_cursor.execute("SELECT COUNT(*) FROM %s" % table[0])
- actual_num_rows[table[0]] = int(db_cursor.fetchone()[0])
+ db_cursor.execute("SELECT COUNT(*) FROM %s" % table[0])
+ actual_num_rows[table[0]] = int(db_cursor.fetchone()[0])
- hash_after_parsing = get_md5_hash(db_filepath)
+ hash_after_parsing = get_md5_hash(db_filepath)
- nist_assertions.assert_md5_equals(hash_before_parsing, hash_after_parsing, db_file[0].name)
- nist_assertions.assert_file_exists(db_filepath)
- nist_assertions.assert_correct_tables(reported_tables, actual_tables)
+ nist_assertions.assert_md5_equals(hash_before_parsing, hash_after_parsing, db_file[0].name)
+ nist_assertions.assert_file_exists(db_filepath)
+ nist_assertions.assert_correct_tables(reported_tables, actual_tables)
- for table in reported_columns:
- nist_assertions.assert_correct_columns(reported_columns[table], actual_columns[table], table)
- nist_assertions.assert_correct_num_pages(reported_num_rows[table], actual_num_rows[table])
+ for table in reported_columns:
+ nist_assertions.assert_correct_columns(reported_columns[table], actual_columns[table], table)
+ nist_assertions.assert_correct_num_pages(reported_num_rows[table], actual_num_rows[table])
# SFT-03
@@ -159,7 +165,7 @@ def test_row_recovery(db_file, tmp_path):
args = parse_args([db_filepath, '-c',
'-e', 'csv', '--directory', str(tmp_path)])
sqlite_files = get_sqlite_files(args.sqlite_path)
- main(args, sqlite_files[0], len(sqlite_files) > 1)
+ main(args, str(sqlite_files[0]), len(sqlite_files) > 1)
recovered_rows = []
@@ -184,10 +190,9 @@ def test_metadata_reporting(db_file):
parser_output = sys.stdout
args = parse_args([db_filepath, '-c'])
sqlite_files = get_sqlite_files(args.sqlite_path)
- main(args, sqlite_files[0], len(sqlite_files) > 1)
+ main(args, str(sqlite_files[0]), len(sqlite_files) > 1)
current_table = None
- log_lines = ''
for line in parser_output.read().splitlines():
if "Master schema entry: " in line and "row type: table" in line:
current_table = line[line.find("Master schema entry: "):line.find("row type: ")].split(': ')[1].strip()
@@ -200,11 +205,6 @@ def test_metadata_reporting(db_file):
elif line == '-' * 15:
current_table = None
- # Logging for debugging purposes:
- # with open(os.path.join(os.path.split(__file__)[0], 'log_files', db_file[0].name + '.log'), 'w') as log_file:
- # log_file.write("Recovered table rows:\n")
- # log_file.write(log_lines)
-
hash_after_parsing = get_md5_hash(db_filepath)
nist_assertions.assert_md5_equals(hash_before_parsing, hash_after_parsing, db_file[0].name)
diff --git a/sqlite_dissect/tests/output_test.py b/sqlite_dissect/tests/output_test.py
index df1e442..b09f0e8 100644
--- a/sqlite_dissect/tests/output_test.py
+++ b/sqlite_dissect/tests/output_test.py
@@ -69,10 +69,10 @@ def test_get_pointer_map_entries_breakdown():
MockPointerMapPage([MockPage("LOCK_BYTE", 0), MockPage("FREELIST_TRUNK", 1), MockPage("FREELIST_LEAF", 2)], 0, 0),
MockPointerMapPage([MockPage("LOCK_BYTE", 3), MockPage("FREELIST_TRUNK", 4), MockPage("FREELIST_LEAF", 5)], 1, 0)
], 0)) == [
- (0, 1, 0, 0, '4c4f434b5f42595445'),
- (0, 1, 1, 1, '465245454c4953545f5452554e4b'),
- (0, 2, 2, 1, '465245454c4953545f4c454146'),
- (1, 2, 3, 2, '4c4f434b5f42595445'),
- (1, 4, 4, 1, '465245454c4953545f5452554e4b'),
- (1, 5, 5, 1, '465245454c4953545f4c454146')
+ (0, 1, 0, 0, b'4c4f434b5f42595445'),
+ (0, 1, 1, 1, b'465245454c4953545f5452554e4b'),
+ (0, 2, 2, 1, b'465245454c4953545f4c454146'),
+ (1, 2, 3, 2, b'4c4f434b5f42595445'),
+ (1, 4, 4, 1, b'465245454c4953545f5452554e4b'),
+ (1, 5, 5, 1, b'465245454c4953545f4c454146')
]
\ No newline at end of file
diff --git a/sqlite_dissect/tests/test_headers.py b/sqlite_dissect/tests/test_headers.py
index 8fd8e37..4c08c85 100644
--- a/sqlite_dissect/tests/test_headers.py
+++ b/sqlite_dissect/tests/test_headers.py
@@ -313,8 +313,8 @@ def test_journal_header_init(rollback_journal_header_byte_array, expected_value)
assert journal_header.header_string == rollback_journal_header_byte_array[0:8]
assert journal_header.page_count == (
- ROLLBACK_JOURNAL_ALL_CONTENT_UNTIL_END_OF_FILE \
- if rollback_journal_header_byte_array[8:12] == ROLLBACK_JOURNAL_HEADER_ALL_CONTENT.decode("hex") \
+ ROLLBACK_JOURNAL_ALL_CONTENT_UNTIL_END_OF_FILE
+ if rollback_journal_header_byte_array[8:12] == ROLLBACK_JOURNAL_HEADER_ALL_CONTENT
else unpack(b">I", rollback_journal_header_byte_array[8:12])[0]
)
diff --git a/sqlite_dissect/tests/test_payload.py b/sqlite_dissect/tests/test_payload.py
index 554d502..c603485 100644
--- a/sqlite_dissect/tests/test_payload.py
+++ b/sqlite_dissect/tests/test_payload.py
@@ -53,7 +53,7 @@ def test_record_init(page, payload_offset, payload_byte_size, bytes_on_first_pag
current_header_offset = decode_varint(page, payload_offset)[1]
num_columns = 0
- serial_type_signature = b""
+ serial_type_signature = ""
while current_header_offset < decode_varint(page, payload_offset)[0]:
serial_type, serial_type_varint_length = decode_varint(total_record_content, current_header_offset)
serial_type_signature += str(get_serial_type_signature(serial_type))
diff --git a/sqlite_dissect/tests/utilities.py b/sqlite_dissect/tests/utilities.py
index adf9a9e..047cd30 100644
--- a/sqlite_dissect/tests/utilities.py
+++ b/sqlite_dissect/tests/utilities.py
@@ -1,22 +1,25 @@
-import hashlib
-import os
import pytest
+import hashlib
import sqlite3
import random
import string
import re
-from collections import OrderedDict
import uuid
+from collections import OrderedDict
+
-def strip_one(string, pattern):
- return re.sub(pattern + '$', "", re.sub('^' + pattern, "", string))
+def strip_one(s: str, pattern: str):
+ return re.sub(pattern + '$', "", re.sub('^' + pattern, "", s))
+
+
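+# Default quote_chars to None and build the list per call so a mutable default is not shared between calls.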
+def find_breakpoints(input_string, quote_chars=None, delim=','):
+ if quote_chars is None:
+ quote_chars = ["'", '"']
-def find_breakpoints(input_string, quote_chars = ["'", '"'], delim = ','):
breakpoints = []
in_quotes = None
is_encapsulated = False
- last_char = None
for index, character in enumerate(input_string):
if in_quotes:
if character == in_quotes:
@@ -38,19 +41,21 @@ def find_breakpoints(input_string, quote_chars = ["'", '"'], delim = ','):
return breakpoints
+
def parse_rows(row_string):
commas = find_breakpoints(row_string)
row_dict = {}
- row_list = [row_string[i:j].strip() for i,j in zip([0] + [index + 1 for index in commas], commas + [None])]
+ row_list = [row_string[i:j].strip() for i, j in zip([0] + [index + 1 for index in commas], commas + [None])]
for row in row_list:
spaces = find_breakpoints(row, delim=' ')
- row_dict[strip_one(row[ : spaces[0]], '[\'"]').lstrip('[ ').rstrip('] ')] = row[spaces[0] : ].strip()
+ row_dict[strip_one(row[: spaces[0]], '[\'"]').lstrip('[ ').rstrip('] ')] = row[spaces[0]:].strip()
return row_dict
-def get_index_of_closing_parenthesis(string, opening_parenthesis_offset=0):
+
+def get_index_of_closing_parenthesis(s: str, opening_parenthesis_offset=0):
in_quotes = None
in_block_comment = False
in_line_comment = False
@@ -61,11 +66,11 @@ def get_index_of_closing_parenthesis(string, opening_parenthesis_offset=0):
line_comment_chars = '--'
line_comment_term = '\n'
- for index, character in enumerate(string[opening_parenthesis_offset : ]):
+ for index, character in enumerate(s[opening_parenthesis_offset:]):
if in_quotes and character == in_quotes:
in_quotes = None
- elif in_block_comment and character == block_comment_term[0] and string[index : index + 2] == block_comment_term:
+ elif in_block_comment and character == block_comment_term[0] and s[index: index + 2] == block_comment_term:
in_block_comment = False
elif in_line_comment and character == line_comment_term:
@@ -75,10 +80,10 @@ def get_index_of_closing_parenthesis(string, opening_parenthesis_offset=0):
if character in quote_chars:
in_quotes = character
- elif character == block_comment_chars[0] and string[index : index + 2] == block_comment_chars:
+ elif character == block_comment_chars[0] and s[index: index + 2] == block_comment_chars:
in_block_comment = True
- elif character == line_comment_chars[0] and string[index : index + 2] == line_comment_chars:
+ elif character == line_comment_chars[0] and s[index: index + 2] == line_comment_chars:
in_line_comment = True
elif character == ')':
@@ -88,21 +93,24 @@ def get_index_of_closing_parenthesis(string, opening_parenthesis_offset=0):
def parse_schema(stdout):
tables = {}
+ next_parenthesis = 0
+ closing_parenthesis = 0
+
while stdout:
# Find the next table entry
- stdout = stdout[stdout.find("Type: table") : ]
- table_name = stdout[stdout.find("Table Name:") + 11 : stdout.find("SQL:")].strip()
+ stdout = stdout[stdout.find("Type: table"):]
+ table_name = stdout[stdout.find("Table Name:") + 11: stdout.find("SQL:")].strip()
if table_name:
- stdout = stdout[stdout.find("SQL:") + 4 : ]
-
+ stdout = stdout[stdout.find("SQL:") + 4:]
+
closing_parenthesis_found = False
in_quotes = False
index = 0
while not closing_parenthesis_found and stdout:
if stdout[index] == "'":
in_quotes = not in_quotes
-
+
elif stdout[index] == '(' and not in_quotes:
next_parenthesis = index
closing_parenthesis = get_index_of_closing_parenthesis(stdout, next_parenthesis)
@@ -111,26 +119,27 @@ def parse_schema(stdout):
index += 1
# Fetches lines with columns in them
- schema_statement = stdout[next_parenthesis + 1 : closing_parenthesis].strip()
+ schema_statement = stdout[next_parenthesis + 1: closing_parenthesis].strip()
tables[table_name] = parse_rows(schema_statement)
- stdout = stdout[closing_parenthesis + 1 : ]
+ stdout = stdout[closing_parenthesis + 1:]
return tables
-def get_md5_hash(string):
- return hashlib.md5(string).hexdigest().upper()
+
+def get_md5_hash(s: str):
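+    # hashlib requires bytes, so encode str input before hashing.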
+    return hashlib.md5(s.encode() if isinstance(s, str) else s).hexdigest().upper()
-def replace_bytes(byte_array, replacement, index):
+def replace_bytes(byte_array, replacement, index: int):
return byte_array[:index] + replacement + byte_array[index + len(replacement):]
-def decode_varint(byte_array, offset=0):
+def decode_varint(byte_array, offset: int = 0):
unsigned_integer_value = 0
varint_relative_offset = 0
- for x in xrange(1, 10):
+ for x in range(1, 10):
varint_byte = ord(byte_array[offset + varint_relative_offset:offset + varint_relative_offset + 1])
varint_relative_offset += 1
@@ -153,6 +162,7 @@ def decode_varint(byte_array, offset=0):
return signed_integer_value, varint_relative_offset
+
default_columns = OrderedDict(
[
('name', 'TEXT NOT NULL'),
@@ -311,16 +321,16 @@ def db_file(request, tmp_path):
row_values = [row[1:] for row in generate_rows(request.param['modify'], request.param['columns'])]
-        map(lambda row_values, id_for_mod: row_values.append(id_for_mod), row_values, id_for_mod)
+        for row_value, id_value in zip(row_values, id_for_mod):
+            row_value.append(id_value)
for row_id in id_for_mod:
- cursor.execute("SELECT * FROM testing WHERE id=?", (row_id, ))
+ cursor.execute("SELECT * FROM testing WHERE id=?", (row_id,))
modified_rows.append(cursor.fetchone())
update_statement = generate_update_statement(request.param['table_name'], request.param['columns'])
- cursor.executemany(update_statement, row_values)
+        cursor.executemany(update_statement, row_values)
db.commit()
if request.param['delete'] > 0:
for row_id in id_for_del:
- cursor.execute("SELECT * FROM testing WHERE id=?", (row_id, ))
+ cursor.execute("SELECT * FROM testing WHERE id=?", (row_id,))
deleted_rows.append(cursor.fetchone())
cursor.executemany("DELETE FROM testing WHERE id=?", [[row_id] for row_id in id_for_del])
@@ -330,24 +340,24 @@ def db_file(request, tmp_path):
db.close()
yield db_filepath, modified_rows + deleted_rows
+
# Parses CSV file returned by sqlite_dissect operations and returns rows found that match the given operations.
-def parse_csv(filepath, operations, first_key = 'id'):
+def parse_csv(filepath, operations, first_key='id'):
accepted_sources = ["ROLLBACK_JOURNAL", "DATABASE", "WAL"]
with open(filepath, 'r') as csv_file:
key_line = csv_file.readline().strip()
commas = find_breakpoints(key_line)
- keys = [strip_one(key_line[i:j], "['\"]") for i,j in zip([0] + [index + 1 for index in commas], commas + [None])]
+ keys = [strip_one(key_line[i:j], "['\"]") for i, j in
+ zip([0] + [index + 1 for index in commas], commas + [None])]
op_index = keys.index("Operation")
first_index = keys.index(first_key)
rows = []
for line in csv_file:
-        line_list = map(lambda data: data.strip('"'), line.strip().split(','))
+        line_list = [data.strip('"') for data in line.strip().split(',')]
-
+
if line_list[0] in accepted_sources and line_list[op_index] in operations:
rows.append(tuple(line_list[first_index:]))
return tuple(rows)
-
-
diff --git a/sqlite_dissect/tests/utilities_test.py b/sqlite_dissect/tests/utilities_test.py
index 18b6681..3132d87 100644
--- a/sqlite_dissect/tests/utilities_test.py
+++ b/sqlite_dissect/tests/utilities_test.py
@@ -86,20 +86,20 @@ def test_get_record_content(self):
# Test when serial_type is >= 12 and even
result = get_record_content(12, test_string_array, 0)
self.assertEqual(0, result[0])
- self.assertEqual('', result[1])
+ self.assertEqual(b'', result[1])
result = get_record_content(24, test_string_array, 0)
self.assertEqual(6, result[0])
- self.assertEqual('this i', result[1])
+ self.assertEqual(b'this i', result[1])
# Test when serial_type is >= 13 and odd
result = get_record_content(13, test_string_array, 0)
self.assertEqual(0, result[0])
- self.assertEqual('', result[1])
+ self.assertEqual(b'', result[1])
result = get_record_content(25, test_string_array, 0)
self.assertEqual(6, result[0])
- self.assertEqual('this i', result[1])
+ self.assertEqual(b'this i', result[1])
# Test that the proper exception is thrown when the input is invalid
cases = [10, 11]
diff --git a/sqlite_dissect/utilities.py b/sqlite_dissect/utilities.py
index c22bea9..15c2def 100644
--- a/sqlite_dissect/utilities.py
+++ b/sqlite_dissect/utilities.py
@@ -3,21 +3,23 @@
from binascii import hexlify
from hashlib import md5
from logging import getLogger
-from re import compile
-from struct import pack
-from struct import unpack
-from os import walk, makedirs, path
+from os import makedirs, path, walk
from os.path import exists, isdir, join
-from sqlite_dissect.constants import ALL_ZEROS_REGEX, SQLITE_DATABASE_HEADER_LENGTH, MAGIC_HEADER_STRING, \
- MAGIC_HEADER_STRING_ENCODING, SQLITE_FILE_EXTENSIONS
-from sqlite_dissect.constants import LOGGER_NAME
-from sqlite_dissect.constants import OVERFLOW_HEADER_LENGTH
-from sqlite_dissect.constants import BLOB_SIGNATURE_IDENTIFIER
-from sqlite_dissect.constants import TEXT_SIGNATURE_IDENTIFIER
-from sqlite_dissect.exception import InvalidVarIntError
-from sqlite_dissect._version import __version__
+from re import compile
+from struct import pack, unpack
+
from configargparse import ArgParser
+from sqlite_dissect._version import __version__
+from sqlite_dissect.constants import (ALL_ZEROS_REGEX,
+ BLOB_SIGNATURE_IDENTIFIER, LOGGER_NAME,
+ MAGIC_HEADER_STRING,
+ OVERFLOW_HEADER_LENGTH,
+ SQLITE_DATABASE_HEADER_LENGTH,
+ SQLITE_FILE_EXTENSIONS,
+ TEXT_SIGNATURE_IDENTIFIER)
+from sqlite_dissect.exception import InvalidVarIntError
+
"""
utilities.py
@@ -68,7 +70,7 @@ def decode_varint(byte_array, offset=0):
unsigned_integer_value = 0
varint_relative_offset = 0
- for x in xrange(1, 10):
+ for x in range(1, 10):
varint_byte = ord(byte_array[offset + varint_relative_offset:offset + varint_relative_offset + 1])
varint_relative_offset += 1
@@ -108,17 +110,16 @@ def encode_varint(value):
if value & 0xff000000 << 32:
byte = value & 0xff
- byte_array.insert(0, pack("B", byte))
+ byte_array.insert(0, byte)
value >>= 8
- for _ in xrange(8):
- byte_array.insert(0, pack("B", (value & 0x7f) | 0x80))
+ for _ in range(8):
+ byte_array.insert(0, ((value & 0x7f) | 0x80))
value >>= 7
else:
-
while value:
- byte_array.insert(0, pack("B", (value & 0x7f) | 0x80))
+ byte_array.insert(0, ((value & 0x7f) | 0x80))
value >>= 7
if len(byte_array) >= 9:
@@ -128,7 +129,7 @@ def encode_varint(value):
getLogger(LOGGER_NAME).error(log_message)
raise InvalidVarIntError(log_message)
- byte_array[-1] &= 0x7f
+ byte_array = byte_array[:-1] + pack("B", (byte_array[-1] & 0x7f))
return byte_array
@@ -150,6 +151,9 @@ def get_class_instance(class_name):
def get_md5_hash(string):
md5_hash = md5()
+ # Ensure the string is properly encoded as a binary string
+ if isinstance(string, str):
+ string = string.encode()
md5_hash.update(string)
return md5_hash.hexdigest().upper()
@@ -173,7 +177,7 @@ def get_record_content(serial_type, record_body, offset=0):
# Big-endian 24-bit twos-complement integer
elif serial_type == 3:
content_size = 3
- value_byte_array = '\0' + record_body[offset:offset + content_size]
+ value_byte_array = b'\0' + record_body[offset:offset + content_size]
value = unpack(b">I", value_byte_array)[0]
if value & 0x800000:
value -= 0x1000000
@@ -186,7 +190,7 @@ def get_record_content(serial_type, record_body, offset=0):
# Big-endian 48-bit twos-complement integer
elif serial_type == 5:
content_size = 6
- value_byte_array = '\0' + '\0' + record_body[offset:offset + content_size]
+ value_byte_array = b'\0' + b'\0' + record_body[offset:offset + content_size]
value = unpack(b">Q", value_byte_array)[0]
if value & 0x800000000000:
value -= 0x1000000000000
@@ -217,12 +221,12 @@ def get_record_content(serial_type, record_body, offset=0):
# A BLOB that is (N-12)/2 bytes in length
elif serial_type >= 12 and serial_type % 2 == 0:
- content_size = (serial_type - 12) / 2
+ content_size = int((serial_type - 12) // 2)
value = record_body[offset:offset + content_size]
# A string in the database encoding and is (N-13)/2 bytes in length. The nul terminator is omitted
elif serial_type >= 13 and serial_type % 2 == 1:
- content_size = (serial_type - 13) / 2
+ content_size = int((serial_type - 13) // 2)
value = record_body[offset:offset + content_size]
else:
@@ -250,7 +254,7 @@ def has_content(byte_array):
return True
-def is_sqlite_file(path):
+def is_sqlite_file(path: str) -> bool:
"""
    Determines if the specified file contains the magic bytes that indicate it is a SQLite file. This is not meant to
    be a full validation of the file format; full header validation is performed in file/database/header.py.
@@ -269,13 +273,12 @@ def is_sqlite_file(path):
with open(path, "rb") as sqlite:
header = sqlite.read(SQLITE_DATABASE_HEADER_LENGTH)
header_magic = header[0:16]
- magic = MAGIC_HEADER_STRING.decode(MAGIC_HEADER_STRING_ENCODING)
- return header_magic == magic
+ return header_magic == MAGIC_HEADER_STRING
except IOError as e:
logging.error("Invalid SQLite file found: {}".format(e))
-def get_sqlite_files(path):
+def get_sqlite_files(path: str) -> list:
"""
Parses the path, validates it exists, and returns a list of all valid file(s) at the provided path. If the provided
path is a file, it ensures it's a valid SQLite file and returns the path. If it's a directory, it validates all
@@ -310,7 +313,7 @@ def get_sqlite_files(path):
return sqlite_files
-def create_directory(dir_path):
+def create_directory(dir_path: str) -> bool:
"""
Creates a directory if it doesn't already exist.
:param dir_path: The path of the directory to create
@@ -327,7 +330,7 @@ def create_directory(dir_path):
return exists(dir_path) and isdir(dir_path)
-def hash_file(file_path, hash_algo=hashlib.sha256()):
+def hash_file(file_path: str, hash_algo=hashlib.sha256()) -> str:
"""
Generates a hash of a file by chunking it and utilizing the Python hashlib library.
"""
@@ -345,6 +348,14 @@ def hash_file(file_path, hash_algo=hashlib.sha256()):
return hash_algo.hexdigest()
+def decode_str(string):
+ """Python compatibility for auto-detecting encoded strings and decoding them"""
+ if isinstance(string, bytes):
+ return string.decode()
+ else:
+ return string
+
+
 # Uses ArgParser from configargparse to evaluate user arguments.
def parse_args(args=None):
description = "SQLite Dissect is a SQLite parser with recovery abilities over SQLite databases " \
diff --git a/sqlite_dissect/version_history.py b/sqlite_dissect/version_history.py
index 4d3336f..68623c0 100644
--- a/sqlite_dissect/version_history.py
+++ b/sqlite_dissect/version_history.py
@@ -1,5 +1,4 @@
from logging import getLogger
-from re import sub
from warnings import warn
from sqlite_dissect.carving.carver import SignatureCarver
from sqlite_dissect.constants import BASE_VERSION_NUMBER
@@ -190,17 +189,17 @@ def __init__(self, database, write_ahead_log=None):
log_message = "Version (commit record): {} has additional frames beyond the last commit frame found " \
"in the write ahead log and erroneous use cases may occur when parsing."
log_message = log_message.format(commit_record_number)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Set the number of versions
self.number_of_versions = len(self.versions)
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_versions=True):
string = "File Type: {}"
@@ -283,7 +282,7 @@ def __init__(self, version_history, master_schema_entry,
log_message = log_message.format(self.name, self.table_name, self.row_type, self.sql,
self.parser_starting_version_number, self.parser_ending_version_number,
MASTER_SCHEMA_ROW_TYPE.TABLE, signature.row_type)
- logger.warn(log_message)
+ logger.warning(log_message)
warn(log_message, RuntimeWarning)
# Set the signature
@@ -397,10 +396,13 @@ def __iter__(self):
return self
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
+
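+    # Python 3 iterates via __next__(); delegate to the existing next() implementation.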
+ def __next__(self):
+ return self.next()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_cells=True):
string = padding + "Page Type: {}\n" \
@@ -417,7 +419,7 @@ def stringify(self, padding="", print_cells=True):
self._current_b_tree_page_numbers,
self._carve_freelist_pages)
if print_cells:
- for current_cell in self._current_cells.itervalues():
+ for current_cell in self._current_cells.values():
string += "\n" + padding + "Cell:\n{}".format(current_cell.stringify(padding + "\t"))
return string
@@ -501,7 +503,7 @@ def next(self):
deleted_cells = {}
# Iterate through the current cells
- for current_cell_md5, current_cell in self._current_cells.iteritems():
+ for current_cell_md5, current_cell in self._current_cells.items():
# Remove the cell from the added cells if it was already pre-existing
if current_cell_md5 in added_cells:
@@ -694,7 +696,7 @@ def next(self):
# Initialize the carved cells
carved_cells = []
- for freelist_page_number, freelist_page in updated_freelist_pages.iteritems():
+ for freelist_page_number, freelist_page in updated_freelist_pages.items():
# Carve unallocated space
carvings = SignatureCarver.carve_unallocated_space(version, CELL_SOURCE.FREELIST,
@@ -773,10 +775,10 @@ def __init__(self, name, file_type, version_number, database_text_encoding, page
self.carved_cells = {}
def __repr__(self):
- return self.__str__().encode("hex")
+ return self.__str__()
def __str__(self):
- return sub("\t", "", sub("\n", " ", self.stringify()))
+ return self.stringify().replace('\t', '').replace('\n', ' ')
def stringify(self, padding="", print_cells=True):
string = padding + "Version Number: {}\n" \
@@ -798,13 +800,13 @@ def stringify(self, padding="", print_cells=True):
self.freelist_pages_carved,
self.updated_freelist_page_numbers)
if print_cells:
- for added_cell in self.added_cells.itervalues():
+ for added_cell in self.added_cells.values():
string += "\n" + padding + "Added Cell:\n{}".format(added_cell.stringify(padding + "\t"))
- for deleted_cell in self.deleted_cells.itervalues():
+ for deleted_cell in self.deleted_cells.values():
string += "\n" + padding + "Deleted Cell:\n{}".format(deleted_cell.stringify(padding + "\t"))
- for updated_cell in self.updated_cells.itervalues():
+ for updated_cell in self.updated_cells.values():
string += "\n" + padding + "Updated Cell:\n{}".format(updated_cell.stringify(padding + "\t"))
- for carved_cell in self.carved_cells.itervalues():
+ for carved_cell in self.carved_cells.values():
string += "\n" + padding + "Carved Cell:\n{}".format(carved_cell.stringify(padding + "\t"))
return string