Skip to content

Commit 78a3bf5

Browse files
chore: refactor geoip, update deps
Signed-off-by: Henry Gressmann <[email protected]>
1 parent 4eb65bf commit 78a3bf5

File tree

18 files changed

+186
-202
lines changed

18 files changed

+186
-202
lines changed

Cargo.lock

Lines changed: 58 additions & 69 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@ path="src/main.rs"
1616

1717
[dependencies]
1818
# async/concurrency
19+
arc-swap="1.7"
1920
tokio={version="1.48", default-features=false, features=["macros", "rt-multi-thread", "signal"]}
2021
tokio-util={version="0.7", features=["io"]}
2122
futures-lite={version="2.6", default-features=false, features=["alloc"]}
22-
crossbeam-utils="0.8"
23-
crossbeam-channel="0.5"
2423
quick_cache={version="0.6", features=["parking_lot", "ahash"]}
2524

2625
# encoding
@@ -63,7 +62,8 @@ reqwest={version="0.12", default-features=false, features=[
6362
]}
6463
rustls={version="0.23", features=["aws_lc_rs"]}
6564

66-
duckdb={version="1.4", features=["buildtime_bindgen", "bundled", "chrono", "r2d2"]} # database
65+
# database
66+
duckdb={version="1.4", features=["buildtime_bindgen", "bundled", "chrono", "r2d2"]}
6767
rusqlite={version="0.37", features=["bundled", "modern_sqlite", "chrono"]}
6868
r2d2={version="0.8", default-features=false}
6969
r2d2_sqlite="0.31"

data/licenses-cargo.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

data/licenses-npm.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/app/core/events.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::sync::Arc;
22

3+
use arc_swap::ArcSwap;
34
use chrono::{DateTime, Utc};
4-
use crossbeam_channel::Receiver;
5-
use crossbeam_utils::sync::ShardedLock;
65
use eyre::{Result, bail};
6+
use std::sync::mpsc::Receiver;
77

88
use crate::app::models::{Event, event_params};
99
use crate::app::{DuckDBPool, EVENT_BATCH_INTERVAL, SqlitePool};
@@ -13,7 +13,7 @@ use crate::utils::hash::generate_salt;
1313
pub struct LiwanEvents {
1414
duckdb: DuckDBPool,
1515
sqlite: SqlitePool,
16-
daily_salt: Arc<ShardedLock<(String, DateTime<Utc>)>>,
16+
daily_salt: Arc<ArcSwap<(String, DateTime<Utc>)>>,
1717
}
1818

1919
impl LiwanEvents {
@@ -24,15 +24,12 @@ impl LiwanEvents {
2424
Ok((row.get(0)?, row.get(1)?))
2525
})?
2626
};
27-
Ok(Self { duckdb, sqlite, daily_salt: ShardedLock::new(daily_salt).into() })
27+
Ok(Self { duckdb, sqlite, daily_salt: ArcSwap::new(daily_salt.into()).into() })
2828
}
2929

3030
/// Get the daily salt, generating a new one if the current one is older than 24 hours
3131
pub fn get_salt(&self) -> Result<String> {
32-
let (salt, updated_at) = {
33-
let salt = self.daily_salt.read().map_err(|_| eyre::eyre!("Failed to acquire read lock"))?;
34-
salt.clone()
35-
};
32+
let (salt, updated_at) = &**self.daily_salt.load();
3633

3734
// if the salt is older than 24 hours, replace it with a new one (utils::generate_salt)
3835
if (Utc::now() - updated_at) > chrono::Duration::hours(24) {
@@ -41,15 +38,11 @@ impl LiwanEvents {
4138
let now = Utc::now();
4239
let conn = self.sqlite.get()?;
4340
conn.execute("update salts set salt = ?, updated_at = ? where id = 1", rusqlite::params![&new_salt, now])?;
44-
45-
if let Ok(mut daily_salt) = self.daily_salt.try_write() {
46-
daily_salt.0.clone_from(&new_salt);
47-
daily_salt.1 = now;
48-
return Ok(new_salt);
49-
}
41+
self.daily_salt.store((new_salt.clone(), now).into());
42+
Ok(new_salt)
43+
} else {
44+
Ok(salt.clone())
5045
}
51-
52-
Ok(salt)
5346
}
5447

5548
/// Append events in batch

src/app/core/geoip.rs

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
#![allow(dead_code)]
22

3-
use std::collections::HashMap;
43
use std::io::{self};
54
use std::net::IpAddr;
65
use std::path::{Path, PathBuf};
76
use std::sync::Arc;
87
use std::sync::atomic::{AtomicBool, Ordering};
98

109
use crate::app::SqlitePool;
11-
use crossbeam_utils::sync::ShardedLock;
10+
use arc_swap::ArcSwapOption;
1211
use eyre::{Context, OptionExt, Result};
1312
use futures_lite::StreamExt;
1413
use md5::{Digest, Md5};
@@ -19,6 +18,7 @@ const BASE_URL: &str = "https://updates.maxmind.com";
1918
const METADATA_ENDPOINT: &str = "/geoip/updates/metadata?edition_id=";
2019
const DOWNLOAD_ENDPOINT: &str = "/geoip/databases/";
2120

21+
#[derive(Default)]
2222
pub struct LookupResult {
2323
pub city: Option<String>,
2424
pub country_code: Option<String>,
@@ -27,30 +27,26 @@ pub struct LookupResult {
2727
#[derive(Clone)]
2828
pub struct LiwanGeoIP {
2929
pool: SqlitePool,
30-
reader: Arc<ShardedLock<Option<maxminddb::Reader<Vec<u8>>>>>,
30+
reader: Arc<ArcSwapOption<maxminddb::Reader<Vec<u8>>>>,
3131

3232
downloading: Arc<AtomicBool>,
33-
config: crate::config::Config,
3433
geoip: crate::config::GeoIpConfig,
3534
path: PathBuf,
3635
}
3736

3837
impl LiwanGeoIP {
39-
pub fn try_new(config: crate::config::Config, pool: SqlitePool) -> Result<Option<Self>> {
40-
let Some(geoip) = &config.geoip else {
41-
tracing::trace!("GeoIP support disabled, skipping...");
42-
return Ok(None);
43-
};
44-
38+
pub fn try_new(config: crate::config::Config, pool: SqlitePool) -> Result<Self> {
39+
let geoip = config.geoip;
4540
if geoip.maxmind_account_id.is_none() && geoip.maxmind_license_key.is_none() && geoip.maxmind_db_path.is_none()
4641
{
4742
tracing::trace!("GeoIP support disabled, skipping...");
48-
return Ok(None);
43+
return Ok(Self::noop(pool));
4944
}
5045

51-
let edition = geoip.maxmind_edition.as_deref().unwrap_or("GeoLite2-City");
46+
let edition = &geoip.maxmind_edition;
5247
let default_path = PathBuf::from(config.data_dir.clone()).join(format!("./geoip/{edition}.mmdb"));
5348
let path = geoip.maxmind_db_path.as_ref().map_or(default_path, PathBuf::from);
49+
5450
if let Some(parent) = path.parent() {
5551
std::fs::create_dir_all(parent)?;
5652
}
@@ -60,28 +56,38 @@ impl LiwanGeoIP {
6056
}
6157

6258
tracing::info!(database = geoip.maxmind_db_path, "GeoIP support enabled, loading database");
63-
let reader = if path.exists() {
64-
Some(maxminddb::Reader::open_readfile(path.clone()).expect("Failed to open GeoIP database file"))
65-
} else {
66-
None
67-
};
6859

69-
Ok(Some(Self {
70-
geoip: geoip.clone(),
71-
config,
60+
let reader = path.exists().then(|| {
61+
maxminddb::Reader::open_readfile(path.clone()).expect("Failed to open GeoIP database file").into()
62+
});
63+
64+
Ok(Self { geoip, pool, reader: ArcSwapOption::new(reader).into(), path, downloading: Default::default() })
65+
}
66+
67+
fn is_enabled(&self) -> bool {
68+
self.reader.load().is_some() || self.downloading.load(Ordering::Acquire)
69+
}
70+
71+
fn noop(pool: SqlitePool) -> Self {
72+
Self {
73+
geoip: Default::default(),
7274
pool,
73-
reader: Arc::new(ShardedLock::new(reader)),
74-
path,
75-
downloading: Arc::new(AtomicBool::new(false)),
76-
}))
75+
reader: ArcSwapOption::new(None).into(),
76+
downloading: Default::default(),
77+
path: PathBuf::new(),
78+
}
7779
}
7880

7981
// Lookup the IP address in the GeoIP database
8082
pub fn lookup(&self, ip: &IpAddr) -> Result<LookupResult> {
81-
let reader = self.reader.read().map_err(|_| eyre::eyre!("Failed to acquire GeoIP reader lock"))?;
82-
let reader = reader.as_ref().ok_or_eyre("GeoIP database not found")?;
83-
let lookup: maxminddb::geoip2::City =
84-
reader.lookup(*ip)?.ok_or_else(|| eyre::eyre!("No data found for IP address"))?;
83+
let Some(reader) = &*self.reader.load() else {
84+
return Ok(Default::default());
85+
};
86+
87+
let lookup = reader
88+
.lookup::<maxminddb::geoip2::City>(*ip)?
89+
.ok_or_else(|| eyre::eyre!("No data found for IP address"))?;
90+
8591
let city = lookup.city.and_then(|city| city.names.and_then(|names| names.get("en").map(|s| (*s).to_string())));
8692
let country_code = lookup.country.and_then(|country| country.iso_code.map(ToString::to_string));
8793
Ok(LookupResult { city, country_code })
@@ -93,16 +99,17 @@ impl LiwanGeoIP {
9399
return Ok(());
94100
}
95101

96-
let maxmind_edition = self.geoip.maxmind_edition.clone().ok_or_eyre("MaxMind edition not found")?;
97-
let maxmind_account_id = self.geoip.maxmind_account_id.clone().ok_or_eyre("MaxMind account ID not found")?;
98-
let maxmind_license_key = self.geoip.maxmind_license_key.clone().ok_or_eyre("MaxMind license key not found")?;
102+
let maxmind_edition = &self.geoip.maxmind_edition;
103+
let maxmind_account_id = self.geoip.maxmind_account_id.as_deref().ok_or_eyre("MaxMind account ID not found")?;
104+
let maxmind_license_key =
105+
self.geoip.maxmind_license_key.as_deref().ok_or_eyre("MaxMind license key not found")?;
99106

100107
let db_exists = self.path.exists();
101108
let db_md5 = if db_exists { file_md5(&self.path)? } else { String::new() };
102109

103-
let mut update = false;
110+
let mut update = !db_exists;
104111
if db_exists {
105-
match get_latest_md5(&maxmind_edition, &maxmind_account_id, &maxmind_license_key).await {
112+
match get_latest_md5(maxmind_edition, maxmind_account_id, maxmind_license_key).await {
106113
Ok(latest_md5) => {
107114
if latest_md5 != db_md5 {
108115
tracing::info!("GeoIP database outdated, downloading...");
@@ -115,11 +122,10 @@ impl LiwanGeoIP {
115122
};
116123
} else {
117124
tracing::info!("GeoIP database doesn't exist, attempting to download...");
118-
update = true;
119125
}
120126

121127
if update {
122-
let file = match download_maxmind_db(&maxmind_edition, &maxmind_account_id, &maxmind_license_key).await {
128+
let file = match download_maxmind_db(maxmind_edition, maxmind_account_id, maxmind_license_key).await {
123129
Ok(file) => file,
124130
Err(e) => {
125131
tracing::warn!(error = ?e, "Failed to download GeoIP database, skipping update");
@@ -129,18 +135,15 @@ impl LiwanGeoIP {
129135
};
130136

131137
// close the current reader to free up the file
132-
{
133-
let mut reader = self.reader.write().unwrap();
134-
reader.take();
135-
}
138+
self.reader.swap(None);
136139

137140
// move the downloaded file to the correct path
138141
std::fs::copy(&file, &self.path)?;
139142
std::fs::remove_file(file)?;
140143

141144
// open the new reader
142145
let reader = maxminddb::Reader::open_readfile(self.path.clone())?;
143-
*self.reader.write().unwrap() = Some(reader);
146+
self.reader.store(Some(reader.into()));
144147

145148
let path = std::fs::canonicalize(&self.path)?;
146149
tracing::info!(path = ?path, "GeoIP database updated successfully");
@@ -151,8 +154,10 @@ impl LiwanGeoIP {
151154
}
152155
}
153156

154-
pub fn keep_updated(geoip: Option<LiwanGeoIP>) {
155-
let Some(geoip) = geoip else { return };
157+
pub fn keep_updated(geoip: LiwanGeoIP) {
158+
if !geoip.is_enabled() {
159+
return;
160+
}
156161

157162
tokio::task::spawn(async move {
158163
if let Err(e) = geoip.check_for_updates().await {
@@ -182,7 +187,7 @@ async fn get_latest_md5(edition: &str, account_id: &str, license_key: &str) -> R
182187
.basic_auth(account_id, Some(license_key))
183188
.send()
184189
.await?
185-
.json::<HashMap<String, Vec<HashMap<String, String>>>>()
190+
.json::<ahash::HashMap<String, Vec<ahash::HashMap<String, String>>>>()
186191
.await?;
187192

188193
Ok(response

src/app/core/onboarding.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,34 @@
1-
use std::sync::Arc;
2-
3-
use crossbeam_utils::sync::ShardedLock;
1+
use arc_swap::ArcSwapOption;
42
use eyre::Result;
3+
use std::sync::Arc;
54

65
use crate::{app::SqlitePool, utils::hash::onboarding_token};
76

87
#[derive(Clone)]
98
pub struct LiwanOnboarding {
10-
token: Arc<ShardedLock<Option<String>>>,
9+
token: Arc<ArcSwapOption<String>>,
1110
}
1211

1312
impl LiwanOnboarding {
1413
pub fn try_new(pool: &SqlitePool) -> Result<Self> {
1514
let onboarding = {
1615
tracing::debug!("Checking if an onboarding token needs to be generated");
1716
let conn = pool.get()?;
18-
let mut stmt = conn.prepare("select 1 from users limit 1")?;
19-
ShardedLock::new(if stmt.exists([])? { None } else { Some(onboarding_token()) })
17+
let onboarded = conn.prepare("select 1 from users limit 1")?.exists([])?;
18+
ArcSwapOption::new(onboarded.then(|| onboarding_token().into()))
2019
};
2120

2221
Ok(Self { token: onboarding.into() })
2322
}
2423

2524
/// Get the onboarding token, if it exists
2625
pub fn token(&self) -> Result<Option<String>> {
27-
Ok(self
28-
.token
29-
.read()
30-
.map_err(|_| eyre::eyre!("Failed to acquire onboarding token read lock"))?
31-
.as_ref()
32-
.cloned())
26+
Ok((self.token.load()).as_ref().map(|v| (**v).clone()))
3327
}
3428

3529
/// Clear the onboarding token to prevent it from being used again
3630
pub fn clear(&self) -> Result<()> {
37-
let mut onboarding =
38-
self.token.write().map_err(|_| eyre::eyre!("Failed to acquire onboarding token write lock"))?;
39-
*onboarding = None;
31+
self.token.store(None);
4032
Ok(())
4133
}
4234
}

src/app/db.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::path::PathBuf;
1010

1111
pub(super) fn init_duckdb(
1212
path: &PathBuf,
13-
duckdb_config: Option<DuckdbConfig>,
13+
duckdb_config: DuckdbConfig,
1414
mut migrations_runner: Runner,
1515
) -> Result<r2d2::Pool<DuckdbConnectionManager>> {
1616
let mut flags = duckdb::Config::default()
@@ -19,14 +19,12 @@ pub(super) fn init_duckdb(
1919
.with("enable_fsst_vectors", "true")?
2020
.with("allocator_background_threads", "true")?;
2121

22-
if let Some(duckdb_config) = duckdb_config {
23-
if let Some(memory_limit) = duckdb_config.memory_limit {
24-
flags = flags.max_memory(&memory_limit)?;
25-
}
22+
if let Some(memory_limit) = duckdb_config.memory_limit {
23+
flags = flags.max_memory(&memory_limit)?;
24+
}
2625

27-
if let Some(threads) = duckdb_config.threads {
28-
flags = flags.threads(threads.get().into())?;
29-
}
26+
if let Some(threads) = duckdb_config.threads {
27+
flags = flags.threads(threads.get().into())?;
3028
}
3129

3230
let conn = DuckdbConnectionManager::file_with_flags(path, flags).map_err(|e| {

src/app/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct Liwan {
2525
pub onboarding: core::onboarding::LiwanOnboarding,
2626
pub entities: core::entities::LiwanEntities,
2727
pub projects: core::projects::LiwanProjects,
28-
pub geoip: Option<core::geoip::LiwanGeoIP>,
28+
pub geoip: core::geoip::LiwanGeoIP,
2929

3030
pub config: Config,
3131
}

src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
app::{Liwan, models::UserRole},
3-
config::{Config, DEFAULT_CONFIG},
3+
config::{Config, DEFAULT_CONFIG, GeoIpConfig},
44
};
55
use argh::FromArgs;
66
use eyre::Result;
@@ -86,7 +86,7 @@ pub struct AddUser {
8686
}
8787

8888
pub fn handle_command(mut config: Config, cmd: Command) -> Result<()> {
89-
config.geoip = None; // disable GeoIP support in CLI
89+
config.geoip = GeoIpConfig::default(); // disable GeoIP in CLI commands
9090

9191
match cmd {
9292
Command::UpdatePassword(update) => {

0 commit comments

Comments
 (0)