@@ -33,18 +33,26 @@ module Sidekiq
33
33
if defined? ( ::Sidekiq ::JobLogger )
34
34
# Let Semantic Logger handle duration logging
35
35
class JobLogger
36
- def call ( item , queue )
36
+ def call ( item , queue , & block )
37
37
klass = item [ "wrapped" ] || item [ "class" ]
38
- metric = "Sidekiq/#{ klass } /perform" if klass
39
38
logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
40
- logger . info ( "Start #perform" )
41
- logger . measure_info (
42
- "Completed #perform" ,
43
- on_exception_level : :error ,
44
- log_exception : :full ,
45
- metric : metric
46
- ) do
47
- yield
39
+
40
+ SemanticLogger . tagged ( queue : queue ) do
41
+ # Latency is the time between when the job was enqueued and when it started executing.
42
+ logger . info (
43
+ "Start #perform" ,
44
+ metric : "sidekiq.queue.latency" ,
45
+ metric_amount : job_latency_ms ( item )
46
+ )
47
+
48
+ # Measure the duration of running the job
49
+ logger . measure_info (
50
+ "Completed #perform" ,
51
+ on_exception_level : :error ,
52
+ log_exception : :full ,
53
+ metric : "sidekiq.job.perform" ,
54
+ &block
55
+ )
48
56
end
49
57
end
50
58
@@ -60,14 +68,18 @@ def prepare(job_hash, &block)
60
68
end
61
69
62
70
def job_hash_context ( job_hash )
63
- h = {
64
- class : job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ] ,
65
- jid : job_hash [ "jid" ]
66
- }
67
- h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
68
- h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
71
+ h = { jid : job_hash [ "jid" ] }
72
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
73
+ h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
74
+ h [ :queue ] = job_hash [ "queue" ] if job_hash [ "queue" ]
69
75
h
70
76
end
77
+
78
+ def job_latency_ms ( job )
79
+ return unless job && job [ "enqueued_at" ]
80
+
81
+ ( Time . now . to_f - job [ "enqueued_at" ] . to_f ) * 1000
82
+ end
71
83
end
72
84
end
73
85
@@ -80,48 +92,47 @@ def self.with_context(msg, &block)
80
92
end
81
93
82
94
def self . job_hash_context ( job_hash )
83
- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
84
- event = { class : klass , jid : job_hash [ "jid" ] }
85
- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
86
- event
95
+ h = { jid : job_hash [ "jid" ] }
96
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
97
+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
98
+ h
87
99
end
88
100
end
89
101
end
90
102
91
103
# Exception is already logged by Semantic Logger during the perform call
92
- # Sidekiq <= v6.5
93
104
if defined? ( ::Sidekiq ::ExceptionHandler )
105
+ # Sidekiq <= v6.5
94
106
module ExceptionHandler
95
107
class Logger
96
- def call ( ex , ctx )
97
- unless ctx . empty?
98
- job_hash = ctx [ :job ] || { }
99
- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
100
- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
101
- ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
102
- end
108
+ def call ( _exception , ctx )
109
+ return if ctx . empty?
110
+
111
+ job_hash = ctx [ :job ] || { }
112
+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
113
+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
114
+ ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
103
115
end
104
116
end
105
117
end
106
- # Sidekiq >= v7
107
118
elsif defined? ( ::Sidekiq ::Config )
119
+ # Sidekiq >= v7
108
120
class Config
109
121
remove_const :ERROR_HANDLER
110
122
111
- ERROR_HANDLER = -> ( ex , ctx , cfg = Sidekiq . default_configuration ) {
123
+ ERROR_HANDLER = -> ( ex , ctx , cfg = Sidekiq . default_configuration ) do
112
124
unless ctx . empty?
113
125
job_hash = ctx [ :job ] || { }
114
- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
115
- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
126
+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
127
+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
116
128
ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
117
129
end
118
- }
130
+ end
119
131
end
120
132
else
121
133
# Sidekiq >= 6.5
122
- # TODO: Not taking effect. See test/sidekiq_test.rb
123
- def self . default_error_handler ( ex , ctx )
124
- binding . irb
134
+ Sidekiq . error_handlers . delete ( Sidekiq ::DEFAULT_ERROR_HANDLER )
135
+ Sidekiq . error_handlers << -> ( ex , ctx ) do
125
136
unless ctx . empty?
126
137
job_hash = ctx [ :job ] || { }
127
138
klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
@@ -132,10 +143,13 @@ def self.default_error_handler(ex, ctx)
132
143
end
133
144
134
145
# Logging within each worker should use its own logger
135
- if Sidekiq ::VERSION . to_i == 4
146
+ case Sidekiq ::VERSION . to_i
147
+ when 4
136
148
module Worker
137
149
def self . included ( base )
138
- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
150
+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
151
+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } "
152
+ end
139
153
140
154
base . extend ( ClassMethods )
141
155
base . include ( SemanticLogger ::Loggable )
@@ -144,10 +158,12 @@ def self.included(base)
144
158
base . class_attribute :sidekiq_retries_exhausted_block
145
159
end
146
160
end
147
- elsif Sidekiq :: VERSION . to_i == 5
161
+ when 5
148
162
module Worker
149
163
def self . included ( base )
150
- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
164
+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
165
+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } "
166
+ end
151
167
152
168
base . extend ( ClassMethods )
153
169
base . include ( SemanticLogger ::Loggable )
@@ -156,10 +172,12 @@ def self.included(base)
156
172
base . sidekiq_class_attribute :sidekiq_retries_exhausted_block
157
173
end
158
174
end
159
- elsif Sidekiq :: VERSION . to_i == 6
175
+ when 6
160
176
module Worker
161
177
def self . included ( base )
162
- raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
178
+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
179
+ raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } "
180
+ end
163
181
164
182
base . include ( Options )
165
183
base . extend ( ClassMethods )
@@ -169,7 +187,9 @@ def self.included(base)
169
187
else
170
188
module Job
171
189
def self . included ( base )
172
- raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
190
+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
191
+ raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } "
192
+ end
173
193
174
194
base . include ( Options )
175
195
base . extend ( ClassMethods )
@@ -178,14 +198,15 @@ def self.included(base)
178
198
end
179
199
end
180
200
181
- if Sidekiq ::VERSION . to_i == 4
201
+ if defined? ( ::Sidekiq ::Middleware ::Server ::Logging )
202
+ # Sidekiq v4
182
203
# Convert string to machine readable format
183
204
class Processor
184
205
def log_context ( job_hash )
185
- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
186
- event = { class : klass , jid : job_hash [ "jid" ] }
187
- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
188
- event
206
+ h = { jid : job_hash [ "jid" ] }
207
+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
208
+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
209
+ h
189
210
end
190
211
end
191
212
@@ -194,16 +215,26 @@ module Middleware
194
215
module Server
195
216
class Logging
196
217
def call ( worker , item , queue )
197
- worker . logger . info ( "Start #perform" )
198
- worker . logger . measure_info (
199
- "Completed #perform" ,
200
- on_exception_level : :error ,
201
- log_exception : :full ,
202
- metric : "Sidekiq/#{ worker . class . name } /perform"
203
- ) do
204
- yield
218
+ SemanticLogger . tagged ( queue : queue ) do
219
+ worker . logger . info (
220
+ "Start #perform" ,
221
+ metric : "sidekiq.queue.latency" ,
222
+ metric_amount : job_latency_ms ( item )
223
+ )
224
+ worker . logger . measure_info (
225
+ "Completed #perform" ,
226
+ on_exception_level : :error ,
227
+ log_exception : :full ,
228
+ metric : "sidekiq.job.perform"
229
+ ) { yield }
205
230
end
206
231
end
232
+
233
+ def job_latency_ms ( job )
234
+ return unless job && job [ "enqueued_at" ]
235
+
236
+ ( Time . now . to_f - job [ "enqueued_at" ] . to_f ) * 1000
237
+ end
207
238
end
208
239
end
209
240
end
0 commit comments