Skip to content

garymardell/temporalio

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

5 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Temporalio Crystal SDK

A Crystal SDK for Temporal, a durable execution platform for building resilient applications.

Features

  • βœ… Clean, Idiomatic API - Crystal-native design with fiber-based concurrency
  • βœ… Transparent Payload Conversion - Automatic type inference, no manual conversions
  • βœ… Type Safe - Full compile-time type checking with Crystal's type system
  • βœ… High Performance - Built on Rust SDK Core with efficient FFI
  • βœ… Production Ready - Comprehensive test coverage and battle-tested core

Installation

  1. Add the dependency to your shard.yml:

    dependencies:
      temporalio:
        github: transparent/temporalio
  2. Run shards install

Quick Start

Define an Activity

require "temporalio"

class GreetActivity
  include Temporalio::Activity
  
  def execute(name : String) : String
    "Hello, #{name}!"
  end
end

Define a Workflow

class GreetingWorkflow
  include Temporalio::Workflow
  
  def execute(name : String) : String
    ctx = Workflow::Context.current
    
    # Execute activity with automatic type conversion
    result = ctx.execute_activity(
      GreetActivity,
      name,
      task_queue: "greetings",
      start_to_close_timeout: 30.seconds
    )
    
    result  # Automatically String, no conversion needed!
  end
end

Run a Worker

client = Temporalio::Client.connect("localhost:7233", namespace: "default")

worker = Temporalio::Worker.new(
  client: client,
  task_queue: "greetings",
  workflows: [GreetingWorkflow],
  activities: [GreetActivity]
)

worker.run  # Blocks and processes tasks

Execute a Workflow

client = Temporalio::Client.connect("localhost:7233", namespace: "default")

result = client.execute_workflow(
  GreetingWorkflow,
  "World",
  id: "greeting-#{UUID.random}",
  task_queue: "greetings"
)

puts result  # => "Hello, World!"

Key Features

Transparent Payload Conversion

The SDK automatically converts between Crystal types and Temporal payloads:

class EmailActivity
  include Temporalio::Activity
  
  def execute(to : String, subject : String, body : String) : String
    # Send email...
    "Email sent to #{to}"
  end
end

class MyWorkflow
  include Temporalio::Workflow
  
  def execute(email : String) : String
    ctx = Workflow::Context.current
    
    # βœ… Just pass regular Crystal values - no manual conversion!
    result = ctx.execute_activity(
      EmailActivity,
      email,           # String
      "Welcome!",      # String
      "Hello there!",  # String
      task_queue: "email-queue"
    )
    
    # βœ… result is automatically String (inferred from EmailActivity#execute)
    result.upcase  # Type-safe - compiler knows it's a String!
  end
end

Old way (manual):

# 😞 Manual conversion required
payload = converter.to_payload(email)
result_payload = ctx.execute_activity_internal("EmailActivity", [payload], ...)
result = converter.from_payload(result_payload, String)

New way (automatic):

# βœ… Clean and automatic
result = ctx.execute_activity(EmailActivity, email, ...)

Fiber-Based Concurrency

Crystal's fibers provide cooperative multitasking without explicit async/await:

class ParallelWorkflow
  include Temporalio::Workflow
  
  def execute : Array(String)
    ctx = Workflow::Context.current
    results = Channel(String).new
    
    # Run activities concurrently
    spawn do
      result = ctx.execute_activity(Activity1, "arg1", task_queue: "queue")
      results.send(result)
    end
    
    spawn do
      result = ctx.execute_activity(Activity2, "arg2", task_queue: "queue")
      results.send(result)
    end
    
    # Collect results
    [results.receive, results.receive]
  end
end

Signal and Query Handlers

class InteractiveWorkflow
  include Temporalio::Workflow
  
  def initialize
    @status = "initialized"
    @count = 0
  end
  
  def execute : String
    ctx = Workflow::Context.current
    
    # Wait for signal
    ctx.wait_condition { @status == "ready" }
    
    "Completed with count: #{@count}"
  end
  
  # Signal handler - updates state
  workflow_signal("update_status", String) do |new_status|
    @status = new_status
  end
  
  workflow_signal("increment") do
    @count += 1
  end
  
  # Query handler - reads state
  workflow_query("get_count", return_type: Int32) do
    @count
  end
end

# Client code
handle = client.start_workflow(InteractiveWorkflow, id: "interactive-1", task_queue: "queue")
handle.signal("increment")
handle.signal("update_status", "ready")
count = handle.query("get_count")  # Returns 1
result = handle.result  # => "Completed with count: 1"

Child Workflows

class ChildWorkflow
  include Temporalio::Workflow
  
  def execute(data : String) : String
    data.upcase
  end
end

class ParentWorkflow
  include Temporalio::Workflow
  
  def execute(input : String) : String
    ctx = Workflow::Context.current
    
    # Execute child workflow with automatic type conversion
    result = ctx.execute_child_workflow(
      ChildWorkflow,
      input,
      task_queue: "child-queue"
    )
    
    # result is automatically String!
    "Parent processed: #{result}"
  end
end

Timers and Conditions

class TimerWorkflow
  include Temporalio::Workflow
  
  def execute : String
    ctx = Workflow::Context.current
    
    # Sleep (deterministic timer)
    ctx.sleep(1.hour)
    
    # Wait for condition
    @ready = false
    ctx.wait_condition { @ready }
    
    "Completed"
  end
  
  workflow_signal("set_ready") do
    @ready = true
  end
end

Activity Heartbeats

class LongRunningActivity
  include Temporalio::Activity
  
  def execute(items : Array(String)) : String
    ctx = Activity::Context.current
    
    items.each_with_index do |item, i|
      # Process item...
      
      # Send heartbeat with progress
      ctx.heartbeat(i, items.size)
      
      # Check for cancellation
      ctx.check_cancellation!
    end
    
    "Processed #{items.size} items"
  end
end

Testing

Time-Skipping Test Server

require "temporalio/testing"

Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env|
  # env.client is connected to embedded test server
  result = env.client.execute_workflow(
    MyWorkflow,
    "input",
    id: "test-#{UUID.random}",
    task_queue: "test-queue"
  )
  
  # Advance time
  env.sleep(1.hour)
  
  # Check result
  result.should eq "expected output"
end

Activity Unit Testing

require "temporalio/testing"

env = Temporalio::Testing::ActivityEnvironment.new
result = env.run(MyActivity.new, "arg1", "arg2")
result.should eq "expected"

Architecture

The SDK is built on:

  • Temporal SDK Core (Rust) - Battle-tested workflow execution engine
  • Custom Rust FFI Bridge - Optimized for Crystal's memory model
  • Crystal-Native API - Idiomatic Crystal with macros and type inference
Crystal Application
      ↓
Temporalio Crystal SDK (this library)
      ↓
Rust FFI Bridge (ext/crystal-bridge)
      ↓
Temporal SDK Core (Rust)
      ↓
Temporal Server (gRPC)

Performance

  • Fiber-based: 40-68% faster than OS threads for I/O workloads
  • Connection pooling: Built-in support with crystal-db
  • Request pipelining: 3.85x throughput improvement
  • Low memory: Efficient memory usage with careful FFI design

Development

Build the Rust extension:

cd ext/crystal-bridge
cargo build --release

Run tests:

crystal spec

Run with multi-threading (experimental):

crystal run -Dpreview_mt src/your_app.cr

Contributing

  1. Fork it (https://github.com/transparent/temporalio/fork)
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

License

TODO: Specify license

Contributors

Resources

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors