Beanstalkd 学习研究
1. Beanstalkd 介绍
Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过后台异步执行耗时任务方式降低高容量Web应用的页面延时。而其简单、轻量、易用等特点,和对任务优先级(priority)、任务延时(delay)、任务超时重发(time-to-run)和任务预留(buired)等控制,以及众多语言版本的客户端的良好支持,使其能够很好的支持分布式的后台任务和定时任务处理。
beanstalkd还提供了binlog机制,当重启beanstalkd,当前任务的状态能够从记录的本地binlog中恢复。
2. Beanstalkd 中的重要概念
2.1 核心概念

Beanstalkd 使用 Producer-Consumer设计模式,无论是其协议结构还是使用方式都是类似Memcached风格的。以下是Beanstalkd设计思想中核心概念:
job - 任务
job是一个需要异步处理的处理,是 Beanstalkd中的基本单元,每个job都会有一个id和优先级,job需要放在一个tube中。 Beanstalkd中的任务(job)类似于其消息队列中的消息(message)的概念。
tube - 管道
管道即某一种类型的任务队列,其类似于消息的主题(topic),是Producer和Consumer的操作对象。一个Beanstalkd中可以有多个管道,每个管道都有自己的生产者(Producer)和消费者(Consumer),管道之间互相不影响。
producer - 生产者
任务(job)的生产者,通过put命令来将一个job放到一个tube中。
consumer - 消费者
任务(job)的消费者,通过reserve来获取job,通过delete、release、bury来改变job的状态。
2.2 任务生命周期
Beanstalkd中的任务(job)替代了消息(message)的概念,任务会有一系列状态。任务的生命周期如下:

一个 Beanstalkd任务可能会包含以下状态:
- READY - 需要立即处理的任务。当
producter直接put一个任务时,任务就处理READY状态,以等待consumer来处理。当延时(DELAYED)任务到期后会自动成为当前READY状态的任务 - RESERVED - 已经被消费者获取,正在执行的任务。当
consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它的consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTR(time-to-run)内完成 - DELETED - 消息被删除,
Beanstalkd不再维持这些消息。即任务生命周期结束 - DELAYED - 延迟执行的任务。当任务被延时
put时,任务就处理DELAYED状态。等待时间过后,任务会被迁移到READY状态。当消费者处理任务后,可以将任务再次放回DELAYED队列延迟执行 - BURIED - 埋葬的任务,这时任务不会被执行,也不会消失。当
consumer完成该任务后,可以选择delete或release或bury操作delete后,任务被删除,生命周期结束release操作可以把任务状态迁移回READY状态或DELAYED状态,使其它consumer可以继续获取和执行该任务bury操作会埋葬任务,等需要该任务时,再将埋葬的任务kick回READY,也可以通过delete删除BURIED状态的任务
 
2.3 Beanstalkd特点
- 任务优先级(priority)
 
任务(job)可以有0~2^32个优先级,0表示优先级最高。Beanstalkd采用最大最小堆(Minx-max heap)处理任务优先级排序,任何时刻调用reverse命令的消费者总是能拿到当前优先级最高的任务,时间复杂度为O(logn)
- 任务延时(delay)
 
Beanstalkd中可以通过两种方式延时执行任务:生产者发布任务时指定延时;或者当任务处理完毕后,消费者再次将任务放入队列延时执行(release with delay)。这种机制可以实现分布式定时任务,这种任务机制的优势是:如果某个消费者节点故障,任务超时重发(time-to-run)以保证任务转移到其它节点执行
- 任务超时重发(time-to-run)
 
Beanstalkd把任务返回给消费者后,消费者必须在预设的TTR(time-to-run)时间内发送delete、或release、或bury命令改变任务的状态;否则Beanstalkd会认为任务处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在TTR时间内无法完成任务,可以发送touch命令,以使Beanstalkd重新计算TTR
- 任务预留(buried)
 
当RESERVED状态的任务因为某些原因无法执行时,消费者可以使用bury命令将其设置为buried状态,这时Beanstalkd会继续保留这些任务。在具备任务执行条件时,再通过kick将任务迁移回READY状态
3. beanstalkd安装使用
Beanstalkd分为服务端和客户端两部分。可以在其官网查找相关安装包及安装方法:
- 服务端:http://kr.github.io/beanstalkd/download.html
 - 客户端:https://github.com/kr/beanstalkd/wiki/client-libraries
 
3.1 服务端
源码安装
下载、解压并进入源码目录后,执行make或make install命令即可:
$ sudo make
// 或
$ sudo make install
// 或
$ sudo make install PERFIX=/usr/bin/beanstalkd安装包安装
在Unbuntu或Debian系统中,可以使用以下命令安装:
$ sudo apt-get install beanstalkd在CentOS或RHEL系统中,首先需要更新EPEL源,然后再使用yum命令安装。
小提示:更多关于EPEL的知识可以查阅下面的资料:
在RHEL6/CentOS6中使用以下命令更新源:
$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm'在RHEL7中:
$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-9.noarch.rpm'检查EPEL源是否更新成功:
$ yum repolist enabled | grep epel
 * epel: mirrors.yun-idc.com
epel                  Extra Packages for Enterprise Linux 6 - i386        10,254执行安装:
$ yum install -y beanstalkd加入开启自启动:
$ chkconfig beanstalkd  on添加用户组:
$ groupadd beanstalkd添加用户:
$ useradd -M -g beanstalkd -s /sbin/nologin beanstalkd创建binlog存放目录并修改所有者/所属组(有权限写入):
$ mkdir -p /data/beanstalkd/binlog/
$ chown -R beanstalkd:beanstalkd  /data/beanstalkd修改配置文件中存放binlog的目录:
$ vi /etc/sysconfig/beanstalkdBEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog
运行beanstalkd
Beanstalkd安装后,就可以通过beanstalkd命令来启动或配置Beanstalkd。该命令的使用格式如下:
beanstalkd [OPTIONS]可选[OPTIONS]参数有:
- -b DIR - wal目录(开启binlog,断电重启后会自动恢复任务)
 - -f MS - 指定MS毫秒内的 fsync (-f0 为”always fsync”)
 - -F - 从不 fsync (默认)
 - -l ADDR - 指定监听地址(默认为:0.0.0.0)
 - -p PORT - 指定监听端口(默认为:11300)
 - -u USER - 用户与用户组
 - -z BYTE - 最大的任务大小(默认为:65535)
 - -s BYTE - 每个wal文件的大小(默认为:10485760)
 - -c - 压缩binlog(默认)
 - -n - 不压缩binlog
 - -v - 显示版本信息
 - -h - 显示帮助
 
我们使用nohup和&来配合启动程序,这样能免疫 Ctrl-C发送的SIGINT信号和关闭session发送的SIGHUP信号
$ nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ -u beanstalkd &3.2 客户端
客户端包含了Beanstalkd设计概念中的任务生产者(Producer)和消费者(Consumer)。Beanstalkd有很多语言版本客户端的实现,点击Beanstalkd 客户端查找自已所需要的版本,如果都不能满足需要,还可以根据Beanstalkd 协议自行实现。
笔者日常工作中,接触PHP语言较多,以下用一个PHP版本的Beanstalkd 客户端:pda/pheanstalk为例,简单演示Beanstalkd的任务处理流程。
安装 pda/pheanstalk
$ composer require pda/pheanstalk生产 job
创建一个 producer 来生产 job
producer.php
<?php
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);
$tubeName = 'syslog';
$jobData = [
    'type' => 'Debug',
    'level' => 3, // error log
    'content' => 'queue connect failed',
    'timestamp' => round(microtime(true) * 1000),
    'timeCreated' => date('Y-m-d H:i:s'),
];
$pheanstalk
    ->useTube($tubeName)
    ->put(json_encode($jobData));运行 producer.php
$ php producer.php消费 job
创建一个 consumer 来消费 job
consumer.php
<?php
if (PHP_SAPI !== 'cli') {
    echo 'Warning: should be invoked via the CLI version of PHP, not the '.PHP_SAPI.' SAPI'.PHP_EOL;
}
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);
$tubeName = 'syslog';
while (true) {
    // 从指定队列获取信息,reserve 阻塞获取
    $job = $pheanstalk->useTube($tubeName)->watch($tubeName)->ignore('default')->reserve(60);
    if ($job !== false) {
        // do stuff
        echo $data = $job->getData();
        // 处理完成,删除 job
        $pheanstalk->delete($job);
    }
    usleep(500000); // 0.5 s
}运行 consumer.php
$ php consumer.php
{"type":"Debug","level":3,"content":"queue connect failed","timestamp":1576113851568,"timeCreated":"2019-12-12 01:24:11"}可以使用deamontools和supervisor等将php consumer.php变为常驻内存的进程。
监控 beanstalkd 状态
创建一个 heartbeat 来检查与服务器的连接状态
heartbeat.php
<?php
/**
 * 心跳检查脚本:定期在定时任务系统(crontab、MySQL Event Scheduler、Elastic-Job)上运行并收集与服务器连接状态信息,
 * 如果连接不是活的状态,则可以发送消息报警(sms、email)
 */
if (PHP_SAPI !== 'cli') {
    echo 'Warning: should be invoked via the CLI version of PHP, not the '.PHP_SAPI.' SAPI'.PHP_EOL;
}
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk  = new Pheanstalk('127.0.0.1', 11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump($isAlive);运行 heartbeat.php
$ php heartbeat.php
bool(true)4. beanstalkd 管理工具
Tools:https://github.com/beanstalkd/beanstalkd/wiki/Tools
笔者经常使用的两款工具:
web 界面:https://github.com/ptrofimov/beanstalk_console
命令行:https://github.com/src-d/beanstool
对 beanstalkd 的操作也可以使用telnet,比如 telnet 127.0.0.1 11300。然后便可以执行 beanstalkd 的各命令,如 stats 查看信息,use, put, watch 等等。
telnet对beanstalkd的操作:
$ telnet 127.0.0.1 11300
stats
OK 929
---
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 0
...
list-tubes
OK 23
---
- default
- syslog5. Beanstalkd 使用总结
如果需要对
job有持久化的需要,在启动beanstalkd时可以使用-b参数来开启binlog(二进制日志), 通过binlog可以将job及其状态记录到文件里,如果断电,则可以使用相同的选项重新启动beanstalkd,它将读取binlog来恢复之前的job及状态put前先要use tube xxxtube,这样put的时候就会把job放到指定名称的tube中,否则会放到一个default的tube中reserve或reserve-with-timeout前先要watch xxxtube,可以同时监控多个tube,这样可以同时取几个队列的任务。但是,千万要小心,如果在一个进程中,不小心watch到了多个tube,那么有时候会取错任务,一般取job的步骤为:useTube xxxtube -> watch xxxtube -> ignore default -> reservejob处理完成,应该delete删除掉,或者release再放回队列,或者bury把它埋葬掉,这个取决于你的设计
6. Beanstalkd 不足
无最大内存控制,如果有消息堆积或者业务使用方式有误,而导致内存暴涨拖垮机器
跟
Memcached类似,没有master-slave故障切换机制,需要自己解决单点问题