Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 278 additions & 0 deletions Tests/test_arro3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
from __future__ import annotations

import json
from typing import Any, NamedTuple

import pytest

from PIL import Image

from .helper import (
assert_deep_equal,
assert_image_equal,
hopper,
is_big_endian,
)

TYPE_CHECKING = False
if TYPE_CHECKING:
from arro3 import compute # type: ignore [import-not-found]
from arro3.core import ( # type: ignore [import-not-found]
Array,
DataType,
Field,
fixed_size_list_array,
)
else:
arro3 = pytest.importorskip("arro3", reason="Arro3 not installed")
from arro3 import compute
from arro3.core import Array, DataType, Field, fixed_size_list_array

TEST_IMAGE_SIZE = (10, 10)


def _test_img_equals_pyarray(
img: Image.Image, arr: Any, mask: list[int] | None, elts_per_pixel: int = 1
) -> None:
assert img.height * img.width * elts_per_pixel == len(arr)
px = img.load()
assert px is not None
if elts_per_pixel > 1 and mask is None:
# have to do element-wise comparison when we're comparing
# flattened r,g,b,a to a pixel.
mask = list(range(elts_per_pixel))
for x in range(0, img.size[0], int(img.size[0] / 10)):
for y in range(0, img.size[1], int(img.size[1] / 10)):
if mask:
pixel = px[x, y]
assert isinstance(pixel, tuple)
for ix, elt in enumerate(mask):
if elts_per_pixel == 1:
assert pixel[ix] == arr[y * img.width + x].as_py()[elt]
else:
assert (
pixel[ix]
== arr[(y * img.width + x) * elts_per_pixel + elt].as_py()
)
else:
assert_deep_equal(px[x, y], arr[y * img.width + x].as_py())


def _test_img_equals_int32_pyarray(
img: Image.Image, arr: Any, mask: list[int] | None, elts_per_pixel: int = 1
) -> None:
assert img.height * img.width * elts_per_pixel == len(arr)
px = img.load()
assert px is not None
if mask is None:
# have to do element-wise comparison when we're comparing
# flattened rgba in an uint32 to a pixel.
mask = list(range(elts_per_pixel))
for x in range(0, img.size[0], int(img.size[0] / 10)):
for y in range(0, img.size[1], int(img.size[1] / 10)):
pixel = px[x, y]
assert isinstance(pixel, tuple)
arr_pixel_int = arr[y * img.width + x].as_py()
arr_pixel_tuple = (
arr_pixel_int % 256,
(arr_pixel_int // 256) % 256,
(arr_pixel_int // 256**2) % 256,
(arr_pixel_int // 256**3),
)
if is_big_endian():
arr_pixel_tuple = arr_pixel_tuple[::-1]

for ix, elt in enumerate(mask):
assert pixel[ix] == arr_pixel_tuple[elt]


fl_uint8_4_type = DataType.list(Field("_", DataType.uint8()).with_nullable(False), 4)


@pytest.mark.parametrize(
"mode, dtype, mask",
(
("L", DataType.uint8(), None),
("I", DataType.int32(), None),
("F", DataType.float32(), None),
("LA", fl_uint8_4_type, [0, 3]),
("RGB", fl_uint8_4_type, [0, 1, 2]),
("RGBA", fl_uint8_4_type, None),
("RGBX", fl_uint8_4_type, None),
("CMYK", fl_uint8_4_type, None),
("YCbCr", fl_uint8_4_type, [0, 1, 2]),
("HSV", fl_uint8_4_type, [0, 1, 2]),
),
)
def test_to_array(mode: str, dtype: DataType, mask: list[int] | None) -> None:
img = hopper(mode)

# Resize to non-square
img = img.crop((3, 0, 124, 127))
assert img.size == (121, 127)

arr = Array(img)
_test_img_equals_pyarray(img, arr, mask)
assert arr.type == dtype

reloaded = Image.fromarrow(arr, mode, img.size)

assert reloaded

assert_image_equal(img, reloaded)


def test_lifetime() -> None:
# valgrind shouldn't error out here.
# arrays should be accessible after the image is deleted.

img = hopper("L")

arr_1 = Array(img)
arr_2 = Array(img)

del img

assert compute.sum(arr_1).as_py() > 0
del arr_1

assert compute.sum(arr_2).as_py() > 0
del arr_2


def test_lifetime2() -> None:
# valgrind shouldn't error out here.
# img should remain after the arrays are collected.

img = hopper("L")

arr_1 = Array(img)
arr_2 = Array(img)

assert compute.sum(arr_1).as_py() > 0
del arr_1

assert compute.sum(arr_2).as_py() > 0
del arr_2

img2 = img.copy()
px = img2.load()
Comment on lines +155 to +156
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
img2 = img.copy()
px = img2.load()
px = img.load()

Or are you using copy() because you're testing that the data in img2 is still correct after img is no longer used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to ensure we haven't over freed the memory, as we're refcounting on the arrow usages.

assert px # make mypy happy
assert isinstance(px[0, 0], int)


class DataShape(NamedTuple):
dtype: DataType
# Strictly speaking, elt should be a pixel or pixel component, so
# list[uint8][4], float, int, uint32, uint8, etc. But more
# correctly, it should be exactly the dtype from the line above.
elt: Any
elts_per_pixel: int


UINT_ARR = DataShape(
dtype=fl_uint8_4_type,
elt=[1, 2, 3, 4], # array of 4 uint8 per pixel
elts_per_pixel=1, # only one array per pixel
)

UINT = DataShape(
dtype=DataType.uint8(),
elt=3, # one uint8,
elts_per_pixel=4, # but repeated 4x per pixel
)

UINT32 = DataShape(
dtype=DataType.uint32(),
elt=0xABCDEF45, # one packed int, doesn't fit in a int32 > 0x80000000
elts_per_pixel=1, # one per pixel
)

INT32 = DataShape(
dtype=DataType.uint32(),
elt=0x12CDEF45, # one packed int
elts_per_pixel=1, # one per pixel
)


@pytest.mark.parametrize(
"mode, data_tp, mask",
(
("L", DataShape(DataType.uint8(), 3, 1), None),
("I", DataShape(DataType.int32(), 1 << 24, 1), None),
("F", DataShape(DataType.float32(), 3.14159, 1), None),
("LA", UINT_ARR, [0, 3]),
("LA", UINT, [0, 3]),
("RGB", UINT_ARR, [0, 1, 2]),
("RGBA", UINT_ARR, None),
("CMYK", UINT_ARR, None),
("YCbCr", UINT_ARR, [0, 1, 2]),
("HSV", UINT_ARR, [0, 1, 2]),
("RGB", UINT, [0, 1, 2]),
("RGBA", UINT, None),
("CMYK", UINT, None),
("YCbCr", UINT, [0, 1, 2]),
("HSV", UINT, [0, 1, 2]),
),
)
def test_fromarray(mode: str, data_tp: DataShape, mask: list[int] | None) -> None:
(dtype, elt, elts_per_pixel) = data_tp

ct_pixels = TEST_IMAGE_SIZE[0] * TEST_IMAGE_SIZE[1]
if dtype == fl_uint8_4_type:
tmp_arr = Array(elt * (ct_pixels * elts_per_pixel), type=DataType.uint8())
arr = fixed_size_list_array(tmp_arr, 4)
else:
arr = Array([elt] * (ct_pixels * elts_per_pixel), type=dtype)
img = Image.fromarrow(arr, mode, TEST_IMAGE_SIZE)

_test_img_equals_pyarray(img, arr, mask, elts_per_pixel)


@pytest.mark.parametrize(
"mode, mask",
(
("LA", [0, 3]),
("RGB", [0, 1, 2]),
("RGBA", None),
("CMYK", None),
("YCbCr", [0, 1, 2]),
("HSV", [0, 1, 2]),
),
)
@pytest.mark.parametrize("data_tp", (UINT32, INT32))
def test_from_int32array(mode: str, mask: list[int] | None, data_tp: DataShape) -> None:
(dtype, elt, elts_per_pixel) = data_tp

ct_pixels = TEST_IMAGE_SIZE[0] * TEST_IMAGE_SIZE[1]
arr = Array([elt] * (ct_pixels * elts_per_pixel), type=dtype)
img = Image.fromarrow(arr, mode, TEST_IMAGE_SIZE)

_test_img_equals_int32_pyarray(img, arr, mask, elts_per_pixel)


@pytest.mark.parametrize(
"mode, metadata",
(
("LA", ["L", "X", "X", "A"]),
("RGB", ["R", "G", "B", "X"]),
("RGBX", ["R", "G", "B", "X"]),
("RGBA", ["R", "G", "B", "A"]),
("CMYK", ["C", "M", "Y", "K"]),
("YCbCr", ["Y", "Cb", "Cr", "X"]),
("HSV", ["H", "S", "V", "X"]),
),
)
def test_image_metadata(mode: str, metadata: list[str]) -> None:
img = hopper(mode)

arr = Array(img)

assert arr.type.value_field
assert arr.type.value_field.metadata
assert arr.type.value_field.metadata[b"image"]

parsed_metadata = json.loads(arr.type.value_field.metadata[b"image"].decode("utf8"))

assert "bands" in parsed_metadata
assert parsed_metadata["bands"] == metadata
Loading
Loading