Commit f0b0e29

Merge branch 'main' into desc
2 parents: f79a04c + d28aa6a

22 files changed: +273 additions, -277 deletions


docs/doc/13-monitor/jaeger.md

Lines changed: 8 additions & 6 deletions
@@ -1,5 +1,5 @@
 ---
-title: Monitoring with Jaeger
+title: Jaeger
 ---

 [Jaeger](https://github.com/jaegertracing/jaeger) is an open-source, end-to-end distributed tracing tool that originated from [Uber](https://www.uber.com/). It helps monitor and troubleshoot microservices-based applications.
@@ -16,9 +16,9 @@ This tutorial uses the All In One image to deploy Jaeger in Docker. If you alrea
 docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest
 ```

-### Step 2. Deploy Databend
+### Step 2. Set Environment Variables

-1. Set the following environment variables according to your actual tracing level requirements and Jaeger endpoint.
+Set the following environment variables according to your actual tracing level requirements and Jaeger endpoint.
 - `RUST_LOG`: Sets the log level.
 - `DATABEND_JAEGER_AGENT_ENDPOINT`: Sets the endpoint the Jaeger agent is listening on.

@@ -27,17 +27,19 @@ export RUST_LOG=DEBUG
 export DATABEND_JAEGER_AGENT_ENDPOINT=localhost:6831
 ```

-2. Follow the [Deployment Guide](https://databend.rs/doc/deploy) to deploy Databend.
+### Step 3. Deploy Databend

-3. Run the following SQL statements:
+1. Follow the [Deployment Guide](https://databend.rs/doc/deploy) to deploy Databend.
+
+2. Run the following SQL statements:

 ```sql
 CREATE TABLE t1(a INT);
 INSERT INTO t1 VALUES(1);
 INSERT INTO t1 SELECT * FROM t1;
 ```

-### Step 3. Check Tracing Information on Jaegar
+### Step 4. Check Tracing Information on Jaeger

 1. Go to <http://127.0.0.1:16686/> and select the **Search** tab.
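Taken together, the revised Jaeger tutorial boils down to the short sequence below. This is a sketch assembled from the commands shown in the diff above; the log level and agent endpoint are the tutorial's example values, not requirements.

```bash
# Step 1. Start Jaeger (all-in-one image)
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest

# Step 2. Point Databend's tracing at the Jaeger agent before starting it
export RUST_LOG=DEBUG
export DATABEND_JAEGER_AGENT_ENDPOINT=localhost:6831

# Step 3. Deploy Databend (https://databend.rs/doc/deploy) and run some SQL
# (e.g. the CREATE TABLE / INSERT statements above) to generate traces.

# Step 4. Browse the traces at http://127.0.0.1:16686/
```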

docs/doc/13-monitor/sentry.md

Lines changed: 29 additions & 26 deletions
@@ -1,48 +1,51 @@
 ---
-title: Monitoring with Sentry
+title: Sentry
 ---

-With automated error reporting, you can not only get alerted when something breaks, but also access the details you need to reproduce the issue.
+[Sentry](https://sentry.io/welcome/) is a developer-first error tracking and performance monitoring platform that helps developers see what actually matters, solve quicker, and learn continuously about their applications.

-## Sentry
+Databend provides integration with both Cloud and self-hosted Sentry solutions. The following tutorial walks you through the integration process.

-Databend provides integration with [Sentry](https://github.com/getsentry/sentry), a developer-first error tracking and performance monitoring platform.
+## Tutorial: Monitor Databend with Sentry

-### Deploy Sentry
+### Step 1. Deploy Sentry

-You can use Sentry as a cloud service by signing up at [sentry.io](https://sentry.io), or you can host it yourself by following the instructions at [Self-Hosted Sentry](https://develop.sentry.dev/self-hosted/).
+To deploy an on-premises Sentry, follow the instructions: https://develop.sentry.dev/self-hosted/

-<img src="/img/tracing/sentry-hosted.png"/>
+This tutorial uses the Sentry service on the cloud. To sign up an account for Cloud Sentry, go to https://sentry.io

-### Create a Project
+### Step 2. Create a Sentry Project

-To use Sentry with Databend, you need to create a project in Sentry and get its DSN (Data Source Name). The DSN is a unique identifier for your project that tells Sentry where to send your data. In this example, the DSN is `http://[email protected]:9000/5`
+Once you're logged into Sentry, create a Sentry project for the `Rust` platform to start. For how to create a project on Sentry, see https://docs.sentry.io/product/sentry-basics/integrate-frontend/create-new-project/

-<img src="/img/tracing/sentry-get-dsn.png"/>
+![Alt text](../../public/img/tracing/sentry-rust.png)

-### Start Databend
+### Step 3. Set Environment Variables

-You can start Databend with Sentry in two ways:
+1. Get the DSN (Data Source Name) of your project. For what DSN is and where to find it, see https://docs.sentry.io/product/sentry-basics/dsn-explainer/

-- **Enable Error Tracking Only**
+2. Set environment variables.

-This option will only use the `sentry-log` feature, which will send error logs to Sentry.
+- To enable the error-tracking feature, run the following commands:

-```bash
-export DATABEND_SENTRY_DSN="<your-sentry-dsn>"
-```
+```bash
+export DATABEND_SENTRY_DSN="<your-DSN>"
+```

-<img src="/img/tracing/sentry-error.png"/>
+- To enable the performance monitoring feature, run the following commands:

-- **Also Enable Performance Monitoring**
+```bash
+export DATABEND_SENTRY_DSN="<your-DSN>"
+export SENTRY_TRACES_SAMPLE_RATE=1.0 LOG_LEVEL=DEBUG
+```
+:::tip
+Set `SENTRY_TRACES_SAMPLE_RATE` to a small value in production.
+:::

-Setting `SENTRY_TRACES_SAMPLE_RATE` greater than `0.0` will allow sentry to perform trace sampling, which will help set up performance monitoring.
+### Step 4. Deploy Databend

-```bash
-export DATABEND_SENTRY_DSN="<your-sentry-dsn>"
-export SENTRY_TRACES_SAMPLE_RATE=1.0 LOG_LEVEL=DEBUG
-```
+Follow the [Deployment Guide](https://databend.rs/doc/deploy) to deploy Databend.

-**Note:** Set `SENTRY_TRACES_SAMPLE_RATE` a to lower value in production.
+You're all set now. Check the pages on Sentry for alerts and performance information.

-<img src="/img/tracing/sentry-performance.png"/>
+![Alt text](../../public/img/tracing/sentry-done.png)
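Condensed, the revised Sentry tutorial amounts to the sketch below, assembled from the commands in the diff above. The DSN placeholder is whatever your own Sentry project reports; nothing here is specific to a particular deployment.

```bash
# Steps 1-2: deploy or sign up for Sentry, create a Rust project, and copy its DSN.

# Step 3, error tracking only:
export DATABEND_SENTRY_DSN="<your-DSN>"

# Step 3, error tracking plus performance monitoring:
export DATABEND_SENTRY_DSN="<your-DSN>"
export SENTRY_TRACES_SAMPLE_RATE=1.0 LOG_LEVEL=DEBUG   # use a small sample rate in production

# Step 4: deploy Databend (https://databend.rs/doc/deploy); alerts and performance
# data then appear on the Sentry project pages.
```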
Two image files changed (857 KB and 132 KB); previews not shown.

src/meta/api/src/schema_api_impl.rs

Lines changed: 35 additions & 19 deletions
@@ -1869,6 +1869,15 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
         let mut retry = 0;
         let table_id = req.table_id;

+        let mut keys = Vec::with_capacity(req.file_info.len());
+        for file in req.file_info.iter() {
+            let key = TableCopiedFileNameIdent {
+                table_id,
+                file: file.0.clone(),
+            };
+            keys.push(key.to_string_key());
+        }
+
         while retry < TXN_MAX_RETRY_TIMES {
             retry += 1;

@@ -1900,25 +1909,32 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
             // So now, in case that `TableCopiedFileInfo` has expire time, remove `TableCopiedFileLockKey`
             // in each function. In this case there is chance that some `TableCopiedFileInfo` may not be
             // removed in `remove_table_copied_files`, but these data can be purged in case of expire time.
-            for (file, file_info) in req.file_info.iter() {
-                let key = TableCopiedFileNameIdent {
-                    table_id,
-                    file: file.to_owned(),
-                };
-                let (file_seq, _): (_, Option<TableCopiedFileInfo>) =
-                    get_pb_value(self, &key).await?;
-
-                condition.push(txn_cond_seq(&key, Eq, file_seq));
-                match &req.expire_at {
-                    Some(expire_at) => {
-                        if_then.push(txn_op_put_with_expire(
-                            &key,
-                            serialize_struct(file_info)?,
-                            *expire_at,
-                        ));
-                    }
-                    None => {
-                        if_then.push(txn_op_put(&key, serialize_struct(file_info)?));
+
+            let mut file_name_infos = req.file_info.clone().into_iter();
+
+            for c in keys.chunks(DEFAULT_MGET_SIZE) {
+                let seq_infos: Vec<(u64, Option<TableCopiedFileInfo>)> =
+                    mget_pb_values(self, c).await?;
+
+                for (file_seq, _file_info_opt) in seq_infos {
+                    let (f_name, file_info) = file_name_infos.next().unwrap();
+
+                    let key = TableCopiedFileNameIdent {
+                        table_id,
+                        file: f_name.to_owned(),
+                    };
+                    condition.push(txn_cond_seq(&key, Eq, file_seq));
+                    match &req.expire_at {
+                        Some(expire_at) => {
+                            if_then.push(txn_op_put_with_expire(
+                                &key,
+                                serialize_struct(&file_info)?,
+                                *expire_at,
+                            ));
+                        }
+                        None => {
+                            if_then.push(txn_op_put(&key, serialize_struct(&file_info)?));
+                        }
                     }
                 }
             }
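The change above swaps one `get_pb_value` round trip per copied file for batched `mget_pb_values` lookups over fixed-size chunks of keys built before the retry loop. A minimal, self-contained sketch of that batching pattern follows; the `KvStore`, `mget`, and `CHUNK_SIZE` names are hypothetical stand-ins, not Databend's meta-service API.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the meta-service client; Databend's real API differs.
const CHUNK_SIZE: usize = 256;

struct KvStore {
    data: HashMap<String, (u64, String)>, // key -> (seq, value)
}

impl KvStore {
    /// Batched lookup: one call returns (seq, value) pairs for many keys at once,
    /// in the same order as the input keys. In this mock, missing keys get seq 0.
    fn mget(&self, keys: &[String]) -> Vec<(u64, Option<String>)> {
        keys.iter()
            .map(|k| match self.data.get(k) {
                Some((seq, v)) => (*seq, Some(v.clone())),
                None => (0, None),
            })
            .collect()
    }
}

fn main() {
    let store = KvStore {
        data: HashMap::from([
            ("file/a.parquet".to_string(), (7, "info-a".to_string())),
            ("file/c.parquet".to_string(), (9, "info-c".to_string())),
        ]),
    };

    // Build the full key list up front (as the patch does before the retry loop) ...
    let keys: Vec<String> = ["a.parquet", "b.parquet", "c.parquet"]
        .iter()
        .map(|f| format!("file/{f}"))
        .collect();

    // ... then fetch sequence numbers in chunks instead of one request per key.
    let mut conditions = Vec::new();
    for chunk in keys.chunks(CHUNK_SIZE) {
        for (key, (seq, _value)) in chunk.iter().zip(store.mget(chunk)) {
            // In the real code this seq feeds a txn_cond_seq() compare-and-set condition.
            conditions.push((key.clone(), seq));
        }
    }

    assert_eq!(conditions.len(), keys.len());
    println!("{conditions:?}");
}
```

The property the sketch leans on is ordering: results come back aligned with the input keys, so each sequence number can be paired with its file, which the patch does by stepping `file_name_infos` in lockstep with the mget results.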

src/query/pipeline/sources/src/input_formats/impls/input_format_parquet.rs

Lines changed: 13 additions & 7 deletions
@@ -34,6 +34,7 @@ use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
 use common_arrow::parquet::read::read_metadata;
 use common_arrow::read_columns_async;
+use common_catalog::plan::StageFileInfo;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
@@ -70,23 +71,28 @@ fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
 impl InputFormat for InputFormatParquet {
     async fn get_splits(
         &self,
-        files: &[String],
+        file_infos: Vec<StageFileInfo>,
         _stage_info: &StageInfo,
         op: &Operator,
         _settings: &Arc<Settings>,
     ) -> Result<Vec<Arc<SplitInfo>>> {
         let mut infos = vec![];
-        for path in files {
-            let size = op.stat(path).await?.content_length() as usize;
-            let mut reader = op.reader(path).await?;
+        let mut schema = None;
+        for info in file_infos {
+            let size = info.size as usize;
+            let path = info.path.clone();
+
+            let mut reader = op.reader(&path).await?;
             let mut file_meta = read_metadata_async(&mut reader).await?;
             let row_groups = mem::take(&mut file_meta.row_groups);
-            let infer_schema = infer_schema(&file_meta)?;
-            let fields = Arc::new(infer_schema.fields);
+            if schema.is_none() {
+                schema = Some(infer_schema(&file_meta)?);
+            }
+            let fields = Arc::new(schema.clone().unwrap().fields);
             let read_file_meta = Arc::new(FileMeta { fields });

             let file_info = Arc::new(FileInfo {
-                path: path.clone(),
+                path,
                 size,
                 num_splits: row_groups.len(),
                 compress_alg: None,
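Besides taking the file size from `StageFileInfo` instead of calling `op.stat` for every path, the parquet change stops re-inferring the schema for every file: it is inferred once from the first file's metadata and reused for the rest. A minimal sketch of that cache-in-an-`Option` pattern, with simplified stand-in types rather than the real `common_arrow` ones:

```rust
// Simplified stand-ins; the real code works on parquet file metadata and Arrow schemas.
#[derive(Clone, Debug)]
struct Schema {
    fields: Vec<String>,
}

struct FileMeta {
    path: String,
}

// Pretend this is expensive (in the real code it walks the parquet footer).
fn infer_schema(_meta: &FileMeta) -> Schema {
    Schema {
        fields: vec!["a".to_string(), "b".to_string()],
    }
}

fn main() {
    let files = vec![
        FileMeta { path: "f1.parquet".into() },
        FileMeta { path: "f2.parquet".into() },
        FileMeta { path: "f3.parquet".into() },
    ];

    // Before: infer_schema() ran once per file.
    // After: infer once on the first file, then reuse the cached result.
    let mut schema: Option<Schema> = None;
    for meta in &files {
        if schema.is_none() {
            schema = Some(infer_schema(meta));
        }
        let fields = schema.clone().unwrap().fields;
        println!("{}: {} columns", meta.path, fields.len());
    }
}
```

A `get_or_insert_with` call would be a slightly more idiomatic way to express the same cache, though it is awkward here because the real `infer_schema` is fallible (`?`), which is presumably why the diff keeps the explicit `is_none()` check.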

src/query/pipeline/sources/src/input_formats/input_format.rs

Lines changed: 2 additions & 1 deletion
@@ -14,6 +14,7 @@

 use std::sync::Arc;

+use common_catalog::plan::StageFileInfo;
 use common_exception::Result;
 use common_expression::TableSchemaRef;
 use common_meta_app::principal::StageInfo;
@@ -28,7 +29,7 @@ use crate::input_formats::SplitInfo;
 pub trait InputFormat: Send + Sync {
     async fn get_splits(
         &self,
-        files: &[String],
+        file_infos: Vec<StageFileInfo>,
         stage_info: &StageInfo,
         op: &Operator,
         settings: &Arc<Settings>,
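With this trait change, every format implementation receives the stage listing entries (path plus size) rather than bare paths, so the per-file `op.stat` calls seen in the old implementations can be dropped. Below is a dependency-free, synchronous sketch of the shape of that change; the real trait is async and also takes `StageInfo`, `Operator`, and `Settings` parameters, which are omitted here, and the types are simplified stand-ins.

```rust
// Simplified stand-ins for common_catalog::plan::StageFileInfo and SplitInfo.
#[derive(Clone, Debug)]
struct StageFileInfo {
    path: String,
    size: u64,
}

#[derive(Debug)]
struct SplitInfo {
    path: String,
    size: usize,
}

trait InputFormat {
    // Old shape for comparison: fn get_splits(&self, files: &[String]) -> Vec<SplitInfo>;
    // each implementation then had to stat every path to learn its size.
    fn get_splits(&self, file_infos: Vec<StageFileInfo>) -> Vec<SplitInfo>;
}

struct TextFormat;

impl InputFormat for TextFormat {
    fn get_splits(&self, file_infos: Vec<StageFileInfo>) -> Vec<SplitInfo> {
        file_infos
            .into_iter()
            .map(|info| SplitInfo {
                // Size comes straight from the stage listing; no extra stat call.
                size: info.size as usize,
                path: info.path,
            })
            .collect()
    }
}

fn main() {
    let listing = vec![
        StageFileInfo { path: "data/1.csv".into(), size: 1024 },
        StageFileInfo { path: "data/2.csv".into(), size: 2048 },
    ];
    let splits = TextFormat.get_splits(listing);
    println!("{splits:?}");
}
```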

src/query/pipeline/sources/src/input_formats/input_format_text.rs

Lines changed: 10 additions & 7 deletions
@@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;

+use common_catalog::plan::StageFileInfo;
 use common_compress::DecompressDecoder;
 use common_compress::DecompressState;
 use common_exception::ErrorCode;
@@ -286,17 +287,19 @@ impl<T: InputFormatTextBase> InputFormatPipe for InputFormatTextPipe<T> {
 impl<T: InputFormatTextBase> InputFormat for T {
     async fn get_splits(
         &self,
-        files: &[String],
+        file_infos: Vec<StageFileInfo>,
         stage_info: &StageInfo,
-        op: &Operator,
+        _op: &Operator,
         _settings: &Arc<Settings>,
     ) -> Result<Vec<Arc<SplitInfo>>> {
         let mut infos = vec![];
-        for path in files {
-            let size = op.stat(path).await?.content_length() as usize;
+        for info in file_infos {
+            let size = info.size as usize;
+            let path = info.path.clone();
+
             let compress_alg = InputContext::get_compression_alg_copy(
                 stage_info.file_format_options.compression,
-                path,
+                &path,
             )?;
             let split_size = stage_info.copy_options.split_size;
             if compress_alg.is_none() && T::is_splittable() && split_size > 0 {
@@ -310,7 +313,7 @@ impl<T: InputFormatTextBase> InputFormat for T {
                     split_size
                 );
                 let file = Arc::new(FileInfo {
-                    path: path.clone(),
+                    path,
                     size,
                     num_splits: split_offsets.len(),
                     compress_alg,
@@ -327,7 +330,7 @@ impl<T: InputFormatTextBase> InputFormat for T {
                 }
             } else {
                 let file = Arc::new(FileInfo {
-                    path: path.clone(),
+                    path,
                     size, // dummy
                     num_splits: 1,
                     compress_alg,
