|
| 1 | +# SPDX-FileCopyrightText: 2023 Christopher Parrott for Pimoroni Ltd |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MIT |
| 4 | + |
| 5 | +import os |
| 6 | +import math |
| 7 | +import struct |
| 8 | +from machine import I2S, Pin |
| 9 | + |
| 10 | +""" |
| 11 | +A class for playing Wav files out of an I2S audio amp. It can also play pure tones. |
| 12 | +This code is based heavily on the work of Mike Teachman, at: |
| 13 | +https://github.com/miketeachman/micropython-i2s-examples/blob/master/examples/wavplayer.py |
| 14 | +""" |
| 15 | + |
| 16 | + |
| 17 | +class WavPlayer: |
| 18 | + # Internal states |
| 19 | + PLAY = 0 |
| 20 | + PAUSE = 1 |
| 21 | + FLUSH = 2 |
| 22 | + STOP = 3 |
| 23 | + NONE = 4 |
| 24 | + |
| 25 | + MODE_WAV = 0 |
| 26 | + MODE_TONE = 1 |
| 27 | + |
| 28 | + # Default buffer length |
| 29 | + SILENCE_BUFFER_LENGTH = 1000 |
| 30 | + WAV_BUFFER_LENGTH = 10000 |
| 31 | + INTERNAL_BUFFER_LENGTH = 20000 |
| 32 | + |
| 33 | + TONE_SAMPLE_RATE = 44_100 |
| 34 | + TONE_BITS_PER_SAMPLE = 16 |
| 35 | + TONE_FULL_WAVES = 2 |
| 36 | + |
| 37 | + def __init__(self, id, sck_pin, ws_pin, sd_pin, amp_enable=None, ibuf_len=INTERNAL_BUFFER_LENGTH, root="/"): |
| 38 | + self.__id = id |
| 39 | + self.__sck_pin = sck_pin |
| 40 | + self.__ws_pin = ws_pin |
| 41 | + self.__sd_pin = sd_pin |
| 42 | + self.__ibuf_len = ibuf_len |
| 43 | + self.__enable = None |
| 44 | + |
| 45 | + if amp_enable is not None: |
| 46 | + self.__enable = Pin(amp_enable, Pin.OUT) |
| 47 | + |
| 48 | + # Set the directory to search for files in |
| 49 | + self.set_root(root) |
| 50 | + |
| 51 | + self.__state = WavPlayer.NONE |
| 52 | + self.__mode = WavPlayer.MODE_WAV |
| 53 | + self.__wav_file = None |
| 54 | + self.__loop_wav = False |
| 55 | + self.__first_sample_offset = None |
| 56 | + self.__flush_count = 0 |
| 57 | + self.__audio_out = None |
| 58 | + |
| 59 | + # Allocate a small array of blank audio samples used for silence |
| 60 | + self.__silence_samples = bytearray(self.SILENCE_BUFFER_LENGTH) |
| 61 | + |
| 62 | + # Allocate a larger array for WAV audio samples, using a memoryview for more efficient access |
| 63 | + self.__wav_samples_mv = memoryview(bytearray(self.WAV_BUFFER_LENGTH)) |
| 64 | + |
| 65 | + # Reserve a variable for audio samples used for tones |
| 66 | + self.__tone_samples = None |
| 67 | + self.__queued_samples = None |
| 68 | + |
| 69 | + def set_root(self, root): |
| 70 | + self.__root = root.rstrip("/") + "/" |
| 71 | + |
| 72 | + def play_wav(self, wav_file, loop=False): |
| 73 | + if os.listdir(self.__root).count(wav_file) == 0: |
| 74 | + raise ValueError(f"'{wav_file}' not found") |
| 75 | + |
| 76 | + self.__stop_i2s() # Stop any active playback and terminate the I2S instance |
| 77 | + |
| 78 | + self.__wav_file = open(self.__root + wav_file, "rb") # Open the chosen WAV file in read-only, binary mode |
| 79 | + self.__loop_wav = loop # Record if the user wants the file to loop |
| 80 | + |
| 81 | + # Parse the WAV file, returning the necessary parameters to initialise I2S communication |
| 82 | + format, sample_rate, bits_per_sample, self.__first_sample_offset, self.sample_size = WavPlayer.__parse_wav(self.__wav_file) |
| 83 | + |
| 84 | + # Keep a track of total bytes read from WAV File |
| 85 | + self.total_bytes_read = 0 |
| 86 | + |
| 87 | + self.__wav_file.seek(self.__first_sample_offset) # Advance to first byte of sample data |
| 88 | + |
| 89 | + self.__start_i2s(bits=bits_per_sample, |
| 90 | + format=format, |
| 91 | + rate=sample_rate, |
| 92 | + state=WavPlayer.PLAY, |
| 93 | + mode=WavPlayer.MODE_WAV) |
| 94 | + |
| 95 | + def play_tone(self, frequency, amplitude): |
| 96 | + if frequency < 20.0 or frequency > 20_000: |
| 97 | + raise ValueError("frequency out of range. Expected between 20Hz and 20KHz") |
| 98 | + |
| 99 | + if amplitude < 0.0 or amplitude > 1.0: |
| 100 | + raise ValueError("amplitude out of range. Expected 0.0 to 1.0") |
| 101 | + |
| 102 | + # Create a buffer containing the pure tone samples |
| 103 | + samples_per_cycle = self.TONE_SAMPLE_RATE // frequency |
| 104 | + sample_size_in_bytes = self.TONE_BITS_PER_SAMPLE // 8 |
| 105 | + samples = bytearray(self.TONE_FULL_WAVES * samples_per_cycle * sample_size_in_bytes) |
| 106 | + range = pow(2, self.TONE_BITS_PER_SAMPLE) // 2 |
| 107 | + |
| 108 | + format = "<h" if self.TONE_BITS_PER_SAMPLE == 16 else "<l" |
| 109 | + |
| 110 | + # Populate the buffer with multiple cycles to avoid it completing too quickly and causing drop outs |
| 111 | + for i in range(samples_per_cycle * self.TONE_FULL_WAVES): |
| 112 | + sample = int((range - 1) * (math.sin(2 * math.pi * i / samples_per_cycle)) * amplitude) |
| 113 | + struct.pack_into(format, samples, i * sample_size_in_bytes, sample) |
| 114 | + |
| 115 | + # Are we not already playing tones? |
| 116 | + if not (self.__mode == WavPlayer.MODE_TONE and (self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE)): |
| 117 | + self.__stop_i2s() # Stop any active playback and terminate the I2S instance |
| 118 | + self.__tone_samples = samples |
| 119 | + self.__start_i2s(bits=self.TONE_BITS_PER_SAMPLE, |
| 120 | + format=I2S.MONO, |
| 121 | + rate=self.TONE_SAMPLE_RATE, |
| 122 | + state=WavPlayer.PLAY, |
| 123 | + mode=WavPlayer.MODE_TONE) |
| 124 | + else: |
| 125 | + self.__queued_samples = samples |
| 126 | + self.__state = WavPlayer.PLAY |
| 127 | + |
| 128 | + def pause(self): |
| 129 | + if self.__state == WavPlayer.PLAY: |
| 130 | + self.__state = WavPlayer.PAUSE # Enter the pause state on the next callback |
| 131 | + |
| 132 | + def resume(self): |
| 133 | + if self.__state == WavPlayer.PAUSE: |
| 134 | + self.__state = WavPlayer.PLAY # Enter the play state on the next callback |
| 135 | + |
| 136 | + def stop(self): |
| 137 | + if self.__state == WavPlayer.PLAY or self.__state == WavPlayer.PAUSE: |
| 138 | + if self.__mode == WavPlayer.MODE_WAV: |
| 139 | + # Enter the flush state on the next callback and close the file |
| 140 | + # It is done in this order to prevent the callback entering the play |
| 141 | + # state after we close the file but before we change the state) |
| 142 | + self.__state = WavPlayer.FLUSH |
| 143 | + self.__wav_file.close() |
| 144 | + else: |
| 145 | + self.__state = WavPlayer.STOP |
| 146 | + |
| 147 | + def is_playing(self): |
| 148 | + return self.__state != WavPlayer.NONE and self.__state != WavPlayer.STOP |
| 149 | + |
| 150 | + def is_paused(self): |
| 151 | + return self.__state == WavPlayer.PAUSE |
| 152 | + |
| 153 | + def __start_i2s(self, bits=16, format=I2S.MONO, rate=44_100, state=STOP, mode=MODE_WAV): |
| 154 | + import gc |
| 155 | + gc.collect() |
| 156 | + self.__audio_out = I2S( |
| 157 | + self.__id, |
| 158 | + sck=self.__sck_pin, |
| 159 | + ws=self.__ws_pin, |
| 160 | + sd=self.__sd_pin, |
| 161 | + mode=I2S.TX, |
| 162 | + bits=bits, |
| 163 | + format=format, |
| 164 | + rate=rate, |
| 165 | + ibuf=self.__ibuf_len, |
| 166 | + ) |
| 167 | + |
| 168 | + self.__state = state |
| 169 | + self.__mode = mode |
| 170 | + self.__flush_count = self.__ibuf_len // self.SILENCE_BUFFER_LENGTH + 1 |
| 171 | + self.__audio_out.irq(self.__i2s_callback) |
| 172 | + self.__audio_out.write(self.__silence_samples) |
| 173 | + |
| 174 | + if self.__enable is not None: |
| 175 | + self.__enable.on() |
| 176 | + |
| 177 | + def __stop_i2s(self): |
| 178 | + self.stop() # Stop any active playback |
| 179 | + while self.is_playing(): # and wait for it to complete |
| 180 | + pass |
| 181 | + |
| 182 | + if self.__enable is not None: |
| 183 | + self.__enable.off() |
| 184 | + |
| 185 | + if self.__audio_out is not None: |
| 186 | + self.__audio_out.deinit() # Deinit any active I2S comms |
| 187 | + |
| 188 | + self.__state == WavPlayer.NONE # Return to the none state |
| 189 | + |
| 190 | + def __i2s_callback(self, arg): |
| 191 | + # PLAY |
| 192 | + if self.__state == WavPlayer.PLAY: |
| 193 | + if self.__mode == WavPlayer.MODE_WAV: |
| 194 | + num_read = self.__wav_file.readinto(self.__wav_samples_mv) # Read the next section of the WAV file |
| 195 | + self.total_bytes_read += num_read |
| 196 | + # Have we reached the end of the file? |
| 197 | + if num_read == 0: |
| 198 | + # Do we want to loop the WAV playback? |
| 199 | + if self.__loop_wav: |
| 200 | + _ = self.__wav_file.seek(self.__first_sample_offset) # Play again, so advance to first byte of sample data |
| 201 | + else: |
| 202 | + self.__wav_file.close() # Stop playing, so close the file |
| 203 | + self.__state = WavPlayer.FLUSH # and enter the flush state on the next callback |
| 204 | + |
| 205 | + self.__audio_out.write(self.__silence_samples) # In both cases play silence to end this callback |
| 206 | + else: |
| 207 | + if num_read > 0 and num_read < self.WAV_BUFFER_LENGTH: |
| 208 | + num_read = num_read - (self.total_bytes_read - self.sample_size) |
| 209 | + self.__audio_out.write(self.__wav_samples_mv[: num_read]) # We are within the file, so write out the next audio samples |
| 210 | + else: |
| 211 | + if self.__queued_samples is not None: |
| 212 | + self.__tone_samples = self.__queued_samples |
| 213 | + self.__queued_samples = None |
| 214 | + self.__audio_out.write(self.__tone_samples) |
| 215 | + |
| 216 | + # PAUSE or STOP |
| 217 | + elif self.__state == WavPlayer.PAUSE or self.__state == WavPlayer.STOP: |
| 218 | + self.__audio_out.write(self.__silence_samples) # Play silence |
| 219 | + |
| 220 | + # FLUSH |
| 221 | + elif self.__state == WavPlayer.FLUSH: |
| 222 | + # Flush is used to allow the residual audio samples in the internal buffer to be written |
| 223 | + # to the I2S peripheral. This step avoids part of the sound file from being cut off |
| 224 | + if self.__flush_count > 0: |
| 225 | + self.__flush_count -= 1 |
| 226 | + else: |
| 227 | + self.__state = WavPlayer.STOP # Enter the stop state on the next callback |
| 228 | + self.__audio_out.write(self.__silence_samples) # Play silence |
| 229 | + |
| 230 | + # NONE |
| 231 | + elif self.__state == WavPlayer.NONE: |
| 232 | + pass |
| 233 | + |
| 234 | + @staticmethod |
| 235 | + def __parse_wav(wav_file): |
| 236 | + chunk_ID = wav_file.read(4) |
| 237 | + if chunk_ID != b"RIFF": |
| 238 | + raise ValueError("WAV chunk ID invalid") |
| 239 | + _ = wav_file.read(4) # chunk_size |
| 240 | + format = wav_file.read(4) |
| 241 | + if format != b"WAVE": |
| 242 | + raise ValueError("WAV format invalid") |
| 243 | + sub_chunk1_ID = wav_file.read(4) |
| 244 | + if sub_chunk1_ID != b"fmt ": |
| 245 | + raise ValueError("WAV sub chunk 1 ID invalid") |
| 246 | + _ = wav_file.read(4) # sub_chunk1_size |
| 247 | + _ = struct.unpack("<H", wav_file.read(2))[0] # audio_format |
| 248 | + num_channels = struct.unpack("<H", wav_file.read(2))[0] |
| 249 | + |
| 250 | + if num_channels == 1: |
| 251 | + format = I2S.MONO |
| 252 | + else: |
| 253 | + format = I2S.STEREO |
| 254 | + |
| 255 | + sample_rate = struct.unpack("<I", wav_file.read(4))[0] |
| 256 | + # if sample_rate != 44_100 and sample_rate != 48_000: |
| 257 | + # raise ValueError(f"WAV sample rate of {sample_rate} invalid. Only 44.1KHz or 48KHz audio are supported") |
| 258 | + |
| 259 | + _ = struct.unpack("<I", wav_file.read(4))[0] # byte_rate |
| 260 | + _ = struct.unpack("<H", wav_file.read(2))[0] # block_align |
| 261 | + bits_per_sample = struct.unpack("<H", wav_file.read(2))[0] |
| 262 | + |
| 263 | + # usually the sub chunk2 ID ("data") comes next, but |
| 264 | + # some online MP3->WAV converters add |
| 265 | + # binary data before "data". So, read a fairly large |
| 266 | + # block of bytes and search for "data". |
| 267 | + |
| 268 | + binary_block = wav_file.read(200) |
| 269 | + offset = binary_block.find(b"data") |
| 270 | + if offset == -1: |
| 271 | + raise ValueError("WAV sub chunk 2 ID not found") |
| 272 | + |
| 273 | + wav_file.seek(40) |
| 274 | + sub_chunk2_size = struct.unpack("<I", wav_file.read(4))[0] |
| 275 | + |
| 276 | + return (format, sample_rate, bits_per_sample, 44 + offset, sub_chunk2_size) |
0 commit comments