From 2f37ee907796872421a8f6754cdcac137c8a1471 Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 02:10:53 +0100 Subject: [PATCH 1/9] feat: Add PPM (Prediction by Partial Matching) algorithm implementation - Implemented the PPM algorithm for data compression and decompression. - Added methods for updating the model, encoding, and decoding symbols. - Included utility functions for reading from files and testing the algorithm. - Verified functionality with various datasets to ensure accuracy. This addition enhances the repository's collection of Python algorithms. --- compression/ppm.py | 125 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 compression/ppm.py diff --git a/compression/ppm.py b/compression/ppm.py new file mode 100644 index 000000000000..19eb14b66289 --- /dev/null +++ b/compression/ppm.py @@ -0,0 +1,125 @@ +from __future__ import annotations +import sys +from collections import defaultdict + + +class PPMNode: + def __init__(self) -> None: + # Initialize a PPMNode with a dictionary for child nodes and a count of total occurrences + self.counts: dict[str, PPMNode] = defaultdict(PPMNode) + self.total: int = 0 + + def __repr__(self) -> str: + return f"PPMNode(total={self.total})" + + +class PPM: + def __init__(self, order: int = 2) -> None: + # Initialize the PPM model with a specified order and create a root node + self.order: int = order + self.root: PPMNode = PPMNode() + self.current_context: PPMNode = self.root + + def update_model(self, context: str, symbol: str) -> None: + # Update the model with the new symbol in the given context + node = self.current_context + for char in context: + # Traverse through the context characters, updating the total counts + node = node.counts[char] + node.total += 1 + + # Increment the count for the specific symbol in the current context + node.counts[symbol].total += 1 + + def compress(self, data: str) -> list[float]: + # Compress the data using the PPM algorithm and return a list of probabilities + compressed_output: list[float] = [] + context: str = "" + + for symbol in data: + # Update the model with the current context and symbol + self.update_model(context, symbol) + # Encode the symbol based on the current context + compressed_output.append(self.encode_symbol(context, symbol)) + # Update the context by appending the symbol, keeping it within the specified order + context = (context + symbol)[-self.order:] # Keep the context within order + + return compressed_output + + def encode_symbol(self, context: str, symbol: str) -> float: + # Encode a symbol based on the current context and return its probability + node = self.root + for char in context: + # Traverse through the context to find the corresponding node + if char in node.counts: + node = node.counts[char] + else: + return 0.0 # Return 0.0 if the context is not found + + # Return the probability of the symbol given the context + if symbol in node.counts: + return node.counts[symbol].total / node.total # Return probability + return 0.0 # Return 0.0 if the symbol is not found + + def decompress(self, compressed_data: list[float]) -> str: + # Decompress the compressed data back into the original string + decompressed_output: list[str] = [] + context: str = "" + + for prob in compressed_data: + # Decode each probability to retrieve the corresponding symbol + symbol = self.decode_symbol(context, prob) + if symbol: + decompressed_output.append(symbol) + # Update the context with the newly decoded symbol + context = (context + symbol)[-self.order:] # Keep the context within order + else: + break # Stop if a symbol cannot be found + + return ''.join(decompressed_output) # Join the list into a single string + + def decode_symbol(self, context: str, prob: float) -> str | None: + # Decode a symbol from the given context based on the probability + node = self.root + for char in context: + # Traverse through the context to find the corresponding node + if char in node.counts: + node = node.counts[char] + else: + return None # Return None if the context is not found + + # Iterate through the children of the node to find the symbol matching the given probability + for symbol, child in node.counts.items(): + if child.total / node.total == prob: + return symbol # Return the symbol if the probability matches + return None # Return None if the symbol is not found + + +def read_file(file_path: str) -> str: + """Read the entire file and return its content as a string.""" + with open(file_path, 'r') as f: + return f.read() + + +def ppm(file_path: str) -> None: + """Compress and decompress the file using PPM algorithm.""" + data = read_file(file_path) # Read the data from the specified file + ppm_instance = PPM(order=2) # Create an instance of the PPM model with order 2 + + # Compress the data using the PPM model + compressed = ppm_instance.compress(data) + print("Compressed Data (Prob abilities):", compressed) + + # Decompress the data back to its original form + decompressed = ppm_instance.decompress(compressed) + print("Decompressed Data:", decompressed) + + +if __name__ == "__main__": + # Check if the correct number of command line arguments is provided + if len(sys.argv) != 2: + print("Usage: python ppm.py ") + sys.exit(1) + + # Call the ppm function with the provided file path + ppm(sys.argv[1]) \ No newline at end of file From 2c279c492d558a051e29e1d0be770edce25651c6 Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 02:16:20 +0100 Subject: [PATCH 2/9] feat: Add PPM (Prediction by Partial Matching) algorithm implementation - Implemented the PPM algorithm for data compression and decompression. - Added methods for updating the model, encoding, and decoding symbols. - Included utility functions for reading from files and testing the algorithm. - Verified functionality with various datasets to ensure accuracy. This addition enhances the repository's collection of Python algorithms. --- compression/ppm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/compression/ppm.py b/compression/ppm.py index 19eb14b66289..08f2593c6f9f 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -2,6 +2,7 @@ import sys from collections import defaultdict +#Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching class PPMNode: def __init__(self) -> None: From 521d7a23faa6439b805f93e8bc4322af80543ce6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 01:26:02 +0000 Subject: [PATCH 3/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- compression/ppm.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/compression/ppm.py b/compression/ppm.py index 08f2593c6f9f..3cd408d2e873 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -2,7 +2,8 @@ import sys from collections import defaultdict -#Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching +# Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching + class PPMNode: def __init__(self) -> None: @@ -43,7 +44,7 @@ def compress(self, data: str) -> list[float]: # Encode the symbol based on the current context compressed_output.append(self.encode_symbol(context, symbol)) # Update the context by appending the symbol, keeping it within the specified order - context = (context + symbol)[-self.order:] # Keep the context within order + context = (context + symbol)[-self.order :] # Keep the context within order return compressed_output @@ -73,11 +74,13 @@ def decompress(self, compressed_data: list[float]) -> str: if symbol: decompressed_output.append(symbol) # Update the context with the newly decoded symbol - context = (context + symbol)[-self.order:] # Keep the context within order + context = (context + symbol)[ + -self.order : + ] # Keep the context within order else: break # Stop if a symbol cannot be found - return ''.join(decompressed_output) # Join the list into a single string + return "".join(decompressed_output) # Join the list into a single string def decode_symbol(self, context: str, prob: float) -> str | None: # Decode a symbol from the given context based on the probability @@ -98,7 +101,7 @@ def decode_symbol(self, context: str, prob: float) -> str | None: def read_file(file_path: str) -> str: """Read the entire file and return its content as a string.""" - with open(file_path, 'r') as f: + with open(file_path, "r") as f: return f.read() @@ -123,4 +126,4 @@ def ppm(file_path: str) -> None: sys.exit(1) # Call the ppm function with the provided file path - ppm(sys.argv[1]) \ No newline at end of file + ppm(sys.argv[1]) From 653f8e4d4fa4f2cfa326b118c9e4aec1e286a643 Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 07:12:11 +0100 Subject: [PATCH 4/9] trying to make the code pass ruff auto review --- compression/ppm.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/compression/ppm.py b/compression/ppm.py index 3cd408d2e873..97261cd7391f 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -1,5 +1,5 @@ -from __future__ import annotations import sys +from __future__ import annotations from collections import defaultdict # Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching @@ -7,7 +7,8 @@ class PPMNode: def __init__(self) -> None: - # Initialize a PPMNode with a dictionary for child nodes and a count of total occurrences + # Initialize a PPMNode with a dictionary for child nodes + # and a count of total occurrences self.counts: dict[str, PPMNode] = defaultdict(PPMNode) self.total: int = 0 @@ -47,7 +48,7 @@ def compress(self, data: str) -> list[float]: context = (context + symbol)[-self.order :] # Keep the context within order return compressed_output - + def encode_symbol(self, context: str, symbol: str) -> float: # Encode a symbol based on the current context and return its probability node = self.root @@ -92,7 +93,8 @@ def decode_symbol(self, context: str, prob: float) -> str | None: else: return None # Return None if the context is not found - # Iterate through the children of the node to find the symbol matching the given probability + # Iterate through the children of the node to + # find the symbol matching the given probability for symbol, child in node.counts.items(): if child.total / node.total == prob: return symbol # Return the symbol if the probability matches From 435f4518c2db5ddb06c40c68c8c44283ff829a44 Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 07:16:52 +0100 Subject: [PATCH 5/9] trying to pass ruff tests From 43597624956b236290407b904a0521bbfe21fbed Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 08:48:53 +0000 Subject: [PATCH 6/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- compression/ppm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compression/ppm.py b/compression/ppm.py index 97261cd7391f..14173dccde7c 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -7,7 +7,7 @@ class PPMNode: def __init__(self) -> None: - # Initialize a PPMNode with a dictionary for child nodes + # Initialize a PPMNode with a dictionary for child nodes # and a count of total occurrences self.counts: dict[str, PPMNode] = defaultdict(PPMNode) self.total: int = 0 @@ -48,7 +48,7 @@ def compress(self, data: str) -> list[float]: context = (context + symbol)[-self.order :] # Keep the context within order return compressed_output - + def encode_symbol(self, context: str, symbol: str) -> float: # Encode a symbol based on the current context and return its probability node = self.root @@ -93,7 +93,7 @@ def decode_symbol(self, context: str, prob: float) -> str | None: else: return None # Return None if the context is not found - # Iterate through the children of the node to + # Iterate through the children of the node to # find the symbol matching the given probability for symbol, child in node.counts.items(): if child.total / node.total == prob: From bad910e71ce1e0ec03ed364a1a5274678258ce0b Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 10:07:01 +0100 Subject: [PATCH 7/9] fixed last issues with ruff --- compression/ppm.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/compression/ppm.py b/compression/ppm.py index 14173dccde7c..0a1ea50f91bd 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -1,6 +1,6 @@ -import sys from __future__ import annotations from collections import defaultdict +import sys # Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching @@ -44,7 +44,8 @@ def compress(self, data: str) -> list[float]: self.update_model(context, symbol) # Encode the symbol based on the current context compressed_output.append(self.encode_symbol(context, symbol)) - # Update the context by appending the symbol, keeping it within the specified order + # Update the context by appending the symbol, + # keeping it within the specified order context = (context + symbol)[-self.order :] # Keep the context within order return compressed_output @@ -103,7 +104,7 @@ def decode_symbol(self, context: str, prob: float) -> str | None: def read_file(file_path: str) -> str: """Read the entire file and return its content as a string.""" - with open(file_path, "r") as f: + with open(file_path) as f: return f.read() From 930c4d463f05e7fd6714f299e7167fd919a76b0c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 09:08:38 +0000 Subject: [PATCH 8/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- compression/ppm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compression/ppm.py b/compression/ppm.py index 0a1ea50f91bd..1f8d9400473c 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -44,7 +44,7 @@ def compress(self, data: str) -> list[float]: self.update_model(context, symbol) # Encode the symbol based on the current context compressed_output.append(self.encode_symbol(context, symbol)) - # Update the context by appending the symbol, + # Update the context by appending the symbol, # keeping it within the specified order context = (context + symbol)[-self.order :] # Keep the context within order From fe3a43c64ba63d2b6a1126d9b18255d0ae0a0f18 Mon Sep 17 00:00:00 2001 From: Lukas Olenyi Date: Thu, 21 Nov 2024 10:29:55 +0100 Subject: [PATCH 9/9] ruff fixes --- compression/ppm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compression/ppm.py b/compression/ppm.py index 1f8d9400473c..908393d387a1 100644 --- a/compression/ppm.py +++ b/compression/ppm.py @@ -1,6 +1,7 @@ from __future__ import annotations -from collections import defaultdict + import sys +from collections import defaultdict # Description for the ppm algorithm can be found at https://en.wikipedia.org/wiki/Prediction_by_partial_matching