Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
808faec
draft
Rekseto Dec 29, 2025
e7ca13f
mod/services: service discovery subscriptions
Rekseto Jan 2, 2026
387badc
mod/services: draft of caching
Rekseto Jan 2, 2026
fb75662
mod/services: remove comments
Rekseto Jan 2, 2026
75d8647
mod/services: cleanups
Rekseto Jan 2, 2026
44b8edc
mod/services: add fixme
Rekseto Jan 2, 2026
3db6b28
mod/services: Add UoW pattern to db.go when caching services
Rekseto Jan 6, 2026
365f9f0
mod/services: add discover options, simplifying logic etc
Rekseto Jan 6, 2026
1aeb14f
WIP
Rekseto Jan 6, 2026
e3cb78d
always emit service change
Rekseto Jan 6, 2026
568a3ca
add module name into loader
Rekseto Jan 6, 2026
5ed4db2
cleanups
Rekseto Jan 6, 2026
09da765
remove helper op
Rekseto Jan 6, 2026
a52601e
Update mod/services/service_change_feed.go
Rekseto Jan 8, 2026
e006e83
Update mod/services/src/module.go
Rekseto Jan 8, 2026
2f0d8e0
add constraint
Rekseto Jan 8, 2026
8cc15f8
usage of new channels
Rekseto Jan 8, 2026
9d7fb5c
use ch.Collect in module
Rekseto Jan 8, 2026
5a791da
use sig.Set when adding service discoverers
Rekseto Jan 8, 2026
18c696b
add missing init() for objects
Rekseto Jan 9, 2026
d2c8c91
take out heavy logic out of op_discovery.go
Rekseto Jan 9, 2026
35dba08
minor cleanup
Rekseto Jan 9, 2026
72ae8a0
WIP
Rekseto Jan 13, 2026
572a03a
WIP
Rekseto Jan 13, 2026
c77a6c8
WIP
Rekseto Jan 13, 2026
65931f9
sig: add new helpers
Rekseto Jan 13, 2026
180a584
mod/services: transform discovery logic to use new sig helpers
Rekseto Jan 13, 2026
43b8650
mod/services: WIP
Rekseto Jan 13, 2026
4d1c812
mod/services: final touches
Rekseto Jan 13, 2026
266d93f
sig: improve snapshot follow stream
Rekseto Jan 13, 2026
bce307a
sig: remove useless comments
Rekseto Jan 14, 2026
1aa1ac5
minimalized services module (#146)
Rekseto Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"context"
"errors"
"fmt"
"reflect"
"slices"
"strings"
"sync"

"github.com/cryptopunkscc/astrald/astral"
log2 "github.com/cryptopunkscc/astrald/astral/log"
"github.com/cryptopunkscc/astrald/core/assets"
"github.com/cryptopunkscc/astrald/debug"
"github.com/cryptopunkscc/astrald/sig"
"reflect"
"slices"
"strings"
"sync"
)

type Modules struct {
Expand Down
62 changes: 62 additions & 0 deletions lib/astrald/services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package astrald

import (
"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/astral/channel"
"github.com/cryptopunkscc/astrald/lib/query"
"github.com/cryptopunkscc/astrald/mod/services"
)

type ServicesClient struct {
c *Client
targetID *astral.Identity
}

func NewServicesClient(c *Client, targetID *astral.Identity) *ServicesClient {
return &ServicesClient{c: c, targetID: targetID}
}

var defaultServicesClient *ServicesClient

func Services() *ServicesClient {
if defaultServicesClient == nil {
defaultServicesClient = NewServicesClient(DefaultClient(), nil)
}
return defaultServicesClient
}

func (client *ServicesClient) Discover(ctx *astral.Context, follow bool) (<-chan *services.Update, error) {
ch, err := client.queryCh(ctx, "services.discover", query.Args{
"follow": follow,
})
if err != nil {
return nil, err
}

var out = make(chan *services.Update)

go func() {
defer ch.Close()
defer close(out)
for {
obj, err := ch.Receive()
if err != nil {
return
}
switch obj := obj.(type) {
case *services.Update:
out <- obj
case *astral.EOS:
out <- nil
default:
return
}
}
}()

return out, nil
}

func (client *ServicesClient) queryCh(ctx *astral.Context, method string, args any, cfg ...channel.ConfigFunc) (*channel.Channel, error) {
return client.c.WithTarget(client.targetID).QueryChannel(ctx, method, args, cfg...)
}
23 changes: 22 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
package main

func main() {}
import (
"fmt"

"github.com/cryptopunkscc/astrald/lib/astrald"
)

func main() {
ctx := astrald.NewContext()

ch, err := astrald.Services().Discover(ctx, true)
if err != nil {
panic(err)
}

for u := range ch {
if u == nil {
fmt.Println("snapshot done")
continue
}
fmt.Println(u.Available, u.Name, u.ProviderID)
}
}
1 change: 1 addition & 0 deletions mod/all/mods.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
_ "github.com/cryptopunkscc/astrald/mod/nodes/src"
_ "github.com/cryptopunkscc/astrald/mod/objects/src"
_ "github.com/cryptopunkscc/astrald/mod/scheduler/src"
_ "github.com/cryptopunkscc/astrald/mod/services/src"
_ "github.com/cryptopunkscc/astrald/mod/shell/src"
_ "github.com/cryptopunkscc/astrald/mod/tcp/src"
_ "github.com/cryptopunkscc/astrald/mod/tor/src"
Expand Down
3 changes: 3 additions & 0 deletions mod/nat/src/loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nat

import (
"sync"

"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/astral/log"
"github.com/cryptopunkscc/astrald/core"
Expand All @@ -14,6 +16,7 @@ func (Loader) Load(node astral.Node, assets assets.Assets, l *log.Logger) (core.
mod := &Module{
node: node,
log: l,
cond: sync.NewCond(&sync.Mutex{}),
}

mod.pool = NewPairPool(mod)
Expand Down
12 changes: 12 additions & 0 deletions mod/nat/src/module.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package nat

import (
"sync"
"sync/atomic"

"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/astral/log"
"github.com/cryptopunkscc/astrald/mod/dir"
Expand Down Expand Up @@ -32,6 +35,9 @@ type Module struct {

pool *PairPool
ops shell.Scope

enabled atomic.Bool
cond *sync.Cond
}

func (mod *Module) Run(ctx *astral.Context) error {
Expand All @@ -45,6 +51,12 @@ func (mod *Module) Scope() *shell.Scope {
return &mod.ops
}

func (mod *Module) SetEnabled(enabled bool) {
if mod.enabled.Swap(enabled) != enabled {
mod.cond.Broadcast()
}
}

func (mod *Module) String() string {
return nat.ModuleName
}
Expand Down
22 changes: 22 additions & 0 deletions mod/nat/src/op_enable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package nat

import (
"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/astral/channel"
"github.com/cryptopunkscc/astrald/mod/shell"
)

type opSetEnabledArgs struct {
Arg bool `query:"optional"`
In string `query:"optional"`
Out string `query:"optional"`
}

func (mod *Module) OpSetEnabled(ctx *astral.Context, q shell.Query, args opSetEnabledArgs) (err error) {
ch := channel.New(q.Accept(), channel.WithFormats(args.In, args.Out))
defer ch.Close()

mod.SetEnabled(args.Arg)

return ch.Send(&astral.Ack{})
}
55 changes: 55 additions & 0 deletions mod/nat/src/service_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package nat

import (
"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/mod/nat"
"github.com/cryptopunkscc/astrald/mod/services"
)

func (mod *Module) DiscoverServices(
ctx *astral.Context,
caller *astral.Identity,
follow bool,
) (<-chan *services.Update, error) {
var ch = make(chan *services.Update, 2)

if mod.enabled.Load() {
ch <- mod.newServiceUpdate(true)
}

if !follow {
close(ch)
return ch, nil
}

ch <- nil

go func() {
<-ctx.Done()
mod.cond.Broadcast()
}()

go func() {
mod.cond.L.Lock()
defer mod.cond.L.Unlock()
for {
mod.cond.Wait()
select {
case <-ctx.Done():
return
case ch <- mod.newServiceUpdate(mod.enabled.Load()):
}

}
}()

return ch, nil
}

func (mod *Module) newServiceUpdate(available bool) *services.Update {
return &services.Update{
Available: available,
Name: nat.ModuleName,
ProviderID: mod.node.Identity(),
}
}
15 changes: 15 additions & 0 deletions mod/services/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package services

import "github.com/cryptopunkscc/astrald/astral"

const ModuleName = "services"
const DBPrefix = "services__"

type Module interface {
AddDiscoverer(Discoverer) error
Discoverer
}

type Discoverer interface {
DiscoverServices(ctx *astral.Context, caller *astral.Identity, follow bool) (<-chan *Update, error)
}
58 changes: 58 additions & 0 deletions mod/services/src/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package services

import (
"github.com/cryptopunkscc/astrald/astral"
"gorm.io/gorm"
)

type DB struct {
db *gorm.DB
}

func (db *DB) Migrate() error {
return db.db.AutoMigrate(
&dbService{},
)
}

// InTx executes fn inside a database transaction.
// This is the only transaction API - commit/rollback are fully encapsulated.
func (db *DB) InTx(fn func(tx *DB) error) error {
return db.db.Transaction(func(txGorm *gorm.DB) error {
return fn(&DB{db: txGorm})
})
}

// deleteAllProviderServices deletes all cached services for a specific identity
func (db *DB) deleteAllProviderServices(providerID *astral.Identity) error {
return db.db.
Delete(&dbService{}, "provider_id = ?", providerID).
Error
}

// deleteProviderService deletes a specific cached service for a specific identity
func (db *DB) deleteProviderService(providerID *astral.Identity, name string) error {
return db.db.
Delete(&dbService{}, "name = ? AND provider_id = ?", name, providerID).
Error
}

// createProviderService creates a new service record
func (db *DB) createProviderService(providerID *astral.Identity, name string, info *astral.Bundle) error {
return db.db.Create(&dbService{
Name: name,
ProviderID: providerID,
Info: info,
}).Error
}

// findProviderServices returns all current services for a specific identity
func (db *DB) findProviderServices(providerID *astral.Identity) ([]dbService, error) {
var svcList []dbService
err := db.db.
Where("provider_id = ?", providerID).
Order("created_at DESC").
Find(&svcList).
Error
return svcList, err
}
20 changes: 20 additions & 0 deletions mod/services/src/db_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package services

import (
"time"

"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/mod/services"
)

// dbService represents discovered service of a specific identity cached in the database
type dbService struct {
Name string `gorm:"uniqueIndex:idx_db_service_name_provider_id"`
ProviderID *astral.Identity `gorm:"uniqueIndex:idx_db_service_name_provider_id"`
Info *astral.Bundle `gorm:"serializer:json"`
CreatedAt time.Time
}

func (dbService) TableName() string {
return services.DBPrefix + "services"
}
32 changes: 32 additions & 0 deletions mod/services/src/deps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package services

import (
"github.com/cryptopunkscc/astrald/core"
"github.com/cryptopunkscc/astrald/mod/dir"
"github.com/cryptopunkscc/astrald/mod/services"
)

type Deps struct {
Dir dir.Module
}

func (mod *Module) LoadDependencies() (err error) {
err = core.Inject(mod.node, &mod.Deps)
if err != nil {
return err
}

if cnode, ok := mod.node.(*core.Node); ok {
for _, m := range cnode.Modules().Loaded() {
if m == mod {
continue
}

if d, ok := m.(services.Discoverer); ok {
mod.AddDiscoverer(d)
}
}
}

return nil
}
Loading