RingBuffer in MicroPython #12079
-
I got the code from the Python Cookbook: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch05s19.html The critical part is the check for whether the list length has reached the maximum value. If it has, the instance's class is switched with `self.__class__ = self.__Full`. This means there is no need to repeat the length check on every later append. I added some print statements to confirm whether the switch has occurred.
class RingBuffer:
""" class that implements a not-yet-full buffer """
def __init__(self,size_max):
self.max = size_max
self.data = []
class __Full:
""" class that implements a full buffer """
def append(self, x):
""" Append an element overwriting the oldest one. """
self.data[self.cur] = x
print("data:", self.data, "cur:",self.cur)
self.cur = (self.cur+1) % self.max
print("next cur:",self.cur)
def get(self):
""" return list of elements in correct order """
print("data:", self.data, "cur:",self.cur)
print(self.data[self.cur:], self.data[:self.cur])
return self.data[self.cur:]+self.data[:self.cur]
def append(self,x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
""" Return a list of elements from the oldest to the newest. """
return self.data Everything works fine in CPython, but in MicroPython, it didn't go as expected. MicroPython v1.20.0-297-g9fb56d156 on 2023-07-20; ESP32S3 module (spiram) with ESP32S3
Type "help()" for more information.
>>> from test5 import RingBuffer
>>> a = RingBuffer(5)
>>> a.append(1);a.append(2);a.append(3);a.append(4)
>>> a.get()
[1, 2, 3, 4]
>>> a.__class__
<class 'RingBuffer'>
>>> a.append(5)
>>> a.get()
[1, 2, 3, 4, 5]
>>> a.__class__
<class '__Full'>
>>> a.append(6)
>>> a.get()
[1, 2, 3, 4, 5, 6]
>>> a.__class__
<class '__Full'>
>>> What should we do to make it work? |
Beta Was this translation helpful? Give feedback.
Replies: 6 comments 5 replies
-
I see what they're going for but uhhhhhh, that seems... overly complicated. MicroPython does not support modifying the type of a class instance at runtime. You actually could make this work though by just replacing the relevant methods:
But this approach is also missing the opportunity to pre-allocate the list. |
Beta Was this translation helpful? Give feedback.
-
See also ringbuf queue, code here. This has a number of MicroPython-specific optimisations. |
Beta Was this translation helpful? Give feedback.
-
I did a test for comparison. @jimmo @peterhinch from machine import freq
from ringbuf_queue import RingbufQueue
import gc
import time
class RingBuffer:
    """Ring buffer that grows until it holds ``size_max`` items, then overwrites.

    While the buffer is filling, ``append`` extends the underlying list.  On
    the append that makes the list reach ``size_max`` items, the instance's
    ``append`` and ``get`` attributes are rebound to the "full" variants, so
    later calls skip the length check entirely.  Rebinding the methods is used
    here instead of assigning to ``self.__class__``.
    """

    def __init__(self, size_max):
        """Create an empty buffer that keeps at most ``size_max`` items.

        Raises:
            ValueError: if ``size_max`` is smaller than 1.  Without this
                check the "full" state is never reached and the list grows
                without bound.
        """
        if size_max < 1:
            raise ValueError("size_max must be at least 1")
        self.max = size_max
        self.data = []
        self.cur = 0  # index of the oldest item once the buffer is full

    def _full_append(self, x):
        """Store ``x`` over the oldest item and advance the write index."""
        self.data[self.cur] = x
        self.cur = (self.cur + 1) % self.max

    def _full_get(self):
        """Return a new list of the items ordered from oldest to newest."""
        return self.data[self.cur:] + self.data[:self.cur]

    def append(self, x):
        """Add ``x`` at the end of the not-yet-full buffer."""
        self.data.append(x)
        if len(self.data) == self.max:
            # Buffer just became full: shadow the class-level methods with
            # the overwrite-mode versions on this instance only.
            self.append = self._full_append
            self.get = self._full_get

    def get(self):
        """Return a new list of the items ordered from oldest to newest.

        A copy is returned (as ``_full_get`` does) so callers can never
        mutate the buffer's internal storage through the result.
        """
        return self.data[:]
class RingBuffer_3:
    """Ring buffer backed by a list pre-allocated to its full capacity.

    The storage is created once in ``__init__`` and ``append`` only ever
    assigns into an existing slot, so appending never grows the list.
    """

    def __init__(self, size_max):
        """Pre-allocate storage for ``size_max`` items.

        Raises:
            ValueError: if ``size_max`` is smaller than 1 (there would be no
                slot to write to).
        """
        if size_max < 1:
            raise ValueError("size_max must be at least 1")
        self.max = size_max
        self.data = [None] * self.max
        self.cur = 0  # next slot to write; also the oldest item once wrapped
        self._wrapped = False  # True once every slot has been written

    def append(self, x):
        """Store ``x`` in the next slot, overwriting the oldest item if full."""
        self.data[self.cur] = x
        nxt = self.cur + 1
        if nxt == self.max:
            # Reached the end of the storage: wrap around.  From now on
            # every slot holds real data.
            nxt = 0
            self._wrapped = True
        self.cur = nxt

    def get(self):
        """Return a new list of the stored items ordered oldest to newest.

        Before the buffer has wrapped, only the slots written so far are
        returned; the ``None`` placeholders from pre-allocation are not
        reported as data.
        """
        if self._wrapped:
            return self.data[self.cur:] + self.data[:self.cur]
        return self.data[:self.cur]
def start_test(len=1000, nums=100_000, fq=240000000):
    """Time ``nums`` appends into three ring-buffer implementations.

    For each of ``RingBuffer``, ``RingBuffer_3`` and ``RingbufQueue`` the
    function runs a garbage collection, builds the buffer, appends the
    integers ``0 .. nums-1`` and prints the elapsed milliseconds together with
    ``gc.mem_alloc()``.  The memory figure is read while the buffer under test
    is still referenced; the buffer is deleted afterwards so the next
    ``gc.collect()`` can reclaim it before the following measurement.

    Args:
        len: capacity of the buffers under test.  The name shadows the
            ``len`` builtin inside this function; it is kept because callers
            pass it by keyword.
        nums: number of items appended to each buffer.
        fq: CPU frequency in Hz, passed to ``machine.freq`` before timing.
    """
    freq(fq)
    print("-"*12+f"len={len}, nums={nums}, freq={freq()//1000000}MHz"+"-"*12)

    gc.collect()
    start_time_1 = time.ticks_ms()
    a1 = RingBuffer(len)
    for i in range(nums):
        # Look up ``append`` on every call: RingBuffer rebinds it on the
        # instance once full, so a pre-bound method would go stale.
        a1.append(i)
    consumed_time_1 = time.ticks_diff(time.ticks_ms(), start_time_1)
    print(
        f"RingBuffer = {consumed_time_1}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_1, a1, consumed_time_1)

    gc.collect()
    start_time_2 = time.ticks_ms()
    a2 = RingBuffer_3(len)
    for i in range(nums):
        a2.append(i)
    consumed_time_2 = time.ticks_diff(time.ticks_ms(), start_time_2)
    print(
        f"RingBuffer_3 = {consumed_time_2}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_2, a2, consumed_time_2)

    gc.collect()
    start_time_3 = time.ticks_ms()
    # NOTE(review): sized len+1, presumably because the queue keeps one slot
    # free to tell "full" from "empty" - confirm against ringbuf_queue.
    a5 = RingbufQueue(len+1)
    for i in range(nums):
        try:
            a5.put_nowait(i)
        except IndexError:
            # RingbufQueue reports an overwrite of the oldest item by raising
            # IndexError.  Overwriting is what this benchmark wants, so the
            # exception is ignored rather than aborting the run.
            pass
    consumed_time_3 = time.ticks_diff(time.ticks_ms(), start_time_3)
    print(
        f"RingbufQueue = {consumed_time_3}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_3, a5, consumed_time_3)

    time.sleep(0.1)
while True:
start_test(len=100, nums=100_000)
start_test(len=1_000, nums=100_000)
start_test(len=10_000, nums=100_000)
start_test(len=10_000, nums=1_000_000)
start_test(len=10_000, nums=10_000_000) freq = 240MHz:
freq = 160MHz:
|
Beta Was this translation helpful? Give feedback.
-
@Wind-stormger The benefits of pre-allocation are lost if you use The benchmark results are interesting, however avoiding allocation doesn't automatically map onto improved performance. The hit from allocation occurs when a GC takes place, which may or may not happen while the benchmark runs. There may also be a lack of deterministic behaviour on an ESP32 owing to the underlying OS. Are results consistent between runs? It might be worth running a Other random observations. The tests overwrite the oldest data. While this is entirely valid, the typical use of a queue avoids this. It is possible that |
Beta Was this translation helpful? Give feedback.
-
I try it. @rkompass RingBuffer RingBuffer_3 RingbufQueue
Much better. |
Beta Was this translation helpful? Give feedback.
-
Full test code. from machine import freq
from ringbuf_queue import RingbufQueue
import gc
import time
import array
class RingBuffer:
    """Ring buffer of C ints that grows to ``size_max`` items, then overwrites.

    Storage is an ``array.array`` with type code ``'i'``.  While the buffer is
    filling, ``append`` extends the array.  On the append that makes it reach
    ``size_max`` items, the instance's ``append`` and ``get`` attributes are
    rebound to the "full" variants, so later calls skip the length check.
    """

    def __init__(self, size_max):
        """Create an empty buffer that keeps at most ``size_max`` items.

        Raises:
            ValueError: if ``size_max`` is smaller than 1.  Without this
                check the "full" state is never reached and the array grows
                without bound.
        """
        if size_max < 1:
            raise ValueError("size_max must be at least 1")
        self.max = size_max
        self.data = array.array('i', ())
        self.cur = 0  # index of the oldest item once the buffer is full

    def _full_append(self, x):
        """Store ``x`` over the oldest item and advance the write index."""
        self.data[self.cur] = x
        self.cur = (self.cur + 1) % self.max

    def _full_get(self):
        """Return a new array of the items ordered from oldest to newest."""
        return self.data[self.cur:] + self.data[:self.cur]

    def append(self, x):
        """Add ``x`` at the end of the not-yet-full buffer."""
        self.data.append(x)
        if len(self.data) == self.max:
            # Buffer just became full: shadow the class-level methods with
            # the overwrite-mode versions on this instance only.
            self.append = self._full_append
            self.get = self._full_get

    def get(self):
        """Return a new array of the items ordered from oldest to newest.

        A copy is returned (as ``_full_get`` does) so callers can never
        mutate the buffer's internal storage through the result.
        """
        return self.data[:]
class RingBuffer_3:
    """Ring buffer backed by an ``array.array('i')`` pre-allocated to capacity.

    The storage is created once in ``__init__`` and ``append`` only ever
    assigns into an existing slot, so appending never grows the array.
    """

    def __init__(self, size_max):
        """Pre-allocate zero-filled storage for ``size_max`` items.

        Raises:
            ValueError: if ``size_max`` is smaller than 1 (there would be no
                slot to write to).
        """
        if size_max < 1:
            raise ValueError("size_max must be at least 1")
        self.max = size_max
        self.data = array.array('i', (0 for _ in range(self.max)))
        self.cur = 0  # next slot to write; also the oldest item once wrapped
        self._wrapped = False  # True once every slot has been written

    def append(self, x):
        """Store ``x`` in the next slot, overwriting the oldest item if full."""
        self.data[self.cur] = x
        nxt = self.cur + 1
        if nxt == self.max:
            # Reached the end of the storage: wrap around.  From now on
            # every slot holds real data.
            nxt = 0
            self._wrapped = True
        self.cur = nxt

    def get(self):
        """Return a new array of the stored items ordered oldest to newest.

        Before the buffer has wrapped, only the slots written so far are
        returned; the zero placeholders from pre-allocation are not reported
        as data.
        """
        if self._wrapped:
            return self.data[self.cur:] + self.data[:self.cur]
        return self.data[:self.cur]
def start_test(len=1000, nums=100_000, fq=240000000):
    """Time ``nums`` appends into three array-backed ring buffers.

    For each of ``RingBuffer``, ``RingBuffer_3`` and ``RingbufQueue`` the
    function runs a garbage collection, builds the buffer, appends the
    integers ``0 .. nums-1`` and prints the elapsed milliseconds together with
    ``gc.mem_alloc()``.  The memory figure is read while the buffer under test
    is still referenced; the buffer is deleted afterwards so the next
    ``gc.collect()`` can reclaim it before the following measurement.

    Args:
        len: capacity of the buffers under test.  The name shadows the
            ``len`` builtin inside this function; it is kept because callers
            pass it by keyword.
        nums: number of items appended to each buffer.
        fq: CPU frequency in Hz, passed to ``machine.freq`` before timing.
    """
    freq(fq)
    print("-"*12+f"len={len}, nums={nums}, freq={freq()//1000000}MHz"+"-"*12)

    gc.collect()
    start_time_1 = time.ticks_ms()
    a1 = RingBuffer(len)
    for i in range(nums):
        # Look up ``append`` on every call: RingBuffer rebinds it on the
        # instance once full, so a pre-bound method would go stale.
        a1.append(i)
    consumed_time_1 = time.ticks_diff(time.ticks_ms(), start_time_1)
    print(
        f"RingBuffer = {consumed_time_1}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_1, a1, consumed_time_1)

    gc.collect()
    start_time_2 = time.ticks_ms()
    a2 = RingBuffer_3(len)
    for i in range(nums):
        a2.append(i)
    consumed_time_2 = time.ticks_diff(time.ticks_ms(), start_time_2)
    print(
        f"RingBuffer_3 = {consumed_time_2}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_2, a2, consumed_time_2)

    gc.collect()
    start_time_3 = time.ticks_ms()
    # The queue is handed a pre-allocated int array as its storage.
    # NOTE(review): sized len+1, presumably because the queue keeps one slot
    # free to tell "full" from "empty" - confirm against ringbuf_queue.
    ar = array.array('i', (0 for _ in range(len+1)))
    a5 = RingbufQueue(ar)
    for i in range(nums):
        try:
            a5.put_nowait(i)
        except IndexError:
            # RingbufQueue reports an overwrite of the oldest item by raising
            # IndexError.  Overwriting is what this benchmark wants, so the
            # exception is ignored rather than aborting the run.
            pass
    consumed_time_3 = time.ticks_diff(time.ticks_ms(), start_time_3)
    print(
        f"RingbufQueue = {consumed_time_3}ms,RAM used = {gc.mem_alloc()}B")
    del (start_time_3, a5, consumed_time_3)

    time.sleep(0.1)
# Repeat the whole benchmark suite forever so run-to-run variation is visible.
while True:
    # Each pair is (buffer capacity, number of appends).
    for buf_len, count in (
        (100, 100_000),
        (1_000, 100_000),
        (10_000, 100_000),
        (10_000, 1_000_000),
        (10_000, 10_000_000),
    ):
        start_test(len=buf_len, nums=count)
Much better than |
Beta Was this translation helpful? Give feedback.
I see what they're going for but uhhhhhh, that seems... overly complicated.
MicroPython does not support modifying the type of a class instance at runtime. You actually could make this work though by just replacing the relevant methods: