Skip to content

Commit 1ff96f0

Browse files
Initial implementation of VEML6031X00 with unit tests (#329)
1 parent 8ade08d commit 1ff96f0

File tree

2 files changed

+1228
-0
lines changed
  • circuitpython-workspaces/flight-software/src/pysquared/hardware/light_sensor/manager
  • cpython-workspaces/flight-software-unit-tests/src/unit-tests/hardware/light_sensor/manager

2 files changed

+1228
-0
lines changed
Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
"""VEML6031x00 light sensor manager for CircuitPython.
2+
3+
This module provides a CircuitPython implementation to interact with the
4+
VEML6031/VEML6030 ambient light sensor family using direct I2C register
5+
access. It mirrors the behavior of the Zephyr driver with support for
6+
single-measurement (active force) mode and data-ready polling.
7+
8+
Usage:
9+
logger = Logger()
10+
i2c = initialize_i2c_bus(logger, board.SCL, board.SDA, 100000)
11+
sensor = VEML6031x00Manager(logger, i2c)
12+
lux = sensor.get_lux()
13+
"""
14+
15+
import time
16+
17+
from adafruit_tca9548a import TCA9548A_Channel
18+
from busio import I2C
19+
20+
from ....logger import Logger
21+
from ....protos.light_sensor import LightSensorProto
22+
from ....sensor_reading.error import (
23+
SensorReadingTimeoutError,
24+
SensorReadingUnknownError,
25+
SensorReadingValueError,
26+
)
27+
from ....sensor_reading.light import Light
28+
from ....sensor_reading.lux import Lux
29+
from ...exception import HardwareInitializationError
30+
31+
# I2C default address
32+
_DEFAULT_I2C_ADDR = 0x29
33+
34+
# Registers (16-bit command codes)
35+
_REG_ALS_CONF_0 = 0x00
36+
_REG_ALS_CONF_1 = 0x01
37+
_REG_ALS_WH_L = 0x04
38+
_REG_ALS_WH_H = 0x05
39+
_REG_ALS_WL_L = 0x06
40+
_REG_ALS_WL_H = 0x07
41+
_REG_ALS_DATA_L = 0x10
42+
_REG_ALS_DATA_H = 0x11
43+
_REG_IR_DATA_L = 0x12
44+
_REG_IR_DATA_H = 0x13
45+
_REG_ID_L = 0x14
46+
_REG_ID_H = 0x15
47+
_REG_ALS_INT = 0x17
48+
49+
# Device constants
50+
_DEFAULT_ID = 0x01
51+
_ALS_AF_DATA_READY = 1 << 3
52+
_ALS_DATA_OVERFLOW = 0xFFFF
53+
54+
55+
# Enumerations for settings (indices into resolution matrix)
56+
class _Div4:
57+
"""Effective photodiode size selection indices."""
58+
59+
SIZE_4_4 = 0
60+
SIZE_1_4 = 1
61+
COUNT = 2
62+
63+
64+
class _Gain:
65+
"""Gain selection indices for ambient-light channel."""
66+
67+
GAIN_1 = 0
68+
GAIN_2 = 1
69+
GAIN_0_66 = 2
70+
GAIN_0_5 = 3
71+
COUNT = 4
72+
73+
74+
class _It:
75+
"""Integration time selection indices (shortest to longest)."""
76+
77+
IT_3_125MS = 0
78+
IT_6_25MS = 1
79+
IT_12_5MS = 2
80+
IT_25MS = 3
81+
IT_50MS = 4
82+
IT_100MS = 5
83+
IT_200MS = 6
84+
IT_400MS = 7
85+
COUNT = 8
86+
87+
88+
# Integration time values in microseconds matching _It indices
89+
_IT_US = [
90+
3125,
91+
6250,
92+
12500,
93+
25000,
94+
50000,
95+
100000,
96+
200000,
97+
400000,
98+
]
99+
100+
101+
# Resolution matrix [div4][gain][itim] in lux/count (from Zephyr driver)
102+
_RESOLUTION = (
103+
# size 4/4
104+
(
105+
(0.8704, 0.4352, 0.2176, 0.1088, 0.0544, 0.0272, 0.0136, 0.0068), # gain 1
106+
(0.4352, 0.2176, 0.1088, 0.0544, 0.0272, 0.0136, 0.0068, 0.0034), # gain 2
107+
(1.3188, 0.6504, 0.3297, 0.1648, 0.0824, 0.0412, 0.0206, 0.0103), # gain 0.66
108+
(1.7408, 0.8704, 0.4352, 0.2176, 0.1088, 0.0544, 0.0272, 0.0136), # gain 0.5
109+
),
110+
# size 1/4
111+
(
112+
(3.4816, 1.7408, 0.8704, 0.4352, 0.2176, 0.1088, 0.0544, 0.0272), # gain 1
113+
(1.7408, 0.8704, 0.4352, 0.2176, 0.1088, 0.0544, 0.0272, 0.0136), # gain 2
114+
(5.2752, 2.6376, 1.3188, 0.6594, 0.3297, 0.1648, 0.0824, 0.0412), # gain 0.66
115+
(6.9632, 3.4816, 1.7408, 0.8704, 0.4352, 0.2176, 0.1088, 0.0544), # gain 0.5
116+
),
117+
)
118+
119+
120+
def _in_range(val: int, min_v: int, max_v: int) -> bool:
121+
"""Return True if val is between min_v and max_v inclusive."""
122+
return (val >= min_v) and (val <= max_v)
123+
124+
125+
class VEML6031x00Manager(LightSensorProto):
126+
"""Manages the VEML6031/VEML6030 ambient light sensor via I2C.
127+
128+
Implements single-shot measurement using active force mode and polls the
129+
data-ready bit before reading ambient light and IR data. Converts counts to lux
130+
using the device's resolution matrix based on configuration.
131+
"""
132+
133+
def __init__(
134+
self,
135+
logger: Logger,
136+
i2c: I2C | TCA9548A_Channel,
137+
address: int = _DEFAULT_I2C_ADDR,
138+
integration_time: int = _It.IT_100MS,
139+
gain: int = _Gain.GAIN_1,
140+
div4: int = _Div4.SIZE_4_4,
141+
persistence: int = 0,
142+
) -> None:
143+
"""Initialize the manager and validate the device ID.
144+
145+
Args:
146+
logger: Logger to log messages.
147+
i2c: I2C bus or TCA channel the device is on.
148+
address: I2C address of the sensor (default 0x29).
149+
integration_time: One of `_It.*` indices (default 100ms).
150+
gain: One of `_Gain.*` indices (default 1x).
151+
div4: One of `_Div4.*` indices (default full size).
152+
persistence: Persistence setting for ambient-light channel (0 maps to 1 sample).
153+
"""
154+
self._log: Logger = logger
155+
self._i2c: I2C | TCA9548A_Channel = i2c
156+
self._addr: int = address
157+
158+
# Current configuration state
159+
self._sd = 0
160+
self._int_en = 0
161+
self._trig = 1
162+
self._af = 1
163+
self._ir_sd = 0
164+
self._cal = 1
165+
self._div4 = div4
166+
self._gain = gain
167+
self._itim = integration_time
168+
self._pers = persistence
169+
self._thresh_high = 0xFFFF
170+
self._thresh_low = 0x0000
171+
172+
# Last measurement
173+
self._als_counts = 0
174+
self._ir_counts = 0
175+
self._als_lux = 0.0
176+
177+
# Probe device ID
178+
try:
179+
self._log.debug("Initializing VEML6031x00 light sensor")
180+
id_l = self._read8(_REG_ID_L)
181+
if id_l != _DEFAULT_ID:
182+
raise HardwareInitializationError("Unexpected VEML6031x00 device ID")
183+
_ = self._read8(_REG_ID_H)
184+
# Apply initial configuration
185+
self._write_thresh_low(self._thresh_low)
186+
self._write_thresh_high(self._thresh_high)
187+
self._write_conf()
188+
except HardwareInitializationError:
189+
raise
190+
except Exception as e:
191+
raise HardwareInitializationError("Failed to initialize VEML6031x00") from e
192+
193+
def get_light(self) -> Light:
194+
"""Perform a single measurement and return raw ambient-light counts.
195+
196+
Returns:
197+
Light: Non-unit-specific reading (sensor counts).
198+
"""
199+
try:
200+
self._single_measurement_sequence()
201+
return Light(float(self._als_counts))
202+
except SensorReadingTimeoutError:
203+
raise
204+
except SensorReadingValueError:
205+
raise
206+
except Exception as e:
207+
raise SensorReadingUnknownError("Failed to get light reading") from e
208+
209+
def get_lux(self) -> Lux:
210+
"""Perform a single measurement and return the light level in lux.
211+
212+
Returns:
213+
Lux: Light level in SI lux.
214+
"""
215+
try:
216+
self._single_measurement_sequence()
217+
if self._als_lux is None or self._als_lux == 0:
218+
raise SensorReadingValueError("Lux reading is invalid or zero")
219+
return Lux(self._als_lux)
220+
except SensorReadingTimeoutError:
221+
raise
222+
except SensorReadingValueError:
223+
raise
224+
except Exception as e:
225+
raise SensorReadingUnknownError("Failed to get lux reading") from e
226+
227+
def reset(self) -> None:
228+
"""Place device into shutdown briefly and then resume operation."""
229+
try:
230+
self._sd = 1
231+
self._ir_sd = 1
232+
self._write_conf()
233+
time.sleep(0.05)
234+
self._sd = 0
235+
self._ir_sd = 0
236+
self._write_conf()
237+
self._log.debug("Light sensor reset successfully")
238+
except Exception as e:
239+
self._log.error("Failed to reset VEML6031x00 light sensor: %s", e)
240+
241+
# --- Low-level helpers ---
242+
def _write_conf(self) -> None:
243+
"""Write configuration registers based on current state.
244+
245+
Encodes shutdown, gain, size, persistence, active-force trigger,
246+
and integration time into the two configuration bytes.
247+
"""
248+
conf0 = 0
249+
conf1 = 0
250+
# conf1 bits
251+
conf1 |= (self._ir_sd & 0x01) << 7
252+
conf1 |= (self._div4 & 0x01) << 6
253+
conf1 |= (self._gain & 0x03) << 3
254+
conf1 |= (self._pers & 0x03) << 1
255+
conf1 |= 1 if self._cal else 0
256+
257+
# conf0 bits
258+
conf0 |= (self._itim & 0x07) << 4
259+
conf0 |= (1 if self._af else 0) << 3
260+
conf0 |= (1 if self._trig else 0) << 2
261+
conf0 |= (1 if self._int_en else 0) << 1
262+
conf0 |= 1 if self._sd else 0
263+
264+
self._write16(_REG_ALS_CONF_0, (conf1 << 8) | conf0)
265+
266+
def _write_thresh_low(self, counts: int) -> None:
267+
"""Write low threshold in sensor counts."""
268+
counts &= 0xFFFF
269+
self._write16(_REG_ALS_WL_L, counts)
270+
271+
def _write_thresh_high(self, counts: int) -> None:
272+
"""Write high threshold in sensor counts."""
273+
counts &= 0xFFFF
274+
self._write16(_REG_ALS_WH_L, counts)
275+
276+
def _single_measurement_sequence(self) -> None:
277+
"""Run active-force single measurement and update cached readings.
278+
279+
Waits for the data-ready flag up to a bounded timeout, then reads
280+
ambient-light and IR counts and computes lux using the resolution
281+
matrix corresponding to the current configuration.
282+
"""
283+
# Configure for single measurement (active force)
284+
self._ir_sd = 0
285+
self._cal = 1
286+
self._af = 1
287+
self._trig = 1
288+
self._int_en = 0
289+
self._sd = 0
290+
self._write_conf()
291+
292+
# Initial read clears flags on some devices
293+
_ = self._read8(_REG_ALS_INT)
294+
295+
# Sleep for integration time
296+
if _in_range(self._itim, 0, _It.COUNT - 1):
297+
time.sleep(_IT_US[self._itim] / 1_000_000.0)
298+
else:
299+
# Fallback minimal wait if misconfigured
300+
time.sleep(0.001)
301+
302+
# Poll data-ready with timeout
303+
start = time.monotonic()
304+
while True:
305+
int_val = self._read8(_REG_ALS_INT)
306+
if (int_val & _ALS_AF_DATA_READY) != 0:
307+
break
308+
if time.monotonic() - start > 0.5: # 500ms safety timeout
309+
raise SensorReadingTimeoutError("VEML6031x00 data ready timeout")
310+
time.sleep(0.001)
311+
312+
# Read result registers (little endian pairs)
313+
als_counts = self._read16(_REG_ALS_DATA_L)
314+
ir_counts = self._read16(_REG_IR_DATA_L)
315+
316+
if als_counts == _ALS_DATA_OVERFLOW:
317+
raise SensorReadingValueError("Ambient light reading overflow (saturation)")
318+
319+
if not (
320+
_in_range(self._div4, 0, _Div4.COUNT - 1)
321+
and _in_range(self._gain, 0, _Gain.COUNT - 1)
322+
and _in_range(self._itim, 0, _It.COUNT - 1)
323+
):
324+
raise SensorReadingUnknownError("Invalid sensor configuration indices")
325+
326+
res = _RESOLUTION[self._div4][self._gain][self._itim]
327+
self._als_counts = als_counts
328+
self._ir_counts = ir_counts
329+
self._als_lux = als_counts * res
330+
331+
def _acquire_i2c_lock(self) -> None:
332+
"""Acquire I2C bus lock with retry logic.
333+
334+
Raises:
335+
RuntimeError: If unable to lock the I2C bus after 200 attempts.
336+
"""
337+
tries = 0
338+
while not self._i2c.try_lock():
339+
if tries >= 200:
340+
raise RuntimeError("Unable to lock I2C bus")
341+
tries += 1
342+
time.sleep(0)
343+
344+
def _write16(self, reg: int, value: int) -> None:
345+
"""Write a 16-bit little-endian value to a register."""
346+
# value is 16-bit little-endian
347+
buf = bytearray(3)
348+
buf[0] = reg & 0xFF
349+
buf[1] = value & 0xFF
350+
buf[2] = (value >> 8) & 0xFF
351+
self._acquire_i2c_lock()
352+
try:
353+
self._i2c.writeto(self._addr, buf)
354+
finally:
355+
self._i2c.unlock()
356+
357+
def _read16(self, reg: int) -> int:
358+
"""Read a 16-bit little-endian value from a register."""
359+
out_buf = bytearray(1)
360+
out_buf[0] = reg & 0xFF
361+
in_buf = bytearray(2)
362+
# Prefer repeated start if available
363+
self._acquire_i2c_lock()
364+
try:
365+
try:
366+
self._i2c.writeto_then_readfrom(self._addr, out_buf, in_buf)
367+
except AttributeError:
368+
# Fallback: separate ops
369+
self._i2c.writeto(self._addr, out_buf)
370+
self._i2c.readfrom_into(self._addr, in_buf)
371+
finally:
372+
self._i2c.unlock()
373+
return in_buf[0] | (in_buf[1] << 8)
374+
375+
def _read8(self, reg: int) -> int:
376+
"""Read an 8-bit value from a register."""
377+
out_buf = bytearray(1)
378+
out_buf[0] = reg & 0xFF
379+
in_buf = bytearray(1)
380+
self._acquire_i2c_lock()
381+
try:
382+
try:
383+
self._i2c.writeto_then_readfrom(self._addr, out_buf, in_buf)
384+
except AttributeError:
385+
self._i2c.writeto(self._addr, out_buf)
386+
self._i2c.readfrom_into(self._addr, in_buf)
387+
finally:
388+
self._i2c.unlock()
389+
return in_buf[0]

0 commit comments

Comments
 (0)