Skip to content

Commit aeec78a

Browse files
committed
fix base url and integreation test
1 parent 62d4dd4 commit aeec78a

File tree

6 files changed

+37
-20
lines changed

6 files changed

+37
-20
lines changed

src/sinks/doris/client.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub type ThreadSafeDorisSinkClient = Arc<DorisSinkClient>;
3737
#[derive(Clone, Debug)]
3838
pub struct DorisSinkClient {
3939
http_client: HttpClient,
40-
base_url: String,
40+
base_url: Uri,
4141
auth: Option<Auth>,
4242
compression: Compression,
4343
label_prefix: String,
@@ -47,7 +47,7 @@ pub struct DorisSinkClient {
4747
impl DorisSinkClient {
4848
pub async fn new(
4949
http_client: HttpClient,
50-
base_url: String,
50+
base_url: Uri,
5151
auth: Option<Auth>,
5252
compression: Compression,
5353
label_prefix: String,
@@ -108,9 +108,13 @@ impl DorisSinkClient {
108108
StreamLoadError::InvalidRedirectUri { source }
109109
})?
110110
} else {
111-
// Build original URL
112-
let stream_load_url =
113-
format!("{}/api/{}/{}/_stream_load", self.base_url, database, table);
111+
// Build original URL using Uri components to avoid trailing slash issues
112+
let scheme = self.base_url.scheme_str().unwrap_or("http");
113+
let authority = self.base_url.authority().map(|a| a.as_str()).unwrap_or("");
114+
let stream_load_url = format!(
115+
"{}://{}/api/{}/{}/_stream_load",
116+
scheme, authority, database, table
117+
);
114118

115119
stream_load_url.parse::<Uri>().map_err(|source| {
116120
debug!(
@@ -293,11 +297,11 @@ impl DorisSinkClient {
293297
})
294298
}
295299

296-
pub async fn healthcheck_fenode(&self, endpoint: String) -> crate::Result<()> {
300+
pub async fn healthcheck_fenode(&self, endpoint: &Uri) -> crate::Result<()> {
297301
// Use Doris bootstrap API endpoint for health check, GET method
298-
let query_path = "/api/bootstrap";
299-
let endpoint_str = endpoint.trim_end_matches('/');
300-
let uri_str = format!("{}{}", endpoint_str, query_path);
302+
let scheme = endpoint.scheme_str().unwrap_or("http");
303+
let authority = endpoint.authority().map(|a| a.as_str()).unwrap_or("");
304+
let uri_str = format!("{}://{}/api/bootstrap", scheme, authority);
301305

302306
let uri = uri_str.parse::<Uri>().map_err(|source| {
303307
debug!(

src/sinks/doris/common.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use crate::{
1010
},
1111
tls::TlsSettings,
1212
};
13+
use http::Uri;
1314
use vector_lib::codecs::encoding::Framer;
1415

1516
#[derive(Debug, Clone)]
1617
pub struct DorisCommon {
17-
pub base_url: String,
18+
pub base_url: Uri,
1819
pub auth: Option<Auth>,
1920
pub request_builder: DorisRequestBuilder,
2021
pub tls_settings: TlsSettings,
@@ -30,7 +31,7 @@ impl DorisCommon {
3031

3132
// basic auth must be some for now
3233
let auth = config.auth.choose_one(&endpoint.auth)?;
33-
let base_url = endpoint.uri.to_string().trim_end_matches('/').to_owned();
34+
let base_url = endpoint.uri.clone();
3435
let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;
3536

3637
// Build encoder from the encoding configuration
@@ -61,6 +62,6 @@ impl DorisCommon {
6162
}
6263

6364
pub async fn healthcheck(&self, client: ThreadSafeDorisSinkClient) -> crate::Result<()> {
64-
client.healthcheck_fenode(self.base_url.clone()).await
65+
client.healthcheck_fenode(&self.base_url).await
6566
}
6667
}

src/sinks/doris/config.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,21 +161,22 @@ impl SinkConfig for DorisConfig {
161161

162162
let services_futures = commons
163163
.iter()
164-
.cloned()
165164
.map(|common| {
166165
let client_clone = client.clone();
167166
let compression = self.compression;
168167
let label_prefix = self.label_prefix.clone();
169168
let headers = self.headers.clone();
170169
let log_request = self.log_request;
170+
let base_url = common.base_url.clone();
171+
let auth = common.auth.clone();
171172

172173
async move {
173-
let endpoint = common.base_url.clone();
174+
let endpoint = base_url.to_string();
174175

175176
let doris_client = DorisSinkClient::new(
176177
client_clone,
177-
common.base_url.clone(),
178-
common.auth.clone(),
178+
base_url,
179+
auth,
179180
compression,
180181
label_prefix,
181182
headers,

src/sinks/doris/integration_test.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ struct DorisAuth {
8484
fn config_auth() -> DorisAuth {
8585
DorisAuth {
8686
user: "root".to_string(),
87-
password: "123456".to_string(),
87+
password: "".to_string(),
8888
}
8989
}
9090

@@ -202,16 +202,20 @@ impl DorisTestClient {
202202

203203
// Configure MySQL connection parameters - For Doris specifically adjusted
204204
// Disable these options to not send `SET @@sql_mode=CONCAT(@@sql_mode, {})` which is not supported on Doris.
205-
let connect_options = MySqlConnectOptions::new()
205+
let mut connect_options = MySqlConnectOptions::new()
206206
.host(&host)
207207
.port(port)
208208
.username(&auth.user)
209-
.password(&auth.password)
210209
.no_engine_substitution(false) // Keep false to avoid SET statement
211210
.pipes_as_concat(false) // Keep false to avoid SET statement
212211
.ssl_mode(sqlx::mysql::MySqlSslMode::Disabled)
213212
.disable_statement_logging();
214213

214+
// Only set password if it's not empty (Doris root user has no password by default)
215+
if !auth.password.is_empty() {
216+
connect_options = connect_options.password(&auth.password);
217+
}
218+
215219
info!(
216220
message = "DorisTestClient initialized successfully.",
217221
internal_log_rate_limit = true

tests/integration/doris/config/compose.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ services:
33
doris:
44
image: docker.io/yagagagaga/doris-standalone:${CONFIG_VERSION}
55
container_name: doris
6+
hostname: doris
67
restart: unless-stopped
78
privileged: true
89
ports:
@@ -11,6 +12,12 @@ services:
1112
- "8040"
1213
environment:
1314
- SKIP_CHECK_ULIMIT=true
15+
healthcheck:
16+
test: ["CMD", "curl", "-f", "http://localhost:8030/api/bootstrap"]
17+
interval: 10s
18+
timeout: 5s
19+
retries: 30
20+
start_period: 60s
1421

1522
networks:
1623
default:

tests/integration/doris/config/test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ features:
55
test_filter: 'sinks::doris::integration_test::'
66

77
env:
8-
DORIS_ADDRESS: http://doris:8030
8+
DORIS_ADDRESS: http://doris:8040
99

1010
matrix:
1111
version: ['2.1.7']

0 commit comments

Comments
 (0)