Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
20f2aef
keyslen
viciousstar Aug 14, 2024
3ddba2e
sqlinfo
viciousstar Aug 14, 2024
2376cfc
refactor attachment for retrieve and store of kvector
hustfisher Aug 14, 2024
7f2b0eb
topo 暂时回退,只放stub,避免冲突
hustfisher Aug 14, 2024
c8ef918
对add/del记录si&timeline的总affected rows,并返回sdk
hustfisher Aug 14, 2024
6a6b9ea
Merge branch 'main_kvector_hybrid_202408' into main_kvector_hybrid_20…
hustfisher Aug 15, 2024
5d78637
Merge pull request #486 from weibocom/main_kvector_hybrid_202407
hustfisher Aug 15, 2024
b23c45e
update sql_info in kvector
hustfisher Aug 15, 2024
02ef72c
refactor attachment for retrieve and store of kvector
hustfisher Aug 14, 2024
96bb1b8
topo 暂时回退,只放stub,避免冲突
hustfisher Aug 14, 2024
9ddbb08
对add/del记录si&timeline的总affected rows,并返回sdk
hustfisher Aug 14, 2024
f89fc7a
update sql_info in kvector
hustfisher Aug 15, 2024
016b280
Merge branch 'main_kvector_hybrid_202408' of https://github.com/weibo…
hustfisher Aug 15, 2024
3fde1e2
vadd
viciousstar Aug 15, 2024
3246298
yymm subkey
viciousstar Aug 16, 2024
ed25354
将response中header改为option,避免大部分场景下不必要的内存分配
hustfisher Aug 16, 2024
59544d0
Merge branch 'main_kvector_hybrid_202408' of https://github.com/weibo…
hustfisher Aug 16, 2024
aaaabdd
聚合操作的重试和普通重试逻辑区分开
viciousstar Aug 19, 2024
9a657d1
master runs and date
viciousstar Aug 19, 2024
c8c2f6c
get next round
viciousstar Aug 19, 2024
a228fa1
si del
viciousstar Aug 20, 2024
59aba0d
del 需提供count type
viciousstar Aug 20, 2024
7100ab4
Revert "将response中header改为option,避免大部分场景下不必要的内存分配"
viciousstar Aug 20, 2024
b4aaa34
第一轮时候初始化
viciousstar Aug 21, 2024
040d793
get date 区分命令
viciousstar Aug 21, 2024
ad8081f
check key len
viciousstar Aug 22, 2024
0c83723
yymm统一为常量
viciousstar Aug 22, 2024
b7bf759
最后一个key是日期
viciousstar Aug 22, 2024
94814e8
校验get date
viciousstar Aug 22, 2024
f9c88b5
round 后获取
viciousstar Aug 22, 2024
17aae00
update kvector config
hustfisher Sep 5, 2024
a23866b
command header 改为Option,避免不必要的内存分配
hustfisher Sep 6, 2024
9318baa
对于kvector请求,attachment中增加route,用于指示请求路线是aggregation、timeline还是si等
hustfisher Sep 9, 2024
98319d1
支持aggregation的单库表、多库表的多姿势查询
hustfisher Sep 13, 2024
b8b3757
update aggregation request
hustfisher Sep 19, 2024
1d59f22
通过AGGENGATION_STORE设置是否支持聚合store指令
hustfisher Sep 19, 2024
b237c46
对空响应的聚合查询直接提前返回
hustfisher Sep 20, 2024
80114f9
update commandType
hustfisher Sep 23, 2024
bde5242
rm unused imports
hustfisher Sep 23, 2024
0242e14
支持vupdatesi,部分业务需要
hustfisher Sep 23, 2024
152e153
update for si request
hustfisher Sep 24, 2024
d04cd9d
支持si count增加任意计数
hustfisher Sep 24, 2024
b30d59b
update for vcard and vrange.si
hustfisher Oct 10, 2024
a074be8
update kvector vadd for duplicat key
hustfisher Oct 17, 2024
e8358f1
add tests
hustfisher Oct 18, 2024
87b7512
add tests
hustfisher Oct 18, 2024
451849f
1. 支持vget;2 vdel/vupdate增加防护策略:不带where条件请求判定为异常请求
hustfisher Oct 24, 2024
daeeb68
update tests
hustfisher Oct 24, 2024
53005c1
rm unused codes
hustfisher Oct 24, 2024
6ca7fd4
update tersts
hustfisher Oct 28, 2024
c6cf4fd
add vcard test for aggregation
hustfisher Oct 28, 2024
24257ee
refactor test for ci
hustfisher Oct 29, 2024
a175aaa
refactor aggregation tests
hustfisher Nov 6, 2024
c529362
update tests
hustfisher Nov 26, 2024
d6c49eb
update tests
hustfisher Nov 26, 2024
541ac9b
rm unused files
hustfisher Nov 26, 2024
a6f67d8
update test
hustfisher Dec 20, 2024
c1be6a3
vector聚合访问,中间某轮返回空,不提前终止后续轮次的访问
parabala Dec 24, 2024
371a17a
Merge pull request #498 from weibocom/vector_continue
parabala Dec 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion endpoint/src/kv/kvtime.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use super::{strategy::Postfix, uuid::*};
use super::uuid::*;
use chrono::{Datelike, NaiveDate};
use core::fmt::Write;
use ds::RingSlice;
use protocol::kv::Strategy;
use protocol::vector::Postfix;
use sharding::hash::Hash;
use sharding::{distribution::DBRange, hash::Hasher};

Expand Down
26 changes: 6 additions & 20 deletions endpoint/src/kv/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,10 @@ use super::kvtime::KVTime;
use ds::RingSlice;

use protocol::kv::Strategy;
use protocol::vector::Postfix;
use sharding::distribution::DBRange;
use sharding::hash::Hasher;

#[derive(Debug, Clone)]
pub enum Postfix {
YYMM,
YYMMDD,
INDEX,
}

impl Into<Postfix> for &str {
fn into(self) -> Postfix {
match self.to_lowercase().as_str() {
"yymm" => Postfix::YYMM,
_ => Postfix::YYMMDD,
}
}
}

#[derive(Debug, Clone)]
pub enum Strategist {
KVTime(KVTime),
Expand Down Expand Up @@ -75,13 +60,14 @@ impl Default for Strategist {
}

impl Strategist {
pub fn try_from(ns: &KvNamespace) -> Self {
Self::KVTime(KVTime::new(
pub fn try_from(ns: &KvNamespace) -> Option<Self> {
let table_postfix = ns.basic.table_postfix.as_str().try_into().ok()?;
Some(Self::KVTime(KVTime::new(
ns.basic.db_name.clone(),
ns.basic.db_count,
//此策略默认所有年都有同样的shard,basic也只配置了一项,也暗示了这个默认
ns.backends.iter().next().unwrap().1.len() as u32,
ns.basic.table_postfix.as_str().into(),
))
table_postfix,
)))
}
}
7 changes: 5 additions & 2 deletions endpoint/src/kv/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ where
(ctx.idx as usize, &shard.master)
}
};
ctx.idx = idx as u16;
ctx.idx = idx as u8;
ctx.runs += 1;
// 只重试一次,重试次数过多,可能会导致雪崩。如果不重试,现在的配额策略在当前副本也只会连续发送四次请求,问题也不大
let try_next = ctx.runs == 1;
Expand Down Expand Up @@ -163,7 +163,10 @@ where
}
fn update(&mut self, namespace: &str, cfg: &str) {
if let Some(ns) = KvNamespace::try_from(cfg) {
self.strategist = Strategist::try_from(&ns);
let Some(strategist) = Strategist::try_from(&ns) else {
return;
};
self.strategist = strategist;
self.cfg.update(namespace, ns);
}
}
Expand Down
99 changes: 85 additions & 14 deletions endpoint/src/vector/batch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::strategy::Postfix;
use chrono::{Datelike, NaiveDate};
use chrono_tz::Tz;
use core::fmt::Write;
use ds::RingSlice;
use protocol::Error;
use protocol::{
vector::{CommandType, KeysType, Postfix},
Error, DATE_YYMM, DATE_YYMMDD,
};
use sharding::{distribution::DBRange, hash::Hasher};

#[derive(Clone, Debug)]
pub struct Batch {
pub struct Aggregation {
db_prefix: String,
table_prefix: String,
table_postfix: Postfix,
Expand All @@ -18,7 +19,7 @@ pub struct Batch {
si: Si,
}

impl Batch {
impl Aggregation {
pub fn new_with_db(
db_prefix: String,
table_prefix: String,
Expand Down Expand Up @@ -63,11 +64,6 @@ impl Batch {
&self.hasher
}

pub fn get_date(&self, _: &[RingSlice]) -> Result<NaiveDate, Error> {
let now = chrono::Utc::now().with_timezone(&Tz::Asia__Shanghai);
Ok(NaiveDate::from_ymd_opt(now.year(), now.month(), now.day()).unwrap())
}

pub fn write_dname_with_hash(&self, buf: &mut impl Write, hash: i64) {
let db_idx: usize = self.distribution.db_idx(hash);
let _ = write!(buf, "{}_{}", self.db_prefix, db_idx);
Expand Down Expand Up @@ -101,14 +97,44 @@ impl Batch {
self.si.write_database_table(buf, hash)
}

pub(crate) fn condition_keys(&self) -> Box<dyn Iterator<Item = Option<&String>> + '_> {
Box::new(self.keys_name.iter().map(|x| Some(x)))
}

pub(crate) fn keys(&self) -> &[String] {
&self.keys_name
}

pub fn get_date(&self, cmd: CommandType, keys: &[RingSlice]) -> Result<NaiveDate, Error> {
match cmd {
CommandType::VRange | CommandType::VCard | CommandType::VRangeSi => {
Ok(NaiveDate::default())
}
//相比vrange多了一个日期key
_ => {
let date = keys.last().unwrap();
let ymd = match self.keys_name.last().unwrap().as_str() {
DATE_YYMM => (
date.try_str_num(0..0 + 2)
.ok_or(Error::RequestProtocolInvalid)? as u16
+ 2000,
date.try_str_num(2..2 + 2)
.ok_or(Error::RequestProtocolInvalid)? as u16,
1,
),
DATE_YYMMDD => (
date.try_str_num(0..0 + 2)
.ok_or(Error::RequestProtocolInvalid)? as u16
+ 2000,
date.try_str_num(2..2 + 2)
.ok_or(Error::RequestProtocolInvalid)? as u16,
date.try_str_num(4..4 + 2)
.ok_or(Error::RequestProtocolInvalid)? as u16,
),
_ => (0, 0, 0),
};
NaiveDate::from_ymd_opt(ymd.0.into(), ymd.1.into(), ymd.2.into())
.ok_or(Error::RequestProtocolInvalid)
}
}
}

// pub(crate) fn get_next_date(&self, year: u16, month: u8) -> NaiveDate {
// if month == 1 {
// return NaiveDate::from_ymd_opt((year - 1).into(), 12, 1).unwrap();
Expand All @@ -124,6 +150,51 @@ impl Batch {
pub(crate) fn si_cols(&self) -> &[String] {
&self.si_cols
}

pub(crate) fn keys_with_type(&self) -> Box<dyn Iterator<Item = KeysType> + '_> {
Box::new(
self.keys_name
.iter()
.map(|key_name| match key_name.as_str() {
DATE_YYMM | DATE_YYMMDD => KeysType::Time,
&_ => KeysType::Keys(key_name),
}),
)
}

//校验动态信息,后续有重复校验的后面延迟校验
//上行、xx.timeline请求多带一个日期key,日期key固定在最后
pub(crate) fn check_vector_cmd(
&self,
vcmd: &protocol::vector::VectorCmd,
) -> protocol::Result<()> {
match vcmd.cmd {
CommandType::VRange | CommandType::VCard | CommandType::VRangeSi => {
if vcmd.keys.len() != self.keys().len() - 1 {
return Err(Error::RequestProtocolInvalid);
}
Ok(())
}
//相比vrange多了一个日期key
CommandType::VGet
| CommandType::VAdd
| CommandType::VDel
| CommandType::VUpdate
| CommandType::VAddTimeline
| CommandType::VDelTimeline
| CommandType::VRangeTimeline
| CommandType::VUpdateTimeline
| CommandType::VAddSi
| CommandType::VUpdateSi
| CommandType::VDelSi => {
if vcmd.keys.len() != self.keys().len() {
return Err(Error::RequestProtocolInvalid);
}
Ok(())
}
CommandType::Unknown => panic!("not sup {:?}", vcmd.cmd),
}
}
}

#[derive(Clone, Debug)]
Expand Down
Loading