Skip to content

Commit ea912cc

Browse files
committed
Support PROCESS-NAME rule
1 parent e588194 commit ea912cc

File tree

14 files changed

+461
-36
lines changed

14 files changed

+461
-36
lines changed

leaf/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ openssl-tls = ["openssl", "tokio-openssl", "openssl-probe"]
9595
config-conf = ["regex"]
9696
config-json = ["serde", "serde_derive", "serde_json"]
9797

98+
# Router rules
99+
rule-process-name = ["regex"]
100+
98101
# Outbounds
99102
outbound-direct = []
100103
outbound-drop = []
@@ -164,6 +167,7 @@ colored = "2.1"
164167
maxminddb = { version = "0.24", features = ["mmap"] }
165168
memmap2 = "0.9"
166169
cidr = "0.2"
170+
regex = { version = "1.10", optional = true }
167171

168172
# outbound-select
169173
directories = { version = "4.0", optional = true }
@@ -178,7 +182,6 @@ serde_derive = { version = "1.0", optional = true }
178182
serde = { version = "1.0", optional = true }
179183

180184
# config-conf
181-
regex = { version = "1.10", optional = true }
182185

183186
openssl = { version = "0.10", features = ["vendored"], optional = true }
184187

leaf/src/app/dispatcher.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::convert::TryFrom;
22
use std::io::{self, ErrorKind};
3+
use std::path::Path;
34
use std::sync::Arc;
45
use std::time::Duration;
56

@@ -43,15 +44,43 @@ fn log_request(
4344
} else {
4445
(sess.network.to_string(), outbound_tag.to_string())
4546
};
46-
info!(
47-
"[{}] [{}] [{}] [{}] [{}] [{}]",
48-
sess.forwarded_source.unwrap_or_else(|| sess.source.ip()),
49-
network,
50-
&sess.inbound_tag,
51-
outbound_tag,
52-
hs,
53-
&sess.destination,
54-
);
47+
48+
#[cfg(feature = "rule-process-name")]
49+
{
50+
let process_name = sess
51+
.process_name
52+
.as_ref()
53+
.map(|x| {
54+
Path::new(x)
55+
.file_name()
56+
.and_then(|name| name.to_str())
57+
.unwrap_or(x)
58+
})
59+
.unwrap_or("");
60+
info!(
61+
"[{}] [{}] [{}] [{}] [{}] [{}] [{}]",
62+
process_name,
63+
sess.forwarded_source.unwrap_or_else(|| sess.source.ip()),
64+
network,
65+
&sess.inbound_tag,
66+
outbound_tag,
67+
hs,
68+
&sess.destination,
69+
);
70+
}
71+
72+
#[cfg(not(feature = "rule-process-name"))]
73+
{
74+
info!(
75+
"[{}] [{}] [{}] [{}] [{}] [{}]",
76+
sess.forwarded_source.unwrap_or_else(|| sess.source.ip()),
77+
network,
78+
&sess.inbound_tag,
79+
outbound_tag,
80+
hs,
81+
&sess.destination,
82+
);
83+
}
5584
}
5685

5786
pub struct Dispatcher {

leaf/src/app/nat_manager.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,14 @@ impl NatManager {
133133
source: dgram_src.address,
134134
destination: pkt.dst_addr.clone(),
135135
inbound_tag: inbound_tag.to_string(),
136+
process_name: dgram_src.process_name.clone(),
136137
..Default::default()
137138
});
138139
if sess.inbound_tag.is_empty() {
139140
sess.inbound_tag = inbound_tag.to_string();
140141
}
141142

142-
self.add_session(sess, *dgram_src, client_ch_tx.clone(), &mut guard)
143+
self.add_session(sess, dgram_src.clone(), client_ch_tx.clone(), &mut guard)
143144
.await;
144145

145146
debug!(
@@ -170,36 +171,38 @@ impl NatManager {
170171
mpsc::channel(*crate::option::UDP_UPLINK_CHANNEL_SIZE);
171172
let (downlink_abort_tx, downlink_abort_rx) = oneshot::channel();
172173

173-
guard.insert(raddr, (target_ch_tx, downlink_abort_tx, Instant::now()));
174+
guard.insert(raddr.clone(), (target_ch_tx, downlink_abort_tx, Instant::now()));
174175

175176
let dispatcher = self.dispatcher.clone();
176177
let sessions = self.sessions.clone();
177178

178179
// Spawns a new task for dispatching to avoid blocking the current task,
179180
// because we have stream type transports for UDP traffic, establishing a
180181
// TCP stream would block the task.
182+
let raddr_cloned = raddr.clone();
181183
tokio::spawn(async move {
182184
// new socket to communicate with the target.
183185
let socket = match dispatcher.dispatch_datagram(sess).await {
184186
Ok(s) => s,
185187
Err(e) => {
186-
debug!("dispatch {} failed: {}", &raddr, e);
187-
sessions.lock().await.remove(&raddr);
188+
debug!("dispatch {} failed: {}", &raddr_cloned, e);
189+
sessions.lock().await.remove(&raddr_cloned);
188190
return;
189191
}
190192
};
191193

192194
let (mut target_sock_recv, mut target_sock_send) = socket.split();
193195

194196
// downlink
197+
let raddr_downlink = raddr_cloned.clone();
195198
let downlink_task = async move {
196199
let mut buf = vec![0u8; *crate::option::DATAGRAM_BUFFER_SIZE * 1024];
197200
loop {
198201
match target_sock_recv.recv_from(&mut buf).await {
199202
Err(err) => {
200203
debug!(
201204
"Failed to receive downlink packets on session {}: {}",
202-
&raddr, err
205+
&raddr_downlink, err
203206
);
204207
break;
205208
}
@@ -208,20 +211,20 @@ impl NatManager {
208211
let pkt = UdpPacket::new(
209212
buf[..n].to_vec(),
210213
addr.clone(),
211-
SocksAddr::from(raddr.address),
214+
SocksAddr::from(raddr_downlink.address),
212215
);
213216
if let Err(err) = client_ch_tx.send(pkt).await {
214217
debug!(
215218
"Failed to send downlink packets on session {} to {}: {}",
216-
&raddr, &addr, err
219+
&raddr_downlink, &addr, err
217220
);
218221
break;
219222
}
220223

221224
// activity update
222225
{
223226
let mut sessions = sessions.lock().await;
224-
if let Some(sess) = sessions.get_mut(&raddr) {
227+
if let Some(sess) = sessions.get_mut(&raddr_downlink) {
225228
if addr.port() == 53 {
226229
// If the destination port is 53, we assume it's a
227230
// DNS query and set a negative timeout so it will
@@ -237,7 +240,7 @@ impl NatManager {
237240
}
238241
}
239242
}
240-
sessions.lock().await.remove(&raddr);
243+
sessions.lock().await.remove(&raddr_downlink);
241244
};
242245

243246
let (downlink_task, downlink_task_handle) = abortable(downlink_task);
@@ -250,6 +253,7 @@ impl NatManager {
250253
});
251254

252255
// uplink
256+
let raddr_uplink = raddr_cloned.clone();
253257
tokio::spawn(async move {
254258
while let Some(pkt) = target_ch_rx.recv().await {
255259
trace!(
@@ -260,13 +264,13 @@ impl NatManager {
260264
if let Err(e) = target_sock_send.send_to(&pkt.data, &pkt.dst_addr).await {
261265
debug!(
262266
"Failed to send uplink packets on session {} to {}: {:?}",
263-
&raddr, &pkt.dst_addr, e
267+
&raddr_uplink, &pkt.dst_addr, e
264268
);
265269
break;
266270
}
267271
}
268272
if let Err(e) = target_sock_send.close().await {
269-
debug!("Failed to close outbound datagram {}: {}", &raddr, e);
273+
debug!("Failed to close outbound datagram {}: {}", &raddr_uplink, e);
270274
}
271275
});
272276
});

leaf/src/app/router.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use futures::TryFutureExt;
88
use maxminddb::geoip2::Country;
99
use maxminddb::Mmap;
1010
use tracing::{debug, warn};
11+
#[cfg(feature = "rule-process-name")]
12+
use regex::Regex;
1113

1214
use crate::app::SyncDnsClient;
1315
use crate::config;
@@ -354,6 +356,42 @@ impl Condition for DomainMatcher {
354356
}
355357
}
356358

359+
#[cfg(feature = "rule-process-name")]
360+
pub struct ProcessNameMatcher {
361+
regexes: Vec<Regex>,
362+
}
363+
364+
#[cfg(feature = "rule-process-name")]
365+
impl ProcessNameMatcher {
366+
pub fn new(patterns: Vec<String>) -> Self {
367+
let mut regexes = Vec::new();
368+
for pattern in patterns {
369+
if let Ok(regex) = Regex::new(&pattern) {
370+
regexes.push(regex);
371+
} else {
372+
warn!("Invalid regex pattern: {}", pattern);
373+
}
374+
}
375+
Self { regexes }
376+
}
377+
}
378+
379+
#[cfg(feature = "rule-process-name")]
380+
impl Condition for ProcessNameMatcher {
381+
fn apply(&self, sess: &Session) -> bool {
382+
if let Some(process_name) = sess.process_name.as_ref() {
383+
for regex in &self.regexes {
384+
if regex.is_match(process_name) {
385+
debug!("Matched process_name={} with regex", process_name);
386+
return true;
387+
}
388+
}
389+
}
390+
false
391+
}
392+
}
393+
394+
357395
struct ConditionAnd {
358396
conditions: Vec<Box<dyn Condition>>,
359397
}
@@ -467,6 +505,11 @@ impl Router {
467505
cond_and.add(Box::new(InboundTagMatcher::new(&mut rr.inbound_tags)));
468506
}
469507

508+
#[cfg(feature = "rule-process-name")]
509+
if !rr.process_names.is_empty() {
510+
cond_and.add(Box::new(ProcessNameMatcher::new(rr.process_names.clone())));
511+
}
512+
470513
if cond_and.is_empty() {
471514
warn!("empty rule at target {}", rr.target_tag);
472515
continue;

leaf/src/config/conf/config.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ pub fn from_lines(lines: Vec<io::Result<String>>) -> Result<Config> {
724724

725725
match rule.type_field.as_str() {
726726
"IP-CIDR" | "DOMAIN" | "DOMAIN-SUFFIX" | "DOMAIN-KEYWORD" | "GEOIP" | "EXTERNAL"
727-
| "PORT-RANGE" | "NETWORK" | "INBOUND-TAG" => {
727+
| "PORT-RANGE" | "NETWORK" | "INBOUND-TAG" | "PROCESS-NAME" => {
728728
rule.filter = Some(params[1].to_string());
729729
}
730730
_ => {}
@@ -1493,6 +1493,10 @@ pub fn to_internal(conf: &mut Config) -> Result<internal::Config> {
14931493
"INBOUND-TAG" => {
14941494
rule.inbound_tags.push(ext_filter);
14951495
}
1496+
#[cfg(feature = "rule-process-name")]
1497+
"PROCESS-NAME" => {
1498+
rule.process_names.push(ext_filter);
1499+
}
14961500
_ => {}
14971501
}
14981502
rules.push(rule);

leaf/src/config/internal/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ message Router {
264264
repeated string port_ranges = 5;
265265
repeated string networks = 6;
266266
repeated string inbound_tags = 7;
267+
repeated string process_names = 8;
267268
}
268269

269270
repeated Rule rules = 1;

leaf/src/config/internal/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4053,6 +4053,8 @@ pub mod router {
40534053
pub networks: ::std::vec::Vec<::std::string::String>,
40544054
// @@protoc_insertion_point(field:Router.Rule.inbound_tags)
40554055
pub inbound_tags: ::std::vec::Vec<::std::string::String>,
4056+
// @@protoc_insertion_point(field:Router.Rule.process_names)
4057+
pub process_names: ::std::vec::Vec<::std::string::String>,
40564058
// special fields
40574059
// @@protoc_insertion_point(special_field:Router.Rule.special_fields)
40584060
pub special_fields: ::protobuf::SpecialFields,
@@ -4101,6 +4103,9 @@ pub mod router {
41014103
58 => {
41024104
self.inbound_tags.push(is.read_string()?);
41034105
},
4106+
66 => {
4107+
self.process_names.push(is.read_string()?);
4108+
},
41044109
tag => {
41054110
::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?;
41064111
},
@@ -4136,6 +4141,9 @@ pub mod router {
41364141
for value in &self.inbound_tags {
41374142
my_size += ::protobuf::rt::string_size(7, &value);
41384143
};
4144+
for value in &self.process_names {
4145+
my_size += ::protobuf::rt::string_size(8, &value);
4146+
};
41394147
my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields());
41404148
self.special_fields.cached_size().set(my_size as u32);
41414149
my_size
@@ -4163,6 +4171,9 @@ pub mod router {
41634171
for v in &self.inbound_tags {
41644172
os.write_string(7, &v)?;
41654173
};
4174+
for v in &self.process_names {
4175+
os.write_string(8, &v)?;
4176+
};
41664177
os.write_unknown_fields(self.special_fields.unknown_fields())?;
41674178
::std::result::Result::Ok(())
41684179
}
@@ -4187,6 +4198,7 @@ pub mod router {
41874198
self.port_ranges.clear();
41884199
self.networks.clear();
41894200
self.inbound_tags.clear();
4201+
self.process_names.clear();
41904202
self.special_fields.clear();
41914203
}
41924204

@@ -4199,6 +4211,7 @@ pub mod router {
41994211
port_ranges: ::std::vec::Vec::new(),
42004212
networks: ::std::vec::Vec::new(),
42014213
inbound_tags: ::std::vec::Vec::new(),
4214+
process_names: ::std::vec::Vec::new(),
42024215
special_fields: ::protobuf::SpecialFields::new(),
42034216
};
42044217
&instance

leaf/src/config/json/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ pub struct Rule {
275275
pub network: Option<Vec<String>>,
276276
#[serde(rename = "inboundTag")]
277277
pub inbound_tag: Option<Vec<String>>,
278+
#[serde(rename = "processName")]
279+
pub process_name: Option<Vec<String>>,
278280
pub target: String,
279281
}
280282

@@ -1076,6 +1078,12 @@ pub fn to_internal(json: &mut Config) -> Result<internal::Config> {
10761078
rule.inbound_tags.push(it);
10771079
}
10781080
}
1081+
#[cfg(feature = "rule-process-name")]
1082+
if let Some(ext_process_names) = ext_rule.process_name.as_mut() {
1083+
for process_name in ext_process_names.drain(0..) {
1084+
rule.process_names.push(process_name);
1085+
}
1086+
}
10791087
rules.push(rule);
10801088
}
10811089
}

leaf/src/proxy/nf/inbound/datagram.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,18 @@ impl InboundDatagramRecvHalf for DatagramRecvHalf {
7878
assert!(buf.len() >= payload_size);
7979
let real_payload = &recv_buf[header_size..header_size + payload_size];
8080

81-
let local_addr = if let Some(info) = super::UDP_LOCAL_INFO.lock().unwrap().get(&id) {
82-
info.local_address.clone()
81+
let (local_addr, process_name) = if let Some(info) = super::UDP_LOCAL_INFO.lock().unwrap().get(&id) {
82+
(info.local_address.clone(), info.process_name.clone())
8383
} else {
8484
return Err(ProxyError::DatagramWarn(anyhow!(format!(
8585
"local socket not found id={}",
8686
id
8787
))));
8888
};
8989

90-
// Override with real source address.
90+
// Override with real source address and process name.
9191
src_addr.address = local_addr;
92+
src_addr.process_name = process_name;
9293

9394
if dst_addr.port() == 53 {
9495
match self.1.generate_fake_response(real_payload).await {

0 commit comments

Comments
 (0)