Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions cmd/iceberg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"log"
"os"
"strconv"
"strings"

"github.com/apache/iceberg-go"
Expand All @@ -47,6 +48,7 @@ Usage:
iceberg create [options] (namespace | table) IDENTIFIER
iceberg drop [options] (namespace | table) IDENTIFIER
iceberg files [options] TABLE_ID [--history]
iceberg add-files [options] TABLE_ID FILES... [--branch TEXT] [--ignore-duplicates]
iceberg rename [options] <from> <to>
iceberg properties [options] get (namespace | table) IDENTIFIER [PROPNAME]
iceberg properties [options] set (namespace | table) IDENTIFIER PROPNAME VALUE
Expand All @@ -63,13 +65,15 @@ Commands:
location Return the location of the table.
drop Operations to drop a namespace or table.
files List all the files of the table.
add-files Add existing data files to a table.
rename Rename a table.
properties Properties on tables/namespaces.

Arguments:
PARENT Catalog parent namespace
IDENTIFIER fully qualified namespace or table
TABLE_ID full path to a table
FILES one or more data file paths to add (for add-files)
PROPNAME name of a property
VALUE value to set

Expand All @@ -92,7 +96,9 @@ Options:
--partition-spec TEXT specify partition spec as comma-separated field names(for create table use only)
Ex:"field1,field2"
--sort-order TEXT specify sort order as field:direction[:null-order] format(for create table use only)
Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last"`
Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last"
--branch TEXT target branch for add-files [default: main]
--ignore-duplicates allow adding files already referenced by the table`

type Config struct {
List bool `docopt:"list"`
Expand All @@ -105,6 +111,7 @@ type Config struct {
Create bool `docopt:"create"`
Drop bool `docopt:"drop"`
Files bool `docopt:"files"`
AddFiles bool `docopt:"add-files"`
Rename bool `docopt:"rename"`

Get bool `docopt:"get"`
Expand All @@ -117,27 +124,30 @@ type Config struct {
RenameFrom string `docopt:"<from>"`
RenameTo string `docopt:"<to>"`

Parent string `docopt:"PARENT"`
Ident string `docopt:"IDENTIFIER"`
TableID string `docopt:"TABLE_ID"`
PropName string `docopt:"PROPNAME"`
Value string `docopt:"VALUE"`

Catalog string `docopt:"--catalog"`
URI string `docopt:"--uri"`
Output string `docopt:"--output"`
History bool `docopt:"--history"`
Cred string `docopt:"--credential"`
Token string `docopt:"--token"`
Warehouse string `docopt:"--warehouse"`
Config string `docopt:"--config"`
Scope string `docopt:"--scope"`
Description string `docopt:"--description"`
LocationURI string `docopt:"--location-uri"`
SchemaStr string `docopt:"--schema"`
TableProps string `docopt:"--properties"`
PartitionSpec string `docopt:"--partition-spec"`
SortOrder string `docopt:"--sort-order"`
Parent string `docopt:"PARENT"`
Ident string `docopt:"IDENTIFIER"`
TableID string `docopt:"TABLE_ID"`
FilesToAdd []string `docopt:"FILES"`
PropName string `docopt:"PROPNAME"`
Value string `docopt:"VALUE"`

Catalog string `docopt:"--catalog"`
URI string `docopt:"--uri"`
Output string `docopt:"--output"`
History bool `docopt:"--history"`
Cred string `docopt:"--credential"`
Token string `docopt:"--token"`
Warehouse string `docopt:"--warehouse"`
Config string `docopt:"--config"`
Scope string `docopt:"--scope"`
Description string `docopt:"--description"`
LocationURI string `docopt:"--location-uri"`
SchemaStr string `docopt:"--schema"`
TableProps string `docopt:"--properties"`
PartitionSpec string `docopt:"--partition-spec"`
SortOrder string `docopt:"--sort-order"`
Branch string `docopt:"--branch"`
IgnoreDuplicates bool `docopt:"--ignore-duplicates"`
}

func main() {
Expand Down Expand Up @@ -343,6 +353,24 @@ func main() {
case cfg.Files:
tbl := loadTable(ctx, output, cat, cfg.TableID)
output.Files(tbl, cfg.History)
case cfg.AddFiles:
tbl := loadTable(ctx, output, cat, cfg.TableID)
txn := tbl.NewTransaction()
var opts []table.WriteOpt
if cfg.Branch != "" {
opts = append(opts, table.WithBranch(cfg.Branch))
}
err := txn.AddFiles(ctx, cfg.FilesToAdd, nil, cfg.IgnoreDuplicates, opts...)
if err != nil {
output.Error(err)
os.Exit(1)
}
_, err = txn.Commit(ctx)
if err != nil {
output.Error(err)
os.Exit(1)
}
output.Text("Added " + strconv.Itoa(len(cfg.FilesToAdd)) + " file(s) to " + cfg.TableID)
}
}

Expand Down
46 changes: 40 additions & 6 deletions cmd/iceberg/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"log"
"os"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -106,6 +107,32 @@ func (t textOutput) DescribeTable(tbl *table.Table) {
WithData(pterm.TableData{
{"Current Snapshot", snap},
}).Render()

refsData := pterm.TableData{{"Name", "Type", "Snapshot ID"}}
type refRow struct {
name string
ref table.SnapshotRef
}
var refRows []refRow
for name, ref := range tbl.Metadata().Refs() {
refRows = append(refRows, refRow{name, ref})
}
sort.Slice(refRows, func(i, j int) bool { return refRows[i].name < refRows[j].name })
for _, r := range refRows {
refsData = append(refsData, []string{
r.name,
string(r.ref.SnapshotRefType),
strconv.FormatInt(r.ref.SnapshotID, 10),
})
}
if len(refsData) > 1 {
pterm.Println("Refs")
pterm.DefaultTable.
WithHasHeader(true).
WithHeaderRowSeparator("-").
WithData(refsData).Render()
}

pterm.DefaultTree.WithRoot(snapshotTreeNode).Render()
pterm.Println("Properties")
propTable.Render()
Expand Down Expand Up @@ -241,12 +268,18 @@ func (j jsonOutput) Identifiers(idList []table.Identifier) {

func (j jsonOutput) DescribeTable(tbl *table.Table) {
type dataType struct {
Metadata table.Metadata `json:"metadata,omitempty"`
MetadataLocation string `json:"metadata-location,omitempty"`
SortOrder table.SortOrder `json:"sort-order,omitempty"`
CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"`
Spec iceberg.PartitionSpec `json:"spec,omitempty"`
Schema *iceberg.Schema `json:"schema,omitempty"`
Metadata table.Metadata `json:"metadata,omitempty"`
MetadataLocation string `json:"metadata-location,omitempty"`
SortOrder table.SortOrder `json:"sort-order,omitempty"`
CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"`
Spec iceberg.PartitionSpec `json:"spec,omitempty"`
Schema *iceberg.Schema `json:"schema,omitempty"`
Refs map[string]table.SnapshotRef `json:"refs,omitempty"`
}

refs := make(map[string]table.SnapshotRef)
for name, ref := range tbl.Metadata().Refs() {
refs[name] = ref
}

data := dataType{
Expand All @@ -256,6 +289,7 @@ func (j jsonOutput) DescribeTable(tbl *table.Table) {
CurrentSnapshot: tbl.CurrentSnapshot(),
Spec: tbl.Spec(),
Schema: tbl.Schema(),
Refs: refs,
}
if err := json.NewEncoder(os.Stdout).Encode(data); err != nil {
j.Error(err)
Expand Down
10 changes: 8 additions & 2 deletions cmd/iceberg/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ Current Schema, id=1

Current Snapshot | append, {}: id=3055729675574597004, parent_id=3051729675574597004, schema_id=1, sequence_number=1, timestamp_ms=1555100955770, manifest_list=s3://a/b/2.avro

Refs
Name | Type | Snapshot ID
-----------------------------------
main | branch | 3055729675574597004
test | tag | 3051729675574597004

Snapshots
├──Snapshot 3051729675574597004, schema 1: s3://a/b/1.avro
└──Snapshot 3055729675574597004, schema 1: s3://a/b/2.avro
Expand Down Expand Up @@ -341,7 +347,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) {
}
}`,
},
expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}}`,
expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]},"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}}`,
},
{
name: "Describe a table with empty objects",
Expand Down Expand Up @@ -381,7 +387,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) {
"refs": { }
}`,
},
expected: `{"metadata":{"last-sequence-number":0,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}}`,
expected: `{"metadata":{"current-schema-id":0,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":3,"last-partition-id":1000,"last-sequence-number":0,"last-updated-ms":1602638573590,"location":"s3://bucket/test/location","partition-specs":[{"spec-id":0,"fields":[]}],"properties":{"read.split.target.size":"134217728"},"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"sort-orders":[{"order-id":0,"fields":[]}],"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1"},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]}}`,
},
}
for _, tt := range tests {
Expand Down
16 changes: 16 additions & 0 deletions table/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,22 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot {
return s
}

// SnapshotIDForRef returns the snapshot ID for the given ref (branch or tag name).
// For MainBranch it returns currentSnapshotID; for other refs it looks up b.refs.
// Returns nil if the ref does not exist (e.g. a new branch not yet created).
func (b *MetadataBuilder) SnapshotIDForRef(refName string) *int64 {
if refName == MainBranch {
return b.currentSnapshotID
}
if ref, ok := b.refs[refName]; ok {
id := ref.SnapshotID

return &id
}

return nil
}

func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) error {
if err := checkSchemaCompatibility(schema, b.formatVersion); err != nil {
return err
Expand Down
30 changes: 30 additions & 0 deletions table/metadata_builder_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,36 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) {
require.Equal(t, int64(2), builder.updates[1].(*setSnapshotRefUpdate).SnapshotID)
}

func TestSnapshotIDForRef(t *testing.T) {
builder := builderWithoutChanges(2)
schemaID := 0
snapshot := Snapshot{
SnapshotID: 2,
ParentSnapshotID: nil,
SequenceNumber: 0,
TimestampMs: builder.base.LastUpdatedMillis(),
ManifestList: "/snap-1.avro",
Summary: &Summary{Operation: OpAppend},
SchemaID: &schemaID,
}
require.NoError(t, builder.AddSnapshot(&snapshot))
require.NoError(t, builder.SetSnapshotRef(MainBranch, 2, BranchRef))
require.NoError(t, builder.SetSnapshotRef("feature", 2, BranchRef))

// MainBranch returns currentSnapshotID
mainID := builder.SnapshotIDForRef(MainBranch)
require.NotNil(t, mainID)
require.Equal(t, int64(2), *mainID)

// Other ref returns ref's snapshot ID
featureID := builder.SnapshotIDForRef("feature")
require.NotNil(t, featureID)
require.Equal(t, int64(2), *featureID)

// Unknown ref returns nil
require.Nil(t, builder.SnapshotIDForRef("nonexistent"))
}

func TestRemoveSnapshotRemovesBranch(t *testing.T) {
builder := builderWithoutChanges(2)
schemaID := 0
Expand Down
Loading
Loading