Skip to content

domain‐model

Andrei G edited this page Aug 12, 2025 · 1 revision

Domain Model

This diagram shows the core domain entities, value objects, and their relationships in the PJS system.

classDiagram
direction TB
    class StreamSession {
     +SessionId id
     +Vec[Stream] streams
     +StreamSessionStatus status
     +Instant created_at
     +Option[Instant] completed_at
     +create() StreamSession
     +add_stream(stream: Stream) Result[() , DomainError]
     +complete_session() Result[() , DomainError]
     +get_session_metrics() SessionMetrics
    }

    class Stream {
     +StreamId id
     +SessionId session_id
     +JsonValue original_data
     +Vec[Frame] frames
     +StreamStatus status
     +Priority base_priority
     +Instant created_at
     +create(data: JsonValue) Result[Stream, DomainError]
     +generate_frames() Result[Vec[Frame], DomainError]
     +get_frame_by_path(path: JsonPath) Option[Frame]
     +calculate_total_size() usize
    }

    class Frame {
     +FrameId id
     +StreamId stream_id
     +JsonPath path
     +JsonValue data
     +Priority priority
     +FrameType frame_type
     +usize size_bytes
     +Option[Checksum] checksum
     +create(path: JsonPath, data: JsonValue) Frame
     +calculate_priority() Priority
     +validate_data() Result[() , DomainError]
     +serialize() Result[Bytes, DomainError]
    }

    class Priority {
     +NonZeroU8 value
     +CRITICAL: Priority
     +HIGH: Priority
     +MEDIUM: Priority
     +LOW: Priority
     +BACKGROUND: Priority
     +new(value: u8) Result[Priority, DomainError]
     +increase_by(delta: u8) Priority
     +decrease_by(delta: u8) Priority
     +is_critical() bool
     +to_percentage() f32
    }

    class JsonPath {
     +String path
     +Vec[PathSegment] segments
     +new(path: String) Result[JsonPath, DomainError]
     +append(segment: PathSegment) JsonPath
     +parent() Option[JsonPath]
     +depth() usize
     +is_array_index() bool
     +matches_pattern(pattern: &str) bool
    }

    class SessionId {
     +Uuid value
     +new() SessionId
     +from_str(s: &str) Result[SessionId, DomainError]
     +as_bytes() &[u8]
    }

    class StreamId {
     +Uuid value
     +new() StreamId
     +from_session_and_index(session: SessionId, index: u32) StreamId
    }

    class FrameId {
     +Uuid value
     +new() FrameId
     +from_stream_and_path(stream: StreamId, path: JsonPath) FrameId
    }

    class StreamSessionStatus {
     +Active
     +Streaming
     +Completed
     +Failed(DomainError)
    }

    class StreamStatus {
     +Created
     +Processing
     +Streaming
     +Completed
     +Failed(DomainError)
    }

    class FrameType {
     +Skeleton
     +Data
     +Array
     +Object
     +Complete
    }

    class PathSegment {
     +Wildcard
     +RecursiveDescent
     +Field(String)
     +Index(usize)
    }

    class PriorityCalculator {
     +calculate_frame_priority(frame: &Frame, strategy: PriorityStrategy) Priority
     +calculate_stream_priority(stream: &Stream) Priority
     +adjust_priority_by_urgency(base: Priority, urgency: AdjustmentUrgency) Priority
    }

    class FrameGenerator {
     +generate_skeleton(data: &JsonValue) Frame
     +generate_data_frames(data: &JsonValue, strategy: FramingStrategy) Vec[Frame]
     +optimize_frame_sequence(frames: &mut Vec[Frame]) Result[() , DomainError]
    }

    class StreamOrchestrator {
     +create_session(config: StreamingConfig) Result[StreamSession, DomainError]
     +process_streams(session: &mut StreamSession) Result[() , DomainError]
     +prioritize_frames(frames: &mut Vec[Frame]) Result[() , DomainError]
    }

    class SessionStarted {
     +SessionId session_id
     +Instant timestamp
     +StreamingConfig config
    }

    class StreamAdded {
     +SessionId session_id
     +StreamId stream_id
     +usize estimated_frames
     +Priority base_priority
    }

    class FrameGenerated {
     +StreamId stream_id
     +FrameId frame_id
     +JsonPath path
     +Priority priority
     +usize size_bytes
    }

    class SessionCompleted {
     +SessionId session_id
     +Duration total_duration
     +SessionMetrics metrics
    }

    class Checksum {
     +String hash
     +Algorithm algorithm
     +verify(data: &[u8]) bool
    }

    class Bytes {
     +Vec[u8] data
     +len() usize
    }

    class JsonValue {
     +Value inner
     +get_type() JsonType
     +navigate(path: &JsonPath) Option[JsonValue]
    }

    class Instant {
     +u64 timestamp
     +elapsed() Duration
    }

    class Duration {
     +u64 millis
     +as_secs() u64
    }

    class SessionMetrics {
     +u64 total_frames
     +Duration processing_time
     +usize total_bytes
    }

    class SessionRepository {
     +save(session: StreamSession) Result[() , RepositoryError]
     +find_by_id(id: SessionId) Option[StreamSession]
     +find_active_sessions() Vec[StreamSession]
     +delete(id: SessionId) Result[() , RepositoryError]
    }

    class StreamRepository {
     +save(stream: Stream) Result[() , RepositoryError]
     +find_by_id(id: StreamId) Option[Stream]
     +find_by_session(session_id: SessionId) Vec[Stream]
    }

 <<enumeration>> StreamSessionStatus
 <<enumeration>> StreamStatus
 <<enumeration>> FrameType
 <<enumeration>> PathSegment
 <<service>> PriorityCalculator
 <<service>> FrameGenerator
 <<service>> StreamOrchestrator
 <<interface>> SessionRepository
 <<interface>> StreamRepository

    StreamSession "1" --> "many" Stream : contains
    Stream "1" --> "many" Frame : generates
    Stream "1" --> Priority : has_base
    Frame "1" --> Priority : has
    Frame "1" --> JsonPath : located_at
    Frame "1" --> FrameType : is_type
    StreamSession "1" --> SessionId : identified_by
    Stream "1" --> StreamId : identified_by
    Frame "1" --> FrameId : identified_by
    StreamSession "1" --> StreamSessionStatus : has_status
    Stream "1" --> StreamStatus : has_status
    JsonPath "1" --> "many" PathSegment : composed_of
    PriorityCalculator ..> Frame : calculates
    PriorityCalculator ..> Priority : produces
    FrameGenerator ..> Frame : creates
    FrameGenerator ..> JsonPath : uses
    StreamOrchestrator ..> StreamSession : orchestrates
    StreamOrchestrator ..> Stream : manages
    StreamOrchestrator ..> Priority : evaluates
    SessionStarted ..> SessionId : references
    StreamAdded ..> StreamId : references
    StreamAdded ..> SessionId : references
    StreamAdded ..> Priority : includes
    FrameGenerated ..> FrameId : references
    FrameGenerated ..> StreamId : references
    FrameGenerated ..> JsonPath : includes
    FrameGenerated ..> Priority : includes
    SessionCompleted ..> SessionId : references
    SessionRepository ..> StreamSession : persists
    SessionRepository ..> SessionId : uses_key
    StreamRepository ..> Stream : persists
    StreamRepository ..> StreamId : uses_key
    StreamRepository ..> SessionId : filters_by
    Frame "1" --> Checksum : may_have
    Frame "1" --> Bytes : serializes_to
    Stream "1" --> JsonValue : contains_original
    Stream "1" --> Instant : created_at
    StreamSession "1" --> Instant : created_at
    StreamSession "1" --> Instant : completed_at
    StreamSession "1" --> SessionMetrics : produces
    SessionCompleted "1" --> Duration : includes
    SessionCompleted "1" --> SessionMetrics : contains
    SessionStarted "1" --> Instant : timestamp
    StreamAdded "1" --> Instant : timestamp
    FrameGenerated "1" --> Instant : timestamp
    Duration "1" --> Instant : measures_between
    PriorityCalculator ..> StreamSessionStatus : considers
    PriorityCalculator ..> StreamStatus : evaluates
    FrameGenerator ..> FrameType : assigns
    FrameGenerator ..> JsonValue : processes
    FrameGenerator ..> Bytes : produces
    StreamOrchestrator ..> FrameGenerator : uses
    StreamOrchestrator ..> PriorityCalculator : uses
    SessionStarted ..> StreamOrchestrator : triggers
    StreamAdded ..> FrameGenerator : triggers
    FrameGenerated ..> PriorityCalculator : triggers
Loading

Domain Concepts

Aggregates

StreamSession is the primary aggregate root that maintains consistency boundaries around streaming operations. It ensures that all streams within a session follow the same configuration and lifecycle rules.

Entities

  • Stream: Represents a single JSON data stream with its own lifecycle and frame generation
  • Frame: Individual units of data with priority, path, and type information

Value Objects

  • Priority: Type-safe priority system with predefined levels and validation
  • JsonPath: JSONPath expression for precise data location
  • SessionId/StreamId/FrameId: Type-safe identifiers preventing ID confusion

Domain Services

  • PriorityCalculator: Complex business logic for priority assignment
  • FrameGenerator: Algorithms for optimal frame creation and sequencing
  • StreamOrchestrator: High-level coordination of streaming operations

Domain Events

Events capture important business moments and enable loose coupling between domain components. All events are immutable and carry sufficient context for event handlers.

Business Rules

  1. Priority Consistency: All frames within a stream respect the stream's base priority
  2. Path Uniqueness: No two frames in a stream can have identical JsonPath
  3. Session Lifecycle: Sessions must be explicitly completed or will timeout
  4. Frame Ordering: Higher priority frames are always processed first
  5. Data Integrity: Each frame maintains a checksum for validation

Invariants

  • Priority values are always non-zero (1-255 range)
  • JsonPath expressions are syntactically valid
  • Stream status transitions follow defined state machine
  • Frame data matches the declared JsonPath location
  • Session contains at least one stream when active
Clone this wiki locally