Festi Cron

Festi Cron is a powerful task scheduling and job queue management library for the Festi Framework. It provides support for scheduled cron jobs, queue-based execution, and Kafka and RabbitMQ integration for distributed task processing.

Installation

To install Festi Cron using Composer, add the following to your composer.json file:

{
    "require": {
        "festi-team/festi-framework-cron": "dev-develop"
    }
}

Then, run:

composer install

Features

  • Scheduler Worker – Manage and execute scheduled tasks efficiently.
  • Queue-based Execution – Process tasks in the background using job queues.
  • Singleton Workers – Ensure that a cron job does not run multiple times simultaneously.
  • Kafka Integration – Use Kafka-based queue processing for distributed workloads.
  • RabbitMQ Integration – Use AMQP-based queue processing via php-amqplib.
  • Exception Handling – Prevent execution conflicts and errors with built-in exception management.

Usage

1. Defining a Cron Job

Create a worker by extending the CronWorker class:

use core\cron\CronWorker;

class MyCronJob extends CronWorker
{
    /**
     * Holds the cron task's logic; replace the body with your own work.
     * This example only prints a status message.
     */
    public function onStart(): void
    {
        echo 'Executing MyCronJob...';
    }
}

2. Running a Cron Job

To execute a cron job manually:

// Instantiate the worker defined above and launch it right away.
$cronJob = new MyCronJob();
$cronJob->start();

3. Using Queue Workers

-- Storage table for one queue. [TYPE] is a placeholder for the queue's name
-- (a later example in this document uses the table `queue_cabinets_order_sync`).
CREATE TABLE IF NOT EXISTS `queue_[TYPE]` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  -- Task payload; a queue example later in this document reads it with json_decode().
  `data` longtext NOT NULL,
  -- NOTE(review): presumably the failure message of a task that errored -- confirm.
  `error` longtext NULL DEFAULT NULL,
  -- NOTE(review): presumably identifies the worker that picked up the row -- confirm.
  `id_worker` int(10) unsigned DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

To handle queued execution:

use cron\CronQueueWorker;

class MyQueueJob extends CronQueueWorker
{
    /**
     * Prints a status message; replace the body with the job's real work.
     */
    public function onStart(): void
    {
        echo 'Processing queued job...';
    }
}

Register and run:

// Create the queue job and hand it to enqueue().
$queueJob = new MyQueueJob();
$queueJob->enqueue();

4. Singleton Execution

Ensure that only one instance of a job runs at a time:

use cron\CronSingletonWorker;

class BankTransferAccountsSyncCronWorker extends CronSingletonWorker
{
    /**
     * Looks up the Payments plugin through the core and calls its
     * syncWithServices() method.
     *
     * NOTE(review): this example overrides start() itself, whereas the other
     * workers in this document override hooks such as onStart()/onRow() --
     * confirm that the singleton guard still applies here.
     *
     * @override
     */
    public function start()
    {
        $core = Core::getInstance();

        $paymentsPlugin = $core->getPluginInstance('Payments');
        assert($paymentsPlugin instanceof PaymentsPlugin);

        $paymentsPlugin->syncWithServices();
    }
}

try {
    $syncWorker = new BankTransferAccountsSyncCronWorker();
    $syncWorker->start();
} catch (AlreadyRunningCronWorkerException $alreadyRunning) {
    // The worker reported that it is already running: leave with exit code 0.
    exit(0);
} catch (Exception $failure) {
    // Every other failure is passed to FestiUtils::doHandleCoreException().
    FestiUtils::doHandleCoreException($failure);
}
use cron\CronSingletonWorker;

class MoneyReportCronWorker extends CronSingletonWorker
{
    /**
     * Plugin whose search() supplies the rows to process.
     *
     * @var StaffPlugin
     */
    protected $userPlugin;

    /**
     * Plugin that receives the payment calculation and transfer updates.
     *
     * @var FinancePlugin
     */
    protected $financePlugin;

    /**
     * Resolves the Staff and Finance plugins from the core.
     *
     * @override
     */
    protected function onStart()
    {
        $kernel = Core::getInstance();

        $this->userPlugin    = $kernel->getPluginInstance('Staff');
        $this->financePlugin = $kernel->getPluginInstance('Finance');
    }

    /**
     * Wraps one row in a UserValuesObject and passes it, together with the
     * current date formatted with DATA_FORMAT, to the Finance plugin's
     * onCalculateUserMonthlyPayment().
     *
     * @override
     */
    protected function onRow($data)
    {
        $user  = new UserValuesObject($data);
        $today = date(static::DATA_FORMAT);

        $this->financePlugin->onCalculateUserMonthlyPayment($user, $today);
    }

    /**
     * Stores the error status and the exception message through
     * updateMoneyTransfer() for the record's id, then re-throws the exception.
     *
     * @override
     */
    protected function onRowError($record, $exp)
    {
        $failedState = [
            MoneyTransferValuesObject::FIELD_STATUS => MoneyTransferValuesObject::STATUS_ERROR,
            MoneyTransferValuesObject::FIELD_ERROR  => $exp->getMessage(),
        ];

        $this->financePlugin->updateMoneyTransfer($failedState, $record['id']);

        throw $exp;
    }

    /**
     * Builds the search criteria and returns the Staff plugin's search result.
     *
     * Reading the keys as "column&operator" pairs (NOTE(review): confirm), the
     * criteria select rows with a salary, a hire date no later than the last
     * day of the month of $this->_currentTime, and a "fired" value that is
     * either NULL or not earlier than $this->_currentTime.
     *
     * @override
     */
    protected function getSpool()
    {
        // 'Y-m-t': "t" is the number of days in the month, i.e. its last day.
        $lastDayOfMonth = date('Y-m-t', $this->_currentTime);
        $currentDate    = date(static::DATA_FORMAT, $this->_currentTime);

        $criteria = [
            'salary&IS NOT' => 'NULL',
            'hire_date&<='  => $lastDayOfMonth,
            'sql_or'        => [
                ['fired&IS' => 'NULL'],
                ['fired&>=' => $currentDate],
            ],
        ];

        return $this->userPlugin->search($criteria);
    }
}

try {
    $reportWorker = new MoneyReportCronWorker();
    $reportWorker->start();
} catch (Exception $failure) {
    // Any failure is passed to FestiUtils::doHandleCoreException().
    FestiUtils::doHandleCoreException($failure);
}
class CabinetsOrderSyncQueue extends CronQueueSingletonWorker
{
    /**
     * Name of the storage table this queue reads from; it follows the
     * `queue_[TYPE]` naming shown in the table definition earlier in this
     * document.
     */
    protected function getStorageName()
    {
        return 'queue_cabinets_order_sync';
    }

    /**
     * Decodes the record's JSON payload into an array and passes it to the
     * CabinetOrders plugin's onTaskOrderSync().
     */
    protected function onRow($record)
    {
        $data = json_decode($record['data'], true);

        $plugin = Controller::getInstance()->getPluginInstance("CabinetOrders");
        $plugin->onTaskOrderSync($data);
    }

    /**
     * Pauses for 60 seconds.
     * NOTE(review): presumably called between polls of the queue -- confirm.
     */
    protected function onDelay()
    {
        sleep(60);
    }
}

$worker = new CabinetsOrderSyncQueue($controller->db);
$worker->start();

5. Kafka Worker

Install PHP Kafka Driver

  1. Install C/C++ Driver
# Fetch the librdkafka sources, then configure, build and install them.
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
  2. Install php-rdkafka
pecl install rdkafka

Usage

class TestWorker extends KafkaCronWorker
{
    /**
     * Dumps the record it receives with print_r() so it can be inspected.
     */
    protected function onRow($record)
    {
        print_r($record);
    }
}

$kafkaWorker = new TestWorker();
$kafkaWorker->start();
# Run the worker script (presumably the file holding the TestWorker example) with a broker, topic and id.
php test_kafka.php --brokers 78.46.49.42 --topic test --id g1
# Publish a test message to the same topic with Kafka's console producer.
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list 78.46.49.42:9092 --topic test > /dev/null
/**
 * Example Kafka queue worker that passes each record to a statistics plugin
 * and records offsets through helper methods.
 *
 * This is an excerpt: the underscore-prefixed helpers it calls
 * (_getOffsetFilePath, _onPrepareFileOffset, _setOffsetToFile,
 * _isDuplicateKeyException, _getStatisticsPlugin) and the AUTOCOMMIT_TIMEOUT
 * constant are not defined in the code shown here.
 */
class StatisticsWorker extends KafkaCronQueueSingletonWorker
{
    const TIMEOUT = 100;
    const FILE_NAME_OFFSET_POSTFIX   = 'offset';
    const DEFAULT_OFFSET_VALUE       = '0';
    const FILE_NAME_OFFSET_DELIMITER = '-';
    const REGEXP_DUPLICATE_EXCEPTION = '#duplicate key value violates#Umis';

    // Not assigned anywhere in this excerpt.
    protected $core;
    // Set in onSpoolStart() and used by onRow().
    protected $plugin;

    /**
     * Makes sure the offset file exists and has content, calling
     * _onPrepareFileOffset() when it is missing or reads as empty.
     *
     * @return bool always true
     */
    protected function onStart()
    {
        $file = $this->_getOffsetFilePath();

        if (!file_exists($file)) {
            $this->_onPrepareFileOffset($file);
        }

        $result = file_get_contents($file);

        // NOTE(review): `!$result` is also true when the file holds the string
        // '0' (the DEFAULT_OFFSET_VALUE), so the file is prepared again in that
        // case -- confirm this is intended.
        if (!$result) {
            $this->_onPrepareFileOffset($file);
        }

        return true;
    }

    /**
     * Passes the record to the plugin's onTaskAddStats().
     *
     * @return bool always true
     */
    protected function onRow($record)
    {
        $this->plugin->onTaskAddStats($record);

        return true;
    }

    /**
     * Builds the consumer configuration: fetch size, SSL certificate locations
     * under KAFKA_SSL_CERT_PATH, and the group id taken from the 'id' option.
     *
     * @return RdKafka\Conf
     */
    protected function getConsumerConfig()
    {
        $conf = new RdKafka\Conf();

        $conf->set('max.partition.fetch.bytes', 100);
        $conf->set('security.protocol', 'ssl');
        $conf->set(
            'ssl.ca.location',
            KAFKA_SSL_CERT_PATH.'kafka_trust_cert.pem'
        );
        $conf->set(
            'ssl.certificate.location',
            KAFKA_SSL_CERT_PATH.'kafka_client_cert.pem'
        );
        // NOTE(review): the file name reads "cet"; possibly a typo for "cert" --
        // verify against the actual key file name.
        $conf->set(
            'ssl.key.location',
            KAFKA_SSL_CERT_PATH.'kafka_client_cet_key.pem'
        );

        $conf->set('group.id', $this->getOption('id'));

        return $conf;
    } // end getConsumerConfig


    /**
     * Builds the topic configuration: auto-commit interval and file-based
     * offset storage in the system temporary directory.
     *
     * @return RdKafka\TopicConf
     */
    protected function getTopicConfig()
    {
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', static::AUTOCOMMIT_TIMEOUT);

        // Set the offset store method to 'file'
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());

        return $topicConf;
    } // end getTopicConfig

    /**
     * Stores the plugin returned by _getStatisticsPlugin() for use in onRow().
     *
     * @return bool always true
     */
    protected function onSpoolStart()
    {
        $this->plugin = $this->_getStatisticsPlugin();

        return true;
    } // end onSpoolStart

    /**
     * Returns false when _isDuplicateKeyException() accepts the exception
     * message; otherwise records the record's offset and re-throws.
     */
    protected function onRowError($record, $exp)
    {
        $msg = $exp->getMessage();

        if ($this->_isDuplicateKeyException($msg)) {
            return false;
        }

        $this->_setOffsetToFile($record);

        throw $exp;
    } // end onRowError

    /**
     * Records the record's offset through _setOffsetToFile().
     *
     * @return bool always true
     */
    protected function onRowCompleted($record)
    {
        $this->_setOffsetToFile($record);

        return true;
    } // end onRowCompleted
}

6. RabbitMQ Worker

The php-amqplib/php-amqplib package is a Composer runtime dependency — no system extension needed.

Single-queue consumer

use cron\RabbitMqCronWorker;

class OrderNotificationWorker extends RabbitMqCronWorker
{
    /**
     * Decodes the JSON payload carried in the record's 'data' field.
     */
    protected function onRow($record): void
    {
        $payload = json_decode($record['data'], true);
        // handle $payload ...
    }
}

try {
    $notificationWorker = new OrderNotificationWorker();
    $notificationWorker->start();
} catch (\cron\AlreadyRunningCronWorkerException $alreadyRunning) {
    // The worker reported that it is already running: leave with exit code 0.
    exit(0);
}
php worker.php --host=localhost --queue=order_notifications --id=worker1

Optional flags with defaults: --port (5672), --user (guest), --password (guest), --vhost (/).

Multi-queue consumer

Extend RabbitMqCronQueueWorker and pass a comma-separated list of queues. Each getSpool() call polls all queues in order.

use cron\RabbitMqCronQueueWorker;

class EventWorker extends RabbitMqCronQueueWorker
{
    /**
     * Reads the JSON payload and the originating queue name from the record.
     */
    protected function onRow($record): void
    {
        $payload     = json_decode($record['data'], true);
        $sourceQueue = $record['queue']; // the queue this message came from
        // handle $payload ...
    }
}
php worker.php --host=localhost --queue=orders,payments,refunds --id=worker1

Multi-queue + singleton

Use RabbitMqCronQueueSingletonWorker to prevent duplicate processes. The lock file is keyed by class name and --id.

use cron\RabbitMqCronQueueSingletonWorker;

class PaymentWorker extends RabbitMqCronQueueSingletonWorker
{
    /**
     * Placeholder for the payment handling logic of a single message.
     */
    protected function onRow($record): void
    {
        // handle the payment message ...
    }
}

try {
    $paymentWorker = new PaymentWorker();
    $paymentWorker->start();
} catch (\cron\AlreadyRunningCronWorkerException $alreadyRunning) {
    // The worker reported that it is already running: leave with exit code 0.
    exit(0);
}

SSL / custom connection

Override onInit() to use a custom connection class (e.g. AMQPSSLConnection):

use cron\RabbitMqCronWorker;
use PhpAmqpLib\Connection\AMQPSSLConnection;

class SecureWorker extends RabbitMqCronWorker
{
    /**
     * Opens an SSL connection built from the worker options, opens a channel
     * on it and declares the configured queue.
     *
     * @throws \cron\CronException when the AMQPSSLConnection class is not available
     */
    protected function onInit(): void
    {
        $isLibraryLoaded = class_exists('PhpAmqpLib\\Connection\\AMQPSSLConnection');
        if (!$isLibraryLoaded) {
            throw new \cron\CronException("Not found PHP package: php-amqplib/php-amqplib");
        }

        // SSL context options; point 'cafile' at your CA certificate.
        $sslOptions = ['cafile' => '/path/to/ca.pem'];

        $this->connection = new AMQPSSLConnection(
            $this->getOption('host'),
            $this->getOption('port'),
            $this->getOption('user'),
            $this->getOption('password'),
            $this->getOption('vhost'),
            $sslOptions
        );

        $this->channel = $this->connection->channel();

        // queue_declare flags, in order: passive, durable, exclusive, auto_delete.
        $queueName = $this->getOption('queue');
        $this->channel->queue_declare($queueName, false, true, false, false);
    }

    /**
     * Placeholder for the message handling logic.
     */
    protected function onRow($record): void
    {
        // ...
    }
}

ACK / NACK behaviour

  • Success (onRowCompleted): message is ACKed and removed from the queue.
  • Error (onRowError): message is NACKed with requeue=false (routed to dead-letter exchange or discarded), then the exception is re-thrown. Override onRowError() to implement custom retry logic.

Best Practices

  • Use Singleton Workers to prevent duplicate executions.
  • Monitor Cron Logs to troubleshoot job failures.
  • Use Kafka or RabbitMQ for Distributed Jobs when scaling across multiple servers.
  • Define Execution Time Limits to avoid long-running tasks blocking the queue.