diff --git a/core/modules.go b/core/modules.go index 4e48b5e1..4a6a449e 100644 --- a/core/modules.go +++ b/core/modules.go @@ -4,15 +4,16 @@ import ( "context" "errors" "fmt" + "reflect" + "slices" + "strings" + "sync" + "github.com/cryptopunkscc/astrald/astral" log2 "github.com/cryptopunkscc/astrald/astral/log" "github.com/cryptopunkscc/astrald/core/assets" "github.com/cryptopunkscc/astrald/debug" "github.com/cryptopunkscc/astrald/sig" - "reflect" - "slices" - "strings" - "sync" ) type Modules struct { diff --git a/lib/astrald/services.go b/lib/astrald/services.go new file mode 100644 index 00000000..832a2676 --- /dev/null +++ b/lib/astrald/services.go @@ -0,0 +1,62 @@ +package astrald + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/channel" + "github.com/cryptopunkscc/astrald/lib/query" + "github.com/cryptopunkscc/astrald/mod/services" +) + +type ServicesClient struct { + c *Client + targetID *astral.Identity +} + +func NewServicesClient(c *Client, targetID *astral.Identity) *ServicesClient { + return &ServicesClient{c: c, targetID: targetID} +} + +var defaultServicesClient *ServicesClient + +func Services() *ServicesClient { + if defaultServicesClient == nil { + defaultServicesClient = NewServicesClient(DefaultClient(), nil) + } + return defaultServicesClient +} + +func (client *ServicesClient) Discover(ctx *astral.Context, follow bool) (<-chan *services.Update, error) { + ch, err := client.queryCh(ctx, "services.discover", query.Args{ + "follow": follow, + }) + if err != nil { + return nil, err + } + + var out = make(chan *services.Update) + + go func() { + defer ch.Close() + defer close(out) + for { + obj, err := ch.Receive() + if err != nil { + return + } + switch obj := obj.(type) { + case *services.Update: + out <- obj + case *astral.EOS: + out <- nil + default: + return + } + } + }() + + return out, nil +} + +func (client *ServicesClient) queryCh(ctx *astral.Context, method string, args any, cfg ...channel.ConfigFunc) (*channel.Channel, error) { + return client.c.WithTarget(client.targetID).QueryChannel(ctx, method, args, cfg...) +} diff --git a/main.go b/main.go index 38dd16da..f4ac2c96 100644 --- a/main.go +++ b/main.go @@ -1,3 +1,24 @@ package main -func main() {} +import ( + "fmt" + + "github.com/cryptopunkscc/astrald/lib/astrald" +) + +func main() { + ctx := astrald.NewContext() + + ch, err := astrald.Services().Discover(ctx, true) + if err != nil { + panic(err) + } + + for u := range ch { + if u == nil { + fmt.Println("snapshot done") + continue + } + fmt.Println(u.Available, u.Name, u.ProviderID) + } +} diff --git a/mod/all/mods.go b/mod/all/mods.go index e90f6427..c0210839 100644 --- a/mod/all/mods.go +++ b/mod/all/mods.go @@ -25,6 +25,7 @@ import ( _ "github.com/cryptopunkscc/astrald/mod/nodes/src" _ "github.com/cryptopunkscc/astrald/mod/objects/src" _ "github.com/cryptopunkscc/astrald/mod/scheduler/src" + _ "github.com/cryptopunkscc/astrald/mod/services/src" _ "github.com/cryptopunkscc/astrald/mod/shell/src" _ "github.com/cryptopunkscc/astrald/mod/tcp/src" _ "github.com/cryptopunkscc/astrald/mod/tor/src" diff --git a/mod/nat/src/loader.go b/mod/nat/src/loader.go index cfbf99dc..2308bc8e 100644 --- a/mod/nat/src/loader.go +++ b/mod/nat/src/loader.go @@ -1,6 +1,8 @@ package nat import ( + "sync" + "github.com/cryptopunkscc/astrald/astral" "github.com/cryptopunkscc/astrald/astral/log" "github.com/cryptopunkscc/astrald/core" @@ -14,6 +16,7 @@ func (Loader) Load(node astral.Node, assets assets.Assets, l *log.Logger) (core. mod := &Module{ node: node, log: l, + cond: sync.NewCond(&sync.Mutex{}), } mod.pool = NewPairPool(mod) diff --git a/mod/nat/src/module.go b/mod/nat/src/module.go index 4f94729f..60f85b6e 100644 --- a/mod/nat/src/module.go +++ b/mod/nat/src/module.go @@ -1,6 +1,9 @@ package nat import ( + "sync" + "sync/atomic" + "github.com/cryptopunkscc/astrald/astral" "github.com/cryptopunkscc/astrald/astral/log" "github.com/cryptopunkscc/astrald/mod/dir" @@ -32,6 +35,9 @@ type Module struct { pool *PairPool ops shell.Scope + + enabled atomic.Bool + cond *sync.Cond } func (mod *Module) Run(ctx *astral.Context) error { @@ -45,6 +51,12 @@ func (mod *Module) Scope() *shell.Scope { return &mod.ops } +func (mod *Module) SetEnabled(enabled bool) { + if mod.enabled.Swap(enabled) != enabled { + mod.cond.Broadcast() + } +} + func (mod *Module) String() string { return nat.ModuleName } diff --git a/mod/nat/src/op_enable.go b/mod/nat/src/op_enable.go new file mode 100644 index 00000000..e58a7606 --- /dev/null +++ b/mod/nat/src/op_enable.go @@ -0,0 +1,22 @@ +package nat + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/channel" + "github.com/cryptopunkscc/astrald/mod/shell" +) + +type opSetEnabledArgs struct { + Arg bool `query:"optional"` + In string `query:"optional"` + Out string `query:"optional"` +} + +func (mod *Module) OpSetEnabled(ctx *astral.Context, q shell.Query, args opSetEnabledArgs) (err error) { + ch := channel.New(q.Accept(), channel.WithFormats(args.In, args.Out)) + defer ch.Close() + + mod.SetEnabled(args.Arg) + + return ch.Send(&astral.Ack{}) +} diff --git a/mod/nat/src/service_discoverer.go b/mod/nat/src/service_discoverer.go new file mode 100644 index 00000000..1c47a05c --- /dev/null +++ b/mod/nat/src/service_discoverer.go @@ -0,0 +1,55 @@ +package nat + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/nat" + "github.com/cryptopunkscc/astrald/mod/services" +) + +func (mod *Module) DiscoverServices( + ctx *astral.Context, + caller *astral.Identity, + follow bool, +) (<-chan *services.Update, error) { + var ch = make(chan *services.Update, 2) + + if mod.enabled.Load() { + ch <- mod.newServiceUpdate(true) + } + + if !follow { + close(ch) + return ch, nil + } + + ch <- nil + + go func() { + <-ctx.Done() + mod.cond.Broadcast() + }() + + go func() { + mod.cond.L.Lock() + defer mod.cond.L.Unlock() + for { + mod.cond.Wait() + select { + case <-ctx.Done(): + return + case ch <- mod.newServiceUpdate(mod.enabled.Load()): + } + + } + }() + + return ch, nil +} + +func (mod *Module) newServiceUpdate(available bool) *services.Update { + return &services.Update{ + Available: available, + Name: nat.ModuleName, + ProviderID: mod.node.Identity(), + } +} diff --git a/mod/services/module.go b/mod/services/module.go new file mode 100644 index 00000000..cfcf1551 --- /dev/null +++ b/mod/services/module.go @@ -0,0 +1,15 @@ +package services + +import "github.com/cryptopunkscc/astrald/astral" + +const ModuleName = "services" +const DBPrefix = "services__" + +type Module interface { + AddDiscoverer(Discoverer) error + Discoverer +} + +type Discoverer interface { + DiscoverServices(ctx *astral.Context, caller *astral.Identity, follow bool) (<-chan *Update, error) +} diff --git a/mod/services/src/db.go b/mod/services/src/db.go new file mode 100644 index 00000000..92b39351 --- /dev/null +++ b/mod/services/src/db.go @@ -0,0 +1,58 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/astral" + "gorm.io/gorm" +) + +type DB struct { + db *gorm.DB +} + +func (db *DB) Migrate() error { + return db.db.AutoMigrate( + &dbService{}, + ) +} + +// InTx executes fn inside a database transaction. +// This is the only transaction API - commit/rollback are fully encapsulated. +func (db *DB) InTx(fn func(tx *DB) error) error { + return db.db.Transaction(func(txGorm *gorm.DB) error { + return fn(&DB{db: txGorm}) + }) +} + +// deleteAllProviderServices deletes all cached services for a specific identity +func (db *DB) deleteAllProviderServices(providerID *astral.Identity) error { + return db.db. + Delete(&dbService{}, "provider_id = ?", providerID). + Error +} + +// deleteProviderService deletes a specific cached service for a specific identity +func (db *DB) deleteProviderService(providerID *astral.Identity, name string) error { + return db.db. + Delete(&dbService{}, "name = ? AND provider_id = ?", name, providerID). + Error +} + +// createProviderService creates a new service record +func (db *DB) createProviderService(providerID *astral.Identity, name string, info *astral.Bundle) error { + return db.db.Create(&dbService{ + Name: name, + ProviderID: providerID, + Info: info, + }).Error +} + +// findProviderServices returns all current services for a specific identity +func (db *DB) findProviderServices(providerID *astral.Identity) ([]dbService, error) { + var svcList []dbService + err := db.db. + Where("provider_id = ?", providerID). + Order("created_at DESC"). + Find(&svcList). + Error + return svcList, err +} diff --git a/mod/services/src/db_service.go b/mod/services/src/db_service.go new file mode 100644 index 00000000..79dbf34b --- /dev/null +++ b/mod/services/src/db_service.go @@ -0,0 +1,20 @@ +package services + +import ( + "time" + + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/services" +) + +// dbService represents discovered service of a specific identity cached in the database +type dbService struct { + Name string `gorm:"uniqueIndex:idx_db_service_name_provider_id"` + ProviderID *astral.Identity `gorm:"uniqueIndex:idx_db_service_name_provider_id"` + Info *astral.Bundle `gorm:"serializer:json"` + CreatedAt time.Time +} + +func (dbService) TableName() string { + return services.DBPrefix + "services" +} diff --git a/mod/services/src/deps.go b/mod/services/src/deps.go new file mode 100644 index 00000000..f028939b --- /dev/null +++ b/mod/services/src/deps.go @@ -0,0 +1,32 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/core" + "github.com/cryptopunkscc/astrald/mod/dir" + "github.com/cryptopunkscc/astrald/mod/services" +) + +type Deps struct { + Dir dir.Module +} + +func (mod *Module) LoadDependencies() (err error) { + err = core.Inject(mod.node, &mod.Deps) + if err != nil { + return err + } + + if cnode, ok := mod.node.(*core.Node); ok { + for _, m := range cnode.Modules().Loaded() { + if m == mod { + continue + } + + if d, ok := m.(services.Discoverer); ok { + mod.AddDiscoverer(d) + } + } + } + + return nil +} diff --git a/mod/services/src/discover_services.go b/mod/services/src/discover_services.go new file mode 100644 index 00000000..cc9e47d1 --- /dev/null +++ b/mod/services/src/discover_services.go @@ -0,0 +1,81 @@ +package services + +import ( + "sync" + + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/services" +) + +// DiscoverServices runs all registered ServiceDiscoverers with the provided options and merges +// their returned streams into a single output channel. +func (mod *Module) DiscoverServices( + ctx *astral.Context, + caller *astral.Identity, + follow bool, +) (<-chan *services.Update, error) { + var out = make(chan *services.Update) + var sources []<-chan *services.Update + + // collect all sources for discovery + for _, discoverer := range mod.discoverers.Clone() { + source, err := discoverer.DiscoverServices(ctx, caller, follow) + if err != nil { + mod.log.Logv(2, "%v.DiscoverServices: %v", discoverer, err) + continue + } + sources = append(sources, source) + } + + go func() { + defer close(out) + + // collect snapshots and send them to the output channel + var wg sync.WaitGroup + for _, source := range sources { + wg.Add(1) + go func(source <-chan *services.Update) { + defer wg.Done() + for update := range source { + if update == nil { + return + } + select { + case <-ctx.Done(): + return + case out <- update: + } + } + }(source) + } + wg.Wait() + + // return if we only wanted the snapshot + if !follow { + return + } + + // send the separator + out <- nil + + // collect updates and send them to the output channel + for _, source := range sources { + wg.Add(1) + go func(source <-chan *services.Update) { + defer wg.Done() + for update := range source { + select { + case <-ctx.Done(): + return + case out <- update: + } + + } + }(source) + } + + wg.Wait() + }() + + return out, nil +} diff --git a/mod/services/src/loader.go b/mod/services/src/loader.go new file mode 100644 index 00000000..e5e4df3a --- /dev/null +++ b/mod/services/src/loader.go @@ -0,0 +1,34 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/log" + "github.com/cryptopunkscc/astrald/core" + "github.com/cryptopunkscc/astrald/core/assets" + "github.com/cryptopunkscc/astrald/mod/services" +) + +type Loader struct{} + +func (Loader) Load(node astral.Node, assets assets.Assets, log *log.Logger) (core.Module, error) { + var mod = &Module{ + node: node, + log: log, + } + + mod.db = &DB{db: assets.Database()} + + if err := mod.db.Migrate(); err != nil { + return nil, err + } + + _ = mod.ops.AddStruct(mod, "Op") + + return mod, nil +} + +func init() { + if err := core.RegisterModule(services.ModuleName, Loader{}); err != nil { + panic(err) + } +} diff --git a/mod/services/src/module.go b/mod/services/src/module.go new file mode 100644 index 00000000..2cc62028 --- /dev/null +++ b/mod/services/src/module.go @@ -0,0 +1,75 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/log" + "github.com/cryptopunkscc/astrald/lib/astrald" + "github.com/cryptopunkscc/astrald/mod/services" + "github.com/cryptopunkscc/astrald/mod/shell" + "github.com/cryptopunkscc/astrald/sig" +) + +const ModuleName = "services" + +type Module struct { + Deps + + node astral.Node + log *log.Logger + ops shell.Scope + db *DB + + discoverers sig.Set[services.Discoverer] +} + +var _ services.Module = &Module{} + +func (mod *Module) Run(ctx *astral.Context) error { + <-ctx.Done() + return nil +} + +func (mod *Module) syncServices(ctx *astral.Context, providerID *astral.Identity, follow bool) error { + client := astrald.NewServicesClient(astrald.DefaultClient(), providerID) + + ch, err := client.Discover(ctx, follow) + if err != nil { + return err + } + + // clear cache + err = mod.db.deleteAllProviderServices(providerID) + if err != nil { + return err + } + + // process updates + for update := range ch { + switch { + case update == nil: + continue + case update.Available: + err = mod.db.createProviderService(update.ProviderID, string(update.Name), update.Info) + default: + err = mod.db.deleteProviderService(update.ProviderID, string(update.Name)) + } + + if err != nil { + return err + } + } + + return nil +} + +func (mod *Module) AddDiscoverer(discoverer services.Discoverer) error { + return mod.discoverers.Add(discoverer) +} + +func (mod *Module) Scope() *shell.Scope { + return &mod.ops +} + +func (mod *Module) String() string { + return ModuleName +} diff --git a/mod/services/src/op_discover.go b/mod/services/src/op_discover.go new file mode 100644 index 00000000..df55e351 --- /dev/null +++ b/mod/services/src/op_discover.go @@ -0,0 +1,40 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/channel" + "github.com/cryptopunkscc/astrald/mod/shell" +) + +type opServiceDiscoverArgs struct { + In string `query:"optional"` + Out string `query:"optional"` + Follow bool `query:"optional"` +} + +func (mod *Module) OpDiscover( + ctx *astral.Context, + q shell.Query, + args opServiceDiscoverArgs, +) error { + ch := q.AcceptChannel(channel.WithFormats(args.In, args.Out)) + defer ch.Close() + + updates, err := mod.DiscoverServices(ctx, q.Caller(), args.Follow) + if err != nil { + return ch.Send(astral.NewError(err.Error())) + } + + for update := range updates { + if update == nil { + err = ch.Send(&astral.EOS{}) + } else { + err = ch.Send(update) + } + if err != nil { + return err + } + } + + return ch.Send(&astral.EOS{}) +} diff --git a/mod/services/src/op_sync.go b/mod/services/src/op_sync.go new file mode 100644 index 00000000..156584aa --- /dev/null +++ b/mod/services/src/op_sync.go @@ -0,0 +1,41 @@ +package services + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/channel" + "github.com/cryptopunkscc/astrald/mod/shell" +) + +type opSyncArgs struct { + ID string + Follow bool `query:"optional"` + In string `query:"optional"` + Out string `query:"optional"` +} + +func (mod *Module) OpSync(ctx *astral.Context, q shell.Query, args opSyncArgs) (err error) { + ch := q.AcceptChannel(channel.WithFormats(args.In, args.Out)) + defer ch.Close() + + // resolve the target identity + targetID, err := mod.Dir.ResolveIdentity(args.ID) + if err != nil { + return ch.Send(astral.NewError(err.Error())) + } + + ctx, cancel := ctx.WithZone(astral.ZoneNetwork).WithCancel() + + // cancel the sync on any channel activity + go func() { + _, _ = ch.Receive() + cancel() + }() + + // run the updater + err = mod.syncServices(ctx, targetID, args.Follow) + if err != nil { + return ch.Send(astral.NewError(err.Error())) + } + + return ch.Send(&astral.Ack{}) +} diff --git a/mod/services/update.go b/mod/services/update.go new file mode 100644 index 00000000..39900f58 --- /dev/null +++ b/mod/services/update.go @@ -0,0 +1,32 @@ +package services + +import ( + "io" + + "github.com/cryptopunkscc/astrald/astral" +) + +var _ astral.Object = &Update{} + +type Update struct { + Available bool + Name astral.String8 + ProviderID *astral.Identity + Info *astral.Bundle +} + +func (s Update) ObjectType() string { + return "services.update" +} + +func (s Update) WriteTo(w io.Writer) (n int64, err error) { + return astral.Objectify(&s).WriteTo(w) +} + +func (s *Update) ReadFrom(r io.Reader) (n int64, err error) { + return astral.Objectify(s).ReadFrom(r) +} + +func init() { + _ = astral.DefaultBlueprints.Add(&Update{}) +} diff --git a/sig/chan.go b/sig/chan.go index bce96c11..f347cd9f 100644 --- a/sig/chan.go +++ b/sig/chan.go @@ -1,5 +1,6 @@ package sig +// ChanToArray drains ch until it is closed and returns all values in arrival order. func ChanToArray[T any](ch <-chan T) (arr []T) { for i := range ch { arr = append(arr, i) @@ -7,6 +8,7 @@ func ChanToArray[T any](ch <-chan T) (arr []T) { return } +// ArrayToChan converts arr into a closed, buffered channel containing all elements. func ArrayToChan[T any](arr []T) chan T { var ch = make(chan T, len(arr)) for _, i := range arr {