diff --git a/Cargo.lock b/Cargo.lock index 7442fea11..daf40891d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3148,6 +3148,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "chrono-tz", "context", "criterion", "ctor 0.1.26", @@ -3164,6 +3165,9 @@ dependencies = [ "protocol", "rand", "rt", + "serde", + "serde_derive", + "serde_json", "sharding", "stream", "tokio", @@ -3183,6 +3187,7 @@ dependencies = [ "redis", "reqwest", "serde", + "serde_json", "url", ] diff --git a/endpoint/src/kv/kvtime.rs b/endpoint/src/kv/kvtime.rs index 1cb681079..4afeae069 100644 --- a/endpoint/src/kv/kvtime.rs +++ b/endpoint/src/kv/kvtime.rs @@ -1,8 +1,9 @@ -use super::{strategy::Postfix, uuid::*}; +use super::uuid::*; use chrono::{Datelike, NaiveDate}; use core::fmt::Write; use ds::RingSlice; use protocol::kv::Strategy; +use protocol::vector::Postfix; use sharding::hash::Hash; use sharding::{distribution::DBRange, hash::Hasher}; diff --git a/endpoint/src/kv/strategy.rs b/endpoint/src/kv/strategy.rs index db6ed0c23..a5f2031b4 100644 --- a/endpoint/src/kv/strategy.rs +++ b/endpoint/src/kv/strategy.rs @@ -5,25 +5,10 @@ use super::kvtime::KVTime; use ds::RingSlice; use protocol::kv::Strategy; +use protocol::vector::Postfix; use sharding::distribution::DBRange; use sharding::hash::Hasher; -#[derive(Debug, Clone)] -pub enum Postfix { - YYMM, - YYMMDD, - INDEX, -} - -impl Into for &str { - fn into(self) -> Postfix { - match self.to_lowercase().as_str() { - "yymm" => Postfix::YYMM, - _ => Postfix::YYMMDD, - } - } -} - #[derive(Debug, Clone)] pub enum Strategist { KVTime(KVTime), @@ -75,13 +60,14 @@ impl Default for Strategist { } impl Strategist { - pub fn try_from(ns: &KvNamespace) -> Self { - Self::KVTime(KVTime::new( + pub fn try_from(ns: &KvNamespace) -> Option { + let table_postfix = ns.basic.table_postfix.as_str().try_into().ok()?; + Some(Self::KVTime(KVTime::new( ns.basic.db_name.clone(), ns.basic.db_count, //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), - )) + table_postfix, + ))) } } diff --git a/endpoint/src/kv/topo.rs b/endpoint/src/kv/topo.rs index 50d9ac604..50ef7da7e 100644 --- a/endpoint/src/kv/topo.rs +++ b/endpoint/src/kv/topo.rs @@ -134,7 +134,7 @@ where (ctx.idx as usize, &shard.master) } }; - ctx.idx = idx as u16; + ctx.idx = idx as u8; ctx.runs += 1; // 只重试一次,重试次数过多,可能会导致雪崩。如果不重试,现在的配额策略在当前副本也只会连续发送四次请求,问题也不大 let try_next = ctx.runs == 1; @@ -163,7 +163,10 @@ where } fn update(&mut self, namespace: &str, cfg: &str) { if let Some(ns) = KvNamespace::try_from(cfg) { - self.strategist = Strategist::try_from(&ns); + let Some(strategist) = Strategist::try_from(&ns) else { + return; + }; + self.strategist = strategist; self.cfg.update(namespace, ns); } } diff --git a/endpoint/src/vector/batch.rs b/endpoint/src/vector/batch.rs index e963326b1..d27afdb4a 100644 --- a/endpoint/src/vector/batch.rs +++ b/endpoint/src/vector/batch.rs @@ -1,13 +1,14 @@ -use super::strategy::Postfix; use chrono::{Datelike, NaiveDate}; -use chrono_tz::Tz; use core::fmt::Write; use ds::RingSlice; -use protocol::Error; +use protocol::{ + vector::{CommandType, KeysType, Postfix}, + Error, DATE_YYMM, DATE_YYMMDD, +}; use sharding::{distribution::DBRange, hash::Hasher}; #[derive(Clone, Debug)] -pub struct Batch { +pub struct Aggregation { db_prefix: String, table_prefix: String, table_postfix: Postfix, @@ -18,7 +19,7 @@ pub struct Batch { si: Si, } -impl Batch { +impl Aggregation { pub fn new_with_db( db_prefix: String, table_prefix: String, @@ -63,11 +64,6 @@ impl Batch { &self.hasher } - pub fn get_date(&self, _: &[RingSlice]) -> Result { - let now = chrono::Utc::now().with_timezone(&Tz::Asia__Shanghai); - Ok(NaiveDate::from_ymd_opt(now.year(), now.month(), now.day()).unwrap()) - } - pub fn write_dname_with_hash(&self, buf: &mut impl Write, hash: i64) { let db_idx: usize = self.distribution.db_idx(hash); let _ = write!(buf, "{}_{}", self.db_prefix, db_idx); @@ -101,14 +97,44 @@ impl Batch { self.si.write_database_table(buf, hash) } - pub(crate) fn condition_keys(&self) -> Box> + '_> { - Box::new(self.keys_name.iter().map(|x| Some(x))) - } - pub(crate) fn keys(&self) -> &[String] { &self.keys_name } + pub fn get_date(&self, cmd: CommandType, keys: &[RingSlice]) -> Result { + match cmd { + CommandType::VRange | CommandType::VCard | CommandType::VRangeSi => { + Ok(NaiveDate::default()) + } + //相比vrange多了一个日期key + _ => { + let date = keys.last().unwrap(); + let ymd = match self.keys_name.last().unwrap().as_str() { + DATE_YYMM => ( + date.try_str_num(0..0 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16 + + 2000, + date.try_str_num(2..2 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + 1, + ), + DATE_YYMMDD => ( + date.try_str_num(0..0 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16 + + 2000, + date.try_str_num(2..2 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + date.try_str_num(4..4 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + ), + _ => (0, 0, 0), + }; + NaiveDate::from_ymd_opt(ymd.0.into(), ymd.1.into(), ymd.2.into()) + .ok_or(Error::RequestProtocolInvalid) + } + } + } + // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { // if month == 1 { // return NaiveDate::from_ymd_opt((year - 1).into(), 12, 1).unwrap(); @@ -124,6 +150,51 @@ impl Batch { pub(crate) fn si_cols(&self) -> &[String] { &self.si_cols } + + pub(crate) fn keys_with_type(&self) -> Box + '_> { + Box::new( + self.keys_name + .iter() + .map(|key_name| match key_name.as_str() { + DATE_YYMM | DATE_YYMMDD => KeysType::Time, + &_ => KeysType::Keys(key_name), + }), + ) + } + + //校验动态信息,后续有重复校验的后面延迟校验 + //上行、xx.timeline请求多带一个日期key,日期key固定在最后 + pub(crate) fn check_vector_cmd( + &self, + vcmd: &protocol::vector::VectorCmd, + ) -> protocol::Result<()> { + match vcmd.cmd { + CommandType::VRange | CommandType::VCard | CommandType::VRangeSi => { + if vcmd.keys.len() != self.keys().len() - 1 { + return Err(Error::RequestProtocolInvalid); + } + Ok(()) + } + //相比vrange多了一个日期key + CommandType::VGet + | CommandType::VAdd + | CommandType::VDel + | CommandType::VUpdate + | CommandType::VAddTimeline + | CommandType::VDelTimeline + | CommandType::VRangeTimeline + | CommandType::VUpdateTimeline + | CommandType::VAddSi + | CommandType::VUpdateSi + | CommandType::VDelSi => { + if vcmd.keys.len() != self.keys().len() { + return Err(Error::RequestProtocolInvalid); + } + Ok(()) + } + CommandType::Unknown => panic!("not sup {:?}", vcmd.cmd), + } + } } #[derive(Clone, Debug)] diff --git a/endpoint/src/vector/strategy.rs b/endpoint/src/vector/strategy.rs index acc0deebf..6ccb8756e 100644 --- a/endpoint/src/vector/strategy.rs +++ b/endpoint/src/vector/strategy.rs @@ -1,20 +1,20 @@ use std::fmt::Write; -pub use crate::kv::strategy::Postfix; use chrono::NaiveDate; use ds::RingSlice; +use protocol::vector::{CommandType, KeysType, Postfix}; use protocol::Result; use sharding::distribution::DBRange; use sharding::hash::Hasher; -use super::batch::Batch; +use super::batch::Aggregation; use super::config::VectorNamespace; use super::vectortime::VectorTime; #[derive(Debug, Clone)] pub enum Strategist { VectorTime(VectorTime), - Batch(Batch), + Aggregation(Aggregation), } impl Default for Strategist { @@ -31,26 +31,29 @@ impl Default for Strategist { } } -//vector的Strategy用来确定以下几点: -//3. 如何从keys中计算hash和year -//1. 数据库表名的格式如 table_yymm -//2. 库名表名后缀如何计算 +/// vector的Strategy用来确定以下几点: +/// 1. 如何从keys中计算hash和year +/// 2. 数据库表名的格式如 table_yymm +/// 3. 库名表名后缀如何计算 +// 约定:对于aggregation策略,必须得有timeline、si,其他策略目前只能有主库表; impl Strategist { pub fn try_from(ns: &VectorNamespace) -> Option { Some(match ns.basic.strategy.as_str() { "aggregation" => { - //至少需要date和count两个字段名 - if ns.basic.si_cols.len() < 2 || ns.basic.keys.len() != 1 { - log::warn!("len si_cols < 2 or len keys != 1"); + //至少需要date和count两个字段名,keys至少需要id+time + if ns.basic.si_cols.len() < 2 || ns.basic.keys.len() < 2 { + log::warn!("len si_cols < 2 or len keys < 2"); return None; } - Self::Batch(Batch::new_with_db( + //最后一个key需要是日期 + let _: Postfix = ns.basic.keys.last().unwrap().as_str().try_into().ok()?; + Self::Aggregation(Aggregation::new_with_db( ns.basic.db_name.clone(), ns.basic.table_name.clone(), ns.basic.db_count, //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), + ns.basic.table_postfix.as_str().try_into().ok()?, ns.basic.keys.clone(), ns.basic.si_cols.clone(), ns.basic.si_db_name.clone(), @@ -60,54 +63,78 @@ impl Strategist { ns.si_backends.len() as u32, )) } - _ => Self::VectorTime(VectorTime::new_with_db( - ns.basic.db_name.clone(), - ns.basic.table_name.clone(), - ns.basic.db_count, - //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 - ns.backends.iter().next().unwrap().1.len() as u32, - ns.basic.table_postfix.as_str().into(), - ns.basic.keys.clone(), - )), + _ => { + if ns.basic.keys.len() < 2 { + log::warn!("len keys < 2"); + return None; + } + //最后一个key需要是日期 + let _: Postfix = ns.basic.keys.last().unwrap().as_str().try_into().ok()?; + Self::VectorTime(VectorTime::new_with_db( + ns.basic.db_name.clone(), + ns.basic.table_name.clone(), + ns.basic.db_count, + //此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认 + ns.backends.iter().next().unwrap().1.len() as u32, + ns.basic.table_postfix.as_str().try_into().ok()?, + ns.basic.keys.clone(), + )) + } }) } #[inline] pub fn distribution(&self) -> &DBRange { match self { Strategist::VectorTime(inner) => inner.distribution(), - Strategist::Batch(inner) => inner.distribution(), + Strategist::Aggregation(inner) => inner.distribution(), } } #[inline] pub fn si_distribution(&self) -> &DBRange { match self { Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.si_distribution(), + Strategist::Aggregation(inner) => inner.si_distribution(), } } #[inline] pub fn hasher(&self) -> &Hasher { match self { Strategist::VectorTime(inner) => inner.hasher(), - Strategist::Batch(inner) => inner.hasher(), + Strategist::Aggregation(inner) => inner.hasher(), } } #[inline] - pub fn get_date(&self, keys: &[RingSlice]) -> Result { + pub fn get_date(&self, cmd: CommandType, keys: &[RingSlice]) -> Result { match self { Strategist::VectorTime(inner) => inner.get_date(keys), - Strategist::Batch(inner) => inner.get_date(keys), + Strategist::Aggregation(inner) => inner.get_date(cmd, keys), } } // 请求成功后,是否有更多的数据需要请求 #[inline] - pub fn more(&self) -> bool { + pub fn aggregation(&self) -> bool { match self { Strategist::VectorTime(_) => false, - Strategist::Batch(_) => true, + Strategist::Aggregation(_) => true, + } + } + +pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<()> { + match self { + Strategist::VectorTime(inner) => inner.check_vector_cmd(vcmd), + Strategist::Aggregation(inner) => inner.check_vector_cmd(vcmd), } } + // /// 获得配置的默认route;当配置strategy为aggregation时,默认的route是Aggregation,否则就是Main + // #[inline] + // pub(crate) fn config_aggregation(&self) -> bool { + // match self { + // Strategist::Batch(_) => true, + // _ => false, + // } + // } + // pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate { // match self { // Strategist::VectorTime(_) => panic!("VectorTime not support get_next_date"), @@ -120,37 +147,37 @@ impl protocol::vector::Strategy for Strategist { fn keys(&self) -> &[String] { match self { Strategist::VectorTime(inner) => inner.keys(), - Strategist::Batch(inner) => inner.keys(), + Strategist::Aggregation(inner) => inner.keys(), } } - fn condition_keys(&self) -> Box> + '_> { + fn keys_with_type(&self) -> Box + '_> { match self { - Strategist::VectorTime(inner) => inner.condition_keys(), - Strategist::Batch(inner) => inner.condition_keys(), + Strategist::VectorTime(inner) => inner.keys_with_type(), + Strategist::Aggregation(inner) => inner.keys_with_type(), } } fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64) { match self { Strategist::VectorTime(inner) => inner.write_database_table(buf, date, hash), - Strategist::Batch(inner) => inner.write_database_table(buf, date, hash), + Strategist::Aggregation(inner) => inner.write_database_table(buf, date, hash), } } fn write_si_database_table(&self, buf: &mut impl Write, hash: i64) { match self { Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.write_si_database_table(buf, hash), + Strategist::Aggregation(inner) => inner.write_si_database_table(buf, hash), } } fn batch(&self, limit: u64, vcmd: &protocol::vector::VectorCmd) -> u64 { match self { Strategist::VectorTime(_) => 0, - Strategist::Batch(inner) => inner.batch(limit, vcmd), + Strategist::Aggregation(inner) => inner.batch(limit, vcmd), } } fn si_cols(&self) -> &[String] { match self { Strategist::VectorTime(_) => panic!("not support"), - Strategist::Batch(inner) => inner.si_cols(), + Strategist::Aggregation(inner) => inner.si_cols(), } } } @@ -159,6 +186,7 @@ impl protocol::vector::Strategy for Strategist { mod tests { use std::collections::HashMap; + use attachment::Route; use protocol::{ kv::VectorSqlBuilder, vector::{mysql::*, *}, @@ -194,9 +222,9 @@ mod tests { timeout_ms_slave: Default::default(), db_name: "db_name".into(), table_name: "table_name".into(), - table_postfix: "yymm".into(), + table_postfix: DATE_YYMM.into(), db_count: 32, - keys: vec!["kid".into(), "yymm".into()], + keys: vec!["kid".into(), DATE_YYMM.into()], strategy: Default::default(), password: Default::default(), user: Default::default(), @@ -225,6 +253,7 @@ mod tests { // vrange let vector_cmd = VectorCmd { cmd: CommandType::VRange, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -242,7 +271,7 @@ mod tests { let date = NaiveDate::from_ymd_opt(2021, 5, 1).unwrap(); let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -253,6 +282,7 @@ mod tests { // vrange 无field let vector_cmd = VectorCmd { cmd: CommandType::VRange, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -268,7 +298,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -279,6 +309,7 @@ mod tests { // 复杂vrange let vector_cmd = VectorCmd { cmd: CommandType::VRange, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -316,7 +347,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -327,6 +358,7 @@ mod tests { // vcard let vector_cmd = VectorCmd { cmd: CommandType::VCard, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -359,7 +391,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -370,6 +402,7 @@ mod tests { //vadd let vector_cmd = VectorCmd { cmd: CommandType::VAdd, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -400,7 +433,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -413,6 +446,7 @@ mod tests { //vupdate let vector_cmd = VectorCmd { cmd: CommandType::VUpdate, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -454,7 +488,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -465,6 +499,7 @@ mod tests { //vdel let vector_cmd = VectorCmd { cmd: CommandType::VDel, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -497,7 +532,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -508,6 +543,7 @@ mod tests { // vget let vector_cmd = VectorCmd { cmd: CommandType::VGet, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -526,7 +562,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -537,6 +573,7 @@ mod tests { // vget 无field let vector_cmd = VectorCmd { cmd: CommandType::VGet, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -552,7 +589,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( @@ -563,6 +600,7 @@ mod tests { // 复杂vget let vector_cmd = VectorCmd { cmd: CommandType::VGet, + route: Some(Route::TimelineOrMain), keys: vec![ RingSlice::from_slice("id".as_bytes()), RingSlice::from_slice("2105".as_bytes()), @@ -600,7 +638,7 @@ mod tests { let builder = SqlBuilder::new(&vector_cmd, hash, date, &strategy, Default::default()).unwrap(); buf.clear(); - builder.write_sql(buf); + let _ = builder.write_sql(buf); println!("len: {}, act len: {}", builder.len(), buf.len()); let db_idx = strategy.distribution().db_idx(hash); assert_eq!( diff --git a/endpoint/src/vector/topo.rs b/endpoint/src/vector/topo.rs index 0aa91289f..b8182f4c4 100644 --- a/endpoint/src/vector/topo.rs +++ b/endpoint/src/vector/topo.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::env; use chrono::{Datelike, NaiveDate}; use discovery::dns; @@ -6,12 +7,13 @@ use discovery::dns::IPPort; use discovery::TopologyWrite; use ds::MemGuard; use protocol::kv::{ContextStatus, MysqlBuilder}; -use protocol::vector::attachment::{VAttach, VecAttach}; +use protocol::vector::attachment::{BackendType, VAttach, VectorAttach}; use protocol::vector::redis::parse_vector_detail; -use protocol::Protocol; +use protocol::vector::{command, CommandType, VectorCmd}; use protocol::Request; use protocol::ResOption; use protocol::Resource; +use protocol::{Operation, Protocol}; use sharding::hash::{Hash, HashKey}; use crate::dns::DnsConfig; @@ -64,7 +66,7 @@ where P: Protocol, { fn has_attach(&self) -> bool { - self.strategist.more() + self.strategist.aggregation() } } @@ -74,11 +76,63 @@ where Req: Request, P: Protocol, { + /// 从si表中计数来计算timeline待查询的参数 + fn get_timeline_query_param( + &self, + req: &Req, + unbound_vcmd: &Option, + round: u16, + ) -> Result<(bool, u16, NaiveDate), protocol::Error> { + use CommandType::*; + // let vcmd = unbound_vcmd.as_ref().unwrap_or(&req.attach().vcmd); + let vcmd = unbound_vcmd + .as_ref() + .or_else(|| Some(&req.attach().vcmd)) + .expect("vcmd"); + let cmd_props = command::get_cfg(req.op_code())?; + + // round为0时查timeline,肯定不是retrieve的aggregation请求; + // 只有对retrieve类的aggregation请求,才会从si中获取timeline参数,其他的都不用; + if round == 0 || !cmd_props.get_route().is_aggregation() { + let date = self.strategist.get_date(vcmd.cmd, &vcmd.keys)?; + let next_round = round == 0 && cmd_props.get_route().is_aggregation(); + return Ok((next_round, vcmd.limit(req.operation()) as u16, date)); + } + + assert!(req.operation().is_retrival(), "{vcmd}"); + assert!(vcmd.cmd == VRange || vcmd.cmd == VRangeTimeline, "{vcmd}"); + assert!(req.attachment().is_some(), "{vcmd}"); + + //根据round从si获取timeline查询的参数 + let attach = req.attach().retrieve_attach(); + let si_items = attach.si(); + assert!(si_items.len() > 0, "si_items.len() = 0"); + assert!( + round <= si_items.len() as u16, + "round = {round}, si_items.len() = {}", + si_items.len() + ); + let si_item = &si_items[(round - 1) as usize]; + + let year = si_item.date.year as u16 + 2000; + //构建sql + let limit = attach.left_count.min(si_item.count); + assert!(si_item.count > 0, "{}", si_item.count); + assert!(attach.left_count > 0, "{}", attach.left_count); + + let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1) else { + return Err(protocol::Error::ResponseInvalidMagic); + }; + Ok((round < si_items.len() as u16, limit, date)) + } + fn get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { let (year, shard_idx) = if req.ctx_mut().runs == 0 { - let vcmd = parse_vector_detail(****req, req.flag())?; + let vcmd = parse_vector_detail(****req, req.flag(), false)?; + self.strategist.check_vector_cmd(&vcmd)?; //定位年库 - let date = self.strategist.get_date(&vcmd.keys)?; + let date = self.strategist.get_date(vcmd.cmd, &vcmd.keys)?; + let year = date.year() as u16; let shard_idx = self.shard_idx(req.hash()); @@ -106,95 +160,83 @@ where ); Ok(shard) } - fn more_get_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { - req.attachment_mut() - .get_or_insert(VecAttach::default().to_attach()); + + ///
+    /// 对于配置为aggregation的业务,指令的route可能有:timeline/main、si、aggregation(默认)三种;
+    /// 1. 第一次请求
+    ///     1.1. aggregation请求,对于下行vrange,首先请求si,然后请求timeline;对于vcard,直接请求si即结束;对于上行,需要先请求timeline,再请求si;
+    ///     1.2. timeline请求,直接请求timeline表,下行失败重试一次,上行失败直接返回;
+    ///     1.3. si请求,直接请求si表,下行失败重试一次,上行失败直接返回;
+    /// 2. 第二次及以后请求
+    ///     2.1. aggregation请求成功,对于下行vrange,继续请求timeline;对于上行,继续请求si;
+    ///     2.2. aggregation请求失败,上行失败不会到这里;下行失败直接重试;
+    ///     2.3. timeline、si请求成功,请求已结束,不会到这里;
+    ///     2.3. timeline、si请求失败,下行请求重试,上行请求不会到这里;
+    /// 
+ + fn get_aggregation_shard(&self, req: &mut Req) -> Result<&Shard, protocol::Error> { //分别代表请求的轮次和每轮重试次数 - let (round, runs) = (req.attach().round, req.ctx_mut().runs); + let runs = req.ctx_mut().runs; //runs == 0 表示第一轮第一次请求 - let shard = if runs == 0 || req.attach().rsp_ok { + let shard = if runs == 0 || (req.attachment().is_some() && req.attach().rsp_ok) { let shard = if runs == 0 { - //请求si表 - assert_eq!(req.attach().left_count, 0); assert_eq!(*req.context_mut(), 0); - let vcmd = parse_vector_detail(****req, req.flag())?; - if req.operation().is_retrival() { - req.retry_on_rsp_notok(true); - let limit = vcmd.limit(); - assert!(limit > 0, "{limit}"); - //需要在buildsql之前设置 - req.attach_mut().init(limit as u16); - } + assert_eq!(req.attachment(), None); - let si_sql = SiSqlBuilder::new(&vcmd, req.hash(), &self.strategist)?; - let cmd = MysqlBuilder::build_packets_for_vector(si_sql)?; - req.reshape(MemGuard::from_vec(cmd)); + let vcmd = parse_vector_detail(****req, req.flag(), true)?; + self.strategist.check_vector_cmd(&vcmd)?; - let si_shard_idx = self.strategist.si_distribution().index(req.hash()); - req.ctx_mut().shard_idx = si_shard_idx as u16; - req.attach_mut().vcmd = vcmd; - req.set_last(false); - &self.si_shard[si_shard_idx] - } else { - //根据round获取si - let si_items = req.attach().si(); - assert!(si_items.len() > 0, "si_items.len() = 0"); - assert!( - round <= si_items.len() as u16, - "round = {round}, si_items.len() = {}", - si_items.len() - ); - let si_item = &si_items[(round - 1) as usize]; + if vcmd.route.expect("aggregation").is_aggregation() { + // 目前非retrive请求,默认不开启聚合请求,避免sdk无操作 + if !self.check_aggregation_request(req.operation(), vcmd.cmd, true) { + log::info!("+++ found unsupported req:{}", req); + return Err(protocol::Error::ProtocolNotSupported); + } - let year = si_item.date.year as u16 + 2000; - //构建sql - let limit = req.attach().left_count.min(si_item.count); - assert!(si_item.count > 0, "{}", si_item.count); - assert!(req.attach().left_count > 0, "{}", req.attach().left_count); + req.set_next_round(true); + let operation = req.operation(); + let attach = VectorAttach::new(operation, vcmd).to_attach(); + let _ = req.attachment_mut().insert(attach); - let Some(date) = NaiveDate::from_ymd_opt(year.into(), si_item.date.month.into(), 1) - else { - return Err(protocol::Error::ResponseInvalidMagic); - }; - let vector_builder = SqlBuilder::new( - &req.attach().vcmd, - req.hash(), - date, - &self.strategist, - limit as u64, - )?; - let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; - - //更新轮次信息 - if round == si_items.len() as u16 { - req.set_last(true); + self.reshape_and_get_shard(req, None, 0)? + } else { + // 非aggregation请求的场景,不需要构建attachement + self.reshape_and_get_shard(req, Some(vcmd), 0)? } - - req.reshape(MemGuard::from_vec(cmd)); - //获取shard - let shard_idx = self.shard_idx(req.hash()); - req.ctx_mut().year = year; - req.ctx_mut().shard_idx = shard_idx as u16; - let shards = self.shards.get(year); - shards.get(shard_idx).ok_or(protocol::Error::TopInvalid)? + } else { + assert!(req.get_next_round()); + assert!(req.attachment().is_some()); + self.reshape_and_get_shard(req, None, req.attach().round)? }; //重新发送后,视作新的请求,重置响应和runs - req.attach_mut().rsp_ok = false; - req.attach_mut().round += 1; + if req.attachment().is_some() { + req.attach_mut().rsp_ok = false; + req.attach_mut().round += 1; + } req.ctx_mut().runs = 0; - req.set_fitst_try(); + // req.set_fitst_try(); shard } else { - if round - 1 == 0 { - //上一轮si表重试 - &self.si_shard[req.ctx_mut().shard_idx as usize] - } else { - let (year, shard_idx) = (req.ctx_mut().year, req.ctx_mut().shard_idx); - let shards = self.shards.get(year); - shards - .get(shard_idx as usize) - .ok_or(protocol::Error::TopInvalid)? + // 失败重试,对于kvector请求,只有下行请求才会重试 + assert!(req.operation().is_retrival(), "kvector req: {}", req); + // 重试时,对于单库表访问,没有attachment + let ctx = req.ctx(); + match ctx.backend_type.into() { + BackendType::TimelineOrMain => { + // timeline重试 + let shards = self.shards.get(ctx.year); + shards + .get(ctx.shard_idx as usize) + .ok_or(protocol::Error::TopInvalid)? + } + BackendType::Si => { + //si表重试 + &self.si_shard[ctx.shard_idx as usize] + } + _ => { + panic!("malformed backend type"); + } } }; @@ -206,7 +248,135 @@ where ); Ok(shard) } + + fn reshape_and_get_shard( + &self, + req: &mut Req, + unbound_vcmd: Option, + round: u16, + ) -> Result<&Shard, protocol::Error> { + // let vcmd = unbound_vcmd.as_ref().unwrap_or(&req.attach().vcmd); + let vcmd = unbound_vcmd + .as_ref() + .or_else(|| Some(&req.attach().vcmd)) + .expect("vcmd"); + if self.will_access_timeline(&vcmd, req.operation(), round) { + let (next_round, date) = self.reshap_request_timeline(req, unbound_vcmd, round)?; + req.set_next_round(next_round); + let shard = self.get_shard_timeline(req, &date)?; + req.ctx_mut().backend_type = BackendType::TimelineOrMain; + Ok(shard) + } else { + let next_round = self.reshape_request_si(req, unbound_vcmd)?; + req.ctx_mut().backend_type = BackendType::Si; + req.set_next_round(next_round); + + Ok(self.get_shard_si(req)) + } + } + + /// 获取si shard,注意区分第一次获取和重试获取 + fn reshape_request_si( + &self, + req: &mut Req, + unbound_vcmd: Option, + ) -> Result { + let vcmd = unbound_vcmd + .as_ref() + .or_else(|| Some(&req.attach().vcmd)) + .expect("vcmd"); + assert!(vcmd.route.is_some(), "{vcmd}"); + let next_round = req.operation().is_retrival() && vcmd.route.expect("si").is_aggregation(); + + let date = self.strategist.get_date(vcmd.cmd, &vcmd.keys)?; + let si_sql = SiSqlBuilder::new(&vcmd, req.hash(), date, &self.strategist)?; + let cmd = MysqlBuilder::build_packets_for_vector(si_sql)?; + req.reshape(MemGuard::from_vec(cmd)); + + Ok(next_round) + } + + fn get_shard_si(&self, req: &mut Req) -> &Shard { + let si_shard_idx = self.strategist.si_distribution().index(req.hash()); + req.ctx_mut().shard_idx = si_shard_idx as u16; + &self.si_shard[si_shard_idx] + } + + /// 重新构建timeline请求,注意区分第一次获取和重试获取 + #[inline] + fn reshap_request_timeline( + &self, + req: &mut Req, + unbound_vcmd: Option, + round: u16, + ) -> Result<(bool, NaiveDate), protocol::Error> { + let (next_round, limit, date) = self.get_timeline_query_param(req, &unbound_vcmd, round)?; + + let vcmd = unbound_vcmd + .as_ref() + .or_else(|| Some(&req.attach().vcmd)) + .expect("vcmd"); + let vector_builder = + SqlBuilder::new(vcmd, req.hash(), date, &self.strategist, limit as u64)?; + let cmd = MysqlBuilder::build_packets_for_vector(vector_builder)?; + req.reshape(MemGuard::from_vec(cmd)); + req.ctx_mut().year = date.year() as u16; + Ok((next_round, date)) + } + + #[inline] + fn get_shard_timeline( + &self, + req: &mut Req, + date: &NaiveDate, + ) -> Result<&Shard, protocol::Error> { + let shard_idx = self.shard_idx(req.hash()); + req.ctx_mut().shard_idx = shard_idx as u16; + let shards = self.shards.get(date.year() as u16); + shards.get(shard_idx).ok_or(protocol::Error::TopInvalid) + } + + #[inline] + fn will_access_timeline(&self, vcmd: &VectorCmd, operation: Operation, round: u16) -> bool { + vcmd.route + .expect("aggregation") + .current_backend_timeline(operation, round) + } + + /// check是否支持该aggregation请求,目前默认只支持retrieve类聚合请求 + #[inline] + fn check_aggregation_request( + &self, + operation: Operation, + cmd: CommandType, + aggregation_route: bool, + ) -> bool { + if operation.is_retrival() { + // retrival 请求总是支持 + true + } else if aggregation_route && self.strategist.aggregation() { + // aggregation策略下,vupdate 目前不支持,因为si中计数不清楚如何设置 + match cmd { + CommandType::VUpdate => return false, + _ => {} + }; + + // 对非retrieve类的聚合请求,在aggregation策略的topo中,设置环境变量aggregation_store_enable为true,才支持访问; + // 目前线上业务都不打开,故线上应该没有此类访问;待有需求再打开 + log::info!( + "AGGENGATION_STORE: {}", + env::var("AGGENGATION_STORE").unwrap_or("not-set".to_string()) + ); + env::var("AGGENGATION_STORE") + .unwrap_or("false".to_string()) + .parse::() + .unwrap_or(false) + } else { + true + } + } } + impl Endpoint for VectorService where E: Endpoint, @@ -215,11 +385,22 @@ where { type Item = Req; + ///
+    /// 发送请求有如下之中类型:
+    ///   1. 配置为aggregation
+    ///     1.1 请求aggregation(默认),对于下行请求,先请求si,再请求timeline;对于上行请求,先请求timeline,成功再请求si(低优先级,线上无聚合上行,后续支持);
+    ///     1.2 请求timeline,直接请求timeline库表,上行失败直接返回,下行失败重试一次;
+    ///     1.3 请求si,直接请求si库表,上行失败直接返回,下行失败重试一次;
+    ///   2. 配置为非aggregation,当前只有vectortime
+    ///     2.1 没有请求route,上行失败直接返回,下行失败重试一次。
+    /// 
fn send(&self, mut req: Self::Item) { - let shard = if !self.strategist.more() { + log::info!("+++ will send req: {}", req); + + let shard = if !self.strategist.aggregation() { self.get_shard(&mut req) } else { - self.more_get_shard(&mut req) + self.get_aggregation_shard(&mut req) }; let shard = match shard { Ok(shard) => shard, @@ -250,13 +431,17 @@ where (ctx.idx as usize, &shard.master) } }; - ctx.idx = idx as u16; + ctx.idx = idx as u8; ctx.runs += 1; // 只重试一次,重试次数过多,可能会导致雪崩。如果不重试,现在的配额策略在当前副本也只会连续发送四次请求,问题也不大 let try_next = ctx.runs == 1; req.try_next(try_next); + log::info!("+++ send to {}, req: {}", endpoint.addr(), req); endpoint.send(req) } else { + let ctx = req.ctx_mut(); + ctx.runs += 1; + log::info!("+++ send to master {}, req: {}", shard.master.addr(), req); shard.master().send(req); } } diff --git a/endpoint/src/vector/vectortime.rs b/endpoint/src/vector/vectortime.rs index 0fa6d438b..95766a641 100644 --- a/endpoint/src/vector/vectortime.rs +++ b/endpoint/src/vector/vectortime.rs @@ -1,11 +1,11 @@ use crate::kv::kvtime::KVTime; -use super::strategy::Postfix; use chrono::NaiveDate; use core::fmt::Write; use ds::RingSlice; -use protocol::kv::Strategy; -use protocol::Error; +use protocol::vector::Postfix; +use protocol::{kv::Strategy, vector::KeysType}; +use protocol::{Error, DATE_YYMM, DATE_YYMMDD}; use sharding::{distribution::DBRange, hash::Hasher}; #[derive(Clone, Debug)] @@ -37,61 +37,43 @@ impl VectorTime { ::hasher(&self.kvtime) } + //策略处已作校验 pub fn get_date(&self, keys: &[RingSlice]) -> Result { - if keys.len() != self.keys_name.len() { - return Err(Error::RequestProtocolInvalid); - } - - let mut ymd = (0u16, 0u16, 0u16); - for (i, key_name) in self.keys_name.iter().enumerate() { - match key_name.as_str() { - "yymm" => { - ymd = ( - keys[i] - .try_str_num(0..0 + 2) - .ok_or(Error::RequestProtocolInvalid)? as u16 - + 2000, - keys[i] - .try_str_num(2..2 + 2) - .ok_or(Error::RequestProtocolInvalid)? as u16, - 1, - ); - break; - } - "yymmdd" => { - ymd = ( - keys[i] - .try_str_num(0..0 + 2) - .ok_or(Error::RequestProtocolInvalid)? as u16 - + 2000, - keys[i] - .try_str_num(2..2 + 2) - .ok_or(Error::RequestProtocolInvalid)? as u16, - keys[i] - .try_str_num(4..4 + 2) - .ok_or(Error::RequestProtocolInvalid)? as u16, - ); - break; - } - // "yyyymm" => { - // ymd = ( - // keys[i].try_str_num(0..0+4)? as u16, - // keys[i].try_str_num(4..4+2)? as u16, - // 1, - // ) - // } - // "yyyymmdd" => { - // ymd = ( - // keys[i].try_str_num(0..0+4)? as u16, - // keys[i].try_str_num(4..4+2)? as u16, - // keys[i].try_str_num(6..6+2)? as u16, - // ) - // } - &_ => { - continue; - } - } - } + let date = keys.last().unwrap(); + let ymd = match self.keys_name.last().unwrap().as_str() { + DATE_YYMM => ( + date.try_str_num(0..0 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16 + + 2000, + date.try_str_num(2..2 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + 1, + ), + DATE_YYMMDD => ( + date.try_str_num(0..0 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16 + + 2000, + date.try_str_num(2..2 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + date.try_str_num(4..4 + 2) + .ok_or(Error::RequestProtocolInvalid)? as u16, + ), + _ => (0, 0, 0), + // "yyyymm" => { + // ymd = ( + // date.try_str_num(0..0+4)? as u16, + // date.try_str_num(4..4+2)? as u16, + // 1, + // ) + // } + // "yyyymmdd" => { + // ymd = ( + // date.try_str_num(0..0+4)? as u16, + // date.try_str_num(4..4+2)? as u16, + // date.try_str_num(6..6+2)? as u16, + // ) + // } + }; NaiveDate::from_ymd_opt(ymd.0.into(), ymd.1.into(), ymd.2.into()) .ok_or(Error::RequestProtocolInvalid) } @@ -105,17 +87,23 @@ impl VectorTime { &self.keys_name } - pub(crate) fn condition_keys(&self) -> Box> + '_> { + pub(crate) fn keys_with_type(&self) -> Box + '_> { Box::new( self.keys_name .iter() .map(|key_name| match key_name.as_str() { - "yymm" | "yymmdd" => None, - // "yyyymm" | "yyyymmdd" => None, - &_ => Some(key_name), + DATE_YYMM | DATE_YYMMDD => KeysType::Time, + &_ => KeysType::Keys(key_name), }), ) } + + pub(crate) fn check_vector_cmd(&self, vcmd: &protocol::vector::VectorCmd) -> Result<(), Error> { + if vcmd.keys.len() != self.keys_name.len() { + return Err(Error::RequestProtocolInvalid); + } + Ok(()) + } } impl std::ops::Deref for VectorTime { diff --git a/protocol/src/callback.rs b/protocol/src/callback.rs index f7471921e..3fba9e48b 100644 --- a/protocol/src/callback.rs +++ b/protocol/src/callback.rs @@ -35,11 +35,12 @@ pub struct CallbackContext { done: AtomicBool, // 当前模式请求是否完成 inited: AtomicBool, // response是否已经初始化 pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试 + pub(crate) next_round: bool, // 请求成功后,topo层面是否允许继续下一轮请求,目前只有vector使用 pub(crate) retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试 - pub(crate) write_back: bool, // 请求结束后,是否需要回写。 - pub(crate) max_tries: u8, // 最大重试次数 - first: bool, // 当前请求是否是所有子请求的第一个 - last: bool, // 当前请求是否是所有子请求的最后一个 + pub(crate) write_back: bool, // 请求结束后,是否需要回写。 + pub(crate) max_tries: u8, // 最大重试次数 + first: bool, // 当前请求是否是所有子请求的第一个 + last: bool, // 当前请求是否是所有子请求的最后一个 tries: AtomicU8, request: HashedCommand, response: MaybeUninit, @@ -49,6 +50,7 @@ pub struct CallbackContext { quota: Option, attachment: Option, // 附加数据,用于辅助请求和响应,目前只有kvector在使用 drop_attach: Option>, + next_action: u8, // 下一步动作 } impl CallbackContext { @@ -73,6 +75,7 @@ impl CallbackContext { inited: AtomicBool::new(false), async_mode: false, try_next: false, + next_round: false, retry_on_rsp_notok, write_back: false, max_tries, @@ -85,6 +88,7 @@ impl CallbackContext { quota: None, attachment: None, drop_attach, + next_action: 0, } } @@ -120,17 +124,17 @@ impl CallbackContext { #[inline] pub fn on_complete(&mut self, parser: &P, resp: Command) { log::debug!("on-complete:{} resp:{}", self, resp); - // 异步请求不关注response。 - if !self.async_mode { - debug_assert!(!self.complete(), "{:?}", self); - if self.attachment.is_some() { - // vector聚合场景 - self.on_complete_aggregate(parser, resp); - } else { + if self.attachment.is_none() { + // 异步请求不关注response。 + if !self.async_mode { + debug_assert!(!self.complete(), "{:?}", self); self.swap_response(resp); } + self.on_done(); + } else { + // vector聚合场景 + self.on_complete_aggregate(parser, resp); } - self.on_done(); } #[inline] @@ -139,24 +143,24 @@ impl CallbackContext { // 1. 第一轮获取si;若si获取失败(例如si为空),则终止请求 // 2. 后续轮次更新attachment,并判断是否是最后一轮。 // 返回失败,则终止请求。 - if resp.ok() { - // 更新attachment + let next_round = if resp.ok() { let attach = self.attachment.as_mut().expect("attach"); - let last = parser.update_attachment(attach, &mut resp); - if last { - self.set_last(true); - } // 更新attachment不成功,或者响应数足够,终止请求 + parser.update_attachment(attach, &mut resp) } else { - self.set_last(true); - } - if self.last() { + false + }; + + self.quota.take().map(|q| q.incr(self.start_at().elapsed())); + + //没有下一轮时,走以前的重试逻辑:有响应不会重试了 + if !next_round || !self.next_round { // 中间轮次的resp没有被使用,可忽略; + self.next_round = false; self.swap_response(resp); + self.mark_done_and_wake(); } else { - // 重置下一轮访问需要的变量 - self.try_next = true; // 可以进入下一轮访问 - self.set_fitst_try(); + self.goon() } } @@ -209,9 +213,10 @@ impl CallbackContext { return self.goon(); } - // 改到这里,不需要额外判断逻辑了 - self.set_last(true); + self.mark_done_and_wake(); + } + fn mark_done_and_wake(&mut self) { //markdone后,req标记为已完成,那么CallbackContext和CopyBidirectional都有可能被释放 //CopyBidirectional会提前释放,所以需要提前clone一份 //CallbackContext会提前释放,则需要在此clone到栈上 @@ -339,17 +344,25 @@ impl CallbackContext { &mut self.attachment } #[inline] - pub fn set_last(&mut self, last: bool) { - // todo: 可优化为依据请求数或者响应数量判断可以设置last为true - self.last = last; + pub fn set_next_round(&mut self, next_round: bool) { + self.next_round = next_round; } #[inline] pub fn set_max_tries(&mut self, max_tries: u8) { self.max_tries = max_tries; } + // #[inline] + // pub fn reset_tries(&mut self) { + // self.tries.store(0, Release); + // } + #[inline] - pub fn set_fitst_try(&mut self) { - self.tries = 0.into(); + pub fn with_next_action(&mut self, next_action: u8) { + self.next_action = next_action; + } + + pub(crate) fn get_next_round(&self) -> bool { + self.next_round } } diff --git a/protocol/src/kv/mc2mysql.rs b/protocol/src/kv/mc2mysql.rs index 98591c0a1..2b992f948 100644 --- a/protocol/src/kv/mc2mysql.rs +++ b/protocol/src/kv/mc2mysql.rs @@ -160,7 +160,7 @@ pub fn escape_mysql_and_push(packet: &mut impl Write, c: u8) { pub trait VectorSqlBuilder: MysqlBinary { fn len(&self) -> usize; - fn write_sql(&self, buf: &mut impl Write); + fn write_sql(&self, buf: &mut impl Write) -> Result<()>; } impl MysqlBuilder { @@ -198,7 +198,7 @@ impl MysqlBuilder { packet.write_next_packet_header(); packet.push(sql_builder.mysql_cmd() as u8); - sql_builder.write_sql(&mut packet); + sql_builder.write_sql(&mut packet)?; packet.finish_current_packet(); packet diff --git a/protocol/src/kv/mod.rs b/protocol/src/kv/mod.rs index ffb56407a..be1634ed2 100644 --- a/protocol/src/kv/mod.rs +++ b/protocol/src/kv/mod.rs @@ -25,6 +25,7 @@ use super::Flag; use super::Protocol; use crate::kv::client::Client; use crate::kv::error::Error; +use crate::vector::attachment::BackendType; use crate::Command; use crate::HandShake; use crate::HashedCommand; @@ -505,7 +506,8 @@ pub enum ContextStatus { pub struct Context { pub runs: u8, // 运行的次数 pub error: ContextStatus, - pub idx: u16, //最多有65535个主从 + pub idx: u8, //最多有256个主从 + pub backend_type: BackendType, pub shard_idx: u16, pub year: u16, } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 6e1071d66..058dd32de 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -20,6 +20,7 @@ pub mod vector; pub use flag::*; pub use parser::Proto as Protocol; pub use parser::*; +pub use vector::{DATE_YYMM, DATE_YYMMDD}; pub use ds::{Bit, Ext}; diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 397f69470..ab3667f51 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -130,10 +130,8 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { } #[inline] - fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { - // 默认情况下,attachment应该为空 - assert!(false, "{:?} {response}", attachment); - false + fn update_attachment(&self, _attachment: &mut Attachment, _responsee: &mut Command) -> bool { + panic!("unreachable"); } #[inline] fn drop_attach(&self, _att: Attachment) { @@ -153,7 +151,8 @@ pub trait RequestProcessor { // TODO Command实质就是response,考虑直接用response? fishermen pub struct Command { ok: bool, - pub(crate) header: ResponseHeader, + // header 只对部分请求有效,改为option + pub(crate) header: Option, count: u32, cmd: MemGuard, } @@ -172,7 +171,7 @@ impl Command { pub fn from(ok: bool, cmd: ds::MemGuard) -> Self { Self { ok, - header: Default::default(), + header: None, count: 0, cmd, } @@ -182,7 +181,7 @@ impl Command { let count = header.rows as u32; Self { ok, - header, + header: Some(header), count, cmd: body, } diff --git a/protocol/src/req.rs b/protocol/src/req.rs index d3b88a268..d34b8b267 100644 --- a/protocol/src/req.rs +++ b/protocol/src/req.rs @@ -7,7 +7,8 @@ use std::sync::Arc; use crate::{Command, HashedCommand}; pub type Context = u64; -pub type Attachment = [u8; 280]; +pub type Attachment = [u8; 296]; + #[repr(transparent)] #[derive(Clone, Default)] pub struct BackendQuota { @@ -72,6 +73,7 @@ pub trait Request: // 获取附加信息 fn attachment(&self) -> Option<&Attachment>; fn set_max_tries(&mut self, max_tries: u8); - fn set_fitst_try(&mut self); - fn set_last(&mut self, last: bool); + // fn set_fitst_try(&mut self); + fn set_next_round(&mut self, next_round: bool); + fn get_next_round(&mut self) -> bool; } diff --git a/protocol/src/request.rs b/protocol/src/request.rs index c66adbdf3..9c46eb431 100644 --- a/protocol/src/request.rs +++ b/protocol/src/request.rs @@ -71,12 +71,16 @@ impl crate::Request for Request { fn set_max_tries(&mut self, max_tries: u8) { self.ctx().set_max_tries(max_tries); } - #[inline] - fn set_fitst_try(&mut self) { - self.ctx().set_fitst_try(); + // #[inline] + // fn set_fitst_try(&mut self) { + // self.ctx().reset_tries(); + // } + fn set_next_round(&mut self, next_round: bool) { + self.ctx().set_next_round(next_round); } - fn set_last(&mut self, last: bool) { - self.ctx().set_last(last); + + fn get_next_round(&mut self) -> bool { + self.ctx().get_next_round() } } impl Request { diff --git a/protocol/src/vector/attachment.rs b/protocol/src/vector/attachment.rs index f45ec8a8a..51ef254ea 100644 --- a/protocol/src/vector/attachment.rs +++ b/protocol/src/vector/attachment.rs @@ -1,15 +1,42 @@ +use std::mem::transmute; +use std::mem::ManuallyDrop; + use crate::Attachment; use crate::Command; +use crate::Operation; use crate::Packet; use ds::RingSlice; use super::VectorCmd; -#[derive(Debug, Default)] + #[repr(C)] -pub struct VecAttach { - pub rsp_ok: bool, +pub struct VectorAttach { + // type + attach_type: AttachType, + // attach basic fields + pub vcmd: VectorCmd, // 查询的轮次,0代表si pub round: u16, + // 最新查询的rsp状态 + pub rsp_ok: bool, + + // attach ext fields + attach_ext: VectorAttachExt, +} + +// TODO retrieveAttach 需要在合适的位置 mannual drop fishermen +pub union VectorAttachExt { + pub(crate) retrieve_attach: ManuallyDrop, + pub(crate) store_attach: StoreAttach, +} + +#[derive(Debug)] +#[repr(C)] +pub struct RetrieveAttach { + // pub vcmd: VectorCmd, + // pub rsp_ok: bool, + // // 查询的轮次,0代表si + // pub round: u16, // 待查询数量,不能超过u16::MAX pub left_count: u16, body_token_count: u16, @@ -19,7 +46,232 @@ pub struct VecAttach { body: Vec>, // 查询响应的body中token数量 si: Vec, // si表中查询到的数据, si字段信息在配置里存放 - pub vcmd: VectorCmd, +} + +#[derive(Debug, Default, Clone, Copy)] +#[repr(C)] +pub struct StoreAttach { + op_date: VDate, + pub(crate) affected_rows: u16, +} + +impl VectorAttachExt { + #[inline] + pub fn default_retrieve_attach(left_count: u16) -> Self { + Self { + retrieve_attach: ManuallyDrop::new(RetrieveAttach::new(left_count)), + } + } + + #[inline] + pub fn default_store_attach() -> Self { + Self { + store_attach: StoreAttach::default(), + } + } +} + +#[derive(Debug)] +#[repr(C)] +pub enum AttachType { + Unknown = 0, + Retrieve = 1, + Store = 2, +} + +#[derive(Debug, Clone, Copy)] +#[repr(u8)] +pub enum Route { + Aggregation = 0, + TimelineOrMain = 1, + Si = 2, +} + +impl Default for Route { + fn default() -> Self { + Route::Aggregation + } +} + +#[derive(Debug, Clone, Copy)] +#[repr(u8)] +pub enum BackendType { + Unknown = 0, + TimelineOrMain = 1, + Si = 2, +} + +impl Default for BackendType { + fn default() -> Self { + BackendType::TimelineOrMain + } +} + +impl BackendType { + #[inline(always)] + pub fn is_timeline_or_main(&self) -> bool { + match self { + BackendType::TimelineOrMain => true, + _ => false, + } + } +} + +impl From for BackendType { + fn from(v: u8) -> Self { + match v { + 0 => BackendType::Unknown, + 1 => BackendType::TimelineOrMain, + 2 => BackendType::Si, + _ => panic!("invalid backend type"), + } + } +} + +impl Route { + /// parse route from string,对于配置为aggregation的业务,目前只有timeline和si两种需要特殊指出,默认是aggregation + #[inline(always)] + pub fn parse(route_str: &RingSlice) -> Self { + if route_str.start_ignore_case(0, "timeline".as_bytes()) { + Route::TimelineOrMain + } else if route_str.start_ignore_case(0, "si".as_bytes()) { + Route::Si + } else { + Route::Aggregation + } + } + + #[inline(always)] + pub fn is_aggregation(&self) -> bool { + match self { + Route::Aggregation => true, + _ => false, + } + } + + /// 获取aggregation的执行route + #[inline(always)] + pub fn current_backend_timeline(&self, op: Operation, round: u16) -> bool { + match self { + Route::Si => false, + Route::TimelineOrMain => true, + Route::Aggregation => { + if op.is_store() { + // (Route::TimelineOrMain, Route::Si),对于store,first round访问timeline,否则访问si + round == 0 + } else { + // (Route::Si, Route::TimelineOrMain),对于retrive指令,first round访问si,非first round才访问timeline, + round != 0 + } + } + } + } +} + +impl Default for AttachType { + fn default() -> Self { + AttachType::Unknown + } +} + +impl VectorAttach { + #[inline(always)] + pub fn new(operation: Operation, vcmd: VectorCmd) -> Self { + let limit = vcmd.limit(operation); + match operation { + Operation::Get | Operation::Gets => Self { + attach_type: AttachType::Retrieve, + vcmd: vcmd, + round: 0, + rsp_ok: false, + attach_ext: VectorAttachExt::default_retrieve_attach(limit), + }, + Operation::Store => Self { + attach_type: AttachType::Store, + vcmd: vcmd, + round: 0, + rsp_ok: false, + attach_ext: VectorAttachExt::default_store_attach(), + }, + _ => panic!("only support get/gets/store for kvector"), + } + } + #[inline(always)] + pub fn from(att: Attachment) -> VectorAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn attach(att: &Attachment) -> &VectorAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn attach_mut(att: &mut Attachment) -> &mut VectorAttach { + unsafe { std::mem::transmute(att) } + } + #[inline(always)] + pub fn to_attach(self) -> Attachment { + unsafe { std::mem::transmute(self) } + } + + #[inline(always)] + pub fn attch_type(&self) -> &AttachType { + &self.attach_type + } + + #[inline(always)] + pub fn retrieve_attach(&self) -> &RetrieveAttach { + assert!(self.attach_type.is_retrieve(), "{:?}", self.attach_type); + unsafe { &self.attach_ext.retrieve_attach } + } + + #[inline(always)] + pub fn store_attach(&self) -> &StoreAttach { + assert!(self.attach_type.is_store()); + unsafe { &self.attach_ext.store_attach } + } + + #[inline(always)] + pub fn retrieve_attach_mut(&mut self) -> &mut RetrieveAttach { + assert!(self.attach_type.is_retrieve(), "{:?}", self.attach_type); + unsafe { &mut self.attach_ext.retrieve_attach } + } + + #[inline(always)] + pub fn store_attach_mut(&mut self) -> &mut StoreAttach { + assert!(self.attach_type.is_store()); + unsafe { &mut self.attach_ext.store_attach } + } +} + +impl Drop for VectorAttach { + /// TODO 对于retrieve attach,需要手动进行drop + fn drop(&mut self) { + match self.attach_type { + AttachType::Retrieve => unsafe { + ManuallyDrop::drop(&mut self.attach_ext.retrieve_attach); + log::info!("+++ drop retrieve attach manually!"); + }, + _ => {} + } + } +} + +impl AttachType { + #[inline(always)] + pub fn is_retrieve(&self) -> bool { + match self { + AttachType::Retrieve => true, + _ => false, + } + } + + #[inline(always)] + pub fn is_store(&self) -> bool { + match self { + AttachType::Store => true, + _ => false, + } + } } #[derive(Debug, Default)] @@ -38,7 +290,7 @@ impl SiItem { } } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] pub struct VDate { pub year: u8, // year pub month: u8, // month @@ -75,64 +327,48 @@ impl VDate { } pub trait VAttach { - fn attach(&self) -> &VecAttach; - fn attach_mut(&mut self) -> &mut VecAttach; + fn attach(&self) -> &VectorAttach; + fn attach_mut(&mut self) -> &mut VectorAttach; } impl VAttach for T { #[inline(always)] - fn attach(&self) -> &VecAttach { - unsafe { std::mem::transmute(self.attachment().expect("attach is none")) } + fn attach(&self) -> &VectorAttach { + unsafe { transmute(self.attachment().expect("attach is none")) } } #[inline(always)] - fn attach_mut(&mut self) -> &mut VecAttach { + fn attach_mut(&mut self) -> &mut VectorAttach { unsafe { std::mem::transmute(self.attachment_mut().as_mut().expect("attach is none")) } } } -impl VecAttach { - #[inline(always)] - pub fn from(att: Attachment) -> VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn attach(att: &Attachment) -> &VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn attach_mut(att: &mut Attachment) -> &mut VecAttach { - unsafe { std::mem::transmute(att) } - } - #[inline(always)] - pub fn to_attach(self) -> Attachment { - unsafe { std::mem::transmute(self) } - } +impl RetrieveAttach { #[inline] - pub fn init(&mut self, left_count: u16) { - *self = VecAttach { - round: 0, + fn new(left_count: u16) -> Self { + Self { left_count, header: Vec::with_capacity(8), body: Vec::with_capacity(12), body_token_count: 0, - rsp_ok: false, si: Vec::with_capacity(6), - vcmd: Default::default(), - }; + } } #[inline] pub fn is_empty(&self) -> bool { self.body.is_empty() } - pub fn attach_header(&mut self, header: Vec) { - self.header = header; + // pub fn attach_header(&mut self, header: Vec) { + pub fn swap_header_data(&mut self, header: &mut Vec) { + // self.header = header; + std::mem::swap(&mut self.header, header); } #[inline] pub fn attach_body(&mut self, body_data: Vec, rows: u16, columns: u16) { self.body.push(body_data); self.body_token_count += rows * columns; self.left_count = self.left_count.saturating_sub(rows); + log::debug!("left_count: {}", self.left_count); } #[inline] @@ -153,8 +389,9 @@ impl VecAttach { // 约定:si返回结果的结构: uid、date、count顺序排列 #[inline] pub fn attach_si(&mut self, response: &Command) -> bool { - let rows = response.header.rows; - let cols = response.header.columns; + let header = response.header.as_ref().expect("rsp si"); + let rows = header.rows; + let cols = header.columns; debug_assert_eq!(cols, 3); self.si.reserve(rows as usize); let data = Packet::from(***response); @@ -177,6 +414,7 @@ impl VecAttach { } } } + log::debug!("si len: {},content: {:?}", self.si.len(), self.si); self.si.len() > 0 } #[inline] @@ -189,6 +427,13 @@ impl VecAttach { } } +impl StoreAttach { + #[inline] + pub fn incr_affected_rows(&mut self, affected_rows: u16) { + self.affected_rows = self.affected_rows + affected_rows; + } +} + #[cfg(test)] mod tests { use ds::MemGuard; @@ -205,8 +450,7 @@ mod tests { ); let body: MemGuard = MemGuard::from_vec(":6351590999\r\n$10\r\n2024-06-01\r\n:674\r\n:6351590999\r\n$10\r\n2024-05-01\r\n:1113\r\n:6351590999\r\n$10\r\n2024-04-01\r\n:833\r\n:6351590999\r\n$10\r\n2024-03-01\r\n:45\r\n:6351590999\r\n$10\r\n2024-02-01\r\n:61\r\n:6351590999\r\n$10\r\n2024-01-01\r\n:59\r\n:6351590999\r\n$10\r\n2023-12-01\r\n:20\r\n:6351590999\r\n$10\r\n2023-11-01\r\n:9\r\n:6351590999\r\n$10\r\n2023-10-01\r\n:13\r\n:6351590999\r\n$10\r\n2023-09-01\r\n:50\r\n:6351590999\r\n$10\r\n2023-08-01\r\n:16\r\n:6351590999\r\n$10\r\n2023-07-01\r\n:61\r\n:6351590999\r\n$10\r\n2023-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2023-05-01\r\n:41\r\n:6351590999\r\n$10\r\n2023-04-01\r\n:54\r\n:6351590999\r\n$10\r\n2023-03-01\r\n:108\r\n:6351590999\r\n$10\r\n2023-02-01\r\n:213\r\n:6351590999\r\n$10\r\n2023-01-01\r\n:159\r\n:6351590999\r\n$10\r\n2022-12-01\r\n:26\r\n:6351590999\r\n$10\r\n2022-11-01\r\n:16\r\n:6351590999\r\n$10\r\n2022-10-01\r\n:14\r\n:6351590999\r\n$10\r\n2022-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2022-08-01\r\n:10\r\n:6351590999\r\n$10\r\n2022-07-01\r\n:9\r\n:6351590999\r\n$10\r\n2022-06-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-05-01\r\n:23\r\n:6351590999\r\n$10\r\n2022-04-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-03-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-02-01\r\n:4\r\n:6351590999\r\n$10\r\n2022-01-01\r\n:5\r\n:6351590999\r\n$10\r\n2021-12-01\r\n:14\r\n:6351590999\r\n$10\r\n2021-11-01\r\n:4\r\n:6351590999\r\n$10\r\n2021-10-01\r\n:2\r\n:6351590999\r\n$10\r\n2021-09-01\r\n:3\r\n:6351590999\r\n$10\r\n2021-08-01\r\n:25\r\n:6351590999\r\n$10\r\n2021-07-01\r\n:36\r\n:6351590999\r\n$10\r\n2021-06-01\r\n:30\r\n:6351590999\r\n$10\r\n2021-05-01\r\n:18\r\n:6351590999\r\n$10\r\n2021-04-01\r\n:20\r\n:6351590999\r\n$10\r\n2021-03-01\r\n:21\r\n:6351590999\r\n$10\r\n2021-02-01\r\n:35\r\n:6351590999\r\n$10\r\n2021-01-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-12-01\r\n:55\r\n:6351590999\r\n$10\r\n2020-11-01\r\n:22\r\n:6351590999\r\n$10\r\n2020-10-01\r\n:37\r\n:6351590999\r\n$10\r\n2020-09-01\r\n:33\r\n:6351590999\r\n$10\r\n2020-08-01\r\n:15\r\n:6351590999\r\n$10\r\n2020-07-01\r\n:12\r\n:6351590999\r\n$10\r\n2020-06-01\r\n:26\r\n:6351590999\r\n$10\r\n2020-05-01\r\n:54\r\n:6351590999\r\n$10\r\n2020-04-01\r\n:38\r\n:6351590999\r\n$10\r\n2020-03-01\r\n:27\r\n:6351590999\r\n$10\r\n2020-02-01\r\n:80\r\n:6351590999\r\n$10\r\n2020-01-01\r\n:99\r\n:6351590999\r\n$10\r\n2019-12-01\r\n:67\r\n:6351590999\r\n$10\r\n2019-11-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-10-01\r\n:80\r\n:6351590999\r\n$10\r\n2019-09-01\r\n:76\r\n:6351590999\r\n$10\r\n2019-08-01\r\n:120\r\n:6351590999\r\n$10\r\n2019-07-01\r\n:140\r\n:6351590999\r\n$10\r\n2019-06-01\r\n:118\r\n:6351590999\r\n$10\r\n2019-05-01\r\n:146\r\n:6351590999\r\n$10\r\n2019-04-01\r\n:287\r\n:6351590999\r\n$10\r\n2019-03-01\r\n:83\r\n:6351590999\r\n$10\r\n2019-02-01\r\n:88\r\n:6351590999\r\n$10\r\n2019-01-01\r\n:262\r\n:6351590999\r\n$10\r\n2018-12-01\r\n:213\r\n:6351590999\r\n$10\r\n2018-11-01\r\n:251\r\n:6351590999\r\n$10\r\n2018-10-01\r\n:215\r\n:6351590999\r\n$10\r\n2018-09-01\r\n:192\r\n:6351590999\r\n$10\r\n2018-08-01\r\n:208\r\n:6351590999\r\n$10\r\n2018-07-01\r\n:339\r\n:6351590999\r\n$10\r\n2018-06-01\r\n:97\r\n:6351590999\r\n$10\r\n2018-05-01\r\n:162\r\n:6351590999\r\n$10\r\n2018-04-01\r\n:127\r\n:6351590999\r\n$10\r\n2018-03-01\r\n:147\r\n:6351590999\r\n$10\r\n2018-02-01\r\n:529\r\n:6351590999\r\n$10\r\n2018-01-01\r\n:702\r\n:6351590999\r\n$10\r\n2017-12-01\r\n:453\r\n:6351590999\r\n$10\r\n2017-11-01\r\n:70\r\n:6351590999\r\n$10\r\n2017-10-01\r\n:1\r\n:6351590999\r\n$10\r\n2017-08-01\r\n:1\r\n".into()); let response: Command = Command::with_assemble_pack(true, header, body); - let mut att = VecAttach::default(); - att.init(1); + let mut att = RetrieveAttach::new(1); let r = att.attach_si(&response); assert!(r); } diff --git a/protocol/src/vector/command.rs b/protocol/src/vector/command.rs index e007a86d2..c7f826a99 100644 --- a/protocol/src/vector/command.rs +++ b/protocol/src/vector/command.rs @@ -17,7 +17,8 @@ pub struct CommandProperties { // 指令在不路由或者无server响应时的响应位置, pub(crate) padding_rsp: &'static str, pub(crate) noforward: bool, - pub(crate) quit: bool, // 是否需要quit掉连接 + pub(crate) quit: bool, // 是否需要quit掉连接 + pub(crate) route: Route, // 请求路线,timeline、si? } // 默认响应 @@ -122,6 +123,8 @@ pub fn get_cfg(op_code: u16) -> crate::Result<&'static CommandProperties> { // } use Operation::*; + +use super::attachment::Route; type Cmd = CommandProperties; #[ctor::ctor] #[rustfmt::skip] @@ -141,13 +144,28 @@ pub(super) static SUPPORTED: Commands = { Cmd::new("quit").arity(1).op(Meta).padding(pt[1]).nofwd().quit(), // kvector 相关的指令 - Cmd::new("vget").arity(-2).op(Get).cmd_type(CommandType::VGet).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), Cmd::new("vrange").arity(-2).op(Get).cmd_type(CommandType::VRange).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), Cmd::new("vadd").arity(-2).op(Store).cmd_type(CommandType::VAdd).padding(pt[3]).has_key().can_hold_field(), // Cmd::new("vreplace").arity(-2).op(Store).cmd_type(CommandType::VReplace).padding(pt[3]).has_key().can_hold_field(), Cmd::new("vupdate").arity(-2).op(Store).cmd_type(CommandType::VUpdate).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), Cmd::new("vdel").arity(-2).op(Store).cmd_type(CommandType::VDel).padding(pt[3]).has_key().can_hold_where_condition(), - Cmd::new("vcard").arity(-2).op(Get).cmd_type(CommandType::VCard).padding(pt[3]).has_key().can_hold_where_condition(), + Cmd::new("vcard").route(Route::Si).arity(-2).op(Get).cmd_type(CommandType::VCard).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + + // vget 只是从timeline获取单条记录,所以route需要设置为timeline/main + Cmd::new("vget").route(Route::TimelineOrMain).arity(-2).op(Get).cmd_type(CommandType::VGet).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + + // 对于timeline、si后缀指令,只是中间状态,为了处理方便,不额外增加字段,仍然作为独立指令来处理 + Cmd::new("vrange.timeline").route(Route::TimelineOrMain).arity(-2).op(Get).cmd_type(CommandType::VRangeTimeline).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + Cmd::new("vrange.si").route(Route::Si).arity(-2).op(Get).cmd_type(CommandType::VRangeSi).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + Cmd::new("vadd.timeline").route(Route::TimelineOrMain).arity(-2).op(Store).cmd_type(CommandType::VAddTimeline).padding(pt[3]).has_key().can_hold_field(), + Cmd::new("vadd.si").route(Route::Si).arity(-2).op(Store).cmd_type(CommandType::VAddSi).padding(pt[3]).has_key().can_hold_field(), + Cmd::new("vupdate.timeline").route(Route::TimelineOrMain).arity(-2).op(Store).cmd_type(CommandType::VUpdateTimeline).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + // VUpdateSi, 在decr时,会用到update.si + Cmd::new("vupdate.si").route(Route::Si).arity(-2).op(Store).cmd_type(CommandType::VUpdateSi).padding(pt[3]).has_key().can_hold_field().can_hold_where_condition(), + Cmd::new("vdel.timeline").route(Route::TimelineOrMain).arity(-2).op(Store).cmd_type(CommandType::VDelTimeline).padding(pt[3]).has_key().can_hold_where_condition(), + // 部分业务,仍然会del si + Cmd::new("vdel.si").route(Route::Si).arity(-2).op(Store).cmd_type(CommandType::VDelSi).padding(pt[3]).has_key().can_hold_where_condition(), + ] { cmds.add_support(c); } @@ -194,10 +212,18 @@ impl CommandProperties { self.can_hold_where_condition = true; self } + pub(crate) fn route(mut self, route: Route) -> Self { + self.route = route; + self + } pub(crate) fn quit(mut self) -> Self { self.quit = true; self } + #[inline] + pub fn get_route(&self) -> Route { + self.route + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -206,11 +232,20 @@ pub enum CommandType { // kvector 访问的指令 VGet, VRange, + VCard, VAdd, - // VReplace, VUpdate, VDel, - VCard, + + // 扩展的单表指令 + VRangeTimeline, + VRangeSi, + VAddTimeline, + VAddSi, + VUpdateTimeline, + VUpdateSi, // 注意对于si,update只是基于count的incr、decr,并不是普通意义上的直接设置为某值 + VDelTimeline, + VDelSi, // // 兼容redisclient而引入的指令 // Select, @@ -226,85 +261,3 @@ impl Default for CommandType { CommandType::Unknown } } -// impl From for CommandType { -// fn from(name: RingSlice) -> Self { -// if name.len() >= 7 { -// return Self::Unknown; -// } - -// let mut oft = 0; -// // 第一个字符不是V,cmd就是扩展的兼容指令 or 未知指令 -// if name.scan_to_uppercase(&mut oft) == b'V' { -// match name.scan_to_uppercase(&mut oft) { -// b'R' => Self::parse_to_cmd(&name, oft, "VRANGE", Self::VRange), -// b'A' => Self::parse_to_cmd(&name, oft, "VADD", Self::VAdd), -// b'U' => Self::parse_to_cmd(&name, oft, "VUPDATE", Self::VUpdate), -// b'D' => Self::parse_to_cmd(&name, oft, "VDEL", Self::VDel), -// b'C' => Self::parse_to_cmd(&name, oft, "VCARD", Self::VCard), -// _ => Self::Unknown, -// } -// } else { -// match name.scan_to_uppercase(&mut oft) { -// b'S' => Self::parse_to_cmd(&name, oft, "SELECT", Self::Select), -// b'P' => Self::parse_to_cmd(&name, oft, "PING", Self::Ping), -// b'H' => Self::parse_to_cmd(&name, oft, "HELLO", Self::Hello), -// b'Q' => Self::parse_to_cmd(&name, oft, "QUIT", Self::Quit), -// _ => Self::Unknown, -// } -// } -// } -// } - -// impl CommandType { -// /// 检测oft之后的name是否于对应cmd name相同,如果相同则返回对应的CMD -// #[inline] -// fn parse_to_cmd(name: &RingSlice, oft: usize, cmd: &str, cmd_type: CommandType) -> Self { -// // 检测oft位置目前只有1和2,1表示非‘V’开头的redis兼容指令,2表示V开头的kvector指令 -// assert!(oft == 1 || oft == 2, "{}", name); - -// if name.len() == cmd.len() && name.start_ignore_case(oft, cmd[oft..].as_bytes()) { -// cmd_type -// } else { -// Self::Unknown -// } -// } - -// #[inline] -// pub(super) fn operation(&self) -> Operation { -// match self { -// CommandType::VRange => Operation::Gets, - -// CommandType::Unknown => panic!("no operation for unknow!"), -// _ => Operation::Store, -// } -// } - -// /// 这个type是否是合法的,unknow不合法 -// #[inline] -// pub(super) fn is_invalid(&self) -> bool { -// match self { -// CommandType::Unknown => true, -// _ => false, -// } -// } -// } - -// impl Default for CommandType { -// fn default() -> Self { -// Self::Unknown -// } -// } - -// /// 扫描对应位置的子节,将对应位置的字符转为大写,同时后移读取位置oft -// pub trait Uppercase { -// // 扫描当前子节,转成大写,并讲位置+1 -// fn scan_to_uppercase(&self, oft: &mut usize) -> u8; -// } - -// impl Uppercase for RingSlice { -// fn scan_to_uppercase(&self, oft: &mut usize) -> u8 { -// let b = self.at(*oft); -// *oft += 1; -// b.to_ascii_uppercase() -// } -// } diff --git a/protocol/src/vector/mod.rs b/protocol/src/vector/mod.rs index 047d22cf1..d8cc73bec 100644 --- a/protocol/src/vector/mod.rs +++ b/protocol/src/vector/mod.rs @@ -1,5 +1,5 @@ pub mod attachment; -mod command; +pub mod command; pub(crate) mod error; pub mod flager; pub mod mysql; @@ -9,18 +9,18 @@ pub mod redis; mod reqpacket; mod rsppacket; -use std::fmt::Write; -use std::mem; +use std::fmt::{Display, Write}; use crate::{ - Attachment, Command, Commander, Error, HashedCommand, Metric, MetricItem, Protocol, + Attachment, Command, Commander, Error, HashedCommand, Metric, MetricItem, Operation, Protocol, RequestProcessor, Result, Stream, Writer, }; +use attachment::{AttachType, Route}; use chrono::NaiveDate; use ds::RingSlice; use sharding::hash::Hash; -use self::attachment::VecAttach; +use self::attachment::VectorAttach; use self::packet::RedisPack; use self::reqpacket::RequestPacket; use self::rsppacket::ResponsePacket; @@ -123,29 +123,41 @@ impl Protocol for Vector { w.write("\r\n".as_bytes())?; } else { if ctx.attachment().is_some() { - // 有attachment: 组装rsp: header(vec[0]) + *body_tokens + vec[1..] - let attach = VecAttach::attach(ctx.attachment().unwrap()); - if attach.body_token_count() > 0 { - w.write(attach.header())?; - w.write(format!("*{}\r\n", attach.body_token_count()).as_bytes())?; - for b in attach.body() { - w.write(b.as_slice())?; + let vec_attach = + VectorAttach::attach(ctx.attachment().expect("vector attache")); + match vec_attach.attch_type() { + AttachType::Retrieve => { + // 有attachment: 组装rsp: header(vec[0]) + *body_tokens + vec[1..] + let attach = vec_attach.retrieve_attach(); + if attach.body_token_count() > 0 { + w.write(attach.header())?; + w.write(format!("*{}\r\n", attach.body_token_count()).as_bytes())?; + for b in attach.body() { + w.write(b.as_slice())?; + } + } else { + // 返回空 + w.write("$-1\r\n".as_bytes())?; + } + } + AttachType::Store => { + let attach = vec_attach.store_attach(); + w.write(format!(":{}\r\n", attach.affected_rows).as_bytes())?; + } + _ => { + panic!("bad attach type"); } - } else { - // 返回空 - w.write("$-1\r\n".as_bytes())?; } } else { // 无attachment: response已封装为redis协议。正常响应有三种: // 1. 只返回影响的行数 // 2. 一行或多行数据 // 3. 结果为空 - if response.header.rows > 0 { - w.write(response.header.header.as_ref())?; - w.write( - format!("*{}\r\n", response.header.rows * response.header.columns) - .as_bytes(), - )?; + if let Some(ref header) = response.header { + if header.rows > 0 { + w.write(header.header.as_ref())?; + w.write(format!("*{}\r\n", header.rows * header.columns).as_bytes())?; + } } w.write_slice(response, 0)?; // value } @@ -177,38 +189,65 @@ impl Protocol for Vector { // 将中间响应放到attachment中,方便后续继续查询 // 先收集si信息,再收集body - // 返回值:是否需要继续查询 + // 返回值:是否继续下一轮 #[inline] fn update_attachment(&self, attachment: &mut Attachment, response: &mut Command) -> bool { assert!(response.ok()); - let attach = VecAttach::attach_mut(attachment); + let vec_attach = VectorAttach::attach_mut(attachment); //收到响应就算ok,响应有问题也不会发送到topo了 - attach.rsp_ok = true; - - if attach.is_empty() { - // TODO 先打通,此处的内存操作需要考虑优化 fishermen - let mut header_data = Vec::new(); - let header = &mut response.header; - mem::swap(&mut header_data, &mut header.header); - attach.attach_header(header_data); - } + vec_attach.rsp_ok = true; + + match vec_attach.attch_type() { + AttachType::Retrieve => { + let attach = vec_attach.retrieve_attach_mut(); + // 如果header为none,说明当前查询结果为空 + // 如果有si,则看是否还有后续请求 + // 如果没有si,直接返回false + if response.header.is_none() { + if attach.has_si() { + return attach.left_count != 0; + } + return false; + } + + assert!(response.header.is_some(), "rsp:{}", response); + let body_data = response.data().0.to_vec(); + let header = response.header.as_mut().expect("rsp header"); - // TODO 先打通,此处的内存操作需要考虑优化 fishermen - match attach.has_si() { - true => { - if response.header.rows > 0 { - let header = &response.header; - attach.attach_body(response.data().0.to_vec(), header.rows, header.columns); + if attach.is_empty() { + // TODO 简化swap操作,注意功能验证 fishermen + // let mut header_data = Vec::new(); + // let header = &mut response.header; + // mem::swap(&mut header_data, &mut header.header); + attach.swap_header_data(&mut header.header); } - attach.left_count == 0 + + // TODO 先打通,此处的内存操作需要考虑优化 fishermen + match attach.has_si() { + true => { + if header.rows > 0 { + // let header = &response.header; + attach.attach_body(body_data, header.rows, header.columns); + } + attach.left_count != 0 + } + // 按si解析响应: 未成功获取有效si信息或者解析si失败,并终止后续请求 + false => response.count() != 0 && attach.attach_si(response), + } + } + AttachType::Store => { + let store_attach = vec_attach.store_attach_mut(); + store_attach.incr_affected_rows(response.count() as u16); + true + } + _ => { + panic!("malformed attach"); } - // 按si解析响应: 未成功获取有效si信息或者解析si失败,并终止后续请求 - false => response.count() == 0 || !attach.attach_si(response), } } #[inline] fn drop_attach(&self, att: Attachment) { - let _ = VecAttach::from(att); + let _ = VectorAttach::from(att); } } @@ -303,7 +342,7 @@ pub(crate) const COND_ORDER: &[u8] = b"ORDER"; pub(crate) const COND_LIMIT: &[u8] = b"LIMIT"; pub(crate) const COND_GROUP: &[u8] = b"GROUP"; -const DEFAULT_LIMIT: usize = 15; +const DEFAULT_LIMIT: u16 = 15; #[derive(Debug, Clone, Default)] pub struct Condition { @@ -351,6 +390,8 @@ pub type Field = (RingSlice, RingSlice); #[derive(Debug, Clone, Default)] pub struct VectorCmd { pub cmd: CommandType, + // 对于aggregation策略 + pub route: Option, pub keys: Vec, pub fields: Vec, pub wheres: Vec, @@ -361,14 +402,27 @@ pub struct VectorCmd { impl VectorCmd { #[inline(always)] - pub fn limit(&self) -> usize { + pub fn limit(&self, operation: Operation) -> u16 { match self.limit.limit.try_str_num(..) { - Some(limit) => limit, - None => DEFAULT_LIMIT, + Some(limit) => limit as u16, + None => match operation { + Operation::Get | Operation::Gets | Operation::MGet => DEFAULT_LIMIT, + Operation::Store | Operation::Meta | Operation::Other => 0, + }, } } } +impl Display for VectorCmd { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "cmd:{:?}, route:{:?}, keys:{:?}", + self.cmd, self.route, self.keys + ) + } +} + /// field 字段的值,对于‘field’关键字,值是|分隔的field names,否则就是二进制value #[derive(Debug, Clone)] pub enum FieldVal { @@ -413,10 +467,35 @@ impl FieldVal { } } +pub const DATE_YYMM: &str = "yymm"; +pub const DATE_YYMMDD: &str = "yymmdd"; + +#[derive(Debug, Clone)] +pub enum Postfix { + YYMM, + YYMMDD, +} + +impl TryInto for &str { + type Error = Error; + fn try_into(self) -> std::result::Result { + match self.to_lowercase().as_str() { + DATE_YYMM => Ok(Postfix::YYMM), + DATE_YYMMDD => Ok(Postfix::YYMMDD), + _ => Err(Error::RequestProtocolInvalid), + } + } +} + +pub enum KeysType<'a> { + Keys(&'a String), + Time, +} + pub trait Strategy { fn keys(&self) -> &[String]; //todo 通过代理类型实现 - fn condition_keys(&self) -> Box> + '_>; + fn keys_with_type(&self) -> Box + '_>; fn write_database_table(&self, buf: &mut impl Write, date: &NaiveDate, hash: i64); fn write_si_database_table(&self, buf: &mut impl Write, hash: i64); fn batch(&self, limit: u64, vcmd: &VectorCmd) -> u64; diff --git a/protocol/src/vector/mysql.rs b/protocol/src/vector/mysql.rs index 8c960f8dc..016f08418 100644 --- a/protocol/src/vector/mysql.rs +++ b/protocol/src/vector/mysql.rs @@ -4,10 +4,10 @@ use crate::kv::common::Command; use crate::kv::{MysqlBinary, VectorSqlBuilder}; use crate::vector::{CommandType, Condition, Field, VectorCmd}; use crate::{Error, Result}; -use chrono::NaiveDate; +use chrono::{Datelike, NaiveDate}; use ds::RingSlice; -use super::Strategy; +use super::{KeysType, Strategy}; struct VRingSlice<'a>(&'a RingSlice); impl<'a> Display for VRingSlice<'a> { @@ -107,8 +107,8 @@ struct InsertCols<'a, S>(&'a S, &'a Vec); impl<'a, S: Strategy> Display for InsertCols<'a, S> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let &Self(strategy, fields) = self; - for (i, key) in (&mut strategy.condition_keys()).enumerate() { - if let Some(key) = key { + for (i, key) in (&mut strategy.keys_with_type()).enumerate() { + if let KeysType::Keys(key) = key { if i == 0 { let _ = write!(f, "`{}`", key); } else { @@ -127,8 +127,8 @@ struct InsertVals<'a, S>(&'a S, &'a Vec, &'a Vec); impl<'a, S: Strategy> Display for InsertVals<'a, S> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let &Self(strategy, keys, fields) = self; - for (i, key) in (&mut strategy.condition_keys()).enumerate() { - if let Some(_) = key { + for (i, key) in (&mut strategy.keys_with_type()).enumerate() { + if let KeysType::Keys(_) = key { if i == 0 { let _ = write!(f, "{}", Val(&keys[i])); } else { @@ -163,7 +163,8 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { let &Self( strategy, vcmd @ VectorCmd { - cmd: _, + cmd, + route: _, keys, fields: _, wheres, @@ -173,8 +174,23 @@ impl<'a, S: Strategy> Display for KeysAndCondsAndOrderAndLimit<'a, S> { }, extra, ) = self; - for (i, key) in (&mut strategy.condition_keys()).enumerate() { - if let Some(key) = key { + + // 对于vupdate、vdel,必须得有where语句 + match cmd { + CommandType::VUpdate + | CommandType::VDel + | CommandType::VUpdateTimeline + | CommandType::VDelTimeline => { + if wheres.len() == 0 { + log::warn!("+++ ignore vdel/vupdate without where condition"); + return Err(std::fmt::Error); + } + } + _ => {} + } + + for (i, key) in (&mut strategy.keys_with_type()).enumerate() { + if let KeysType::Keys(key) = key { if i == 0 { let _ = write!(f, "`{}`={}", key, Val(&keys[i])); } else { @@ -227,17 +243,13 @@ impl<'a, S: Strategy> SqlBuilder<'a, S> { strategy: &'a S, limit: u64, ) -> Result { - if vcmd.keys.len() != strategy.keys().len() { - Err(Error::RequestProtocolInvalid) - } else { - Ok(Self { - vcmd, - hash, - date, - strategy, - limit, - }) - } + Ok(Self { + vcmd, + hash, + date, + strategy, + limit, + }) } } @@ -251,18 +263,25 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { fn len(&self) -> usize { //按照可能的最长长度计算,其中table长度取得32,key长度取得5,测试比实际长15左右 let mut base = match self.vcmd.cmd { - CommandType::VRange | CommandType::VGet => "select from where ".len(), + CommandType::VRange | CommandType::VGet | CommandType::VRangeTimeline => { + "select from where ".len() + } CommandType::VCard => "select count(*) from where ".len(), - CommandType::VAdd => "insert into () values ()".len(), - CommandType::VUpdate => "update set where ".len(), - CommandType::VDel => "delete from where ".len(), - _ => { + CommandType::VAdd | CommandType::VAddTimeline => "insert into () values ()".len(), + CommandType::VUpdate | CommandType::VUpdateTimeline => "update set where ".len(), + CommandType::VDel | CommandType::VDelTimeline => "delete from where ".len(), + CommandType::VRangeSi + | CommandType::VAddSi + | CommandType::VUpdateSi + | CommandType::VDelSi + | CommandType::Unknown => { //校验应该在parser_req出 - panic!("not support cmd_type:{:?}", self.vcmd.cmd); + panic!("not support in timeline cmd_type:{:?}", self.vcmd.cmd); } }; let VectorCmd { cmd: _, + route: _, keys, fields, wheres, @@ -291,16 +310,18 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { base.max(64) } - fn write_sql(&self, buf: &mut impl Write) { + fn write_sql(&self, buf: &mut impl Write) -> crate::Result<()> { + // TODO:使用所有commandType的具体类型,而非通配符_,避免新增type后,因未变更没有正确处理;后续考虑优化 fishermen match self.vcmd.cmd { - CommandType::VRange | CommandType::VGet => { + CommandType::VRange | CommandType::VGet | CommandType::VRangeTimeline => { let _ = write!( buf, "select {} from {} where {}", Select(self.vcmd.fields.get(0)), Table(self.strategy, &self.date, self.hash), KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), - ); + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } CommandType::VCard => { let _ = write!( @@ -308,39 +329,48 @@ impl<'a, S: Strategy> VectorSqlBuilder for SqlBuilder<'a, S> { "select count(*) from {} where {}", Table(self.strategy, &self.date, self.hash), KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), - ); + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } - CommandType::VAdd => { + CommandType::VAdd | CommandType::VAddTimeline => { let _ = write!( buf, "insert into {} ({}) values ({})", Table(self.strategy, &self.date, self.hash), InsertCols(self.strategy, &self.vcmd.fields), InsertVals(self.strategy, &self.vcmd.keys, &self.vcmd.fields), - ); + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } - CommandType::VUpdate => { + CommandType::VUpdate | CommandType::VUpdateTimeline => { let _ = write!( buf, "update {} set {} where {}", Table(self.strategy, &self.date, self.hash), UpdateFields(&self.vcmd.fields), KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), - ); + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } - CommandType::VDel => { + CommandType::VDel | CommandType::VDelTimeline => { let _ = write!( buf, "delete from {} where {}", Table(self.strategy, &self.date, self.hash), KeysAndCondsAndOrderAndLimit(self.strategy, &self.vcmd, self.limit), - ); + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } - _ => { + CommandType::VRangeSi + | CommandType::VAddSi + | CommandType::VUpdateSi + | CommandType::VDelSi + | CommandType::Unknown => { //校验应该在parser_req出 - panic!("not support cmd_type:{:?}", self.vcmd.cmd); + panic!("not support in timeline cmd_type:{:?}", self.vcmd.cmd); } } + Ok(()) } } @@ -348,19 +378,17 @@ pub struct SiSqlBuilder<'a, S> { vcmd: &'a VectorCmd, hash: i64, strategy: &'a S, + date: NaiveDate, } impl<'a, S: Strategy> SiSqlBuilder<'a, S> { - pub fn new(vcmd: &'a VectorCmd, hash: i64, strategy: &'a S) -> Result { - if vcmd.keys.len() != strategy.keys().len() { - Err(Error::RequestProtocolInvalid) - } else { - Ok(Self { - vcmd, - hash, - strategy, - }) - } + pub fn new(vcmd: &'a VectorCmd, hash: i64, date: NaiveDate, strategy: &'a S) -> Result { + Ok(Self { + vcmd, + hash, + strategy, + date, + }) } } @@ -383,22 +411,89 @@ impl<'a, S: Strategy> VectorSqlBuilder for SiSqlBuilder<'a, S> { // select date,count字段名, // 条件需要key,字段名 - fn write_sql(&self, buf: &mut impl Write) { + + fn write_sql(&self, buf: &mut impl Write) -> crate::Result<()> { match self.vcmd.cmd { - CommandType::VRange => { + CommandType::VRange | CommandType::VRangeSi | CommandType::VGet => { let _ = write!( buf, "select {} from {} where {}", SiSelect(self.strategy.keys(), self.strategy.si_cols()), SiTable(self.strategy, self.hash), SiKeysAndCondsAndOrder(self.strategy, &self.vcmd), + ) + .map_err(|_| Error::RequestProtocolInvalid)?; + } + // 1.1. 更新si: insert into $db$.$tb$ (uid, object_type, start_date, count) values (?, ?, ?, 1) on duplicate key update count=greatest(0, cast(count as signed) + 1)。 + // 1.2. 根据设置更新timeline:insert into $db$.$tb$ (uid, object_type, like_id, object_id) values (?, ?, ?, ?) + // 备注:有些场景只更新si,有些场景只更新timeline,需要业务修改时考虑。 + // 1.3. mesh cmd: vadd $uid,$date object_type $obj_type object_id $obj_id like_id $like_id + CommandType::VAdd | CommandType::VAddSi => { + let count_col_name = self.strategy.si_cols().last().unwrap(); + //对si表的更新插入至少需要keys + count + counttype 这些字段,下面会兜底校验 + let si_insert_vals = SiInsertVals( + self.strategy, + &self.date, + &self.vcmd.keys, + &self.vcmd.fields, ); + let count = si_insert_vals.count_in_field(); + write!( + buf, + "insert into {} ({}) values ({}) on duplicate key update {}=greatest(0, cast({} as signed) + {})", + SiTable(self.strategy, self.hash), + SiInsertCols(self.strategy, &self.vcmd.fields), + si_insert_vals, + count_col_name, + count_col_name, + count + ).map_err(|_| Error::RequestProtocolInvalid)?; + } + CommandType::VUpdate | CommandType::VUpdateSi => { + let _ = write!( + buf, + "update {} set {} where {}", + SiTable(self.strategy, self.hash), + SiUpdateFields(self.strategy, &self.vcmd.fields), + SiKeysAndUpdateOrDelConds(self.strategy, &self.vcmd, &self.date), + ) + .map_err(|_| Error::RequestProtocolInvalid)?; + } + // 2.1. 更新si:update $db$.$tb$ set count = greatest(0,cast(count as signed) - 1) where uid = ? and object_type = ? and start_date = ? + // 2.2. 删除timeline:delete from $db$.$tb$ where uid=? and object_id=? + // 备注:有些场景只更新si,有些场景只更新timeline,需要业务修改时考虑。 + // 2.3. mesh cmd: vdel $uid,$date where object_type = $obj_type object_id = $obj_id + CommandType::VDel | CommandType::VDelSi => { + //对si表的更新插入至少需要keys + count + counttype 这些字段,下面会兜底校验 + write!( + buf, + "update {} set count = greatest(0,cast(count as signed) - 1) where {}", + SiTable(self.strategy, self.hash), + SiKeysAndUpdateOrDelConds(self.strategy, &self.vcmd, &self.date), + ) + .map_err(|_| Error::RequestProtocolInvalid)?; + } + CommandType::VCard => { + let fields = self.vcmd.fields.first().map_or(None, |f| Some(f)); + let _ = write!( + buf, + "select {} from {} where {}", + SiCardCols(self.strategy, fields), + SiTable(self.strategy, self.hash), + SiKeysAndConds(self.strategy, &self.vcmd), + ) + .map_err(|_| Error::RequestProtocolInvalid)?; } - _ => { + CommandType::VRangeTimeline + | CommandType::VAddTimeline + | CommandType::VUpdateTimeline + | CommandType::VDelTimeline + | CommandType::Unknown => { //校验应该在parser_req出 panic!("not support cmd_type:{:?}", self.vcmd.cmd); } - } + }; + Ok(()) } } @@ -417,6 +512,26 @@ impl<'a> Display for SiSelect<'a> { } } +struct SiKeysAndConds<'a, S>(&'a S, &'a VectorCmd); +impl<'a, S: Strategy> Display for SiKeysAndConds<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, VectorCmd { keys, wheres, .. }) = self; + let key_name = &strategy.keys()[0]; + let cols = strategy.si_cols(); + let _ = write!(f, "`{}`={}", key_name, Val(&keys[0])); + for w in wheres { + //条件中和si相同的列写入条件 + for col in cols { + if w.field.equal(col.as_bytes()) { + let _ = write!(f, " and {}", ConditionDisplay(w)); + break; + } + } + } + Ok(()) + } +} + struct SiKeysAndCondsAndOrder<'a, S>(&'a S, &'a VectorCmd); impl<'a, S: Strategy> Display for SiKeysAndCondsAndOrder<'a, S> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -450,3 +565,230 @@ impl<'a, S: Strategy> Display for SiTable<'a, S> { Ok(()) } } + +struct SiCardCols<'a, S>(&'a S, Option<&'a Field>); +// 如果有非count计数的field,则同时返回这些field,如果有count计数的field,则只返回sum(count) as count的计数 +// 目前配置只支持一个count(带type) +impl<'a, S: Strategy> Display for SiCardCols<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, fields_origin) = self; + let si_cols = strategy.si_cols(); + if let Some(field) = fields_origin { + if field.1.equal_ignore_case(si_cols[1].as_bytes()) { + let _ = write!(f, "{},sum({}) as {}", si_cols[1], si_cols[2], si_cols[2]); + } else { + return Err(std::fmt::Error); + } + } else { + let _ = write!(f, "sum({}) as {}", si_cols[2], si_cols[2]); + } + Ok(()) + } +} + +struct SiInsertCols<'a, S>(&'a S, &'a Vec); +impl<'a, S: Strategy> Display for SiInsertCols<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, fields) = self; + for (i, key) in strategy.keys_with_type().enumerate() { + if let KeysType::Keys(key) = key { + if i == 0 { + let _ = write!(f, "`{}`", key); + } else { + let _ = write!(f, ",`{}`", key); + } + } + } + let mut has_count_type = false; + let si_cols = strategy.si_cols(); + for field in fields { + // 目前配置限制了count只能支持1种类型,先打通,后续需要调整配置,以支持多种count fishermen + // for col in si_cols { + // if field.0.equal(col.as_bytes()) { + // let _ = write!(f, ",{}", Key(&field.0)); + // has_count_type = true; + // } + + assert_eq!(si_cols.len(), 3); + if field.0.equal(si_cols[1].as_bytes()) { + let _ = write!(f, ",{}", Key(&field.0)); + has_count_type = true; + break; + } + } + //必须提供count_type + if si_cols.len() > 2 && !has_count_type { + return Err(std::fmt::Error); + } + //date,count + let _ = write!( + f, + ",{},{}", + si_cols.first().unwrap(), + si_cols.last().unwrap() + ); + Ok(()) + } +} + +struct SiInsertVals<'a, S>(&'a S, &'a NaiveDate, &'a Vec, &'a Vec); +impl<'a, S: Strategy> SiInsertVals<'a, S> { + pub(crate) fn count_in_field(&self) -> i32 { + let &Self(strategy, _date, _keys, fields) = self; + let si_cols = strategy.si_cols(); + for field in fields { + // 目前配置限制了count只能支持1种类型,先打通,后续需要调整配置,以支持多种count fishermen + // si cols 当前格式:stat_date,type,count + assert_eq!(si_cols.len(), 3); + if field.0.equal(si_cols[2].as_bytes()) { + let count_slice = field.1; + if count_slice.at(0) == '-' as u8 { + return -1 * (count_slice.str_num(1..) as i32); + } else { + return count_slice.str_num(..) as i32; + } + } + } + return 1; + } +} +impl<'a, S: Strategy> Display for SiInsertVals<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, date, keys, fields) = self; + for (i, key) in (&mut strategy.keys_with_type()).enumerate() { + if let KeysType::Keys(_) = key { + if i == 0 { + let _ = write!(f, "{}", Val(&keys[i])); + } else { + let _ = write!(f, ",{}", Val(&keys[i])); + } + } + } + let si_cols = strategy.si_cols(); + let count = self.count_in_field(); + for field in fields { + // 目前配置限制了count只能支持1种类型,先打通,后续需要调整配置,以支持多种count fishermen + // si cols 当前格式:stat_date,type,count + assert_eq!(si_cols.len(), 3); + if field.0.equal(si_cols[1].as_bytes()) { + let _ = write!(f, ",{}", Val(&field.1)); + break; + } + } + //date,count + // 写startdate的格式都是统一按照YY-mm-dd写的 + let _ = write!( + f, + ",'{}-{}-{}',{}", + date.year(), + date.month(), + date.day(), + count + ); + Ok(()) + } +} + +struct SiUpdateFields<'a, S: Strategy>(&'a S, &'a Vec); +impl<'a, S: Strategy> Display for SiUpdateFields<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self(strategy, fields) = self; + // 首先对于count增减,其次对其他字段进行直接设置 + let si_cols = strategy.si_cols(); + let mut update_count = false; + for field in fields.iter() { + if field.0.equal_ignore_case(si_cols[2].as_bytes()) { + update_count = true; + if field.1.start_with(0, "-".as_bytes()) { + let count = field.1.str_num(1..); + let _ = write!( + f, + "{}=greatest(0,cast({} as signed) - {})", + Key(&field.0), + Key(&field.0), + count + ); + } else { + let count = field.1.str_num(..); + let _ = write!( + f, + "{}=greatest(0,cast({} as signed) + {})", + Key(&field.0), + Key(&field.0), + count, + ); + } + } + } + for (i, field) in fields.iter().enumerate() { + if !field.0.equal_ignore_case(si_cols[2].as_bytes()) { + if i == 0 && !update_count { + let _ = write!(f, "{}={}", Key(&field.0), Val(&field.1)); + } else { + let _ = write!(f, ",{}={}", Key(&field.0), Val(&field.1)); + } + } + } + Ok(()) + } +} + +struct SiKeysAndUpdateOrDelConds<'a, S>(&'a S, &'a VectorCmd, &'a NaiveDate); +impl<'a, S: Strategy> Display for SiKeysAndUpdateOrDelConds<'a, S> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let &Self( + strategy, + VectorCmd { + route, + keys, + wheres, + .. + }, + date, + ) = self; + for (i, key) in (&mut strategy.keys_with_type()).enumerate() { + if let KeysType::Keys(key) = key { + if i == 0 { + let _ = write!(f, "`{}`={}", key, Val(&keys[i])); + } else { + let _ = write!(f, " and `{}`={}", key, Val(&keys[i])); + } + break; + } + } + let mut has_count_type = false; + let si_cols = strategy.si_cols(); + for w in wheres { + let mut added = false; + //条件中和si相同的列写入条件 + for col in si_cols { + if w.field.equal(col.as_bytes()) { + has_count_type = true; + let _ = write!(f, " and {}", ConditionDisplay(w)); + added = true; + break; + } + } + // 聚合查询会带入timeline的额外条件,不需要带入sql;只在si查询才需要增加这些条件 fishermen + if !route.expect("aggregation").is_aggregation() && !added { + let _ = write!(f, " and {}", ConditionDisplay(w)); + } + } + log::info!("+++ ==== si cols {} / {}", si_cols.len(), has_count_type); + //必须提供count_type + if si_cols.len() > 2 && !has_count_type { + log::debug!("++++ don't has count type"); + return Err(std::fmt::Error); + } + //date + let _ = write!( + f, + " and {}='{}-{}-{}'", + si_cols.first().unwrap(), + date.year(), + date.month(), + date.day(), + ); + Ok(()) + } +} diff --git a/protocol/src/vector/redis.rs b/protocol/src/vector/redis.rs index 2eb025183..37b7d943e 100644 --- a/protocol/src/vector/redis.rs +++ b/protocol/src/vector/redis.rs @@ -7,14 +7,24 @@ pub(crate) const FIELD_BYTES: &'static [u8] = b"FIELD"; pub(crate) const KVECTOR_SEPARATOR: u8 = b','; /// 根据parse的结果,此处进一步获得kvector的detail/具体字段信息,以便进行sql构建 -pub fn parse_vector_detail(cmd: RingSlice, flag: &Flag) -> crate::Result { +pub fn parse_vector_detail( + cmd: RingSlice, + flag: &Flag, + config_aggregation: bool, +) -> crate::Result { let data = Packet::from(cmd); let mut vcmd: VectorCmd = Default::default(); - vcmd.cmd = get_cfg(flag.op_code())?.cmd_type; + let cmd_props = get_cfg(flag.op_code())?; + vcmd.cmd = cmd_props.cmd_type; // 解析keys parse_vector_key(&data, flag.key_pos() as usize, &mut vcmd)?; + // 只对strategy为aggregation的配置,才设置route,否则直接访问当前main库表 + if config_aggregation { + vcmd.route = Some(cmd_props.route); + } + // 解析fields let field_start = flag.field_pos() as usize; let condition_pos = flag.condition_pos() as usize; @@ -151,7 +161,7 @@ pub(crate) fn validate_field_name(field_name: RingSlice) -> Result<()> { /// 6. vget:key不能为0 pub(crate) fn validate_cmd(vcmd: &VectorCmd, cmd_type: CommandType) -> Result<()> { match cmd_type { - CommandType::VRange => { + CommandType::VRange | CommandType::VRangeTimeline | CommandType::VRangeSi => { // vrange 的fields数量不能大于1 if vcmd.fields.len() > 1 || (vcmd.fields.len() == 1 && !vcmd.fields[0].0.equal_ignore_case(FIELD_BYTES)) @@ -159,17 +169,17 @@ pub(crate) fn validate_cmd(vcmd: &VectorCmd, cmd_type: CommandType) -> Result<() return Err(crate::Error::RequestInvalidMagic); } } - CommandType::VAdd => { + CommandType::VAdd | CommandType::VAddSi | CommandType::VAddTimeline => { if vcmd.fields.len() == 0 || vcmd.wheres.len() > 0 { return Err(crate::Error::RequestInvalidMagic); } } - CommandType::VUpdate => { + CommandType::VUpdate | CommandType::VUpdateTimeline | CommandType::VUpdateSi => { if vcmd.fields.len() == 0 { return Err(crate::Error::RequestInvalidMagic); } } - CommandType::VDel => { + CommandType::VDel | CommandType::VDelSi | CommandType::VDelTimeline => { if vcmd.fields.len() > 0 { return Err(crate::Error::RequestInvalidMagic); } @@ -181,7 +191,7 @@ pub(crate) fn validate_cmd(vcmd: &VectorCmd, cmd_type: CommandType) -> Result<() } } CommandType::VCard => {} - _ => { + CommandType::Unknown => { panic!("unknown kvector cmd:{:?}", vcmd); } } diff --git a/protocol/src/vector/rsppacket.rs b/protocol/src/vector/rsppacket.rs index ba6e1bad9..43382103e 100644 --- a/protocol/src/vector/rsppacket.rs +++ b/protocol/src/vector/rsppacket.rs @@ -85,7 +85,11 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> { // 如果是只有meta的ok packet,直接返回影响的列数,如insert/delete/update if let Or::B(ok) = meta { let affected = ok.affected_rows(); - let cmd = self.build_final_affected_rows_rsp_cmd(affected); + let mut cmd = self.build_final_affected_rows_rsp_cmd(affected); + if affected > 0 { + // 目前affected只对kvector的聚合模式有效,但此处暂时无法区别req,所以统一都设置 fishermen + cmd.set_count(affected as u32); + } return Ok(cmd); } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 2f332d4f4..0be7f440b 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -25,6 +25,7 @@ tokio.workspace = true ctor = "0.1.23" mysql_async = "0.31.3" chrono = "0.4" +chrono-tz = { version = "0.5", default-features = false } lazy_static = "*" mysql = "*" @@ -33,6 +34,10 @@ bytes = "1.0" proptest = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_derive = "1.0.126" +serde_json = "1.0.65" + [dev-dependencies.criterion] version = "0.4" [dev-dependencies.minstant] diff --git a/tests/src/all.rs b/tests/src/all.rs index e679c18da..eaea9ce08 100644 --- a/tests/src/all.rs +++ b/tests/src/all.rs @@ -25,6 +25,7 @@ mod dns; // mod ip; mod context; mod kv; +mod kvector; mod mq; mod mysql_strategy; mod number; diff --git a/tests/src/kvector/mod.rs b/tests/src/kvector/mod.rs new file mode 100644 index 000000000..54ed3472c --- /dev/null +++ b/tests/src/kvector/mod.rs @@ -0,0 +1,103 @@ +use std::{fs::File, io::BufRead}; + +use chrono::TimeZone; +use endpoint::kv::uuid::Uuid; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] + +struct LikeByMe { + #[serde(default)] + id: usize, +} + +const MESH_LOG_PREFIX_LEN: usize = + "2024-11-08 16:01:23 [INFO] consistencyNewLikeByMeVectorList listMesh:".len(); +const TCP_LOG_PREFIX_LENT: usize = + "2024-11-08 16:01:23 [INFO] consistencyNewLikeByMeVectorList listTcp:".len(); + +#[test] +fn parse_like_by_me() { + let file_name = "tmp_data/vector.log"; + let file = File::open(file_name).unwrap(); + let reader = std::io::BufReader::new(file); + + let mut mesh_ids = Vec::new(); + let mut _tcp_ids = Vec::new(); + let mut count = 0; + const MAX_COUNT: usize = 1; + + for line in reader.lines() { + let line = line.unwrap(); + let (prefix_len, is_mesh_list) = match line.contains("listMesh") { + true => (MESH_LOG_PREFIX_LEN, true), + false => (TCP_LOG_PREFIX_LENT, false), + }; + let likes_json = &line[prefix_len..]; + let suffix = likes_json.rfind("]").unwrap() + 1; + let likes_json = &likes_json[..suffix]; + // println!("line: {}", line); + + // 顺序为先一行mesh, 后一行tcp + let ids = parse_line(likes_json); + if is_mesh_list { + mesh_ids = ids; + // println!("mesh_ids: {:?}", mesh_ids); + continue; + } else { + _tcp_ids = ids; + // println!("tcp_ids: {:?}", tcp_ids); + + let diff_in_mesh: Vec = mesh_ids + .clone() + .into_iter() + .filter(|id| !_tcp_ids.contains(id)) + .collect(); + let diff_in_tcp: Vec = _tcp_ids + .clone() + .into_iter() + .filter(|id| !mesh_ids.contains(id)) + .collect(); + + if diff_in_mesh.len() > 0 || diff_in_tcp.len() > 0 { + println!("mesh/tcp count:{}/{}", mesh_ids.len(), _tcp_ids.len()); + println!("mesh more ids:{:?}", diff_in_mesh); + println!("tcp more ids: {:?}", diff_in_tcp); + println!("mesh:{:?}", mesh_ids); + println!(" tcp:{:?}", _tcp_ids); + count += 1; + if count >= MAX_COUNT { + break; + } + + mesh_ids.clear(); + _tcp_ids.clear(); + } + continue; + } + } +} + +fn parse_line(line: &str) -> Vec { + let mut ids = Vec::new(); + let like_by_mes: Vec = serde_json::from_str(line).unwrap(); + for like in like_by_mes { + ids.push(like.id); + } + ids +} + +#[test] +fn parse_time() { + // use endpoint::kv::uuid; + let uuid = 5100423841317698 as i64; + let seconds = uuid.unix_secs(); + + use chrono_tz::Asia::Shanghai; + let t = chrono::Utc + .timestamp_opt(seconds, 0) + .unwrap() + .with_timezone(&Shanghai) + .naive_local(); + println!("{}", t); +} diff --git a/tests/src/layout.rs b/tests/src/layout.rs index 1518bb3a0..e4714ba5f 100644 --- a/tests/src/layout.rs +++ b/tests/src/layout.rs @@ -51,7 +51,7 @@ fn checkout_basic() { assert_eq!(24, size_of::()); assert_eq!( size_of::(), - size_of::() + size_of::() ); } diff --git a/tests_integration/Cargo.toml b/tests_integration/Cargo.toml index 0e4e72a88..c537e2bb3 100644 --- a/tests_integration/Cargo.toml +++ b/tests_integration/Cargo.toml @@ -17,9 +17,14 @@ function_name = "0.3.0" chrono = "0.4" url = "2.2.2" +# for redis/kvector +serde_json = { version = "1.0" } redis = { version = "0.22.0", default-features = false, features = [] } serde = { version = "1.0", features = ["derive"] } -reqwest = { version = "0.11.4", features = ["json","blocking"], default-features = false } +reqwest = { version = "0.11.4", features = [ + "json", + "blocking", +], default-features = false } [[test]] diff --git a/tests_integration/src/vector/aggregation.rs b/tests_integration/src/vector/aggregation.rs new file mode 100644 index 000000000..229a402c9 --- /dev/null +++ b/tests_integration/src/vector/aggregation.rs @@ -0,0 +1,375 @@ +use redis::{RedisError, Value}; +use std::{sync::atomic::AtomicI64, thread::sleep, time::Duration}; + +use super::byme::*; + +use crate::{ + ci::env::*, + redis_helper::*, + vector::{assist::*, RESTYPE}, +}; + +const YEAR_MONTH: &str = "2410"; +static LIKE_ID: AtomicI64 = AtomicI64::new(5078096628678703); + +// 构建一个新的like id,避免请求重复 +fn next_like_id() -> i64 { + // 每次本地测试,可以调整,ci不用 + let offset = 600; + let id = LIKE_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + offset + id +} + +#[test] +fn aggregation_vrange_base() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 7916804453, + like_id: next_like_id() + 1, + object_id: 5078096628678703, + object_type: 1, + }; + + // vrange 获取最新的10条 + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + let rsp = aglike_cmd_vrange(&mut conn, &like_by_me, 10); + assert!(rsp.is_ok()); + + Ok(()) +} + +#[test] +fn aggregation_vrange_timeline() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 79168044531, + like_id: next_like_id() + 2, + object_id: 5078096628678703, + object_type: 1, + }; + + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + + // vrange 获取最新的10条 + let rsp = aglike_cmd_vrange_timeline(&mut conn, YEAR_MONTH, &like_by_me, 10)?; + println!("++ rsp:{:?}", rsp); + Ok(()) +} + +#[test] +fn aggregation_vrange_si() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 1761220674, + like_id: next_like_id() + 3, + object_id: 5078096628678703, + object_type: 1, + }; + + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + + // 获取最新若干条si记录 + let rsp = aglike_cmd_vrange_si(&mut conn, &like_by_me); + println!("vrange.si rsp:{:?}", rsp); + Ok(()) +} + +#[test] +fn aggregation_vcard() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 7916804453, + like_id: next_like_id() + 4, + object_id: 5078096628678703, + object_type: 1, + }; + + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + + let by_me_si = aglike_cmd_vcard(&mut conn, like_by_me.uid); + println!("si: {:?}", by_me_si); + Ok(()) +} + +#[test] +fn aggregation_vget() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 5, + object_id: 5078096628678703, + object_type: 1, + }; + + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + + sleep(Duration::from_secs(2)); + + // vget 获取某个月份最新的1条 + let rsp = aglike_cmd_vget(&mut conn, YEAR_MONTH, &like_by_me)?; + + println!("++ vget rsp:{:?}", rsp); + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 1); + Ok(()) +} + +#[test] +fn aggregation_vadd_base() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 6, + object_id: 5078096628678703, + object_type: 1, + }; + + // 预先删除新插入的记录 + let rsp = aglike_cmd_vdel(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 0 || rsp == 2 || rsp == 3); + + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + println!("+++ vadd rsp:{:?}", rsp); + assert!(rsp == (1 + 1) || rsp == (2 + 1)); + + // vrange 获取最新的1条 + let like_via_vget = aglike_cmd_vget(&mut conn, YEAR_MONTH, &like_by_me); + println!("++ rsp:{:?}", like_via_vget); + + assert_eq!( + Ok(Value::Bulk(vec![ + Value::Bulk(vec![ + Value::Status("uid".to_string().into()), + Value::Status("object_type".to_string().into()), + Value::Status("like_id".to_string().into()), + Value::Status("object_id".to_string().into()), + ]), + Value::Bulk(vec![ + Value::Int(like_by_me.uid), + Value::Int(like_by_me.object_type), + Value::Int(like_by_me.like_id), + Value::Int(like_by_me.object_id), + ]), + ])), + like_via_vget + ); + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + + Ok(()) +} + +#[test] +fn aggregation_vadd_timeline() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 7, + object_id: 5078096628678703, + object_type: 1, + }; + + let rsp = aglike_cmd_vadd_timeline(&mut conn, YEAR_MONTH, &like_by_me); + println!("vadd timeline:{:?}", rsp); + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel_timeline(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 1); + + Ok(()) +} + +#[test] +fn aggregation_vadd_si() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let si = LikeByMeSi { + uid: 7916804459, + object_type: 3, + start_date: "2024-09-01".to_string(), + count: 3, + }; + + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp = aglike_cmd_vadd_si(&mut conn, YEAR_MONTH, &si)?; + println!("+++ aggregation add.si rsp:{:?}", rsp); + assert!(rsp == 1 || rsp == 2); + + let by_me_si = aglike_cmd_vcard(&mut conn, si.uid)?; + print!("after vadd si, vcard now: {:?}", by_me_si); + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel_si(&mut conn, YEAR_MONTH, &si)?; + assert!(rsp == 1); + + let by_me_si = aglike_cmd_vcard(&mut conn, si.uid)?; + print!("after vdel si, vcard now: {:?}", by_me_si); + + Ok(()) +} + +/** + * careful:目前不支持aggregation下的vupdate,因为不知道si如何更新count技术 + * */ +#[test] +fn aggregation_vupdate_base() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 8, + object_id: 5078096628678703, + object_type: 1, + }; + + // careful:目前不支持aggregation下的vupdate,因为不知道si如何更新count技术 + let rsp = aglike_cmd_vupdate(&mut conn, YEAR_MONTH, &like_by_me); + assert!(rsp.is_err()); + println!("vupdate rsp should be err: {:?}", rsp); + + Ok(()) +} + +#[test] +fn aggregation_vupdate_timeline() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_id = next_like_id() + 9; + let like_by_me = LikeByMe { + uid: 791680445, + like_id: like_id, + object_id: 5078096628678703, + object_type: 1, + }; + let like_by_me2 = LikeByMe { + uid: 791680445, + like_id: like_id, + object_id: 5078096628678703, + object_type: 2, + }; + + let rsp = aglike_cmd_vdel_timeline(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 0 || rsp == 1); + println!("vdel rsp: {}", rsp); + + let rsp = aglike_cmd_vdel_timeline(&mut conn, YEAR_MONTH, &like_by_me2)?; + assert!(rsp == 0 || rsp == 1); + println!("vdel rsp: {}", rsp); + + let rsp = aglike_cmd_vadd_timeline(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 1 || rsp == 2); + println!("aggregation vadd timeline rsp: {}", rsp); + + let rsp = aglike_cmd_vupdate_timeline(&mut conn, YEAR_MONTH, &like_by_me, &like_by_me2)?; + println!("vupdate timeline rsp: {}", rsp); + assert!(rsp == 1 || rsp == 2); + + let rsp = aglike_cmd_vdel_timeline(&mut conn, YEAR_MONTH, &like_by_me2)?; + println!("vdel rsp: {}", rsp); + assert!(rsp == 1); + + Ok(()) +} + +#[test] +fn aggregation_vupdate_si() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let si = LikeByMeSi { + uid: 7916804459, + object_type: 3, + start_date: "2024-09-01".to_string(), + count: 2, + }; + + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp = aglike_cmd_vadd_si(&mut conn, YEAR_MONTH, &si)?; + assert!(rsp == 2 || rsp == 1); + + let rsp = aglike_cmd_vupdate_si(&mut conn, YEAR_MONTH, &si)?; + + println!("+++ vupdate.si/{} rsp:{:?}", si.count, rsp); + assert!(rsp == 1 || rsp == 2); + + let by_me_si = aglike_cmd_vcard(&mut conn, si.uid)?; + print!("after vupdate si, vcard now: {:?}", by_me_si); + + println!("si: {:?}", by_me_si); + + Ok(()) +} + +#[test] +fn aggregation_vdel_base() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 10, + object_id: 5078096628678703, + object_type: 1, + }; + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp <= 3); + + let vadd_rsp = aglike_cmd_vadd(&mut conn, YEAR_MONTH, &like_by_me)?; + println!("+++ vadd rsp:{:?}", vadd_rsp); + assert!(vadd_rsp == (1 + 1) || vadd_rsp == (2 + 1)); + + // 删除新插入的记录 + let rsp = aglike_cmd_vdel(&mut conn, YEAR_MONTH, &like_by_me)?; + assert!(rsp == 2 || rsp == 3); + Ok(()) +} + +#[test] +fn aggregation_vdel_timeline() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let like_by_me = LikeByMe { + uid: 791680445, + like_id: next_like_id() + 11, + object_id: 5078096628678703, + object_type: 1, + }; + + let vadd_rsp = aglike_cmd_vadd_timeline(&mut conn, YEAR_MONTH, &like_by_me)?; + println!("+++ vadd rsp:{:?}", vadd_rsp); + assert!(vadd_rsp == 1); + + let vdel_rsp = aglike_cmd_vdel_timeline(&mut conn, YEAR_MONTH, &like_by_me)?; + println!("+++ vdel rsp:{:?}", vdel_rsp); + assert_eq!(vdel_rsp, 1); + Ok(()) +} + +#[test] +fn aggregation_vdel_si() -> Result<(), RedisError> { + let mut conn = get_conn(&RESTYPE.get_host()); + let si = LikeByMeSi { + uid: 7916804459, + object_type: 3, + start_date: "2024-10-01".to_string(), + count: 2, + }; + + let rsp = aglike_cmd_vadd_si(&mut conn, YEAR_MONTH, &si)?; + assert!(rsp == 2 || rsp == 3); + + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp: Result = aglike_cmd_vdel_si(&mut conn, YEAR_MONTH, &si); + println!("+++ vdel.si/{} rsp:{:?}", si.count, rsp); + let rsp = rsp.unwrap(); + assert!(rsp == 1 || rsp == 2); + + let value = aglike_cmd_vcard(&mut conn, si.uid)?; + print!("after vdel si, vcard now: {:?}", value); + Ok(()) +} diff --git a/tests_integration/src/vector/assist.rs b/tests_integration/src/vector/assist.rs new file mode 100644 index 000000000..78a243453 --- /dev/null +++ b/tests_integration/src/vector/assist.rs @@ -0,0 +1,306 @@ +/** + * 存放访问mesh的指令方法,方便服复用 + */ +use redis::{Connection, RedisError, Value}; + +use super::byme::{LikeByMe, LikeByMeSi}; + +pub(super) fn aglike_cmd_vrange( + conn: &mut Connection, + like_by_me: &LikeByMe, + count: i64, +) -> Result { + let rsp: Result = redis::cmd("VRANGE") + .arg(format!("{}", like_by_me.uid)) + .arg("field") + .arg("like_id,object_id,object_type") + .arg("where") + .arg("object_type") + .arg("in") + .arg("0,1,100") + .arg("order") + .arg("desc") + .arg("like_id") + .arg("limit") + .arg("0") + .arg(count) + .query(conn); + println!("cmd vrange rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vrange_timeline( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, + count: i64, +) -> Result { + let rsp: Result = redis::cmd("VRANGE.timeline") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("field") + .arg("like_id,object_id,object_type") + .arg("where") + .arg("object_type") + .arg("in") + .arg("0,1,100") + .arg("order") + .arg("desc") + .arg("like_id") + .arg("limit") + .arg("0") + .arg(count) + .query(conn); + println!("cmd vrange.timeline rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vrange_si( + conn: &mut Connection, + like_by_me: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("VRANGE.si") + .arg(format!("{}", like_by_me.uid)) + .arg("field") + .arg("uid,start_date,sum(count)") + .arg("where") + .arg("object_type") + .arg("in") + .arg("0,1,100") + .arg("group") + .arg("by") + .arg("uid,start_date") + .arg("order") + .arg("desc") + .arg("start_date") + .query(conn); + println!("cmd vrange.si rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vcard(conn: &mut Connection, uid: i64) -> Result { + // let mut conn = get_conn(&RESTYPE.get_host()); + let rsp: Result = redis::cmd("vcard") + .arg(format!("{}", uid)) + .arg("field") + .arg("object_type") + .arg("where") + .arg("object_type") + .arg("in") + .arg("1,2,3,100") + .arg("group") + .arg("by") + .arg("object_type") + .query(conn); + + println!("cmd vcard rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vadd( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp: Result = redis::cmd("vadd") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("object_type") + .arg(like_by_me.object_type) + .arg("like_id") + .arg(like_by_me.like_id) + .arg("object_id") + .arg(like_by_me.object_id) + .query(conn); + println!("cmd vadd rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vadd_timeline( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + // vadd 加一个id较大的like_by_me,同时更新si、timeline + let rsp: Result = redis::cmd("vadd.timeline") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("object_type") + .arg(like_by_me.object_type) + .arg("like_id") + .arg(like_by_me.like_id) + .arg("object_id") + .arg(like_by_me.object_id) + .query(conn); + println!("cmd vadd.timeline rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vadd_si( + conn: &mut Connection, + year_month: &str, + si: &LikeByMeSi, +) -> Result { + let rsp: Result = redis::cmd("vadd.si") + .arg(format!("{},{}", si.uid, year_month)) + .arg("object_type") + .arg(si.object_type) + .arg("count") + .arg(si.count) + .query(conn); + println!("cmd vadd.si rsp: {:?}", rsp); + rsp +} + +// 在aggregation策略下,这个方法目前是返回error的 +pub(super) fn aglike_cmd_vupdate( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("vupdate") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("object_id") + .arg(like_by_me.object_id) + .arg("where") + .arg("object_type") + .arg("=") + .arg(like_by_me.object_type) + .arg("like_id") + .arg("=") + .arg(like_by_me.like_id) + .arg("object_id") + .arg("=") + .arg(like_by_me.object_id) + .query(conn); + println!("cmd vupdate rsp: {:?}", rsp); + rsp +} + +/** + * 修改object_type + */ +pub(super) fn aglike_cmd_vupdate_timeline( + conn: &mut Connection, + year_month: &str, + like_by_me_origin: &LikeByMe, + like_by_me_new: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("vupdate.timeline") + .arg(format!("{},{}", like_by_me_origin.uid, year_month)) + .arg("object_type") + .arg(like_by_me_new.object_type) + .arg("where") + .arg("object_type") + .arg("=") + .arg(like_by_me_origin.object_type) + .arg("like_id") + .arg("=") + .arg(like_by_me_origin.like_id) + .arg("object_id") + .arg("=") + .arg(like_by_me_origin.object_id) + .query(conn); + println!("cmd vupdate.timeline rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vupdate_si( + conn: &mut Connection, + year_month: &str, + si: &LikeByMeSi, +) -> Result { + let rsp: Result = redis::cmd("vupdate.si") + .arg(format!("{},{}", si.uid, year_month)) + .arg("count") + .arg(si.count) + .arg("where") + .arg("object_type") + .arg("=") + .arg(si.object_type) + .query(conn); + println!("cmd vupdate.si rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vdel( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("vdel") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("where") + .arg("like_id") + .arg("=") + .arg(like_by_me.like_id) + .arg("object_id") + .arg("=") + .arg(like_by_me.object_id) + .arg("object_type") + .arg("=") + .arg(like_by_me.object_type) + .query(conn); + println!("cmd vdel rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vdel_timeline( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("vdel.timeline") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("where") + .arg("object_id") + .arg("=") + .arg(like_by_me.object_id) + .arg("object_type") + .arg("=") + .arg(like_by_me.object_type) + .arg("like_id") + .arg("=") + .arg(like_by_me.like_id) + .query(conn); + println!("cmd vdel.timeline rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vdel_si( + conn: &mut Connection, + year_month: &str, + si: &LikeByMeSi, +) -> Result { + let rsp: Result = redis::cmd("vdel.si") + .arg(format!("{},{}", si.uid, year_month)) + .arg("where") + .arg("object_type") + .arg("=") + .arg(si.object_type) + .query(conn); + println!("cmd vdel.si rsp: {:?}", rsp); + rsp +} + +pub(super) fn aglike_cmd_vget( + conn: &mut Connection, + year_month: &str, + like_by_me: &LikeByMe, +) -> Result { + let rsp: Result = redis::cmd("vget") + .arg(format!("{},{}", like_by_me.uid, year_month)) + .arg("field") + .arg("uid,object_type,like_id,object_id") + .arg("where") + .arg("like_id") + .arg("=") + .arg(like_by_me.like_id) + .arg("object_type") + .arg("=") + .arg(like_by_me.object_type) + .arg("object_id") + .arg("=") + .arg(like_by_me.object_id) + .query(conn); + println!("cmd vget rsp: {:?}", rsp); + rsp +} diff --git a/tests_integration/src/vector/byme.rs b/tests_integration/src/vector/byme.rs new file mode 100644 index 000000000..10740f041 --- /dev/null +++ b/tests_integration/src/vector/byme.rs @@ -0,0 +1,118 @@ +use std::fmt::{Display, Formatter}; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct LikeByMe { + pub(crate) uid: i64, + pub(crate) like_id: i64, + pub(crate) object_id: i64, + pub(crate) object_type: i64, +} + +#[derive(Debug, Serialize, Deserialize)] + +pub(crate) struct LikeByMeSi { + pub(crate) uid: i64, + pub(crate) object_type: i64, + pub(crate) start_date: String, + pub(crate) count: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct LikeByMeResponse { + pub(crate) header: LikeByMeMeta, + pub(crate) body: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct LikeByMeMeta { + like_id: String, + object_id: String, + object_type: String, +} + +// bulk(bulk(status("object_type"), status("count")), bulk(int(1), int(17)))) +#[derive(Debug, Default, Serialize, Deserialize)] + +pub(crate) struct LikeByMeVcardResponse { + pub(crate) header: LikeByMeVcardMeta, + pub(crate) body: LikeByMeVcard, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct LikeByMeVcard { + pub(crate) object_type: i64, + pub(crate) count: i64, +} +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct LikeByMeSiMeta { + pub(crate) uid: String, + pub(crate) object_type: String, + pub(crate) start_date: String, + pub(crate) count: String, +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub(crate) struct LikeByMeVcardMeta { + pub(crate) object_type: String, + pub(crate) count: String, +} + +impl Display for LikeByMeResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeResponse {{ header: {:?}, body: {:?} }}", + self.header, self.body + ) + } +} + +impl Display for LikeByMeMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeMeta {{ like_id: {}, object_id: {}, object_type: {} }}", + self.like_id, self.object_id, self.object_type + ) + } +} + +impl Display for LikeByMe { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeResponseBody {{ like_id: {}, object_id: {}, object_type: {} }}", + self.like_id, self.object_id, self.object_type + ) + } +} + +impl Display for LikeByMeVcardResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeSiResponse {{ header: {:?}, body: {:?} }}", + self.header, self.body + ) + } +} +impl Display for LikeByMeSiMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeSiMeta {{ uid: {}, object_type: {}, start_date: {}, count: {} }}", + self.uid, self.object_type, self.start_date, self.count + ) + } +} +impl Display for LikeByMeSi { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "LikeByMeSi {{ uid: {}, object_type: {}, start_date: {}, count: {} }}", + self.uid, self.object_type, self.start_date, self.count + ) + } +} diff --git a/tests_integration/src/vector/mod.rs b/tests_integration/src/vector/mod.rs index 98f587e89..047713c4c 100644 --- a/tests_integration/src/vector/mod.rs +++ b/tests_integration/src/vector/mod.rs @@ -1,12 +1,16 @@ use crate::ci::env::*; use crate::redis_helper::*; +use byme::LikeByMe; #[allow(unused)] use function_name::named; use redis::Value; +mod aggregation; +mod byme; +mod assist; -const RESTYPE: &str = "vector"; -const CMD_VGET: &str = "vget"; -const CMD_VRANGE: &str = "vrange"; +pub(crate) const RESTYPE: &str = "vector"; +pub(crate) const CMD_VGET: &str = "vget"; +pub(crate) const CMD_VRANGE: &str = "vrange"; #[test] #[named] @@ -123,7 +127,7 @@ fn vrange_or_vget_1_with_1rows(cmd: &str) { Ok(Value::Bulk(vec![ Value::Bulk(vec![ Value::Status("uid".to_string()), - Value::Status("object_type".to_string()) + Value::Status("object_type".to_string()), ]), Value::Bulk(vec![ Value::Int(like_by_me.uid), @@ -173,7 +177,7 @@ fn vrange_or_vget_2_with_2rows(cmd: &str) { Ok(Value::Bulk(vec![ Value::Bulk(vec![ Value::Status("uid".to_string()), - Value::Status("object_type".to_string()) + Value::Status("object_type".to_string()), ]), Value::Bulk(vec![ Value::Int(like_by_me1.uid), @@ -529,13 +533,6 @@ fn vcard() { assert_eq!(rsp, Ok(3)); } -struct LikeByMe { - uid: i64, - like_id: i64, - object_id: i64, - object_type: i64, -} - #[test] fn vadd() { let mut con = get_conn(&RESTYPE.get_host());