57 changes: 51 additions & 6 deletions example/demo_example.cc
@@ -19,18 +19,63 @@

#include <iostream>

#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/file_reader.h"
#include "iceberg/catalog/in_memory_catalog.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/table.h"
#include "iceberg/table_scan.h"

int main(int argc, char** argv) {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <warehouse_location> <table_name> <table_location>" << std::endl;
return 1;
}

const std::string warehouse_location = argv[1];
const std::string table_name = argv[2];
const std::string table_location = argv[3];
const std::unordered_map<std::string, std::string> properties;

int main() {
iceberg::avro::RegisterAll();
iceberg::parquet::RegisterAll();
auto open_result = iceberg::ReaderFactoryRegistry::Open(
iceberg::FileFormatType::kAvro, {.path = "non-existing-file.avro"});
if (!open_result.has_value()) {
std::cerr << "Failed to open avro file" << std::endl;

auto catalog = iceberg::InMemoryCatalog::Make("test", iceberg::arrow::MakeLocalFileIO(),
warehouse_location, properties);

auto register_result = catalog->RegisterTable({.name = table_name}, table_location);
if (!register_result.has_value()) {
std::cerr << "Failed to register table: " << register_result.error().message
<< std::endl;
return 1;
}

auto load_result = catalog->LoadTable({.name = table_name});
if (!load_result.has_value()) {
std::cerr << "Failed to load table: " << load_result.error().message << std::endl;
return 1;
}

auto table = std::move(load_result.value());
auto scan_result = table->NewScan()->Build();
if (!scan_result.has_value()) {
std::cerr << "Failed to build scan: " << scan_result.error().message << std::endl;
return 1;
}

auto scan = std::move(scan_result.value());
auto plan_result = scan->PlanFiles();
if (!plan_result.has_value()) {
std::cerr << "Failed to plan files: " << plan_result.error().message << std::endl;
return 1;
}

std::cout << "Scan tasks: " << std::endl;
auto scan_tasks = std::move(plan_result.value());
for (const auto& scan_task : scan_tasks) {
std::cout << " - " << scan_task->data_file()->file_path << std::endl;
}

return 0;
}
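
Note: the updated demo stops after printing the planned data files. A natural follow-up, not part of this change, would be to hand each planned file back to the reader registry that the old demo exercised. The sketch below is illustrative only: it assumes ReaderFactoryRegistry::Open keeps the {.path = ...} signature seen in the removed lines, that FileFormatType::kParquet is a valid format value, and that the removed #include "iceberg/file_reader.h" is restored.

// Illustrative follow-up, not part of this PR: open each planned data file
// with the reader registry used by the old demo. Assumes Open() keeps the
// {.path = ...} signature from the removed code and that
// iceberg::FileFormatType::kParquet exists; would also need the removed
// #include "iceberg/file_reader.h" back.
template <typename ScanTasks>
void OpenPlannedFiles(const ScanTasks& scan_tasks) {
  for (const auto& scan_task : scan_tasks) {
    const auto& path = scan_task->data_file()->file_path;
    auto reader_result = iceberg::ReaderFactoryRegistry::Open(
        iceberg::FileFormatType::kParquet, {.path = path});
    if (!reader_result.has_value()) {
      std::cerr << "Failed to open " << path << std::endl;
      continue;
    }
    // ... read record batches from *reader_result ...
  }
}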
17 changes: 8 additions & 9 deletions src/iceberg/avro.h → src/iceberg/arrow/arrow_file_io.h
@@ -19,16 +19,15 @@

#pragma once

#include <string>
#include <memory>

#include "iceberg/iceberg_export.h"
#include "iceberg/file_io.h"
#include "iceberg/iceberg_bundle_export.h"

namespace iceberg {
namespace iceberg::arrow {

class ICEBERG_EXPORT Avro {
public:
virtual ~Avro() = default;
virtual std::string print() const = 0;
};
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeMockFileIO();

} // namespace iceberg
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();

} // namespace iceberg::arrow
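
Note: the renamed header drops the placeholder Avro class and instead exposes two free factory functions for FileIO. A minimal sketch of the intended usage, mirroring the call in example/demo_example.cc; the catalog name and warehouse path below are illustrative placeholders, not values from this PR.

#include <memory>
#include <string>
#include <unordered_map>

#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/catalog/in_memory_catalog.h"

// Sketch: build a FileIO backed by Arrow's local filesystem and hand it to an
// in-memory catalog, as the demo does. "demo" and "/tmp/warehouse" are
// illustrative values only.
std::shared_ptr<iceberg::InMemoryCatalog> MakeDemoCatalog() {
  std::shared_ptr<iceberg::FileIO> io = iceberg::arrow::MakeLocalFileIO();
  std::unordered_map<std::string, std::string> properties;
  return iceberg::InMemoryCatalog::Make("demo", io, "/tmp/warehouse", properties);
}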
9 changes: 9 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io.cc
@@ -23,6 +23,7 @@
#include <arrow/filesystem/mockfs.h>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"

namespace iceberg::arrow {
@@ -80,4 +81,12 @@ std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {
std::make_shared<::arrow::fs::LocalFileSystem>());
}

std::unique_ptr<FileIO> MakeMockFileIO() {
return ArrowFileSystemFileIO::MakeMockFileIO();
}

std::unique_ptr<FileIO> MakeLocalFileIO() {
return ArrowFileSystemFileIO::MakeLocalFileIO();
}

} // namespace iceberg::arrow
File renamed without changes.
2 changes: 1 addition & 1 deletion src/iceberg/avro/avro_schema_util.cc
@@ -30,9 +30,9 @@
#include <avro/Types.hh>
#include <avro/ValidSchema.hh>

#include "iceberg/avro/avro_constants.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/constants.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema.h"
7 changes: 7 additions & 0 deletions src/iceberg/catalog/in_memory_catalog.cc
@@ -314,6 +314,13 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
return it->second;
}

std::shared_ptr<InMemoryCatalog> InMemoryCatalog::Make(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties) {
return std::make_shared<InMemoryCatalog>(name, file_io, warehouse_location, properties);
}

InMemoryCatalog::InMemoryCatalog(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
5 changes: 5 additions & 0 deletions src/iceberg/catalog/in_memory_catalog.h
@@ -44,6 +44,11 @@ class ICEBERG_EXPORT InMemoryCatalog
std::unordered_map<std::string, std::string> const& properties);
~InMemoryCatalog() override;

static std::shared_ptr<InMemoryCatalog> Make(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties);

std::string_view name() const override;

Status CreateNamespace(
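
Note: the new static Make is a thin convenience factory; as the in_memory_catalog.cc hunk above shows, it only wraps the existing public constructor in std::make_shared. A sketch of the two equivalent call sites, assuming the constructor stays public as the header suggests; the arguments are illustrative placeholders.

#include <memory>
#include <string>
#include <unordered_map>

#include "iceberg/catalog/in_memory_catalog.h"

// Sketch: both calls yield an equivalent catalog; Make() just hides the
// make_shared boilerplate at call sites such as the demo. Arguments here are
// placeholders, not values from this PR.
void MakeCatalogBothWays(std::shared_ptr<iceberg::FileIO> io) {
  std::unordered_map<std::string, std::string> properties;
  auto via_factory =
      iceberg::InMemoryCatalog::Make("test", io, "/tmp/warehouse", properties);
  auto via_ctor = std::make_shared<iceberg::InMemoryCatalog>(
      "test", io, "/tmp/warehouse", properties);
}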
6 changes: 3 additions & 3 deletions src/iceberg/json_internal.cc
@@ -745,9 +745,9 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
std::unordered_map<std::string, std::string> summary;
if (summary_json.has_value()) {
for (const auto& [key, value] : summary_json->items()) {
if (!kValidSnapshotSummaryFields.contains(key)) {
return JsonParseError("Invalid snapshot summary field: {}", key);
}
// if (!kValidSnapshotSummaryFields.contains(key)) {
// return JsonParseError("Invalid snapshot summary field: {}", key);
// }
if (!value.is_string()) {
return JsonParseError("Invalid snapshot summary field value: {}",
SafeDumpJson(value));
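
Note: commenting out the key check relaxes SnapshotFromJson so that summary entries with keys outside kValidSnapshotSummaryFields no longer fail parsing, while non-string values are still rejected. A sketch of the kind of summary this is meant to tolerate; "spark.app.id" is an illustrative writer-specific key, not something taken from this PR.

#include <nlohmann/json.hpp>

// Sketch: a snapshot summary carrying a writer-specific key. Before this
// change the unknown key triggered JsonParseError("Invalid snapshot summary
// field: ..."); after it, only non-string values are rejected.
// "spark.app.id" is illustrative, not taken from this PR.
nlohmann::json relaxed_summary = nlohmann::json::parse(R"({
  "operation": "append",
  "spark.app.id": "application_1700000000000_0001"
})");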
4 changes: 3 additions & 1 deletion test/json_internal_test.cc
@@ -225,7 +225,9 @@ TEST(JsonInternalTest, Snapshot) {
TestJsonConversion(snapshot, expected_json);
}

TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) {
// FIXME: disable it for now since Iceberg Spark plugin generates
// custom summary keys.
TEST(JsonInternalTest, DISABLED_SnapshotFromJsonWithInvalidSummary) {
nlohmann::json invalid_json =
R"({"snapshot-id":1234567890,
"parent-snapshot-id":9876543210,