|
1 |
| -from __future__ import annotations |
| 1 | +from __future__ import annotations |
2 | 2 |
|
3 | 3 | import struct
|
4 | 4 | from copy import copy
|
5 | 5 | from io import BytesIO
|
| 6 | +from threading import Lock |
6 | 7 | from typing import TYPE_CHECKING, Dict, Tuple, Union
|
7 | 8 |
|
8 | 9 | import astc_encoder
|
@@ -74,22 +75,20 @@ def image_to_texture2d(
|
74 | 75 | block_size = tuple(
|
75 | 76 | map(int, target_texture_format.name.rsplit("_", 1)[1].split("x"))
|
76 | 77 | )
|
77 |
| - |
78 |
| - config = astc_encoder.ASTCConfig( |
79 |
| - astc_encoder.ASTCProfile.LDR, *block_size, block_z=1, quality=100 |
80 |
| - ) |
81 |
| - context = astc_encoder.ASTCContext(config) |
82 |
| - raw_img = astc_encoder.ASTCImage( |
83 |
| - astc_encoder.ASTCType.U8, img.width, img.height, 1, raw_img |
84 |
| - ) |
85 |
| - if img.mode == "RGB": |
86 |
| - tex_format = getattr(TF, f"ASTC_RGB_{block_size[0]}x{block_size[1]}") |
87 |
| - else: |
88 |
| - tex_format = getattr(TF, f"ASTC_RGBA_{block_size[0]}x{block_size[1]}") |
89 |
| - |
90 |
| - swizzle = astc_encoder.ASTCSwizzle.from_str("RGBA") |
91 |
| - enc_img = context.compress(raw_img, swizzle) |
92 |
| - tex_format = target_texture_format |
| 78 | + context, lock = get_astc_context(block_size) |
| 79 | + |
| 80 | + with lock: |
| 81 | + raw_img = astc_encoder.ASTCImage( |
| 82 | + astc_encoder.ASTCType.U8, img.width, img.height, 1, raw_img |
| 83 | + ) |
| 84 | + if img.mode == "RGB": |
| 85 | + tex_format = getattr(TF, f"ASTC_RGB_{block_size[0]}x{block_size[1]}") |
| 86 | + else: |
| 87 | + tex_format = getattr(TF, f"ASTC_RGBA_{block_size[0]}x{block_size[1]}") |
| 88 | + |
| 89 | + swizzle = astc_encoder.ASTCSwizzle.from_str("RGBA") |
| 90 | + enc_img = context.compress(raw_img, swizzle) |
| 91 | + tex_format = target_texture_format |
93 | 92 | # A
|
94 | 93 | elif target_texture_format == TF.Alpha8:
|
95 | 94 | enc_img = img.tobytes("raw", "A")
|
@@ -252,30 +251,38 @@ def atc(image_data: bytes, width: int, height: int, alpha: bool) -> Image.Image:
|
252 | 251 | return Image.frombytes("RGBA", (width, height), image_data, "raw", "BGRA")
|
253 | 252 |
|
254 | 253 |
|
255 |
| -ASTC_CONTEXTS: Dict[Tuple[int, int], astc_encoder.ASTCContext] = {} |
| 254 | +def astc(image_data: bytes, width: int, height: int, block_size: tuple) -> Image.Image: |
| 255 | + context, lock = get_astc_context(block_size) |
| 256 | + |
| 257 | + with lock: |
| 258 | + image = astc_encoder.ASTCImage(astc_encoder.ASTCType.U8, width, height, 1) |
| 259 | + texture_size = calculate_astc_compressed_size(width, height, block_size) |
| 260 | + if len(image_data) < texture_size: |
| 261 | + raise ValueError(f"Invalid ASTC data size: {len(image_data)} < {texture_size}") |
| 262 | + context.decompress( |
| 263 | + image_data[:texture_size], image, astc_encoder.ASTCSwizzle.from_str("RGBA") |
| 264 | + ) |
| 265 | + |
| 266 | + return Image.frombytes("RGBA", (width, height), image.data, "raw", "RGBA") |
| 267 | + |
256 | 268 |
|
| 269 | +ASTC_CONTEXTS: Dict[Tuple[int, int], Tuple[astc_encoder.ASTCContext, Lock]] = {} |
257 | 270 |
|
258 |
| -def astc(image_data: bytes, width: int, height: int, block_size: tuple) -> Image.Image: |
259 |
| - context = ASTC_CONTEXTS.get(block_size) |
260 |
| - if context is None: |
| 271 | + |
| 272 | +def get_astc_context(block_size: tuple): |
| 273 | + """Get the ASTC context and its lock using the given `block_size`.""" |
| 274 | + if block_size not in ASTC_CONTEXTS: |
261 | 275 | config = astc_encoder.ASTCConfig(
|
262 | 276 | astc_encoder.ASTCProfile.LDR,
|
263 | 277 | *block_size,
|
264 |
| - 1, |
265 |
| - 100, |
266 |
| - astc_encoder.ASTCConfigFlags.USE_DECODE_UNORM8, |
| 278 | + block_z=1, |
| 279 | + quality=100, |
| 280 | + flags=astc_encoder.ASTCConfigFlags.USE_DECODE_UNORM8, |
267 | 281 | )
|
268 |
| - context = ASTC_CONTEXTS[block_size] = astc_encoder.ASTCContext(config) |
269 |
| - |
270 |
| - image = astc_encoder.ASTCImage(astc_encoder.ASTCType.U8, width, height, 1) |
271 |
| - texture_size = calculate_astc_compressed_size(width, height, block_size) |
272 |
| - if len(image_data) < texture_size: |
273 |
| - raise ValueError(f"Invalid ASTC data size: {len(image_data)} < {texture_size}") |
274 |
| - context.decompress( |
275 |
| - image_data[:texture_size], image, astc_encoder.ASTCSwizzle.from_str("RGBA") |
276 |
| - ) |
277 |
| - |
278 |
| - return Image.frombytes("RGBA", (width, height), image.data, "raw", "RGBA") |
| 282 | + context = astc_encoder.ASTCContext(config) |
| 283 | + lock = Lock() |
| 284 | + ASTC_CONTEXTS[block_size] = (context, lock) |
| 285 | + return ASTC_CONTEXTS[block_size] |
279 | 286 |
|
280 | 287 |
|
281 | 288 | def calculate_astc_compressed_size(width: int, height: int, block_size: tuple) -> int:
|
|
0 commit comments