修身洁行,言必由绳墨

beanstalkd轻量级消息队列,pheanstalk操作全过程

2020.03.03

公司项目随着消息量的越来越大,整消息队列刻不容缓,秉着够用的理念,找个轻量级的,考虑过redis,但显得有些不专业,找啊找啊找,看对眼beanstalkd,用了这么久,还真是挺不错,占用资源极少。

介绍

Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。

安装使用

方式1

yum -y install beanstalkd

方式2

官方:https://beanstalkd.github.io/download.html

wget -c https://github.com/beanstalkd/beanstalkd/archive/v1.11.tar.gz
tar -zxvf v1.11.tar.gz
cd beanstalkd-1.11
make install PERFIX=/usr/bin/beanstalkd

启动

/usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd/binlog -F &

参数

/usr/bin/beanstalkd -h
Use: /usr/bin/beanstalkd [OPTIONS]

Options:
-b 开启binlog,断电后重启会自动恢复任务。
-f MS fsync最多每MS毫秒
-F 从不fsync(默认)
-l ADDR侦听地址(默认为0.0.0.0)
-p 端口侦听端口(默认为11300)
-u USER成为用户和组
-z BYTES设置最大作业大小(以字节为单位)(默认值为65535)
-s BYTES设置每个wal文件的大小(默认为10485760)(将被舍入到512字节的倍数)
-c 压缩binlog(默认)
-n 不要压缩binlog
-v 显示版本信息
-V 增加冗长度
-h 显示这个帮助

配置

/etc/sysconfig/beanstalkd

特性

  • 优先级:可以设置任务的优先级
  • 延迟:设置任务多少秒后才允许被消费者使用
  • 持久化:定时刷新数据到文件,服务器挂掉后数据依旧存在
  • 超时会重发:消费者必须在指定时间内完成任务,否则就会重新放入管道
  • 任务预留:消费者先暂时跳过任务不处理

任务状态

任务和管道
任务的状态转换

  • ready:任务已经准备好了,可以给消费者使用。
  • delay:任务放入管道的时候设置了延迟时间。
  • reserve:消费者把任务读取出来
  • buried:任务先放在一边,以后还会用
  • delete:任务从队列删除

php类管理

pheanstalk 是php类操作beanstalkd,对beanstalkd命令的封装。可以通过该类实现对任务的操作。

https://github.com/pda/pheanstalk

安装pheanstalk

composer require pda/pheanstalk

查看状态

require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
//查看beanstalk的当前信息命令行的封装
print_r($pheanstalk->stats());

结果:
beanstalkd当前状态

查看某个任务

require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
print_r($pheanstalk->putInTube('mytube','job',1000));
$job = $pheanstalk->watch('mytube')->reserve();
print_r( $pheanstalk->statsJob($job));

结果:
job的信息

其他操作

require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
//现在命令行输入  beanstalkd -l 127.0.0.1 -p 11300 & 开启
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
//查看beanstalk的当前信息
// 可以通过命令行对管道进行管理,本php类对管道的管理就是对beanstalkd命令行的封装
print_r($pheanstalk->stats());
// 当前的管道
print_r($pheanstalk->listTubes());
// 查看默认的管道
print_r($pheanstalk->statsTube('default'));
//在mytube 里面放入任务 one,使用默认的优先级,延迟重发时间,超时重发时间,
$pheanstalk->useTube('mytube')->put('one');

//从管道取出任务
$job = $pheanstalk->watch('mytube')->reserve();
//根据job的编号获取信息
$job = $pheanstalk->peek(1);
$pheanstalk->delete($job);//删除任务

$pheanstalk->watch('mytube')->delete($job);
//会把任务重新放入管道,并把任务设置为ready状态
$pheanstalk->release($job);

//把任务放在一边暂时不处理,条件成熟了在读取出来
$pheanstalk->bury($job);
$pheanstalk->peekBuried('mytube');// 获取处理bury状态的任务
$pheanstalk->kickJob($job);//会把job转变成ready状态
$pheanstalk->kick(200);//把job编号小于200的变成ready状态

//--- 获取某个状态的任务
$pheanstalk->peekDelayed();
$pheanstalk->peekReady();
$pheanstalk->peekBuried();

$pheanstalk->pauseTube('mytube',10);//阻塞整个管道时间

生成和消费

任务放入队列:

require  'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
$pheanstalk->useTube('mytube')->put('one',1024,10,3);
$pheanstalk->useTube('mytube')->put('two',1023);
$pheanstalk->useTube('mytube')->put('three',1025);
print_r($pheanstalk->stats());

消费处理任务:

require  'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
//终端等待表示没有生产者产生数据
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
//reserve可以设置阻塞时间,不设置会一直等待,watch 可以同时监听两个管道,
$job = $pheanstalk->watch('mytube')->watch('default')->reserve(); // reserver是获取ready的job
$pheanstalk->ignore('default');//忽略管道
//处理代码
if ($job->getData() == 'one') {
    sleep(2);// 超时重回队列3秒,睡眠两秒,delete之前的代码还能执行1秒,如果想在加
    $pheanstalk->watch('mytube')->touch($job);//续命
}

具体对队列和任务的操作可以结合实际逻辑和设置任务和管道的状态。