Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions influxdb3/src/commands/show.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
use clap::Parser;
use secrecy::{ExposeSecret, Secret};
use std::error::Error;
use std::io;
use std::str::Utf8Error;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use url::Url;

use crate::commands::common::Format;
use system::Error as SystemCommandError;
/// Errors that the `show` command can return.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// An error reported by the `influxdb3_client` crate, displayed as-is.
#[error(transparent)]
Client(#[from] influxdb3_client::Error),

/// The `parquet` output format was requested but no `--output` file path
/// was given.
#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

/// The response bytes could not be decoded as UTF-8 for printing.
#[error("invalid UTF8 received from server: {0}")]
Utf8(#[from] Utf8Error),

/// An I/O failure, e.g. while opening or writing the output file.
#[error("io error: {0}")]
Io(#[from] io::Error),

/// An error from the `show system` sub-command, displayed as-is.
#[error(transparent)]
SystemCommand(#[from] SystemCommandError),
}

mod system;
use system::SystemConfig;
Expand Down Expand Up @@ -45,30 +69,48 @@ pub struct DatabaseConfig {
/// The format in which to output the list of databases
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the list of databases into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
pub(crate) async fn command(config: Config) -> Result<(), Error> {
match config.cmd {
SubCommand::Databases(DatabaseConfig {
host_url,
auth_token,
show_deleted,
output_format,
output_file_path,
}) => {
let mut client = influxdb3_client::Client::new(host_url)?;

if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}

let resp_bytes = client
let mut resp_bytes = client
.api_v3_configure_db_show()
.with_format(output_format.into())
.with_format(output_format.clone().into())
.with_show_deleted(show_deleted)
.send()
.await?;

println!("{}", std::str::from_utf8(&resp_bytes)?);
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut resp_bytes).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
}
SubCommand::System(cfg) => system::command(cfg).await?,
}
Expand Down
118 changes: 101 additions & 17 deletions influxdb3/src/commands/show/system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::super::common::{Format, InfluxDb3Config};
use clap::Parser;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use serde::Deserialize;

use super::super::common::{Format, InfluxDb3Config};
use std::io;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
Expand All @@ -15,6 +17,15 @@ pub(crate) enum Error {

#[error("system table '{0}' not found: {1}")]
SystemTableNotFound(String, SystemTableNotFound),

#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

#[error("io error: {0}")]
Io(#[from] io::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -77,6 +88,10 @@ pub struct TableListConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table lists output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

/// SQL query that reads `information_schema.columns` for the `system` schema
/// and returns one row per table: its `table_name` and an aggregated
/// `column_names` array, ordered by table name.
const SYS_TABLES_QUERY: &str = "WITH cols (table_name, column_name) AS (SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = 'system' ORDER BY (table_name, column_name)) SELECT table_name, array_agg(column_name) AS column_names FROM cols GROUP BY table_name ORDER BY table_name";
Expand All @@ -100,14 +115,32 @@ impl std::fmt::Display for SystemTableNotFound {

impl SystemCommandRunner {
async fn list(&self, config: TableListConfig) -> Result<()> {
let bs = self
let TableListConfig {
output_format,
output_file_path,
} = &config;

let mut bs = self
.client
.api_v3_query_sql(self.db.as_str(), SYS_TABLES_QUERY)
.format(config.output_format.into())
.format(output_format.clone().into())
.send()
.await?;

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}

Ok(())
}
Expand All @@ -124,7 +157,7 @@ pub struct TableConfig {
limit: u16,

/// Order by the specified fields.
#[clap(long = "order-by", short = 'o', num_args = 1, value_delimiter = ',')]
#[clap(long = "order-by", num_args = 1, value_delimiter = ',')]
order_by: Vec<String>,

/// Select specified fields from table.
Expand All @@ -134,6 +167,10 @@ pub struct TableConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
Expand All @@ -157,6 +194,7 @@ impl SystemCommandRunner {
select,
order_by,
output_format,
output_file_path,
} = &config;

let select_expr = if !select.is_empty() {
Expand Down Expand Up @@ -185,7 +223,7 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = match client
let mut bs = match client
.api_v3_query_sql(db, query)
.format(output_format.clone().into())
.send()
Expand All @@ -205,8 +243,20 @@ impl SystemCommandRunner {
}
};

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());

if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}
Ok(())
}
}
Expand All @@ -221,25 +271,44 @@ pub struct SummaryConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the summary into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
async fn summary(&self, config: SummaryConfig) -> Result<()> {
self.summarize_all_tables(config.limit, &config.output_format)
.await?;
self.summarize_all_tables(
config.limit,
&config.output_format,
&config.output_file_path,
)
.await?;
Ok(())
}

async fn summarize_all_tables(&self, limit: u16, format: &Format) -> Result<()> {
async fn summarize_all_tables(
&self,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let system_tables = self.get_system_tables().await?;
for table in system_tables {
self.summarize_table(table.table_name.as_str(), limit, format)
self.summarize_table(table.table_name.as_str(), limit, format, output_file_path)
.await?;
}
Ok(())
}

async fn summarize_table(&self, table_name: &str, limit: u16, format: &Format) -> Result<()> {
async fn summarize_table(
&self,
table_name: &str,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let Self { db, client } = self;
let mut clauses = vec![format!("SELECT * FROM system.{table_name}")];

Expand All @@ -257,14 +326,29 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = client
let mut bs = client
.api_v3_query_sql(db, query)
.format(format.clone().into())
.send()
.await?;

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}

Ok(())
}
}
Expand Down
12 changes: 12 additions & 0 deletions influxdb3/tests/cli/mod.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a success test case that writes the parquet to a temp file, then reads it, and validates its contents.

Some helpful APIs that would enable that:

  • We use the tempfile crate for temporary files in tests
  • There are APIs for reading parquet files into Arrow RecordBatches in the parquet crate
  • There are helpers for visually asserting on the contents of those record batches in DataFusion, e.g., assert_batches_sorted_eq

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @hiltontj,
Apologies for the late reply.
I have added test cases for validating the parquet files using the resources you provided.
Please let me know if you have any comments.

Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,18 @@ async fn test_show_system() {
name: "iox schema table name exists, but should error because we're concerned here with system tables",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "cpu"],
},
FailTestCase {
name: "fail without output-file when format is parquet for table",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "--format", "parquet","distinct_caches"]
},
FailTestCase {
name: "fail without output-file when format is parquet for table-list",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table-list", "--format", "parquet"]
},
FailTestCase {
name: "fail without output-file when format is parquet for summary",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "summary", "--format", "parquet"]
},
];

for case in cases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`