Skip to content

Commit bc4526d

Browse files
authored
feat: add demo for registering table and planning files (#205)
Below is the demo's output when reading and planning files for a V1 table:

```
» ./demo_example /Users/gangwu/Projects/iceberg-warehouse table_v1_test /Users/gangwu/Projects/iceberg-warehouse/db/table_v1_test/metadata/v3.metadata.json
Scan tasks:
 - /Users/gangwu/Projects/iceberg-warehouse/db/table_v1_test/data/00000-12-297cb609-fb4b-4e6c-a7ee-0a9bf62d356f-0-00001.parquet
 - /Users/gangwu/Projects/iceberg-warehouse/db/table_v1_test/data/00000-0-9052e5ed-641e-4f02-873f-5a87510b7451-0-00001.parquet
```
1 parent 7404f14 commit bc4526d

File tree

9 files changed

+87
-20
lines changed

9 files changed

+87
-20
lines changed

example/demo_example.cc

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,63 @@
1919

2020
#include <iostream>
2121

22+
#include "iceberg/arrow/arrow_file_io.h"
2223
#include "iceberg/avro/avro_register.h"
23-
#include "iceberg/file_reader.h"
24+
#include "iceberg/catalog/in_memory_catalog.h"
2425
#include "iceberg/parquet/parquet_register.h"
26+
#include "iceberg/table.h"
27+
#include "iceberg/table_scan.h"
28+
29+
int main(int argc, char** argv) {
30+
if (argc != 4) {
31+
std::cerr << "Usage: " << argv[0]
32+
<< " <warehouse_location> <table_name> <table_location>" << std::endl;
33+
return 0;
34+
}
35+
36+
const std::string warehouse_location = argv[1];
37+
const std::string table_name = argv[2];
38+
const std::string table_location = argv[3];
39+
const std::unordered_map<std::string, std::string> properties;
2540

26-
int main() {
2741
iceberg::avro::RegisterAll();
2842
iceberg::parquet::RegisterAll();
29-
auto open_result = iceberg::ReaderFactoryRegistry::Open(
30-
iceberg::FileFormatType::kAvro, {.path = "non-existing-file.avro"});
31-
if (!open_result.has_value()) {
32-
std::cerr << "Failed to open avro file" << std::endl;
43+
44+
auto catalog = iceberg::InMemoryCatalog::Make("test", iceberg::arrow::MakeLocalFileIO(),
45+
warehouse_location, properties);
46+
47+
auto register_result = catalog->RegisterTable({.name = table_name}, table_location);
48+
if (!register_result.has_value()) {
49+
std::cerr << "Failed to register table: " << register_result.error().message
50+
<< std::endl;
3351
return 1;
3452
}
53+
54+
auto load_result = catalog->LoadTable({.name = table_name});
55+
if (!load_result.has_value()) {
56+
std::cerr << "Failed to load table: " << load_result.error().message << std::endl;
57+
return 1;
58+
}
59+
60+
auto table = std::move(load_result.value());
61+
auto scan_result = table->NewScan()->Build();
62+
if (!scan_result.has_value()) {
63+
std::cerr << "Failed to build scan: " << scan_result.error().message << std::endl;
64+
return 1;
65+
}
66+
67+
auto scan = std::move(scan_result.value());
68+
auto plan_result = scan->PlanFiles();
69+
if (!plan_result.has_value()) {
70+
std::cerr << "Failed to plan files: " << plan_result.error().message << std::endl;
71+
return 1;
72+
}
73+
74+
std::cout << "Scan tasks: " << std::endl;
75+
auto scan_tasks = std::move(plan_result.value());
76+
for (const auto& scan_task : scan_tasks) {
77+
std::cout << " - " << scan_task->data_file()->file_path << std::endl;
78+
}
79+
3580
return 0;
3681
}

src/iceberg/avro.h renamed to src/iceberg/arrow/arrow_file_io.h

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@
1919

2020
#pragma once
2121

22-
#include <string>
22+
#include <memory>
2323

24-
#include "iceberg/iceberg_export.h"
24+
#include "iceberg/file_io.h"
25+
#include "iceberg/iceberg_bundle_export.h"
2526

26-
namespace iceberg {
27+
namespace iceberg::arrow {
2728

28-
class ICEBERG_EXPORT Avro {
29-
public:
30-
virtual ~Avro() = default;
31-
virtual std::string print() const = 0;
32-
};
29+
/// \brief Create a FileIO via ArrowFileSystemFileIO::MakeMockFileIO().
///
/// Presumably backed by Arrow's mock filesystem -- confirm against the
/// ArrowFileSystemFileIO implementation.
/// \return An owning pointer to the newly created FileIO.
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeMockFileIO();
3330

34-
} // namespace iceberg
31+
/// \brief Create a FileIO via ArrowFileSystemFileIO::MakeLocalFileIO(), which
/// builds it on top of an ::arrow::fs::LocalFileSystem.
/// \return An owning pointer to the newly created FileIO.
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
32+
33+
} // namespace iceberg::arrow

src/iceberg/arrow/arrow_fs_file_io.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <arrow/filesystem/mockfs.h>
2424

2525
#include "iceberg/arrow/arrow_error_transform_internal.h"
26+
#include "iceberg/arrow/arrow_file_io.h"
2627
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2728

2829
namespace iceberg::arrow {
@@ -80,4 +81,12 @@ std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeLocalFileIO() {
8081
std::make_shared<::arrow::fs::LocalFileSystem>());
8182
}
8283

84+
// Free-function wrapper declared in arrow_file_io.h; simply forwards to the
// static factory ArrowFileSystemFileIO::MakeMockFileIO().
std::unique_ptr<FileIO> MakeMockFileIO() {
85+
return ArrowFileSystemFileIO::MakeMockFileIO();
86+
}
87+
88+
// Free-function wrapper declared in arrow_file_io.h; simply forwards to the
// static factory ArrowFileSystemFileIO::MakeLocalFileIO().
std::unique_ptr<FileIO> MakeLocalFileIO() {
89+
return ArrowFileSystemFileIO::MakeLocalFileIO();
90+
}
91+
8392
} // namespace iceberg::arrow
File renamed without changes.

src/iceberg/avro/avro_schema_util.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
#include <avro/Types.hh>
3131
#include <avro/ValidSchema.hh>
3232

33+
#include "iceberg/avro/avro_constants.h"
3334
#include "iceberg/avro/avro_register.h"
3435
#include "iceberg/avro/avro_schema_util_internal.h"
35-
#include "iceberg/avro/constants.h"
3636
#include "iceberg/metadata_columns.h"
3737
#include "iceberg/name_mapping.h"
3838
#include "iceberg/schema.h"

src/iceberg/catalog/in_memory_catalog.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,13 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
314314
return it->second;
315315
}
316316

317+
// Factory for InMemoryCatalog: constructs the catalog with std::make_shared,
// passing all four arguments unchanged to the InMemoryCatalog constructor, and
// returns the resulting shared pointer.
std::shared_ptr<InMemoryCatalog> InMemoryCatalog::Make(
318+
std::string const& name, std::shared_ptr<FileIO> const& file_io,
319+
std::string const& warehouse_location,
320+
std::unordered_map<std::string, std::string> const& properties) {
321+
return std::make_shared<InMemoryCatalog>(name, file_io, warehouse_location, properties);
322+
}
323+
317324
InMemoryCatalog::InMemoryCatalog(
318325
std::string const& name, std::shared_ptr<FileIO> const& file_io,
319326
std::string const& warehouse_location,

src/iceberg/catalog/in_memory_catalog.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ class ICEBERG_EXPORT InMemoryCatalog
4444
std::unordered_map<std::string, std::string> const& properties);
4545
~InMemoryCatalog() override;
4646

47+
/// \brief Create a shared InMemoryCatalog.
///
/// Takes the same arguments as the constructor and forwards them to it.
/// \param name Catalog name.
/// \param file_io FileIO handed to the catalog.
/// \param warehouse_location Warehouse location handed to the catalog.
/// \param properties Catalog properties handed to the catalog.
/// \return A shared pointer owning the new catalog.
static std::shared_ptr<InMemoryCatalog> Make(
48+
std::string const& name, std::shared_ptr<FileIO> const& file_io,
49+
std::string const& warehouse_location,
50+
std::unordered_map<std::string, std::string> const& properties);
51+
4752
std::string_view name() const override;
4853

4954
Status CreateNamespace(

src/iceberg/json_internal.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -745,9 +745,9 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
745745
std::unordered_map<std::string, std::string> summary;
746746
if (summary_json.has_value()) {
747747
for (const auto& [key, value] : summary_json->items()) {
748-
if (!kValidSnapshotSummaryFields.contains(key)) {
749-
return JsonParseError("Invalid snapshot summary field: {}", key);
750-
}
748+
// if (!kValidSnapshotSummaryFields.contains(key)) {
749+
// return JsonParseError("Invalid snapshot summary field: {}", key);
750+
// }
751751
if (!value.is_string()) {
752752
return JsonParseError("Invalid snapshot summary field value: {}",
753753
SafeDumpJson(value));

test/json_internal_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ TEST(JsonInternalTest, Snapshot) {
225225
TestJsonConversion(snapshot, expected_json);
226226
}
227227

228-
TEST(JsonInternalTest, SnapshotFromJsonWithInvalidSummary) {
228+
// FIXME: disabled for now because the Iceberg Spark plugin generates
229+
// custom snapshot summary keys.
230+
TEST(JsonInternalTest, DISABLED_SnapshotFromJsonWithInvalidSummary) {
229231
nlohmann::json invalid_json =
230232
R"({"snapshot-id":1234567890,
231233
"parent-snapshot-id":9876543210,

0 commit comments

Comments
 (0)