Skip to content

Commit e362f62

Browse files
authored
cleanup graph creation code, no change in behavior (#584)
1 parent e3b310c commit e362f62

File tree

1 file changed

+145
-110
lines changed

1 file changed

+145
-110
lines changed

pkg/sync/syncer.go

Lines changed: 145 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,141 +1590,177 @@ func (s *syncer) SyncAssets(ctx context.Context) error {
15901590
return nil
15911591
}
15921592

1593-
// SyncGrantExpansion documentation pending.
1593+
// SyncGrantExpansion handles the grant expansion phase of sync.
1594+
// It first loads the entitlement graph from grants, fixes any cycles, then runs expansion.
15941595
func (s *syncer) SyncGrantExpansion(ctx context.Context) error {
15951596
ctx, span := tracer.Start(ctx, "syncer.SyncGrantExpansion")
15961597
defer span.End()
15971598

1598-
l := ctxzap.Extract(ctx)
15991599
entitlementGraph := s.state.EntitlementGraph(ctx)
1600-
if !entitlementGraph.Loaded {
1601-
pageToken := s.state.PageToken(ctx)
16021600

1603-
if pageToken == "" {
1604-
l.Info("Expanding grants...")
1605-
s.handleInitialActionForStep(ctx, *s.state.Current())
1606-
}
1607-
resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build())
1601+
// Phase 1: Load the entitlement graph from grants (paginated)
1602+
if !entitlementGraph.Loaded {
1603+
err := s.loadEntitlementGraph(ctx, entitlementGraph)
16081604
if err != nil {
16091605
return err
16101606
}
1607+
return nil
1608+
}
16111609

1612-
// We want to take action on the next page before we push any new actions
1613-
if resp.GetNextPageToken() != "" {
1614-
err = s.state.NextPage(ctx, resp.GetNextPageToken())
1615-
if err != nil {
1616-
return err
1617-
}
1618-
} else {
1619-
l.Debug("Finished loading grants to expand")
1620-
entitlementGraph.Loaded = true
1610+
// Phase 2: Fix cycles in the graph (only runs once after loading completes)
1611+
if !entitlementGraph.HasNoCycles {
1612+
err := s.fixEntitlementGraphCycles(ctx, entitlementGraph)
1613+
if err != nil {
1614+
return err
16211615
}
1616+
}
16221617

1623-
for _, grant := range resp.GetList() {
1624-
annos := annotations.Annotations(grant.GetAnnotations())
1625-
expandable := &v2.GrantExpandable{}
1626-
_, err := annos.Pick(expandable)
1627-
if err != nil {
1628-
return err
1629-
}
1630-
if len(expandable.GetEntitlementIds()) == 0 {
1631-
continue
1632-
}
1618+
// Phase 3: Run the expansion algorithm
1619+
err := s.expandGrantsForEntitlements(ctx)
1620+
if err != nil {
1621+
return err
1622+
}
16331623

1634-
principalID := grant.GetPrincipal().GetId()
1635-
if principalID == nil {
1636-
return fmt.Errorf("principal id was nil")
1637-
}
1624+
return nil
1625+
}
16381626

1639-
// FIXME(morgabra) Log and skip some of the error paths here?
1640-
for _, srcEntitlementID := range expandable.GetEntitlementIds() {
1641-
l.Debug(
1642-
"Expandable entitlement found",
1643-
zap.String("src_entitlement_id", srcEntitlementID),
1644-
zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()),
1645-
)
1627+
// loadEntitlementGraph loads one page of grants and adds expandable relationships to the graph.
1628+
// This method handles pagination via the syncer's state machine.
1629+
func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.EntitlementGraph) error {
1630+
l := ctxzap.Extract(ctx)
1631+
pageToken := s.state.PageToken(ctx)
16461632

1647-
srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{
1648-
EntitlementId: srcEntitlementID,
1649-
}.Build())
1650-
if err != nil {
1651-
l.Error("error fetching source entitlement",
1652-
zap.String("src_entitlement_id", srcEntitlementID),
1653-
zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()),
1654-
zap.Error(err),
1655-
)
1656-
continue
1657-
}
1633+
if pageToken == "" {
1634+
l.Info("Expanding grants...")
1635+
s.handleInitialActionForStep(ctx, *s.state.Current())
1636+
}
16581637

1659-
// The expand annotation points at entitlements by id. Those entitlements' resource should match
1660-
// the current grant's principal, so we don't allow expanding arbitrary entitlements.
1661-
sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId()
1662-
if sourceEntitlementResourceID == nil {
1663-
return fmt.Errorf("source entitlement resource id was nil")
1664-
}
1665-
if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() ||
1666-
principalID.GetResource() != sourceEntitlementResourceID.GetResource() {
1667-
l.Error(
1668-
"source entitlement resource id did not match grant principal id",
1669-
zap.String("grant_principal_id", principalID.String()),
1670-
zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String()))
1671-
1672-
return fmt.Errorf("source entitlement resource id did not match grant principal id")
1673-
}
1638+
resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build())
1639+
if err != nil {
1640+
return err
1641+
}
16741642

1675-
entitlementGraph.AddEntitlement(grant.GetEntitlement())
1676-
entitlementGraph.AddEntitlement(srcEntitlement.GetEntitlement())
1677-
err = entitlementGraph.AddEdge(ctx,
1678-
srcEntitlement.GetEntitlement().GetId(),
1679-
grant.GetEntitlement().GetId(),
1680-
expandable.GetShallow(),
1681-
expandable.GetResourceTypeIds(),
1682-
)
1683-
if err != nil {
1684-
return fmt.Errorf("error adding edge to graph: %w", err)
1685-
}
1686-
}
1687-
}
1688-
if entitlementGraph.Loaded {
1689-
l.Info("Finished loading entitlement graph", zap.Int("edges", len(entitlementGraph.Edges)))
1643+
// Handle pagination
1644+
if resp.GetNextPageToken() != "" {
1645+
err = s.state.NextPage(ctx, resp.GetNextPageToken())
1646+
if err != nil {
1647+
return err
16901648
}
1691-
return nil
1649+
} else {
1650+
l.Debug("Finished loading grants to expand")
1651+
graph.Loaded = true
16921652
}
16931653

1694-
if entitlementGraph.Loaded {
1695-
comps, sccMetrics := entitlementGraph.ComputeCyclicComponents(ctx)
1696-
if len(comps) > 0 {
1697-
// Log a sample cycle
1698-
l.Warn(
1699-
"cycle detected in entitlement graph",
1700-
zap.Any("cycle", comps[0]),
1701-
zap.Any("scc_metrics", sccMetrics),
1702-
)
1703-
l.Debug("initial graph stats",
1704-
zap.Int("edges", len(entitlementGraph.Edges)),
1705-
zap.Int("nodes", len(entitlementGraph.Nodes)),
1706-
zap.Int("actions", len(entitlementGraph.Actions)),
1707-
zap.Int("depth", entitlementGraph.Depth),
1708-
zap.Bool("has_no_cycles", entitlementGraph.HasNoCycles),
1709-
)
1710-
if dontFixCycles {
1711-
return fmt.Errorf("cycles detected in entitlement graph")
1712-
}
1713-
err := entitlementGraph.FixCyclesFromComponents(ctx, comps)
1714-
if err != nil {
1715-
return err
1716-
}
1654+
// Process grants and add edges to the graph
1655+
for _, grant := range resp.GetList() {
1656+
err := s.processGrantForGraph(ctx, grant, graph)
1657+
if err != nil {
1658+
return err
17171659
}
17181660
}
17191661

1720-
err := s.expandGrantsForEntitlements(ctx)
1662+
if graph.Loaded {
1663+
l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges)))
1664+
}
1665+
return nil
1666+
}
1667+
1668+
// processGrantForGraph examines a grant for expandable annotations and adds edges to the graph.
1669+
func (s *syncer) processGrantForGraph(ctx context.Context, grant *v2.Grant, graph *expand.EntitlementGraph) error {
1670+
l := ctxzap.Extract(ctx)
1671+
1672+
annos := annotations.Annotations(grant.GetAnnotations())
1673+
expandable := &v2.GrantExpandable{}
1674+
_, err := annos.Pick(expandable)
17211675
if err != nil {
17221676
return err
17231677
}
1678+
if len(expandable.GetEntitlementIds()) == 0 {
1679+
return nil
1680+
}
1681+
1682+
principalID := grant.GetPrincipal().GetId()
1683+
if principalID == nil {
1684+
return fmt.Errorf("principal id was nil")
1685+
}
17241686

1687+
for _, srcEntitlementID := range expandable.GetEntitlementIds() {
1688+
l.Debug(
1689+
"Expandable entitlement found",
1690+
zap.String("src_entitlement_id", srcEntitlementID),
1691+
zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()),
1692+
)
1693+
1694+
srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{
1695+
EntitlementId: srcEntitlementID,
1696+
}.Build())
1697+
if err != nil {
1698+
l.Error("error fetching source entitlement",
1699+
zap.String("src_entitlement_id", srcEntitlementID),
1700+
zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()),
1701+
zap.Error(err),
1702+
)
1703+
continue
1704+
}
1705+
1706+
// The expand annotation points at entitlements by id. Those entitlements' resource should match
1707+
// the current grant's principal, so we don't allow expanding arbitrary entitlements.
1708+
sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId()
1709+
if sourceEntitlementResourceID == nil {
1710+
return fmt.Errorf("source entitlement resource id was nil")
1711+
}
1712+
if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() ||
1713+
principalID.GetResource() != sourceEntitlementResourceID.GetResource() {
1714+
l.Error(
1715+
"source entitlement resource id did not match grant principal id",
1716+
zap.String("grant_principal_id", principalID.String()),
1717+
zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String()))
1718+
1719+
return fmt.Errorf("source entitlement resource id did not match grant principal id")
1720+
}
1721+
1722+
graph.AddEntitlement(grant.GetEntitlement())
1723+
graph.AddEntitlement(srcEntitlement.GetEntitlement())
1724+
err = graph.AddEdge(ctx,
1725+
srcEntitlement.GetEntitlement().GetId(),
1726+
grant.GetEntitlement().GetId(),
1727+
expandable.GetShallow(),
1728+
expandable.GetResourceTypeIds(),
1729+
)
1730+
if err != nil {
1731+
return fmt.Errorf("error adding edge to graph: %w", err)
1732+
}
1733+
}
17251734
return nil
17261735
}
17271736

1737+
// fixEntitlementGraphCycles detects and fixes cycles in the entitlement graph.
1738+
func (s *syncer) fixEntitlementGraphCycles(ctx context.Context, graph *expand.EntitlementGraph) error {
1739+
l := ctxzap.Extract(ctx)
1740+
1741+
comps, sccMetrics := graph.ComputeCyclicComponents(ctx)
1742+
if len(comps) == 0 {
1743+
graph.HasNoCycles = true
1744+
return nil
1745+
}
1746+
l.Warn(
1747+
"cycle detected in entitlement graph",
1748+
zap.Any("cycle", comps[0]),
1749+
zap.Any("scc_metrics", sccMetrics),
1750+
)
1751+
l.Debug("initial graph stats",
1752+
zap.Int("edges", len(graph.Edges)),
1753+
zap.Int("nodes", len(graph.Nodes)),
1754+
zap.Int("actions", len(graph.Actions)),
1755+
zap.Int("depth", graph.Depth),
1756+
zap.Bool("has_no_cycles", graph.HasNoCycles),
1757+
)
1758+
if dontFixCycles {
1759+
return fmt.Errorf("cycles detected in entitlement graph")
1760+
}
1761+
return graph.FixCyclesFromComponents(ctx, comps)
1762+
}
1763+
17281764
// SyncGrants fetches the grants for each resource from the connector. It iterates each resource
17291765
// from the datastore, and pushes a new action to sync the grants for each individual resource.
17301766
func (s *syncer) SyncGrants(ctx context.Context) error {
@@ -2771,7 +2807,7 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) {
27712807

27722808
// Peek the next action on the stack
27732809
if len(graph.Actions) == 0 {
2774-
l.Debug("runGrantExpandActions: no actions") // zap.Any("graph", graph),
2810+
l.Debug("runGrantExpandActions: no actions")
27752811

27762812
return true, nil
27772813
}
@@ -2961,7 +2997,7 @@ func (s *syncer) expandGrantsForEntitlements(ctx context.Context) error {
29612997

29622998
graph := s.state.EntitlementGraph(ctx)
29632999
l = l.With(zap.Int("depth", graph.Depth))
2964-
l.Debug("expandGrantsForEntitlements: start") // zap.Any("graph", graph)
3000+
l.Debug("expandGrantsForEntitlements: start")
29653001

29663002
s.counts.LogExpandProgress(ctx, graph.Actions)
29673003

@@ -2991,7 +3027,6 @@ func (s *syncer) expandGrantsForEntitlements(ctx context.Context) error {
29913027
if int64(graph.Depth) > maxDepth {
29923028
l.Error(
29933029
"expandGrantsForEntitlements: exceeded max depth",
2994-
// zap.Any("graph", graph),
29953030
zap.Int64("max_depth", maxDepth),
29963031
)
29973032
s.state.FinishAction(ctx)
@@ -3012,13 +3047,13 @@ func (s *syncer) expandGrantsForEntitlements(ctx context.Context) error {
30123047
}
30133048

30143049
if graph.IsExpanded() {
3015-
l.Debug("expandGrantsForEntitlements: graph is expanded") // zap.Any("graph", graph)
3050+
l.Debug("expandGrantsForEntitlements: graph is expanded")
30163051
s.state.FinishAction(ctx)
30173052
return nil
30183053
}
30193054

30203055
graph.Depth++
3021-
l.Debug("expandGrantsForEntitlements: graph is not expanded") // zap.Any("graph", graph)
3056+
l.Debug("expandGrantsForEntitlements: graph is not expanded")
30223057
return nil
30233058
}
30243059

0 commit comments

Comments
 (0)