Skip to content

Commit 441b9dc

Browse files
gcmsgclaude
andcommitted
feat: add agent-to-agent contact request system
Add contactreq package (store/service with SQLite+PostgreSQL) for managing contact requests between agents. Add REST endpoints for sending, listing, approving/rejecting requests (both agent-side and provider-side). Change signaling to return error feedback instead of silently dropping messages when sender is not in recipient's contacts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6af307c commit 441b9dc

File tree

8 files changed

+753
-0
lines changed

8 files changed

+753
-0
lines changed

cmd/peerclawd/main.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/peerclaw/peerclaw-server/internal/bridge/mcp"
2020
"github.com/peerclaw/peerclaw-server/internal/claimtoken"
2121
"github.com/peerclaw/peerclaw-server/internal/config"
22+
"github.com/peerclaw/peerclaw-server/internal/contactreq"
2223
"github.com/peerclaw/peerclaw-server/internal/contacts"
2324
"github.com/peerclaw/peerclaw-server/internal/federation"
2425
"github.com/peerclaw/peerclaw-server/internal/observability"
@@ -197,6 +198,18 @@ func main() {
197198
logger.Info("contacts service initialized")
198199
}
199200

201+
// Initialize contact request service.
202+
var contactReqService *contactreq.Service
203+
if sqlDB != nil {
204+
crStore := contactreq.NewStore(cfg.Database.Driver, sqlDB)
205+
if err := crStore.Migrate(context.Background()); err != nil {
206+
logger.Error("failed to migrate contact request tables", "error", err)
207+
os.Exit(1)
208+
}
209+
contactReqService = contactreq.NewService(crStore, contactsService, logger)
210+
logger.Info("contact request service initialized")
211+
}
212+
200213
// Initialize user ACL service.
201214
var userACLService *useracl.Service
202215
if sqlDB != nil {
@@ -339,6 +352,9 @@ func main() {
339352
httpServer.SetBridgeRateLimiter(bridgeRL)
340353
logger.Info("bridge per-agent rate limiter enabled", "rate", 1.0, "burst", 10)
341354
}
355+
if contactReqService != nil {
356+
httpServer.SetContactRequests(contactReqService)
357+
}
342358
if userACLService != nil {
343359
httpServer.SetUserACL(userACLService)
344360
}

internal/contactreq/postgres.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package contactreq
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"time"
8+
)
9+
10+
// PostgresStore implements Store using PostgreSQL.
11+
type PostgresStore struct {
12+
db *sql.DB
13+
}
14+
15+
// NewPostgresStore creates a new PostgreSQL-backed contact request store.
16+
func NewPostgresStore(db *sql.DB) *PostgresStore {
17+
return &PostgresStore{db: db}
18+
}
19+
20+
func (s *PostgresStore) Migrate(ctx context.Context) error {
21+
schema := `
22+
CREATE TABLE IF NOT EXISTS agent_contact_requests (
23+
id TEXT PRIMARY KEY,
24+
from_agent_id TEXT NOT NULL,
25+
to_agent_id TEXT NOT NULL,
26+
status TEXT NOT NULL DEFAULT 'pending',
27+
message TEXT DEFAULT '',
28+
reject_reason TEXT DEFAULT '',
29+
created_at TIMESTAMPTZ NOT NULL,
30+
updated_at TIMESTAMPTZ NOT NULL,
31+
UNIQUE(from_agent_id, to_agent_id)
32+
);
33+
CREATE INDEX IF NOT EXISTS idx_contact_req_to ON agent_contact_requests(to_agent_id);
34+
CREATE INDEX IF NOT EXISTS idx_contact_req_from ON agent_contact_requests(from_agent_id);
35+
`
36+
_, err := s.db.ExecContext(ctx, schema)
37+
return err
38+
}
39+
40+
func (s *PostgresStore) Create(ctx context.Context, req *ContactRequest) error {
41+
_, err := s.db.ExecContext(ctx, `
42+
INSERT INTO agent_contact_requests (id, from_agent_id, to_agent_id, status, message, reject_reason, created_at, updated_at)
43+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
44+
ON CONFLICT (from_agent_id, to_agent_id) DO UPDATE SET
45+
id = EXCLUDED.id, status = EXCLUDED.status, message = EXCLUDED.message,
46+
reject_reason = EXCLUDED.reject_reason, updated_at = EXCLUDED.updated_at`,
47+
req.ID, req.FromAgentID, req.ToAgentID, req.Status, req.Message, req.RejectReason,
48+
req.CreatedAt.UTC(), req.UpdatedAt.UTC(),
49+
)
50+
return err
51+
}
52+
53+
func (s *PostgresStore) GetByID(ctx context.Context, id string) (*ContactRequest, error) {
54+
row := s.db.QueryRowContext(ctx, `
55+
SELECT id, from_agent_id, to_agent_id, status, COALESCE(message, ''), COALESCE(reject_reason, ''),
56+
created_at, updated_at
57+
FROM agent_contact_requests WHERE id = $1`, id)
58+
return s.scanRow(row)
59+
}
60+
61+
func (s *PostgresStore) GetByAgents(ctx context.Context, fromAgentID, toAgentID string) (*ContactRequest, error) {
62+
row := s.db.QueryRowContext(ctx, `
63+
SELECT id, from_agent_id, to_agent_id, status, COALESCE(message, ''), COALESCE(reject_reason, ''),
64+
created_at, updated_at
65+
FROM agent_contact_requests WHERE from_agent_id = $1 AND to_agent_id = $2`, fromAgentID, toAgentID)
66+
return s.scanRow(row)
67+
}
68+
69+
func (s *PostgresStore) ListByTarget(ctx context.Context, toAgentID string, status string) ([]ContactRequest, error) {
70+
query := `SELECT id, from_agent_id, to_agent_id, status, COALESCE(message, ''), COALESCE(reject_reason, ''),
71+
created_at, updated_at
72+
FROM agent_contact_requests WHERE to_agent_id = $1`
73+
args := []any{toAgentID}
74+
if status != "" {
75+
query += " AND status = $2"
76+
args = append(args, status)
77+
}
78+
query += " ORDER BY created_at DESC"
79+
return s.queryRows(ctx, query, args...)
80+
}
81+
82+
func (s *PostgresStore) ListBySender(ctx context.Context, fromAgentID string, status string) ([]ContactRequest, error) {
83+
query := `SELECT id, from_agent_id, to_agent_id, status, COALESCE(message, ''), COALESCE(reject_reason, ''),
84+
created_at, updated_at
85+
FROM agent_contact_requests WHERE from_agent_id = $1`
86+
args := []any{fromAgentID}
87+
if status != "" {
88+
query += " AND status = $2"
89+
args = append(args, status)
90+
}
91+
query += " ORDER BY created_at DESC"
92+
return s.queryRows(ctx, query, args...)
93+
}
94+
95+
func (s *PostgresStore) UpdateStatus(ctx context.Context, id, status, rejectReason string) error {
96+
_, err := s.db.ExecContext(ctx, `
97+
UPDATE agent_contact_requests SET status = $1, reject_reason = $2, updated_at = $3
98+
WHERE id = $4`,
99+
status, rejectReason, time.Now().UTC(), id,
100+
)
101+
return err
102+
}
103+
104+
func (s *PostgresStore) Delete(ctx context.Context, id string) error {
105+
res, err := s.db.ExecContext(ctx, `DELETE FROM agent_contact_requests WHERE id = $1`, id)
106+
if err != nil {
107+
return err
108+
}
109+
n, _ := res.RowsAffected()
110+
if n == 0 {
111+
return fmt.Errorf("contact request not found")
112+
}
113+
return nil
114+
}
115+
116+
func (s *PostgresStore) Close() error {
117+
return nil // shared db
118+
}
119+
120+
func (s *PostgresStore) scanRow(row *sql.Row) (*ContactRequest, error) {
121+
var r ContactRequest
122+
err := row.Scan(&r.ID, &r.FromAgentID, &r.ToAgentID, &r.Status, &r.Message, &r.RejectReason,
123+
&r.CreatedAt, &r.UpdatedAt)
124+
if err != nil {
125+
if err == sql.ErrNoRows {
126+
return nil, nil
127+
}
128+
return nil, err
129+
}
130+
return &r, nil
131+
}
132+
133+
func (s *PostgresStore) queryRows(ctx context.Context, query string, args ...any) ([]ContactRequest, error) {
134+
rows, err := s.db.QueryContext(ctx, query, args...)
135+
if err != nil {
136+
return nil, err
137+
}
138+
defer rows.Close()
139+
140+
var results []ContactRequest
141+
for rows.Next() {
142+
var r ContactRequest
143+
if err := rows.Scan(&r.ID, &r.FromAgentID, &r.ToAgentID, &r.Status, &r.Message, &r.RejectReason,
144+
&r.CreatedAt, &r.UpdatedAt); err != nil {
145+
return nil, err
146+
}
147+
results = append(results, r)
148+
}
149+
return results, rows.Err()
150+
}

internal/contactreq/service.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package contactreq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/google/uuid"
10+
"github.com/peerclaw/peerclaw-server/internal/contacts"
11+
)
12+
13+
// Service implements contact request business logic.
14+
type Service struct {
15+
store Store
16+
contacts *contacts.Service
17+
logger *slog.Logger
18+
}
19+
20+
// NewService creates a new contact request service.
21+
func NewService(store Store, contacts *contacts.Service, logger *slog.Logger) *Service {
22+
if logger == nil {
23+
logger = slog.Default()
24+
}
25+
return &Service{store: store, contacts: contacts, logger: logger}
26+
}
27+
28+
// Submit creates a new contact request from one agent to another.
29+
func (s *Service) Submit(ctx context.Context, fromAgentID, toAgentID, message string) (*ContactRequest, error) {
30+
if fromAgentID == "" || toAgentID == "" {
31+
return nil, fmt.Errorf("from and to agent IDs are required")
32+
}
33+
if fromAgentID == toAgentID {
34+
return nil, fmt.Errorf("cannot send contact request to self")
35+
}
36+
37+
// Check for existing pending request.
38+
existing, _ := s.store.GetByAgents(ctx, fromAgentID, toAgentID)
39+
if existing != nil && existing.Status == "pending" {
40+
return nil, fmt.Errorf("a pending contact request already exists")
41+
}
42+
if existing != nil && existing.Status == "approved" {
43+
return nil, fmt.Errorf("contact request already approved")
44+
}
45+
46+
now := time.Now().UTC()
47+
req := &ContactRequest{
48+
ID: uuid.New().String(),
49+
FromAgentID: fromAgentID,
50+
ToAgentID: toAgentID,
51+
Status: "pending",
52+
Message: message,
53+
CreatedAt: now,
54+
UpdatedAt: now,
55+
}
56+
57+
if err := s.store.Create(ctx, req); err != nil {
58+
return nil, fmt.Errorf("create contact request: %w", err)
59+
}
60+
61+
s.logger.Info("contact request submitted",
62+
"id", req.ID,
63+
"from", fromAgentID,
64+
"to", toAgentID,
65+
)
66+
return req, nil
67+
}
68+
69+
// Approve approves a contact request and adds both agents as contacts of each other.
70+
func (s *Service) Approve(ctx context.Context, id string) error {
71+
req, err := s.store.GetByID(ctx, id)
72+
if err != nil {
73+
return fmt.Errorf("get contact request: %w", err)
74+
}
75+
if req == nil {
76+
return fmt.Errorf("contact request not found")
77+
}
78+
if req.Status != "pending" {
79+
return fmt.Errorf("contact request is not pending (status: %s)", req.Status)
80+
}
81+
82+
if err := s.store.UpdateStatus(ctx, id, "approved", ""); err != nil {
83+
return fmt.Errorf("approve contact request: %w", err)
84+
}
85+
86+
// Bidirectional contact addition.
87+
if s.contacts != nil {
88+
if _, err := s.contacts.Add(ctx, req.ToAgentID, req.FromAgentID, "", nil); err != nil {
89+
s.logger.Warn("failed to add contact (to→from)", "error", err)
90+
}
91+
if _, err := s.contacts.Add(ctx, req.FromAgentID, req.ToAgentID, "", nil); err != nil {
92+
s.logger.Warn("failed to add contact (from→to)", "error", err)
93+
}
94+
}
95+
96+
s.logger.Info("contact request approved", "id", id, "from", req.FromAgentID, "to", req.ToAgentID)
97+
return nil
98+
}
99+
100+
// Reject rejects a contact request with a reason.
101+
func (s *Service) Reject(ctx context.Context, id, reason string) error {
102+
req, err := s.store.GetByID(ctx, id)
103+
if err != nil {
104+
return fmt.Errorf("get contact request: %w", err)
105+
}
106+
if req == nil {
107+
return fmt.Errorf("contact request not found")
108+
}
109+
if req.Status != "pending" {
110+
return fmt.Errorf("contact request is not pending (status: %s)", req.Status)
111+
}
112+
113+
if err := s.store.UpdateStatus(ctx, id, "rejected", reason); err != nil {
114+
return fmt.Errorf("reject contact request: %w", err)
115+
}
116+
117+
s.logger.Info("contact request rejected", "id", id, "reason", reason)
118+
return nil
119+
}
120+
121+
// ListPending returns pending contact requests for the target agent.
122+
func (s *Service) ListPending(ctx context.Context, toAgentID string) ([]ContactRequest, error) {
123+
return s.store.ListByTarget(ctx, toAgentID, "pending")
124+
}
125+
126+
// ListIncoming returns all contact requests for the target agent, optionally filtered by status.
127+
func (s *Service) ListIncoming(ctx context.Context, toAgentID, status string) ([]ContactRequest, error) {
128+
return s.store.ListByTarget(ctx, toAgentID, status)
129+
}
130+
131+
// ListSent returns contact requests sent by the given agent, optionally filtered by status.
132+
func (s *Service) ListSent(ctx context.Context, fromAgentID, status string) ([]ContactRequest, error) {
133+
return s.store.ListBySender(ctx, fromAgentID, status)
134+
}
135+
136+
// GetByID returns a single contact request.
137+
func (s *Service) GetByID(ctx context.Context, id string) (*ContactRequest, error) {
138+
return s.store.GetByID(ctx, id)
139+
}

0 commit comments

Comments
 (0)