diff --git a/.github/workflows/stable-test.yml b/.github/workflows/stable-test.yml index fa493a6..6b2b7b4 100644 --- a/.github/workflows/stable-test.yml +++ b/.github/workflows/stable-test.yml @@ -16,19 +16,19 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly-2021-06-01 components: rustfmt, clippy - + target: wasm32-unknown-unknown - uses: actions/setup-python@v2 with: python-version: '3.7' architecture: 'x64' - name: Lint - run: cargo clippy --all-targets -- -D warnings + run: make lint - name: Build - run: cargo build --all --verbose - - name: Test - run: cargo test --all + run: make build + - name: Unit Test + run: make test - name: Integration test run: | cd ./tests/ diff --git a/Cargo.lock b/Cargo.lock index de8e9bf..6463340 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,6 +35,18 @@ version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + [[package]] name = "async-trait" version = "0.1.51" @@ -90,6 +102,27 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "blake3" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b64485778c4f16a6a5a9d335e80d449ac6c70cdd6a06d2af18a6f6f775a125b3" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if 0.1.10", + "constant_time_eq", + "crypto-mac", + "digest 0.9.0", +] + +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.0.1" @@ -102,6 +135,12 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -135,6 +174,141 @@ dependencies = [ "vec_map", ] +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags 1.2.1", +] + +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "cranelift-bforest" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a9c21f8042b9857bda93f6c1910b9f9f24100187a3d3d52f214a34e3dc5818" +dependencies = [ + "cranelift-entity", +] + +[[package]] +name = "cranelift-codegen" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7853f77a6e4a33c67a69c40f5e1bb982bd2dc5c4a22e17e67b65bbccf9b33b2e" +dependencies = [ + "byteorder", + "cranelift-bforest", + "cranelift-codegen-meta", + "cranelift-codegen-shared", + "cranelift-entity", + "gimli", + "log 0.4.14", + "smallvec", + "target-lexicon", + "thiserror", +] + +[[package]] +name = "cranelift-codegen-meta" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "084cd6d5fb0d1da28acd72c199471bfb09acc703ec8f3bf07b1699584272a3b9" +dependencies = [ + "cranelift-codegen-shared", + "cranelift-entity", +] + +[[package]] +name = "cranelift-codegen-shared" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "701b599783305a58c25027a4d73f2d6b599b2d8ef3f26677275f480b4d51e05d" + +[[package]] +name = "cranelift-entity" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88e792b28e1ebbc0187b72ba5ba880dad083abe9231a99d19604d10c9e73f38" + +[[package]] +name = "cranelift-native" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32daf082da21c0c05d93394ff4842c2ab7c4991b1f3186a1d952f8ac660edd0b" +dependencies = [ + "cranelift-codegen", + "raw-cpuid", + "target-lexicon", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if 1.0.0", + "lazy_static", +] + +[[package]] +name = "crypto-mac" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" +dependencies = [ + "generic-array 0.14.4", + "subtle", +] + [[package]] name = "default-net" version = "0.2.0" @@ -145,12 +319,70 @@ dependencies = [ "pnet 0.27.2", ] +[[package]] +name = "derive_more" +version = "0.99.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40eebddd2156ce1bb37b20bbe5151340a31828b1f2d22ba4141f3531710e38df" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version 0.3.3", + "syn", +] + +[[package]] +name = "digest" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +dependencies = [ + "generic-array 0.12.4", +] + +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array 0.14.4", +] + [[package]] name = "dtoa" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + +[[package]] +name = "errno" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa68f2fb9cae9d37c9b2b3584aba698a2e97f72d7aef7b9f7aa71d8b54ce46fe" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "fnv" version = "1.0.7" @@ -261,17 +493,46 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +dependencies = [ + "typenum", +] + +[[package]] +name = "generic-array" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] +[[package]] +name = "gimli" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dd6190aad0f05ddbbf3245c54ed14ca4aa6dd32f22312b70d8f168c3e3e633" +dependencies = [ + "byteorder", + "indexmap", +] + [[package]] name = "glob" version = "0.2.11" @@ -327,6 +588,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.4" @@ -432,6 +699,7 @@ checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" dependencies = [ "autocfg", "hashbrown", + "serde", ] [[package]] @@ -440,7 +708,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -477,7 +745,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5451d970ceaf1d94b287f060eda6c553b0bd93412986765e3274c28a89b50830" dependencies = [ "lazy_static", - "nix", + "nix 0.20.1", "regex", ] @@ -526,6 +794,15 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + [[package]] name = "lock_api" version = "0.4.4" @@ -550,7 +827,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -568,12 +845,28 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "memoffset" version = "0.6.4" @@ -605,6 +898,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nix" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2e0b4f3320ed72aaedb9a5ac838690a8047c7b275da22711fddff4f8a14229" +dependencies = [ + "bitflags 1.2.1", + "cc", + "cfg-if 0.1.10", + "libc", + "void", +] + [[package]] name = "nix" version = "0.20.1" @@ -613,7 +919,7 @@ checksum = "df8e5e343312e7fbeb2a52139114e9e702991ef9c2aea6817ff2440b35647d56" dependencies = [ "bitflags 1.2.1", "cc", - "cfg-if", + "cfg-if 1.0.0", "libc", "memoffset", ] @@ -662,6 +968,26 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +[[package]] +name = "page_size" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api 0.3.4", + "parking_lot_core 0.7.2", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -669,8 +995,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" dependencies = [ "instant", - "lock_api", - "parking_lot_core", + "lock_api 0.4.4", + "parking_lot_core 0.8.3", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall 0.1.57", + "smallvec", + "winapi 0.3.9", ] [[package]] @@ -679,10 +1019,10 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.10", "smallvec", "winapi 0.3.9", ] @@ -720,6 +1060,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pest" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" +dependencies = [ + "ucd-trie", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -752,6 +1101,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "plugin-example" +version = "0.1.0" +dependencies = [ + "anyhow", + "http", + "log 0.4.14", + "rs-tproxy-plugin", + "serde_json", +] + [[package]] name = "pnet" version = "0.26.0" @@ -1117,6 +1477,48 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "7.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beb71f708fe39b2c5e98076204c3cc094ee5a4c12c4cdb119a2b72dc34164f41" +dependencies = [ + "bitflags 1.2.1", + "cc", + "rustc_version 0.2.3", +] + +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.2.10" @@ -1163,7 +1565,7 @@ dependencies = [ [[package]] name = "rs-tproxy" -version = "0.1.1" +version = "0.4.1" dependencies = [ "anyhow", "async-trait", @@ -1179,8 +1581,10 @@ dependencies = [ "iptables", "json-patch", "libc", + "log 0.4.14", "paw", "pnet 0.28.0", + "rs-tproxy-controller", "rs-tproxy-proxy", "serde", "serde_derive", @@ -1201,13 +1605,13 @@ dependencies = [ [[package]] name = "rs-tproxy-controller" -version = "0.1.0" +version = "0.4.1" dependencies = [ "anyhow", "async-trait", "bincode", "bytes", - "cfg-if", + "cfg-if 1.0.0", "clap", "default-net", "futures", @@ -1218,6 +1622,7 @@ dependencies = [ "iptables", "json-patch", "libc", + "log 0.4.14", "paw", "pnet 0.28.0", "rs-tproxy-proxy", @@ -1238,17 +1643,29 @@ dependencies = [ "wildmatch", ] +[[package]] +name = "rs-tproxy-plugin" +version = "0.4.1" +dependencies = [ + "anyhow", + "http", + "log 0.4.14", + "serde", + "serde_json", +] + [[package]] name = "rs-tproxy-proxy" -version = "0.1.0" +version = "0.4.1" dependencies = [ "anyhow", "async-trait", "base64", "bincode", "bytes", - "cfg-if", + "cfg-if 1.0.0", "clap", + "derive_more", "futures", "http", "humantime-serde", @@ -1256,7 +1673,10 @@ dependencies = [ "iptables", "json-patch", "libc", + "log 0.4.14", + "md5", "paw", + "rs-tproxy-plugin", "serde", "serde_derive", "serde_json", @@ -1270,6 +1690,7 @@ dependencies = [ "tracing-futures", "tracing-subscriber", "uuid", + "wasmer-runtime", "wildmatch", ] @@ -1279,6 +1700,24 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf128d1287d2ea9d80910b5f1120d0b8eede3fbf1abe91c40d39ea7d51e6fda" +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver 0.9.0", +] + +[[package]] +name = "rustc_version" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" +dependencies = [ + "semver 0.11.0", +] + [[package]] name = "ryu" version = "1.0.5" @@ -1291,6 +1730,39 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser 0.7.0", +] + +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser 0.10.2", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + +[[package]] +name = "semver-parser" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + [[package]] name = "serde" version = "1.0.129" @@ -1300,6 +1772,25 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-bench" +version = "0.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d733da87e79faaac25616e33d26299a41143fd4cd42746cbb0e91d8feea243fd" +dependencies = [ + "byteorder", + "serde", +] + +[[package]] +name = "serde_bytes" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16ae07dd2f88a366f15bd0632ba725227018c69a1c8550a927324f8eb8368bb9" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.129" @@ -1382,7 +1873,7 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "winapi 0.3.9", ] @@ -1428,6 +1919,12 @@ dependencies = [ "syn", ] +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.75" @@ -1493,16 +1990,22 @@ name = "system_gateway" version = "0.1.0" source = "git+https://github.com/aruntomar/system_gateway#ec345e52f1525b5b3e453d269414642b65299618" +[[package]] +name = "target-lexicon" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0e7238dcc7b40a7be719a25365910f6807bd864f4cce6b2e6b873658e2b19d" + [[package]] name = "tempfile" version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand", - "redox_syscall", + "redox_syscall 0.2.10", "remove_dir_all", "winapi 0.3.9", ] @@ -1523,7 +2026,7 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b114ece25254e97bf48dd4bfc2a12bad0647adacfe4cae1247a9ca6ad302cec" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "proc-macro2", "quote", "syn", @@ -1538,7 +2041,7 @@ dependencies = [ "async-trait", "bincode", "bytes", - "cfg-if", + "cfg-if 1.0.0", "clap", "default-net", "futures", @@ -1579,6 +2082,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.3" @@ -1601,7 +2124,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.11.1", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -1645,7 +2168,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -1739,6 +2262,18 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "typenum" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" + +[[package]] +name = "ucd-trie" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" + [[package]] name = "unicode-segmentation" version = "1.8.0" @@ -1785,6 +2320,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "want" version = "0.3.0" @@ -1801,6 +2342,119 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasmer-clif-backend" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a2fae69b1c7429316cad6743f3d2ca83cf8957924c477c5a4eff036ec0097a9" +dependencies = [ + "byteorder", + "cranelift-codegen", + "cranelift-entity", + "cranelift-native", + "libc", + "nix 0.15.0", + "rayon", + "serde", + "serde-bench", + "serde_bytes", + "serde_derive", + "target-lexicon", + "wasmer-clif-fork-frontend", + "wasmer-clif-fork-wasm", + "wasmer-runtime-core", + "wasmer-win-exception-handler", + "wasmparser", + "winapi 0.3.9", +] + +[[package]] +name = "wasmer-clif-fork-frontend" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c23f2824f354a00a77e4b040eef6e1d4c595a8a3e9013bad65199cc8dade9a5a" +dependencies = [ + "cranelift-codegen", + "log 0.4.14", + "smallvec", + "target-lexicon", +] + +[[package]] +name = "wasmer-clif-fork-wasm" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35e21d3aebc51cc6ebc0e830cf8458a9891c3482fb3c65ad18d408102929ae5" +dependencies = [ + "cranelift-codegen", + "cranelift-entity", + "log 0.4.14", + "thiserror", + "wasmer-clif-fork-frontend", + "wasmparser", +] + +[[package]] +name = "wasmer-runtime" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c92a9ae96b193c35c47fc829265198322cf980edc353a9de32bc87a1545d44f3" +dependencies = [ + "lazy_static", + "memmap", + "serde", + "serde_derive", + "wasmer-clif-backend", + "wasmer-runtime-core", +] + +[[package]] +name = "wasmer-runtime-core" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "740161245998752cf1a567e860fd6355df0336fedca6be1940ec7aaa59643220" +dependencies = [ + "bincode", + "blake3", + "cc", + "digest 0.8.1", + "errno", + "hex", + "indexmap", + "lazy_static", + "libc", + "nix 0.15.0", + "page_size", + "parking_lot 0.10.2", + "rustc_version 0.2.3", + "serde", + "serde-bench", + "serde_bytes", + "serde_derive", + "smallvec", + "target-lexicon", + "wasmparser", + "winapi 0.3.9", +] + +[[package]] +name = "wasmer-win-exception-handler" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd39f3b2bd7964b28ea6f944a7eaa445cfbc91c4f2695d188103f2689bb37d9" +dependencies = [ + "cc", + "libc", + "wasmer-runtime-core", + "winapi 0.3.9", +] + +[[package]] +name = "wasmparser" +version = "0.51.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeb1956b19469d1c5e63e459d29e7b5aa0f558d9f16fcef09736f8a265e6c10a" + [[package]] name = "wildmatch" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index b06bd18..abac93c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,36 +1,52 @@ [package] -authors = ["Andrewmatilde ", "Hexilee "] +authors = [ + "Andrewmatilde ", + "Hexilee ", +] edition = "2018" name = "rs-tproxy" -version = "0.1.1" +version = "0.4.1" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[[bin]] -name = "rs-tproxy" -path = "rs-tproxy-controller/src/main.rs" - [workspace] -members = ["rs-tproxy-controller", "rs-tproxy-proxy", "tests"] +members = [ + "rs-tproxy-controller", + "rs-tproxy-proxy", + "rs-tproxy-plugin", + "examples/plugin-example", + "tests", +] [dependencies] +rs-tproxy-proxy = { path = "./rs-tproxy-proxy" } +rs-tproxy-controller = { path = "./rs-tproxy-controller" } + anyhow = "1.0" clap = "2.33.3" futures = "0.3.10" http = "0.2.3" humantime-serde = "1.0" -hyper = {git = "https://github.com/Andrewmatilde/hyper.git", features = ["runtime", "client", "server", "http1", "http2", "stream", "error_return"]} +hyper = { git = "https://github.com/Andrewmatilde/hyper.git", features = [ + "runtime", + "client", + "server", + "http1", + "http2", + "stream", + "error_return", +] } iptables = "0.4" -libc = {version = "0.2.81", features = ["std"]} +libc = { version = "0.2.81", features = ["std"] } paw = "1.0" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0.123" serde_json = "1.0" serde_urlencoded = "0.7" serde_yaml = "0.8" socket2 = "0.3" -structopt = {version = "0.3", features = ["paw"]} -tokio = {version = "1.4", features = ["full"]} +structopt = { version = "0.3", features = ["paw"] } +tokio = { version = "1.4", features = ["full"] } wildmatch = "2.1" tracing = "0.1" tracing-futures = "0.2" @@ -38,13 +54,13 @@ tracing-subscriber = "0.2" json-patch = "0.2.6" async-trait = "0.1.50" bytes = "1.0.1" -rs-tproxy-proxy = {path = "./rs-tproxy-proxy"} uuid = { version = "0.8", features = ["serde", "v4"] } pnet = "0.28.0" bincode = "1.3.3" default-net = "0.2.0" -system_gateway = {git="https://github.com/aruntomar/system_gateway"} +system_gateway = { git = "https://github.com/aruntomar/system_gateway" } base64 = "0.13.0" +log = "0.4" [dev-dependencies] test-case = "1.2" diff --git a/Dockerfile b/Dockerfile index 53b4ca1..5535d3c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,7 @@ WORKDIR /tproxy-build RUN --mount=type=cache,target=/tproxy-build/target \ --mount=type=cache,target=/root/.cargo/registry \ - cargo build --release --all --target $TARGET + cargo build --release --bin rs-tproxy --target $TARGET RUN --mount=type=cache,target=/tproxy-build/target \ cp /tproxy-build/target/$TARGET/release/rs-tproxy /tproxy diff --git a/Makefile b/Makefile index c62c0f1..2330a8b 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,14 @@ build: - cargo build --all + cargo build --workspace fmt: - cargo +nightly fmt + cargo fmt run: - RUST_LOG=trace ./target/debug/tproxy $(config) -test: - cargo test --all + ./target/debug/tproxy -vvv $(config) +test: build + cargo test -p rs-tproxy -p rs-tproxy-proxy -p rs-tproxy-plugin -p rs-tproxy-controller lint: - cargo clippy --all-targets -- -D warnings \ No newline at end of file + cargo clippy --all-targets -- -D warnings +clean: + cargo clean +image: + DOCKER_BUILDKIT=1 docker build --build-arg HTTP_PROXY=${HTTP_PROXY} --build-arg HTTPS_PROXY=${HTTPS_PROXY} . -t chaos-mesh/tproxy diff --git a/config-examples/abort_by_path.yaml b/config-examples/abort_by_path.yaml index ab4349b..d13b9b9 100644 --- a/config-examples/abort_by_path.yaml +++ b/config-examples/abort_by_path.yaml @@ -1,4 +1,4 @@ -proxy_ports: [80, 8080, 30086] # proxy will do nothing if empty +proxy_ports: [80, 8080, 30086] # proxy for all ports bigger than 1024 if empty rules: - target: Request selector: diff --git a/config-examples/json_example.json b/config-examples/json_example.json index b7152de..5f6290b 100644 --- a/config-examples/json_example.json +++ b/config-examples/json_example.json @@ -1,8 +1,9 @@ { - "listen_port": 58080, - "proxy_ports": [80, 8080, 30086], - "proxy_mark": 1, - "ignore_mark": 255, + "proxy_ports": [ + 80, + 8080, + 30086 + ], "rules": [ { "target": "Request", @@ -22,8 +23,14 @@ }, "patch": { "queries": [ - ["foo", "bar"], - ["foo", "other"] + [ + "foo", + "bar" + ], + [ + "foo", + "other" + ] ], "body": { "contents": { @@ -46,4 +53,4 @@ } } ] -} +} \ No newline at end of file diff --git a/config-examples/one-line.json b/config-examples/one-line.json deleted file mode 100644 index 4fff930..0000000 --- a/config-examples/one-line.json +++ /dev/null @@ -1 +0,0 @@ -{"rules": [{"target": "Request", "selector": {"path": "/src", "method": "GET"},"actions": {"abort": false}}]} diff --git a/config-examples/yaml_example.yaml b/config-examples/yaml_example.yaml index 06e0a0c..b49c85f 100644 --- a/config-examples/yaml_example.yaml +++ b/config-examples/yaml_example.yaml @@ -1,7 +1,5 @@ -listen_port: 58080 # optional -proxy_ports: [80, 443, 8080] # proxy will do nothing if empty -proxy_mark: 1 # optional -ignore_mark: 255 # optional +proxy_ports: [80, 443, 8080] # proxy for all ports bigger than 1024 if empty +plugin_path: / # the directory path to load plugin files (xxx.wasm), `/etc/rs-tproxy/plugins` by default rules: - target: Request selector: @@ -12,7 +10,6 @@ rules: delay: 10s replace: body: - update_content_length: false # true by default contents: type: TEXT value: '{"name": "Chaos Mesh", "message": "Hello!"}' @@ -24,10 +21,11 @@ rules: - [foo, bar] - [foo, other] body: - update_content_length: false # true by default contents: type: JSON value: '{"message": "Hi!"}' + plugins: + - xxx # plugin name, load from file `/xxx.wasm` - target: Response selector: diff --git a/examples/plugin-example/.cargo/config b/examples/plugin-example/.cargo/config new file mode 100644 index 0000000..f4e8c00 --- /dev/null +++ b/examples/plugin-example/.cargo/config @@ -0,0 +1,2 @@ +[build] +target = "wasm32-unknown-unknown" diff --git a/examples/plugin-example/Cargo.toml b/examples/plugin-example/Cargo.toml new file mode 100644 index 0000000..7f07844 --- /dev/null +++ b/examples/plugin-example/Cargo.toml @@ -0,0 +1,20 @@ +cargo-features = ["per-package-target"] + +[package] +name = "plugin-example" +version = "0.1.0" +edition = "2018" +forced-target = "wasm32-unknown-unknown" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +crate-type = ["cdylib"] + +[dependencies] +rs-tproxy-plugin = { path = "../../rs-tproxy-plugin" } + +serde_json = "1.0" +log = "0.4" +anyhow = "1.0" +http = "0.2" \ No newline at end of file diff --git a/examples/plugin-example/src/lib.rs b/examples/plugin-example/src/lib.rs new file mode 100644 index 0000000..5e74f21 --- /dev/null +++ b/examples/plugin-example/src/lib.rs @@ -0,0 +1,15 @@ +use log::info; +use rs_tproxy_plugin::register_response_handler; + +register_response_handler!(|resp| { + let content_type = resp + .headers() + .get("content-type") + .ok_or(anyhow::anyhow!("content-type not found"))? + .to_str()?; + info!("get content-type: {}", content_type); + Ok(serde_json::to_vec(&serde_json::json!({ + "type": content_type, + "content": *resp.body(), + }))?) +}); diff --git a/rs-tproxy-controller/Cargo.toml b/rs-tproxy-controller/Cargo.toml index 2f30d91..30769ea 100644 --- a/rs-tproxy-controller/Cargo.toml +++ b/rs-tproxy-controller/Cargo.toml @@ -1,34 +1,33 @@ [package] name = "rs-tproxy-controller" -version = "0.1.0" +version = "0.4.1" edition = "2018" -[[bin]] -name = "rs-tproxy-controller-bin" -path = "src/main.rs" - -[lib] -name = "rs_tproxy_controller_lib" -path = "src/lib.rs" - [dependencies] anyhow = "1.0" clap = "2.33.3" futures = "0.3.10" http = "0.2.3" humantime-serde = "1.0" -hyper = {version = "0.14.4", features = ["runtime", "client", "server", "http1", "http2", "stream"]} +hyper = { version = "0.14.4", features = [ + "runtime", + "client", + "server", + "http1", + "http2", + "stream", +] } iptables = "0.4" -libc = {version = "0.2.81", features = ["std"]} +libc = { version = "0.2.81", features = ["std"] } paw = "1.0" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0.123" serde_json = "1.0" serde_urlencoded = "0.7" serde_yaml = "0.8" socket2 = "0.3" -structopt = {version = "0.3", features = ["paw"]} -tokio = {version = "1.4", features = ["full"]} +structopt = { version = "0.3", features = ["paw"] } +tokio = { version = "1.4", features = ["full"] } wildmatch = "2.1" tracing = "0.1" tracing-futures = "0.2" @@ -40,8 +39,12 @@ cfg-if = "1.0.0" bincode = "1.3.3" tempfile = "3.2.0" uuid = { version = "0.8", features = ["serde", "v4"] } -futures-util = { version = "0.3.7", default-features = false, features = ["alloc", "sink"] } -rs-tproxy-proxy = {path = "../rs-tproxy-proxy"} +futures-util = { version = "0.3.7", default-features = false, features = [ + "alloc", + "sink", +] } +rs-tproxy-proxy = { path = "../rs-tproxy-proxy" } pnet = "0.28.0" default-net = "0.2.0" -system_gateway = {git="https://github.com/aruntomar/system_gateway"} \ No newline at end of file +system_gateway = { git = "https://github.com/aruntomar/system_gateway" } +log = "0.4" diff --git a/rs-tproxy-controller/src/cmd/mod.rs b/rs-tproxy-controller/src/cmd.rs similarity index 100% rename from rs-tproxy-controller/src/cmd/mod.rs rename to rs-tproxy-controller/src/cmd.rs diff --git a/rs-tproxy-controller/src/cmd/command_line.rs b/rs-tproxy-controller/src/cmd/command_line.rs index d47f194..5d9f15f 100644 --- a/rs-tproxy-controller/src/cmd/command_line.rs +++ b/rs-tproxy-controller/src/cmd/command_line.rs @@ -2,9 +2,9 @@ use std::convert::TryInto; use std::path::PathBuf; use anyhow::{anyhow, Result}; +use log::Level; use structopt::StructOpt; use tokio::fs::read_to_string; -use tracing_subscriber::filter::LevelFilter; use crate::proxy::config::Config; use crate::raw_config::RawConfig; @@ -36,12 +36,12 @@ pub struct Opt { } impl Opt { - pub fn get_level_filter(&self) -> LevelFilter { + pub fn get_level(&self) -> Level { match self.verbose { - 0 => LevelFilter::ERROR, - 1 => LevelFilter::INFO, - 2 => LevelFilter::DEBUG, - _ => LevelFilter::TRACE, + 0 => Level::Error, + 1 => Level::Info, + 2 => Level::Debug, + _ => Level::Trace, } } diff --git a/rs-tproxy-controller/src/cmd/daemon/mod.rs b/rs-tproxy-controller/src/cmd/daemon.rs similarity index 100% rename from rs-tproxy-controller/src/cmd/daemon/mod.rs rename to rs-tproxy-controller/src/cmd/daemon.rs diff --git a/rs-tproxy-controller/src/cmd/interactive/mod.rs b/rs-tproxy-controller/src/cmd/interactive.rs similarity index 100% rename from rs-tproxy-controller/src/cmd/interactive/mod.rs rename to rs-tproxy-controller/src/cmd/interactive.rs diff --git a/rs-tproxy-controller/src/cmd/interactive/handler.rs b/rs-tproxy-controller/src/cmd/interactive/handler.rs index 419b476..26317e3 100644 --- a/rs-tproxy-controller/src/cmd/interactive/handler.rs +++ b/rs-tproxy-controller/src/cmd/interactive/handler.rs @@ -96,7 +96,7 @@ impl ConfigService { raw_config.try_into() } - #[instrument] + #[instrument(skip(proxy))] async fn handle(proxy: &mut Proxy, request: Request) -> anyhow::Result> { if request.method() != Method::PUT { return Ok(Response::builder() diff --git a/rs-tproxy-controller/src/lib.rs b/rs-tproxy-controller/src/lib.rs index ab1c578..46e0c17 100644 --- a/rs-tproxy-controller/src/lib.rs +++ b/rs-tproxy-controller/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(exit_status_error)] + pub mod cmd; pub mod proxy; pub mod raw_config; diff --git a/rs-tproxy-controller/src/proxy/mod.rs b/rs-tproxy-controller/src/proxy.rs similarity index 68% rename from rs-tproxy-controller/src/proxy/mod.rs rename to rs-tproxy-controller/src/proxy.rs index f8c8306..639d0b7 100644 --- a/rs-tproxy-controller/src/proxy/mod.rs +++ b/rs-tproxy-controller/src/proxy.rs @@ -1,4 +1,4 @@ pub mod config; +pub mod controller; pub mod exec; pub mod net; -pub mod uds_server; diff --git a/rs-tproxy-controller/src/proxy/config.rs b/rs-tproxy-controller/src/proxy/config.rs index 961120a..e7c9fd5 100644 --- a/rs-tproxy-controller/src/proxy/config.rs +++ b/rs-tproxy-controller/src/proxy/config.rs @@ -32,6 +32,7 @@ impl TryFrom for Config { Some(rules) => rules, None => vec![], }, + plugin_path: raw.plugin_path, }, }) } @@ -76,6 +77,7 @@ mod tests { safe_mode: None, interface: None, rules: None, + plugin_path: None, listen_port: None, proxy_mark: None, @@ -92,7 +94,8 @@ mod tests { listen_port: get_free_port(None).unwrap(), safe_mode: false, interface: None, - rules: vec![] + rules: vec![], + plugin_path: None, } } ); @@ -102,6 +105,7 @@ mod tests { safe_mode: Some(true), interface: Some("ens33".parse().unwrap()), rules: None, + plugin_path: None, listen_port: None, proxy_mark: None, @@ -118,7 +122,8 @@ mod tests { listen_port: 1027u16, safe_mode: true, interface: Some("ens33".parse().unwrap()), - rules: vec![] + rules: vec![], + plugin_path: None, } } ); diff --git a/rs-tproxy-controller/src/proxy/controller.rs b/rs-tproxy-controller/src/proxy/controller.rs new file mode 100644 index 0000000..263632d --- /dev/null +++ b/rs-tproxy-controller/src/proxy/controller.rs @@ -0,0 +1,91 @@ +use std::future::Future; +use std::io; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Result; +use http::{Method, Request}; +use hyper::client::connect::{Connected, Connection}; +use hyper::client::Client; +use hyper::service::Service; +use hyper::{Body, Uri}; +use rs_tproxy_proxy::raw_config::RawConfig as ProxyRawConfig; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::UnixStream; +use tracing::debug; + +#[derive(Debug, Clone)] +struct UnixConnect(PathBuf); + +#[derive(Debug)] +struct UnixConnection(UnixStream); + +impl Service for UnixConnect { + type Response = UnixConnection; + type Error = anyhow::Error; + type Future = Pin>>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, _: Uri) -> Self::Future { + let path = self.0.clone(); + Box::pin(async move { Ok(UnixConnection(UnixStream::connect(path).await?)) }) + } +} + +pub async fn send_config(path: impl Into, config: &ProxyRawConfig) -> Result<()> { + let p = path.into(); + let client = Client::builder().build(UnixConnect(p)); + let request = Request::builder() + .uri("http://ignore/") + .method(Method::PUT) + .body(Body::from(serde_json::to_vec(config)?))?; + let resp = client.request(request).await?; + debug!("config({:?}) is sent", config); + if !resp.status().is_success() { + return Err(anyhow::anyhow!( + "fail to send config: status({})", + resp.status() + )); + } + Ok(()) +} + +impl Connection for UnixConnection { + fn connected(&self) -> Connected { + Connected::new() + } +} + +impl AsyncRead for UnixConnection { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for UnixConnection { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} diff --git a/rs-tproxy-controller/src/proxy/exec.rs b/rs-tproxy-controller/src/proxy/exec.rs index 01bbe51..2a0f60a 100644 --- a/rs-tproxy-controller/src/proxy/exec.rs +++ b/rs-tproxy-controller/src/proxy/exec.rs @@ -1,18 +1,16 @@ use std::env; use std::path::PathBuf; -use std::process::Stdio; +use std::process::{ExitStatus, Stdio}; -use anyhow::Error; use rs_tproxy_proxy::raw_config::RawConfig as ProxyRawConfig; +use rs_tproxy_proxy::task::Task; use tokio::process::Command; -use tokio::select; -use tokio::sync::oneshot::{channel, Receiver, Sender}; -use tokio::task::JoinHandle; +use tracing::instrument; use uuid::Uuid; -use crate::proxy::net::bridge::NetEnv; -use crate::proxy::net::set_net::set_net; -use crate::proxy::uds_server::UdsDataServer; +use super::controller::send_config; +use super::net::bridge::NetEnv; +use super::net::set_net::set_net; #[derive(Debug, Clone)] pub struct ProxyOpt { @@ -30,9 +28,8 @@ impl ProxyOpt { pub struct Proxy { pub opt: ProxyOpt, pub net_env: NetEnv, - pub sender: Option>, - pub rx: Option>, - pub task: Option>>, + pub proxy_ports: Option, + pub task: Option>, } impl Proxy { @@ -42,29 +39,17 @@ impl Proxy { .with_extension("sock"); let opt = ProxyOpt::new(uds_path, verbose); - let (sender, rx) = channel(); Self { opt, net_env: NetEnv::new(), - sender: Some(sender), - rx: Some(rx), + proxy_ports: None, task: None, } } - pub async fn exec(&mut self, config: ProxyRawConfig) -> anyhow::Result<()> { - tracing::info!(target : "transferring proxy raw config ", "{:?}" ,&config); - let uds_server = UdsDataServer::new(config.clone(), self.opt.ipc_path.clone()); - let listener = uds_server.bind()?; - - let server = uds_server; - tokio::spawn(async move { - let _ = server - .listen(listener) - .await - .map_err(|e| tracing::error!("{:?}", e)); - }); - + #[instrument(skip(self, config))] + pub async fn start(&mut self, config: ProxyRawConfig) -> anyhow::Result<()> { + tracing::info!("transferring proxy raw config {:?}", &config); let opt = self.opt.clone(); let exe_path = match std::env::current_exe() { Err(e) => { @@ -76,16 +61,16 @@ impl Proxy { Ok(path) => path, }; - tracing::info!(target: "Network device name", "{}", self.net_env.device.clone()); + tracing::info!("network device name {}", self.net_env.device.clone()); match config.interface { None => {} - Some(interface) => { - self.net_env.set_ip_with_interface_name(&interface)?; + Some(ref interface) => { + self.net_env.set_ip_with_interface_name(interface)?; } } set_net( &self.net_env, - config.proxy_ports, + config.proxy_ports.as_ref(), config.listen_port, config.safe_mode, )?; @@ -98,64 +83,47 @@ impl Proxy { .arg(exe_path) .arg(format!( "-{}", - String::from_utf8(vec![b'v'; self.opt.verbose as usize]).unwrap() + String::from_utf8(vec![b'v'; self.opt.verbose as usize])? )) .arg("--proxy") - .arg(format!("--ipc-path={}", opt.ipc_path.to_str().unwrap())); + .arg(format!("--ipc-path={}", opt.ipc_path.to_string_lossy())); + tracing::info!("starting proxy"); + let mut process = match proxy.stdin(Stdio::piped()).spawn() { + Ok(process) => { + tracing::info!("proxy is running"); + process + } + Err(e) => { + return Err(anyhow::anyhow!("failed to start sub proxy: {}", e)); + } + }; - let rx = self.rx.take().unwrap(); - self.task = Some(tokio::spawn(async move { - tracing::info!(target : "Proxy executor", "Starting proxy."); - let mut process = match proxy.stdin(Stdio::piped()).spawn() { - Ok(process) => { - tracing::info!(target : "Proxy executor", "Proxy is running."); - process - } - Err(e) => { - return Err(anyhow::anyhow!("failed to exec sub proxy : {:?}", e)); - } - }; - select! { - _ = process.wait() => {} - _ = rx => { - tracing::info!(target : "Proxy executor","killing sub process"); - let id = process.id().unwrap() as i32; - unsafe { - libc::kill(id, libc::SIGINT); - } - } - }; - Ok(()) - })); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + send_config(&self.opt.ipc_path, &config).await?; + self.task = Some(Task::start(async move { Ok(process.wait().await?) })); + self.proxy_ports = config.proxy_ports; Ok(()) } pub async fn stop(&mut self) -> anyhow::Result<()> { + let _ = self.net_env.clear_bridge(); if let Some(task) = self.task.take() { - if let Some(sender) = self.sender.take() { - let _ = sender.send(()); - }; - let _ = self.net_env.clear_bridge(); - let _ = task.await?; + if let Some(status) = task.stop().await? { + status.exit_ok()?; + } } Ok(()) } pub async fn reload(&mut self, config: ProxyRawConfig) -> anyhow::Result<()> { - self.stop().await?; if self.task.is_none() { - let mut new = Self::new(self.opt.verbose); - self.opt = new.opt; - self.sender = new.sender.take(); - self.rx = new.rx.take(); + self.start(config).await?; + } else if self.proxy_ports == config.proxy_ports { + send_config(&self.opt.ipc_path, &config).await?; + } else { + self.stop().await?; + self.start(config).await?; } - - return match self.exec(config).await { - Err(e) => { - self.net_env.clear_bridge()?; - Err(e) - } - Ok(_) => Ok(()), - }; + Ok(()) } } diff --git a/rs-tproxy-controller/src/proxy/net/mod.rs b/rs-tproxy-controller/src/proxy/net.rs similarity index 100% rename from rs-tproxy-controller/src/proxy/net/mod.rs rename to rs-tproxy-controller/src/proxy/net.rs diff --git a/rs-tproxy-controller/src/proxy/net/bridge.rs b/rs-tproxy-controller/src/proxy/net/bridge.rs index 1e062b2..c513453 100644 --- a/rs-tproxy-controller/src/proxy/net/bridge.rs +++ b/rs-tproxy-controller/src/proxy/net/bridge.rs @@ -2,6 +2,7 @@ use std::process::Command; use anyhow::{anyhow, Result}; use default_net; +use log::debug; use pnet::datalink::NetworkInterface; use pnet::ipnetwork::{IpNetwork, Ipv4Network}; use uuid::Uuid; @@ -189,6 +190,7 @@ impl NetEnv { bash_c(&remove_store), clear_ebtables(), ]; + debug!("clear bridge, scripts: {:?}", cmdvv); execute_all_with_log_error(cmdvv)?; Ok(()) } diff --git a/rs-tproxy-controller/src/proxy/net/set_net.rs b/rs-tproxy-controller/src/proxy/net/set_net.rs index 4f0c5b4..fc8de71 100644 --- a/rs-tproxy-controller/src/proxy/net/set_net.rs +++ b/rs-tproxy-controller/src/proxy/net/set_net.rs @@ -6,7 +6,7 @@ use crate::proxy::net::iptables::{set_iptables, set_iptables_safe}; #[cfg(target_os = "linux")] pub fn set_net( net_env: &NetEnv, - proxy_ports: Option, + proxy_ports: Option<&String>, listen_port: u16, safe: bool, ) -> anyhow::Result<()> { @@ -16,7 +16,7 @@ pub fn set_net( let device_interface = get_interface(net_env.veth4.clone()).unwrap(); let device_mac = device_interface.mac.unwrap().to_string(); - if let Some(ref proxy_ports) = proxy_ports { + if let Some(proxy_ports) = proxy_ports { execute_all(set_iptables(net_env, Some(proxy_ports), &port, &device_mac))?; } else { execute_all(set_iptables(net_env, None, &port, &device_mac))?; diff --git a/rs-tproxy-controller/src/proxy/uds_server.rs b/rs-tproxy-controller/src/proxy/uds_server.rs deleted file mode 100644 index 82dcd4e..0000000 --- a/rs-tproxy-controller/src/proxy/uds_server.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::path::PathBuf; - -use tokio::io::AsyncWriteExt; -use tokio::net::UnixListener; - -#[derive(Debug, Clone)] -pub struct UdsDataServer { - pub data: T, - pub path: PathBuf, -} - -impl UdsDataServer { - pub fn new(data: T, path: PathBuf) -> Self { - Self { data, path } - } - - pub fn bind(&self) -> anyhow::Result { - tracing::info!(target : "Uds listener try binding", "{:?}", &self.path); - let listener = UnixListener::bind(self.path.clone())?; - Ok(listener) - } - - pub fn clear(&self) -> anyhow::Result<()> { - std::fs::remove_file(&self.path)?; - Ok(()) - } - - pub async fn listen(&self, listener: UnixListener) -> anyhow::Result<()> { - tracing::info!(target : "Uds listener listening on", "{:?}", &self.path); - loop { - match (&listener).accept().await { - Ok((mut stream, addr)) => { - let buf = bincode::serialize(&self.data)?; - tokio::spawn(async move { - return match stream.write_all(buf.as_slice()).await { - Ok(_) => { - tracing::info!(target : "Uds server" ,"Config successfully transferred."); - Ok(()) - } - Err(e) => { - tracing::error!( - "error : write_all raw config to {:?} failed", - addr - ); - Err(anyhow::anyhow!("{}", e)) - } - }; - }); - } - Err(e) => { - tracing::error!("error : accept connection failed"); - return Err(anyhow::anyhow!("{}", e)); - } - } - } - } -} diff --git a/rs-tproxy-controller/src/raw_config.rs b/rs-tproxy-controller/src/raw_config.rs index b2d0a4c..3144487 100644 --- a/rs-tproxy-controller/src/raw_config.rs +++ b/rs-tproxy-controller/src/raw_config.rs @@ -8,6 +8,7 @@ pub struct RawConfig { pub safe_mode: Option, pub interface: Option, pub rules: Option>, + pub plugin_path: Option, // Useless options now. Keep these options for upward compatible. pub listen_port: Option, diff --git a/rs-tproxy-plugin/Cargo.toml b/rs-tproxy-plugin/Cargo.toml new file mode 100644 index 0000000..7b67f85 --- /dev/null +++ b/rs-tproxy-plugin/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rs-tproxy-plugin" +version = "0.4.1" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +anyhow = "1.0" +http = "0.2.3" +log = "0.4" diff --git a/rs-tproxy-plugin/src/handler.rs b/rs-tproxy-plugin/src/handler.rs new file mode 100644 index 0000000..9391ffb --- /dev/null +++ b/rs-tproxy-plugin/src/handler.rs @@ -0,0 +1,108 @@ +use http::{Request, Response}; +use log::error; + +use super::header::{RequestHeader, ResponseHeader}; +use super::logger::setup_logger; +use super::print::eprintln; + +#[macro_export] +macro_rules! register_request_handler { + ($func_name:ident) => { + #[no_mangle] + pub extern "C" fn handle_request(ptr: i64, header_len: i64, body_len: i64) { + $crate::call_request_handler(ptr, header_len, body_len, $func_name) + } + }; + (|$($arg:ident)*| $body:block) => { + #[no_mangle] + pub extern "C" fn handle_request(ptr: i64, header_len: i64, body_len: i64) { + $crate::call_request_handler(ptr, header_len, body_len, |$($arg)*| $body) + } + }; +} + +#[macro_export] +macro_rules! register_response_handler { + ($func_name:ident) => { + #[no_mangle] + pub extern "C" fn handle_response(ptr: i64, header_len: i64, body_len: i64) { + $crate::call_response_handler(ptr, header_len, body_len, $func_name) + } + }; + (|$($arg:ident)*| $body:block) => { + #[no_mangle] + pub extern "C" fn handle_response(ptr: i64, header_len: i64, body_len: i64) { + $crate::call_response_handler(ptr, header_len, body_len, |$($arg)*| $body) + } + }; +} + +pub fn call_response_handler(ptr: i64, header_len: i64, body_len: i64, handler: F) +where + F: Fn(Response<&[u8]>) -> anyhow::Result>, +{ + if let Err(err) = setup_logger() { + eprintln(format!("plugin fail to setup logger: {}", err)); + } + + let scope = || -> anyhow::Result<()> { + let resp = read_response(ptr, header_len, body_len)?; + write_body(handler(resp)?); + Ok(()) + }; + + if let Err(err) = scope() { + error!("fail to call response handler: {}", err) + } +} + +pub fn call_request_handler(ptr: i64, header_len: i64, body_len: i64, handler: F) +where + F: Fn(Request<&[u8]>) -> anyhow::Result>, +{ + if let Err(err) = setup_logger() { + eprintln(format!("plugin fail to setup logger: {}", err)); + } + + let scope = || -> anyhow::Result<()> { + let req = read_request(ptr, header_len, body_len)?; + write_body(handler(req)?); + Ok(()) + }; + + if let Err(err) = scope() { + error!("fail to call response handler: {}", err) + } +} + +pub fn read_request<'a>( + ptr: i64, + header_len: i64, + body_len: i64, +) -> anyhow::Result> { + let header = unsafe { std::slice::from_raw_parts(ptr as _, header_len as _) }; + let body: &[u8] = unsafe { std::slice::from_raw_parts((ptr + header_len) as _, body_len as _) }; + let req_header: RequestHeader = serde_json::from_slice(header)?; + req_header.build(body) +} + +pub fn read_response<'a>( + ptr: i64, + header_len: i64, + body_len: i64, +) -> anyhow::Result> { + let header = unsafe { std::slice::from_raw_parts(ptr as _, header_len as _) }; + let body: &[u8] = unsafe { std::slice::from_raw_parts((ptr + header_len) as _, body_len as _) }; + let resp_header: ResponseHeader = serde_json::from_slice(header)?; + resp_header.build(body) +} + +mod buildin { + extern "C" { + pub fn write_body(ptr: *const u8, len: usize); + } +} + +pub fn write_body(body: Vec) { + unsafe { buildin::write_body(body.as_ptr(), body.len()) }; +} diff --git a/rs-tproxy-plugin/src/header.rs b/rs-tproxy-plugin/src/header.rs new file mode 100644 index 0000000..d94b65e --- /dev/null +++ b/rs-tproxy-plugin/src/header.rs @@ -0,0 +1,98 @@ +use std::collections::HashMap; + +use http::{request, response, Request, Response, Version}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestHeader<'a> { + pub method: String, + pub uri: String, + pub version: String, + + #[serde(borrow)] + pub header_map: HashMap<&'a str, Vec>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResponseHeader<'a> { + pub status_code: u16, + pub version: String, + + #[serde(borrow)] + pub header_map: HashMap<&'a str, Vec>>, +} + +impl RequestHeader<'_> { + pub fn build(&self, body: T) -> anyhow::Result> { + let mut req_builder = Request::builder() + .method(self.method.as_str()) + .uri(self.uri.as_str()) + .version(parse_version(&self.version)?); + for (k, list) in self.header_map.iter() { + for v in list { + req_builder = req_builder.header(*k, v.as_slice()) + } + } + Ok(req_builder.body(body)?) + } +} + +impl ResponseHeader<'_> { + pub fn build(&self, body: T) -> anyhow::Result> { + let mut resp_builder = Response::builder() + .status(self.status_code) + .version(parse_version(&self.version)?); + for (k, list) in self.header_map.iter() { + for v in list { + resp_builder = resp_builder.header(*k, v.as_slice()) + } + } + Ok(resp_builder.body(body)?) + } +} + +impl<'a> From<&'a request::Parts> for RequestHeader<'a> { + fn from(parts: &'a request::Parts) -> Self { + Self { + method: parts.method.to_string(), + uri: parts.uri.to_string(), + version: format!("{:?}", parts.version), + header_map: make_header_map(&parts.headers), + } + } +} + +impl<'a> From<&'a response::Parts> for ResponseHeader<'a> { + fn from(parts: &'a response::Parts) -> Self { + Self { + status_code: parts.status.as_u16(), + version: format!("{:?}", parts.version), + header_map: make_header_map(&parts.headers), + } + } +} + +fn make_header_map(raw: &http::HeaderMap) -> HashMap<&'_ str, Vec>> { + let mut map = HashMap::<&str, Vec>>::new(); + for (name, value) in raw.into_iter() { + let key = name.as_str(); + match map.get_mut(key) { + Some(v) => v.push(value.as_bytes().to_owned()), + None => { + map.insert(key, vec![value.as_bytes().to_owned()]); + } + } + } + map +} + +fn parse_version(version: &str) -> anyhow::Result { + match version { + "HTTP/0.9" => Ok(Version::HTTP_09), + "HTTP/1.0" => Ok(Version::HTTP_10), + "HTTP/1.1" => Ok(Version::HTTP_11), + "HTTP/2.0" => Ok(Version::HTTP_2), + "HTTP/3.0" => Ok(Version::HTTP_3), + _ => Err(anyhow::anyhow!("unsupported http version: {}", version)), + } +} diff --git a/rs-tproxy-plugin/src/lib.rs b/rs-tproxy-plugin/src/lib.rs new file mode 100644 index 0000000..bc42bfa --- /dev/null +++ b/rs-tproxy-plugin/src/lib.rs @@ -0,0 +1,9 @@ +mod handler; +mod header; +mod logger; +mod print; + +pub use handler::{call_request_handler, call_response_handler}; +pub use header::{RequestHeader, ResponseHeader}; +pub use logger::{setup_logger, Metadata, Record}; +pub use print::{eprintln, println}; diff --git a/rs-tproxy-plugin/src/logger.rs b/rs-tproxy-plugin/src/logger.rs new file mode 100644 index 0000000..0b3f3a2 --- /dev/null +++ b/rs-tproxy-plugin/src/logger.rs @@ -0,0 +1,113 @@ +use std::convert::TryFrom; + +use log::{LevelFilter, Log, SetLoggerError}; +use serde::{Deserialize, Serialize}; + +mod buildin { + extern "C" { + pub fn log_enabled(ptr: *const u8, len: u32) -> i32; + pub fn log_log(ptr: *const u8, len: u32); + pub fn log_flush(); + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Metadata<'a> { + pub level: &'a str, + pub target: &'a str, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Record<'a> { + pub level: &'a str, + pub target: &'a str, + pub content: String, + pub mount_path: Option<&'a str>, + pub file: Option<&'a str>, + pub line: Option, +} + +static LOGGER: &dyn Log = &Logger(); +pub struct Logger(); + +pub fn setup_logger() -> Result<(), SetLoggerError> { + log::set_logger(LOGGER)?; + log::set_max_level(LevelFilter::Trace); + Ok(()) +} + +impl Log for Logger { + // log level cannot be set by plugin + fn enabled(&self, metadata: &log::Metadata) -> bool { + let meta: Metadata = metadata.into(); + let data = serde_json::to_vec(&meta).unwrap(); + unsafe { buildin::log_enabled(data.as_ptr(), data.len() as u32) != 0 } + } + + fn log(&self, record: &log::Record) { + if self.enabled(record.metadata()) { + let re: Record = record.into(); + let data = serde_json::to_vec(&re).unwrap(); + unsafe { buildin::log_log(data.as_ptr(), data.len() as u32) } + } + } + + /// Flushes any buffered records. + fn flush(&self) { + unsafe { buildin::log_flush() } + } +} + +impl<'a> Record<'a> { + pub fn build(&self, args: std::fmt::Arguments<'a>) -> anyhow::Result> { + Ok(log::Record::builder() + .level( + self.level + .parse() + .map_err(|err| anyhow::anyhow!("fail to parse level: {}", err))?, + ) + .target(self.target) + .args(args) + .module_path(self.mount_path) + .file(self.file) + .line(self.line) + .build()) + } +} + +impl<'a> From<&'a log::Metadata<'a>> for Metadata<'a> { + fn from(meta: &'a log::Metadata) -> Self { + Self { + level: meta.level().as_str(), + target: meta.target(), + } + } +} + +impl<'a> From<&'a log::Record<'a>> for Record<'a> { + fn from(record: &'a log::Record<'a>) -> Self { + Self { + level: record.level().as_str(), + target: record.target(), + content: record.args().to_string(), + mount_path: record.module_path(), + file: record.file(), + line: record.line(), + } + } +} + +impl<'a> TryFrom> for log::Metadata<'a> { + type Error = anyhow::Error; + + fn try_from(meta: Metadata<'a>) -> Result { + Ok(Self::builder() + .level( + meta.level + .parse() + .map_err(|err| anyhow::anyhow!("fail to parse level: {}", err))?, + ) + .target(meta.target) + .build()) + } +} diff --git a/rs-tproxy-plugin/src/print.rs b/rs-tproxy-plugin/src/print.rs new file mode 100644 index 0000000..543687b --- /dev/null +++ b/rs-tproxy-plugin/src/print.rs @@ -0,0 +1,16 @@ +mod buildin { + extern "C" { + pub fn println(ptr: *const u8, len: u32); + pub fn eprintln(ptr: *const u8, len: u32); + } +} + +pub fn println(str: impl AsRef) { + let data = str.as_ref().as_bytes(); + unsafe { buildin::println(data.as_ptr(), data.len() as u32) } +} + +pub fn eprintln(str: impl AsRef) { + let data = str.as_ref().as_bytes(); + unsafe { buildin::eprintln(data.as_ptr(), data.len() as u32) } +} diff --git a/rs-tproxy-proxy/Cargo.toml b/rs-tproxy-proxy/Cargo.toml index afd7760..20cc893 100644 --- a/rs-tproxy-proxy/Cargo.toml +++ b/rs-tproxy-proxy/Cargo.toml @@ -1,28 +1,38 @@ [package] name = "rs-tproxy-proxy" -version = "0.1.0" +version = "0.4.1" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +rs-tproxy-plugin = { path = "../rs-tproxy-plugin" } + anyhow = "1.0" clap = "2.33.3" futures = "0.3.10" http = "0.2.3" humantime-serde = "1.0" -hyper = {git = "https://github.com/Andrewmatilde/hyper.git", features = ["runtime", "client", "server", "http1", "http2", "stream", "error_return"]} +hyper = { git = "https://github.com/Andrewmatilde/hyper.git", features = [ + "runtime", + "client", + "server", + "http1", + "http2", + "stream", + "error_return", +] } iptables = "0.4" -libc = {version = "0.2.81", features = ["std"]} +libc = { version = "0.2.81", features = ["std"] } paw = "1.0" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0.123" serde_json = "1.0" serde_urlencoded = "0.7" serde_yaml = "0.8" socket2 = "0.3" -structopt = {version = "0.3", features = ["paw"]} -tokio = {version = "1.4", features = ["full"]} +structopt = { version = "0.3", features = ["paw"] } +tokio = { version = "1.4", features = ["full"] } wildmatch = "2.1" tracing = "0.1" tracing-futures = "0.2" @@ -35,3 +45,7 @@ bincode = "1.3.3" tempfile = "3.2.0" uuid = { version = "0.8", features = ["serde", "v4"] } base64 = "0.13.0" +wasmer-runtime = "0.17" +log = "0.4" +md5 = "0.7" +derive_more = "0.99" diff --git a/rs-tproxy-proxy/src/controller.rs b/rs-tproxy-proxy/src/controller.rs new file mode 100644 index 0000000..49cf821 --- /dev/null +++ b/rs-tproxy-proxy/src/controller.rs @@ -0,0 +1,228 @@ +use std::collections::HashMap; +use std::convert::TryInto; +use std::future::Future; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use anyhow::Result; +use derive_more::{Deref, DerefMut}; +use futures::TryStreamExt; +use http::{Method, Request, Response, StatusCode}; +use hyper::server::conn::Http; +use hyper::service::Service; +use hyper::Body; +use tokio::fs::{metadata, read, read_dir}; +use tokio::net::UnixListener; +use tokio::sync::Mutex; +use tracing::{debug, instrument, trace}; + +use super::handler::http::plugin::Plugin; +use super::proxy::http::config::Config; +use super::proxy::http::server::HttpServer; +use super::raw_config::RawConfig; +use super::task::Task; + +const WASM_EXT: &str = ".wasm"; + +#[derive(Debug)] +pub struct CtrlServer { + uds_path: PathBuf, + service: CtrlService, + task: Option>, +} + +impl CtrlServer { + pub async fn build(path: impl Into) -> Result { + Ok(Self { + uds_path: path.into(), + service: Default::default(), + task: None, + }) + } + + pub async fn start(&mut self) -> Result<()> { + self.stop().await?; + let service = self.service.clone(); + let uds_path = self.uds_path.clone(); + self.task = Some(Task::start(async move { + let listener = UnixListener::bind(&uds_path)?; + service.serve(listener).await + })); + Ok(()) + } + + pub async fn stop(&mut self) -> Result<()> { + if let Some(task) = self.task.take() { + task.stop().await?; + } + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct CtrlService(Arc>>); + +#[derive(Debug, Default, Clone, Deref, DerefMut)] +pub struct PluginMap(HashMap); + +#[derive(Debug)] +struct ProxyGuard { + plugin_map: PluginMap, + task: Option>, +} + +impl ProxyGuard { + fn start(config: Config, plugin_map: PluginMap) -> Self { + let proxy = HttpServer::new(config, plugin_map.clone()); + Self { + plugin_map, + task: Some(Task::start(async move { + tracing::info!("proxy starting"); + proxy.serve().await + })), + } + } + + async fn stop(self) -> Result { + if let Some(task) = self.task { + task.stop().await?; + } + Ok(self.plugin_map) + } +} + +impl PluginMap { + pub fn must_get(&self, name: &str) -> Result<&Plugin> { + self.get(name) + .ok_or_else(|| anyhow::anyhow!("plugin `{}` not found", name)) + } + + async fn load_plugins(&mut self, plugin_path: &str) -> Result<()> { + match metadata(plugin_path).await { + Ok(meta) if meta.is_dir() => (), + _ => return Ok(()), + } + + debug!("ready to load plugins in path({})", plugin_path); + let mut dir = read_dir(&plugin_path).await?; + while let Some(entry) = dir.next_entry().await? { + trace!("read entry: {:?}", entry); + if !entry.file_type().await?.is_dir() + && entry.file_name().to_string_lossy().ends_with(WASM_EXT) + { + debug!( + "ready to load plugin file({})", + entry.file_name().to_string_lossy() + ); + let module = read(entry.path()).await?; + let name = entry + .file_name() + .to_string_lossy() + .trim_end_matches(WASM_EXT) + .to_owned(); + match self.get_mut(&name) { + None => { + debug!("ready to load new plugin({})", name); + self.insert(name, Plugin::wasm(&module)?); + } + Some(plugin) => { + if plugin.is_change(&module) { + debug!("ready to update plugin({})", name); + *plugin = Plugin::wasm(&module)?; + } + } + } + } + } + Ok(()) + } +} + +impl CtrlService { + fn new() -> Self { + Self(Arc::new(Mutex::new(None))) + } + + pub async fn serve(&self, listener: UnixListener) -> Result<()> { + tracing::info!("controller listening"); + loop { + let service = self.clone(); + let (stream, addr) = listener.accept().await?; + tracing::debug!("accept streaming: addr={:?}", addr); + tokio::spawn(async move { + if let Err(err) = Http::new().serve_connection(stream, service).await { + tracing::error!("{}", err); + } + }); + } + } + + async fn read_config(request: Request) -> Result { + let request_data: Vec = request + .into_body() + .try_fold(vec![], |mut data, seg| { + data.extend(seg); + futures::future::ok(data) + }) + .await?; + + let raw_config: RawConfig = serde_json::from_slice(&request_data)?; + raw_config.try_into() + } + + #[instrument] + async fn handle(self, request: Request) -> anyhow::Result> { + if request.method() != Method::PUT { + return Ok(Response::builder() + .status(StatusCode::METHOD_NOT_ALLOWED) + .body(Body::empty())?); + } + + let config = match Self::read_config(request).await { + Err(e) => { + return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(e.to_string().into())?); + } + Ok(c) => c, + }; + debug!("read config: {:?}", config); + let mut proxy = self.0.lock().await; + let mut plugin_map = match proxy.take() { + Some(proxy) => proxy.stop().await?, + None => Default::default(), + }; + debug!("ready to load plugins: current({:?})", plugin_map); + plugin_map.load_plugins(&config.plugin_path).await?; + debug!("plugins loaded: current({:?})", plugin_map); + *proxy = Some(ProxyGuard::start(config, plugin_map)); + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) + } +} + +impl Default for CtrlService { + fn default() -> Self { + Self::new() + } +} + +impl Service> for CtrlService { + type Response = Response; + type Error = anyhow::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin>>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + #[inline] + fn call(&mut self, request: Request) -> Self::Future { + Box::pin(Self::handle(self.clone(), request)) + } +} diff --git a/rs-tproxy-proxy/src/handler/mod.rs b/rs-tproxy-proxy/src/handler.rs similarity index 100% rename from rs-tproxy-proxy/src/handler/mod.rs rename to rs-tproxy-proxy/src/handler.rs diff --git a/rs-tproxy-proxy/src/handler/http/mod.rs b/rs-tproxy-proxy/src/handler/http.rs similarity index 75% rename from rs-tproxy-proxy/src/handler/http/mod.rs rename to rs-tproxy-proxy/src/handler/http.rs index 4a5b9d3..1fd95e0 100644 --- a/rs-tproxy-proxy/src/handler/http/mod.rs +++ b/rs-tproxy-proxy/src/handler/http.rs @@ -1,3 +1,4 @@ pub mod action; +pub mod plugin; pub mod rule; pub mod selector; diff --git a/rs-tproxy-proxy/src/handler/http/plugin.rs b/rs-tproxy-proxy/src/handler/http/plugin.rs new file mode 100644 index 0000000..8e5030e --- /dev/null +++ b/rs-tproxy-proxy/src/handler/http/plugin.rs @@ -0,0 +1,197 @@ +use std::cell::Cell; +use std::fmt::{self, Debug, Display}; +use std::io; +use std::sync::{Arc, Mutex}; + +use futures::stream::TryStreamExt; +use futures::AsyncReadExt; +use http::{Request, Response}; +use hyper::Body; +use md5::{compute, Digest}; +use rs_tproxy_plugin::{RequestHeader, ResponseHeader}; +use wasmer_runtime::{compile, func, imports, DynFunc, Module, Value}; + +mod logger; +mod print; + +pub enum HandlerName { + Request, + Response, +} + +#[derive(Clone)] +pub enum Plugin { + WASM { module: Arc, hash: Digest }, +} + +impl Display for HandlerName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HandlerName::Request => f.write_str("handle_request"), + HandlerName::Response => f.write_str("handle_response"), + } + } +} + +impl Plugin { + pub fn hash(&self) -> &Digest { + match self { + Plugin::WASM { module: _, hash } => hash, + } + } + + pub fn wasm(wasm: &[u8]) -> anyhow::Result { + Ok(Self::WASM { + module: Arc::new(compile(wasm)?), + hash: compute(wasm), + }) + } + + pub fn is_change(&self, plugin: &[u8]) -> bool { + self.hash() != &compute(plugin) + } + + async fn read_body(header_map: &http::HeaderMap, body: Body) -> anyhow::Result> { + let size_hint = header_map + .get(http::header::CONTENT_LENGTH) + .and_then(|value| std::str::from_utf8(value.as_bytes()).ok()?.parse().ok()); + let mut body_data = match size_hint { + Some(hint) => Vec::with_capacity(hint), + None => Vec::new(), + }; + body.map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .into_async_read() + .read_to_end(&mut body_data) + .await?; + Ok(body_data) + } + + pub async fn handle_request(&self, request: Request) -> anyhow::Result> { + let (mut parts, body) = request.into_parts(); + let header: RequestHeader = (&parts).into(); + let header_data = serde_json::to_vec(&header)?; + let body_data = Self::read_body(&parts.headers, body).await?; + let plugin = self.clone(); + let new_body = tokio::task::spawn_blocking(move || { + plugin.handle_raw(HandlerName::Request, header_data, body_data) + }) + .await??; + parts.headers.remove(http::header::CONTENT_LENGTH); + Ok(Request::from_parts(parts, new_body.into())) + } + + pub async fn handle_response(&self, request: Response) -> anyhow::Result> { + let (mut parts, body) = request.into_parts(); + let header: ResponseHeader = (&parts).into(); + let header_data = serde_json::to_vec(&header)?; + let body_data = Self::read_body(&parts.headers, body).await?; + let plugin = self.clone(); + let new_body = tokio::task::spawn_blocking(move || { + plugin.handle_raw(HandlerName::Response, header_data, body_data) + }) + .await??; + parts.headers.remove(http::header::CONTENT_LENGTH); + Ok(Response::from_parts(parts, new_body.into())) + } + + fn handle_raw( + self, + hander_name: HandlerName, + header: Vec, + origin_body: Vec, + ) -> anyhow::Result> { + match self { + Plugin::WASM { module, hash: _ } => { + Self::handle_wasm(hander_name, module, &header, origin_body) + } + } + } + + fn handle_wasm( + hander_name: HandlerName, + wasm: Arc, + header: &[u8], + origin_body: Vec, + ) -> anyhow::Result> { + let ptr = Arc::new(Mutex::new(None)); + let writer = ptr.clone(); + let write_body = move |addr: u32, len: u32| { + *writer.lock().unwrap() = Some((addr as usize, len as usize)) + }; + + let import_object = imports! { + "env" => { + "write_body" => func!(write_body), + "println" => func!(print::println), + "eprintln" => func!(print::eprintln), + "log_enabled" => func!(logger::log_enabled), + "log_log" => func!(logger::log_log), + "log_flush" => func!(logger::log_flush), + }, + }; + + let mut instance = wasm + .instantiate(&import_object) + .map_err(|err| anyhow::anyhow!("{}", err))?; + + if instance + .exports + .get::(&hander_name.to_string()) + .is_err() + { + return Ok(origin_body); + } + + let memory = instance.context_mut().memory(0); + + for (byte, cell) in header + .iter() + .cloned() + .zip(memory.view()[0..(header.len()) as usize].iter()) + { + cell.set(byte); + } + + for (byte, cell) in origin_body.iter().cloned().zip( + memory.view()[header.len() as usize..(header.len() + origin_body.len()) as usize] + .iter(), + ) { + cell.set(byte); + } + + instance + .call( + &hander_name.to_string(), + &[ + Value::I64(0), + Value::I64(header.len() as _), + Value::I64(origin_body.len() as _), + ], + ) + .map_err(|err| anyhow::anyhow!("{}", err))?; + + let ptr_ref = *ptr.lock().map_err(|err| anyhow::anyhow!("{}", err))?; + match ptr_ref { + None => Ok(Vec::new()), + Some((addr, len)) => Ok(instance.context().memory(0).view()[addr..(addr + len)] + .iter() + .map(Cell::get) + .collect::>()), + } + } +} + +impl Debug for Plugin { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Plugin::WASM { module, hash } => f.write_fmt(format_args!( + "wasm module({:?}): {:?}", + hash, + module.info().exports + )), + } + } +} + +#[cfg(test)] +mod test; diff --git a/rs-tproxy-proxy/src/handler/http/plugin/logger.rs b/rs-tproxy-proxy/src/handler/http/plugin/logger.rs new file mode 100644 index 0000000..04b7b40 --- /dev/null +++ b/rs-tproxy-proxy/src/handler/http/plugin/logger.rs @@ -0,0 +1,38 @@ +use std::cell::Cell; +use std::convert::TryInto; + +use log::logger; +use rs_tproxy_plugin::{Metadata, Record}; +use wasmer_runtime::{Array, Ctx, WasmPtr}; + +fn read_data(ctx: &mut Ctx, ptr: WasmPtr, len: u32) -> Option> { + let memory = ctx.memory(0); + Some(ptr.deref(memory, 0, len)?.iter().map(Cell::get).collect()) +} + +pub fn log_enabled(ctx: &mut Ctx, ptr: WasmPtr, len: u32) -> i32 { + let data = read_data(ctx, ptr, len).unwrap(); + let meta = serde_json::from_slice::(&data) + .unwrap() + .try_into() + .unwrap(); + if logger().enabled(&meta) { + 1 + } else { + 0 + } +} + +pub fn log_log(ctx: &mut Ctx, ptr: WasmPtr, len: u32) { + let data = read_data(ctx, ptr, len).unwrap(); + let raw_record = serde_json::from_slice::(&data).unwrap(); + logger().log( + &raw_record + .build(format_args!("{}", raw_record.content)) + .unwrap(), + ) +} + +pub fn log_flush() { + logger().flush() +} diff --git a/rs-tproxy-proxy/src/handler/http/plugin/print.rs b/rs-tproxy-proxy/src/handler/http/plugin/print.rs new file mode 100644 index 0000000..4ff105e --- /dev/null +++ b/rs-tproxy-proxy/src/handler/http/plugin/print.rs @@ -0,0 +1,21 @@ +use wasmer_runtime::{Array, Ctx, WasmPtr}; + +pub fn println(ctx: &mut Ctx, ptr: WasmPtr, len: u32) { + let memory = ctx.memory(0); + + // Use helper method on `WasmPtr` to read a utf8 string + let string = ptr.get_utf8_string(memory, len).unwrap(); + + // Print it! + println!("{}", string); +} + +pub fn eprintln(ctx: &mut Ctx, ptr: WasmPtr, len: u32) { + let memory = ctx.memory(0); + + // Use helper method on `WasmPtr` to read a utf8 string + let string = ptr.get_utf8_string(memory, len).unwrap(); + + // Print it! + eprintln!("{}", string); +} diff --git a/rs-tproxy-proxy/src/handler/http/plugin/test.rs b/rs-tproxy-proxy/src/handler/http/plugin/test.rs new file mode 100644 index 0000000..af947a4 --- /dev/null +++ b/rs-tproxy-proxy/src/handler/http/plugin/test.rs @@ -0,0 +1,69 @@ +use std::io; + +use futures::stream::TryStreamExt; +use futures::AsyncReadExt; +use http::Response; +use hyper::Body; +use serde::Deserialize; + +use super::Plugin; + +/// +/// ## wasm plugin in base64 +/// +/// ```rust +/// use log::info; +/// use rs_tproxy_plugin::register_response_handler; +/// +/// register_response_handler!(|resp| { +/// let content_type = resp +/// .headers() +/// .get("content-type") +/// .ok_or(anyhow::anyhow!("content-type not found"))? +/// .to_str()?; +/// info!("get content-type: {}", content_type); +/// Ok(serde_json::to_vec(&serde_json::json!({ +/// "type": content_type, +/// "content": *resp.body(), +/// }))?) +/// }); +/// +/// ``` +/// +const PLUGIN: &[u8] = + include_bytes!("../../../../../target/wasm32-unknown-unknown/debug/plugin_example.wasm"); + +#[derive(Debug, Deserialize)] +struct Content { + #[serde(rename(deserialize = "type"))] + typ: String, + content: Vec, +} + +#[tokio::test] +async fn test_plugin() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_max_level(tracing::level_filters::LevelFilter::INFO) + .with_writer(std::io::stderr) + .try_init() + .map_err(|err| anyhow::anyhow!("{}", err))?; + let body = "Hello World"; + let content_type = "plain/text"; + let plugin = Plugin::wasm(PLUGIN)?; + let resp = Response::builder() + .status(200) + .header("content-type", content_type) + .body(Body::from(body))?; + let new_resp = plugin.handle_response(resp).await?; + let mut body_data = Vec::new(); + new_resp + .into_body() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .into_async_read() + .read_to_end(&mut body_data) + .await?; + let content: Content = serde_json::from_slice(&body_data)?; + assert_eq!(content.typ, content_type); + assert_eq!(content.content, body.as_bytes()); + Ok(()) +} diff --git a/rs-tproxy-proxy/src/handler/http/rule.rs b/rs-tproxy-proxy/src/handler/http/rule.rs index f0ff6dc..a32718f 100644 --- a/rs-tproxy-proxy/src/handler/http/rule.rs +++ b/rs-tproxy-proxy/src/handler/http/rule.rs @@ -1,11 +1,12 @@ -use crate::handler::http::action::Actions; -use crate::handler::http::selector::Selector; +use super::action::Actions; +use super::selector::Selector; #[derive(Debug, Clone)] pub struct Rule { pub target: Target, pub selector: Selector, pub actions: Actions, + pub plugins: Vec, } #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/rs-tproxy-proxy/src/lib.rs b/rs-tproxy-proxy/src/lib.rs index 97dc4aa..557d61a 100644 --- a/rs-tproxy-proxy/src/lib.rs +++ b/rs-tproxy-proxy/src/lib.rs @@ -1,38 +1,22 @@ -use std::convert::TryInto; use std::path::PathBuf; use tokio::signal::unix::SignalKind; -use tokio::sync::oneshot::channel; -use crate::proxy::http::server::HttpServer; -use crate::raw_config::RawConfig; +use crate::controller::CtrlServer; use crate::signal::Signals; -use crate::uds_client::UdsDataClient; +pub mod controller; pub mod handler; pub mod proxy; pub mod raw_config; pub mod signal; -pub mod uds_client; +pub mod task; pub async fn proxy_main(path: PathBuf) -> anyhow::Result<()> { - tracing::info!(target: "Proxy get uds path", "{:?}", path); - let client = UdsDataClient::new(path); - let mut buf: Vec = vec![]; - let raw_config: RawConfig = client.read_into(&mut buf).await?; - let config = raw_config.try_into()?; - let (sender, rx) = channel(); - - let spawn = tokio::spawn(async move { - tracing::info!(target: "Proxy", "Starting"); - let mut server = HttpServer::new(config); - server.serve(rx).await.unwrap(); - }); - + tracing::info!("proxy get uds path {:?}", path); + let mut server = CtrlServer::build(path).await?; + server.start().await?; let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; - - let _ = sender.send(()); - spawn.await?; - Ok(()) + server.stop().await } diff --git a/rs-tproxy-proxy/src/proxy/mod.rs b/rs-tproxy-proxy/src/proxy.rs similarity index 100% rename from rs-tproxy-proxy/src/proxy/mod.rs rename to rs-tproxy-proxy/src/proxy.rs diff --git a/rs-tproxy-proxy/src/proxy/http/mod.rs b/rs-tproxy-proxy/src/proxy/http.rs similarity index 100% rename from rs-tproxy-proxy/src/proxy/http/mod.rs rename to rs-tproxy-proxy/src/proxy/http.rs diff --git a/rs-tproxy-proxy/src/proxy/http/config.rs b/rs-tproxy-proxy/src/proxy/http/config.rs index c757500..01e09e4 100644 --- a/rs-tproxy-proxy/src/proxy/http/config.rs +++ b/rs-tproxy-proxy/src/proxy/http/config.rs @@ -3,5 +3,6 @@ use crate::handler::http::rule::Rule; #[derive(Debug, Clone)] pub struct Config { pub proxy_port: u16, + pub plugin_path: String, pub rules: Vec, } diff --git a/rs-tproxy-proxy/src/proxy/http/server.rs b/rs-tproxy-proxy/src/proxy/http/server.rs index d78d380..5f67911 100644 --- a/rs-tproxy-proxy/src/proxy/http/server.rs +++ b/rs-tproxy-proxy/src/proxy/http/server.rs @@ -13,10 +13,9 @@ use hyper::service::Service; use hyper::{Body, Client, Request, Response}; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -use tokio::select; -use tokio::sync::oneshot::Receiver; use tracing::{debug, error}; +use crate::controller::PluginMap; use crate::handler::http::action::{apply_request_action, apply_response_action}; use crate::handler::http::rule::Target; use crate::handler::http::selector::{select_request, select_response}; @@ -28,41 +27,39 @@ use crate::proxy::tcp::transparent_socket::TransparentSocket; #[derive(Debug)] pub struct HttpServer { config: Config, + plugin_map: PluginMap, } impl HttpServer { - pub fn new(config: Config) -> Self { - Self { config } + pub fn new(config: Config, plugin_map: PluginMap) -> Self { + Self { config, plugin_map } } - pub async fn serve(&mut self, rx: Receiver<()>) -> Result<()> { + pub async fn serve(&self) -> Result<()> { let addr = SocketAddr::from(([0, 0, 0, 0], self.config.proxy_port)); let listener = TcpListener::bind(addr)?; - tracing::info!(target : "Proxy", "Listening"); - select! { - _ = async { - loop { - let stream = listener.accept().await?; - let addr_remote = stream.peer_addr()?; - let addr_local = stream.local_addr()?; - tracing::debug!(target : "Accept streaming", "remote={:?}, local={:?}", addr_remote, addr_local); - let config = Arc::new(self.config.clone()); - let service = HttpService::new(addr_remote, addr_local, config); - tokio::spawn(async move { - match serve_http_with_error_return(stream, &service).await{ - Ok(_)=>{} - Err(e) => {tracing::error!("{}",e);} - }; - }); + tracing::info!("proxy listening"); + loop { + let stream = listener.accept().await?; + let addr_remote = stream.peer_addr()?; + let addr_local = stream.local_addr()?; + tracing::debug!( + "accept streaming: remote={:?}, local={:?}", + addr_remote, + addr_local + ); + let service = HttpService::new( + addr_remote, + addr_local, + self.config.clone(), + self.plugin_map.clone(), + ); + tokio::spawn(async move { + if let Err(err) = serve_http_with_error_return(stream, &service).await { + tracing::error!("{}", err); } - #[allow(unreachable_code)] - Ok::<_, anyhow::Error>(()) - } => {}, - _ = rx => { - return Ok(()); - } - }; - Ok(()) + }); + } } } @@ -70,11 +67,6 @@ pub async fn serve_http_with_error_return( mut stream: TcpStream, service: &HttpService, ) -> Result<()> { - let log_key = format!( - "{{ peer={},local={} }}", - stream.peer_addr()?, - stream.local_addr()? - ); loop { let (r, parts) = Http::new() .error_return(true) @@ -89,15 +81,15 @@ pub async fn serve_http_with_error_return( }, Err(e) => { return if e.is_parse() { - tracing::debug!("{}:Turn into tcp transfer.", log_key); + tracing::debug!("turn into tcp transfer"); match parts { Some(mut part) => { let addr_target = part.io.local_addr()?; let addr_local = part.io.peer_addr()?; let socket = TransparentSocket::bind(addr_local)?; - tracing::debug!("{}:Bind local addrs.", log_key); + tracing::debug!("bind local addrs."); let mut client_stream = socket.connect(addr_target).await?; - tracing::debug!("{}:Connected target addrs.", log_key); + tracing::debug!("connected target addrs."); client_stream .write_all(part.read_buf.as_ref()) .await @@ -109,7 +101,7 @@ pub async fn serve_http_with_error_return( } } else { if !e.to_string().contains("error shutting down connection") { - tracing::info!("{}:fail to serve http: {}", log_key, e); + tracing::info!("fail to serve http: {}", e); } Ok(()) } @@ -124,20 +116,27 @@ pub struct HttpService { remote: SocketAddr, target: SocketAddr, config: Arc, + plugin_map: Arc, } impl HttpService { - fn new(addr_remote: SocketAddr, addr_target: SocketAddr, config: Arc) -> Self { + fn new( + addr_remote: SocketAddr, + addr_target: SocketAddr, + config: Config, + plugin_map: PluginMap, + ) -> Self { Self { remote: addr_remote, target: addr_target, - config, + config: Arc::new(config), + plugin_map: Arc::new(plugin_map), } } + #[tracing::instrument] async fn handle(self, mut request: Request) -> Result> { - let log_key = format!("{{remote = {}, target = {} }}", self.remote, self.target); - debug!("{} : Proxy is handling http request", log_key); + debug!("proxy is handling http request"); let request_rules: Vec<_> = self .config .rules @@ -149,8 +148,15 @@ impl HttpService { .collect(); for rule in request_rules { - debug!("{} : request matched, rule({:?})", log_key, rule); + debug!("request matched, rule({:?})", rule); request = apply_request_action(request, &rule.actions).await?; + for name in rule.plugins.iter() { + request = self + .plugin_map + .must_get(name)? + .handle_request(request) + .await?; + } } let uri = request.uri().clone(); @@ -174,7 +180,7 @@ impl HttpService { let mut response = match client.request(request).await { Ok(resp) => resp, Err(err) => { - error!("{} : fail to forward request: {}", log_key, err); + error!("fail to forward request: {}", err); Response::builder() .status(StatusCode::BAD_GATEWAY) .body(Body::empty())? @@ -199,8 +205,15 @@ impl HttpService { .collect(); for rule in response_rules { - debug!("{} : response matched", log_key); + debug!("response matched"); response = apply_response_action(response, &rule.actions).await?; + for name in rule.plugins.iter() { + response = self + .plugin_map + .must_get(name)? + .handle_response(response) + .await? + } } Ok(response) } diff --git a/rs-tproxy-proxy/src/proxy/tcp/mod.rs b/rs-tproxy-proxy/src/proxy/tcp.rs similarity index 100% rename from rs-tproxy-proxy/src/proxy/tcp/mod.rs rename to rs-tproxy-proxy/src/proxy/tcp.rs diff --git a/rs-tproxy-proxy/src/raw_config.rs b/rs-tproxy-proxy/src/raw_config.rs index bc2e1c6..8c1ac0c 100644 --- a/rs-tproxy-proxy/src/raw_config.rs +++ b/rs-tproxy-proxy/src/raw_config.rs @@ -16,6 +16,8 @@ use crate::handler::http::rule::{Rule, Target}; use crate::handler::http::selector::Selector; use crate::proxy::http::config::Config; +pub const DEFAULT_PLUGIN_PATH: &str = "/etc/rs-tproxy/plugins"; + #[derive(Debug, Eq, PartialEq, Clone, Deserialize, Serialize, Default)] pub struct RawConfig { pub proxy_ports: Option, @@ -23,6 +25,7 @@ pub struct RawConfig { pub safe_mode: bool, pub interface: Option, pub rules: Vec, + pub plugin_path: Option, } #[derive(Debug, Eq, PartialEq, Clone, Deserialize, Serialize)] @@ -30,6 +33,7 @@ pub struct RawRule { pub target: RawTarget, pub selector: RawSelector, pub actions: RawActions, + pub plugins: Option>, } #[derive(Debug, Eq, PartialEq, Clone, Deserialize, Serialize)] @@ -152,6 +156,9 @@ impl TryFrom for Config { fn try_from(raw: RawConfig) -> Result { Ok(Self { proxy_port: raw.listen_port, + plugin_path: raw + .plugin_path + .unwrap_or_else(|| DEFAULT_PLUGIN_PATH.to_owned()), rules: raw .rules .into_iter() @@ -169,6 +176,7 @@ impl TryFrom for Rule { target: rule.target.into(), selector: rule.selector.try_into()?, actions: rule.actions.try_into()?, + plugins: rule.plugins.unwrap_or_default(), }) } } diff --git a/rs-tproxy-proxy/src/task.rs b/rs-tproxy-proxy/src/task.rs new file mode 100644 index 0000000..763a054 --- /dev/null +++ b/rs-tproxy-proxy/src/task.rs @@ -0,0 +1,41 @@ +use std::future::Future; + +use anyhow::Result; +use futures::{select, FutureExt}; +use tokio::sync::oneshot::{channel, Sender}; +use tokio::task::JoinHandle; + +#[derive(Debug)] +pub struct Task { + handler: JoinHandle>>, + sender: Sender<()>, +} + +impl Task +where + T: 'static + Send, +{ + pub fn start(f: F) -> Self + where + F: 'static + Send + Future>, + { + let (tx, rx) = channel(); + Self { + sender: tx, + handler: tokio::spawn(async move { + select! { + _ = rx.fuse() => Ok(None), + ret = f.fuse() => match ret { + Ok(v) => Ok(Some(v)), + Err(e) => Err(e) + }, + } + }), + } + } + + pub async fn stop(self) -> Result> { + let _ = self.sender.send(()); + self.handler.await? + } +} diff --git a/rs-tproxy-proxy/src/uds_client.rs b/rs-tproxy-proxy/src/uds_client.rs deleted file mode 100644 index e70b205..0000000 --- a/rs-tproxy-proxy/src/uds_client.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::path::PathBuf; - -use tokio::io::AsyncReadExt; -use tokio::net::UnixStream; - -#[derive(Debug, Clone)] -pub struct UdsDataClient { - pub path: PathBuf, -} - -impl UdsDataClient { - pub fn new(path: PathBuf) -> Self { - Self { path } - } - - pub async fn read_into<'a, T: serde::de::Deserialize<'a>>( - &self, - buf: &'a mut Vec, - ) -> anyhow::Result { - tracing::debug!("try connect path : {:?}", &self.path); - let mut stream = UnixStream::connect(self.path.clone()).await?; - return match stream.read_to_end(buf).await { - Ok(_) => { - tracing::debug!("Read data successfully."); - - match bincode::deserialize(buf.as_slice()) { - Ok(o) => { - tracing::debug!("Deserialize data successfully."); - Ok(o) - } - Err(e) => { - tracing::debug!("Deserialize data failed."); - Err(anyhow::anyhow!("{}", e)) - } - } - } - Err(e) => { - tracing::debug!("Read data failed with err {:?}.", e); - Err(anyhow::anyhow!("{}", e)) - } - }; - } -} diff --git a/rust-toolchain b/rust-toolchain index 870bbe4..c65ea30 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -stable \ No newline at end of file +nightly-2021-06-01 \ No newline at end of file diff --git a/rs-tproxy-controller/src/main.rs b/src/main.rs similarity index 80% rename from rs-tproxy-controller/src/main.rs rename to src/main.rs index ab41170..7a746f2 100644 --- a/rs-tproxy-controller/src/main.rs +++ b/src/main.rs @@ -1,18 +1,13 @@ use std::process::exit; use anyhow::anyhow; +use rs_tproxy_controller::cmd::command_line::{get_config_from_opt, Opt}; +use rs_tproxy_controller::cmd::interactive::handler::ConfigServer; +use rs_tproxy_controller::proxy::exec::Proxy; use rs_tproxy_proxy::proxy_main; use rs_tproxy_proxy::signal::Signals; use tokio::signal::unix::SignalKind; -use crate::cmd::command_line::{get_config_from_opt, Opt}; -use crate::cmd::interactive::handler::ConfigServer; -use crate::proxy::exec::Proxy; - -pub mod cmd; -pub mod proxy; -pub mod raw_config; - #[tokio::main] async fn main() -> anyhow::Result<()> { let opt = match Opt::from_args_checked() { @@ -23,7 +18,10 @@ async fn main() -> anyhow::Result<()> { Ok(o) => o, }; tracing_subscriber::fmt() - .with_max_level(opt.get_level_filter()) + .with_env_filter(tracing_subscriber::EnvFilter::new(format!( + "rs_tproxy={}", + opt.get_level() + ))) .with_writer(std::io::stderr) .try_init() .map_err(|err| anyhow!("{}", err))?; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 284c354..e50aa12 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,7 +5,7 @@ edition = "2018" [[test]] name = "integration" -path = "./integrations/mod.rs" +path = "integrations/mod.rs" [dependencies] anyhow = "1.0" @@ -13,18 +13,26 @@ clap = "2.33.3" futures = "0.3.10" http = "0.2.3" humantime-serde = "1.0" -hyper = {git = "https://github.com/Andrewmatilde/hyper.git", features = ["runtime", "client", "server", "http1", "http2", "stream", "error_return"]} +hyper = { git = "https://github.com/Andrewmatilde/hyper.git", features = [ + "runtime", + "client", + "server", + "http1", + "http2", + "stream", + "error_return", +] } iptables = "0.4" -libc = {version = "0.2.81", features = ["std"]} +libc = { version = "0.2.81", features = ["std"] } paw = "1.0" -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_derive = "1.0.123" serde_json = "1.0" serde_urlencoded = "0.7" serde_yaml = "0.8" socket2 = "0.3" -structopt = {version = "0.3", features = ["paw"]} -tokio = {version = "1.4", features = ["full"]} +structopt = { version = "0.3", features = ["paw"] } +tokio = { version = "1.4", features = ["full"] } wildmatch = "2.1" tracing = "0.1" tracing-futures = "0.2" @@ -36,9 +44,12 @@ cfg-if = "1.0.0" bincode = "1.3.3" tempfile = "3.2.0" uuid = { version = "0.8", features = ["serde", "v4"] } -futures-util = { version = "0.3.7", default-features = false, features = ["alloc", "sink"] } -rs-tproxy-proxy = {path = "../rs-tproxy-proxy"} -rs-tproxy-controller = {path = "../rs-tproxy-controller"} +futures-util = { version = "0.3.7", default-features = false, features = [ + "alloc", + "sink", +] } +rs-tproxy-proxy = { path = "../rs-tproxy-proxy" } +rs-tproxy-controller = { path = "../rs-tproxy-controller" } pnet = "0.28.0" default-net = "0.2.0" -system_gateway = {git="https://github.com/aruntomar/system_gateway"} +system_gateway = { git = "https://github.com/aruntomar/system_gateway" } diff --git a/tests/integrations/mod.rs b/tests/integrations/mod.rs index e35e75d..ec8f0fb 100644 --- a/tests/integrations/mod.rs +++ b/tests/integrations/mod.rs @@ -1,2 +1,2 @@ +mod test_controller; mod test_http_action; -mod test_uds; diff --git a/tests/integrations/test_controller.rs b/tests/integrations/test_controller.rs new file mode 100644 index 0000000..d5db7f9 --- /dev/null +++ b/tests/integrations/test_controller.rs @@ -0,0 +1,19 @@ +use std::env; + +use rs_tproxy_controller::proxy::controller::send_config; +use rs_tproxy_proxy::proxy_main; +use rs_tproxy_proxy::raw_config::RawConfig; +use tokio::time::Duration; +use uuid::Uuid; + +#[tokio::test] +async fn test_controller() -> anyhow::Result<()> { + let uds_path = env::temp_dir() + .join(Uuid::new_v4().to_string()) + .with_extension("sock"); + let data = RawConfig::default(); + tokio::spawn(proxy_main(uds_path.clone())); + tokio::time::sleep(Duration::from_secs(2)).await; + send_config(uds_path, &data).await?; + Ok(()) +} diff --git a/tests/integrations/test_uds.rs b/tests/integrations/test_uds.rs deleted file mode 100644 index 2324325..0000000 --- a/tests/integrations/test_uds.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::env; - -use rs_tproxy_controller_lib::proxy::uds_server::UdsDataServer; -use rs_tproxy_proxy::uds_client::UdsDataClient; -use tokio::time::Duration; -use uuid::Uuid; - -#[tokio::test] -async fn test_uds() { - let uds_path = env::temp_dir() - .join(Uuid::new_v4().to_string()) - .with_extension("sock"); - let data = Uuid::new_v4().to_string(); - - let uds_server = UdsDataServer::new(data.clone(), uds_path.clone()); - let listener = uds_server.bind().unwrap(); - let server = uds_server.clone(); - - tokio::spawn(async move { - tokio::time::sleep(Duration::new(5, 0)).await; - let _ = server - .listen(listener) - .await - .map_err(|e| tracing::error!("{:?}", e)); - }); - - let client = UdsDataClient::new(uds_path.clone()); - let mut buf: Vec = vec![]; - let data_o: String = client.read_into(&mut buf).await.unwrap(); - assert_eq!(data, data_o); -}