|
8 | 8 | from __future__ import annotations
|
9 | 9 |
|
10 | 10 | import os
|
| 11 | +from typing import IO |
11 | 12 |
|
12 | 13 | from . import Image, ImageFile
|
13 | 14 | from ._binary import i32be as i32
|
| 15 | +from ._binary import o8 |
| 16 | +from ._binary import o32be as o32 |
14 | 17 |
|
15 | 18 |
|
16 | 19 | def _accept(prefix: bytes) -> bool:
|
@@ -110,6 +113,122 @@ def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int
|
110 | 113 | return -1, 0
|
111 | 114 |
|
112 | 115 |
|
| 116 | +def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: |
| 117 | + if im.mode == "RGB": |
| 118 | + channels = 3 |
| 119 | + elif im.mode == "RGBA": |
| 120 | + channels = 4 |
| 121 | + else: |
| 122 | + msg = "Unsupported QOI image mode" |
| 123 | + raise ValueError(msg) |
| 124 | + |
| 125 | + colorspace = 0 if im.encoderinfo.get("colorspace") == "sRGB" else 1 |
| 126 | + |
| 127 | + fp.write(b"qoif") |
| 128 | + fp.write(o32(im.size[0])) |
| 129 | + fp.write(o32(im.size[1])) |
| 130 | + fp.write(o8(channels)) |
| 131 | + fp.write(o8(colorspace)) |
| 132 | + |
| 133 | + ImageFile._save(im, fp, [ImageFile._Tile("qoi", (0, 0) + im.size)]) |
| 134 | + |
| 135 | + |
| 136 | +class QoiEncoder(ImageFile.PyEncoder): |
| 137 | + _pushes_fd = True |
| 138 | + _previous_pixel: tuple[int, int, int, int] | None = None |
| 139 | + _previously_seen_pixels: dict[int, tuple[int, int, int, int]] = {} |
| 140 | + _run = 0 |
| 141 | + |
| 142 | + def _write_run(self) -> bytes: |
| 143 | + data = o8(0b11000000 | (self._run - 1)) # QOI_OP_RUN |
| 144 | + self._run = 0 |
| 145 | + return data |
| 146 | + |
| 147 | + def _delta(self, left: int, right: int) -> int: |
| 148 | + result = (left - right) & 255 |
| 149 | + if result >= 128: |
| 150 | + result -= 256 |
| 151 | + return result |
| 152 | + |
| 153 | + def encode(self, bufsize: int) -> tuple[int, int, bytes]: |
| 154 | + assert self.im is not None |
| 155 | + |
| 156 | + self._previously_seen_pixels = {0: (0, 0, 0, 0)} |
| 157 | + self._previous_pixel = (0, 0, 0, 255) |
| 158 | + |
| 159 | + data = bytearray() |
| 160 | + w, h = self.im.size |
| 161 | + bands = Image.getmodebands(self.mode) |
| 162 | + |
| 163 | + for y in range(h): |
| 164 | + for x in range(w): |
| 165 | + pixel = self.im.getpixel((x, y)) |
| 166 | + if bands == 3: |
| 167 | + pixel = (*pixel, 255) |
| 168 | + |
| 169 | + if pixel == self._previous_pixel: |
| 170 | + self._run += 1 |
| 171 | + if self._run == 62: |
| 172 | + data += self._write_run() |
| 173 | + else: |
| 174 | + if self._run: |
| 175 | + data += self._write_run() |
| 176 | + |
| 177 | + r, g, b, a = pixel |
| 178 | + hash_value = (r * 3 + g * 5 + b * 7 + a * 11) % 64 |
| 179 | + if self._previously_seen_pixels.get(hash_value) == pixel: |
| 180 | + data += o8(hash_value) # QOI_OP_INDEX |
| 181 | + elif self._previous_pixel: |
| 182 | + self._previously_seen_pixels[hash_value] = pixel |
| 183 | + |
| 184 | + prev_r, prev_g, prev_b, prev_a = self._previous_pixel |
| 185 | + if prev_a == a: |
| 186 | + delta_r = self._delta(r, prev_r) |
| 187 | + delta_g = self._delta(g, prev_g) |
| 188 | + delta_b = self._delta(b, prev_b) |
| 189 | + |
| 190 | + if ( |
| 191 | + -2 <= delta_r < 2 |
| 192 | + and -2 <= delta_g < 2 |
| 193 | + and -2 <= delta_b < 2 |
| 194 | + ): |
| 195 | + data += o8( |
| 196 | + 0b01000000 |
| 197 | + | (delta_r + 2) << 4 |
| 198 | + | (delta_g + 2) << 2 |
| 199 | + | (delta_b + 2) |
| 200 | + ) # QOI_OP_DIFF |
| 201 | + else: |
| 202 | + delta_gr = self._delta(delta_r, delta_g) |
| 203 | + delta_gb = self._delta(delta_b, delta_g) |
| 204 | + if ( |
| 205 | + -8 <= delta_gr < 8 |
| 206 | + and -32 <= delta_g < 32 |
| 207 | + and -8 <= delta_gb < 8 |
| 208 | + ): |
| 209 | + data += o8( |
| 210 | + 0b10000000 | (delta_g + 32) |
| 211 | + ) # QOI_OP_LUMA |
| 212 | + data += o8((delta_gr + 8) << 4 | (delta_gb + 8)) |
| 213 | + else: |
| 214 | + data += o8(0b11111110) # QOI_OP_RGB |
| 215 | + data += bytes(pixel[:3]) |
| 216 | + else: |
| 217 | + data += o8(0b11111111) # QOI_OP_RGBA |
| 218 | + data += bytes(pixel) |
| 219 | + |
| 220 | + self._previous_pixel = pixel |
| 221 | + |
| 222 | + if self._run: |
| 223 | + data += self._write_run() |
| 224 | + data += bytes((0, 0, 0, 0, 0, 0, 0, 1)) # padding |
| 225 | + |
| 226 | + return len(data), 0, data |
| 227 | + |
| 228 | + |
113 | 229 | Image.register_open(QoiImageFile.format, QoiImageFile, _accept)
|
114 | 230 | Image.register_decoder("qoi", QoiDecoder)
|
115 | 231 | Image.register_extension(QoiImageFile.format, ".qoi")
|
| 232 | + |
| 233 | +Image.register_save(QoiImageFile.format, _save) |
| 234 | +Image.register_encoder("qoi", QoiEncoder) |
0 commit comments