1
+ from __future__ import annotations
2
+ import sys
3
+ from collections import defaultdict
4
+
5
+
6
class PPMNode:
    """One node of the context trie used by the PPM model.

    Attributes are documented inline below. Child nodes are created on
    first subscript access, so ``node.counts[ch]`` never raises ``KeyError``;
    use ``ch in node.counts`` to test for presence without creating a child.
    """

    def __init__(self) -> None:
        # Running counter for this node, starts at zero and is bumped by the model.
        self.total: int = 0
        # Children keyed by symbol; missing keys are materialised as fresh nodes.
        children: dict[str, PPMNode] = defaultdict(PPMNode)
        self.counts: dict[str, PPMNode] = children

    def __repr__(self) -> str:
        # Only the counter is shown; children are left out to keep the output short.
        return "PPMNode(total={})".format(self.total)
14
+
15
+
16
class PPM:
    """Toy context model in the style of PPM (prediction by partial matching).

    Contexts of up to ``order`` preceding symbols are stored as paths in a
    trie of ``PPMNode`` objects. ``compress`` emits, for every input symbol,
    the relative frequency that symbol has in its context at that moment.

    Note:
        The emitted probabilities are not an entropy-coded bit stream, and
        ``decompress`` looks symbols up in the model as it stands when it is
        called. A probability does not identify a symbol uniquely, so
        ``decompress`` is best-effort: it can stop early or pick another
        symbol that happens to have the same probability.
    """

    def __init__(self, order: int = 2) -> None:
        """Create an empty model.

        Args:
            order: Maximum number of preceding symbols used as context.
                ``0`` means every symbol is modelled without context.

        Raises:
            ValueError: If ``order`` is negative.
        """
        if order < 0:
            raise ValueError(f"order must be non-negative, got {order}")
        self.order: int = order
        self.root: PPMNode = PPMNode()
        # Starting node for updates; it is never moved by this class.
        self.current_context: PPMNode = self.root

    def _next_context(self, context: str, symbol: str) -> str:
        """Return ``context`` extended by ``symbol`` and cut to ``order`` symbols."""
        # ``text[-0:]`` is the whole string, so order 0 needs its own branch
        # to really yield an empty context.
        if self.order <= 0:
            return ""
        return (context + symbol)[-self.order:]

    def _find_context(self, context: str) -> PPMNode | None:
        """Walk ``context`` from the root without creating nodes.

        Returns:
            The node reached, or ``None`` when the path does not exist.
        """
        node = self.root
        for char in context:
            # Membership test first: subscripting would create the child.
            if char not in node.counts:
                return None
            node = node.counts[char]
        return node

    def update_model(self, context: str, symbol: str) -> None:
        """Record one occurrence of ``symbol`` after ``context``.

        Every node on the context path, including the starting node, has its
        ``total`` incremented, and so does the child for ``symbol`` under the
        last context node. Missing nodes are created on the way.

        Args:
            context: Symbols preceding ``symbol``; may be empty.
            symbol: The symbol that was observed.
        """
        node = self.current_context
        # Count the visit on the starting node too. Without this the root's
        # total stays 0 and an empty-context lookup divides by zero.
        node.total += 1
        for char in context:
            node = node.counts[char]
            node.total += 1

        node.counts[symbol].total += 1

    def compress(self, data: str) -> list[float]:
        """Feed ``data`` through the model and return one probability per symbol.

        The model is updated with each symbol *before* that symbol's
        probability is read, so the value already includes the occurrence.

        Args:
            data: Text to process, one character per symbol.

        Returns:
            A list with ``len(data)`` floats; empty for empty input.
        """
        compressed_output: list[float] = []
        context = ""

        for symbol in data:
            self.update_model(context, symbol)
            compressed_output.append(self.encode_symbol(context, symbol))
            context = self._next_context(context, symbol)

        return compressed_output

    def encode_symbol(self, context: str, symbol: str) -> float:
        """Return the relative frequency of ``symbol`` after ``context``.

        Args:
            context: Symbols preceding ``symbol``; may be empty.
            symbol: The symbol to look up.

        Returns:
            ``child.total / node.total`` for the context node and its child
            for ``symbol``, or ``0.0`` when the context or the symbol has not
            been seen (or the context node has no recorded visits).
        """
        node = self._find_context(context)
        if node is None or node.total == 0:
            return 0.0

        # ``get`` does not trigger the default factory, so nothing is created.
        child = node.counts.get(symbol)
        if child is None:
            return 0.0
        return child.total / node.total

    def decompress(self, compressed_data: list[float]) -> str:
        """Map probabilities back to symbols using the current model state.

        Decoding stops at the first probability for which no symbol is found,
        so the result may be shorter than ``compressed_data``.

        Args:
            compressed_data: Probabilities as produced by ``compress``.

        Returns:
            The symbols recovered so far, joined into one string.
        """
        decompressed_output: list[str] = []
        context = ""

        for prob in compressed_data:
            symbol = self.decode_symbol(context, prob)
            if not symbol:
                break  # No matching symbol in this context: give up here.
            decompressed_output.append(symbol)
            context = self._next_context(context, symbol)

        return "".join(decompressed_output)

    def decode_symbol(self, context: str, prob: float) -> str | None:
        """Find a symbol whose relative frequency after ``context`` equals ``prob``.

        The comparison is exact: ``prob`` is expected to come from the same
        integer division performed by ``encode_symbol``. When several symbols
        share the probability, the one inserted first is returned.

        Args:
            context: Symbols preceding the one to decode; may be empty.
            prob: Probability to match.

        Returns:
            The matching symbol, or ``None`` when the context is unknown, has
            no recorded visits, or no child matches.
        """
        node = self._find_context(context)
        if node is None or node.total == 0:
            return None

        for symbol, child in node.counts.items():
            if child.total / node.total == prob:
                return symbol
        return None
96
+
97
+
98
def read_file(file_path: str) -> str:
    """Read the entire file and return its content as a string.

    The file is decoded as UTF-8 explicitly, so the result does not depend on
    the locale encoding of the machine running the script.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The full decoded content; an empty string for an empty file.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the content is not valid UTF-8.
    """
    with open(file_path, encoding="utf-8") as f:
        return f.read()
102
+
103
+
104
def ppm(file_path: str) -> None:
    """Run the PPM model over a file and print both stages.

    Reads the file, feeds its text through an order-2 ``PPM`` instance, prints
    the per-symbol probabilities, then prints what ``decompress`` recovers
    from those probabilities using the same instance.

    Args:
        file_path: Path of the text file to process.
    """
    data = read_file(file_path)
    ppm_instance = PPM(order=2)

    # Stage 1: one probability per input symbol.
    compressed = ppm_instance.compress(data)
    print("Compressed Data (Probabilities):", compressed)

    # Stage 2: map the probabilities back to symbols with the trained model.
    decompressed = ppm_instance.decompress(compressed)
    print("Decompressed Data:", decompressed)
116
+
117
+
118
if __name__ == "__main__":
    # The script takes exactly one argument: the path of the file to process.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python ppm.py <file_path>")
        sys.exit(1)

    # Run the compress/decompress demonstration on the requested file.
    ppm(cli_args[0])
0 commit comments