Skip to content

Commit 53c056d

Browse files
authored
Merge pull request #78 from Incomplete-Outputs-Lab/feat/export-linear-interpolation
feat: interpolated export aggregations
2 parents 9dff211 + e24e943 commit 53c056d

File tree

10 files changed

+341
-101
lines changed

10 files changed

+341
-101
lines changed

src-tauri/src/commands/channels.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,25 @@ pub async fn list_channels(
172172
Ok(channels_with_stats)
173173
}
174174

175+
/// 軽量版チャンネル一覧取得
176+
///
177+
/// - DBの `channels` テーブルのみを参照し、Twitch API にはアクセスしない
178+
/// - タイムラインなど「配信者リストだけ欲しい」画面で使用する
179+
#[tauri::command]
180+
pub async fn list_channels_basic(
181+
db_manager: State<'_, DatabaseManager>,
182+
) -> Result<Vec<Channel>, String> {
183+
let channels: Vec<Channel> = db_manager
184+
.with_connection(|conn| {
185+
ChannelRepository::list_all(conn)
186+
.db_context("list all channels (basic)")
187+
.map_err(|e| e.to_string())
188+
})
189+
.await?;
190+
191+
Ok(channels)
192+
}
193+
175194
/// チャンネル情報にTwitch API情報を統合
176195
async fn enrich_channels_with_twitch_info(
177196
channels: Vec<Channel>,

src-tauri/src/commands/export.rs

Lines changed: 89 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,35 @@
11
use crate::database::{repositories::StreamStatsRepository, DatabaseManager};
22
use crate::error::ResultExt;
3+
use chrono::{DateTime, FixedOffset, NaiveDateTime};
34
use serde::{Deserialize, Serialize};
45
use tauri::{AppHandle, State};
56

67
#[derive(Debug, Serialize, Deserialize)]
78
pub struct ExportQuery {
8-
pub channel_id: Option<i64>,
9+
pub channel_id: i64,
910
pub start_time: Option<String>,
1011
pub end_time: Option<String>,
1112
pub aggregation: Option<String>, // "raw", "1min", "5min", "1hour"
1213
pub delimiter: Option<String>, // Custom delimiter (default: comma)
1314
}
1415

16+
fn normalize_timestamp(value: &str) -> String {
17+
// 1) RFC3339 (元の文字列形式を想定)
18+
if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
19+
return dt.to_rfc3339();
20+
}
21+
// 2) DuckDB の TIMESTAMP 表示形式を想定(秒以下あり/なし両対応)
22+
if let Ok(naive) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f")
23+
.or_else(|_| NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S"))
24+
{
25+
if let Some(offset) = FixedOffset::east_opt(0) {
26+
return DateTime::<FixedOffset>::from_naive_utc_and_offset(naive, offset).to_rfc3339();
27+
}
28+
}
29+
// 3) どれにも当てはまらない場合は元の文字列を返す
30+
value.to_string()
31+
}
32+
1533
/// Helper function to escape field values for delimited output
1634
fn escape_field(value: &str, delimiter: &str) -> String {
1735
// Check if field needs escaping (contains delimiter, quotes, or newlines)
@@ -35,25 +53,47 @@ pub async fn export_to_delimited(
3553
file_path: String,
3654
include_bom: Option<bool>,
3755
) -> Result<String, String> {
56+
let ExportQuery {
57+
channel_id,
58+
start_time,
59+
end_time,
60+
aggregation,
61+
delimiter,
62+
} = query;
63+
3864
let stats = db_manager
3965
.with_connection(|conn| {
40-
StreamStatsRepository::get_stream_stats_filtered(
41-
conn,
42-
None,
43-
query.channel_id,
44-
query.start_time.as_deref(),
45-
query.end_time.as_deref(),
46-
true, // ORDER BY collected_at ASC for export
47-
)
48-
.db_context("query stats")
49-
.map_err(|e| e.to_string())
66+
let start_opt = start_time.as_deref();
67+
let end_opt = end_time.as_deref();
68+
69+
let interval_minutes = match aggregation.as_deref() {
70+
Some("1min") => Some(1),
71+
Some("5min") => Some(5),
72+
Some("1hour") => Some(60),
73+
_ => None,
74+
};
75+
76+
if let (Some(st), Some(et), Some(interval)) = (start_opt, end_opt, interval_minutes) {
77+
StreamStatsRepository::get_interpolated_stream_stats_for_export(
78+
conn, None, Some(channel_id), st, et, interval,
79+
)
80+
.db_context("query interpolated stats for export")
81+
.map_err(|e| e.to_string())
82+
} else {
83+
StreamStatsRepository::get_stream_stats_filtered(
84+
conn, None, Some(channel_id), start_opt, end_opt,
85+
true, // ORDER BY collected_at ASC for export
86+
)
87+
.db_context("query stats")
88+
.map_err(|e| e.to_string())
89+
}
5090
})
5191
.await?;
5292

5393
let stats_len = stats.len();
5494

5595
// Determine delimiter (default to comma)
56-
let delimiter = query.delimiter.as_deref().unwrap_or(",");
96+
let delimiter = delimiter.as_deref().unwrap_or(",");
5797

5898
// Build delimited file content
5999
let mut output = String::new();
@@ -71,7 +111,7 @@ pub async fn export_to_delimited(
71111

72112
// Data rows
73113
for stat in &stats {
74-
let collected_at = &stat.collected_at;
114+
let collected_at = normalize_timestamp(&stat.collected_at);
75115
let channel_name = stat.channel_name.as_deref().unwrap_or("");
76116
let viewer_count = stat.viewer_count.unwrap_or(0).to_string();
77117
let category = stat.category.as_deref().unwrap_or("");
@@ -83,7 +123,7 @@ pub async fn export_to_delimited(
83123

84124
output.push_str(&format!(
85125
"{}{}{}{}{}{}{}{}{}{}{}\n",
86-
escape_field(collected_at, delimiter),
126+
escape_field(&collected_at, delimiter),
87127
delimiter,
88128
escape_field(channel_name, delimiter),
89129
delimiter,
@@ -115,18 +155,40 @@ pub async fn preview_export_data(
115155
query: ExportQuery,
116156
max_rows: Option<usize>,
117157
) -> Result<String, String> {
158+
let ExportQuery {
159+
channel_id,
160+
start_time,
161+
end_time,
162+
aggregation,
163+
delimiter,
164+
} = query;
165+
118166
let stats = db_manager
119167
.with_connection(|conn| {
120-
StreamStatsRepository::get_stream_stats_filtered(
121-
conn,
122-
None,
123-
query.channel_id,
124-
query.start_time.as_deref(),
125-
query.end_time.as_deref(),
126-
true, // ORDER BY collected_at ASC
127-
)
128-
.db_context("query stats")
129-
.map_err(|e| e.to_string())
168+
let start_opt = start_time.as_deref();
169+
let end_opt = end_time.as_deref();
170+
171+
let interval_minutes = match aggregation.as_deref() {
172+
Some("1min") => Some(1),
173+
Some("5min") => Some(5),
174+
Some("1hour") => Some(60),
175+
_ => None,
176+
};
177+
178+
if let (Some(st), Some(et), Some(interval)) = (start_opt, end_opt, interval_minutes) {
179+
StreamStatsRepository::get_interpolated_stream_stats_for_export(
180+
conn, None, Some(channel_id), st, et, interval,
181+
)
182+
.db_context("query interpolated stats for preview")
183+
.map_err(|e| e.to_string())
184+
} else {
185+
StreamStatsRepository::get_stream_stats_filtered(
186+
conn, None, Some(channel_id), start_opt, end_opt,
187+
true, // ORDER BY collected_at ASC
188+
)
189+
.db_context("query stats")
190+
.map_err(|e| e.to_string())
191+
}
130192
})
131193
.await?;
132194

@@ -135,7 +197,7 @@ pub async fn preview_export_data(
135197
let preview_stats = stats.iter().take(max_rows);
136198

137199
// Determine delimiter (default to comma)
138-
let delimiter = query.delimiter.as_deref().unwrap_or(",");
200+
let delimiter = delimiter.as_deref().unwrap_or(",");
139201

140202
// Build preview content
141203
let mut output = String::new();
@@ -148,7 +210,7 @@ pub async fn preview_export_data(
148210

149211
// Data rows (limited to max_rows)
150212
for stat in preview_stats {
151-
let collected_at = &stat.collected_at;
213+
let collected_at = normalize_timestamp(&stat.collected_at);
152214
let channel_name = stat.channel_name.as_deref().unwrap_or("");
153215
let viewer_count = stat.viewer_count.unwrap_or(0).to_string();
154216
let category = stat.category.as_deref().unwrap_or("");
@@ -160,7 +222,7 @@ pub async fn preview_export_data(
160222

161223
output.push_str(&format!(
162224
"{}{}{}{}{}{}{}{}{}{}{}\n",
163-
escape_field(collected_at, delimiter),
225+
escape_field(&collected_at, delimiter),
164226
delimiter,
165227
escape_field(channel_name, delimiter),
166228
delimiter,

src-tauri/src/database/repositories/stream_stats_repository.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::database::analytics::DailyStats;
66
use crate::database::models::StreamStats;
77
use crate::database::query_helpers::stream_stats_query;
88
use crate::database::utils;
9+
use chrono::{DateTime, Duration, FixedOffset, NaiveDateTime};
910
use duckdb::Connection;
1011
use serde::{Deserialize, Serialize};
1112

@@ -101,6 +102,173 @@ impl StreamStatsRepository {
101102
results.collect::<Result<Vec<_>, _>>()
102103
}
103104

105+
/// 指定した時間範囲と間隔で線形補完した統計データを取得(エクスポート用)
106+
///
107+
/// - 元データは get_stream_stats_filtered で取得
108+
/// - viewer_count / chat_rate_1min を線形補完
109+
/// - それ以外の文字列系は直前の値を採用
110+
pub fn get_interpolated_stream_stats_for_export(
111+
conn: &Connection,
112+
stream_id: Option<i64>,
113+
channel_id: Option<i64>,
114+
start_time: &str,
115+
end_time: &str,
116+
interval_minutes: i64,
117+
) -> Result<Vec<StreamStats>, duckdb::Error> {
118+
// ベースとなる生データを取得(昇順)
119+
let base_stats = Self::get_stream_stats_filtered(
120+
conn,
121+
stream_id,
122+
channel_id,
123+
Some(start_time),
124+
Some(end_time),
125+
true,
126+
)?;
127+
128+
if base_stats.is_empty() {
129+
return Ok(Vec::new());
130+
}
131+
132+
// 補完の対象範囲は「実際にデータが存在する最初と最後の時刻」に限定する
133+
let effective_start = &base_stats
134+
.first()
135+
.map(|s| s.collected_at.as_str())
136+
.unwrap_or(start_time);
137+
let effective_end = &base_stats
138+
.last()
139+
.map(|s| s.collected_at.as_str())
140+
.unwrap_or(end_time);
141+
142+
Ok(Self::interpolate_stats(
143+
&base_stats,
144+
effective_start,
145+
effective_end,
146+
interval_minutes,
147+
))
148+
}
149+
150+
/// 内部ヘルパー: 線形補完を行う
151+
fn interpolate_stats(
152+
stats: &[StreamStats],
153+
start_time: &str,
154+
end_time: &str,
155+
interval_minutes: i64,
156+
) -> Vec<StreamStats> {
157+
fn parse_ts(s: &str) -> Option<DateTime<FixedOffset>> {
158+
// 1) RFC3339 (元の文字列形式を想定)
159+
if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
160+
return Some(dt);
161+
}
162+
// 2) DuckDB の TIMESTAMP 表示形式を想定(秒以下あり/なし両対応)
163+
if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
164+
.or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
165+
{
166+
let offset = FixedOffset::east_opt(0)?;
167+
return Some(DateTime::from_naive_utc_and_offset(naive, offset));
168+
}
169+
None
170+
}
171+
172+
fn lerp_i32(a: Option<i32>, b: Option<i32>, frac: f64) -> Option<i32> {
173+
match (a, b) {
174+
(Some(av), Some(bv)) => {
175+
let v = av as f64 + (bv as f64 - av as f64) * frac;
176+
Some(v.round() as i32)
177+
}
178+
(Some(av), None) => Some(av),
179+
(None, Some(bv)) => Some(bv),
180+
(None, None) => None,
181+
}
182+
}
183+
fn lerp_i64(a: Option<i64>, b: Option<i64>, frac: f64) -> Option<i64> {
184+
match (a, b) {
185+
(Some(av), Some(bv)) => {
186+
let v = av as f64 + (bv as f64 - av as f64) * frac;
187+
Some(v.round() as i64)
188+
}
189+
(Some(av), None) => Some(av),
190+
(None, Some(bv)) => Some(bv),
191+
(None, None) => None,
192+
}
193+
}
194+
195+
// 元データのタイムスタンプをパース
196+
let mut times: Vec<DateTime<FixedOffset>> = Vec::with_capacity(stats.len());
197+
for s in stats {
198+
if let Some(dt) = parse_ts(&s.collected_at) {
199+
times.push(dt);
200+
} else {
201+
// パースできない場合は補完せずそのまま返す
202+
return stats.to_vec();
203+
}
204+
}
205+
206+
let n = stats.len();
207+
if n == 0 {
208+
return Vec::new();
209+
}
210+
211+
// 線形補完の範囲は start_time〜end_time(呼び出し元で first/last にクリップ済み)
212+
let start_dt = parse_ts(start_time).unwrap_or(times[0]);
213+
let end_dt = parse_ts(end_time).unwrap_or(*times.last().unwrap());
214+
215+
let step = Duration::minutes(interval_minutes.max(1));
216+
let mut grid: Vec<DateTime<FixedOffset>> = Vec::new();
217+
let mut t = start_dt;
218+
while t <= end_dt {
219+
grid.push(t);
220+
t += step;
221+
}
222+
223+
let mut result: Vec<StreamStats> = Vec::with_capacity(grid.len());
224+
225+
for t in grid {
226+
// 前後のサンプルを探す
227+
let mut j = 0usize;
228+
while j < n && times[j] < t {
229+
j += 1;
230+
}
231+
232+
let prev_idx = if j == 0 { 0 } else { j - 1 };
233+
let next_idx = if j >= n { n - 1 } else { j };
234+
235+
let prev_t = times[prev_idx];
236+
let next_t = times[next_idx];
237+
238+
let prev = &stats[prev_idx];
239+
let next = &stats[next_idx];
240+
241+
let frac = if next_t <= prev_t {
242+
0.0
243+
} else {
244+
let total = (next_t - prev_t).num_seconds() as f64;
245+
let elapsed = (t - prev_t).num_seconds() as f64;
246+
(elapsed / total).clamp(0.0, 1.0)
247+
};
248+
249+
let viewer = lerp_i32(prev.viewer_count, next.viewer_count, frac);
250+
let chat = lerp_i64(prev.chat_rate_1min, next.chat_rate_1min, frac);
251+
252+
let base = prev; // カテゴリ等は直前の値を使用
253+
254+
result.push(StreamStats {
255+
id: None,
256+
stream_id: base.stream_id,
257+
collected_at: t.to_rfc3339(),
258+
viewer_count: viewer,
259+
chat_rate_1min: chat,
260+
category: base.category.clone(),
261+
game_id: base.game_id.clone(),
262+
title: base.title.clone(),
263+
follower_count: base.follower_count,
264+
twitch_user_id: base.twitch_user_id.clone(),
265+
channel_name: base.channel_name.clone(),
266+
});
267+
}
268+
269+
result
270+
}
271+
104272
/// 自動発見された配信の統計データを挿入
105273
///
106274
/// stream_idがNULLの状態で、自動発見時の統計データを記録します。

0 commit comments

Comments
 (0)