Class | Delayed::Worker |
In: |
lib/delayed/worker.rb
|
Parent: | Object |
name_prefix | [RW] | name_prefix is ignored if name is set directly |
# File lib/delayed/worker.rb, line 32
# Selects the job backend used by the worker.
#
# backend - either a job class, or a Symbol naming a bundled backend. For a
#           Symbol the matching "delayed/serialization/<name>" and
#           "delayed/backend/<name>" files are required and the class
#           Delayed::Backend::<Name>::Job is looked up.
#
# The resolved class is stored in @@backend and also published as
# Delayed::Job (constant-redefinition warnings are silenced).
def self.backend=(backend)
  job_class = backend

  if backend.is_a?(Symbol)
    backend_name = backend
    require "delayed/serialization/#{backend_name}"
    require "delayed/backend/#{backend_name}"
    job_class = "Delayed::Backend::#{backend_name.to_s.classify}::Job".constantize
  end

  @@backend = job_class
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end
# File lib/delayed/worker.rb, line 42
# Defaults the backend to :active_record when ActiveRecord is loaded and no
# backend has been chosen yet. Does nothing (and returns nil) when
# ActiveRecord is not defined.
def self.guess_backend
  return unless defined?(ActiveRecord)

  self.backend ||= :active_record
end
# File lib/delayed/worker.rb, line 46
# Builds a worker.
#
# options - Hash of optional settings:
#           :quiet        - suppress console output from #say (default: true)
#           :min_priority - forwarded to the class-level min_priority setter
#           :max_priority - forwarded to the class-level max_priority setter
#           :sleep_delay  - forwarded to the class-level sleep_delay setter
#
# Class-level settings are only touched when the corresponding key is present.
def initialize(options={})
  @quiet = options.fetch(:quiet, true)

  [:min_priority, :max_priority, :sleep_delay].each do |setting|
    next unless options.has_key?(setting)
    self.class.send("#{setting}=", options[setting])
  end
end
# File lib/delayed/worker.rb, line 147
# Marks a job as permanently failed.
#
# Runs the job's :failure hook, prints a deprecation warning when the job
# still responds to #on_permanent_failure, and then either destroys the job
# (when destroy_failed_jobs is set on the worker class) or stamps its
# failed_at attribute with Delayed::Job.db_time_now.
#
# Returns the result of the destroy / update_attributes call.
def failed(job)
  job.hook(:failure)

  warn "[DEPRECATION] The #on_permanent_failure hook has been renamed to #failure." if job.respond_to?(:on_permanent_failure)

  return job.destroy if self.class.destroy_failed_jobs

  job.update_attributes(:failed_at => Delayed::Job.db_time_now)
end
# File lib/delayed/worker.rb, line 161
# Returns the attempt limit for +job+: the job's own max_attempts when it has
# one, otherwise the worker class's max_attempts.
def max_attempts(job)
  per_job_limit = job.max_attempts
  return per_job_limit if per_job_limit

  self.class.max_attempts
end
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 57
# Returns the worker's name.
#
# An explicitly assigned @name always wins. Otherwise the name is built from
# @name_prefix, the host name and the process id; if building that string
# raises a StandardError, a pid-only form is returned instead.
def name
  if @name.nil?
    begin
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
    rescue StandardError
      "#{@name_prefix}pid:#{Process.pid}"
    end
  else
    @name
  end
end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 135
# Records one more failed attempt on +job+ and either schedules a retry or
# gives up.
#
# job  - the job that just failed
# time - when to run it again; defaults to job.reschedule_at
#
# While the new attempt count is below max_attempts(job) the job gets a new
# run_at, is unlocked and saved. Otherwise the removal is logged and the job
# is passed to #failed.
def reschedule(job, time = nil)
  attempt_count = job.attempts + 1
  job.attempts = attempt_count

  unless attempt_count < max_attempts(job)
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    return failed(job)
  end

  job.run_at = time || job.reschedule_at
  job.unlock
  job.save!
end
# File lib/delayed/worker.rb, line 118
# Runs a single job and reports whether it succeeded.
#
# The job is invoked under a timeout of max_run_time seconds (as configured
# on the worker class) and destroyed once it finishes; the elapsed time is
# logged through #say.
#
# Returns true when the job completed and false when it failed, so callers
# that branch on true/false (such as #work_off) count every outcome.
# A DeserializationError is treated as a permanent failure and passed to
# #failed; any other exception is passed to #handle_failed_job.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  true # did work
rescue DeserializationError => error
  # Double-quoted "\n" so the backtrace is joined with real newlines.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  failed(job)
  false # work failed; do not leak whatever #failed happened to return
rescue Exception => error
  # NOTE(review): rescues Exception rather than StandardError, presumably so
  # that timeouts and other non-standard errors raised by a job are still
  # recorded against the job -- confirm before narrowing.
  handle_failed_job(job, error)
  false # work failed
end
# File lib/delayed/worker.rb, line 155
# Emits a log line tagged with this worker's name.
#
# text  - the message
# level - Logger severity used when a logger is available (default: INFO)
#
# The tagged line is printed to stdout unless the worker is quiet, and is
# additionally written, prefixed with a timestamp, to the logger when one is
# available.
def say(text, level = Logger::INFO)
  tagged = "[Worker(#{name})] #{text}"
  puts tagged unless @quiet

  return unless logger

  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{tagged}"
end
# File lib/delayed/worker.rb, line 68
# Main worker loop.
#
# Installs TERM and INT handlers that log a message and set the global $exit
# flag, then repeatedly calls #work_off. When a pass processed nothing the
# worker sleeps for the class-level sleep_delay; otherwise it logs the
# throughput and failure count. The loop stops as soon as $exit is set.
#
# Locks held under this worker's name are always cleared on the way out.
def start
  say "Starting job worker"

  %w[TERM INT].each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end

  loop do
    stats = nil
    elapsed = Benchmark.realtime { stats = work_off }
    processed = stats.sum

    break if $exit

    if processed.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{processed} jobs processed at %.4f j/s, %d failed ..." % [processed / elapsed, stats.last]
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 100
# Runs up to +num+ jobs and tallies the outcomes.
#
# Each call to reserve_and_run_one_job counts as a success when it returns
# true and as a failure when it returns false; any other value ends the pass
# because no work could be done. The pass also ends early once $exit is set.
#
# Returns a two-element array: [successes, failures].
def work_off(num = 100)
  tally = [0, 0] # [successes, failures]

  num.times do
    case reserve_and_run_one_job
    when true  then tally[0] += 1
    when false then tally[1] += 1
    else break # leave if no work could be done
    end

    break if $exit # leave if we're exiting
  end

  tally
end
# File lib/delayed/worker.rb, line 167
# Records a job failure and hands the job to #reschedule.
#
# job   - the job that raised
# error - the exception that was raised
#
# The error message and backtrace (one frame per line) are stored in
# job.last_error, the failure is logged at ERROR level, and the result of
# #reschedule is returned.
def handle_failed_job(job, error)
  # Double-quoted "\n" so frames are separated by real newlines; Array()
  # tolerates an exception whose backtrace is nil.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end