Sidekiq Github wiki 中文文档
API
commit信息:Add information about display_class and display_args helper methods | 提交者:j15e | 提交时间:2020-09-09 | 版本:7d17aa9
Sidekiq有一个公开的API,允许你实时获取有关workers,队列和作业的信息。参阅sidekiq/api
来获取RDoc文档。请求API来取得所有以下描述的功能。
require 'sidekiq/api'
Web UI只使用了该API - 你在UI中可以做的任何事情,都可以使用API编写脚本。
队列
获取所有队列
Sidekiq::Queue.all
获取一个队列
Sidekiq::Queue.new # 得到 'default' 队列
Sidekiq::Queue.new("mailer")
获取一个队列中作业的数量。
Sidekiq::Queue.new.size # => 4
通过删除一个队列来删除该队列中所有的作业。
Sidekiq::Queue.new.clear
删除mailer
队列中jid
为'abcdef1234567890'
的作业
queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
job.klass # => 'MyWorker'
job.args # => [1, 2, 3]
# 如果你的作业由ActiveJob dapter或者其他扩展程序包装,请另参见display_class和display_args方法。
# 只获取作业原本的类名和参数
job.delete if job.jid == 'abcdef1234567890'
end
注意: 检索进程容易发生条件竞争。例如,如果在Sidekiq::Queue.new.each { |job| puts job.id }
执行的同时其它进程正在更新该队列,这样或许会跳过一些作业或者多次输出同一个作业。当改变共享数据结构(即Redis中的队列)并且没有使用锁定功能时,可能发生这种状况。
计算一个队列的延迟(单位:秒)(当前时间 - 最老的入队作业的入队时间)
> Sidekiq::Queue.new.latency
14.5
使用JID查找一个作业(警告: 如果你有一个很大的队列,这将非常低效!)
> Sidekiq::Queue.new.find_job(somejid)
命名队列
计划的
计划的排序集按时间顺序排列所有计划的作业。在源码中有更多的信息, 查看详情,请参考 sidekiq/api
。
ss = Sidekiq::ScheduledSet.new
ss.size
ss.clear
可以枚举Sidekiq中的计划作业。通过该功能,你可以搜索/过滤 作业。下面是一个例子,我搜索一个特定类型的所有的作业,然后把它们从计划队列中删除(效率低下)。
ss = Sidekiq::ScheduledSet.new
# 使用 `scan` 在Redis中过滤元素,这样更快
# 这将返回与"* SomeWorker*"匹配的所有的作业负载
jobs = ss.scan("SomeWorker").select {|retri| retri.klass == 'SomeWorker' }
# 把所有的东西推都送给Ruby,这样是更慢的。
jobs = ss.select {|retri| retri.klass == 'SomeWorker' }
jobs.each(&:delete)
使用jid和 时间戳(epoch)来删除一个作业
time = Time.zone.now.to_f
jid = '80b1e7e46381a20c0c567285'
Sidekiq::ScheduledSet.new.delete_by_jid(time, jid)
重试
当一个作业引发一个错误时,Sidekiq将把它放入RetrySet中,以便在之后可以自动重试它。sidekiq会根据它们下次重试的时间来排序这些作业。
rs = Sidekiq::RetrySet.new
rs.size
rs.clear
可以在Sidekiq中枚举重试的作业。通过该功能,你可以 搜索/过滤 作业。下面是一个例子: 我搜索了一个特定类型的所有的作业,然后把它们从重试队列中删除(效率低下)。
query = Sidekiq::RetrySet.new
query.select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
# For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
# the context is serialized to YAML, and must
# be deserialized to get to the original args
((klass, method, args) = YAML.load(job.args[0])) &&
klass == User &&
method == :setup_new_subscriber
end.map(&:delete)
# 一个更好的方法是使用Redis 6.0的`scan`进行预过滤
query.scan("setup_new_subscriber").select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
# For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
# the context is serialized to YAML, and must
# be deserialized to get to the original args
((klass, method, args) = YAML.load(job.args[0])) &&
klass == User &&
method == :setup_new_subscriber
end.map(&:delete)
Dead
像‘重试’和‘计划’一样,这个DeadSet存放着所有被Sidekiq判定为死亡的作业,并且以它们的死亡时间排列。它也有像‘重试’和‘计划’那样的基础操作。
ds = Sidekiq::DeadSet.new
ds.size
ds.clear
或许你只是部署了一个拖了很久的补丁?迭代DeadSet,查找一些特定特征的作业,然后重试它们。
ds.select do |job|
job.klass == 'FixedWorker' && job.args[0] == 123
end.map(&:retry)
Scan
Sidekiq 6.0 添加了使用glob模式来扫描命名集的支持。允许你在Redis中过滤出无关的作业,比在Ruby中做这要快很多。它在原生JSON负载执行扫描操作,因此你需要确保你使用了一个精准的模式:
ss = Sidekiq::ScheduledSet.new
# 匹配给定类名的所有作业
ss.scan("\"class\":\"HardWorker\"") { |job| ... }
# 简洁,但稍微不同: "HardWorker"可能出现在作业的参数中或者其它元素中
ss.scan("HardWorker") { |job| ... }
如果你不传递块,扫描将返回一个枚举器。
参阅 Redis 文档来详细了解glob模式匹配 - 不支持完整的正则表达式。For convenience Sidekiq wraps your pattern with a splat (*
) on either side unless you include a *
, in which case you must wrap the string yourself if you need it.
Processes
Sidekiq::ProcessSet
使你可以获取当前运行Sidekiq进程的近实时(每5秒更新)设置信息。你也可以远程控制这些进程:
ps = Sidekiq::ProcessSet.new
ps.size # => 2
ps.each do |process|
p process['busy'] # => 3
p process['hostname'] # => 'myhost.local'
p process['pid'] # => 16131
end
ps.each(&:quiet!) # equivalent to the TSTP signal (USR1 for version < 5)
ps.each(&:stop!) # equivalent to the TERM signal
在不支持信号的情况下,远程控制很有用:例如使用Windows,JRuby和JVM或Heroku时。
Workers
以编程方式访问所有Sidekiq进程的当前活动worker集合。一个'worker'被定义为一个正在执行作业的线程。
注意:这个数据被每5秒异步更新。它不是毫秒精度的。
workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
# process_id 是每个进程的唯一标识
# thread_id是每个线程的唯一标识
# work看起来像下面的Hash
# { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
# run_at is an epoch Integer.
# payload 看起来像下面的Hash:
# { 'retry' => true,
# 'queue' => 'default',
# 'class' => 'Redacted',
# 'args' => [1, 2, 'foo'],
# 'jid' => '80b1e7e46381a20c0c567285',
# 'enqueued_at' => 1427811033.2067106 }
end
统计信息
你安装的Sidekiq的各种相关统计信息。
stats = Sidekiq::Stats.new
stats.processed # => 100
stats.failed # => 3
stats.queues # => { "default" => 1001, "email" => 50 }
获取所有队列中入队的作业数(不包括重试和计划作业)。
stats.enqueued # => 5
统计历史
所有日期均为UTC,并且历史数据在5年后会被清除。
获取 ‘失败/处理过’ 的统计信息历史记录:
s = Sidekiq::Stats::History.new(2) # Indicates how many days of data you want starting from today (UTC)
s.failed # => { "2012-12-05" => 120, "2012-12-04" => 234 }
s.processed # => { "2012-12-05" => 1010, "2012-12-04" => 1500 }
从其它日期开始:
s = Sidekiq::Stats::History.new( 3, Date.parse("2012-12-3") )
s.failed # => { "2012-12-03" => 10, "2012-12-02" => 24, "2012-12-01" => 4 }
s.processed # => { "2012-12-03" => 124, "2012-12-02" => 345, "2012-12-01" => 355 }
备注
- API提供的一些不可扩展的操作,因此不应该以自动化方式使用,或者在你的应用功能里大量使用。
- 在API的内部,sidekiq 没有加锁 地操作Redis中共享的易变的数据结构。这意味着通过该结构进行的任何迭代都是尽力而为的,并且不能保证结果。例如:如果你调用
Sidekiq::Queue#find_job(jid)
,如果队列在迭代时改变了,该调用可能会错过一个作业。 - 大多数小型系统可能不会注意到这些竞争的条件,但是一个大型的,繁忙的系统将会出现这种情况。除非出现了问题,并且你需要手动修复数据,否则你不应扫面‘队列/数据集’ 并删除作业。