Skip to content

Commit 9c81f4c

Browse files
authored
Send outside of workflows + send/set message type registration (#42)
- Allow standalone `Send()` and `SetEvent()` - Hash executable for standalone versioning - Automatically register `time.Time` for `encoding/gob` - Make `Send` and `SetEvent` generic so we can register the message type with `encoding/gob` automatically
1 parent 7c7dc42 commit 9c81f4c

File tree

6 files changed

+404
-146
lines changed

6 files changed

+404
-146
lines changed

dbos/dbos.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ package dbos
33
import (
44
"context"
55
"crypto/sha256"
6+
"encoding/gob"
67
"encoding/hex"
78
"fmt"
9+
"io"
810
"log/slog"
911
"net/url"
1012
"os"
11-
"reflect"
12-
"runtime"
13-
"sort"
1413
"time"
1514

1615
"github.com/robfig/cron/v3"
@@ -23,36 +22,6 @@ var (
2322
_DEFAULT_ADMIN_SERVER_PORT = 3001
2423
)
2524

26-
func computeApplicationVersion() string {
27-
if len(registry) == 0 {
28-
fmt.Println("DBOS: No registered workflows found, cannot compute application version")
29-
return ""
30-
}
31-
32-
// Collect all function names and sort them for consistent hashing
33-
var functionNames []string
34-
for fqn := range registry {
35-
functionNames = append(functionNames, fqn)
36-
}
37-
sort.Strings(functionNames)
38-
39-
hasher := sha256.New()
40-
41-
for _, fqn := range functionNames {
42-
workflowEntry := registry[fqn]
43-
44-
// Try to get function source location and other identifying info
45-
if pc := runtime.FuncForPC(reflect.ValueOf(workflowEntry.wrappedFunction).Pointer()); pc != nil {
46-
// Get the function's entry point - this reflects the actual compiled code
47-
entry := pc.Entry()
48-
fmt.Fprintf(hasher, "%x", entry)
49-
}
50-
}
51-
52-
return hex.EncodeToString(hasher.Sum(nil))
53-
54-
}
55-
5625
var workflowScheduler *cron.Cron // Global because accessed during workflow registration before the dbos singleton is initialized
5726

5827
var logger *slog.Logger // Global because accessed everywhere inside the library
@@ -141,6 +110,10 @@ func Initialize(inputConfig Config) error {
141110
// Set global logger
142111
logger = config.Logger
143112

113+
// Register types we serialize with gob
114+
var t time.Time
115+
gob.Register(t)
116+
144117
// Initialize global variables with environment variables, providing defaults if not set
145118
_APP_VERSION = os.Getenv("DBOS__APPVERSION")
146119
if _APP_VERSION == "" {
@@ -273,3 +246,32 @@ func Shutdown() {
273246
}
274247
dbos = nil
275248
}
249+
250+
func GetBinaryHash() (string, error) {
251+
execPath, err := os.Executable()
252+
if err != nil {
253+
return "", err
254+
}
255+
256+
file, err := os.Open(execPath)
257+
if err != nil {
258+
return "", err
259+
}
260+
defer file.Close()
261+
262+
hasher := sha256.New()
263+
if _, err := io.Copy(hasher, file); err != nil {
264+
return "", err
265+
}
266+
267+
return hex.EncodeToString(hasher.Sum(nil)), nil
268+
}
269+
270+
func computeApplicationVersion() string {
271+
hash, err := GetBinaryHash()
272+
if err != nil {
273+
fmt.Printf("DBOS: Failed to compute binary hash: %v\n", err)
274+
return ""
275+
}
276+
return hash
277+
}

dbos/dbos_test.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package dbos
22

33
import (
4-
"context"
5-
"encoding/hex"
6-
"maps"
74
"testing"
85
)
96

@@ -60,28 +57,3 @@ func TestConfigValidationErrorTypes(t *testing.T) {
6057
}
6158
})
6259
}
63-
func TestAppVersion(t *testing.T) {
64-
if _, err := hex.DecodeString(_APP_VERSION); err != nil {
65-
t.Fatalf("APP_VERSION is not a valid hex string: %v", err)
66-
}
67-
68-
// Save the original registry content
69-
originalRegistry := make(map[string]workflowRegistryEntry)
70-
maps.Copy(originalRegistry, registry)
71-
72-
// Restore the registry after the test
73-
defer func() {
74-
registry = originalRegistry
75-
}()
76-
77-
// Replace the registry and verify the hash is different
78-
registry = make(map[string]workflowRegistryEntry)
79-
80-
WithWorkflow(func(ctx context.Context, input string) (string, error) {
81-
return "new-registry-workflow-" + input, nil
82-
})
83-
hash2 := computeApplicationVersion()
84-
if _APP_VERSION == hash2 {
85-
t.Fatalf("APP_VERSION hash did not change after replacing registry")
86-
}
87-
}

dbos/serialization_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"testing"
8+
"time"
89
)
910

1011
/** Test serialization and deserialization
@@ -13,6 +14,7 @@ import (
1314
[x] Workflow inputs/outputs
1415
[x] Step inputs/outputs
1516
[x] Direct handlers, polling handler, list workflows results, get step infos
17+
[x] Set/get event with user defined types
1618
*/
1719

1820
var (
@@ -289,3 +291,180 @@ func TestWorkflowEncoding(t *testing.T) {
289291
}
290292
})
291293
}
294+
295+
type UserDefinedEventData struct {
296+
ID int `json:"id"`
297+
Name string `json:"name"`
298+
Details struct {
299+
Description string `json:"description"`
300+
Tags []string `json:"tags"`
301+
} `json:"details"`
302+
}
303+
304+
func setEventUserDefinedTypeWorkflow(ctx context.Context, input string) (string, error) {
305+
eventData := UserDefinedEventData{
306+
ID: 42,
307+
Name: "test-event",
308+
Details: struct {
309+
Description string `json:"description"`
310+
Tags []string `json:"tags"`
311+
}{
312+
Description: "This is a test event with user-defined data",
313+
Tags: []string{"test", "user-defined", "serialization"},
314+
},
315+
}
316+
317+
err := SetEvent(ctx, WorkflowSetEventInput[UserDefinedEventData]{Key: input, Message: eventData})
318+
if err != nil {
319+
return "", err
320+
}
321+
return "user-defined-event-set", nil
322+
}
323+
324+
var setEventUserDefinedTypeWf = WithWorkflow(setEventUserDefinedTypeWorkflow)
325+
326+
func TestSetEventSerialize(t *testing.T) {
327+
setupDBOS(t)
328+
329+
t.Run("SetEventUserDefinedType", func(t *testing.T) {
330+
// Start a workflow that sets an event with a user-defined type
331+
setHandle, err := setEventUserDefinedTypeWf(context.Background(), "user-defined-key")
332+
if err != nil {
333+
t.Fatalf("failed to start workflow with user-defined event type: %v", err)
334+
}
335+
336+
// Wait for the workflow to complete
337+
result, err := setHandle.GetResult(context.Background())
338+
if err != nil {
339+
t.Fatalf("failed to get result from user-defined event workflow: %v", err)
340+
}
341+
if result != "user-defined-event-set" {
342+
t.Fatalf("expected result to be 'user-defined-event-set', got '%s'", result)
343+
}
344+
345+
// Retrieve the event to verify it was properly serialized and can be deserialized
346+
retrievedEvent, err := GetEvent[UserDefinedEventData](context.Background(), WorkflowGetEventInput{
347+
TargetWorkflowID: setHandle.GetWorkflowID(),
348+
Key: "user-defined-key",
349+
Timeout: 3 * time.Second,
350+
})
351+
if err != nil {
352+
t.Fatalf("failed to get user-defined event: %v", err)
353+
}
354+
355+
// Verify the retrieved data matches what we set
356+
if retrievedEvent.ID != 42 {
357+
t.Fatalf("expected ID to be 42, got %d", retrievedEvent.ID)
358+
}
359+
if retrievedEvent.Name != "test-event" {
360+
t.Fatalf("expected Name to be 'test-event', got '%s'", retrievedEvent.Name)
361+
}
362+
if retrievedEvent.Details.Description != "This is a test event with user-defined data" {
363+
t.Fatalf("expected Description to be 'This is a test event with user-defined data', got '%s'", retrievedEvent.Details.Description)
364+
}
365+
if len(retrievedEvent.Details.Tags) != 3 {
366+
t.Fatalf("expected 3 tags, got %d", len(retrievedEvent.Details.Tags))
367+
}
368+
expectedTags := []string{"test", "user-defined", "serialization"}
369+
for i, tag := range retrievedEvent.Details.Tags {
370+
if tag != expectedTags[i] {
371+
t.Fatalf("expected tag %d to be '%s', got '%s'", i, expectedTags[i], tag)
372+
}
373+
}
374+
})
375+
}
376+
377+
378+
func sendUserDefinedTypeWorkflow(ctx context.Context, destinationID string) (string, error) {
379+
// Create an instance of our user-defined type inside the workflow
380+
sendData := UserDefinedEventData{
381+
ID: 42,
382+
Name: "test-send-message",
383+
Details: struct {
384+
Description string `json:"description"`
385+
Tags []string `json:"tags"`
386+
}{
387+
Description: "This is a test send message with user-defined data",
388+
Tags: []string{"test", "user-defined", "serialization", "send"},
389+
},
390+
}
391+
392+
// Send should automatically register this type with gob
393+
// Note the explicit type parameter since compiler cannot infer UserDefinedEventData from string input
394+
err := Send(ctx, WorkflowSendInput[UserDefinedEventData]{
395+
DestinationID: destinationID,
396+
Topic: "user-defined-topic",
397+
Message: sendData,
398+
})
399+
if err != nil {
400+
return "", err
401+
}
402+
return "user-defined-message-sent", nil
403+
}
404+
405+
func recvUserDefinedTypeWorkflow(ctx context.Context, input string) (UserDefinedEventData, error) {
406+
// Receive the user-defined type message
407+
result, err := Recv[UserDefinedEventData](ctx, WorkflowRecvInput{
408+
Topic: "user-defined-topic",
409+
Timeout: 3 * time.Second,
410+
})
411+
return result, err
412+
}
413+
414+
var sendUserDefinedTypeWf = WithWorkflow(sendUserDefinedTypeWorkflow)
415+
var recvUserDefinedTypeWf = WithWorkflow(recvUserDefinedTypeWorkflow)
416+
417+
func TestSendSerialize(t *testing.T) {
418+
setupDBOS(t)
419+
420+
t.Run("SendUserDefinedType", func(t *testing.T) {
421+
// Start a receiver workflow first
422+
recvHandle, err := recvUserDefinedTypeWf(context.Background(), "recv-input")
423+
if err != nil {
424+
t.Fatalf("failed to start receive workflow: %v", err)
425+
}
426+
427+
// Start a sender workflow that sends a message with a user-defined type
428+
sendHandle, err := sendUserDefinedTypeWf(context.Background(), recvHandle.GetWorkflowID())
429+
if err != nil {
430+
t.Fatalf("failed to start workflow with user-defined send type: %v", err)
431+
}
432+
433+
// Wait for the sender workflow to complete
434+
sendResult, err := sendHandle.GetResult(context.Background())
435+
if err != nil {
436+
t.Fatalf("failed to get result from user-defined send workflow: %v", err)
437+
}
438+
if sendResult != "user-defined-message-sent" {
439+
t.Fatalf("expected result to be 'user-defined-message-sent', got '%s'", sendResult)
440+
}
441+
442+
// Wait for the receiver workflow to complete and get the message
443+
receivedData, err := recvHandle.GetResult(context.Background())
444+
if err != nil {
445+
t.Fatalf("failed to get result from receive workflow: %v", err)
446+
}
447+
448+
// Verify the received data matches what we sent
449+
if receivedData.ID != 42 {
450+
t.Fatalf("expected ID to be 42, got %d", receivedData.ID)
451+
}
452+
if receivedData.Name != "test-send-message" {
453+
t.Fatalf("expected Name to be 'test-send-message', got '%s'", receivedData.Name)
454+
}
455+
if receivedData.Details.Description != "This is a test send message with user-defined data" {
456+
t.Fatalf("expected Description to be 'This is a test send message with user-defined data', got '%s'", receivedData.Details.Description)
457+
}
458+
459+
// Verify tags
460+
expectedTags := []string{"test", "user-defined", "serialization", "send"}
461+
if len(receivedData.Details.Tags) != len(expectedTags) {
462+
t.Fatalf("expected %d tags, got %d", len(expectedTags), len(receivedData.Details.Tags))
463+
}
464+
for i, tag := range receivedData.Details.Tags {
465+
if tag != expectedTags[i] {
466+
t.Fatalf("expected tag %d to be '%s', got '%s'", i, expectedTags[i], tag)
467+
}
468+
}
469+
})
470+
}

0 commit comments

Comments
 (0)