
JobQueueBundle

by usemarkup

The Markup Job Queue bundle integrates with RabbitMQ (via https://github.com/videlalvaro/RabbitMqBundle) to provide automatic scheduling of recurring console...

Introduction

This bundle provides a simple job queue and scheduling system for Symfony console commands.
It uses RabbitMQ to manage the job queue, from which one or more workers consume and process tasks. Before proceeding you should read: https://github.com/videlalvaro/RabbitMqBundle.
This bundle assumes the use of 'topic' consumers rather than 'direct' consumers.

The workers should be managed by supervisord so that they are restarted if they fail.

Features

  • Add console command jobs to RabbitMQ to be handled asynchronously
  • Add jobs to run at a date in the future
  • Log details of each job (status, peak memory use, output, etc.) in Redis, via a uuid option added to all console commands
  • Consume jobs with a PHP consumer or a Golang consumer (thanks to ricbra/rabbitmq-cli-consumer)
  • Helper command for generating config to manage consumers with supervisord

Scheduling Jobs

Rather than scheduling console commands directly in the crontab, manage them in environment-specific configuration files (using crontab syntax). This allows you to add new recurring jobs, or change their timings, without having to modify the crontab in multiple environments. It also has the advantage of forcing a common logging and exception notification strategy for all commands. Examples of these sorts of tasks are polling third-party servers for files, sending spooled email, or generating reports.

Configuration

markup_job_queue:
    recurring: recurring_jobs.yml # name of file within app/config/

# app/config/recurring_jobs.yml
# schedules are quoted because a YAML value cannot start with a bare '*'
- command: your:console:command --and --any --options and arguments
  schedule: '1-59/2 * * * *'
  topic: topic_of_a_configured_rabbitmq_producer # e.g. 'default'
- command: another:console:command --and --any --options and arguments
  schedule: '* * * * *'
  topic: topic_of_a_configured_rabbitmq_producer

Once you have configured your recurring schedule you need to add only one console command to your live crontab.
This will run a single console command every minute adding any 'due' jobs to RabbitMQ for processing:

* * * * * /usr/bin/php /your/app/location/current/app/console markup:job_queue:recurring:add --no-debug -e=prod >> /var/log/recurring_jobs.log

In development, instead of installing a crontab entry, you can run the command on an interval from the command line if you prefer:

while true; do app/console markup:job_queue:recurring:add; sleep 60; done
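
To make the notion of a 'due' job concrete, the following is a minimal Python sketch of how a five-field crontab schedule such as `1-59/2 * * * *` could be matched against the current minute. It is an illustration only, not the bundle's implementation: the names `expand_field` and `is_due` are hypothetical, month and weekday names are not supported, and all five fields are simply ANDed together (standard cron treats the two day fields differently when both are restricted).

```python
from datetime import datetime

# Allowed (min, max) values for minute, hour, day of month, month, day of week.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field, low, high):
    """Expand one crontab field ('*', '*/15', '1-59/2', '1,15') into a set of ints."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = (int(x) for x in base.split("-"))
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return values


def is_due(schedule, now):
    """Return True if the schedule matches the minute described by `now`."""
    fields = schedule.split()
    if len(fields) != 5:
        raise ValueError("expected 5 crontab fields, got %d" % len(fields))
    minute, hour, dom, month, dow = (
        expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    # cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    weekday = (now.weekday() + 1) % 7
    return (
        now.minute in minute
        and now.hour in hour
        and now.day in dom
        and now.month in month
        and weekday in dow
    )


# Usage: pick out the jobs that should be queued for a given minute.
jobs = [
    {"command": "your:console:command", "schedule": "1-59/2 * * * *"},
    {"command": "another:console:command", "schedule": "* * * * *"},
]
now = datetime(2015, 1, 1, 12, 4)
due = [job["command"] for job in jobs if is_due(job["schedule"], now)]
print(due)
```

With the timestamp above (minute 4), only the every-minute job is due, because `1-59/2` matches odd minutes only.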

The value of 'topic' must correspond to a producer and a consumer set up in the oldsound/rabbitmq-bundle configuration, as shown below. Without a configuration of this type, processing of the job will fail. (This is currently a convention, but would be better enforced by allowing this bundle to configure the oldsound bundle directly. PRs welcome.)
Due to the way oldsound/rabbitmq-bundle treats certain keys, do not use hyphens in producer and consumer names.

producers:
    a_valid_topic:
        connection:       default
        exchange_options: { name: 'a_valid_topic', type: topic }

consumers:
    a_valid_topic:
        connection:       default
        exchange_options: { name: 'a_valid_topic', type: topic }
        queue_options:    { name: 'a_valid_topic' }
        callback:         markup_job_queue.consumer

There are also console commands that allow you to preview and validate your configured recurring jobs via the CLI (see /Command).

Adding Jobs

Jobs can also be added directly. There is a utility method for adding 'command' jobs, which uses the Symfony Process component to execute console commands. Adding a 'Command Job' can be achieved using the 'jobby' service as follows:

$container->get('jobby')
    ->addCommandJob(
        'your:console:command --plus=any --options or arguments', // this needs to be a valid command
        'a_valid_topic', // should be a valid topic name
        600, // allowed timeout for command (see Symfony Process component documentation)
        600  // allowed idle timeout for command (see Symfony Process component documentation)
    );

You can use this mechanism to break down large import tasks into smaller sections that can be processed asynchronously. Make sure you appropriately escape any user provided parameters to your console commands. Due to the way that console commands are consumed using the Process component, unescaped parameters are a possible security attack vector.
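
As an illustration of the escaping concern, here is a minimal Python sketch that quotes every option value before it is joined into a command string. The helper `build_command` is hypothetical and not part of this bundle. It relies on `shlex.quote` from the Python standard library, which targets POSIX shells. In PHP, `escapeshellarg()` serves the same purpose.

```python
import shlex


def build_command(name, **options):
    """Build a console command string with every option value shell-quoted."""
    parts = [name]
    for key, value in options.items():
        # shlex.quote wraps the value in single quotes when it contains
        # characters the shell would otherwise interpret.
        parts.append("--{}={}".format(key, shlex.quote(str(value))))
    return " ".join(parts)


# A value that would run a second command if it were passed through unescaped.
untrusted = "report.csv; rm -rf /"
command = build_command("your:console:command", file=untrusted)
print(command)
```

The quoted value reaches the command as a single argument, so the `;` is treated as data rather than as a command separator.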

Enabling and Monitoring Workers (via supervisord)

To aid with deployment of this bundle, a console command is provided that can be run as part of a deployment. It generates a supervisord configuration file intended to be included from your main supervisord.conf file. The generated configuration starts and watches the consumers, providing one consumer per topic.

There are two options for consuming jobs. The default mechanism is to use the PHP consumers provided by oldsound/rabbitmq-bundle, but an alternative mechanism uses the Golang-based consumer (ricbra/rabbitmq-cli-consumer). To use the Golang variant, provide a configuration for the cli_consumer node:

markup_job_queue:
    cli_consumer:
        enabled: true

This console command requires a minimal configuration (one block for each consumer you want to start). By convention these must match the consumers you have already defined (as seen above). Due to the way oldsound/rabbitmq-bundle treats certain keys, do not use hyphens in your topic names.

By setting 'prefetch_count' you can select how many messages the consumer should process before respawning:

markup_job_queue:
    topics:
        test:
            prefetch_count: 10
        a_valid_topic:
            prefetch_count: 20
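
The two conventions above (topic names match existing consumers and contain no hyphens) can be checked before deployment. The following Python sketch shows one way to do so. The function `check_topics` and its inputs are hypothetical and are not provided by this bundle.

```python
def check_topics(topics, consumers):
    """Return a list of problems found in the configured topic names.

    `topics` is the mapping under markup_job_queue.topics and `consumers`
    is the collection of consumer names defined for the RabbitMQ bundle.
    """
    problems = []
    for name in topics:
        if "-" in name:
            problems.append("topic '{}' contains a hyphen".format(name))
        if name not in consumers:
            problems.append("topic '{}' has no matching consumer".format(name))
    return problems


topics = {
    "test": {"prefetch_count": 10},
    "a_valid_topic": {"prefetch_count": 20},
}
consumers = {"a_valid_topic"}
for problem in check_topics(topics, consumers):
    print(problem)
```

With the example configuration above and only `a_valid_topic` defined as a consumer, the check reports that `test` has no matching consumer.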

To write the configuration file:

app/console markup:job_queue:supervisord_config:write disambiguator

The file will be written to /etc/supervisord/conf.d/ by default. This can be amended:

markup_job_queue:
    supervisor_config_path: /path/to/conf/file/

This path needs to be included in your main /etc/supervisord.conf as follows:

[include]
files=/path/to/conf/file/*.conf

Deployment

To use this as part of a Capistrano deployment, for example, you can write custom Capistrano tasks that:

  • Stop consumers
  • Rewrite the configuration
  • Restart the consumers

The following assumes the use of Capistrano multistage under capifony 2.X (YMMV):

namespace :supervisor do
    desc "Supervisor Tasks"
    task :check_config, :roles => :app do
        stream "cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:recurring:check --env=#{symfony_env}"
    end
    task :write_config, :roles => :worker, :except => { :no_release => true } do
        stream("cd #{latest_release} && #{php_bin} #{symfony_console} markup:job_queue:supervisord_config:write #{fetch(:stage)} --env=#{symfony_env_prod};")
    end
    task :restart_all, :roles => :app, :except => { :no_release => true } do
        stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*"
        stream "#{try_sudo} supervisorctl update"
        stream "#{try_sudo} supervisorctl start all #{fetch(:stage)}:*"
        capifony_puts_ok
    end
    task :stop_all, :roles => :app, :except => { :no_release => true } do
        # stops all consumers in this group
        stream "#{try_sudo} supervisorctl stop all #{fetch(:stage)}:*"
        capifony_puts_ok
    end
end

Copyright (c) 2015 Markup Digital Ltd

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

markup_job_queue:
    topics:

        # Prototype
        name:
            prefetch_count: 1
    rabbitmq:

        # RabbitMQ host
        host: localhost

        # RabbitMQ username
        username: guest

        # RabbitMQ password
        password: guest

        # RabbitMQ vhost
        vhost: /

        # RabbitMQ port
        port: 5672
    cli_consumer:

        # If enabled, the supervisord config writer will use the golang cli-consumer instead of the php consumer
        enabled: false

        # The path into which to place rabbit-cli-consumer log files
        log_path: /var/log/rabbitmq-cli-consumer

        # The path into which to place rabbit-cli-consumer configuration files
        config_path: /etc/rabbit-cli-consumer/config

        # The full path to the binary rabbit-cli-consumer
        consumer_path: /usr/local/bin/rabbitmq-cli-consumer

    # The path to a .yml file containing configuration for recurring jobs
    recurring: false

    # Path to store supervisord configuration files. Your supervisord configuration should load all files from this path
    supervisor_config_path: /etc/supervisord/conf.d/
    job_logging_ttl: 1209600
    clear_log_for_complete_jobs: false

    # Choose whether to use a Symfony app root directory for the location of the Symfony console script. Defaults to app root for Symfony 2, and bin/ for Symfony 3+.
    use_root_dir_for_symfony_console: false