Skip to content

Commit 4bdd04b

Browse files
committed
Fix thrift connection setup
1 parent a45175a commit 4bdd04b

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

src/thrift_client/mod.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,29 @@
3232
//! # }
3333
//! ```
3434
35-
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
35+
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMultiplexedOutputProtocol};
3636
use thrift::transport::{
37-
ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel, TTcpChannel, WriteHalf,
37+
ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, TTcpChannel, WriteHalf,
3838
};
3939

4040
use crate::error::RBFRTError;
4141

4242
/// Type alias for Thrift input protocol
43-
pub type ThriftInputProtocol = TBinaryInputProtocol<TFramedReadTransport<ReadHalf<TTcpChannel>>>;
43+
pub type ThriftInputProtocol = TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<TTcpChannel>>>;
4444

45-
/// Type alias for Thrift output protocol
46-
pub type ThriftOutputProtocol =
47-
TBinaryOutputProtocol<TFramedWriteTransport<WriteHalf<TTcpChannel>>>;
45+
/// Type alias for Thrift output protocol (wrapped with multiplexed protocol for service routing)
46+
pub type ThriftOutputProtocol = TMultiplexedOutputProtocol<
47+
TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<TTcpChannel>>>,
48+
>;
4849

4950
/// Connect to a Thrift service and return protocol layers
5051
///
5152
/// This function handles all the boilerplate of creating a TCP connection
52-
/// and setting up the Thrift protocol layers.
53+
/// and setting up the Thrift protocol layers with multiplexed service routing.
5354
///
5455
/// # Arguments
5556
/// * `address` - Server address in format "host:port" (e.g., "localhost:9090")
57+
/// * `service_name` - Name of the Thrift service to connect to (e.g., "ts", "port_mgr", "tm")
5658
///
5759
/// # Returns
5860
/// Returns a tuple of (InputProtocol, OutputProtocol) that can be passed to any Thrift client's new() method
@@ -64,19 +66,22 @@ pub type ThriftOutputProtocol =
6466
///
6567
/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
6668
/// // Create timestamp client
67-
/// let (i_prot, o_prot) = thrift_client::connect("localhost:9090")?;
69+
/// let (i_prot, o_prot) = thrift_client::connect("localhost:9090", "ts")?;
6870
/// let mut ts_client = TsSyncClient::new(i_prot, o_prot);
6971
/// ts_client.ts_global_ts_value_set(0, 1_000_000_000)?;
7072
///
7173
/// // Create port manager client
7274
/// use rbfrt::thrift_generated::port_mgr::PortMgrSyncClient;
73-
/// let (i_prot, o_prot) = thrift_client::connect("localhost:9090")?;
75+
/// let (i_prot, o_prot) = thrift_client::connect("localhost:9090", "port_mgr")?;
7476
/// let mut port_client = PortMgrSyncClient::new(i_prot, o_prot);
7577
/// port_client.port_mgr_mtu_set(0, 128, 9000, 9000)?;
7678
/// # Ok(())
7779
/// # }
7880
/// ```
79-
pub fn connect(address: &str) -> Result<(ThriftInputProtocol, ThriftOutputProtocol), RBFRTError> {
81+
pub fn connect(
82+
address: &str,
83+
service_name: &str,
84+
) -> Result<(ThriftInputProtocol, ThriftOutputProtocol), RBFRTError> {
8085
// Create TCP connection
8186
let mut channel = TTcpChannel::new();
8287
channel
@@ -90,9 +95,12 @@ pub fn connect(address: &str) -> Result<(ThriftInputProtocol, ThriftOutputProtoc
9095
message: format!("Failed to split channel: {}", e),
9196
})?;
9297

93-
// Create protocol layers
94-
let i_prot = TBinaryInputProtocol::new(TFramedReadTransport::new(i_chan), true);
95-
let o_prot = TBinaryOutputProtocol::new(TFramedWriteTransport::new(o_chan), true);
98+
// Create protocol layers with buffered transport
99+
let i_prot = TBinaryInputProtocol::new(TBufferedReadTransport::new(i_chan), true);
100+
let o_prot_base = TBinaryOutputProtocol::new(TBufferedWriteTransport::new(o_chan), true);
101+
102+
// Wrap output protocol with multiplexed protocol for service routing
103+
let o_prot = TMultiplexedOutputProtocol::new(service_name, o_prot_base);
96104

97105
Ok((i_prot, o_prot))
98106
}
@@ -107,7 +115,7 @@ mod tests {
107115
// This test ensures the types compile correctly
108116
// Actual connection would require a running Thrift server
109117
fn _example() -> Result<(), RBFRTError> {
110-
let (i_prot, o_prot) = connect("localhost:9090")?;
118+
let (i_prot, o_prot) = connect("localhost:9090", "ts")?;
111119
let _client = TsSyncClient::new(i_prot, o_prot);
112120
Ok(())
113121
}

0 commit comments

Comments
 (0)