Skip to content

Commit d40d712

Browse files
authored
feat(sync): daemon support for advanced sync (#528)
* feat(sync):daemon support for advanced sync * docs(sync):update docs for advanced sync daemon support * feat:remove '*' parameter for syncing all buckets * Update README.md * feat: update and use parse_list for buckets
1 parent 10c50ba commit d40d712

File tree

4 files changed

+148
-83
lines changed

4 files changed

+148
-83
lines changed

aw-sync/README.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,26 @@ Was originally prototyped as a PR to aw-server: https://github.com/ActivityWatch
1515
This will start a daemon which pulls and pushes events with the sync directory (`~/ActivityWatchSync` by default) every 5 minutes:
1616

1717
```sh
18+
# Basic sync daemon (syncs all buckets every 5 minutes)
1819
aw-sync
20+
21+
# Same as above
22+
aw-sync daemon
23+
24+
# Sync daemon with specific buckets only
25+
aw-sync daemon --buckets "aw-watcher-window,aw-watcher-afk" --start-date "2024-01-01"
26+
27+
# Sync all buckets once and exit
28+
aw-sync sync --start-date "2024-01-01"
1929
```
2030

21-
For more options, see `aw-sync --help`.
31+
For more options, see `aw-sync --help`. Some notable options:
32+
- `--buckets`: Specify which buckets to sync (comma-separated). By default, all buckets are synced.
33+
- Use `--buckets "bucket1,bucket2"` to sync specific buckets
34+
- Not specifying this option syncs all buckets by default
35+
- `--start-date`: Only sync events after this date (YYYY-MM-DD)
36+
- `--sync-db`: Specify a specific database file in the sync directory
37+
- `--mode`: Choose sync mode: "push", "pull", or "both" (default: "both")
2238

2339
### Setting up sync
2440

@@ -117,5 +133,5 @@ PORT=5668
117133
cargo run --bin aw-server -- --testing --port $PORT --dbpath test-$PORT.sqlite --device-id $PORT --no-legacy-import
118134
```
119135

120-
Now run `cargo run --bin aw-sync-rust -- --port 5668` to pull buckets from the sync dir (retrieving events from the 5667 instance) and push buckets from the 5668 instance to the sync dir.
136+
Now run `cargo run --bin aw-sync -- --port 5668` to pull buckets from the sync dir (retrieving events from the 5667 instance) and push buckets from the 5668 instance to the sync dir.
121137

aw-sync/src/main.rs

Lines changed: 120 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
// - [x] Setup local sync bucket
33
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
44
// - [x] Import buckets and sync events from remotes
5-
// - [ ] Add CLI arguments
5+
// - [x] Add CLI arguments
66
// - [x] For which local server to use
77
// - [x] For which sync dir to use
8-
// - [ ] Date to start syncing from
8+
// - [x] Date to start syncing from
99

1010
#[macro_use]
1111
extern crate log;
@@ -60,33 +60,43 @@ struct Opts {
6060
enum Commands {
6161
/// Daemon subcommand
6262
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63-
Daemon {},
63+
Daemon {
64+
/// Date to start syncing from.
65+
/// If not specified, start from beginning.
66+
/// Format: YYYY-MM-DD
67+
#[clap(long, value_parser=parse_start_date)]
68+
start_date: Option<DateTime<Utc>>,
69+
70+
/// Specify buckets to sync using a comma-separated list.
71+
/// By default, all buckets are synced.
72+
#[clap(long, value_parser=parse_list)]
73+
buckets: Option<Vec<String>>,
74+
75+
/// Full path to sync db file
76+
/// Useful for syncing buckets from a specific db file in the sync directory.
77+
/// Must be a valid absolute path to a file in the sync directory.
78+
#[clap(long)]
79+
sync_db: Option<PathBuf>,
80+
},
6481

65-
/// Sync subcommand (basic)
82+
/// Sync subcommand
6683
///
67-
/// Pulls remote buckets then pushes local buckets.
84+
/// Syncs data between local aw-server and sync directory.
85+
/// First pulls remote buckets from the sync directory to the local aw-server.
86+
/// Then pushes local buckets from the aw-server to the local sync directory.
6887
Sync {
6988
/// Host(s) to pull from, comma separated. Will pull from all hosts if not specified.
7089
#[clap(long, value_parser=parse_list)]
7190
host: Option<Vec<String>>,
72-
},
7391

74-
/// Sync subcommand (advanced)
75-
///
76-
/// Pulls remote buckets then pushes local buckets.
77-
/// First pulls remote buckets in the sync directory to the local aw-server.
78-
/// Then pushes local buckets from the aw-server to the local sync directory.
79-
#[clap(arg_required_else_help = true)]
80-
SyncAdvanced {
8192
/// Date to start syncing from.
8293
/// If not specified, start from beginning.
83-
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
8494
/// Format: YYYY-MM-DD
8595
#[clap(long, value_parser=parse_start_date)]
8696
start_date: Option<DateTime<Utc>>,
8797

8898
/// Specify buckets to sync using a comma-separated list.
89-
/// If not specified, all buckets will be synced.
99+
/// By default, all buckets are synced.
90100
#[clap(long, value_parser=parse_list)]
91101
buckets: Option<Vec<String>>,
92102

@@ -111,7 +121,18 @@ fn parse_start_date(arg: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
111121
}
112122

113123
fn parse_list(arg: &str) -> Result<Vec<String>, clap::Error> {
114-
Ok(arg.split(',').map(|s| s.to_string()).collect())
124+
// If the argument is empty or just whitespace, return an empty Vec
125+
// This handles the case when --buckets is used without a value
126+
if arg.trim().is_empty() {
127+
return Ok(vec![]);
128+
}
129+
130+
// Otherwise, split by comma, trim each part, and filter out empty strings
131+
Ok(arg
132+
.split(',')
133+
.map(|s| s.trim().to_string())
134+
.filter(|s| !s.is_empty())
135+
.collect())
115136
}
116137

117138
fn main() -> Result<(), Box<dyn Error>> {
@@ -139,60 +160,76 @@ fn main() -> Result<(), Box<dyn Error>> {
139160

140161
let client = AwClient::new(&opts.host, port, "aw-sync")?;
141162

142-
// if opts.command is None, then we're using the default subcommand (Sync)
143-
match opts.command.unwrap_or(Commands::Daemon {}) {
163+
// if opts.command is None, then we're using the default subcommand (Daemon)
164+
match opts.command.unwrap_or(Commands::Daemon {
165+
start_date: None,
166+
buckets: None,
167+
sync_db: None,
168+
}) {
144169
// Start daemon
145-
Commands::Daemon {} => {
170+
Commands::Daemon {
171+
start_date,
172+
buckets,
173+
sync_db,
174+
} => {
146175
info!("Starting daemon...");
147-
daemon(&client)?;
148-
}
149-
// Perform basic sync
150-
Commands::Sync { host } => {
151-
// Pull
152-
match host {
153-
Some(hosts) => {
154-
for host in hosts.iter() {
155-
info!("Pulling from host: {}", host);
156-
sync_wrapper::pull(host, &client)?;
157-
}
158-
}
159-
None => {
160-
info!("Pulling from all hosts");
161-
sync_wrapper::pull_all(&client)?;
162-
}
163-
}
164176

165-
// Push
166-
info!("Pushing local data");
167-
sync_wrapper::push(&client)?
177+
let effective_buckets = buckets;
178+
179+
daemon(&client, start_date, effective_buckets, sync_db)?;
168180
}
169-
// Perform two-way sync
170-
Commands::SyncAdvanced {
181+
// Perform sync
182+
Commands::Sync {
183+
host,
171184
start_date,
172185
buckets,
173186
mode,
174187
sync_db,
175188
} => {
176-
let sync_dir = dirs::get_sync_dir()?;
177-
if let Some(db_path) = &sync_db {
178-
info!("Using sync db: {}", &db_path.display());
189+
let effective_buckets = buckets;
179190

180-
if !db_path.is_absolute() {
181-
Err("Sync db path must be absolute")?
182-
}
183-
if !db_path.starts_with(&sync_dir) {
184-
Err("Sync db path must be in sync directory")?
191+
// If advanced options are provided, use advanced sync mode
192+
if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() {
193+
let sync_dir = dirs::get_sync_dir()?;
194+
if let Some(db_path) = &sync_db {
195+
info!("Using sync db: {}", &db_path.display());
196+
197+
if !db_path.is_absolute() {
198+
Err("Sync db path must be absolute")?
199+
}
200+
if !db_path.starts_with(&sync_dir) {
201+
Err("Sync db path must be in sync directory")?
202+
}
185203
}
186-
}
187204

188-
let sync_spec = sync::SyncSpec {
189-
path: sync_dir,
190-
path_db: sync_db,
191-
buckets,
192-
start: start_date,
193-
};
205+
let sync_spec = sync::SyncSpec {
206+
path: sync_dir,
207+
path_db: sync_db,
208+
buckets: effective_buckets,
209+
start: start_date,
210+
};
211+
212+
sync::sync_run(&client, &sync_spec, mode)?
213+
} else {
214+
// Simple host-based sync mode (backwards compatibility)
215+
// Pull
216+
match host {
217+
Some(hosts) => {
218+
for host in hosts.iter() {
219+
info!("Pulling from host: {}", host);
220+
sync_wrapper::pull(host, &client)?;
221+
}
222+
}
223+
None => {
224+
info!("Pulling from all hosts");
225+
sync_wrapper::pull_all(&client)?;
226+
}
227+
}
194228

195-
sync::sync_run(&client, &sync_spec, mode)?
229+
// Push
230+
info!("Pushing local data");
231+
sync_wrapper::push(&client)?
232+
}
196233
}
197234

198235
// List all buckets
@@ -207,23 +244,45 @@ fn main() -> Result<(), Box<dyn Error>> {
207244
Ok(())
208245
}
209246

210-
fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
247+
fn daemon(
248+
client: &AwClient,
249+
start_date: Option<DateTime<Utc>>,
250+
buckets: Option<Vec<String>>,
251+
sync_db: Option<PathBuf>,
252+
) -> Result<(), Box<dyn Error>> {
211253
let (tx, rx) = channel();
212254

213255
ctrlc::set_handler(move || {
214256
let _ = tx.send(());
215257
})?;
216258

259+
let sync_dir = dirs::get_sync_dir()?;
260+
if let Some(db_path) = &sync_db {
261+
info!("Using sync db: {}", &db_path.display());
262+
263+
if !db_path.is_absolute() {
264+
Err("Sync db path must be absolute")?
265+
}
266+
if !db_path.starts_with(&sync_dir) {
267+
Err("Sync db path must be in sync directory")?
268+
}
269+
}
270+
271+
let sync_spec = sync::SyncSpec {
272+
path: sync_dir,
273+
buckets,
274+
path_db: sync_db,
275+
start: start_date,
276+
};
277+
217278
loop {
218-
if let Err(e) = daemon_sync_cycle(client) {
279+
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
219280
error!("Error during sync cycle: {}", e);
220-
// Re-throw the error
221281
return Err(e);
222282
}
223283

224284
info!("Sync pass done, sleeping for 5 minutes");
225285

226-
// Wait for either the sleep duration or a termination signal
227286
match rx.recv_timeout(Duration::from_secs(300)) {
228287
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
229288
info!("Termination signal received, shutting down.");
@@ -237,13 +296,3 @@ fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
237296

238297
Ok(())
239298
}
240-
241-
fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
242-
info!("Pulling from all hosts");
243-
sync_wrapper::pull_all(client)?;
244-
245-
info!("Pushing local data");
246-
sync_wrapper::push(client)?;
247-
248-
Ok(())
249-
}

aw-sync/src/sync.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,18 @@ pub fn sync_datastores(
247247
.get_buckets()
248248
.unwrap()
249249
.iter_mut()
250-
// If buckets vec isn't empty, filter out buckets not in the buckets vec
250+
// Only filter buckets if specific bucket IDs are provided
251251
.filter(|tup| {
252252
let bucket = &tup.1;
253253
if let Some(buckets) = &sync_spec.buckets {
254-
buckets.iter().any(|b_id| b_id == &bucket.id)
254+
// If "*" is in the buckets list or no buckets specified, sync all buckets
255+
if buckets.iter().any(|b_id| b_id == "*") || buckets.is_empty() {
256+
true
257+
} else {
258+
buckets.iter().any(|b_id| b_id == &bucket.id)
259+
}
255260
} else {
261+
// By default, sync all buckets
256262
true
257263
}
258264
})

aw-sync/src/sync_wrapper.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
4848
let sync_spec = SyncSpec {
4949
path: sync_dir.clone(),
5050
path_db: Some(db.path().clone()),
51-
buckets: Some(vec![
52-
format!("aw-watcher-window_{}", host),
53-
format!("aw-watcher-afk_{}", host),
54-
]),
51+
buckets: None, // Sync all buckets by default
5552
start: None,
5653
};
5754
sync_run(client, &sync_spec, SyncMode::Pull)?;
@@ -67,10 +64,7 @@ pub fn push(client: &AwClient) -> Result<(), Box<dyn Error>> {
6764
let sync_spec = SyncSpec {
6865
path: sync_dir,
6966
path_db: None,
70-
buckets: Some(vec![
71-
format!("aw-watcher-window_{}", client.hostname),
72-
format!("aw-watcher-afk_{}", client.hostname),
73-
]),
67+
buckets: None, // Sync all buckets by default
7468
start: None,
7569
};
7670
sync_run(client, &sync_spec, SyncMode::Push)?;

0 commit comments

Comments
 (0)