浩文 联系 关于本站 登录 注册
  • 关注新浪微博:
  • 关注微信公众号:

Sidekiq Github wiki 中文文档

API

commit信息:Add information about display_class and display_args helper methods | 提交者:j15e | 提交时间:2020-09-09 | 版本:7d17aa9

Sidekiq有一个公开的API,允许你实时获取有关workers,队列和作业的信息。参阅sidekiq/api来获取RDoc文档。请求API来取得所有以下描述的功能。

require 'sidekiq/api'

Web UI只使用了该API - 你在UI中可以做的任何事情,都可以使用API编写脚本

队列

获取所有队列

Sidekiq::Queue.all

获取一个队列

Sidekiq::Queue.new # 得到 'default' 队列
Sidekiq::Queue.new("mailer")

获取一个队列中作业的数量。

Sidekiq::Queue.new.size # => 4

通过删除一个队列来删除该队列中所有的作业。

Sidekiq::Queue.new.clear

删除mailer队列中jid'abcdef1234567890'的作业

queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
  job.args # => [1, 2, 3]
  # 如果你的作业由ActiveJob dapter或者其他扩展程序包装,请另参见display_class和display_args方法。
  # 只获取作业原本的类名和参数
  job.delete if job.jid == 'abcdef1234567890'
end

注意: 检索进程容易发生条件竞争。例如,如果在Sidekiq::Queue.new.each { |job| puts job.id }执行的同时其它进程正在更新该队列,这样或许会跳过一些作业或者多次输出同一个作业。当改变共享数据结构(即Redis中的队列)并且没有使用锁定功能时,可能发生这种状况。

计算一个队列的延迟(单位:秒)(当前时间 - 最老的入队作业的入队时间)

> Sidekiq::Queue.new.latency
14.5

使用JID查找一个作业(警告: 如果你有一个很大的队列,这将非常低效!)

> Sidekiq::Queue.new.find_job(somejid)

命名队列

计划的

计划的排序集按时间顺序排列所有计划的作业。在源码中有更多的信息, 查看详情,请参考 sidekiq/api

ss = Sidekiq::ScheduledSet.new
ss.size
ss.clear

可以枚举Sidekiq中的计划作业。通过该功能,你可以搜索/过滤 作业。下面是一个例子,我搜索一个特定类型的所有的作业,然后把它们从计划队列中删除(效率低下)。

ss = Sidekiq::ScheduledSet.new
# 使用 `scan` 在Redis中过滤元素,这样更快
# 这将返回与"* SomeWorker*"匹配的所有的作业负载
jobs = ss.scan("SomeWorker").select {|retri| retri.klass == 'SomeWorker' }
# 把所有的东西推都送给Ruby,这样是更慢的。
jobs = ss.select {|retri| retri.klass == 'SomeWorker' }
jobs.each(&:delete)

使用jid和 时间戳(epoch)来删除一个作业

time = Time.zone.now.to_f
jid = '80b1e7e46381a20c0c567285'
Sidekiq::ScheduledSet.new.delete_by_jid(time, jid)

重试

当一个作业引发一个错误时,Sidekiq将把它放入RetrySet中,以便在之后可以自动重试它。sidekiq会根据它们下次重试的时间来排序这些作业。

rs = Sidekiq::RetrySet.new
rs.size
rs.clear

可以在Sidekiq中枚举重试的作业。通过该功能,你可以 搜索/过滤 作业。下面是一个例子: 我搜索了一个特定类型的所有的作业,然后把它们从重试队列中删除(效率低下)。

query = Sidekiq::RetrySet.new
query.select do |job|
  job.klass == 'Sidekiq::Extensions::DelayedClass' &&
    # For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
    # the context is serialized to YAML, and must
    # be deserialized to get to the original args
    ((klass, method, args) = YAML.load(job.args[0])) &&
    klass == User &&
    method == :setup_new_subscriber
end.map(&:delete)

# 一个更好的方法是使用Redis 6.0的`scan`进行预过滤
query.scan("setup_new_subscriber").select do |job|
  job.klass == 'Sidekiq::Extensions::DelayedClass' &&
    # For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
    # the context is serialized to YAML, and must
    # be deserialized to get to the original args
    ((klass, method, args) = YAML.load(job.args[0])) &&
    klass == User &&
    method == :setup_new_subscriber
end.map(&:delete)

Dead

像‘重试’和‘计划’一样,这个DeadSet存放着所有被Sidekiq判定为死亡的作业,并且以它们的死亡时间排列。它也有像‘重试’和‘计划’那样的基础操作。

ds = Sidekiq::DeadSet.new
ds.size
ds.clear

或许你只是部署了一个拖了很久的补丁?迭代DeadSet,查找一些特定特征的作业,然后重试它们。

ds.select do |job|
  job.klass == 'FixedWorker' && job.args[0] == 123
end.map(&:retry)

Scan

Sidekiq 6.0 添加了使用glob模式来扫描命名集的支持。允许你在Redis中过滤出无关的作业,比在Ruby中做这要快很多。它在原生JSON负载执行扫描操作,因此你需要确保你使用了一个精准的模式:

ss = Sidekiq::ScheduledSet.new
# 匹配给定类名的所有作业
ss.scan("\"class\":\"HardWorker\"") { |job| ... }

# 简洁,但稍微不同: "HardWorker"可能出现在作业的参数中或者其它元素中
ss.scan("HardWorker") { |job| ... }

如果你不传递块,扫描将返回一个枚举器。

参阅 Redis 文档来详细了解glob模式匹配 - 不支持完整的正则表达式。For convenience Sidekiq wraps your pattern with a splat (*) on either side unless you include a *, in which case you must wrap the string yourself if you need it.

Processes

Sidekiq::ProcessSet使你可以获取当前运行Sidekiq进程的近实时(每5秒更新)设置信息。你也可以远程控制这些进程:

ps = Sidekiq::ProcessSet.new
ps.size # => 2
ps.each do |process|
  p process['busy']     # => 3
  p process['hostname'] # => 'myhost.local'
  p process['pid']      # => 16131
end
ps.each(&:quiet!) # equivalent to the TSTP signal (USR1 for version < 5)
ps.each(&:stop!) # equivalent to the TERM signal

在不支持信号的情况下,远程控制很有用:例如使用Windows,JRuby和JVM或Heroku时。

Workers

以编程方式访问所有Sidekiq进程的当前活动worker集合。一个'worker'被定义为一个正在执行作业的线程。

注意:这个数据被每5秒异步更新。它不是毫秒精度的。

workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
  # process_id 是每个进程的唯一标识
  # thread_id是每个线程的唯一标识
  # work看起来像下面的Hash
  # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
  # run_at is an epoch Integer.
  # payload 看起来像下面的Hash:
  # { 'retry' => true,
  #   'queue' => 'default',
  #   'class' => 'Redacted',
  #   'args' => [1, 2, 'foo'],
  #   'jid' => '80b1e7e46381a20c0c567285',
  #   'enqueued_at' => 1427811033.2067106 }
end

统计信息

你安装的Sidekiq的各种相关统计信息。

stats = Sidekiq::Stats.new
stats.processed # => 100
stats.failed # => 3
stats.queues # => { "default" => 1001, "email" => 50 }

获取所有队列中入队的作业数(不包括重试和计划作业)。

stats.enqueued # => 5 

统计历史

所有日期均为UTC,并且历史数据在5年后会被清除。

获取 ‘失败/处理过’ 的统计信息历史记录:

s = Sidekiq::Stats::History.new(2) # Indicates how many days of data you want starting from today (UTC)
s.failed # => { "2012-12-05" => 120, "2012-12-04" => 234 }
s.processed # => { "2012-12-05" => 1010, "2012-12-04" => 1500 }

从其它日期开始:

s = Sidekiq::Stats::History.new( 3, Date.parse("2012-12-3") )
s.failed # => { "2012-12-03" => 10, "2012-12-02" => 24, "2012-12-01" => 4 }
s.processed # => { "2012-12-03" => 124, "2012-12-02" => 345, "2012-12-01" => 355 }

备注

上一篇: 监视 下一篇: 中间件