|
4 | 4 | import io |
5 | 5 | import logging |
6 | 6 | import re |
| 7 | +from itertools import count |
7 | 8 | from rtfparse import re_patterns |
8 | 9 | from rtfparse import utils |
| 10 | +from rtfparse import errors |
9 | 11 | from rtfparse.enums import Bytestring_Type |
10 | 12 |
|
11 | 13 |
|
|
14 | 16 |
|
15 | 17 |
|
# Constants: number of bytes to read when creating entities.
# Each single-byte token (backslash, delimiter, minus sign, closing brace)
# has the same length as one escaped backslash, i.e. one byte.
CHARACTER = BACKSLASH = DELIMITER = MINUS = GROUP_END = len(b"\\")
# A control symbol and the ignorable marker are each a backslash plus one byte.
SYMBOL = IGNORABLE = BACKSLASH + CHARACTER
# A group start is probed as one byte followed by an optional ignorable marker.
GROUP_START = BACKSLASH + IGNORABLE
# Upper bound on the number of letters read for a control word name.
MAX_CW_LETTERS = 32
# Bit width used to size the numeric parameter of a control word.
INTEGER_MAGNITUDE = 32
# Longest control word read: backslash, name letters, optional minus sign,
# the decimal digits of 2 ** (INTEGER_MAGNITUDE - 1), and one delimiter.
# Plain text is read in chunks of the same size.
PLAIN_TEXT = CONTROL_WORD = BACKSLASH + MAX_CW_LETTERS + MINUS + len(str((1 << INTEGER_MAGNITUDE) // 2)) + DELIMITER
24 | 25 |
|
25 | 26 |
|
class Entity:
    """Base class of the parsed entities; provides look-ahead probing."""

    @classmethod
    def probe(cls, pattern: re_patterns.Bytes_Regex, file: io.BufferedReader) -> Bytestring_Type:
        """Classify the bytes at the current file position without consuming them.

        Reads ``len(re_patterns.probe_pattern)`` bytes, rewinds, and tests them
        against the group start, group end, control word, control symbol and
        plain text patterns, in that order. If nothing matches, the start
        position is advanced by one byte and the probe is repeated.

        Args:
            pattern: Accepted for interface compatibility; not used here.
            file: Seekable binary stream positioned at the bytes to classify.

        Returns:
            The ``Bytestring_Type`` member of the first matching pattern. The
            stream is left at the position where the match was found.

        Raises:
            errors.UnexpectedEndOfFileError: If the read returns no bytes and
                no pattern matches.
        """
        logger.debug(f"in Entity.probed")
        original_position = file.tell()
        while True:
            probed = file.read(len(re_patterns.probe_pattern))
            logger.debug(f"{probed = }")
            # Probing must not consume input: always rewind to where we read from.
            file.seek(original_position)
            logger.debug(f"Probe returned to position {file.tell()}")
            if re_patterns.group_start.match(probed):
                result = Bytestring_Type.GROUP_START
            elif re_patterns.group_end.match(probed):
                result = Bytestring_Type.GROUP_END
            elif re_patterns.control_word.match(probed):
                result = Bytestring_Type.CONTROL_WORD
            elif re_patterns.control_symbol.match(probed):
                result = Bytestring_Type.CONTROL_SYMBOL
            elif re_patterns.plain_text.match(probed):
                result = Bytestring_Type.PLAIN_TEXT
            else:
                # Check for end of file before advancing, so the reported
                # position is the real end of the stream.
                if not probed:
                    logger.warning(f"Reached unexpected end of file.")
                    raise errors.UnexpectedEndOfFileError(f"at position {file.tell()}")
                logger.debug(f"This does not match anything, it's probably a newline, moving on")
                original_position += 1
                file.seek(original_position)
                logger.debug(f"Probe moved to position {file.tell()}")
                continue
            break
        logger.debug(f"{result = }")
        logger.debug(f"Probe leaving file at position {file.tell()}")
        return result
46 | 60 |
|
47 | 61 |
|
class Control_Word(Entity):
    """A control word read from the current file position.

    Attributes are set as follows:
        control_name: Decoded name, or ``"missing"`` when nothing matched.
        parameter: Integer parameter, or ``""`` when the word has none.
        start_position: File offset at which reading started.
    """

    def __init__(self, file: io.BufferedReader) -> None:
        """Read one control word from ``file`` and leave the stream just after it.

        If the bytes at the current position do not match the control word
        pattern, the defaults are kept and the stream is rewound to where it
        started.
        """
        logger.debug(f"Control_Word.__init__")
        self.control_name = "missing"
        self.parameter = ""
        self.start_position = file.tell()
        logger.debug(f"Starting at file position {self.start_position}")
        probe = file.read(CONTROL_WORD)
        match = re_patterns.control_word.match(probe)
        if not match:
            logger.warning(f"Missing Control Word")
            file.seek(self.start_position)
            return
        self.control_name = match.group("control_name").decode("ascii")
        logger.debug(f"{self.control_name = }")
        parameter = match.group("parameter")
        if parameter:
            self.parameter = int(parameter.decode("ascii"))
            logger.debug(f"{self.parameter = }")
        target_position = self.start_position + match.span()[1]
        other = match.group("other")
        if other:
            delimiter_length = len(match.group("delimiter"))
            # Decode leniently: this text is only logged, so an unexpected
            # non-ASCII byte must not abort parsing.
            shown = other.decode("ascii", errors="replace")
            logger.debug(f"Delimiter is {shown}, len: {delimiter_length}")
            # Step back over the delimiter so it stays in the stream
            # (presumably it belongs to the next entity; verify against
            # re_patterns.control_word).
            target_position -= delimiter_length
        file.seek(target_position)

    def __repr__(self) -> str:
        """Return ``<Control_Word: name + parameter>`` for debugging."""
        name = self.control_name
        return f"<{self.__class__.__name__}: {name}{self.parameter}>"
| 88 | + |
| 89 | + |
class Control_Symbol(Entity):
    """A control symbol: the last byte of the ``SYMBOL`` bytes read from the file."""

    def __init__(self, file: io.BufferedReader) -> None:
        """Consume ``SYMBOL`` bytes from ``file`` and keep the final one as text."""
        self.start_position = file.tell()
        logger.debug(f"Starting at file position {self.start_position}")
        # Slice instead of indexing: bytes[-1] is an int, which has no
        # .decode(); bytes[-1:] stays a bytes object (and is empty, not an
        # IndexError, if nothing could be read).
        self.text = file.read(SYMBOL)[-1:].decode("ascii")

    def __repr__(self) -> str:
        """Return ``<Control_Symbol: text>`` for debugging."""
        return f"<{self.__class__.__name__}: {self.text}>"
| 97 | + |
| 98 | + |
class Plain_Text(Entity):
    """A run of plain text read from the current file position.

    Attributes are set as follows:
        start_position: File offset at which reading started.
        text: The decoded text collected from consecutive chunks.
    """

    def __init__(self, file: io.BufferedReader) -> None:
        """Read plain text in ``PLAIN_TEXT``-byte chunks until the text ends.

        The stream is always left directly after the collected text, whether
        the run ended in the middle of a chunk or exactly on a chunk boundary.
        """
        self.start_position = file.tell()
        logger.debug(f"Starting at file position {self.start_position}")
        self.text = ""
        while True:
            chunk = file.read(PLAIN_TEXT)
            logger.debug(f"Read file up to position {file.tell()}")
            logger.debug(f"Read: {chunk}")
            # see if we have read all the plain text there is:
            match = re_patterns.plain_text.match(chunk)
            if not match:
                # The previous chunk ended exactly where the text ends; the
                # bytes just read belong to another entity.
                break
            logger.debug(f"This matches the plain text pattern")
            _text = match.group("text").decode("ascii")
            logger.debug(f"{_text = }")
            self.text = "".join((self.text, _text))
            logger.debug(f"{self.text = }")
            if len(_text) != PLAIN_TEXT:
                # A short match means the text ended inside this chunk.
                break
        # Rewind on every exit path so bytes read past the end of the text
        # are not lost to the caller.
        file.seek(self.start_position + len(self.text))
        logger.debug(f"Returned to position {file.tell()}")

    def __repr__(self) -> str:
        """Return ``<Plain_Text: text>`` for debugging."""
        return f"<{self.__class__.__name__}: {self.text}>"
59 | 125 |
|
60 | 126 |
|
class Destination_Group(Entity):
    """A group and everything nested inside it, parsed recursively.

    Attributes are set as follows:
        known: True when the group start pattern matched a group start.
        name: Control name of the group's leading control word.
        ignorable: True when the group start carried the ignorable marker.
        structure: Child entities in file order.
        cw: The leading ``Control_Word`` of the group.
        start_position: File offset at which reading started.
    """

    def __init__(self, file: io.BufferedReader) -> None:
        """Parse one group from ``file``, consuming it up to and including its end.

        Raises:
            errors.UnexpectedEndOfFileError: Propagated from ``probe`` when the
                stream ends before the group is closed.
        """
        logger.debug(f"Destination_Group.__init__")
        self.known = False
        self.name = "unknown"
        self.ignorable = False
        self.structure = []
        # Not every stream has a name (e.g. in-memory buffers); logging must
        # not fail because of that.
        logger.debug(f"Creating destination group from {getattr(file, 'name', '<unnamed stream>')}")
        self.start_position = file.tell()
        logger.debug(f"Starting at file position {self.start_position}")
        probe = file.read(GROUP_START)
        logger.debug(f"Read file up to position {file.tell()}, read {probe = }")
        match = re_patterns.group_start.match(probe)
        if match:
            self.known = bool(match.group("group_start"))
            self.ignorable = bool(match.group("ignorable"))
            if not self.ignorable:
                # Give back the bytes read in place of the ignorable marker.
                # An absolute seek stays correct when fewer than GROUP_START
                # bytes were available.
                file.seek(self.start_position + GROUP_START - IGNORABLE)
                logger.debug(f"Returned to position {file.tell()}")
        else:
            logger.warning(utils.warn(f"Expected a group but found no group start. Creating unknown group"))
            file.seek(self.start_position)
        self.cw = Control_Word(file)
        self.name = self.cw.control_name
        while True:
            probed = self.probe(re_patterns.probe, file)
            if probed is Bytestring_Type.GROUP_END:
                # Consume the group end and finish this group.
                file.read(GROUP_END)
                break
            if probed is Bytestring_Type.CONTROL_WORD:
                child = Control_Word(file)
            elif probed is Bytestring_Type.GROUP_START:
                child = Destination_Group(file)
            elif probed is Bytestring_Type.CONTROL_SYMBOL:
                child = Control_Symbol(file)
            else:
                child = Plain_Text(file)
            self.structure.append(child)

    def __repr__(self) -> str:
        """Return ``<Destination_Group: name + parameter>`` of the leading control word."""
        return f"<{self.__class__.__name__}: {self.cw.control_name}{self.cw.parameter}>"
84 | 165 |
|
85 | 166 |
|
86 | 167 | if __name__ == "__main__": |
|
0 commit comments