上一章 文档首页 下一章


计算机科学中的任何问题都可以用另外的间接层解决,但是这通常会引发另一个问题。 -- David Wheeler

2.15.1 新型计划任务回顾

[1.31]-新型计划任务:以接口形式实现的计划任务 一章中,我们讨论了PhalApi中对计划任务的设计和底层实现。

但对于很多应用,很多项目,或者很多同学来说,仍然比较广泛,不能直接使用。
这一章则专门为此而进行演进,并提供最终可用的计划任务调度,同时我们也会阐明如何进行扩展定制。

也就是说,这一章将提供Task扩展类库的统一调度方式,以便在启动crontab任务后,可以通过数据库简单配置,即可执行各种任务。

2.15.2 最终调度的方式:crontab

出于对业务的考虑,我们首先需要明确此crontab调度方式所支持的功能,它应该包括但不限于:

  • 1、通过简单的数据库配置,即可启动一个新的任务
  • 2、具备循环调度的能力,并能初步防止并发调度
  • 3、可以对异常的任务进行修复
  • 4、优先执行太远未执行的任务
  • 5、支持本地和远程两种调度方式、三种MQ类型,以及扩展的能力

2.15.3 核心时序图与分层

在原来的时序图基础上,我们可以进行演进的设计,追加了统一的调度后如下所示:
a pic

通过上面详细的时序图,我们可以发现里面的设计是出于这样的分层考虑:

序号 关键操作 说明 如何使用
1 启动脚本 crontab.php 操作crontab执行的脚本 客户端可以进行必要的初始化工作
2 进程级 Task_Progress::run() 根据进程配置的数据库表,进行循环调度 不需要改动,直接使用
3 触发器 Task_Trigger::fire() 进行计划任务调度的上下文环境,用于指定runner和mq类型 客户端也可进行定制扩展,进行必要的操作
4 MQ消费与调度 Task_MQ::pop()和Task_Runner::go() 不断消费MQ队列,并依次进行调度 不需要改动,直接使用,也可扩展
5 计划任务服务 PhalApi_Api::doSth() 执行计划任务服务 由客户端按接口形式实现

虽然上面的层级,初看起来有点多,但我们再次验证了计算机那个伟大的定论:计算机的任何问题都可以通过一个中间层来解决。

由此看出,上面的层级其实相当于:

客户端初始化 --> 直接使用 --> 自由组合与操作 --> 直接使用 --> 任务服务实现

2.16.4 进程配置的数据库表设计

CREATE TABLE `phalapi_task_progress` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(200) DEFAULT '' COMMENT '任务标题',
  `trigger_class` varchar(50) DEFAULT '' COMMENT '触发器类名',
  `fire_params` varchar(255) DEFAULT '' COMMENT '需要传递的参数,格式自定',
  `interval_time` int(11) DEFAULT '0' COMMENT '执行间隔,单位:秒',
  `enable` tinyint(1) DEFAULT '1' COMMENT '是否启动,1启动,0禁止',
  `result` varchar(255) DEFAULT '' COMMENT '运行的结果,以json格式保存',
  `state` tinyint(1) DEFAULT '0' COMMENT '进程状态,0空闲,1运行中,-1异常退出',
  `last_fire_time` int(11) DEFAULT '0' COMMENT '上一次运行时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

对此表的关键字段说明如下:

字段 说明 示例
trigger_class 触发器的类名 须实现Task_Progress_Trigger::fire($params)接口
fire_params 触发器的参数 加传给Task_Progress_Trigger::fire()函数的参数,格式为:service&MQ类名&runner类名
interval_time 执行间隔 单位为秒
enable 是否启动 此字段禁止时,将不再执行
state 进程状态 当此状态一直为异常或者运行且超过1天时,系统会进行修复,即重置为空闲状态

其中,对于fire_params参数,MQ类名和runner类名可选,以下是一些示例:

//示例1:完整的配置
//fire_params=Task_Demo.DoSth&Task_MQ_DB&Task_Runner_Local
$mq = new Task_MQ_DB();
$runner = new Task_Runner_Local($mq);
$runner->go('Task_Demo.DoSth');

//示例2:使用默认的Runner
//fire_params=Task_Demo.DoSth&Task_MQ_DB
$mq = new Task_MQ_DB();
$runner = new Task_Runner_Local($mq);  //默认使用本地Runner
$runner->go('Task_Demo.DoSth');

//示例3:使用默认的MQ和默认的Runner
//fire_params=Task_Demo.DoSth
$mq = new Task_MQ_Redis(); //默认使用redis的MQ
$runner = new Task_Runner_Local($mq);  //默认使用本地Runner
$runner->go('Task_Demo.DoSth');

//示例4:使用自定义的MQ和Runner
//fire_params=Task_Demo.DoSth&My_MQ&My_Runner
class My_MQ implements Task_MQ {
     // ...
}
class My_Runner extends Task_Runner {
     // ...
}
$mq = new My_MQ();
$runner = new My_Runner($mq);
$runner->go('Task_Demo.DoSth');

2.15.5 运行效果

最终的效果就是,我们通过这样两行简单的代码,即可实现一系列复杂的任务调度:

$progress = new Task_Progress();
$progress->run();

让我们来看下这样设计的运行效果吧!看下这两行代码背后所产生的魔力。

首先,我们先添加两条计划任务:

INSERT INTO `phalapi_task_progress` VALUES ('1', 'test demo', 'Task_Progress_Trigger_Common', 'Task_Demo.DoSth&Task_MQ_File&Task_Runner_Local', '300', '1', '', '0', '0');
INSERT INTO `phalapi_task_progress` VALUES ('2', 'test ok', 'Task_Progress_Trigger_Common', 'Default.Index&Task_MQ_DB&Task_Runner_Local', '100', '1', '', '0', '0');

然后,伪造一些MQ:

INSERT INTO `phalapi_task_mq_0` VALUES ('8', 'Default.Index', '', '0', '');

最后,生成单元测试:

<?php
class PhpUnderControl_TaskProgress_Test extends PHPUnit_Framework_TestCase
{
    public $taskProgress;

    protected function setUp()
    {
        parent::setUp();

        $this->taskProgress = new Task_Progress();
    }

    /**
     * @group testRun
     */
    public function testRun()
    {
        $rs = $this->taskProgress->run();
    }

}

并执行之:

$ phpunit ./Task_Progress_Test.php

[1 - 0.06666s]SELECT id, title FROM phalapi_task_progress WHERE (state != ?) AND (last_fire_time < ?) AND (enable = ?) ORDER BY last_fire_time ASC; -- 0, 1431965153, 1<br>
[2 - 0.07002s]SELECT id, title, trigger_class, fire_params FROM phalapi_task_progress WHERE (state = 0) AND (interval_time + last_fire_time < ?) AND (enable = ?); -- 1432051553, 1<br>
[3 - 0.06549s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '1');<br>
[4 - 0.07432s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '1');<br>
[5 - 0.06469s]UPDATE phalapi_task_progress SET result = '{\"total\":0,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '1');<br>
[6 - 0.06746s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '2');<br>
[7 - 0.07043s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '2');<br>
[8 - 0.06673s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
[9 - 0.48185s]DELETE FROM phalapi_task_mq_0 WHERE (id IN ('8'));<br>
[10 - 0.06514s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
[11 - 0.50694s]UPDATE phalapi_task_progress SET result = '{\"total\":1,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '2');<br>

Time: 1.98 seconds, Memory: 6.50Mb

OK (1 test, 0 assertions)

查看对比一下数据库,目前发现运行良好!

提交代码,保存文档,收工睡觉!

2.15.6 演进的乐趣

得益于前期良好的设计以及底层支持,我们发现,在提供这样一种统一的调度方式是非常方便的。

不仅如此,如果你明白了其中的设计,需要进行定制和扩展也是非常方便的。也就是说,我们不仅提供了一种具体实际可用的方式,也提供了广阔自由的扩展空间。具体与抽象,两者仍可得。

然而,这一切不仅依赖于良好的设计,还依赖于测试驱动开发下的浮现式设计。


上一章 文档首页 下一章

还有疑问?欢迎到社区提问!    切换到PhalApi 2.x 开发文档。

Fork me on GitHub