Skip to content

Commit 3276559

Browse files
jonathanhoggdpgeorge
authored andcommitted
esp32/modules/machine.py: Add Counter and Encoder classes.
Adds a Python override of the `machine` module, which delegates to the built-in module and adds an implementation of `Counter` and `Encoder`, based on the `esp32.PCNT` class. Original implementation by: Jonathan Hogg <[email protected]> Signed-off-by: Jim Mussared <[email protected]>
1 parent e54553c commit 3276559

File tree

1 file changed

+192
-0
lines changed

1 file changed

+192
-0
lines changed

ports/esp32/modules/machine.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import sys
2+
3+
_path = sys.path
4+
sys.path = ()
5+
try:
6+
import machine as _machine
7+
finally:
8+
sys.path = _path
9+
del _path
10+
del sys
11+
12+
13+
from micropython import const
14+
import esp32
15+
16+
if hasattr(esp32, "PCNT"):
17+
_PCNT_RANGE = const(32000)
18+
19+
class _CounterBase:
20+
_PCNT = esp32.PCNT
21+
# Singletons, keyed by PCNT unit_id (shared by both Counter & Encoder).
22+
_INSTANCES = {}
23+
24+
# Use __new__ to implement a singleton rather than a factory function,
25+
# because we need to be able to provide class attributes, e.g.
26+
# Counter.RISING, which is not possible if Counter was a function
27+
# (functions cannot have attributes in MicroPython).
28+
def __new__(cls, unit_id, *_args, **_kwargs):
29+
# Find an existing instance for this PCNT unit id.
30+
self = cls._INSTANCES.get(unit_id)
31+
32+
if self:
33+
# Verify that this PCNT is being used for the same type
34+
# (Encoder or Counter).
35+
if not isinstance(self, cls):
36+
raise ValueError("PCNT in use")
37+
else:
38+
# Previously unused PCNT unit.
39+
self = object.__new__(cls)
40+
cls._INSTANCES[unit_id] = self
41+
42+
# __init__ will now be called with the same args.
43+
return self
44+
45+
def __init__(self, unit_id, *args, filter_ns=0, **kwargs):
46+
self._unit_id = unit_id
47+
48+
if not hasattr(self, "_pcnt"):
49+
# New instance, or previously deinit-ed.
50+
self._pcnt = self._PCNT(unit_id, min=-_PCNT_RANGE, max=_PCNT_RANGE)
51+
elif not (args or kwargs):
52+
# Existing instance, and no args, so accessing the existing
53+
# singleton without reconfiguring. Note: This means that
54+
# Counter/Encoder cannot be partially re-initalised. Either
55+
# you get the existing instance as-is (by passing no arguments
56+
# other than the id), or you must pass all the necessary
57+
# arguments to additionally re-configure it.
58+
return
59+
60+
# Counter- or Encoder-specific configuration of self._pcnt.
61+
self._configure(*args, **kwargs)
62+
63+
# Common unit configuration.
64+
self._pcnt.init(
65+
filter=min(max(0, filter_ns * 80 // 1000), 1023),
66+
value=0,
67+
)
68+
69+
# Note: We track number-of-overflows rather than the actual count in
70+
# order to avoid the IRQ handler overflowing MicroPython's "small int"
71+
# range. This gives an effective range of 2**30 overflows. User code
72+
# should use counter.value(0) to reset the overflow count.
73+
# The ESP32 PCNT resets to zero on under/overflow (i.e. it does not wrap
74+
# around to the opposite limit), so each overflow corresponds to exactly
75+
# _PCNT_RANGE counts.
76+
77+
# Reset counter state.
78+
self._overflows = 0
79+
self._offset = 0
80+
81+
# Install IRQ handler to handle under/overflow.
82+
self._pcnt.irq(self._overflow, self._PCNT.IRQ_MIN | self._PCNT.IRQ_MAX)
83+
84+
# Start counting.
85+
self._pcnt.start()
86+
87+
# Handle counter under/overflow.
88+
def _overflow(self, pcnt):
89+
mask = pcnt.irq().flags()
90+
if mask & self._PCNT.IRQ_MIN:
91+
self._overflows -= 1
92+
elif mask & self._PCNT.IRQ_MAX:
93+
self._overflows += 1
94+
95+
# Public machine.Counter & machine.Encoder API.
96+
def init(self, *args, **kwargs):
97+
self.__init__(self._unit_id, *args, **kwargs)
98+
99+
# Public machine.Counter & machine.Encoder API.
100+
def deinit(self):
101+
if hasattr(self, "_pcnt"):
102+
self._pcnt.deinit()
103+
del self._pcnt
104+
105+
# Public machine.Counter & machine.Encoder API.
106+
def value(self, value=None):
107+
if not hasattr(self, "_pcnt"):
108+
raise RuntimeError("not initialised")
109+
110+
# This loop deals with the possibility that a PCNT overflow occurs
111+
# between retrieving self._overflows and self._pcnt.value().
112+
while True:
113+
overflows = self._overflows
114+
current = self._pcnt.value()
115+
# Calling PCNT.value() forces any pending interrupts to run
116+
# for this PCNT unit. So self._overflows must now be the the
117+
# value corresponding to the value we read.
118+
if self._overflows == overflows:
119+
break
120+
121+
# Compute the result including the number of times we've cycled
122+
# through the range, and any applied offset.
123+
result = overflows * _PCNT_RANGE + current + self._offset
124+
125+
# If a new value is specified, then zero out the overflows, and set
126+
# self._offset so that it zeros out the current PCNT value. The
127+
# mutation to self._overflows is atomic w.r.t. the overflow IRQ
128+
# handler because the scheduler only runs on branch instructions.
129+
if value is not None:
130+
self._overflows -= overflows
131+
self._offset = value - current
132+
133+
return result
134+
135+
class Counter(_CounterBase):
136+
# Public machine.Counter API.
137+
RISING = 1
138+
FALLING = 2
139+
UP = _CounterBase._PCNT.INCREMENT
140+
DOWN = _CounterBase._PCNT.DECREMENT
141+
142+
# Counter-specific configuration.
143+
def _configure(self, src, edge=RISING, direction=UP):
144+
# Only use the first channel.
145+
self._pcnt.init(
146+
channel=0,
147+
pin=src,
148+
rising=direction if edge & Counter.RISING else self._PCNT.IGNORE,
149+
falling=direction if edge & Counter.FALLING else self._PCNT.IGNORE,
150+
)
151+
152+
class Encoder(_CounterBase):
153+
# Encoder-specific configuration.
154+
def _configure(self, phase_a, phase_b, phases=1):
155+
if phases not in (1, 2, 4):
156+
raise ValueError("phases")
157+
# Configure the first channel.
158+
self._pcnt.init(
159+
channel=0,
160+
pin=phase_a,
161+
falling=self._PCNT.INCREMENT,
162+
rising=self._PCNT.DECREMENT,
163+
mode_pin=phase_b,
164+
mode_low=self._PCNT.HOLD if phases == 1 else self._PCNT.REVERSE,
165+
)
166+
if phases == 4:
167+
# For 4x quadrature, enable the second channel.
168+
self._pcnt.init(
169+
channel=1,
170+
pin=phase_b,
171+
falling=self._PCNT.DECREMENT,
172+
rising=self._PCNT.INCREMENT,
173+
mode_pin=phase_a,
174+
mode_low=self._PCNT.REVERSE,
175+
)
176+
else:
177+
# For 1x and 2x quadrature, disable the second channel.
178+
self._pcnt.init(channel=1, pin=None, rising=self._PCNT.IGNORE)
179+
self._phases = phases
180+
181+
def phases(self):
182+
return self._phases
183+
184+
del _CounterBase
185+
186+
187+
del esp32
188+
189+
190+
# Delegate to built-in machine module.
191+
def __getattr__(attr):
192+
return getattr(_machine, attr)

0 commit comments

Comments
 (0)