Skip to content

Commit e46c4ca

Browse files
committed
added eeg import
1 parent 68f10bc commit e46c4ca

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

backend/api-server/src/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use axum::{
55
routing::{get, post},
66
Json,
77
Router,
8+
body::Bytes,
89
};
910
use serde::{Deserialize, Serialize};
1011
use serde_json::{json, Value};
@@ -279,6 +280,19 @@ async fn export_eeg_data(
279280

280281
}
281282

283+
// Handler for POST /api/sessions/{session_id}/eeg_data/import
284+
async fn import_eeg_data(
285+
State(app_state): State<AppState>,
286+
Path(session_id): Path<i32>,
287+
// we expect the CSV data to be sent as raw text in the body of the request
288+
body: Bytes,
289+
) -> Result<Json<Value>, (StatusCode, String)> {
290+
shared_logic::db::import_eeg_data_from_csv(&app_state.db_client, session_id, &body)
291+
.await
292+
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to import EEG data: {}", e)))?;
293+
294+
Ok(Json(json!({"status": "success"})))
295+
}
282296

283297
async fn run_python_script_handler() -> Result<Json<Value>, (StatusCode, String)> {
284298
info!("Received request to run Python script.");
@@ -371,6 +385,7 @@ async fn main() {
371385
.route("/api/sessions/:session_id/frontend-state", get(get_frontend_state))
372386

373387
.route("/api/sessions/:session_id/eeg_data/export", post(export_eeg_data))
388+
.route("/api/sessions/:session_id/eeg_data/import", post(import_eeg_data))
374389

375390
// Share application state with all handlers
376391
.with_state(app_state);

backend/shared-logic/src/db.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,75 @@ pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_ti
413413
Ok(csv_data)
414414
}
415415

416+
/// Import EEG data from a CSV byte stream for a given session ID. The CSV is expected
417+
/// to have columns: "time", "channel1", "channel2", "channel3", "channel4".
418+
///
419+
/// Returns Ok(()) on success.
420+
pub async fn import_eeg_data_from_csv(client: &DbClient, session_id: i32, csv_bytes: &[u8]) -> Result<(), Error> {
421+
info!("Importing EEG data for session id {} from CSV", session_id);
422+
423+
// we use the csv crate to read the CSV data, converting them to the struct we made for CSV rows
424+
let mut reader = csv::ReaderBuilder::new()
425+
.has_headers(true) // we expect the CSV to have headers, should probably make this clear somewhere
426+
.from_reader(csv_bytes);
427+
428+
// set up our vectors to hold the parsed EEG data rows, so we can batch insert them later
429+
let mut timestamps: Vec<DateTime<Utc>> = Vec::new();
430+
let mut channel1_data: Vec<f64> = Vec::new();
431+
let mut channel2_data: Vec<f64> = Vec::new();
432+
let mut channel3_data: Vec<f64> = Vec::new();
433+
let mut channel4_data: Vec<f64> = Vec::new();
434+
435+
436+
// we iterate through the CSV records, parsing each row and converting it to the format we need for insertion
437+
for result in reader.records() {
438+
// unwrap the record, if there's an error we return it
439+
let record = result.map_err(|e| Error::Protocol(e.to_string()))?;
440+
441+
// now we parse the fields, converting time to DateTime<Utc> and channels to i32
442+
let time_str = record.get(0).ok_or_else(|| Error::Protocol("Missing time field".to_string()))?;
443+
444+
// we assume the time is in RFC3339 format
445+
let time = DateTime::parse_from_rfc3339(time_str)
446+
.map_err(|e| Error::Protocol(format!("Invalid time format: {}", e)))?
447+
.with_timezone(&Utc);
448+
449+
let channel1 = record.get(1)
450+
.ok_or_else(|| Error::Protocol("Missing channel1 field".to_string()))?
451+
.parse::<f64>()
452+
.map_err(|e| Error::Protocol(format!("Invalid channel1 value: {}", e)))?;
453+
let channel2 = record.get(2)
454+
.ok_or_else(|| Error::Protocol("Missing channel2 field".to_string()))?
455+
.parse::<f64>()
456+
.map_err(|e| Error::Protocol(format!("Invalid channel2 value: {}", e)))?;
457+
let channel3 = record.get(3)
458+
.ok_or_else(|| Error::Protocol("Missing channel3 field".to_string()))?
459+
.parse::<f64>()
460+
.map_err(|e| Error::Protocol(format!("Invalid channel3 value: {}", e)))?;
461+
let channel4 = record.get(4)
462+
.ok_or_else(|| Error::Protocol("Missing channel4 field".to_string()))?
463+
.parse::<f64>()
464+
.map_err(|e| Error::Protocol(format!("Invalid channel4 value: {}", e)))?;
465+
466+
// now we construct the tuple and add it to our vectors
467+
timestamps.push(time);
468+
channel1_data.push(channel1);
469+
channel2_data.push(channel2);
470+
channel3_data.push(channel3);
471+
channel4_data.push(channel4);
472+
}
473+
474+
// now we use our existing batch insert function to insert the data into the database
475+
let eeg_rows = EEGDataPacket {
476+
timestamps,
477+
signals: vec![channel1_data, channel2_data, channel3_data, channel4_data],
478+
};
479+
480+
insert_batch_eeg(client, session_id, &eeg_rows).await?;
481+
482+
Ok(())
483+
}
484+
416485
/// Helper function for eeg data to find the earliest timestamp for a given session
417486
///
418487
/// Returns the earliest timestamp on success.

0 commit comments

Comments
 (0)