Class | Delayed::Worker |
In: |
lib/delayed/worker.rb
|
Parent: | Object |
name_prefix | [RW] | name_prefix is ignored if name is set directly |
# File lib/delayed/worker.rb, line 63 63: def self.after_fork 64: # Re-open file handles 65: @files_to_reopen.each do |file| 66: begin 67: file.reopen file.path, "a+" 68: file.sync = true 69: rescue ::Exception 70: end 71: end 72: 73: backend.after_fork 74: end
# File lib/delayed/worker.rb, line 38 38: def self.backend=(backend) 39: if backend.is_a? Symbol 40: require "delayed/serialization/#{backend}" 41: require "delayed/backend/#{backend}" 42: backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize 43: end 44: @@backend = backend 45: silence_warnings { ::Delayed.const_set(:Job, backend) } 46: end
# File lib/delayed/worker.rb, line 52 52: def self.before_fork 53: unless @files_to_reopen 54: @files_to_reopen = [] 55: ObjectSpace.each_object(File) do |file| 56: @files_to_reopen << file unless file.closed? 57: end 58: end 59: 60: backend.before_fork 61: end
# File lib/delayed/worker.rb, line 48 48: def self.guess_backend 49: warn "[DEPRECATION] guess_backend is deprecated. Please remove it from your code." 50: end
# File lib/delayed/worker.rb, line 76 76: def self.lifecycle 77: @lifecycle ||= Delayed::Lifecycle.new 78: end
# File lib/delayed/worker.rb, line 80 80: def initialize(options={}) 81: @quiet = options.has_key?(:quiet) ? options[:quiet] : true 82: self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority) 83: self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority) 84: self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay) 85: self.class.queues = options[:queues] if options.has_key?(:queues) 86: 87: self.plugins.each { |klass| klass.new } 88: end
# File lib/delayed/worker.rb, line 189 189: def failed(job) 190: self.class.lifecycle.run_callbacks(:failure, self, job) do 191: job.hook(:failure) 192: self.class.destroy_failed_jobs ? job.destroy : job.fail! 193: end 194: end
# File lib/delayed/worker.rb, line 202 202: def max_attempts(job) 203: job.max_attempts || self.class.max_attempts 204: end
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker retarts: Workers can# safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 94 94: def name 95: return @name unless @name.nil? 96: "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}" 97: end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 177 177: def reschedule(job, time = nil) 178: if (job.attempts += 1) < max_attempts(job) 179: time ||= job.reschedule_at 180: job.run_at = time 181: job.unlock 182: job.save! 183: else 184: say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO 185: failed(job) 186: end 187: end
# File lib/delayed/worker.rb, line 160 160: def run(job) 161: runtime = Benchmark.realtime do 162: Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job } 163: job.destroy 164: end 165: say "#{job.name} completed after %.4f" % runtime 166: return true # did work 167: rescue DeserializationError => error 168: job.last_error = "{#{error.message}\n#{error.backtrace.join("\n")}" 169: failed(job) 170: rescue Exception => error 171: self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) } 172: return false # work failed 173: end
# File lib/delayed/worker.rb, line 196 196: def say(text, level = Logger::INFO) 197: text = "[Worker(#{name})] #{text}" 198: puts text unless @quiet 199: logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger 200: end
# File lib/delayed/worker.rb, line 105 105: def start 106: trap('TERM') { say 'Exiting...'; stop } 107: trap('INT') { say 'Exiting...'; stop } 108: 109: say "Starting job worker" 110: 111: self.class.lifecycle.run_callbacks(:execute, self) do 112: loop do 113: self.class.lifecycle.run_callbacks(:loop, self) do 114: result = nil 115: 116: realtime = Benchmark.realtime do 117: result = work_off 118: end 119: 120: count = result.sum 121: 122: break if @exit 123: 124: if count.zero? 125: sleep(self.class.sleep_delay) 126: else 127: say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last] 128: end 129: end 130: 131: break if @exit 132: end 133: end 134: end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 142 142: def work_off(num = 100) 143: success, failure = 0, 0 144: 145: num.times do 146: case reserve_and_run_one_job 147: when true 148: success += 1 149: when false 150: failure += 1 151: else 152: break # leave if no work could be done 153: end 154: break if $exit # leave if we're exiting 155: end 156: 157: return [success, failure] 158: end
# File lib/delayed/worker.rb, line 208 208: def handle_failed_job(job, error) 209: job.last_error = "{#{error.message}\n#{error.backtrace.join("\n")}" 210: say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR 211: reschedule(job) 212: end