@@ -33,18 +33,26 @@ module Sidekiq
33
33
if defined? ( ::Sidekiq ::JobLogger )
34
34
# Let Semantic Logger handle duration logging
35
35
class JobLogger
36
- def call ( item , queue )
36
+ def call ( item , queue , & block )
37
37
klass = item [ "wrapped" ] || item [ "class" ]
38
- metric = "Sidekiq/#{ klass } /perform" if klass
39
38
logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
40
- logger . info ( "Start #perform" )
41
- logger . measure_info (
42
- "Completed #perform" ,
43
- on_exception_level : :error ,
44
- log_exception : :full ,
45
- metric : metric
46
- ) do
47
- yield
39
+
40
+ SemanticLogger . tagged ( queue : queue ) do
41
+ # Latency is the time between when the job was enqueued and when it started executing.
42
+ logger . info (
43
+ "Start #perform" ,
44
+ metric : "sidekiq.queue.latency" ,
45
+ metric_amount : job_latency_ms ( item )
46
+ )
47
+
48
+ # Measure the duration of running the job
49
+ logger . measure_info (
50
+ "Completed #perform" ,
51
+ on_exception_level : :error ,
52
+ log_exception : :full ,
53
+ metric : "sidekiq.job.perform" ,
54
+ &block
55
+ )
48
56
end
49
57
end
50
58
@@ -60,14 +68,18 @@ def prepare(job_hash, &block)
60
68
end
61
69
62
70
def job_hash_context ( job_hash )
63
- h = {
64
- class : job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ] ,
65
- jid : job_hash [ "jid" ]
66
- }
67
- h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
68
- h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
71
+ h = { jid : job_hash [ "jid" ] }
72
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
73
+ h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
74
+ h [ :queue ] = job_hash [ "queue" ] if job_hash [ "queue" ]
69
75
h
70
76
end
77
+
78
+ def job_latency_ms ( job )
79
+ return unless job && job [ "enqueued_at" ]
80
+
81
+ ( Time . now . to_f - job [ "enqueued_at" ] . to_f ) * 1000
82
+ end
71
83
end
72
84
end
73
85
@@ -80,10 +92,10 @@ def self.with_context(msg, &block)
80
92
end
81
93
82
94
def self . job_hash_context ( job_hash )
83
- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
84
- event = { class : klass , jid : job_hash [ "jid" ] }
85
- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
86
- event
95
+ h = { jid : job_hash [ "jid" ] }
96
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
97
+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
98
+ h
87
99
end
88
100
end
89
101
end
@@ -93,49 +105,51 @@ def self.job_hash_context(job_hash)
93
105
if defined? ( ::Sidekiq ::ExceptionHandler )
94
106
module ExceptionHandler
95
107
class Logger
96
- def call ( ex , ctx )
97
- unless ctx . empty?
98
- job_hash = ctx [ :job ] || { }
99
- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
100
- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
101
- ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
102
- end
108
+ def call ( _exception , ctx )
109
+ return if ctx . empty?
110
+
111
+ job_hash = ctx [ :job ] || { }
112
+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
113
+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
114
+ ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
103
115
end
104
116
end
105
117
end
106
- # Sidekiq >= v7
118
+ # Sidekiq >= v7
107
119
elsif defined? ( ::Sidekiq ::Config )
108
120
class Config
109
121
remove_const :ERROR_HANDLER
110
122
111
- ERROR_HANDLER = -> ( ex , ctx , cfg = Sidekiq . default_configuration ) {
123
+ ERROR_HANDLER = lambda { | _ex , ctx , _cfg = Sidekiq . default_configuration |
112
124
unless ctx . empty?
113
125
job_hash = ctx [ :job ] || { }
114
- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
115
- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
126
+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
127
+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
116
128
ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
117
129
end
118
130
}
119
131
end
120
132
else
121
133
# Sidekiq >= 6.5
122
134
# TODO: Not taking effect. See test/sidekiq_test.rb
123
- def self . default_error_handler ( ex , ctx )
124
- binding . irb
125
- unless ctx . empty?
126
- job_hash = ctx [ :job ] || { }
127
- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
128
- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
129
- ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
130
- end
135
+ def self . default_error_handler ( _exception , ctx )
136
+ return if ctx . empty?
137
+
138
+ job_hash = ctx [ :job ] || { }
139
+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
140
+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
141
+ ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
131
142
end
132
143
end
133
144
134
145
# Logging within each worker should use its own logger
135
- if Sidekiq ::VERSION . to_i == 4
146
+ case Sidekiq ::VERSION . to_i
147
+ when 4
136
148
module Worker
137
149
def self . included ( base )
138
- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
150
+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? do |c |
151
+ c . name == "ActiveJob::Base"
152
+ end
139
153
140
154
base . extend ( ClassMethods )
141
155
base . include ( SemanticLogger ::Loggable )
@@ -144,10 +158,12 @@ def self.included(base)
144
158
base . class_attribute :sidekiq_retries_exhausted_block
145
159
end
146
160
end
147
- elsif Sidekiq :: VERSION . to_i == 5
161
+ when 5
148
162
module Worker
149
163
def self . included ( base )
150
- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
164
+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? do |c |
165
+ c . name == "ActiveJob::Base"
166
+ end
151
167
152
168
base . extend ( ClassMethods )
153
169
base . include ( SemanticLogger ::Loggable )
@@ -156,10 +172,12 @@ def self.included(base)
156
172
base . sidekiq_class_attribute :sidekiq_retries_exhausted_block
157
173
end
158
174
end
159
- elsif Sidekiq :: VERSION . to_i == 6
175
+ when 6
160
176
module Worker
161
177
def self . included ( base )
162
- raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
178
+ raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? do |c |
179
+ c . name == "ActiveJob::Base"
180
+ end
163
181
164
182
base . include ( Options )
165
183
base . extend ( ClassMethods )
@@ -169,7 +187,9 @@ def self.included(base)
169
187
else
170
188
module Job
171
189
def self . included ( base )
172
- raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
190
+ raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? do |c |
191
+ c . name == "ActiveJob::Base"
192
+ end
173
193
174
194
base . include ( Options )
175
195
base . extend ( ClassMethods )
@@ -182,28 +202,39 @@ def self.included(base)
182
202
# Convert string to machine readable format
183
203
class Processor
184
204
def log_context ( job_hash )
185
- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
186
- event = { class : klass , jid : job_hash [ "jid" ] }
187
- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
188
- event
205
+ h = { jid : job_hash [ "jid" ] }
206
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
207
+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
208
+ h
189
209
end
190
210
end
191
211
192
212
# Let Semantic Logger handle duration logging
193
213
module Middleware
194
214
module Server
195
215
class Logging
196
- def call ( worker , item , queue )
197
- worker . logger . info ( "Start #perform" )
198
- worker . logger . measure_info (
199
- "Completed #perform" ,
200
- on_exception_level : :error ,
201
- log_exception : :full ,
202
- metric : "Sidekiq/#{ worker . class . name } /perform"
203
- ) do
204
- yield
216
+ def call ( worker , item , queue , &block )
217
+ SemanticLogger . named_tags ( queue : queue ) do
218
+ worker . logger . info (
219
+ "Start #perform" ,
220
+ metric : "sidekiq.queue.latency" ,
221
+ metric_amount : job_latency_ms ( item )
222
+ )
223
+ worker . logger . measure_info (
224
+ "Completed #perform" ,
225
+ on_exception_level : :error ,
226
+ log_exception : :full ,
227
+ metric : "sidekiq.job.perform" ,
228
+ &block
229
+ )
205
230
end
206
231
end
232
+
233
+ def job_latency_ms ( job )
234
+ return unless job && job [ "enqueued_at" ]
235
+
236
+ ( Time . now . to_f - job [ "enqueued_at" ] ) * 1000
237
+ end
207
238
end
208
239
end
209
240
end
0 commit comments