Stripe Integration - Low Level Design
Sequence Diagrams & State Machine Definitions
1. Core Event Flow Sequences
1.1 Complete Usage Event Sync Flow
sequenceDiagram
participant CA as Customer App
participant FP as FlexPrice API
participant ES as Event Service
participant CH as ClickHouse
participant BS as Billing Service
participant PG as PostgreSQL
participant TW as Temporal Worker
participant IS as Integration Service
participant ST as Stripe API
Note over CA,ST: Complete End-to-End Usage Event Sync Flow
%% Event Ingestion
CA->>FP: POST /events (external_customer_id: "client_123")
FP->>ES: Validate & Store Event
ES->>CH: Store Raw Event
ES->>BS: Trigger Billing Processing
%% FlexPrice Billing Processing
BS->>CH: Query Event Data
BS->>PG: Apply Pricing Rules
Note over BS,PG: Calculate free tier, credits, tiers<br/>Result: qty_billable, qty_free
BS->>PG: Store in events_processed
%% Temporal Sync Workflow (Hourly)
Note over TW: Cron Trigger: Every Hour
TW->>IS: Start Sync Decision Activity
IS->>PG: Query Unprocessed Events
loop For Each Unprocessed Event
IS->>IS: Make Sync Decision
alt qty_billable > 0
IS->>PG: Mark as 'billable' in integration_sync_state
else qty_free > 0
IS->>PG: Mark as 'skipped_free'
else
IS->>PG: Mark as 'skipped_credited'
end
end
%% Event Aggregation
TW->>IS: Start Aggregation Activity
IS->>PG: Query Billable Events (1 hour window)
IS->>IS: Group by customer_id + meter_id
IS->>PG: Mark events with batch_id
%% Stripe Sync
TW->>IS: Start Stripe Sync Activity
loop For Each Aggregated Batch
IS->>PG: Lookup Customer Mapping
IS->>ST: POST /v1/billing/meter_events
ST-->>IS: meter_event_id
IS->>PG: Update sync_status = 'synced'
end
1.2 Customer Onboarding Sequence
sequenceDiagram
participant C as Customer
participant ST as Stripe
participant CA as Customer App
participant FP as FlexPrice
participant WH as Webhook Handler
participant IS as Integration Service
Note over C,IS: Customer Onboarding & Mapping Flow
%% Phase 1: Stripe Setup
C->>ST: Create Customer
ST-->>C: stripe_customer_id: "cus_abc123"
C->>ST: Create Subscription
ST-->>C: subscription_id: "sub_def456"
%% Phase 2: FlexPrice Setup
C->>FP: Create Customer (external_id: "client_789")
FP-->>C: flexprice_customer_id: "fp_ghi123"
%% Phase 3: Integration Mapping
C->>WH: POST /webhook/customer-sync
Note right of WH: Payload:<br/>{<br/> "stripe_customer_id": "cus_abc123",<br/> "flexprice_customer_id": "fp_ghi123",<br/> "external_customer_id": "client_789"<br/>}
WH->>IS: Create Customer Mapping
IS->>IS: Validate IDs
IS->>PG: Insert customer_integration_mapping
IS-->>WH: Mapping Created
WH-->>C: Success
%% Phase 4: Usage Events Can Now Flow
Note over C: Ready to send usage events<br/>with external_customer_id: "client_789"
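The "Validate IDs" step in the onboarding flow could look roughly like the sketch below. The struct mirrors the payload shown in the diagram note; the specific checks (non-empty IDs, a `cus_` prefix on the Stripe ID) are assumptions about what validation means here, and `CustomerSyncPayload` is a hypothetical name:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// CustomerSyncPayload mirrors the webhook payload from the diagram note.
type CustomerSyncPayload struct {
	StripeCustomerID    string `json:"stripe_customer_id"`
	FlexPriceCustomerID string `json:"flexprice_customer_id"`
	ExternalCustomerID  string `json:"external_customer_id"`
}

// Validate applies basic shape checks only (assumed rules); it does not
// confirm that the IDs exist in Stripe or FlexPrice.
func (p CustomerSyncPayload) Validate() error {
	const prefix = "cus_"
	if !strings.HasPrefix(p.StripeCustomerID, prefix) || len(p.StripeCustomerID) == len(prefix) {
		return errors.New("stripe_customer_id must look like cus_...")
	}
	if p.FlexPriceCustomerID == "" {
		return errors.New("flexprice_customer_id is required")
	}
	if p.ExternalCustomerID == "" {
		return errors.New("external_customer_id is required")
	}
	return nil
}

func main() {
	raw := `{"stripe_customer_id":"cus_abc123","flexprice_customer_id":"fp_ghi123","external_customer_id":"client_789"}`
	var p CustomerSyncPayload
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println("validation error:", p.Validate())
}
```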
1.3 Sync Decision Processing Sequence
sequenceDiagram
participant TW as Temporal Worker
participant IS as Integration Service
participant PG as PostgreSQL
participant CH as ClickHouse
Note over TW,CH: Sync Decision Processing for Unprocessed Events
TW->>IS: ProcessSyncDecisionsActivity(windowStart, windowEnd)
%% Query Unprocessed Events
IS->>PG: Query events needing sync decision
Note right of PG: SELECT e.*, ep.qty_billable, ep.qty_free<br/>FROM events e<br/>JOIN events_processed ep ON e.id = ep.event_id<br/>LEFT JOIN integration_sync_state iss ON e.id = iss.event_id<br/>WHERE iss.id IS NULL
PG-->>IS: List of UnsyncedEvent objects
loop For Each Event
IS->>IS: Analyze Event Processing Results
alt qty_billable > 0
Note over IS: Event has billable usage after free tier/credits
IS->>PG: Check Provider Config for meter_id
alt Meter configured for Stripe sync
IS->>PG: CREATE integration_sync_state<br/>(status='billable', should_sync=true)
else
IS->>PG: CREATE integration_sync_state<br/>(status='skipped_config', skip_reason='meter_not_configured')
end
else qty_free > 0
Note over IS: Event covered by free tier
IS->>PG: CREATE integration_sync_state<br/>(status='skipped_free', skip_reason='free_tier_applied')
else
Note over IS: Event covered by credits
IS->>PG: CREATE integration_sync_state<br/>(status='skipped_credited', skip_reason='credit_applied')
end
end
IS-->>TW: SyncDecisionResult{EventsProcessed: count}
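The branching in this sequence can be summarized as one pure function. The sketch below is illustrative only: it uses `float64` to stay dependency-free (the schemas in this thread use `decimal.Decimal`), and the function name `decide` is hypothetical. Status and skip-reason strings are taken from the diagram.

```go
package main

import "fmt"

type SyncStatus string

const (
	StatusBillable        SyncStatus = "billable"
	StatusSkippedFree     SyncStatus = "skipped_free"
	StatusSkippedCredited SyncStatus = "skipped_credited"
	StatusSkippedConfig   SyncStatus = "skipped_config"
)

// decide returns the sync status and skip reason for one processed event.
// meterConfigured reports whether the meter has a Stripe provider config.
func decide(qtyBillable, qtyFree float64, meterConfigured bool) (SyncStatus, string) {
	switch {
	case qtyBillable > 0 && meterConfigured:
		return StatusBillable, ""
	case qtyBillable > 0:
		return StatusSkippedConfig, "meter_not_configured"
	case qtyFree > 0:
		return StatusSkippedFree, "free_tier_applied"
	default:
		return StatusSkippedCredited, "credit_applied"
	}
}

func main() {
	status, reason := decide(0, 10, true)
	fmt.Println(status, reason)
}
```

Keeping the decision free of I/O makes it easy to unit test each branch, including the `skipped_config` case.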
1.4 Event Aggregation Sequence
sequenceDiagram
participant TW as Temporal Worker
participant IS as Integration Service
participant PG as PostgreSQL
Note over TW,PG: Event Aggregation for Batch Sync
TW->>IS: AggregateBillableEventsActivity(windowStart, windowEnd)
%% Query Billable Events
IS->>PG: Query billable events in time window
Note right of PG: SELECT * FROM integration_sync_state iss<br/>JOIN events e ON iss.event_id = e.id<br/>WHERE sync_status = 'billable'<br/>AND aggregation_window_start BETWEEN ? AND ?<br/>AND batch_id IS NULL
PG-->>IS: List of billable sync states
%% Group and Aggregate
IS->>IS: Group by (customer_id, meter_id, provider_meter_id)
loop For Each Customer+Meter Group
IS->>IS: Sum billable_quantity
IS->>IS: Collect event_ids in group
IS->>IS: Generate batch_id = uuid()
%% Mark events as batched
IS->>PG: UPDATE integration_sync_state<br/>SET batch_id = ?, updated_at = NOW()<br/>WHERE event_id IN (...)
IS->>IS: Create AggregatedUsage object
Note right of IS: {<br/> customer_id,<br/> provider_customer_id,<br/> meter_id,<br/> provider_meter_id,<br/> total_usage: sum(billable_quantity),<br/> event_ids: [...]<br/>}
end
IS-->>TW: AggregationResult{AggregatedUsage: [...], TotalEvents: count}
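The group-and-sum step above might be sketched as follows. This is an assumption-laden illustration: the type and function names are hypothetical, quantities use `float64` instead of `decimal.Decimal`, the group key is reduced to customer and meter, and batch ID generation is left out.

```go
package main

import "fmt"

type billableEvent struct {
	EventID    string
	CustomerID string
	MeterID    string
	Quantity   float64
}

type groupKey struct{ CustomerID, MeterID string }

type aggregatedUsage struct {
	CustomerID string
	MeterID    string
	TotalUsage float64
	EventIDs   []string
}

// aggregate groups events by (customer, meter), sums their quantities and
// collects the event IDs. Groups keep first-seen order so output is stable.
func aggregate(events []billableEvent) []aggregatedUsage {
	index := make(map[groupKey]int)
	var out []aggregatedUsage
	for _, e := range events {
		k := groupKey{e.CustomerID, e.MeterID}
		i, ok := index[k]
		if !ok {
			i = len(out)
			index[k] = i
			out = append(out, aggregatedUsage{CustomerID: e.CustomerID, MeterID: e.MeterID})
		}
		out[i].TotalUsage += e.Quantity
		out[i].EventIDs = append(out[i].EventIDs, e.EventID)
	}
	return out
}

func main() {
	groups := aggregate([]billableEvent{
		{"e1", "c1", "m1", 2},
		{"e2", "c2", "m1", 5},
		{"e3", "c1", "m1", 3},
	})
	for _, g := range groups {
		fmt.Println(g.CustomerID, g.MeterID, g.TotalUsage, g.EventIDs)
	}
}
```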
1.5 Stripe API Sync Sequence
sequenceDiagram
participant TW as Temporal Worker
participant IS as Integration Service
participant PG as PostgreSQL
participant SC as Stripe Client
participant ST as Stripe API
Note over TW,ST: Stripe API Synchronization
TW->>IS: SyncToStripeActivity(aggregationResult)
loop For Each AggregatedUsage
%% Prepare Stripe Request
IS->>PG: Lookup Customer Mapping
PG-->>IS: provider_customer_id
IS->>IS: Build Stripe Meter Event Request
Note right of IS: event_name: meter_name<br/>payload.stripe_customer_id: provider_customer_id<br/>payload.value: total_usage
%% Make Stripe API Call
IS->>SC: CreateMeterEvent(request)
SC->>ST: POST /v1/billing/meter_events
ST-->>SC: meter_event_id: mtr_evt_abc123
SC-->>IS: stripe_event_id
%% Update Sync State
IS->>PG: UPDATE integration_sync_state SET sync_status = 'synced'
Note right of PG: Updates all event_ids in batch<br/>with provider_event_id and timestamps
end
IS-->>TW: StripeSyncActivityResult{SyncedCount: count}
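The "Build Stripe Meter Event Request" step might produce a form-encoded body like the one below. This is a sketch under assumptions: `meterEventForm` is a hypothetical helper, the payload keys follow the diagram note, and passing the batch ID as Stripe's optional `identifier` parameter is a suggestion for de-duplicating retried batches, not something the design specifies.

```go
package main

import (
	"fmt"
	"net/url"
)

// meterEventForm builds the form body for POST /v1/billing/meter_events.
// identifier is optional; when set, a retried batch reuses the same value.
func meterEventForm(eventName, stripeCustomerID, value, identifier string) url.Values {
	form := url.Values{}
	form.Set("event_name", eventName)
	form.Set("payload[stripe_customer_id]", stripeCustomerID)
	form.Set("payload[value]", value)
	if identifier != "" {
		form.Set("identifier", identifier)
	}
	return form
}

func main() {
	form := meterEventForm("api_calls", "cus_abc123", "150", "batch-42")
	fmt.Println(form.Encode())
}
```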
1.6 Error Handling & Retry Sequence
sequenceDiagram
participant TW as Temporal Worker
participant IS as Integration Service
participant PG as PostgreSQL
participant SC as Stripe Client
participant ST as Stripe API
participant RW as Retry Workflow
Note over TW,RW: Error Handling & Retry Flow
TW->>IS: SyncToStripeActivity(aggregationResult)
IS->>SC: CreateMeterEvent(request)
SC->>ST: POST /v1/billing/meter_events
alt Stripe API Error
ST-->>SC: 429 Rate Limit Error
SC-->>IS: RateLimitError
IS->>IS: Calculate exponential backoff delay
IS->>PG: UPDATE integration_sync_state<br/>SET sync_status = 'retrying',<br/>sync_attempt_count = sync_attempt_count + 1,<br/>next_retry_at = NOW() + delay,<br/>error_message = 'Rate limit exceeded'
IS-->>TW: RetryableError
TW->>TW: Schedule Retry Workflow
TW->>RW: Start RetryWorkflow(eventIds, delay)
%% Retry Workflow
RW->>RW: Sleep(delay)
RW->>IS: RetryFailedEventsActivity(eventIds)
IS->>SC: CreateMeterEvent(request) [Retry]
alt Retry Success
SC-->>IS: stripe_event_id
IS->>PG: UPDATE sync_status = 'synced'
else Retry Fails Again
IS->>PG: UPDATE sync_attempt_count++, next_retry_at
alt Max Attempts Reached
IS->>PG: UPDATE sync_status = 'failed'
IS->>IS: Send Alert to Operations
end
end
end
2. State Machine Definitions
2.1 Integration Sync State Machine
stateDiagram-v2
[*] --> Pending: Event received
Pending --> Billable: qty_billable > 0 AND meter_configured
Pending --> SkippedFree: qty_billable = 0 AND qty_free > 0
Pending --> SkippedCredited: qty_billable = 0 AND qty_free = 0
Pending --> SkippedConfig: qty_billable > 0 AND meter_not_configured
Billable --> Syncing: Added to batch
Syncing --> Synced: Stripe API success
Syncing --> Failed: Stripe API error
Failed --> Retrying: retry_attempts < max_attempts
Retrying --> Synced: Retry success
Retrying --> Failed: Retry failed
Failed --> ManualReview: retry_attempts >= max_attempts
SkippedFree --> [*]
SkippedCredited --> [*]
SkippedConfig --> [*]
Synced --> [*]
ManualReview --> [*]
state Failed {
[*] --> Attempt1: Initial failure
Attempt1 --> Attempt2: delay=1min
Attempt2 --> Attempt3: delay=2min
Attempt3 --> Attempt4: delay=4min
Attempt4 --> Attempt5: delay=8min
Attempt5 --> MaxAttemptsReached: delay=16min
}
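One way to enforce the transitions in this diagram is a lookup table consulted before every status update. The sketch below encodes only the top-level edges shown above; the names `State`, `transitions` and `canTransition` are hypothetical.

```go
package main

import "fmt"

type State string

// transitions lists the allowed top-level edges from the diagram.
// Terminal states (Synced, Skipped*, ManualReview) have no outgoing edges.
var transitions = map[State][]State{
	"Pending":  {"Billable", "SkippedFree", "SkippedCredited", "SkippedConfig"},
	"Billable": {"Syncing"},
	"Syncing":  {"Synced", "Failed"},
	"Failed":   {"Retrying", "ManualReview"},
	"Retrying": {"Synced", "Failed"},
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to State) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("Syncing", "Synced"))
	fmt.Println(canTransition("Synced", "Pending"))
}
```

Rejecting unknown edges at the repository layer guards against a retry worker moving an already synced event back into a retryable state.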
2.2 Customer Mapping State Machine
stateDiagram-v2
[*] --> PendingMapping: Customer created in provider
PendingMapping --> Active: Mapping created successfully
PendingMapping --> ValidationFailed: Invalid customer IDs
ValidationFailed --> PendingMapping: Retry with corrected data
ValidationFailed --> Rejected: Permanent validation failure
Active --> Suspended: Temporary sync disable
Active --> Updated: Mapping data changed
Active --> Deleted: Customer relationship ended
Suspended --> Active: Re-enable sync
Updated --> Active: Changes applied
Deleted --> [*]
Rejected --> [*]
state Active {
[*] --> Syncing
Syncing --> Healthy: All events syncing successfully
Healthy --> Degraded: Some sync failures
Degraded --> Healthy: Error resolved
Degraded --> Critical: High failure rate
Critical --> Healthy: Recovery completed
}
2.3 Temporal Workflow State Machine
stateDiagram-v2
[*] --> Scheduled: Cron trigger
Scheduled --> DecisionProcessing: Start sync workflow
DecisionProcessing --> Aggregating: Sync decisions complete
DecisionProcessing --> Failed: Decision processing error
Aggregating --> Syncing: Events aggregated
Aggregating --> Completed: No billable events
Aggregating --> Failed: Aggregation error
Syncing --> Completed: All batches synced
Syncing --> PartialSuccess: Some batches failed
Syncing --> Failed: All batches failed
PartialSuccess --> RetryScheduled: Schedule retry for failed batches
Failed --> RetryScheduled: Schedule retry workflow
RetryScheduled --> Retrying: Retry delay elapsed
Retrying --> Completed: Retry successful
Retrying --> Failed: Retry failed
Failed --> AlertGenerated: Max retries exceeded
AlertGenerated --> [*]
Completed --> [*]
state DecisionProcessing {
[*] --> QueryingEvents
QueryingEvents --> AnalyzingBilling: Events retrieved
AnalyzingBilling --> CreatingSyncStates: Decisions made
CreatingSyncStates --> [*]: States persisted
}
state Syncing {
[*] --> BatchProcessing
BatchProcessing --> StripeAPICalls: Batches prepared
StripeAPICalls --> UpdatingStates: API calls complete
UpdatingStates --> [*]: States updated
}
2.4 Provider Configuration State Machine
stateDiagram-v2
[*] --> Draft: Configuration created
Draft --> Validating: Submit for validation
Validating --> Active: Validation passed
Validating --> Invalid: Validation failed
Invalid --> Draft: Fix configuration
Invalid --> Rejected: Permanent failure
Active --> Paused: Temporarily disable sync
Active --> Updating: Modify configuration
Active --> Deleted: Remove configuration
Paused --> Active: Resume sync
Updating --> Validating: Validate changes
Deleted --> [*]
Rejected --> [*]
state Active {
entry / enable_meter_sync
exit / disable_meter_sync
[*] --> Monitoring
Monitoring --> Healthy: Sync working normally
Healthy --> Warning: Sync performance degraded
Warning --> Healthy: Performance recovered
Warning --> Critical: Sync failures increasing
Critical --> Healthy: Issues resolved
state Critical {
entry / alert_high_failure_rate
do / throttle_sync_attempts
}
}
state Validating {
[*] --> CheckingMeterExists
CheckingMeterExists --> CheckingProviderMeter: FlexPrice meter valid
CheckingProviderMeter --> CheckingFieldMappings: Provider meter valid
CheckingFieldMappings --> [*]: All validations passed
CheckingMeterExists --> [*]: Meter not found
CheckingProviderMeter --> [*]: Provider meter invalid
CheckingFieldMappings --> [*]: Field mapping invalid
}
2.5 Batch Processing State Machine
stateDiagram-v2
[*] --> Collecting: Gather billable events
Collecting --> Grouping: Events collected for window
Collecting --> Empty: No billable events found
Grouping --> Batched: Events grouped by customer+meter
Batched --> Submitting: Submit batches to Stripe
Submitting --> AllSuccessful: All API calls succeeded
Submitting --> PartialFailure: Some API calls failed
Submitting --> AllFailed: All API calls failed
AllSuccessful --> Completed: Mark events as synced
PartialFailure --> RetryQueued: Queue failed batches for retry
AllFailed --> RetryQueued: Queue all batches for retry
RetryQueued --> [*]: Retry workflow scheduled
Empty --> [*]
Completed --> [*]
state Grouping {
[*] --> SortingByCustomer
SortingByCustomer --> SortingByMeter: Customers grouped
SortingByMeter --> AggregatingUsage: Meters grouped
AggregatingUsage --> GeneratingBatchIDs: Usage summed
GeneratingBatchIDs --> [*]: Batch IDs assigned
}
state Submitting {
[*] --> PreparingRequests
PreparingRequests --> CallingStripeAPI: Requests formatted
CallingStripeAPI --> ProcessingResponses: API calls made
ProcessingResponses --> [*]: Responses handled
}
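The outcome of the Submitting state depends only on how many batches were sent and how many failed. A minimal sketch of that classification, using the state names from the diagram; the function name `classifyBatchRun` is hypothetical:

```go
package main

import "fmt"

// classifyBatchRun maps batch counts to the state that follows Collecting
// or Submitting in the batch processing state machine.
func classifyBatchRun(total, failed int) string {
	switch {
	case total == 0:
		return "Empty"
	case failed == 0:
		return "AllSuccessful"
	case failed >= total:
		return "AllFailed"
	default:
		return "PartialFailure"
	}
}

func main() {
	fmt.Println(classifyBatchRun(0, 0))
	fmt.Println(classifyBatchRun(4, 0))
	fmt.Println(classifyBatchRun(4, 1))
	fmt.Println(classifyBatchRun(4, 4))
}
```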
3. State Transition Rules
3.1 Sync Status Transitions
3.2 Workflow Transition Rules
3.3 Error Recovery Rules
Customer Integration and Sync Design
FlexPrice Stripe Integration - Customer Management
Overview
This document outlines the design for customer integration and synchronization between FlexPrice and external billing providers (starting with Stripe). The solution enables FlexPrice to work as an enhancement layer for existing Stripe billing infrastructure while maintaining proper customer identity mapping.
Problem Statement
Use Case
Key Requirements
Database Design
Customer Integration Mapping Table
CREATE TABLE customer_integration_mappings (
customer_id UUID NOT NULL,
provider_type VARCHAR(50) NOT NULL,
provider_customer_id VARCHAR(255) NOT NULL,
tenant_id UUID NOT NULL,
environment_id UUID NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- Composite primary key (no artificial UUID)
PRIMARY KEY (customer_id, provider_type, tenant_id, environment_id),
-- Foreign key to customers table
CONSTRAINT fk_customer FOREIGN KEY (customer_id) REFERENCES customers(id) ON DELETE CASCADE,
-- Unique constraint for reverse lookup
CONSTRAINT uk_provider_customer UNIQUE (provider_type, provider_customer_id, tenant_id, environment_id)
);
-- Index for provider-to-customer lookup
CREATE INDEX idx_provider_customer_lookup ON customer_integration_mappings(provider_type, provider_customer_id, tenant_id, environment_id);
Ent Schema (Go)
// ent/schema/customerintegrationmapping.go
func (CustomerIntegrationMapping) Fields() []ent.Field {
return []ent.Field{
field.UUID("customer_id", uuid.UUID{}),
field.String("provider_type").MaxLen(50),
field.String("provider_customer_id").MaxLen(255),
field.UUID("tenant_id", uuid.UUID{}),
field.UUID("environment_id", uuid.UUID{}),
field.JSON("metadata", map[string]interface{}{}).Optional(),
field.Time("created_at").Default(time.Now),
field.Time("updated_at").Default(time.Now).UpdateDefault(time.Now),
}
}
func (CustomerIntegrationMapping) Indexes() []ent.Index {
return []ent.Index{
index.Fields("customer_id", "provider_type", "tenant_id", "environment_id").Unique(),
index.Fields("provider_type", "provider_customer_id", "tenant_id", "environment_id").Unique(),
}
}
Key Design Decisions
ID Mapping Strategy
Three-Layer Mapping
Implementation
Lookup Patterns
-- Find customer by client internal ID
SELECT * FROM customers WHERE external_id = '123' AND tenant_id = ?
-- Find provider mappings for customer
SELECT * FROM customer_integration_mappings WHERE customer_id = ? AND tenant_id = ?
-- Find customer by provider ID
SELECT c.* FROM customers c
JOIN customer_integration_mappings cim ON c.id = cim.customer_id
WHERE cim.provider_type = 'stripe'
AND cim.provider_customer_id = 'cus_xyz'
AND cim.tenant_id = ?
Customer Sync Approaches
Evaluated Options
1. API Pull + Polling ❌
Approach: Periodically poll Stripe API to fetch new customers
Problems:
Verdict: Rejected due to race condition complexity
2. Just-In-Time (JIT) Fallback ❌
Approach: Fetch customer from Stripe when usage event arrives for unknown customer
Problems:
Verdict: Impossible due to ID mapping limitations
3. Webhook-Driven Sync ✅
Approach: Client triggers FlexPrice sync after creating customer in Stripe
Benefits:
Verdict: Recommended approach
Recommended Implementation
Bulk Import (One-time Migration)
POST /integrations/stripe/customers/bulk-import
Authorization: Bearer {api_key}
Implementation:
Real-time Sync (Ongoing)
POST /integrations/stripe/customers/sync
Content-Type: application/json
{
"external_id": "123", // Client's internal customer ID
"stripe_customer_id": "cus_abc" // Stripe customer ID
}
Required Client Flow:
API Endpoints
// Bulk import existing customers
POST /integrations/stripe/customers/bulk-import
// Sync individual customer
POST /integrations/stripe/customers/sync
// Get customer integrations
GET /customers/{id}/integrations
// Add integration mapping
POST /customers/{id}/integrations
// Find customer by provider ID
GET /customers/by-provider/{provider}/{provider_id}
Critical Requirements
Mandatory Webhook Ordering
Error Handling
Data Requirements
Future Extensibility
Multi-Provider Support
The schema supports additional providers by changing
Provider-Specific Metadata
Use {
"stripe": {
"subscription_id": "sub_xyz",
"payment_method": "card_abc"
}
}
Implementation Timeline
MVP (Week 1)
Future Enhancements
PRD: Stripe Usage Sync Integration
Overview
Implement a bidirectional usage event synchronization system between FlexPrice and Stripe, enabling customers to maintain existing Stripe billing infrastructure while leveraging FlexPrice's advanced usage-based billing capabilities. The system will sync only billable usage events (excluding free tier and credited usage) using Temporal workflows for reliability and batch aggregation to avoid rate limits.
Architecture
Core Components
Database Schema Implementation
Task 1: Create Integration Sync State Ent Schema
File Path:
Create Ent schema for integration sync state:
package schema
import (
"time"
"entgo.io/ent"
"entgo.io/ent/dialect"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
"github.com/shopspring/decimal"
)
type IntegrationSyncState struct {
ent.Schema
}
func (IntegrationSyncState) Fields() []ent.Field {
return []ent.Field{
field.String("id").Unique(),
field.String("event_id"),
field.String("tenant_id"),
field.String("environment_id"),
field.String("provider_type"),
field.String("sync_status"),
field.Bool("should_sync").Default(false),
field.String("skip_reason").Optional(),
field.String("subscription_id").Optional(),
field.String("meter_id").Optional(),
field.Other("billable_quantity", decimal.Decimal{}).SchemaType(map[string]string{
dialect.Postgres: "numeric",
}).Optional(),
field.Other("free_quantity", decimal.Decimal{}).SchemaType(map[string]string{
dialect.Postgres: "numeric",
}).Optional(),
field.Other("total_quantity", decimal.Decimal{}).SchemaType(map[string]string{
dialect.Postgres: "numeric",
}).Optional(),
field.String("provider_event_id").Optional(),
field.String("provider_meter_id").Optional(),
field.String("provider_customer_id").Optional(),
field.JSON("sync_payload", map[string]interface{}{}).Optional(),
field.Int("sync_attempt_count").Default(0),
field.Time("last_sync_attempt").Optional(),
field.Time("next_retry_at").Optional(),
field.String("error_message").Optional(),
field.String("error_code").Optional(),
field.String("batch_id").Optional(),
field.Time("aggregation_window_start").Optional(),
field.Time("aggregation_window_end").Optional(),
field.Time("created_at").Default(time.Now),
field.Time("updated_at").Default(time.Now).UpdateDefault(time.Now),
}
}
func (IntegrationSyncState) Indexes() []ent.Index {
return []ent.Index{
index.Fields("event_id", "provider_type", "tenant_id", "environment_id").Unique(),
index.Fields("tenant_id", "environment_id", "provider_type", "sync_status"),
index.Fields("next_retry_at", "sync_status"),
index.Fields("batch_id", "provider_type"),
}
}
Task 2: Create Provider Config Ent Schema
File Path:
Create Ent schema for provider configurations:
package schema
import (
"time"
"entgo.io/ent"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
)
type IntegrationProviderConfig struct {
ent.Schema
}
func (IntegrationProviderConfig) Fields() []ent.Field {
return []ent.Field{
field.String("id").Unique(),
field.String("tenant_id"),
field.String("environment_id"),
field.String("provider_type"),
field.String("meter_id"),
field.String("provider_meter_id"),
field.Bool("sync_enabled").Default(true),
field.String("sync_frequency").Default("1h"),
field.String("aggregation_window").Default("1h"),
field.JSON("field_mappings", map[string]interface{}{}).Optional(),
field.Time("created_at").Default(time.Now),
field.Time("updated_at").Default(time.Now).UpdateDefault(time.Now),
}
}
func (IntegrationProviderConfig) Indexes() []ent.Index {
return []ent.Index{
index.Fields("tenant_id", "environment_id", "provider_type", "meter_id").Unique(),
}
}
Task 3: Create Customer Integration Mapping Ent Schema
File Path:
Create Ent schema for customer mappings:
package schema
import (
"time"
"entgo.io/ent"
"entgo.io/ent/schema/field"
"entgo.io/ent/schema/index"
)
type CustomerIntegrationMapping struct {
ent.Schema
}
func (CustomerIntegrationMapping) Fields() []ent.Field {
return []ent.Field{
field.String("customer_id"),
field.String("provider_type"),
field.String("provider_customer_id"),
field.String("tenant_id"),
field.String("environment_id"),
field.JSON("metadata", map[string]interface{}{}).Optional(),
field.Time("created_at").Default(time.Now),
field.Time("updated_at").Default(time.Now).UpdateDefault(time.Now),
}
}
func (CustomerIntegrationMapping) Indexes() []ent.Index {
return []ent.Index{
index.Fields("customer_id", "provider_type", "tenant_id", "environment_id").Unique(),
}
}
Task 4: Generate Ent Code
File Path:
Add Ent generation command to existing Makefile:
# Add to existing Makefile
generate-ent:
	go generate ./ent
Run:
Task 3: Create Customer Integration Mapping Table
File Path:
Create table for mapping FlexPrice customers to external provider customers:
CREATE TABLE customer_integration_mappings (
customer_id TEXT,
provider_type TEXT,
provider_customer_id TEXT,
tenant_id TEXT,
environment_id TEXT,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
PRIMARY KEY (customer_id, provider_type, tenant_id, environment_id)
);
Domain Models Implementation
Task 4: Create Integration Sync Domain Model
File Path:
Define the domain model for integration sync state with validation and business logic:
package integration
import (
"time"
"github.com/flexprice/flexprice/internal/types"
"github.com/shopspring/decimal"
)
type SyncStatus string
const (
SyncStatusBillable SyncStatus = "billable"
SyncStatusSkippedFree SyncStatus = "skipped_free"
SyncStatusSkippedCredits SyncStatus = "skipped_credited"
SyncStatusSkippedConfig SyncStatus = "skipped_config"
SyncStatusSynced SyncStatus = "synced"
SyncStatusFailed SyncStatus = "failed"
SyncStatusRetrying SyncStatus = "retrying"
)
type IntegrationSyncState struct {
ID string
EventID string
ProviderType string
SyncStatus SyncStatus
ShouldSync bool
SkipReason string
BillableQuantity decimal.Decimal
// ... other fields
}
type ProviderConfig struct {
ID string
MeterID string
ProviderMeterID string
SyncEnabled bool
SyncFrequency string
AggregationWindow string
// ... other fields
}
type CustomerMapping struct {
CustomerID string
ProviderType string
ProviderCustomerID string
// ... other fields
}
Task 5: Create Integration Repository Interface
File Path:
Define repository interface for integration operations:
package integration
import "context"
type Repository interface {
CreateSyncState(ctx context.Context, syncState *IntegrationSyncState) error
UpdateSyncState(ctx context.Context, syncState *IntegrationSyncState) error
GetSyncStateByEventID(ctx context.Context, eventID string, providerType string) (*IntegrationSyncState, error)
GetBillableEventsForWindow(ctx context.Context, params *GetBillableEventsParams) ([]*IntegrationSyncState, error)
GetEventsNeedingSyncDecision(ctx context.Context, params *GetUnsyncedEventsParams) ([]*UnsyncedEvent, error)
CreateProviderConfig(ctx context.Context, config *ProviderConfig) error
GetProviderConfig(ctx context.Context, meterID string, providerType string) (*ProviderConfig, error)
CreateCustomerMapping(ctx context.Context, mapping *CustomerMapping) error
GetCustomerMapping(ctx context.Context, customerID string, providerType string) (*CustomerMapping, error)
}
Repository Implementation
Task 6: Create Integration Repository Implementation
File Path:
Implement the repository interface using FlexPrice's existing Ent patterns:
package ent
import (
"context"
"github.com/flexprice/flexprice/internal/domain/integration"
"github.com/flexprice/flexprice/ent"
)
type IntegrationRepository struct {
client *ent.Client
}
func NewIntegrationRepository(client *ent.Client) integration.Repository {
return &IntegrationRepository{client: client}
}
func (r *IntegrationRepository) GetEventsNeedingSyncDecision(ctx context.Context, params *integration.GetUnsyncedEventsParams) ([]*integration.UnsyncedEvent, error) {
// Query events that exist in events table, have been processed by FlexPrice,
// but don't have sync decision yet
query := `
SELECT e.id, e.event_name, e.external_customer_id, ep.qty_billable, ep.qty_free_applied
FROM events e
JOIN events_processed ep ON e.id = ep.id
LEFT JOIN integration_sync_state iss ON e.id = iss.event_id
AND iss.provider_type = ? AND iss.tenant_id = e.tenant_id
WHERE e.tenant_id = ? AND e.environment_id = ?
AND e.timestamp BETWEEN ? AND ?
AND iss.id IS NULL
ORDER BY e.timestamp ASC LIMIT ?
`
// Implementation...
}
func (r *IntegrationRepository) GetBillableEventsForWindow(ctx context.Context, params *integration.GetBillableEventsParams) ([]*integration.IntegrationSyncState, error) {
// Query billable events ready for sync within aggregation window
query := `
SELECT iss.*, e.event_name, e.properties
FROM integration_sync_state iss
JOIN events e ON iss.event_id = e.id
WHERE iss.tenant_id = ? AND iss.environment_id = ?
AND iss.provider_type = ? AND iss.sync_status = 'billable'
AND iss.aggregation_window_start BETWEEN ? AND ?
AND iss.batch_id IS NULL
ORDER BY e.timestamp ASC
`
// Implementation...
}
Temporal Workflow Implementation
Task 7: Create Stripe Sync Workflow
File Path:
Implement the main workflow that orchestrates the sync process using FlexPrice's existing Temporal patterns:
package workflows
import (
"fmt"
"time"
temporalsdk "go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"github.com/flexprice/flexprice/internal/temporal/models"
)
func StripeSyncWorkflow(ctx workflow.Context, input models.StripeSyncInput) (*models.StripeSyncResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting Stripe sync workflow", "windowStart", input.WindowStart, "windowEnd", input.WindowEnd)
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 5,
RetryPolicy: &temporalsdk.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute * 2,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)
// Step 1: Process sync decisions for unprocessed events
var decisionResult models.SyncDecisionResult
err := workflow.ExecuteActivity(ctx, ProcessSyncDecisionsActivity, input).Get(ctx, &decisionResult)
if err != nil {
logger.Error("Failed to process sync decisions", "error", err)
return nil, err
}
// Step 2: Aggregate billable events for sync
var aggregationResult models.AggregationResult
err = workflow.ExecuteActivity(ctx, AggregateBillableEventsActivity, input).Get(ctx, &aggregationResult)
if err != nil {
logger.Error("Failed to aggregate billable events", "error", err)
return nil, err
}
// Step 3: Sync aggregated usage to Stripe
// Declared outside the if block so the result is available when building the summary.
var syncResult models.StripeSyncActivityResult
if len(aggregationResult.AggregatedUsage) > 0 {
err = workflow.ExecuteActivity(ctx, SyncToStripeActivity, aggregationResult).Get(ctx, &syncResult)
if err != nil {
logger.Error("Failed to sync to Stripe", "error", err)
return nil, err
}
}
return &models.StripeSyncResult{
EventsProcessed: decisionResult.EventsProcessed,
// Counts events that were aggregated for sync; events in failed batches are still included.
EventsSynced: aggregationResult.TotalEvents,
// Counts batches Stripe accepted rather than batches attempted.
BatchesSynced: syncResult.SyncedCount,
}, nil
}
func CronStripeSyncWorkflow(ctx workflow.Context, input models.CronSyncInput) error {
logger := workflow.GetLogger(ctx)
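// Run sync for the last hour
now := workflow.Now(ctx)
windowStart := now.Add(-1 * time.Hour)
windowEnd := now
// Using the logger here also avoids Go's "declared and not used" compile error.
logger.Info("Starting scheduled Stripe sync", "windowStart", windowStart, "windowEnd", windowEnd)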
syncInput := models.StripeSyncInput{
WindowStart: windowStart,
WindowEnd: windowEnd,
ProviderType: "stripe",
}
childWorkflowOptions := workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("stripe-sync-%d", now.Unix()),
}
childCtx := workflow.WithChildOptions(ctx, childWorkflowOptions)
var result models.StripeSyncResult
future := workflow.ExecuteChildWorkflow(childCtx, StripeSyncWorkflow, syncInput)
return future.Get(ctx, &result)
}
Task 8: Create Stripe Sync Activities
File Path:
Implement activities for sync decision processing, aggregation, and Stripe API calls:
package activities
import (
"context"
"fmt"
"github.com/flexprice/flexprice/internal/domain/integration"
"github.com/flexprice/flexprice/internal/temporal/models"
"github.com/shopspring/decimal"
)
type StripeSyncActivities struct {
integrationRepo integration.Repository
stripeClient StripeClient
}
func (a *StripeSyncActivities) ProcessSyncDecisionsActivity(ctx context.Context, input models.StripeSyncInput) (*models.SyncDecisionResult, error) {
// Get events that need sync decision
params := &integration.GetUnsyncedEventsParams{
WindowStart: input.WindowStart,
WindowEnd: input.WindowEnd,
ProviderType: input.ProviderType,
BatchSize: 1000,
}
unsyncedEvents, err := a.integrationRepo.GetEventsNeedingSyncDecision(ctx, params)
if err != nil {
return nil, err
}
var processed int
for _, event := range unsyncedEvents {
// Determine sync decision based on processed event data
syncState := a.decideSyncStatus(event)
// Save sync decision
err := a.integrationRepo.CreateSyncState(ctx, syncState)
if err != nil {
return nil, err
}
processed++
}
return &models.SyncDecisionResult{EventsProcessed: processed}, nil
}
func (a *StripeSyncActivities) AggregateBillableEventsActivity(ctx context.Context, input models.StripeSyncInput) (*models.AggregationResult, error) {
// Get billable events for the window
params := &integration.GetBillableEventsParams{
WindowStart: input.WindowStart,
WindowEnd: input.WindowEnd,
ProviderType: input.ProviderType,
}
billableEvents, err := a.integrationRepo.GetBillableEventsForWindow(ctx, params)
if err != nil {
return nil, err
}
// Aggregate by customer + meter
aggregatedUsage := a.aggregateEventsByCustomerAndMeter(billableEvents)
return &models.AggregationResult{
AggregatedUsage: aggregatedUsage,
TotalEvents: len(billableEvents),
}, nil
}
func (a *StripeSyncActivities) SyncToStripeActivity(ctx context.Context, input models.AggregationResult) (*models.StripeSyncActivityResult, error) {
batchID := generateBatchID()
var syncedCount int
for _, usage := range input.AggregatedUsage {
// Create Stripe meter event
stripeEventID, err := a.stripeClient.CreateMeterEvent(ctx, &StripeEventRequest{
EventName: usage.EventName,
Payload: map[string]interface{}{
"stripe_customer_id": usage.ProviderCustomerID,
"value": usage.TotalUsage.String(),
},
})
if err != nil {
// Mark events as failed
a.markEventsFailed(ctx, usage.EventIDs, err.Error())
continue
}
// Mark events as synced
err = a.markEventsSynced(ctx, usage.EventIDs, batchID, stripeEventID)
if err != nil {
return nil, err
}
syncedCount++
}
return &models.StripeSyncActivityResult{SyncedCount: syncedCount}, nil
}
func (a *StripeSyncActivities) decideSyncStatus(event *integration.UnsyncedEvent) *integration.IntegrationSyncState {
// If billable quantity is zero (covered by free tier/credits)
if event.QtyBillable.IsZero() {
if event.QtyFreeApplied.GreaterThan(decimal.Zero) {
return &integration.IntegrationSyncState{
EventID: event.EventID,
SyncStatus: integration.SyncStatusSkippedFree,
SkipReason: "free_tier_applied",
ShouldSync: false,
}
}
return &integration.IntegrationSyncState{
EventID: event.EventID,
SyncStatus: integration.SyncStatusSkippedCredits,
SkipReason: "credit_applied",
ShouldSync: false,
}
}
return &integration.IntegrationSyncState{
EventID: event.EventID,
SyncStatus: integration.SyncStatusBillable,
ShouldSync: true,
BillableQuantity: event.QtyBillable,
TotalQuantity: event.QtyTotal,
FreeQuantity: event.QtyFreeApplied,
}
}
Task 9: Create Temporal Models
File Path:
Define input/output models for Temporal workflows and activities:
package models
import (
"time"
"github.com/shopspring/decimal"
)
type StripeSyncInput struct {
WindowStart time.Time
WindowEnd time.Time
ProviderType string
IsBackfill bool // set by StripeBackfillWorkflow (Task 25) for historical windows
}
type CronSyncInput struct {
ProviderType string
}
type StripeSyncResult struct {
EventsProcessed int
EventsSynced int
BatchesSynced int
}
type SyncDecisionResult struct {
EventsProcessed int
}
type AggregationResult struct {
AggregatedUsage []AggregatedUsage
TotalEvents int
}
type AggregatedUsage struct {
CustomerID string
ProviderCustomerID string
MeterID string
ProviderMeterID string
EventName string
TotalUsage decimal.Decimal
EventIDs []string
}
type StripeSyncActivityResult struct {
SyncedCount int
}
Stripe Client Implementation
Task 10: Create Stripe Client Interface
File Path:
Create Stripe API client following FlexPrice's httpclient patterns:
package stripe
import (
"context"
"github.com/flexprice/flexprice/internal/httpclient"
)
type Client interface {
CreateMeterEvent(ctx context.Context, req *MeterEventRequest) (string, error)
GetMeter(ctx context.Context, meterID string) (*Meter, error)
CreateMeter(ctx context.Context, req *CreateMeterRequest) (*Meter, error)
}
type client struct {
httpClient httpclient.Client
apiKey string
baseURL string
}
type MeterEventRequest struct {
EventName string `json:"event_name"`
Payload map[string]interface{} `json:"payload"`
Timestamp int64 `json:"timestamp,omitempty"`
}
type MeterEventResponse struct {
ID string `json:"id"`
EventName string `json:"event_name"`
}
func NewClient(httpClient httpclient.Client, apiKey string) Client {
return &client{
httpClient: httpClient,
apiKey: apiKey,
baseURL: "https://api.stripe.com",
}
}
func (c *client) CreateMeterEvent(ctx context.Context, req *MeterEventRequest) (string, error) {
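// Implementation using FlexPrice's httpclient patterns.
// NOTE: Stripe's /v1 endpoints accept form-encoded bodies, not JSON, so the
// request should be sent as application/x-www-form-urlencoded
// (e.g. payload[stripe_customer_id]=...&payload[value]=...). This assumes
// httpClient.Post encodes req to match the Content-Type header; verify before use.
headers := map[string]string{
"Authorization": "Bearer " + c.apiKey,
"Content-Type": "application/x-www-form-urlencoded",
"Stripe-Version": "2025-04-30.preview",
}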
response := &MeterEventResponse{}
err := c.httpClient.Post(ctx, c.baseURL+"/v1/billing/meter_events", req, response, headers)
if err != nil {
return "", err
}
return response.ID, nil
}
Service Layer Implementation
Task 11: Create Integration Service
File Path:
Create service layer for integration operations following FlexPrice's service patterns:
package service
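import (
"context"
"github.com/flexprice/flexprice/internal/domain/integration"
"github.com/flexprice/flexprice/internal/temporal"
"github.com/flexprice/flexprice/internal/temporal/models"
"github.com/flexprice/flexprice/internal/types"
)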
type IntegrationService struct {
integrationRepo integration.Repository
temporalClient temporal.Client
}
func NewIntegrationService(
integrationRepo integration.Repository,
temporalClient temporal.Client,
) *IntegrationService {
return &IntegrationService{
integrationRepo: integrationRepo,
temporalClient: temporalClient,
}
}
func (s *IntegrationService) CreateCustomerMapping(ctx context.Context, req *CreateCustomerMappingRequest) error {
mapping := &integration.CustomerMapping{
CustomerID: req.CustomerID,
ProviderType: req.ProviderType,
ProviderCustomerID: req.ProviderCustomerID,
TenantID: types.GetTenantID(ctx),
EnvironmentID: types.GetEnvironmentID(ctx),
}
return s.integrationRepo.CreateCustomerMapping(ctx, mapping)
}
func (s *IntegrationService) TriggerSync(ctx context.Context, req *TriggerSyncRequest) error {
workflowInput := models.StripeSyncInput{
WindowStart: req.WindowStart,
WindowEnd: req.WindowEnd,
ProviderType: req.ProviderType,
}
return s.temporalClient.ExecuteWorkflow(ctx, "stripe-sync-manual", workflowInput)
}
API Implementation
Task 12: Create Integration API Endpoints
File Path:
Implement REST API endpoints for integration management:
package v1
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/flexprice/flexprice/internal/service"
)
type IntegrationHandler struct {
integrationService *service.IntegrationService
}
func NewIntegrationHandler(integrationService *service.IntegrationService) *IntegrationHandler {
return &IntegrationHandler{integrationService: integrationService}
}
func (h *IntegrationHandler) CreateCustomerMapping(c *gin.Context) {
var req service.CreateCustomerMappingRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
err := h.integrationService.CreateCustomerMapping(c.Request.Context(), &req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusCreated, gin.H{"status": "created"})
}
func (h *IntegrationHandler) TriggerSync(c *gin.Context) {
var req service.TriggerSyncRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
err := h.integrationService.TriggerSync(c.Request.Context(), &req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "triggered"})
}
Task 13: Register API Routes
File Path:
Add integration routes to the existing router:
// Add to existing router.go RegisterRoutes function
func RegisterRoutes(r *gin.Engine, handlers *Handlers) {
// ... existing routes
// Integration routes
integrationRoutes := v1Group.Group("/integrations")
{
integrationRoutes.POST("/customer-mappings", handlers.Integration.CreateCustomerMapping)
integrationRoutes.POST("/sync/trigger", handlers.Integration.TriggerSync)
integrationRoutes.GET("/sync/status", handlers.Integration.GetSyncStatus)
}
}
Temporal Worker Registration
Task 14: Register Workflows and Activities
File Path:
Register new workflows and activities with Temporal worker:
// Add to existing registration.go
func RegisterWorkflowsAndActivities(w worker.Worker, activities *Activities) {
// ... existing registrations
// Stripe sync workflows
w.RegisterWorkflow(workflows.StripeSyncWorkflow)
w.RegisterWorkflow(workflows.CronStripeSyncWorkflow)
// Stripe sync activities
w.RegisterActivity(activities.StripeSyncActivities.ProcessSyncDecisionsActivity)
w.RegisterActivity(activities.StripeSyncActivities.AggregateBillableEventsActivity)
w.RegisterActivity(activities.StripeSyncActivities.SyncToStripeActivity)
}
Configuration
Task 15: Add Stripe Configuration
File Path:
Add Stripe configuration to existing config structure:
// Add to existing Configuration struct
type Configuration struct {
// ... existing fields
Stripe StripeConfig `mapstructure:"stripe"`
}
type StripeConfig struct {
Enabled bool `mapstructure:"enabled"`
APIKey string `mapstructure:"api_key"`
SyncFrequency time.Duration `mapstructure:"sync_frequency"`
AggregationWindow time.Duration `mapstructure:"aggregation_window"`
RetryAttempts int `mapstructure:"retry_attempts"`
RateLimit int `mapstructure:"rate_limit"`
}
Task 16: Add Configuration Validation
File Path:
Add validation for Stripe configuration:
func (c *Configuration) Validate() error {
// ... existing validation
if c.Stripe.Enabled {
if c.Stripe.APIKey == "" {
return errors.New("stripe.api_key is required when stripe is enabled")
}
if c.Stripe.SyncFrequency <= 0 {
return errors.New("stripe.sync_frequency must be greater than 0")
}
}
return nil
}
Testing Implementation
Task 17: Create Integration Tests
File Path:
Create comprehensive tests for integration service using FlexPrice's testing patterns:
package service
import (
"testing"
"github.com/flexprice/flexprice/internal/testutil"
"github.com/stretchr/testify/suite"
)
type IntegrationServiceTestSuite struct {
testutil.BaseServiceSuite
integrationService *IntegrationService
}
func (suite *IntegrationServiceTestSuite) SetupTest() {
// Setup using FlexPrice's existing test patterns
suite.BaseServiceSuite.SetupTest()
suite.integrationService = NewIntegrationService(
suite.Store.IntegrationStore,
suite.TemporalClient,
)
}
func (suite *IntegrationServiceTestSuite) TestCreateCustomerMapping() {
// Test customer mapping creation
}
func (suite *IntegrationServiceTestSuite) TestSyncDecisionLogic() {
// Test sync decision logic (billable vs skipped events)
}
func TestIntegrationServiceTestSuite(t *testing.T) {
suite.Run(t, new(IntegrationServiceTestSuite))
}
Task 18: Create Temporal Workflow Tests
File Path:
Create tests for Temporal workflows using FlexPrice's testing patterns:
package workflows
import (
"testing"
"github.com/flexprice/flexprice/internal/temporal/models"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)
func TestStripeSyncWorkflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Mock activities; each takes (ctx, input), so two argument matchers are supplied
env.OnActivity(ProcessSyncDecisionsActivity, mock.Anything, mock.Anything).Return(&models.SyncDecisionResult{EventsProcessed: 10}, nil)
env.OnActivity(AggregateBillableEventsActivity, mock.Anything, mock.Anything).Return(&models.AggregationResult{TotalEvents: 5}, nil)
env.OnActivity(SyncToStripeActivity, mock.Anything, mock.Anything).Return(&models.StripeSyncActivityResult{SyncedCount: 5}, nil)
// Execute workflow
env.ExecuteWorkflow(StripeSyncWorkflow, models.StripeSyncInput{})
// Verify results: the workflow finished and did not return an error
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
}
Deployment
Task 19: Add Docker Configuration
File Path:
Add Stripe configuration to existing docker-compose:
# Add to existing docker-compose.yml services section
services:
flexprice:
environment:
- STRIPE_ENABLED=true
- STRIPE_API_KEY=${STRIPE_API_KEY}
- STRIPE_SYNC_FREQUENCY=1h
- STRIPE_AGGREGATION_WINDOW=1h
Task 20: Update Environment Configuration
File Path:
Add Stripe environment variables to example configuration:
# Add to existing .env.example
# Stripe Configuration
STRIPE_ENABLED=false
STRIPE_API_KEY=sk_test_your_stripe_secret_key
STRIPE_SYNC_FREQUENCY=1h
STRIPE_AGGREGATION_WINDOW=1h
STRIPE_RETRY_ATTEMPTS=3
STRIPE_RATE_LIMIT=100
Task 21: Create Webhook Handler for Customer Sync
File Path:
Create webhook handler for customer synchronization using existing webhook infrastructure:
package handler
import (
"context"
"github.com/flexprice/flexprice/internal/service"
"github.com/flexprice/flexprice/internal/webhook/dto"
)
type CustomerSyncHandler struct {
integrationService *service.IntegrationService
}
func NewCustomerSyncHandler(integrationService *service.IntegrationService) *CustomerSyncHandler {
return &CustomerSyncHandler{integrationService: integrationService}
}
func (h *CustomerSyncHandler) HandleStripeCustomerCreated(ctx context.Context, payload *dto.StripeCustomerWebhook) error {
// Create customer mapping when Stripe customer is created
req := &service.CreateCustomerMappingRequest{
CustomerID: payload.FlexPriceCustomerID,
ProviderType: "stripe",
ProviderCustomerID: payload.StripeCustomerID,
}
return h.integrationService.CreateCustomerMapping(ctx, req)
}
Task 22: Create Bulk Customer Import Endpoint
File Path:
Create endpoint for bulk customer import:
package v1
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/flexprice/flexprice/internal/service"
)
type CustomerImportHandler struct {
integrationService *service.IntegrationService
}
func NewCustomerImportHandler(integrationService *service.IntegrationService) *CustomerImportHandler {
return &CustomerImportHandler{integrationService: integrationService}
}
func (h *CustomerImportHandler) BulkImportStripeCustomers(c *gin.Context) {
var req service.BulkImportRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
result, err := h.integrationService.BulkImportStripeCustomers(c.Request.Context(), &req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, result)
}
Task 23: Create Retry Workflow
File Path:
Create workflow for handling failed sync retries:
package workflows
import (
"time"
temporalsdk "go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"github.com/flexprice/flexprice/internal/temporal/models"
)
func StripeRetryWorkflow(ctx workflow.Context, input models.RetryInput) error {
logger := workflow.GetLogger(ctx)
// Wait for retry delay (exponential backoff)
err := workflow.Sleep(ctx, input.RetryDelay)
if err != nil {
return err
}
// Execute retry activity
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute * 3,
RetryPolicy: &temporalsdk.RetryPolicy{
InitialInterval: time.Second * 30,
MaximumAttempts: 1, // Single retry attempt per workflow
},
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)
var result models.RetryResult
err = workflow.ExecuteActivity(ctx, RetryFailedEventsActivity, input).Get(ctx, &result)
if err != nil {
logger.Error("Retry failed", "error", err, "eventIDs", input.EventIDs)
return err
}
logger.Info("Retry completed", "retriedCount", result.RetriedCount)
return nil
}
Task 24: Create Admin API for Monitoring
File Path:
Create admin endpoints for monitoring sync status:
package v1
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/flexprice/flexprice/internal/service"
)
type IntegrationAdminHandler struct {
integrationService *service.IntegrationService
}
func NewIntegrationAdminHandler(integrationService *service.IntegrationService) *IntegrationAdminHandler {
return &IntegrationAdminHandler{integrationService: integrationService}
}
func (h *IntegrationAdminHandler) GetSyncStatus(c *gin.Context) {
status, err := h.integrationService.GetSyncStatus(c.Request.Context())
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, status)
}
func (h *IntegrationAdminHandler) GetFailedEvents(c *gin.Context) {
failedEvents, err := h.integrationService.GetFailedEvents(c.Request.Context())
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, failedEvents)
}
func (h *IntegrationAdminHandler) RetryFailedEvents(c *gin.Context) {
var req service.RetryFailedEventsRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
err := h.integrationService.RetryFailedEvents(c.Request.Context(), &req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "retry_triggered"})
}
Task 25: Create Backfill Workflow
File Path:
Create workflow for historical data backfill:
package workflows
import (
"fmt"
"time"
"go.temporal.io/sdk/workflow"
"github.com/flexprice/flexprice/internal/temporal/models"
)
func StripeBackfillWorkflow(ctx workflow.Context, input models.BackfillInput) (*models.BackfillResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting Stripe backfill workflow",
"startDate", input.StartDate,
"endDate", input.EndDate,
"chunkSize", input.ChunkSize)
var totalProcessed int
// Split time range into chunks to avoid overwhelming the system
chunks := splitTimeRange(input.StartDate, input.EndDate, input.ChunkSize)
for i, chunk := range chunks {
childWorkflowOptions := workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("stripe-backfill-chunk-%d", chunk.Start.Unix()),
}
childCtx := workflow.WithChildOptions(ctx, childWorkflowOptions)
syncInput := models.StripeSyncInput{
WindowStart: chunk.Start,
WindowEnd: chunk.End,
ProviderType: "stripe",
IsBackfill: true,
}
var chunkResult models.StripeSyncResult
err := workflow.ExecuteChildWorkflow(childCtx, StripeSyncWorkflow, syncInput).Get(ctx, &chunkResult)
if err != nil {
logger.Error("Backfill chunk failed", "error", err, "chunk", chunk)
continue // Continue with next chunk
}
totalProcessed += chunkResult.EventsProcessed
// Add delay between chunks to avoid overwhelming Stripe API
if i < len(chunks)-1 { // Don't wait after last chunk; compare by index, not by struct value
if err := workflow.Sleep(ctx, time.Minute); err != nil {
return nil, err // Sleep returns an error when the workflow context is cancelled
}
}
}
return &models.BackfillResult{
TotalProcessed: totalProcessed,
ChunksProcessed: len(chunks),
}, nil
}
Task 26: Create Event Publisher for Integration Events
File Path:
Create publisher for integration-related events using existing publisher infrastructure:
package publisher
import (
"context"
"github.com/flexprice/flexprice/internal/pubsub"
"github.com/flexprice/flexprice/internal/types"
)
type IntegrationPublisher struct {
publisher pubsub.Publisher
}
func NewIntegrationPublisher(publisher pubsub.Publisher) *IntegrationPublisher {
return &IntegrationPublisher{publisher: publisher}
}
func (p *IntegrationPublisher) PublishSyncCompleted(ctx context.Context, event *SyncCompletedEvent) error {
return p.publisher.Publish(ctx, &pubsub.Message{
Topic: "integration.sync.completed",
Payload: event,
Headers: map[string]string{
"tenant_id": types.GetTenantID(ctx),
"provider_type": event.ProviderType,
},
})
}
func (p *IntegrationPublisher) PublishSyncFailed(ctx context.Context, event *SyncFailedEvent) error {
return p.publisher.Publish(ctx, &pubsub.Message{
Topic: "integration.sync.failed",
Payload: event,
Headers: map[string]string{
"tenant_id": types.GetTenantID(ctx),
"provider_type": event.ProviderType,
},
})
}
type SyncCompletedEvent struct {
ProviderType string `json:"provider_type"`
BatchID string `json:"batch_id"`
EventsProcessed int `json:"events_processed"`
EventsSynced int `json:"events_synced"`
}
type SyncFailedEvent struct {
ProviderType string `json:"provider_type"`
EventIDs []string `json:"event_ids"`
ErrorMessage string `json:"error_message"`
}
Task 27: Create CLI Commands for Management
File Path:
Create CLI commands for Stripe integration management:
package cmd
import (
"context"
"fmt"
"time"
"github.com/spf13/cobra"
"github.com/flexprice/flexprice/internal/service"
)
var stripeCmd = &cobra.Command{
Use: "stripe",
Short: "Stripe integration management commands",
}
var stripeSyncCmd = &cobra.Command{
Use: "sync",
Short: "Trigger Stripe sync manually",
RunE: func(cmd *cobra.Command, args []string) error {
// Initialize services (using existing pattern from other commands)
integrationService := initializeIntegrationService()
windowStart, _ := cmd.Flags().GetString("start")
windowEnd, _ := cmd.Flags().GetString("end")
// Parse with time.RFC3339 so the accepted format matches the flag help text
start, err := time.Parse(time.RFC3339, windowStart)
if err != nil {
return fmt.Errorf("invalid start time format: %w", err)
}
end, err := time.Parse(time.RFC3339, windowEnd)
if err != nil {
return fmt.Errorf("invalid end time format: %w", err)
}
req := &service.TriggerSyncRequest{
WindowStart: start,
WindowEnd: end,
ProviderType: "stripe",
}
err = integrationService.TriggerSync(context.Background(), req)
if err != nil {
return fmt.Errorf("failed to trigger sync: %w", err)
}
fmt.Println("Stripe sync triggered successfully")
return nil
},
}
var stripeBackfillCmd = &cobra.Command{
Use: "backfill",
Short: "Backfill historical data to Stripe",
RunE: func(cmd *cobra.Command, args []string) error {
// Implementation for backfill command
return nil
},
}
func init() {
rootCmd.AddCommand(stripeCmd)
stripeCmd.AddCommand(stripeSyncCmd)
stripeCmd.AddCommand(stripeBackfillCmd)
stripeSyncCmd.Flags().String("start", "", "Start time (RFC3339 format)")
stripeSyncCmd.Flags().String("end", "", "End time (RFC3339 format)")
stripeSyncCmd.MarkFlagRequired("start")
stripeSyncCmd.MarkFlagRequired("end")
stripeBackfillCmd.Flags().String("start-date", "", "Start date for backfill")
stripeBackfillCmd.Flags().String("end-date", "", "End date for backfill")
stripeBackfillCmd.Flags().Duration("chunk-size", time.Hour*24, "Chunk size for backfill")
}
Task 28: Create Documentation
File Path:
Create comprehensive documentation for the Stripe integration:
# Stripe Integration Guide
## Overview
FlexPrice's Stripe integration enables bidirectional usage synchronization while maintaining existing Stripe billing infrastructure.
## Setup
### 1. Environment Configuration
```env
STRIPE_ENABLED=true
STRIPE_API_KEY=sk_live_your_stripe_secret_key
STRIPE_SYNC_FREQUENCY=1h
STRIPE_AGGREGATION_WINDOW=1h
```
### 2. Customer Mapping
Map FlexPrice customers to Stripe customers:
curl -X POST /api/v1/integrations/customer-mappings \
-H "Content-Type: application/json" \
-d '{
"customer_id": "cust_flexprice_123",
"provider_type": "stripe",
"provider_customer_id": "cus_stripe_456"
}'
### 3. Meter Configuration
Configure which FlexPrice meters sync to Stripe:
curl -X POST /api/v1/integrations/provider-configs \
-H "Content-Type: application/json" \
-d '{
"meter_id": "meter_api_calls",
"provider_type": "stripe",
"provider_meter_id": "stripe_meter_123",
"sync_enabled": true
}'
## Usage
### Manual Sync
flexprice stripe sync --start="2024-12-10T15:00:00Z" --end="2024-12-10T16:00:00Z"
### Backfill Historical Data
flexprice stripe backfill --start-date="2024-01-01" --end-date="2024-12-10"
## Monitoring
### Sync Status
curl /api/v1/integrations/sync/status
### Failed Events
curl /api/v1/admin/integrations/failed-events
Task 30: Create Deployment Scripts
File Path:
Create deployment script for Stripe integration:
#!/bin/bash
set -e
echo "Deploying Stripe Integration..."
# 1. Run database migrations
echo "Running database migrations..."
make migrate-up
# 2. Generate Ent code
echo "Generating Ent code..."
make generate-ent
# 3. Build application
echo "Building application..."
make build
# 4. Deploy to production
echo "Deploying to production..."
make deploy
# 5. Verify deployment
echo "Verifying deployment..."
curl -f http://localhost:8080/health || exit 1
echo "Stripe Integration deployment completed successfully!"
-
FlexPrice Stripe Integration - Requirements Document
Executive Summary
This document defines the requirements for FlexPrice's Stripe integration, enabling bidirectional synchronization between FlexPrice's usage-based billing system and Stripe's billing infrastructure. The integration serves as an enhancement layer for existing Stripe customers while providing a migration pathway to FlexPrice.
Timeline: 4-5 days for MVP implementation
Business Requirements
Primary Objectives
Success Criteria
Technical Requirements
Core Requirements
CR-1: Customer Synchronization
CR-2: Usage Event Aggregation
CR-3: Multi-tenant Support
CR-4: Error Handling & Observability
Functional Requirements
FR-1: Webhook Customer Management
FR-2: Batch Event Processing
FR-3: Idempotent Operations
FR-4: Configuration Management
Architecture Overview
System Architecture
graph TB
subgraph "Client Ecosystem"
App[Client Application]
Stripe[Stripe Dashboard]
end
subgraph "FlexPrice Core"
API[FlexPrice API]
Billing[Billing Engine]
CH[(ClickHouse)]
PG[(PostgreSQL)]
end
subgraph "Stripe Integration Layer"
Webhook[Webhook Handler]
Temporal[Temporal Worker]
StripeClient[Stripe Client]
BatchTracker[Batch Tracker]
end
subgraph "External Services"
StripeAPI[Stripe API]
WebhookEndpoint[Stripe Webhooks]
end
%% Customer Flow
App --> StripeAPI
StripeAPI --> WebhookEndpoint
WebhookEndpoint --> Webhook
Webhook --> PG
%% Event Flow
App --> API
API --> Billing
Billing --> CH
Temporal --> CH
Temporal --> StripeClient
StripeClient --> StripeAPI
Temporal --> BatchTracker
BatchTracker --> PG
%% Monitoring
Webhook -.-> BatchTracker
Temporal -.-> BatchTracker
Data Architecture
erDiagram
CUSTOMERS {
uuid id PK
string external_id "Client internal ID"
string tenant_id
string environment_id
jsonb properties
timestamp created_at
}
CUSTOMER_INTEGRATION_MAPPINGS {
uuid customer_id PK,FK
string provider_type PK "stripe"
string provider_customer_id "cus_stripe_abc"
string tenant_id PK
string environment_id PK
jsonb metadata
timestamp created_at
timestamp updated_at
}
STRIPE_SYNC_BATCHES {
uuid id PK
uuid tenant_id
uuid environment_id
uuid customer_id FK
uuid meter_id FK
string stripe_customer_id
string stripe_meter_id
decimal aggregated_quantity
integer event_count
string stripe_event_id "Idempotency key"
string sync_status "success/failed/retrying"
integer retry_count
text error_message
timestamp window_start
timestamp window_end
timestamp created_at
timestamp synced_at
}
METER_PROVIDER_MAPPINGS {
uuid meter_id PK,FK
string provider_type PK "stripe"
string provider_meter_id "meter_stripe_xyz"
string tenant_id PK
string environment_id PK
boolean sync_enabled
jsonb configuration
timestamp created_at
}
STRIPE_TENANT_CONFIGS {
uuid tenant_id PK
uuid environment_id PK
text api_key_encrypted
boolean sync_enabled
integer aggregation_window_minutes
jsonb webhook_config
timestamp created_at
timestamp updated_at
}
CUSTOMERS ||--o{ CUSTOMER_INTEGRATION_MAPPINGS : maps_to
CUSTOMERS ||--o{ STRIPE_SYNC_BATCHES : generates
METERS ||--o{ METER_PROVIDER_MAPPINGS : configured_for
METERS ||--o{ STRIPE_SYNC_BATCHES : measured_by
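The `stripe_event_id` column above is annotated as an idempotency key, but the diagram does not say how that key is produced. One option, sketched here, is to derive it deterministically from the fields that define a batch, so that a retried sync of the same hourly window yields the same value. The `batchKey` struct, the `hourlyBatchKey` and `idempotencyKey` helpers, the `fp_` prefix, and the choice of SHA-256 are all assumptions made for illustration; none of them come from the design.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"
)

// batchKey holds the fields that identify one row of stripe_sync_batches.
// The struct is hypothetical; its fields mirror columns in the ER diagram.
type batchKey struct {
	TenantID      string
	EnvironmentID string
	CustomerID    string
	MeterID       string
	WindowStart   time.Time
	WindowEnd     time.Time
}

// hourlyBatchKey builds a key for a one-hour window starting at the given
// Unix timestamp, matching the hourly aggregation window in the design.
func hourlyBatchKey(tenantID, environmentID, customerID, meterID string, windowStartUnix int64) batchKey {
	start := time.Unix(windowStartUnix, 0).UTC()
	return batchKey{
		TenantID:      tenantID,
		EnvironmentID: environmentID,
		CustomerID:    customerID,
		MeterID:       meterID,
		WindowStart:   start,
		WindowEnd:     start.Add(time.Hour),
	}
}

// idempotencyKey returns a deterministic identifier for a batch: the same
// tenant, environment, customer, meter, and window always hash to the same key.
func idempotencyKey(k batchKey) string {
	raw := fmt.Sprintf("%s|%s|%s|%s|%d|%d",
		k.TenantID, k.EnvironmentID, k.CustomerID, k.MeterID,
		k.WindowStart.UTC().Unix(), k.WindowEnd.UTC().Unix())
	sum := sha256.Sum256([]byte(raw))
	// Keep the first 16 bytes (32 hex characters) to get a compact key.
	return "fp_" + hex.EncodeToString(sum[:16])
}

func main() {
	start := time.Date(2024, 12, 10, 15, 0, 0, 0, time.UTC)
	k := hourlyBatchKey("tenant_1", "env_prod", "cust_1", "meter_api_calls", start.Unix())
	fmt.Println(idempotencyKey(k))
}
```

If Stripe's meter event `identifier` field is used for deduplication (worth confirming against the Stripe API reference), a key like this could be sent with each request and stored alongside the batch row.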
Flow Specifications
Flow 1: Customer Management
sequenceDiagram
participant Client as Client App
participant Stripe as Stripe API
participant Webhook as FlexPrice Webhook
participant DB as FlexPrice DB
participant Mapping as Integration Mapping
Note over Client,Mapping: Customer Creation & Sync Flow
%% Customer creation
Client->>Stripe: Create customer with metadata
Note right of Stripe: metadata: {<br/>"external_id": "client_123",<br/>"flexprice_customer_id": "fp_xyz"<br/>}
Stripe-->>Client: customer_id: "cus_stripe_abc"
%% Webhook processing
Stripe->>Webhook: customer.created webhook
Webhook->>Webhook: Validate webhook signature
Webhook->>Webhook: Extract metadata
alt Valid metadata present
Webhook->>DB: Find customer by external_id
alt Customer exists
DB-->>Webhook: Customer found
Webhook->>Mapping: Update/create mapping
Note right of Mapping: customer_id ↔ cus_stripe_abc
else Customer doesn't exist
DB-->>Webhook: Customer not found
Webhook->>DB: Create new customer
Note right of DB: Use metadata for customer data
Webhook->>Mapping: Create integration mapping
end
Webhook-->>Stripe: 200 OK - Success
else Invalid/missing metadata
Webhook->>Webhook: Log error
Webhook-->>Stripe: 400 Bad Request
end
Note over Client,Mapping: Customer ready for usage events
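The branching in the customer flow above can be sketched as a small handler. This is a minimal illustration under stated assumptions, not FlexPrice's implementation: `stripeCustomerEvent`, `memoryStore`, and `handleCustomerCreated` are hypothetical names, the in-memory maps stand in for the FlexPrice DB and the mapping table, and webhook signature validation is left out entirely.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// stripeCustomerEvent is a pared-down, hypothetical view of a
// customer.created webhook payload.
type stripeCustomerEvent struct {
	StripeCustomerID string
	Metadata         map[string]string
}

// memoryStore stands in for the FlexPrice DB and the integration mapping table.
type memoryStore struct {
	customersByExternalID map[string]string // external_id -> FlexPrice customer ID
	stripeIDByCustomer    map[string]string // FlexPrice customer ID -> Stripe customer ID
}

func newMemoryStore() *memoryStore {
	return &memoryStore{
		customersByExternalID: map[string]string{},
		stripeIDByCustomer:    map[string]string{},
	}
}

var errMissingMetadata = errors.New("missing external_id metadata or Stripe customer ID")

// handleCustomerCreated follows the diagram: check the metadata, find or
// create the customer, then upsert the mapping. It returns the HTTP status
// that would be sent back to Stripe.
func (s *memoryStore) handleCustomerCreated(ev stripeCustomerEvent) (int, error) {
	externalID := ev.Metadata["external_id"]
	if externalID == "" || ev.StripeCustomerID == "" {
		return http.StatusBadRequest, errMissingMetadata
	}
	customerID, found := s.customersByExternalID[externalID]
	if !found {
		// Customer not found: create one from the webhook metadata.
		customerID = ev.Metadata["flexprice_customer_id"]
		if customerID == "" {
			customerID = "fp_" + externalID // fallback ID, purely illustrative
		}
		s.customersByExternalID[externalID] = customerID
	}
	// Upsert: a re-delivered webhook rewrites the same mapping.
	s.stripeIDByCustomer[customerID] = ev.StripeCustomerID
	return http.StatusOK, nil
}

func main() {
	store := newMemoryStore()
	status, err := store.handleCustomerCreated(stripeCustomerEvent{
		StripeCustomerID: "cus_stripe_abc",
		Metadata: map[string]string{
			"external_id":           "client_123",
			"flexprice_customer_id": "fp_xyz",
		},
	})
	fmt.Println(status, err, store.stripeIDByCustomer)
}
```

Because the mapping write is an upsert, Stripe can redeliver the same webhook without creating duplicates, which is the behaviour the flow relies on when it answers 200 for an existing customer.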
Flow 2: Event Synchronization
sequenceDiagram
participant App as Client App
participant FP as FlexPrice API
participant CH as ClickHouse
participant Temporal as Temporal Worker
participant Batch as Batch Tracker
participant Stripe as Stripe API
Note over App,Stripe: Hourly Event Sync Flow
%% Event ingestion (existing FlexPrice flow)
loop Multiple usage events
App->>FP: POST /events
FP->>CH: Store raw event
FP->>CH: Process → events_processed
Note right of CH: Calculate qty_billable,<br/>qty_free, cost
end
%% Temporal cron workflow
Note over Temporal: Cron Trigger: Every Hour
Temporal->>CH: Query billable events
Note right of CH: SELECT customer_id, meter_id,<br/>SUM(qty_billable), COUNT(*)<br/>FROM events_processed<br/>WHERE qty_billable > 0<br/>AND timestamp in last hour<br/>GROUP BY customer_id, meter_id
CH-->>Temporal: Aggregated batches
loop For each customer+meter batch
Temporal->>Temporal: Lookup customer mapping
alt Customer mapping exists
Temporal->>Temporal: Lookup meter mapping
alt Meter mapping exists
Temporal->>Batch: Create batch record (pending)
Temporal->>Stripe: Create meter event
Note right of Stripe: Single event with<br/>aggregated quantity
alt Stripe API success
Stripe-->>Temporal: stripe_event_id
Temporal->>Batch: Update batch (success)
else Stripe API failure
Stripe-->>Temporal: Error response
Temporal->>Batch: Update batch (failed)
Temporal->>Temporal: Schedule retry
end
else Meter mapping missing
Temporal->>Batch: Log skip (meter not configured)
end
else Customer mapping missing
Temporal->>Batch: Log skip (customer not mapped)
end
end
Temporal->>Temporal: Complete workflow
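The aggregation step in the flow above (one batch per customer and meter, billable quantities only) can be sketched in memory as follows. The type and function names are hypothetical, and quantities are `int64` only to keep the example free of dependencies; the design itself uses `decimal.Decimal` and performs the grouping in ClickHouse.

```go
package main

import (
	"fmt"
	"sort"
)

// processedEvent is a hypothetical, reduced view of a row in events_processed.
type processedEvent struct {
	EventID     string
	CustomerID  string
	MeterID     string
	QtyBillable int64
}

// usageBatch is one aggregated customer+meter batch for the sync window.
type usageBatch struct {
	CustomerID string
	MeterID    string
	Total      int64
	EventIDs   []string
}

// aggregate groups billable events by (customer, meter) and sums their
// quantities, mirroring the GROUP BY query shown in the diagram.
func aggregate(events []processedEvent) []usageBatch {
	type key struct{ customerID, meterID string }
	byKey := make(map[key]*usageBatch)
	for _, e := range events {
		if e.QtyBillable <= 0 {
			continue // same filter as WHERE qty_billable > 0
		}
		k := key{e.CustomerID, e.MeterID}
		b, ok := byKey[k]
		if !ok {
			b = &usageBatch{CustomerID: e.CustomerID, MeterID: e.MeterID}
			byKey[k] = b
		}
		b.Total += e.QtyBillable
		b.EventIDs = append(b.EventIDs, e.EventID)
	}
	batches := make([]usageBatch, 0, len(byKey))
	for _, b := range byKey {
		batches = append(batches, *b)
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Slice(batches, func(i, j int) bool {
		if batches[i].CustomerID != batches[j].CustomerID {
			return batches[i].CustomerID < batches[j].CustomerID
		}
		return batches[i].MeterID < batches[j].MeterID
	})
	return batches
}

func main() {
	events := []processedEvent{
		{"e1", "cust_a", "meter_api", 5},
		{"e2", "cust_a", "meter_api", 7},
		{"e3", "cust_a", "meter_storage", 0}, // fully covered by free tier
		{"e4", "cust_b", "meter_api", 3},
	}
	for _, b := range aggregate(events) {
		fmt.Printf("%s %s total=%d events=%v\n", b.CustomerID, b.MeterID, b.Total, b.EventIDs)
	}
}
```

Keeping the contributing event IDs on each batch is what lets a later step mark exactly those events as synced or failed.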
Error Handling Flow
flowchart TD
A[Sync Operation] --> B{Operation Type}
B -->|Customer Webhook| C[Webhook Error Handler]
B -->|Event Sync| D[Temporal Error Handler]
C --> E{Error Type}
E -->|Invalid Payload| F[Return 400, Log Error]
E -->|Database Error| G[Return 500, Stripe Retries]
E -->|Mapping Conflict| H[Upsert Pattern, Return 200]
D --> I{Error Type}
I -->|Stripe Rate Limit| J[Exponential Backoff Retry]
I -->|Network Timeout| K[Linear Retry with Circuit Breaker]
I -->|Auth Error| L[Immediate Failure, Alert Ops]
I -->|Invalid Data| M[Skip Batch, Log for Manual Review]
J --> N{Retry Count < Max}
N -->|Yes| O[Wait & Retry]
N -->|No| P[Mark Failed, Alert]
O --> D
P --> Q[Manual Intervention Required]
style F fill:#ffcccc
style G fill:#ffcccc
style L fill:#ffcccc
style Q fill:#ffcccc
style H fill:#ccffcc
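The event-sync branch of the flowchart above maps each error type to a handling strategy. The sketch below shows one way to encode that mapping. All names are hypothetical, the 30-second base delay is borrowed from the retry workflow's `InitialInterval`, the 10-minute cap is an assumption, applying the retry limit to network timeouts as well as rate limits is an assumption, and the circuit breaker is omitted.

```go
package main

import (
	"fmt"
	"time"
)

type errorKind int

const (
	rateLimited errorKind = iota
	networkTimeout
	authError
	invalidData
)

type action int

const (
	retryLater action = iota
	failAndAlert
	skipBatch
)

const (
	baseDelay = 30 * time.Second
	maxDelay  = 10 * time.Minute // assumed cap, not part of the design
)

// exponentialDelay doubles the base delay once per previous attempt,
// capped at maxDelay.
func exponentialDelay(attempt int) time.Duration {
	d := baseDelay
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

// decide returns the action for an error and, for retries, how long to wait.
// attempt is the number of retries already made (0 for the first failure).
func decide(kind errorKind, attempt, maxAttempts int) (action, time.Duration) {
	switch kind {
	case authError:
		return failAndAlert, 0 // immediate failure, alert ops
	case invalidData:
		return skipBatch, 0 // skip and log for manual review
	}
	if attempt >= maxAttempts {
		return failAndAlert, 0 // retries exhausted: mark failed, alert
	}
	if kind == rateLimited {
		return retryLater, exponentialDelay(attempt)
	}
	// Network timeout: linear backoff.
	return retryLater, time.Duration(attempt+1) * baseDelay
}

func main() {
	for attempt := 0; attempt <= 3; attempt++ {
		act, wait := decide(rateLimited, attempt, 3)
		fmt.Println("attempt", attempt, "action", act, "wait", wait)
	}
}
```

In a Temporal deployment most of this would normally be expressed through the activity `RetryPolicy` rather than hand-written; the sketch is only meant to make the decision table explicit.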
Design Decisions & Trade-offs
Decision 1: Simplified vs Complex State Management
Original Design: Per-event state tracking with integration_sync_state table
-- Complex state per event
CREATE TABLE integration_sync_state (
event_id UUID,
sync_status VARCHAR, -- 7+ states
skip_reason VARCHAR,
retry_count INTEGER,
-- ... many fields
);
Current Design: Batch-level tracking only
-- Simple batch tracking
CREATE TABLE stripe_sync_batches (
id UUID,
customer_id UUID,
meter_id UUID,
aggregated_quantity DECIMAL,
sync_status VARCHAR -- 3 states
Trade-off Analysis:
Decision: Accept trade-off for MVP speed, add detailed tracking in Phase 2
Decision 2: Direct ClickHouse Integration
Alternative: Event streaming via Kafka/separate event store
Trade-off Analysis:
Decision: Accept coupling for MVP, consider event streaming for scale
Decision 3: Webhook-First Customer Sync
Alternative: API polling or JIT (Just-In-Time) creation
Trade-off Analysis:
Decision: Webhook-first with monitoring, add reconciliation fallback in Phase 2
Decision 4: Hourly Aggregation Windows
Alternatives: Real-time, 15-minute, daily aggregation
Trade-off Analysis:
Decision: 1-hour windows for MVP, make configurable per tenant later
Implementation Plan
Phase 1: Core MVP (Days 1-3)
Goal: Basic customer sync + event sync working end-to-end
Day 1: Foundation
Day 2: Event Sync
Day 3: Integration & Testing
Phase 2: Production Hardening (Days 4-5)
Goal: Production-ready reliability and observability
Day 4: Reliability
Day 5: Operations
Phase 3: Future Enhancements (Post-MVP)
Scope: Advanced features for production scale
Risk Assessment
High-Risk Items
Medium-Risk Items
Low-Risk Items
Success Metrics
Functional Metrics
Performance Metrics
Business Metrics
Conclusion
This requirements document defines a pragmatic approach to Stripe integration that prioritizes MVP delivery speed while maintaining a clear evolution path to production sophistication. The simplified architecture trades some advanced features for implementation speed, with explicit plans to add complexity as needed.
The design leverages FlexPrice's existing strengths (billing engine, event processing) while adding minimal new complexity. The webhook-driven customer sync and batch event aggregation provide a solid foundation that can be enhanced with more sophisticated patterns as the integration matures.
Key Success Factor: Disciplined scope management to deliver a working demo in 5 days while maintaining code quality for future enhancements.
-
Stripe Integration - High Level Design
System Overview
The Stripe Integration system enables bidirectional usage synchronization between FlexPrice and Stripe, allowing customers to maintain their existing Stripe billing infrastructure while leveraging FlexPrice's advanced usage-based billing capabilities. The system acts as a bridge, intelligently deciding which usage events should be synchronized to Stripe based on billing logic and business rules.
Key Objectives
Architecture Overview
graph TB
subgraph "Customer Systems"
CS[Customer Application]
CDB[Customer Database]
end
subgraph "FlexPrice Platform"
subgraph "API Layer"
API[REST API]
WH[Webhook Endpoints]
end
subgraph "Core Services"
ES[Event Service]
BS[Billing Service]
IS[Integration Service]
CS_FP[Customer Service]
end
subgraph "Integration Layer"
ISS[Integration Sync State]
CM[Customer Mapping]
PC[Provider Config]
end
subgraph "Workflow Engine"
TW[Temporal Workflows]
TA[Temporal Activities]
end
subgraph "Data Layer"
PG[(PostgreSQL)]
CH[(ClickHouse)]
RD[(Redis)]
end
end
subgraph "External Providers"
ST[Stripe API]
HB[HubSpot API]
RZ[RazorPay API]
end
CS -->|Usage Events| API
API --> ES
ES --> CH
ES --> BS
BS --> PG
IS --> ISS
IS --> CM
IS --> PC
TW --> TA
TA --> ST
WH -->|Customer Sync| IS
IS --> TW
classDef external fill:#e1f5fe
classDef core fill:#f3e5f5
classDef data fill:#e8f5e8
classDef integration fill:#fff3e0
class CS,CDB,ST,HB,RZ external
class ES,BS,IS,CS_FP core
class PG,CH,RD data
class ISS,CM,PC,TW,TA integration
Core Components
1. Event Processing Pipeline
sequenceDiagram
participant CA as Customer App
participant FP as FlexPrice API
participant ES as Event Service
participant CH as ClickHouse
participant BS as Billing Service
participant PG as PostgreSQL
CA->>FP: Send Usage Events
FP->>ES: Validate & Process
ES->>CH: Store Raw Events
ES->>BS: Trigger Billing Processing
BS->>CH: Query Event Data
BS->>PG: Store Processed Events
Note over PG: events_processed table<br/>with qty_billable, qty_free
2. Integration Sync Decision Engine
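The decision rules in this section reduce to a small pure function over a processed event. The following is a minimal sketch, not FlexPrice's implementation: the `ProcessedEvent` type, the `decide_sync_status` name, and the use of `None` to mean "not yet processed by billing" are assumptions made for illustration. The status and reason strings are the ones used elsewhere in this design.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional, Set, Tuple


@dataclass(frozen=True)
class ProcessedEvent:
    """Hypothetical view of a row in events_processed."""
    event_id: str
    meter_id: str
    # Assumption: None means FlexPrice billing has not processed the event yet.
    qty_billable: Optional[Decimal]
    qty_free: Optional[Decimal]


def decide_sync_status(
    event: ProcessedEvent, synced_meters: Set[str]
) -> Tuple[str, Optional[str]]:
    """Return (status, reason) for one event, following the decision flow."""
    if event.qty_billable is None:
        return ("pending", None)  # wait for billing to process the event
    if event.qty_billable > 0:
        if event.meter_id in synced_meters:
            return ("billable", None)  # ready for aggregation and sync
        return ("skipped_config", "meter_not_configured")
    if (event.qty_free or 0) > 0:
        return ("skipped_free", "free_tier_applied")
    return ("skipped_credited", "credit_applied")


event = ProcessedEvent("evt_1", "api_calls", Decimal("3"), Decimal("0"))
print(decide_sync_status(event, {"api_calls"}))
```

Keeping the decision free of I/O makes it straightforward to unit test each branch before wiring it into a workflow activity.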
flowchart TD
A[Raw Event] --> B{Processed by<br/>FlexPrice Billing?}
B -->|No| C[Mark as Pending]
B -->|Yes| D{Billable Quantity > 0?}
D -->|No| E{Free Tier Applied?}
D -->|Yes| F{Meter Configured<br/>for Sync?}
E -->|Yes| G[Skip - Free Tier]
E -->|No| H[Skip - Credited]
F -->|No| I[Skip - Not Configured]
F -->|Yes| J[Mark as Billable]
C --> K[integration_sync_state]
G --> K
H --> K
I --> K
J --> K
classDef billable fill:#c8e6c9
classDef skipped fill:#ffecb3
classDef pending fill:#e1f5fe
class J billable
class G,H,I skipped
class C pending
3. Temporal Workflow Architecture
graph TB
subgraph "Temporal Workflows"
CW[Cron Sync Workflow<br/>Runs Hourly]
SW[Stripe Sync Workflow<br/>Main Orchestrator]
RW[Retry Workflow<br/>Failed Events]
BW[Backfill Workflow<br/>Historical Data]
end
subgraph "Activities"
SDA[Sync Decision Activity<br/>Process Unsynced Events]
AGA[Aggregation Activity<br/>Group by Customer+Meter]
SSA[Stripe Sync Activity<br/>API Calls]
RFA[Retry Failed Activity<br/>Retry Logic]
end
CW --> SW
SW --> SDA
SW --> AGA
SW --> SSA
RW --> RFA
BW --> SW
SDA --> ISS[(Integration Sync State)]
AGA --> ISS
SSA --> ST[Stripe API]
SSA --> ISS
Data Flow Architecture
1. Customer Onboarding Flow
sequenceDiagram
participant C as Customer
participant S as Stripe
participant FP as FlexPrice
participant WH as Webhook Handler
participant IS as Integration Service
Note over C,S: Customer creates Stripe customer
C->>S: Create Customer
S-->>C: customer_id: cus_stripe_123
Note over C,FP: Customer creates FlexPrice customer
C->>FP: Create Customer
FP-->>C: customer_id: cust_fp_456
Note over C,WH: Customer maps the two systems
C->>WH: POST /webhook/customer-sync
Note right of WH: Payload: {<br/> stripe_customer_id: "cus_stripe_123",<br/> flexprice_customer_id: "cust_fp_456"<br/>}
WH->>IS: Create Customer Mapping
IS->>IS: Store in customer_integration_mappings
Note over C: Customer can now send usage events<br/>FlexPrice will sync billable usage to Stripe
2. Usage Event Sync Flow
sequenceDiagram
participant CA as Customer App
participant FP as FlexPrice
participant TW as Temporal Workflow
participant ST as Stripe
Note over CA,FP: 1. Usage Events Flow to FlexPrice
CA->>FP: Send Usage Events<br/>(customer_id: external_123)
FP->>FP: Process Events<br/>Calculate Billable Usage
Note over TW: 2. Hourly Sync Workflow
TW->>TW: Cron: Every Hour
TW->>FP: Find Unprocessed Events
FP-->>TW: Events List
TW->>TW: Make Sync Decisions<br/>(billable vs skipped)
TW->>TW: Aggregate Billable Events<br/>by Customer + Meter
Note over TW,ST: 3. Sync to Stripe
loop For each Customer+Meter
TW->>ST: Create Meter Event<br/>{value: aggregated_usage}
ST-->>TW: meter_event_id
TW->>FP: Mark as Synced<br/>(batch_id, stripe_event_id)
end
3. Sync Decision Matrix
graph TD
E[Event] --> P{Processed?}
P -->|No| W[Wait for Processing]
P -->|Yes| Q{qty_billable > 0?}
Q -->|No| F{qty_free > 0?}
Q -->|Yes| M{Meter Configured?}
F -->|Yes| SF[Status: skipped_free<br/>Reason: free_tier_applied]
F -->|No| SC[Status: skipped_credited<br/>Reason: credit_applied]
M -->|No| SM[Status: skipped_config<br/>Reason: meter_not_configured]
M -->|Yes| SB[Status: billable<br/>Ready for Sync]
W --> P
SF --> END[Store Decision]
SC --> END
SM --> END
SB --> AGG[Add to Aggregation Queue]
AGG --> END
classDef billable fill:#c8e6c9
classDef skipped fill:#ffecb3
classDef waiting fill:#f8bbd9
class SB,AGG billable
class SF,SC,SM skipped
class W waiting
Customer Journey
Phase 1: Initial Setup
Phase 2: Usage Tracking
Phase 3: Automated Sync
Phase 4: Monitoring & Management
Integration Patterns
1. Three-Layer ID Mapping
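The three-layer lookup in this section's diagram is a chain of two resolutions: the event's `external_customer_id` to a FlexPrice customer ID, then that ID to a Stripe customer ID through `customer_integration_mappings`. A minimal sketch follows, with plain dictionaries standing in for both tables; the function name, the `MappingNotFound` exception, and the dictionary shapes are assumptions for illustration only.

```python
from typing import Dict


class MappingNotFound(LookupError):
    """Raised when either hop of the ID chain has no entry (hypothetical)."""


def resolve_stripe_customer_id(
    external_customer_id: str,
    customers_by_external_id: Dict[str, str],  # external ID -> FlexPrice ID
    integration_mappings: Dict[str, str],      # FlexPrice ID -> Stripe ID
) -> str:
    """Resolve the Stripe customer for a usage event's external customer ID."""
    fp_id = customers_by_external_id.get(external_customer_id)
    if fp_id is None:
        raise MappingNotFound(
            f"no FlexPrice customer for external ID {external_customer_id!r}"
        )
    stripe_id = integration_mappings.get(fp_id)
    if stripe_id is None:
        raise MappingNotFound(f"no Stripe mapping for FlexPrice customer {fp_id!r}")
    return stripe_id


# IDs taken from the diagram in this section.
customers = {"user_12345": "cust_fp_789"}
mappings = {"cust_fp_789": "cus_stripe_456"}
print(resolve_stripe_customer_id("user_12345", customers, mappings))
```

Raising a distinct error for each missing hop lets the sync step tell "customer not onboarded in FlexPrice" apart from "customer not yet mapped to Stripe".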
graph LR
subgraph "Customer System"
CID[Customer Internal ID<br/>user_12345]
end
subgraph "FlexPrice"
FPID[FlexPrice Customer ID<br/>cust_fp_789]
EID[Event.external_customer_id<br/>user_12345]
end
subgraph "Stripe"
SID[Stripe Customer ID<br/>cus_stripe_456]
end
subgraph "Mapping Table"
MT[customer_integration_mappings<br/>fp_id ↔ stripe_id]
end
CID -.->|Usage Events| EID
EID -.->|Links to| FPID
FPID -.->|Maps via| MT
MT -.->|Resolves to| SID
classDef system fill:#e3f2fd
classDef mapping fill:#fff3e0
class CID,FPID,SID system
class MT,EID mapping
2. Event Aggregation Strategy
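The hourly aggregation described in this section sums billable quantities per customer and meter inside a half-open one-hour window. The sketch below reproduces the totals from this section's example window (450, 75, and 200); the `BillableEvent` type, the `aggregate_window` name, the `api_calls` meter ID, and the calendar date are assumptions for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, NamedTuple, Tuple


class BillableEvent(NamedTuple):
    customer_id: str
    meter_id: str
    qty_billable: int
    timestamp: datetime


def aggregate_window(
    events: Iterable[BillableEvent],
    window_start: datetime,
    window: timedelta = timedelta(hours=1),
) -> Dict[Tuple[str, str], int]:
    """Sum qty_billable per (customer_id, meter_id) in [start, start + window)."""
    window_end = window_start + window
    totals: Dict[Tuple[str, str], int] = defaultdict(int)
    for ev in events:
        if window_start <= ev.timestamp < window_end:
            totals[(ev.customer_id, ev.meter_id)] += ev.qty_billable
    return dict(totals)


def at(hh: int, mm: int) -> datetime:
    return datetime(2024, 1, 1, hh, mm)  # arbitrary date for the example


events = [
    BillableEvent("A", "api_calls", 100, at(15, 5)),
    BillableEvent("A", "api_calls", 200, at(15, 15)),
    BillableEvent("A", "api_calls", 150, at(15, 30)),
    BillableEvent("B", "api_calls", 50, at(15, 5)),
    BillableEvent("B", "api_calls", 25, at(15, 30)),
    BillableEvent("C", "api_calls", 75, at(15, 15)),
    BillableEvent("C", "api_calls", 125, at(15, 45)),
    BillableEvent("A", "api_calls", 999, at(16, 0)),  # belongs to the next window
]
totals = aggregate_window(events, at(15, 0))
print(totals)
```

The half-open interval keeps an event stamped exactly on the hour from being counted in two consecutive batches.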
gantt
title Hourly Aggregation Window (3 PM - 4 PM)
dateFormat HH:mm
axisFormat %H:%M
section Customer A Events
100 API calls :active, 15:05, 15:06
200 API calls :active, 15:15, 15:16
150 API calls :active, 15:30, 15:31
section Customer B Events
50 API calls :active, 15:05, 15:06
25 API calls :active, 15:30, 15:31
section Customer C Events
75 API calls :active, 15:15, 15:16
125 API calls :active, 15:45, 15:46
section Aggregation
Workflow Trigger :milestone, 16:00, 16:00
Customer A Total 450 :crit, 16:01, 16:02
Customer B Total 75 :crit, 16:01, 16:02
Customer C Total 200 :crit, 16:01, 16:02
Send to Stripe :milestone, 16:05, 16:05
3. Provider Extensibility
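The extensibility model in this section is an adapter pattern: the sync engine depends on one interface, and each provider supplies its own implementation. A minimal sketch under stated assumptions: the `ProviderAdapter` interface, its `send_usage` method, and the in-memory `FakeStripeAdapter` are all hypothetical, and no real provider API is called.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple


class ProviderAdapter(ABC):
    """Hypothetical interface that every provider adapter implements."""

    name: str

    @abstractmethod
    def send_usage(self, provider_customer_id: str, meter_id: str, quantity: int) -> str:
        """Push one aggregated usage record and return the provider's record ID."""


class FakeStripeAdapter(ProviderAdapter):
    """In-memory stand-in for a Stripe adapter; records calls instead of sending."""

    name = "stripe"

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str, int]] = []

    def send_usage(self, provider_customer_id: str, meter_id: str, quantity: int) -> str:
        self.sent.append((provider_customer_id, meter_id, quantity))
        return f"fake_meter_event_{len(self.sent)}"


class IntegrationSyncEngine:
    """Routes usage to whichever adapter is registered for the provider."""

    def __init__(self) -> None:
        self._adapters: Dict[str, ProviderAdapter] = {}

    def register(self, adapter: ProviderAdapter) -> None:
        self._adapters[adapter.name] = adapter

    def sync(self, provider: str, provider_customer_id: str, meter_id: str, quantity: int) -> str:
        adapter = self._adapters.get(provider)
        if adapter is None:
            raise ValueError(f"no adapter registered for provider {provider!r}")
        return adapter.send_usage(provider_customer_id, meter_id, quantity)


engine = IntegrationSyncEngine()
stripe = FakeStripeAdapter()
engine.register(stripe)
print(engine.sync("stripe", "cus_stripe_456", "api_calls", 450))
```

Adding HubSpot or RazorPay support would then mean registering another `ProviderAdapter` subclass, with no change to the engine.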
graph TB
subgraph "Core Integration Engine"
ISE[Integration Sync Engine]
CM[Customer Mapping]
PC[Provider Config]
end
subgraph "Provider Adapters"
SA[Stripe Adapter]
HA[HubSpot Adapter]
RA[RazorPay Adapter]
CA[Custom Adapter]
end
subgraph "External APIs"
ST[Stripe API]
HB[HubSpot API]
RZ[RazorPay API]
EX[External System]
end
ISE --> SA
ISE --> HA
ISE --> RA
ISE --> CA
SA --> ST
HA --> HB
RA --> RZ
CA --> EX
CM -.-> ISE
PC -.-> ISE
classDef core fill:#e8f5e8
classDef adapter fill:#fff3e0
classDef external fill:#e1f5fe
class ISE,CM,PC core
class SA,HA,RA,CA adapter
class ST,HB,RZ,EX external
Error Handling & Resilience
1. Retry Strategy
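The retry schedule in this section doubles the delay on each attempt, starting at one minute and stopping after five attempts, at which point the event goes to manual review. A minimal sketch of that schedule follows; the `retry_delay` name and the convention of returning `None` once attempts are exhausted are assumptions for illustration.

```python
from datetime import timedelta
from typing import Optional

BASE_DELAY = timedelta(minutes=1)
MAX_ATTEMPTS = 5


def retry_delay(attempt: int) -> Optional[timedelta]:
    """Delay before retry number `attempt` (1-based).

    Returns None once the attempt number exceeds MAX_ATTEMPTS, which the
    caller should treat as "escalate to manual review".
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    if attempt > MAX_ATTEMPTS:
        return None
    return BASE_DELAY * 2 ** (attempt - 1)


for attempt in range(1, MAX_ATTEMPTS + 2):
    print(attempt, retry_delay(attempt))
```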
stateDiagram-v2
[*] --> Pending
Pending --> Processing: Workflow Triggered
Processing --> Billable: qty_billable > 0
Processing --> Skipped: qty_billable = 0
Billable --> Syncing: Send to Stripe
Syncing --> Synced: Success
Syncing --> Failed: API Error
Failed --> Retrying: Exponential Backoff
Retrying --> Synced: Retry Success
Retrying --> Failed: Retry Failed
Failed --> ManualReview: Max Attempts Reached
Skipped --> [*]
Synced --> [*]
ManualReview --> [*]
state Failed {
[*] --> Attempt1: 1min delay
Attempt1 --> Attempt2: 2min delay
Attempt2 --> Attempt3: 4min delay
Attempt3 --> Attempt4: 8min delay
Attempt4 --> Attempt5: 16min delay
Attempt5 --> [*]: Max attempts
}
2. Circuit Breaker Pattern
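The circuit breaker in this section has three states: closed, open, and half-open. The sketch below is one possible reading of it, not the planned implementation. It assumes the failure threshold counts consecutive failures (the design only says a threshold is reached), uses a timeout to move from open to half-open, and does not cap the number of test requests while half-open. The class name, parameters, and default values are hypothetical.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,      # assumed: consecutive failures
        reset_timeout: float = 30.0,     # seconds to stay open (assumed value)
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half_open"  # timeout expired: allow test requests
        return "open"

    def allow_request(self) -> bool:
        """False while open, so callers fail fast without calling Stripe."""
        return self.state != "open"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # back to closed

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._opened_at = self._clock()  # test request failed: re-open
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0)
breaker.record_failure()
breaker.record_failure()
print(breaker.state, breaker.allow_request())
```

Injecting the clock keeps the state transitions testable without sleeping.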
stateDiagram-v2
[*] --> Closed: Normal Operation
Closed --> Open: Failure Threshold Reached
Open --> HalfOpen: Timeout Expired
HalfOpen --> Closed: Test Request Success
HalfOpen --> Open: Test Request Failed
note right of Closed : All requests pass through\nTrack failure rate
note right of Open : All requests fail fast\nNo calls to Stripe API
note right of HalfOpen : Limited test requests\nGradual recovery
3. Failure Categories & Responses
Monitoring & Observability
1. Key Metrics Dashboard
graph TB
subgraph "Sync Metrics"
SM1[Events Processed/Hour]
SM2[Events Synced/Hour]
SM3[Sync Success Rate]
SM4[Average Sync Latency]
end
subgraph "Error Metrics"
EM1[Failed Events Count]
EM2[Retry Attempts]
EM3[Circuit Breaker State]
EM4[Error Rate by Type]
end
subgraph "Business Metrics"
BM1[Free Tier Usage %]
BM2[Billable Events %]
BM3[Revenue Impact]
BM4[Customer Adoption Rate]
end
subgraph "Performance Metrics"
PM1[Temporal Workflow Health]
PM2[API Response Times]
PM3[Database Performance]
PM4[Queue Depths]
end
classDef sync fill:#e8f5e8
classDef error fill:#ffebee
classDef business fill:#e3f2fd
classDef performance fill:#fff3e0
class SM1,SM2,SM3,SM4 sync
class EM1,EM2,EM3,EM4 error
class BM1,BM2,BM3,BM4 business
class PM1,PM2,PM3,PM4 performance
2. Alert Hierarchy
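The success-rate thresholds in this section overlap (a rate below 70% is also below 85% and 95%), so an implementation has to pick one level per reading. The sketch below assumes the most severe matching level wins; that tie-break, the function name, and the channel identifiers are assumptions, while the thresholds and channel combinations come from this section's diagram.

```python
from typing import Dict, List, Optional

# Channels per level, as listed in the alert hierarchy.
ALERT_CHANNELS: Dict[str, List[str]] = {
    "warning": ["slack"],
    "critical": ["pagerduty", "slack"],
    "emergency": ["pagerduty", "phone", "slack"],
}


def alert_level(sync_success_rate: float) -> Optional[str]:
    """Map a sync success rate (percent, 0-100) to an alert level.

    Assumption: thresholds are checked from most to least severe, so each
    rate maps to exactly one level. Returns None when no alert is needed.
    """
    if not 0.0 <= sync_success_rate <= 100.0:
        raise ValueError("sync_success_rate must be between 0 and 100")
    if sync_success_rate < 70.0:
        return "emergency"
    if sync_success_rate < 85.0:
        return "critical"
    if sync_success_rate < 95.0:
        return "warning"
    return None


for rate in (99.0, 92.5, 80.0, 55.0):
    level = alert_level(rate)
    print(rate, level, ALERT_CHANNELS.get(level, []))
```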
graph TD
A[System Health Check] --> B{Sync Success Rate}
B -->|< 95%| C[Warning Alert]
B -->|< 85%| D[Critical Alert]
B -->|< 70%| E[Emergency Alert]
F[Individual Event] --> G{Retry Count}
G -->|> 3 attempts| H[Error Log]
G -->|> 5 attempts| I[Support Ticket]
J[Temporal Workflow] --> K{Workflow Status}
K -->|Failed| L[Immediate Alert]
K -->|Delayed| M[Warning]
C --> N[Slack Notification]
D --> O[PagerDuty + Slack]
E --> P[PagerDuty + Phone + Slack]
classDef warning fill:#fff3e0
classDef critical fill:#ffebee
classDef emergency fill:#f3e5f5
class C,H,M warning
class D,I critical
class E,L emergency
Security & Compliance
1. Data Security
graph TB
subgraph "Data in Transit"
TLS[TLS 1.3 Encryption]
API[API Key Authentication]
SIGN[Request Signing]
end
subgraph "Data at Rest"
ENC[Database Encryption]
VAULT[Secret Management]
BACKUP[Encrypted Backups]
end
subgraph "Access Control"
RBAC[Role-Based Access]
MFA[Multi-Factor Auth]
AUDIT[Audit Logging]
end
subgraph "Compliance"
SOC[SOC 2 Type II]
GDPR[GDPR Compliance]
PCI[PCI DSS]
end
TLS --> ENC
API --> VAULT
SIGN --> RBAC
ENC --> SOC
VAULT --> GDPR
RBAC --> PCI
classDef security fill:#e8f5e8
classDef compliance fill:#e3f2fd
class TLS,API,SIGN,ENC,VAULT,BACKUP,RBAC,MFA,AUDIT security
class SOC,GDPR,PCI compliance
2. API Security Model
Performance & Scalability
1. Scalability Targets
2. Performance Optimization
graph TB
subgraph "Database Optimization"
IDX[Strategic Indexes]
PART[Table Partitioning]
CACHE[Redis Caching]
end
subgraph "API Optimization"
POOL[Connection Pooling]
BATCH[Batch Processing]
COMP[Response Compression]
end
subgraph "Workflow Optimization"
PAR[Parallel Activities]
CHUNK[Chunked Processing]
PRIO[Priority Queues]
end
IDX --> POOL
PART --> BATCH
CACHE --> COMP
POOL --> PAR
BATCH --> CHUNK
COMP --> PRIO
classDef db fill:#e8f5e8
classDef api fill:#e3f2fd
classDef workflow fill:#fff3e0
class IDX,PART,CACHE db
class POOL,BATCH,COMP api
class PAR,CHUNK,PRIO workflow
Deployment Architecture
1. Infrastructure Layout
graph TB
subgraph "Load Balancer"
LB[Application Load Balancer]
end
subgraph "Application Tier"
APP1[FlexPrice Instance 1]
APP2[FlexPrice Instance 2]
APP3[FlexPrice Instance 3]
end
subgraph "Temporal Cluster"
TW1[Temporal Worker 1]
TW2[Temporal Worker 2]
TS[Temporal Server]
end
subgraph "Data Tier"
PG_M[PostgreSQL Primary]
PG_S[PostgreSQL Replica]
CH_C[ClickHouse Cluster]
RD_C[Redis Cluster]
end
subgraph "External Services"
ST[Stripe API]
MON[Monitoring Stack]
end
LB --> APP1
LB --> APP2
LB --> APP3
APP1 --> TW1
APP2 --> TW2
APP3 --> TS
TW1 --> PG_M
TW2 --> PG_M
TS --> PG_M
APP1 --> CH_C
APP2 --> CH_C
APP3 --> RD_C
TW1 --> ST
TW2 --> ST
APP1 --> MON
TW1 --> MON
classDef app fill:#e8f5e8
classDef temporal fill:#fff3e0
classDef data fill:#e3f2fd
classDef external fill:#ffebee
class APP1,APP2,APP3 app
class TW1,TW2,TS temporal
class PG_M,PG_S,CH_C,RD_C data
class ST,MON external
2. Deployment Strategy
timeline
title Blue-Green Deployment Process
Planning : Prepare new environment : Run integration tests : Verify Stripe connectivity
Deployment : Deploy to Green environment : Run smoke tests : Validate Temporal workflows
Validation : Process test transactions : Verify sync functionality : Monitor error rates
Switch : Update load balancer : Route traffic to Green : Monitor system health
Cleanup : Keep Blue as fallback : Clean up old resources : Update monitoring
Risk Assessment & Mitigation
1. Technical Risks
2. Business Risks
Success Criteria
Technical Success
Business Success
Operational Success
This high-level design provides a comprehensive framework for implementing a robust, scalable, and maintainable Stripe integration that serves as a foundation for multi-provider billing synchronization.