@@ -182,47 +182,54 @@ The Portenta X8’s M4 microcontroller captures real time flow sensor data and o
182182#include <FlowSensor.h>
183183
184184#if defined(ARDUINO_PORTENTA_X8)
185- // Use Serial1 (UART0) for Portenta Max Carrier / Breakout Carrier
186- #define SerialDebug Serial1
185+ #define SerialDebug Serial1 // Use Serial1 for Portenta Carrier family
187186#else
188187#define SerialDebug Serial
189188#endif
190189
191190// Define Flow Sensor Type (Change if using another model)
192191#define SENSOR_TYPE YFS201
193- #define SENSOR_PIN D2 // Connect signal to pin D2 (interrupt pin)
192+ #define SENSOR_PIN PD_15 // Flow sensor signal pin
194193
195194FlowSensor flowSensor(SENSOR_TYPE, SENSOR_PIN);
196- unsigned long lastReadTime = 0; // Stores last measurement time
195+
196+ // Define 1Hz Sampling Frequency
197+ #define INTERVAL_MS 1000 // 1 sample per second
198+
199+ static unsigned long last_interval_ms = 0;
197200
198201// Interrupt function for counting pulses
199202void count() {
200- flowSensor.count();
203+ flowSensor.count();
201204}
202205
203206void setup() {
204- SerialDebug.begin(115200);
205- while (!SerialDebug); // Wait for Serial to be ready
207+ SerialDebug.begin(115200);
208+ while (!SerialDebug);
206209
207- // Initialize Flow Sensor
208- flowSensor.begin(count);
210+ pinMode(LED_BUILTIN, OUTPUT);
211+ flowSensor.begin(count);
209212
210- // Print CSV header for Edge Impulse Data Forwarder
211- SerialDebug.println("Timestamp (ms),Flow Rate (L/min)");
213+ SerialDebug.println("Setup complete. Streaming flow rate at 1 Hz");
212214}
213215
214216void loop() {
215- if (millis() - lastReadTime >= 1000) { // Read data every second
216- flowSensor.read(); // Get new reading
217- float flowRate = flowSensor.getFlowRate_m(); // Flow rate in L/min
218-
219- // Print CSV format: "Timestamp, Flow Rate"
220- SerialDebug.print(millis());
221- SerialDebug.print(",");
222- SerialDebug.println(flowRate, 2);
223-
224- lastReadTime = millis(); // Update last read time
225- }
217+ if (millis() - last_interval_ms >= INTERVAL_MS) {
218+ last_interval_ms = millis();
219+
220+ flowSensor.read();
221+ float flowRate = flowSensor.getFlowRate_m(); // Flow rate in L/min
222+
223+ // Avoid NaN or Inf issues
224+ if (isnan(flowRate) || isinf(flowRate)) {
225+ SerialDebug.println("0.00"); // Send 0 if no valid reading
226+ } else {
227+ SerialDebug.println(flowRate, 2); // Print flow rate
228+ }
229+
230+ // Blink LED to indicate data collection
231+ digitalWrite(LED_BUILTIN, !digitalRead(LED_BUILTIN));
232+ }
226233}
227234```
228235
@@ -240,6 +247,46 @@ Follow the prompts to select the correct serial port and map the flow rate varia
240247
241248After training and validating the model, export it as a Docker container for deployment.
242249
250+ ```
251+ networks:
252+ sensorfusion:
253+
254+ services:
255+ cad:
256+ image: arduino/python-sf:latest
257+ build: .
258+ restart: unless-stopped
259+ depends_on:
260+ - inference
261+ tty: true
262+ environment:
263+ M4_PROXY_HOST: m4proxy
264+ M4_PROXY_PORT: 5001
265+ extra_hosts:
266+ - "m4proxy:host-gateway"
267+ networks:
268+ sensorfusion:
269+ aliases:
270+ - collect-and-dispatch
271+ command: ["inference", "1337"]
272+
273+ inference:
274+ image: public.ecr.aws/g7a8t7v6/inference-container:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
275+ restart: unless-stopped
276+ ports:
277+ - 1337:1337
278+ networks:
279+ sensorfusion:
280+ aliases:
281+ - ei-inference
282+ command: [
283+ "--api-key", "ei_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
284+ "--run-http-server", "1337",
285+ "--force-target", "runner-linux-aarch64",
286+ "--model-variant", "int8"
287+ ]
288+ ```
289+
243290Once connected with the Portenta X8, upload the required files using ADB:
244291
245292``` bash
@@ -268,82 +315,159 @@ docker-compose logs -f -n 10
268315
269316### Classifying Flow Rate in Real-Time
270317
318+ ``` arduino
319+ #include <Arduino.h>
320+ #include <RPC.h>
321+ #include <SerialRPC.h>
322+ #include <FlowSensor.h>
323+
324+ // Define Flow Sensor Type and Pin
325+ #define SENSOR_TYPE YFS201
326+ #define SENSOR_PIN PD_15 // Flow sensor signal pin
327+
328+ FlowSensor flowSensor(SENSOR_TYPE, SENSOR_PIN);
329+
330+ // Interrupt function for counting pulses
331+ void count() {
332+ flowSensor.count();
333+ }
334+
335+ // Function to calculate and return flow rate (for RPC)
336+ float getFlowRate() {
337+ flowSensor.read();
338+ float flowRate = flowSensor.getFlowRate_m(); // Get flow rate in L/min
339+
340+ // Avoid NaN or Inf issues
341+ if (isnan(flowRate) || isinf(flowRate)) {
342+ return 0.0; // Default to 0 if no valid reading
343+ }
344+ return flowRate;
345+ }
346+
347+ void setup() {
348+ flowSensor.begin(count);
349+
350+ // Register the RPC function
351+ RPC.bind("flow_rate", getFlowRate);
352+ }
353+
354+ void loop() {
355+ // Nothing needed in loop, RPC handles function calls when requested
356+ }
357+ ```
358+
271359The ` main.py ` script reads flow sensor data, sends it to Edge Impulse for classification and forwards the results to the M4 microcontroller for further action.
272360
273361``` python
274- # !/usr/bin/env python3
275362import os
276363import time
277- import requests # type: ignore
278364import json
279365import argparse
280- from msgpackrpc import Address as RpcAddress, Client as RpcClient, error as RpcError # type: ignore
281-
282- # RPC Configuration for M4 Microcontroller
283- M4_PROXY_HOST = os.getenv(' M4_PROXY_HOST' , ' m4proxy' )
284- M4_PROXY_PORT = int (os.getenv(' M4_PROXY_PORT' , ' 5001' ))
285- m4_proxy_address = RpcAddress(M4_PROXY_HOST , M4_PROXY_PORT )
366+ from msgpackrpc import Address as RpcAddress, Client as RpcClient, error as RpcError
286367
287- # Flow Sensor Data Source (Serial Port)
288- SERIAL_PORT = os.getenv(" SERIAL_PORT" , " /dev/ttyUSB0" ) # Adjust if needed
289- BAUD_RATE = 115200
368+ # Retrieve M4 Proxy settings from environment variables (or use defaults)
369+ m4_proxy_host = os.getenv(" M4_PROXY_HOST" , " m4proxy" )
370+ m4_proxy_port = int (os.getenv(" M4_PROXY_PORT" , " 5001" ))
371+ m4_proxy_address = RpcAddress(m4_proxy_host, m4_proxy_port)
290372
291- # Inference Server Configuration
292- INFERENCE_HOST = os.getenv(" INFERENCE_HOST" , " inference" )
293- INFERENCE_PORT = int (os.getenv(" INFERENCE_PORT" , 1337 ))
294- INFERENCE_URL = f " http:// { INFERENCE_HOST } : { INFERENCE_PORT } /api/features "
373+ # Define the single sensor we are using
374+ sensors = (" flow_rate" ,) # Tuple with one element to keep extend() valid
295375
296- def read_flow_sensor ():
297- """ Reads flow rate from the flow sensor via serial. """
298- import serial
376+ def get_sensors_data_from_m4 ():
377+ """
378+ Get flow sensor data from the M4 via RPC (MessagePack-RPC).
379+ The Arduino sketch on the M4 must implement the "flow_rate" method.
380+ """
299381 try :
300- with serial.Serial(SERIAL_PORT , BAUD_RATE , timeout = 1 ) as ser:
301- line = ser.readline().decode(" utf-8" ).strip()
302- if " ," in line:
303- _, flow_rate = line.split(" ," )
304- return float (flow_rate)
305- except Exception as e:
306- print (f " Error reading flow sensor: { e} " )
307- return None
308-
309- def classify_flow_rate (flow_rate ):
310- """ Sends flow rate data to Edge Impulse and retrieves the classification result. """
311- data = {" features" : [flow_rate]}
312- try :
313- response = requests.post(INFERENCE_URL , json = data)
314- if response.status_code == 200 :
315- result = response.json().get(" result" , {}).get(" classification" , {})
316- if result:
317- label = max (result, key = result.get)
318- confidence = result[label]
319- print (f " Classified as: { label} ( { confidence:.2f } ) " )
320- return label, confidence
321- except requests.ConnectionError:
322- print (" Error: Unable to reach Edge Impulse inference server." )
323- return None , None
324-
325- def send_classification_to_m4 (label , confidence ):
326- """ Sends classification results to M4 via RPC. """
327- try :
328- client = RpcClient(m4_proxy_address)
329- response = client.call(" classification" , json.dumps({" label" : label, " value" : confidence}))
330- print (f " Sent to M4: { label} ( { confidence:.2f } ), Response: { response} " )
382+ get_value = lambda value : RpcClient(m4_proxy_address).call(value) # Ensure this returns a value
383+ data = [get_value(sensor) for sensor in sensors] # Ensure it's a list
384+
385+ print (f " Sensor Data: { data} " ) # Debug output
386+ return data
387+
331388 except RpcError.TimeoutError:
332- print (" Error: RPC Timeout while sending classification." )
389+ print (" Unable to retrieve sensor data from the M4: RPC Timeout" )
390+ return [] # Ensure an empty list is returned instead of `None`
391+
392+ def get_sensors_and_classify (host , port ):
393+ """
394+ Collect sensor data and send it for classification to Edge Impulse.
395+ """
396+ url = f " http:// { host} : { port} /api/features "
397+
398+ while True :
399+ print (" Collecting 400 features from sensors... " , end = " " )
400+
401+ data = {
402+ " features" : [],
403+ " model_type" : " int8" # Force quantized inference mode
404+ }
405+ start = time.time()
406+
407+ for _ in range (100 ): # Collect data in chunks
408+ sensor_values = get_sensors_data_from_m4()
409+
410+ if not isinstance (sensor_values, list ): # Validate that we get a list
411+ print (f " Error: Expected list but got { type (sensor_values)} with value { sensor_values} " )
412+ sensor_values = [] # Default to an empty list
413+
414+ data[" features" ].extend(sensor_values) # Avoid TypeError
415+
416+ time.sleep(100e-6 ) # Small delay to match sampling rate
417+
418+ stop = time.time()
419+ print (f " Done in { stop - start:.2f } seconds. " )
420+
421+ try :
422+ response = requests.post(url, json = data)
423+ except ConnectionError :
424+ print (" Connection Error: retrying later" )
425+ time.sleep(5 )
426+ continue
427+
428+ # Check the response
429+ if response.status_code != 200 :
430+ print (f " Failed to submit features. Status Code: { response.status_code} " )
431+ continue
432+
433+ print (" Successfully submitted features." )
434+
435+ # Process the JSON response to extract classification results
436+ response_data = response.json()
437+ classification = response_data.get(" result" , {}).get(" classification" , {})
438+
439+ print (f " Classification: { classification} " )
440+
441+ if classification:
442+ label = max (classification, key = classification.get)
443+ value = classification[label]
444+
445+ print (f " { label} : { value} " )
446+
447+ request_data = json.dumps({" label" : label, " value" : value})
448+
449+ try :
450+ client = RpcClient(m4_proxy_address)
451+ result = client.call(" classification" , request_data)
452+ print (f " Sent to { m4_proxy_host} : { m4_proxy_port} : { request_data} . Result: { result} " )
453+ except RpcError.TimeoutError:
454+ print (" Unable to send classification data to M4: RPC Timeout." )
455+ else :
456+ print (" No classification found." )
333457
334458if __name__ == " __main__" :
335- parser = argparse.ArgumentParser(description = " Classify flow rate using Edge Impulse" )
459+ parser = argparse.ArgumentParser(description = " Get flow sensor data and send it to inference container for classification" )
460+ parser.add_argument(" host" , help = " The hostname or IP address of the inference server" )
461+ parser.add_argument(" port" , type = int , help = " The port number of the inference server" )
462+
336463 args = parser.parse_args()
337464
338- print (" Starting Flow Sensor Classification ... Press Ctrl+C to stop." )
465+ print (" Classifying Flow Sensor Data with AI ... Press Ctrl+C to stop." )
339466
340- while True :
341- flow_rate = read_flow_sensor()
342- if flow_rate is not None :
343- label, confidence = classify_flow_rate(flow_rate)
344- if label:
345- send_classification_to_m4(label, confidence)
346- time.sleep(1 ) # Adjust sampling rate if needed
467+ try :
468+ get_sensors_and_classify(args.host, args.port)
469+ except KeyboardInterrupt :
470+ print (" Exiting gracefully..." )
347471```
348472
349473## Additional Resources
0 commit comments