Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1/server/handlers/v1/tasks/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1"
"github.com/hatchet-dev/hatchet/pkg/analytics"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)

Expand All @@ -22,5 +23,7 @@ func (t *TasksService) V1TaskRestore(ctx echo.Context, request gen.V1TaskRestore
return nil, err
}

t.config.Analytics.Count(ctx.Request().Context(), analytics.DurableTask, analytics.Restore)

return gen.V1TaskRestore200JSONResponse{Requeued: true}, nil
}
2 changes: 2 additions & 0 deletions internal/services/admin/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ func (a *AdminServiceImpl) BranchDurableTask(ctx context.Context, req *contracts
return nil, status.Error(codes.InvalidArgument, "invalid task_external_id")
}

a.analytics.Count(ctx, analytics.DurableTask, analytics.Branch)

task, err := a.repo.Tasks().GetTaskByExternalId(ctx, tenantId, taskExternalId, true)
if err != nil {
return nil, status.Errorf(codes.NotFound, "task not found: %v", err)
Expand Down
1 change: 1 addition & 0 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (s *DispatcherImpl) ReleaseSlot(ctx context.Context, req *contracts.Release

func (s *DispatcherImpl) RestoreEvictedTask(ctx context.Context, req *contracts.RestoreEvictedTaskRequest) (*contracts.RestoreEvictedTaskResponse, error) {
tenant := ctx.Value("tenant").(*sqlcv1.Tenant)
s.analytics.Count(ctx, analytics.DurableTask, analytics.Restore)

return s.restoreEvictedTask(ctx, tenant, req)
}
Expand Down
29 changes: 29 additions & 0 deletions internal/services/dispatcher/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@
return status.Errorf(codes.InvalidArgument, "invalid worker id: %v", err)
}

d.analytics.Count(ctx, analytics.DurableTask, analytics.Register)

invocation.workerId = workerId
d.workerInvocations.Store(workerId, invocation)

Expand Down Expand Up @@ -481,13 +483,13 @@
})
}

func (d *DispatcherServiceImpl) sendStaleInvocationEviction(invocation *durableTaskInvocation, sie *v1.StaleInvocationError) error {

Check warning on line 486 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
return invocation.send(&contracts.DurableTaskResponse{
Message: &contracts.DurableTaskResponse_ServerEvict{
ServerEvict: &contracts.DurableTaskServerEvictNotice{
DurableTaskExternalId: sie.TaskExternalId.String(),

Check warning on line 490 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
InvocationCount: sie.ActualInvocationCount,

Check warning on line 491 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
Reason: sie.Error(),

Check warning on line 492 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
},
},
})
Expand Down Expand Up @@ -552,6 +554,8 @@
return status.Errorf(codes.InvalidArgument, "invalid durable_task_external_id: %v", err)
}

d.analytics.Count(ctx, analytics.DurableTask, analytics.Memo)

task, err := d.repo.Tasks().GetTaskByExternalId(ctx, invocation.tenantId, taskExternalId, false)
if err != nil {
return status.Errorf(codes.NotFound, "task not found: %v", err)
Expand All @@ -571,11 +575,11 @@
})

var nde *v1.NonDeterminismError
var sie *v1.StaleInvocationError

Check warning on line 578 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
if err != nil && errors.As(err, &nde) {
return d.sendNonDeterminismError(invocation, nde, req.InvocationCount)
} else if err != nil && errors.As(err, &sie) {

Check warning on line 581 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
return d.sendStaleInvocationEviction(invocation, sie)

Check warning on line 582 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest memo event: %v", err)
}
Expand Down Expand Up @@ -609,6 +613,17 @@
return status.Errorf(codes.InvalidArgument, "invalid durable_task_external_id: %v", err)
}

for _, w := range req.TriggerOpts {
d.analytics.Count(ctx, analytics.WorkflowRun, analytics.Create, analytics.Props(
"parent_is_durable_task", w.ParentTaskRunExternalId != nil,
"has_priority", w.Priority != nil,
"is_child", w.ParentId != nil,
"has_additional_meta", w.AdditionalMetadata != nil,
"has_desired_worker_id", w.DesiredWorkerId != nil,
"has_desired_worker_labels", len(w.DesiredWorkerLabels) > 0,
))
}

task, err := d.repo.Tasks().GetTaskByExternalId(ctx, invocation.tenantId, taskExternalId, false)
if err != nil {
return status.Errorf(codes.NotFound, "task not found: %v", err)
Expand Down Expand Up @@ -642,11 +657,11 @@
})

var nde *v1.NonDeterminismError
var sie *v1.StaleInvocationError

Check warning on line 660 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
if err != nil && errors.As(err, &nde) {
return d.sendNonDeterminismError(invocation, nde, req.InvocationCount)
} else if err != nil && errors.As(err, &sie) {

Check warning on line 663 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
return d.sendStaleInvocationEviction(invocation, sie)

Check warning on line 664 in internal/services/dispatcher/v1/server.go

View workflow job for this annotation

GitHub Actions / spelling

"sie" should be "size" or "sigh" or "side".
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest trigger runs event: %v", err)
}
Expand Down Expand Up @@ -694,6 +709,16 @@
return status.Errorf(codes.InvalidArgument, "invalid durable_task_external_id: %v", err)
}

var hasSleep, hasUserEvent bool
if req.WaitForConditions != nil {
hasSleep = len(req.WaitForConditions.SleepConditions) > 0
hasUserEvent = len(req.WaitForConditions.UserEventConditions) > 0
}
d.analytics.Count(ctx, analytics.DurableTask, analytics.WaitFor, analytics.Props(
"has_sleep", hasSleep,
"has_user_event", hasUserEvent,
))

task, err := d.repo.Tasks().GetTaskByExternalId(ctx, invocation.tenantId, taskExternalId, false)
if err != nil {
return status.Errorf(codes.NotFound, "task not found: %v", err)
Expand Down Expand Up @@ -783,6 +808,8 @@
return status.Errorf(codes.InvalidArgument, "invalid durable_task_external_id: %v", err)
}

d.analytics.Count(ctx, analytics.DurableTask, analytics.Memo)

err = d.repo.DurableEvents().CompleteMemoEntry(ctx, v1.CompleteMemoEntryOpts{
TenantId: invocation.tenantId,
TaskExternalId: taskExternalId,
Expand Down Expand Up @@ -827,6 +854,8 @@
return d.sendEvictionError(invocation, req, fmt.Sprintf("invalid durable_task_external_id: %v", err))
}

d.analytics.Count(ctx, analytics.DurableTask, analytics.Evict)

task, err := d.repo.Tasks().GetTaskByExternalId(ctx, invocation.tenantId, taskExternalId, false)
if err != nil {
return d.sendEvictionError(invocation, req, fmt.Sprintf("task not found: %v", err))
Expand Down
6 changes: 6 additions & 0 deletions pkg/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
Event Resource = "event"
WorkflowRun Resource = "workflow-run"
TaskRun Resource = "task-run"
DurableTask Resource = "durable-task"
Worker Resource = "worker"
RateLimit Resource = "rate-limit"
Webhook Resource = "webhook"
Expand Down Expand Up @@ -45,6 +46,11 @@ const (
Release Action = "release"
Refresh Action = "refresh"
Send Action = "send"
Evict Action = "evict"
Restore Action = "restore"
Branch Action = "branch"
Memo Action = "memo"
WaitFor Action = "wait-for"
)

type Properties map[string]interface{}
Expand Down
Loading