Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ make lint-install
| Check Namespace Exists | X | X | X | X |
| Drop Namespace | X | X | X | X |
| Update Namespace Properties | X | X | X | X |
| Create View | X | | | X |
| Create View | X | X | | X |
| Load View | | X | | X |
| List View | X | X | | X |
| Drop View | X | X | | X |
Expand Down
117 changes: 117 additions & 0 deletions catalog/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,123 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
return c.LoadTable(ctx, identifier)
}

// CreateView creates a new view in the catalog. It uses the same signature as the REST catalog:
// identifier, version (with SQL representations), schema, and optional CreateViewOpt for location and properties.
// Returns the created *view.View, or an error if the namespace is missing, a view/table already exists, or creation fails.
func (c *Catalog) CreateView(ctx context.Context, identifier table.Identifier, version *view.Version, schema *iceberg.Schema, opts ...catalog.CreateViewOpt) (*view.View, error) {
database, viewName, err := identifierToTableName(identifier)
if err != nil {
return nil, err
}

cfg := catalog.NewCreateViewCfg()
for _, opt := range opts {
opt(&cfg)
}

exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(database))
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database)
}

viewExists, err := c.CheckViewExists(ctx, identifier)
if err != nil {
return nil, err
}
if viewExists {
return nil, fmt.Errorf("%w: %s.%s", catalog.ErrViewAlreadyExists, database, viewName)
}

tableExists, err := c.CheckTableExists(ctx, identifier)
if err != nil {
return nil, err
}
if tableExists {
return nil, fmt.Errorf("%w: %s.%s", catalog.ErrTableAlreadyExists, database, viewName)
}

loc := strings.TrimSuffix(cfg.Location, "/")
if loc == "" {
var err error
loc, err = internal.ResolveTableLocation(ctx, "", database, viewName, c.opts.props, c.LoadNamespaceProperties)
if err != nil {
return nil, err
}
}

freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil)
if err != nil {
return nil, err
}

viewSQL, err := sqlFromVersion(version)
if err != nil {
return nil, err
}

defaultNS := catalog.NamespaceFromIdent(identifier)
catalogName := "hive"

// Merge catalog props with view props so io.LoadFS can resolve and view metadata gets user properties.
props := make(iceberg.Properties)
if c.opts.props != nil {
maps.Copy(props, c.opts.props)
}
if cfg.Properties != nil {
maps.Copy(props, cfg.Properties)
}

createdView, err := view.CreateView(ctx, catalogName, identifier, freshSchema, viewSQL, defaultNS, loc, props)
if err != nil {
return nil, err
}

viewProps := make(map[string]string)
if cfg.Properties != nil {
for k, v := range cfg.Properties {
viewProps[k] = v
}
}

hiveTbl := constructHiveViewTable(database, viewName, loc, createdView.MetadataLocation(), freshSchema, viewSQL, viewProps)
if err := c.client.CreateTable(ctx, hiveTbl); err != nil {
if isAlreadyExistsError(err) {
return nil, fmt.Errorf("%w: %s.%s", catalog.ErrViewAlreadyExists, database, viewName)
}

return nil, fmt.Errorf("failed to create view %s.%s: %w", database, viewName, err)
}

return createdView, nil
}

// sqlFromVersion returns the SQL from the first SQL representation in version, preferring dialect "hive".
func sqlFromVersion(v *view.Version) (string, error) {
if v == nil || len(v.Representations) == 0 {
return "", errors.New("view version has no representations")
}
var fallback string
for _, r := range v.Representations {
if r.Type != "sql" {
continue
}
if strings.EqualFold(r.Dialect, "hive") {
return r.Sql, nil
}
if fallback == "" {
fallback = r.Sql
}
}
if fallback != "" {
return fallback, nil
}

return "", errors.New("view version has no SQL representation")
}

func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error {
database, tableName, err := identifierToTableName(identifier)
if err != nil {
Expand Down
147 changes: 147 additions & 0 deletions catalog/hive/hive_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
"github.com/apache/iceberg-go/view"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -392,3 +394,148 @@ func TestHiveIntegrationDropViewNoSuchView(t *testing.T) {
assert.Error(err)
assert.True(errors.Is(err, catalog.ErrNoSuchView))
}

func TestHiveIntegrationCreateView(t *testing.T) {
assert := require.New(t)

cat := createTestCatalog(t)
defer cat.Close()

dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
viewName := "test_view"

err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
"location": getTestTableLocation() + "/" + dbName,
})
assert.NoError(err)
defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col", Type: iceberg.PrimitiveTypes.Int32, Required: true})
viewSQL := "SELECT 1 AS col"
ver, err := view.NewVersionFromSQL(1, 0, viewSQL, table.Identifier{dbName})
assert.NoError(err)

viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName
v, err := cat.CreateView(context.TODO(), TableIdentifier(dbName, viewName), ver, schema,
catalog.WithViewLocation(viewLocation),
)
assert.NoError(err)
assert.NotNil(v)
defer cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))

exists, err := cat.CheckViewExists(context.TODO(), TableIdentifier(dbName, viewName))
assert.NoError(err)
assert.True(exists)

loaded, err := cat.LoadView(context.TODO(), TableIdentifier(dbName, viewName))
assert.NoError(err)
assert.NotNil(loaded)
assert.True(schema.Equals(loaded.CurrentSchema()))
assert.Len(loaded.CurrentVersion().Representations, 1)
assert.Equal("sql", loaded.CurrentVersion().Representations[0].Type)
assert.Equal(viewSQL, loaded.CurrentVersion().Representations[0].Sql)
}

func TestHiveIntegrationCreateViewThenDrop(t *testing.T) {
assert := require.New(t)

cat := createTestCatalog(t)
defer cat.Close()

dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
viewName := "view_to_drop"

err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
"location": getTestTableLocation() + "/" + dbName,
})
assert.NoError(err)
defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true})
ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1 AS id", table.Identifier{dbName})

viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName
_, err = cat.CreateView(context.TODO(), TableIdentifier(dbName, viewName), ver, schema,
catalog.WithViewLocation(viewLocation),
)
assert.NoError(err)

exists, err := cat.CheckViewExists(context.TODO(), TableIdentifier(dbName, viewName))
assert.NoError(err)
assert.True(exists)

err = cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))
assert.NoError(err)

exists, err = cat.CheckViewExists(context.TODO(), TableIdentifier(dbName, viewName))
assert.NoError(err)
assert.False(exists)

_, err = cat.LoadView(context.TODO(), TableIdentifier(dbName, viewName))
assert.Error(err)
assert.True(errors.Is(err, catalog.ErrNoSuchView))
}

func TestHiveIntegrationCreateView_TableConflict(t *testing.T) {
assert := require.New(t)

cat := createTestCatalog(t)
defer cat.Close()

dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
tableName := "t1"

err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
"location": getTestTableLocation() + "/" + dbName,
})
assert.NoError(err)
defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true})
tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName
_, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, tableName), schema,
catalog.WithLocation(tableLocation),
)
assert.NoError(err)
defer cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName))

ver, _ := view.NewVersionFromSQL(1, 0, "SELECT * FROM t1", table.Identifier{dbName})
viewLocation := getTestTableLocation() + "/" + dbName + "/" + tableName + "_view"
_, err = cat.CreateView(context.TODO(), TableIdentifier(dbName, tableName), ver, schema,
catalog.WithViewLocation(viewLocation),
)
assert.Error(err)
assert.True(errors.Is(err, catalog.ErrTableAlreadyExists))
}

func TestHiveIntegrationCreateView_ViewConflict(t *testing.T) {
assert := require.New(t)

cat := createTestCatalog(t)
defer cat.Close()

dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
viewName := "v1"

err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{
"location": getTestTableLocation() + "/" + dbName,
})
assert.NoError(err)
defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName))

schema := iceberg.NewSchema(1, iceberg.NestedField{ID: 1, Name: "col", Type: iceberg.PrimitiveTypes.Int32, Required: true})
ver, _ := view.NewVersionFromSQL(1, 0, "SELECT 1 AS col", table.Identifier{dbName})
viewLocation := getTestTableLocation() + "/" + dbName + "/" + viewName

_, err = cat.CreateView(context.TODO(), TableIdentifier(dbName, viewName), ver, schema,
catalog.WithViewLocation(viewLocation),
)
assert.NoError(err)
defer cat.DropView(context.TODO(), TableIdentifier(dbName, viewName))

_, err = cat.CreateView(context.TODO(), TableIdentifier(dbName, viewName), ver, schema,
catalog.WithViewLocation(viewLocation+"/second"),
)
assert.Error(err)
assert.True(errors.Is(err, catalog.ErrViewAlreadyExists))
}
Loading
Loading