Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/sync/expand/expander.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
return "", fmt.Errorf("runAction: error creating new grant: %w", err)
}
newGrants = append(newGrants, descendantGrant)
newGrants, err = e.putGrantsInChunks(ctx, newGrants, 10000)
newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
Expand Down Expand Up @@ -255,7 +255,7 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
}
newGrants = append(newGrants, grantsToUpdate...)

newGrants, err = e.putGrantsInChunks(ctx, newGrants, 10000)
newGrants, err = PutGrantsInChunks(ctx, e.store, newGrants, 10000)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
Expand All @@ -268,7 +268,7 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
}
}

_, err = e.putGrantsInChunks(ctx, newGrants, 0)
_, err = PutGrantsInChunks(ctx, e.store, newGrants, 0)
if err != nil {
l.Error("runAction: error updating descendant grants", zap.Error(err))
return "", fmt.Errorf("runAction: error updating descendant grants: %w", err)
Expand All @@ -277,16 +277,16 @@ func (e *Expander) runAction(ctx context.Context, action *EntitlementGraphAction
return sourceGrants.GetNextPageToken(), nil
}

// putGrantsInChunks accumulates grants until the buffer exceeds minChunkSize,
// PutGrantsInChunks accumulates grants until the buffer exceeds minChunkSize,
// then writes all grants to the store at once.
func (e *Expander) putGrantsInChunks(ctx context.Context, grants []*v2.Grant, minChunkSize int) ([]*v2.Grant, error) {
func PutGrantsInChunks(ctx context.Context, store ExpanderStore, grants []*v2.Grant, minChunkSize int) ([]*v2.Grant, error) {
if len(grants) < minChunkSize {
return grants, nil
}

err := e.store.PutGrants(ctx, grants...)
err := store.PutGrants(ctx, grants...)
if err != nil {
return nil, fmt.Errorf("putGrantsInChunks: error putting grants: %w", err)
return nil, fmt.Errorf("PutGrantsInChunks: error putting grants: %w", err)
}

return make([]*v2.Grant, 0), nil
Expand Down
29 changes: 29 additions & 0 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,11 +1649,40 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle
}

// Process grants and add edges to the graph
updatedGrants := make([]*v2.Grant, 0)
for _, grant := range resp.GetList() {
err := s.processGrantForGraph(ctx, grant, graph)
if err != nil {
return err
}

// Remove expandable annotation from descendant grant now that we've added it to the graph.
// That way if this sync is part of a compaction, expanding grants at the end of compaction won't redo work.
newAnnos := make(annotations.Annotations, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we ever do implement a two pass system, we could handle this bit then.

updated := false
for _, anno := range grant.GetAnnotations() {
if anno.MessageIs(&v2.GrantExpandable{}) {
updated = true
} else {
newAnnos = append(newAnnos, anno)
}
}
if !updated {
continue
}

grant.SetAnnotations(newAnnos)
l.Debug("removed expandable annotation from grant", zap.String("grant_id", grant.GetId()))
updatedGrants = append(updatedGrants, grant)
updatedGrants, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 10000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should capture that the grant was expandable but we already did the work.

if err != nil {
return err
}
}

_, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 0)
if err != nil {
return err
}

if graph.Loaded {
Expand Down
102 changes: 59 additions & 43 deletions pkg/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ var userResourceType = v2.ResourceType_builder{
}.Build()

func TestExpandGrants(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

// 2500 * 4 = 10K - used to cause an infinite loop on pagition
usersPerLayer := 2500
Expand All @@ -46,48 +45,79 @@ func TestExpandGrants(t *testing.T) {
mc := newMockConnector()

mc.rtDB = append(mc.rtDB, groupResourceType, userResourceType)
type asdf struct {
type grantData struct {
r *v2.Resource
e *v2.Entitlement
}
groups := make([]*asdf, 0)
for i := 0; i < groupCount; i++ {
expectedGrantCount := 0
groups := make([]*grantData, 0)
for i := range groupCount {
groupId := "group_" + strconv.Itoa(i)
group, groupEnt, err := mc.AddGroup(ctx, groupId)
for _, g := range groups {
// Add previous groups to the new group. Make the entitlement expandable so that users in previous groups are added to the new group.
_ = mc.AddGroupMember(ctx, g.r, group, groupEnt)
expectedGrantCount++
}
groups = append(groups, &asdf{
groups = append(groups, &grantData{
r: group,
e: groupEnt,
})
require.NoError(t, err)

for j := 0; j < usersPerLayer; j++ {
for j := range usersPerLayer {
pid := fmt.Sprintf("user_%d_%d_%d", i, usersPerLayer, j)
principal, err := mc.AddUser(ctx, pid)
require.NoError(t, err)

_ = mc.AddGroupMember(ctx, group, principal)
}
expectedGrantCount += usersPerLayer * (i + 1)
}

tempDir, err := os.MkdirTemp("", "baton-benchmark-expand-grants")
require.NoError(t, err)
defer os.RemoveAll(tempDir)
c1zpath := filepath.Join(tempDir, "expand-grants.c1z")
syncer, err := NewSyncer(ctx, mc, WithC1ZPath(c1zpath), WithTmpDir(tempDir))
defer func() { _ = os.Remove(c1zpath) }()
require.NoError(t, err)
err = syncer.Sync(ctx)
require.NoError(t, err)

err = syncer.Close(ctx)
require.NoError(t, err)
_ = os.Remove(c1zpath)

// Validate that grants got expanded
c1zManager, err := manager.New(ctx, c1zpath)
require.NoError(t, err)
store, err := c1zManager.LoadC1Z(ctx)
require.NoError(t, err)

// Yes it's wasteful to load all grants into memory, but this connector doesn't make a ton of grants.
allGrants := make([]*v2.Grant, 0)
pageToken := ""
for {
resp, err := store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build())
require.NoError(t, err)
allGrants = append(allGrants, resp.GetList()...)
pageToken = resp.GetNextPageToken()
if pageToken == "" {
break
}
}
require.Len(t, allGrants, expectedGrantCount, "should have %d grants but got %d", expectedGrantCount, len(allGrants))
for _, grant := range allGrants {
annos := annotations.Annotations(grant.GetAnnotations())
expandable := &v2.GrantExpandable{}
ok, err := annos.Pick(expandable)
require.NoError(t, err)
require.False(t, ok, "grants are expanded, but grant %s has expandable annotation with entitlement ids %v", grant.GetId(), expandable.GetEntitlementIds())
}
}

func TestInvalidResourceTypeFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-invalid-resource-type-filter-sync-test")
require.NoError(t, err)
Expand All @@ -114,8 +144,7 @@ func TestInvalidResourceTypeFilter(t *testing.T) {
}

func TestResourceTypeFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-resource-type-filter-sync-test")
require.NoError(t, err)
Expand All @@ -142,8 +171,7 @@ func TestResourceTypeFilter(t *testing.T) {
}

func TestExpandGrantBadEntitlement(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

ctx, err := logging.Init(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -217,8 +245,7 @@ func TestExpandGrantBadEntitlement(t *testing.T) {
}

func TestExpandGrantImmutable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

mc := newMockConnector()

Expand Down Expand Up @@ -317,8 +344,7 @@ func TestExpandGrantImmutable(t *testing.T) {
}

func TestExpandGrantImmutableCycle(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

mc := newMockConnector()

Expand Down Expand Up @@ -430,8 +456,7 @@ func TestExpandGrantImmutableCycle(t *testing.T) {
_ = os.Remove(c1zpath)
}
func BenchmarkExpandCircle(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := b.Context()

// create a loop of N entitlements
circleSize := 7
Expand All @@ -443,7 +468,7 @@ func BenchmarkExpandCircle(b *testing.B) {

mc.rtDB = append(mc.rtDB, groupResourceType, userResourceType)

for i := 0; i < groupCount; i++ {
for i := range groupCount {
groupId := "group_" + strconv.Itoa(i)
group, _, err := mc.AddGroup(ctx, groupId)
require.NoError(b, err)
Expand All @@ -454,7 +479,7 @@ func BenchmarkExpandCircle(b *testing.B) {

_ = mc.AddGroupMember(ctx, group, childGroup, childEnt)

for j := 0; j < usersPerLayer; j++ {
for j := range usersPerLayer {
pid := "user_" + strconv.Itoa(i*usersPerLayer+j)
principal, err := mc.AddUser(ctx, pid)
require.NoError(b, err)
Expand All @@ -466,7 +491,7 @@ func BenchmarkExpandCircle(b *testing.B) {
}

// create the circle
for i := 0; i < circleSize; i += 1 {
for i := range circleSize {
groupId := "group_" + strconv.Itoa(i)
nextGroupId := "group_" + strconv.Itoa((i+1)%circleSize) // Wrap around to the start for the last element
currentEnt := mc.entDB[groupId][0]
Expand All @@ -479,8 +504,8 @@ func BenchmarkExpandCircle(b *testing.B) {
require.NoError(b, err)
defer os.RemoveAll(tempDir)
c1zpath := filepath.Join(tempDir, "expand-circle.c1z")
b.ResetTimer()
for i := 0; i < b.N; i++ {

for b.Loop() {
syncer, err := NewSyncer(ctx, mc, WithC1ZPath(c1zpath), WithTmpDir(tempDir))
require.NoError(b, err)
err = syncer.Sync(ctx)
Expand All @@ -492,8 +517,7 @@ func BenchmarkExpandCircle(b *testing.B) {
}

func TestExternalResourcePath(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-id-test")
require.NoError(t, err)
Expand Down Expand Up @@ -596,8 +620,7 @@ func TestExternalResourcePath(t *testing.T) {
}

func TestPartialSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-partial-sync-test")
require.NoError(t, err)
Expand Down Expand Up @@ -687,8 +710,7 @@ func TestPartialSync(t *testing.T) {
}

func TestPartialSyncSkipEntitlementsAndGrants(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-partial-sync-test-skip-entitlements-and-grants")
require.NoError(t, err)
Expand Down Expand Up @@ -747,8 +769,7 @@ func TestPartialSyncSkipEntitlementsAndGrants(t *testing.T) {
}

func TestPartialSyncUnimplemented(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-partial-sync-test-unimplemented")
require.NoError(t, err)
Expand Down Expand Up @@ -818,8 +839,7 @@ func TestPartialSyncUnimplemented(t *testing.T) {
}

func TestExternalResourceMatchAll(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-external-match-all-test")
require.NoError(t, err)
Expand Down Expand Up @@ -913,8 +933,7 @@ func TestExternalResourceMatchAll(t *testing.T) {
}

func TestExternalResourceMatchID(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-external-match-id-test")
require.NoError(t, err)
Expand Down Expand Up @@ -976,8 +995,7 @@ func TestExternalResourceMatchID(t *testing.T) {
}

func TestExternalResourceEmailMatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-external-email-match-test")
require.NoError(t, err)
Expand Down Expand Up @@ -1065,8 +1083,7 @@ func TestExternalResourceEmailMatch(t *testing.T) {
}

func TestExternalResourceGroupProfileMatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-external-group-profile-match-test")
require.NoError(t, err)
Expand Down Expand Up @@ -1179,8 +1196,7 @@ func TestExternalResourceGroupProfileMatch(t *testing.T) {
}

func TestExternalResourceWithGrantToEntitlement(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

tempDir, err := os.MkdirTemp("", "baton-external-grant-to-entitlement-test")
require.NoError(t, err)
Expand Down
Loading