-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjobs_integration.go
More file actions
136 lines (113 loc) · 2.75 KB
/
jobs_integration.go
File metadata and controls
136 lines (113 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package smtp
import (
"context"
"encoding/json"
"github.com/google/uuid"
"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
)
// Jobs describes the one capability this package needs from the Jobs plugin:
// pushing a message onto it.
type Jobs interface {
	// Push submits msg using ctx and returns an error value to the caller.
	Push(ctx context.Context, msg jobs.Message) error
}
// Job is the message handed to the Jobs plugin. Its methods are written to
// satisfy the jobs.Message interface.
type Job struct {
	// Job is the job name reported by Name (the original author notes this is
	// usually a PHP class). Serialized under the "job" key.
	Job string `json:"job"`

	// Ident is the unique job identifier reported by ID. Serialized under
	// the "id" key.
	Ident string `json:"id"`

	// Pld is the raw payload reported by Payload (usually JSON). Serialized
	// under the "payload" key.
	Pld []byte `json:"payload"`

	// Hdr holds the header key/values reported by Headers. Serialized under
	// the "headers" key.
	Hdr map[string][]string `json:"headers"`

	// Options carries the execution options. It may be nil, in which case the
	// "options" key is omitted from the JSON output.
	Options *JobOptions `json:"options,omitempty"`
}
// JobOptions describes how a given job should be handled.
type JobOptions struct {
	// Priority is the job priority. The original author documents the
	// default as 10; the zero value of this field is 0.
	Priority int64 `json:"priority"`

	// Pipeline is the manually specified pipeline. Omitted from JSON when
	// empty.
	Pipeline string `json:"pipeline,omitempty"`

	// Delay is how long to postpone execution. The unit is not visible in
	// this file. Omitted from JSON when zero.
	Delay int64 `json:"delay,omitempty"`

	// AutoAck requests acknowledging the job right after it arrives from the
	// driver.
	AutoAck bool `json:"auto_ack"`
}
// Implement jobs.Message interface methods

// ID reports the job's identifier, i.e. the Ident field.
func (j *Job) ID() string {
	id := j.Ident
	return id
}
// GroupID reports the pipeline named in the job options, or the empty string
// when the job has no options.
func (j *Job) GroupID() string {
	if opts := j.Options; opts != nil {
		return opts.Pipeline
	}
	return ""
}
// Priority reports the priority stored in the job options. A job without
// options reports 10.
func (j *Job) Priority() int64 {
	if opts := j.Options; opts != nil {
		return opts.Priority
	}
	return 10
}
// Name reports the job name, i.e. the Job field.
func (j *Job) Name() string {
	name := j.Job
	return name
}
// Payload reports the raw payload bytes. The returned slice is the Pld field
// itself, not a copy.
func (j *Job) Payload() []byte {
	body := j.Pld
	return body
}
// Headers reports the job headers. The returned map is the Hdr field itself,
// not a copy.
func (j *Job) Headers() map[string][]string {
	hdr := j.Hdr
	return hdr
}
// Delay reports the delay stored in the job options, or 0 when the job has
// no options.
func (j *Job) Delay() int64 {
	var delay int64
	if j.Options != nil {
		delay = j.Options.Delay
	}
	return delay
}
// AutoAck reports the auto-ack flag stored in the job options. A job without
// options reports false.
func (j *Job) AutoAck() bool {
	return j.Options != nil && j.Options.AutoAck
}
// Kafka-specific methods (required by jobs.Message interface)

// Offset always reports 0; this job type stores no offset.
func (j *Job) Offset() int64 {
	var offset int64
	return offset
}
// Partition always reports 0; this job type stores no partition.
func (j *Job) Partition() int32 {
	var partition int32
	return partition
}
// Topic always reports the empty string; this job type stores no topic.
func (j *Job) Topic() string {
	var topic string
	return topic
}
// Metadata always reports the empty string; this job type stores no metadata.
func (j *Job) Metadata() string {
	var metadata string
	return metadata
}
// UpdatePriority sets the job priority to p. If the job has no options yet,
// a fresh JobOptions is allocated with every other field at its zero value.
func (j *Job) UpdatePriority(p int64) {
	if j.Options != nil {
		j.Options.Priority = p
		return
	}
	j.Options = &JobOptions{Priority: p}
}
// emailToJobMessage wraps email in a Job suitable for the Jobs plugin.
//
// The job is named "smtp.email", receives a freshly generated UUID as its
// identifier, carries the JSON encoding of email as its payload, and has two
// headers: "uuid" (the email's own UUID) and "payload_class" ("smtp:handler").
// Pipeline, priority, delay and auto-ack are copied from cfg.
//
// Both email and cfg are dereferenced without nil checks.
func emailToJobMessage(email *EmailData, cfg *JobsConfig) jobs.Message {
	// The marshal error is discarded here; if encoding fails the job is
	// still built, with a nil payload.
	body, _ := json.Marshal(email)

	// The job identifier is distinct from the email's UUID in the headers.
	ident := uuid.NewString()

	headers := map[string][]string{
		"uuid":          {email.UUID},
		"payload_class": {"smtp:handler"},
	}

	opts := &JobOptions{
		Pipeline: cfg.Pipeline,
		Priority: cfg.Priority,
		Delay:    cfg.Delay,
		AutoAck:  cfg.AutoAck,
	}

	return &Job{
		Job:     "smtp.email",
		Ident:   ident,
		Pld:     body,
		Hdr:     headers,
		Options: opts,
	}
}