Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ Status convert_to_arrow_type(const vectorized::DataTypePtr& origin_type,
case TYPE_DATEV2:
*result = std::make_shared<arrow::Date32Type>();
break;
// TODO: maybe need to distinguish TYPE_DATETIME and TYPE_TIMESTAMPTZ
case TYPE_TIMESTAMPTZ:
case TYPE_DATETIMEV2:
if (type->get_scale() > 3) {
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
Expand Down
65 changes: 65 additions & 0 deletions be/src/vec/data_types/serde/data_type_timestamptz_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "data_type_timestamptz_serde.h"

#include <arrow/builder.h>

#include "runtime/primitive_type.h"
#include "vec/functions/cast/cast_parameters.h"
#include "vec/functions/cast/cast_to_timestamptz.h"
Expand Down Expand Up @@ -174,4 +176,67 @@ Status DataTypeTimeStampTzSerDe::write_column_to_mysql_binary(const IColumn& col
return Status::OK();
}

Status DataTypeTimeStampTzSerDe::write_column_to_arrow(const IColumn& column,
                                                       const NullMap* null_map,
                                                       arrow::ArrayBuilder* array_builder,
                                                       int64_t start, int64_t end,
                                                       const cctz::time_zone& ctz) const {
    // Serialize rows [start, end) of a TIMESTAMPTZ column into an arrow::TimestampBuilder.
    // Values are stored as DateTimeV2-packed UInt64; each is converted to a unix timestamp
    // relative to UTC and scaled to the unit implied by _scale:
    //   _scale > 3      -> microseconds
    //   0 < _scale <= 3 -> milliseconds
    //   _scale == 0     -> seconds
    // NOTE(review): the arrow builder's TimeUnit must agree with this scaling — confirm
    // against convert_to_arrow_type, which only distinguishes MICRO vs MILLI.
    const auto& col_data = assert_cast<const ColumnTimeStampTz&>(column).get_data();
    auto& timestamp_builder = assert_cast<arrow::TimestampBuilder&>(*array_builder);
    static const auto& utc = cctz::utc_time_zone();
    for (int64_t i = start; i < end; ++i) {
        if (null_map && (*null_map)[i]) {
            RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.AppendNull(), column.get_name(),
                                             array_builder->type()->name()));
        } else {
            int64_t timestamp = 0;
            DateV2Value<DateTimeV2ValueType> datetime_val =
                    binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(col_data[i]);
            // Check the conversion result, consistent with write_column_to_orc below.
            if (!datetime_val.unix_timestamp(&timestamp, utc)) {
                return Status::InternalError("get unix timestamp error.");
            }

            if (_scale > 3) {
                uint32_t microsecond = datetime_val.microsecond();
                timestamp = (timestamp * 1000000) + microsecond;
            } else if (_scale > 0) {
                uint32_t millisecond = datetime_val.microsecond() / 1000;
                timestamp = (timestamp * 1000) + millisecond;
            }
            RETURN_IF_ERROR(checkArrowStatus(timestamp_builder.Append(timestamp), column.get_name(),
                                             array_builder->type()->name()));
        }
    }
    return Status::OK();
}

Status DataTypeTimeStampTzSerDe::write_column_to_orc(const std::string& timezone,
                                                     const IColumn& column, const NullMap* null_map,
                                                     orc::ColumnVectorBatch* orc_col_batch,
                                                     int64_t start, int64_t end,
                                                     vectorized::Arena& arena,
                                                     const FormatOptions& options) const {
    // Serialize rows [start, end) of a TIMESTAMPTZ column into an orc::TimestampVectorBatch.
    // Whole seconds (relative to UTC) go into `data`; the sub-second part goes into
    // `nanoseconds`. Only microsecond precision is stored, so the nanosecond value is
    // always a multiple of 1000.
    const auto& col_data = assert_cast<const ColumnTimeStampTz&>(column).get_data();
    auto* cur_batch = dynamic_cast<orc::TimestampVectorBatch*>(orc_col_batch);
    // dynamic_cast returns nullptr on a mismatched batch type; fail explicitly instead
    // of dereferencing a null pointer below.
    if (cur_batch == nullptr) {
        return Status::InternalError("wrong vector batch type for timestamptz column {}",
                                     column.get_name());
    }
    static const int64_t micro_to_nano_second = 1000;
    static const auto& utc = cctz::utc_time_zone();
    for (int64_t row_id = start; row_id < end; row_id++) {
        // Rows already marked null by the caller are skipped.
        if (cur_batch->notNull[row_id] == 0) {
            continue;
        }

        int64_t timestamp = 0;
        DateV2Value<DateTimeV2ValueType> datetime_val =
                binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(col_data[row_id]);
        if (!datetime_val.unix_timestamp(&timestamp, utc)) {
            return Status::InternalError("get unix timestamp error.");
        }

        cur_batch->data[row_id] = timestamp;
        cur_batch->nanoseconds[row_id] = datetime_val.microsecond() * micro_to_nano_second;
    }
    cur_batch->numElements = end - start;
    return Status::OK();
}

} // namespace doris::vectorized
9 changes: 9 additions & 0 deletions be/src/vec/data_types/serde/data_type_timestamptz_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ class DataTypeTimeStampTzSerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_
const FormatOptions& options) const override;
int get_scale() const override { return _scale; }

Status write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int64_t start, int64_t end,
const cctz::time_zone& ctz) const override;

Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end, vectorized::Arena& arena,
const FormatOptions& options) const override;

private:
const UInt32 _scale = 6;
};
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,11 @@ DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) {
return DataTypeFactory::instance().create_data_type(
PrimitiveType::TYPE_CHAR, true, 0, 0, cast_set<int>(orc_type->getMaximumLength()));
case orc::TypeKind::TIMESTAMP_INSTANT:
if (_scan_params.__isset.enable_mapping_timestamp_tz &&
_scan_params.enable_mapping_timestamp_tz) {
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_TIMESTAMPTZ,
true, 0, 6);
}
return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATETIMEV2, true, 0,
6);
case orc::TypeKind::LIST: {
Expand Down Expand Up @@ -1853,6 +1858,8 @@ Status OrcReader::_fill_doris_data_column(const std::string& col_name,
// : std::make_unique<StringVectorBatch>(capacity, memoryPool);
return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(),
cvb, num_values);
case PrimitiveType::TYPE_TIMESTAMPTZ:
return _decode_timestamp_tz_column<is_filter>(col_name, data_column, cvb, num_values);
case PrimitiveType::TYPE_ARRAY: {
if (orc_column_type->getKind() != orc::TypeKind::LIST) {
return Status::InternalError(
Expand Down Expand Up @@ -2054,7 +2061,8 @@ Status OrcReader::_fill_doris_data_column(const std::string& col_name,
default:
break;
}
return Status::InternalError("Unsupported type for column '{}'", col_name);
return Status::InternalError("Unsupported type {} for column '{}'", data_type->get_name(),
col_name);
}

template <bool is_filter>
Expand Down
35 changes: 35 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "orc/Type.hh"
#include "orc/Vector.hh"
#include "orc/sargs/Literal.hh"
#include "runtime/primitive_type.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column_array.h"
Expand Down Expand Up @@ -552,6 +553,40 @@ class OrcReader : public GenericReader {
return Status::OK();
}

template <bool is_filter>
Status _decode_timestamp_tz_column(const std::string& col_name,
                                   const MutableColumnPtr& data_column,
                                   const orc::ColumnVectorBatch* cvb, size_t num_values) {
    // Decode an ORC TimestampVectorBatch into a ColumnTimeStampTz. Seconds come from
    // `data`, the sub-second part from `nanoseconds`; nanosecond precision beyond
    // microseconds is deliberately dropped.
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    const auto* data = dynamic_cast<const orc::TimestampVectorBatch*>(cvb);
    if (data == nullptr) {
        return Status::InternalError(
                "Wrong data type for timestamp_tz column '{}', expected {}", col_name,
                cvb->toString());
    }
    auto& column_data = assert_cast<ColumnTimeStampTz&>(*data_column).get_data();
    auto origin_size = column_data.size();
    column_data.resize(origin_size + num_values);
    // Initialize to nullptr so the pointer is never read indeterminate; it is only
    // assigned and dereferenced when is_filter is true.
    UInt8* __restrict filter_data = nullptr;
    if constexpr (is_filter) {
        filter_data = _filter->data();
    }
    static const cctz::time_zone utc_time_zone = cctz::utc_time_zone();
    for (size_t i = 0; i < num_values; ++i) {
        auto& v = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(
                column_data[origin_size + i]);
        if constexpr (is_filter) {
            // Filtered-out rows are left unwritten; they are discarded downstream.
            if (!filter_data[i]) {
                continue;
            }
        }
        v.from_unixtime(data->data[i], utc_time_zone);
        // nanoseconds will lose precision. only keep microseconds.
        v.set_microsecond(data->nanoseconds[i] / 1000);
    }
    return Status::OK();
}

template <bool is_filter>
Status _decode_string_column(const std::string& col_name, const MutableColumnPtr& data_column,
const orc::TypeKind& type_kind, const orc::ColumnVectorBatch* cvb,
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_conv
DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type;
physical_converter = std::make_unique<ConsistentPhysicalConverter>();
}
} else if (src_logical_primitive == TYPE_TIMESTAMPTZ) {
DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type;
DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name;
physical_converter = std::make_unique<Int64ToTimestampTz>();
} else {
physical_converter =
std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
Expand Down
25 changes: 25 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,31 @@ struct Int64ToTimestamp : public PhysicalToLogicalConverter {
}
};

// Converts parquet INT64 timestamp values (logical type TIMESTAMP, adjusted to UTC)
// into TIMESTAMPTZ column values. The physical int64 is split into whole seconds and
// a sub-second remainder using the converter parameters' second_mask / scale factor.
struct Int64ToTimestampTz : public PhysicalToLogicalConverter {
    Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
        ColumnPtr src_col = remove_nullable(src_physical_col);
        MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();

        size_t rows = src_col->size();
        size_t start_idx = dst_col->size();
        dst_col->resize(start_idx + rows);

        const auto& src_data = assert_cast<const ColumnInt64*>(src_col.get())->get_data();
        auto& dest_data = assert_cast<ColumnTimeStampTz*>(dst_col.get())->get_data();
        static const cctz::time_zone utc = cctz::utc_time_zone();

        for (size_t i = 0; i < rows; i++) {
            int64_t x = src_data[i];
            auto& num = dest_data[start_idx + i];
            auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num);
            // NOTE(review): C++ `/` and `%` truncate toward zero, so a pre-epoch
            // (negative) x yields a negative sub-second remainder — confirm negative
            // timestamps are handled (or impossible) here, as in Int64ToTimestamp.
            value.from_unixtime(x / _convert_params->second_mask, utc);
            value.set_microsecond((x % _convert_params->second_mask) *
                                  (_convert_params->scale_to_nano_factor / 1000));
        }
        return Status::OK();
    }
};

struct Int96toTimestamp : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr src_col = remove_nullable(src_physical_col);
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exec/format/parquet/parquet_thrift_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k

static Status parse_thrift_footer(io::FileReaderSPtr file,
std::unique_ptr<FileMetaData>* file_metadata, size_t* meta_size,
io::IOContext* io_ctx, const bool enable_mapping_varbinary) {
io::IOContext* io_ctx, const bool enable_mapping_varbinary,
const bool enable_mapping_timestamp_tz) {
size_t file_size = file->size();
size_t bytes_read = std::min(file_size, INIT_META_SIZE);
std::vector<uint8_t> footer(bytes_read);
Expand Down Expand Up @@ -81,7 +82,8 @@ static Status parse_thrift_footer(io::FileReaderSPtr file,
// deserialize footer
RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
*file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
RETURN_IF_ERROR((*file_metadata)->init_schema(enable_mapping_varbinary));
RETURN_IF_ERROR(
(*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz));
*meta_size = PARQUET_FOOTER_SIZE + metadata_size;
return Status::OK();
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,15 @@ std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
} else if (logicalType.__isset.TIME) {
ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable);
} else if (logicalType.__isset.TIMESTAMP) {
if (_enable_mapping_timestamp_tz) {
if (logicalType.TIMESTAMP.isAdjustedToUTC) {
// treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
ans.first = DataTypeFactory::instance().create_data_type(
TYPE_TIMESTAMPTZ, nullable, 0,
logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
return ans;
}
}
ans.first = DataTypeFactory::instance().create_data_type(
TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
} else if (logicalType.__isset.JSON) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class FieldDescriptor {
size_t _next_schema_pos;
// useful for parse_node_field to decide whether to convert byte_array to VARBINARY type
bool _enable_mapping_varbinary = false;
bool _enable_mapping_timestamp_tz = false;

private:
void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
Expand Down Expand Up @@ -164,6 +165,7 @@ class FieldDescriptor {

const FieldSchema* find_column_by_id(uint64_t column_id) const;
void set_enable_mapping_varbinary(bool enable) { _enable_mapping_varbinary = enable; }
void set_enable_mapping_timestamp_tz(bool enable) { _enable_mapping_timestamp_tz = enable; }
};
#include "common/compile_check_end.h"

Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ FileMetaData::~FileMetaData() {
ExecEnv::GetInstance()->parquet_meta_tracker()->release(_mem_size);
}

Status FileMetaData::init_schema(const bool enable_mapping_varbinary) {
// Validates the parquet root schema and parses it into _schema. The two flags control
// optional type mappings: BYTE_ARRAY -> VARBINARY, and TIMESTAMP(isAdjustedToUTC) ->
// TIMESTAMPTZ. Returns Status::Corruption when the root schema has no children.
Status FileMetaData::init_schema(const bool enable_mapping_varbinary,
                                 const bool enable_mapping_timestamp_tz) {
    if (_metadata.schema[0].num_children <= 0) {
        return Status::Corruption("Invalid parquet schema");
    }
    _schema.set_enable_mapping_varbinary(enable_mapping_varbinary);
    _schema.set_enable_mapping_timestamp_tz(enable_mapping_timestamp_tz);
    return _schema.parse_from_thrift(_metadata.schema);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_file_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FileMetaData {
public:
FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size);
~FileMetaData();
Status init_schema(const bool enable_mapping_varbinary);
Status init_schema(const bool enable_mapping_varbinary, const bool enable_mapping_timestamp_tz);
const FieldDescriptor& schema() const { return _schema; }
FieldDescriptor& schema() { return _schema; }
const tparquet::FileMetaData& to_thrift() const;
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,14 @@ Status ParquetReader::_open_file() {
bool enable_mapping_varbinary = _scan_params.__isset.enable_mapping_varbinary
? _scan_params.enable_mapping_varbinary
: false;
bool enable_mapping_timestamp_tz = _scan_params.__isset.enable_mapping_timestamp_tz
? _scan_params.enable_mapping_timestamp_tz
: false;
if (_meta_cache == nullptr) {
// wrap _file_metadata with unique ptr, so that it can be released finally.
RETURN_IF_ERROR(parse_thrift_footer(_tracing_file_reader, &_file_metadata_ptr,
&meta_size, _io_ctx, enable_mapping_varbinary));
&meta_size, _io_ctx, enable_mapping_varbinary,
enable_mapping_timestamp_tz));
_file_metadata = _file_metadata_ptr.get();
// parse magic number & parse meta data
_reader_statistics.file_footer_read_calls += 1;
Expand All @@ -273,7 +277,8 @@ Status ParquetReader::_open_file() {
FileMetaCache::get_key(_tracing_file_reader, _file_description);
if (!_meta_cache->lookup(file_meta_cache_key, &_meta_cache_handle)) {
RETURN_IF_ERROR(parse_thrift_footer(_tracing_file_reader, &_file_metadata_ptr,
&meta_size, _io_ctx, enable_mapping_varbinary));
&meta_size, _io_ctx, enable_mapping_varbinary,
enable_mapping_timestamp_tz));
// _file_metadata_ptr.release() : move control of _file_metadata to _meta_cache_handle
_meta_cache->insert(file_meta_cache_key, _file_metadata_ptr.release(),
&_meta_cache_handle);
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/table/parquet_metadata_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,8 @@ Status ParquetMetadataReader::_append_file_rows(const std::string& path,
std::unique_ptr<FileMetaData> file_metadata;
size_t meta_size = 0;
io::IOContext io_ctx;
RETURN_IF_ERROR(parse_thrift_footer(file_reader, &file_metadata, &meta_size, &io_ctx, false));
RETURN_IF_ERROR(
parse_thrift_footer(file_reader, &file_metadata, &meta_size, &io_ctx, true, true));

if (_mode_handler == nullptr) {
return Status::InternalError(
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
buffer << "datetimev2(" << type->get_scale() << ")";
return buffer.str();
}
case TYPE_TIMESTAMPTZ: {
buffer << "timestamptz(" << type->get_scale() << ")";
return buffer.str();
}
case TYPE_BINARY:
return "binary";
case TYPE_DECIMALV2: {
Expand Down Expand Up @@ -588,6 +592,10 @@ std::string JniConnector::get_jni_type_with_different_string(const DataTypePtr&
buffer << "datetimev2(" << data_type->get_scale() << ")";
return buffer.str();
}
case TYPE_TIMESTAMPTZ: {
buffer << "timestamptz(" << data_type->get_scale() << ")";
return buffer.str();
}
case TYPE_BINARY:
return "binary";
case TYPE_CHAR: {
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/runtime/vorc_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(
type = orc::createPrimitiveType(orc::BINARY);
break;
}
case TYPE_TIMESTAMPTZ: {
type = orc::createPrimitiveType(orc::TIMESTAMP_INSTANT);
break;
}
case TYPE_STRUCT: {
type = orc::createStructType();
const auto* type_struct =
Expand Down
2 changes: 1 addition & 1 deletion be/test/vec/exec/format/parquet/parquet_expr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class ParquetExprTest : public testing::Test {

size_t meta_size;
static_cast<void>(parse_thrift_footer(p_reader->_file_reader, &doris_file_metadata,
&meta_size, nullptr, false));
&meta_size, nullptr, false, false));
doris_metadata = doris_file_metadata->to_thrift();

p_reader->_ctz = &ctz;
Expand Down
Loading
Loading