Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 62 additions & 25 deletions pkg/connector/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
ent "github.com/conductorone/baton-sdk/pkg/types/entitlement"
"github.com/conductorone/baton-sdk/pkg/types/grant"
"github.com/conductorone/baton-sdk/pkg/pagination"
rs "github.com/conductorone/baton-sdk/pkg/types/resource"
"github.com/conductorone/baton-snowflake/pkg/snowflake"
)
Expand Down Expand Up @@ -135,45 +136,81 @@ func (o *tableBuilder) List(ctx context.Context, parentResourceID *v2.ResourceId

databaseName := parentResourceID.Resource

parentDB, _, err := o.client.GetDatabase(ctx, databaseName)
if err != nil {
return nil, nil, wrapError(err, "failed to get parent database")
}
isSharedOrSystemDB := parentDB != nil && parentDB.IsSharedOrSystem()
bag := &pagination.Bag{}
if err := bag.Unmarshal(opts.PageToken.Token); err != nil {
return nil, nil, wrapError(err, "failed to parse page token")
}

// On first call, enumerate all schemas and push them onto the bag stack.
// Each schema becomes a PageState:
// ResourceID = schema name
// ResourceTypeID = "shared" if the DB is shared/system, "" otherwise
// Token = table name cursor within the schema
// Encoding isSharedOrSystemDB in ResourceTypeID avoids re-querying the
// database on every subsequent page.
if bag.Current() == nil {
parentDB, statusCode, err := o.client.GetDatabase(ctx, databaseName)
if err != nil && !snowflake.IsUnprocessableEntity(statusCode, err) {
return nil, nil, wrapError(err, "failed to get parent database")
}

// Paginate at List boundary: fetch one page from SHOW TABLES IN ACCOUNT, filter to this database, return page + next token.
bag, cursor, err := parseCursorFromToken(opts.PageToken.Token, &v2.ResourceId{ResourceType: tableResourceType.Id, Resource: parentResourceID.Resource})
if err != nil {
return nil, nil, wrapError(err, "failed to get next page cursor")
schemas, err := o.client.ListSchemasInDatabase(ctx, databaseName)
if err != nil {
return nil, nil, wrapError(err, "failed to list schemas in database")
}

sharedFlag := ""
if snowflake.IsUnprocessableEntity(statusCode, nil) || (parentDB != nil && parentDB.IsSharedOrSystem()) {
sharedFlag = "shared"
}

// Skip INFORMATION_SCHEMA — it contains system views with no manageable grants.
pushed := 0
for _, schema := range schemas {
if strings.EqualFold(schema.Name, "INFORMATION_SCHEMA") {
continue
}
bag.Push(pagination.PageState{
ResourceTypeID: sharedFlag,
ResourceID: schema.Name,
})
pushed++
}
if pushed == 0 {
return nil, &rs.SyncOpResults{}, nil
}
}

const accountPageSize = 200
tables, nextCursor, err := o.client.ListTablesInAccount(ctx, cursor, accountPageSize)
isSharedOrSystemDB := bag.ResourceTypeID() == "shared"
schemaName := bag.ResourceID()
tableCursor := bag.PageToken()

const pageSize = 200
tables, nextTableCursor, err := o.client.ListTablesInSchema(ctx, databaseName, schemaName, tableCursor, pageSize)
if err != nil {
return nil, nil, wrapError(err, "failed to list tables in account")
return nil, nil, wrapError(err, "failed to list tables in schema")
}

var resources []*v2.Resource
for i := range tables {
t := &tables[i]
if strings.EqualFold(t.DatabaseName, databaseName) {
resource, err := tableResource(ctx, t, parentResourceID, isSharedOrSystemDB)
if err != nil {
return nil, nil, wrapError(err, "failed to create table resource")
}
resources = append(resources, resource)
resource, err := tableResource(ctx, t, parentResourceID, isSharedOrSystemDB)
if err != nil {
return nil, nil, wrapError(err, "failed to create table resource")
}
resources = append(resources, resource)
}

if nextCursor != "" {
nextToken, err := bag.NextToken(nextCursor)
if err != nil {
return nil, nil, wrapError(err, "failed to create next page cursor")
}
return resources, &rs.SyncOpResults{NextPageToken: nextToken}, nil
// NextToken("") pops the current schema without re-pushing it, advancing to
// the next schema. NextToken(cursor) updates the current schema's cursor for
// the next page within this schema. When the bag is empty, Marshal returns ""
// and the SDK stops calling List.
nextToken, err := bag.NextToken(nextTableCursor)
if err != nil {
return nil, nil, wrapError(err, "failed to create next page token")
}

return resources, &rs.SyncOpResults{}, nil
return resources, &rs.SyncOpResults{NextPageToken: nextToken}, nil
}

func parseTableResourceID(resource *v2.Resource) (string, string, string, error) {
Expand Down
106 changes: 88 additions & 18 deletions pkg/snowflake/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,81 @@ import (
"github.com/conductorone/baton-sdk/pkg/uhttp"
)

var schemaStructFieldToColumnMap = map[string]string{
"Name": "name",
"DatabaseName": "database_name",
}

type (
Schema struct {
Name string
DatabaseName string
}

ListSchemasRawResponse struct {
StatementsApiResponseBase
}
)

func (s *Schema) GetColumnName(fieldName string) string {
return schemaStructFieldToColumnMap[fieldName]
}

func (r *ListSchemasRawResponse) ListSchemas() ([]Schema, error) {
var schemas []Schema
for _, row := range r.Data {
schema := &Schema{}
if err := r.ResultSetMetadata.ParseRow(schema, row); err != nil {
return nil, err
}
schemas = append(schemas, *schema)
}
return schemas, nil
}

func (c *Client) ListSchemasInDatabase(ctx context.Context, databaseName string) ([]Schema, error) {
l := ctxzap.Extract(ctx)

escapedDB := escapeDoubleQuotedIdentifier(databaseName)
queries := []string{
fmt.Sprintf("SHOW SCHEMAS IN DATABASE \"%s\";", escapedDB),
}

req, err := c.PostStatementRequest(ctx, queries)
if err != nil {
return nil, err
}

var response ListSchemasRawResponse
resp1, err := c.Do(req, uhttp.WithJSONResponse(&response))
defer closeResponseBody(resp1)
if err != nil {
if resp1 != nil && resp1.StatusCode == http.StatusUnprocessableEntity {
l.Debug("Insufficient privileges for SHOW SCHEMAS IN DATABASE", zap.String("database", databaseName))
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW SCHEMAS IN DATABASE %s: %w", databaseName, err)
return nil, status.Error(codes.PermissionDenied, wrappedErr.Error())
}
return nil, err
}

req, err = c.GetStatementResponse(ctx, response.StatementHandle)
if err != nil {
return nil, err
}
resp2, err := c.Do(req, uhttp.WithJSONResponse(&response))
defer closeResponseBody(resp2)
if err != nil {
if resp2 != nil && resp2.StatusCode == http.StatusUnprocessableEntity {
l.Debug("Insufficient privileges for SHOW SCHEMAS IN DATABASE (statement result)", zap.String("database", databaseName))
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW SCHEMAS IN DATABASE %s (statement result): %w", databaseName, err)
return nil, status.Error(codes.PermissionDenied, wrappedErr.Error())
}
return nil, err
}

return response.ListSchemas()
}

var tableStructFieldToColumnMap = map[string]string{
"CreatedOn": "created_on",
"Name": "name",
Expand Down Expand Up @@ -59,23 +134,16 @@ func (r *ListTablesRawResponse) ListTables() ([]Table, error) {
return tables, nil
}

const tableListCursorSep = "\x00"

func (c *Client) ListTablesInAccount(ctx context.Context, cursor string, limit int) ([]Table, string, error) {
func (c *Client) ListTablesInSchema(ctx context.Context, databaseName, schemaName string, cursor string, limit int) ([]Table, string, error) {
l := ctxzap.Extract(ctx)

escapedDB := escapeDoubleQuotedIdentifier(databaseName)
escapedSchema := escapeDoubleQuotedIdentifier(schemaName)
var q string
if cursor != "" {
// FROM expects a name_string; use fully qualified to avoid duplicates across account
parts := strings.SplitN(cursor, tableListCursorSep, 3)
if len(parts) >= 3 {
fromName := escapeSingleQuote(parts[0] + "." + parts[1] + "." + parts[2])
q = fmt.Sprintf("SHOW TABLES IN ACCOUNT LIMIT %d FROM '%s';", limit, fromName)
} else {
q = fmt.Sprintf("SHOW TABLES IN ACCOUNT LIMIT %d;", limit)
}
q = fmt.Sprintf("SHOW TABLES IN SCHEMA \"%s\".\"%s\" LIMIT %d FROM '%s';", escapedDB, escapedSchema, limit, escapeSingleQuote(cursor))
} else {
q = fmt.Sprintf("SHOW TABLES IN ACCOUNT LIMIT %d;", limit)
q = fmt.Sprintf("SHOW TABLES IN SCHEMA \"%s\".\"%s\" LIMIT %d;", escapedDB, escapedSchema, limit)
}
queries := []string{q}

Expand All @@ -89,8 +157,9 @@ func (c *Client) ListTablesInAccount(ctx context.Context, cursor string, limit i
defer closeResponseBody(resp1)
if err != nil {
if resp1 != nil && resp1.StatusCode == http.StatusUnprocessableEntity {
l.Debug("Insufficient privileges for SHOW TABLES IN ACCOUNT")
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW TABLES IN ACCOUNT: %w", err)
l.Debug("Insufficient privileges for SHOW TABLES IN SCHEMA",
zap.String("database", databaseName), zap.String("schema", schemaName))
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW TABLES IN SCHEMA %s.%s: %w", databaseName, schemaName, err)
return nil, "", status.Error(codes.PermissionDenied, wrappedErr.Error())
}
return nil, "", err
Expand All @@ -104,8 +173,9 @@ func (c *Client) ListTablesInAccount(ctx context.Context, cursor string, limit i
defer closeResponseBody(resp2)
if err != nil {
if resp2 != nil && resp2.StatusCode == http.StatusUnprocessableEntity {
l.Debug("Insufficient privileges for SHOW TABLES IN ACCOUNT (statement result)")
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW TABLES IN ACCOUNT (statement result): %w", err)
l.Debug("Insufficient privileges for SHOW TABLES IN SCHEMA (statement result)",
zap.String("database", databaseName), zap.String("schema", schemaName))
wrappedErr := fmt.Errorf("baton-snowflake: insufficient privileges for SHOW TABLES IN SCHEMA %s.%s (statement result): %w", databaseName, schemaName, err)
return nil, "", status.Error(codes.PermissionDenied, wrappedErr.Error())
}
return nil, "", err
Expand All @@ -117,9 +187,9 @@ func (c *Client) ListTablesInAccount(ctx context.Context, cursor string, limit i
}

var nextCursor string
if len(tables) >= limit {
if limit > 0 && len(tables) >= limit {
last := tables[len(tables)-1]
nextCursor = last.DatabaseName + tableListCursorSep + last.SchemaName + tableListCursorSep + last.Name
nextCursor = last.Name
}
return tables, nextCursor, nil
}
Expand Down
Loading