Define more usable queue DX for multithreaded background work
See also: dev-docs#674 dev-docs#676
Today, if you want to do background work, the general expectation among Civi devs is that you need to create a cron job which queries some bespoke data-structure - and, consequently, the general default is for people to do work synchronously. The aim of this ticket is to reduce the barrier to doing asynchronous work.
Queues are particularly useful for doing expensive background work and deferred work. The CRM_Queue
drivers were based on Drupal's interface, and they were originally added to support certain upgrade/frontend tasks. However, the docs and original built-in helpers were focused on blocking, single-threaded, foreground usage - they need revision to be pleasant for multiprocess background work.
General sketch of usage
// Send an email in the background...
Civi::queue()->createItem(new CRM_Queue_Task(
['CRM_Core_BAO_MessageTemplates', 'sendTemplate'],
[[
'groupName' => 'foo',
'messageTemplateID' => 123,
...
]],
));
// Process imports in the background
for ($offset = 0; $offset < $importRows; $offset += $pageSize) {
Civi::queue()->createItem(new CRM_Queue_Task(
['CRM_Import_Processor', 'importRows'],
[$offset, $pageSize, ...]
));
}
The Civi::queue($name = 'default?open,multi')
would use the $name
expression to find or create the queue. The name includes a brief type expression to indicate how one expects the queue to operate.
Civi::queue('db-upgrader?closed,single')
Civi::queue('email/thankyou?open,multi')
Civi::queue('import/1234abcd?closed,single')
Civi::queue('import/4567efgh?closed,multi')
Notes:
-
import/1234abcd
andimport/4567efgh
would be different queues for different tasks. -
?open
: The task-list is open-ended. You may add more tasks to the queue at any time. There is no "last" task. -
?closed
: The task-list is pre-planned at the start. Some task will be "last". This means that you may have an 'on-complete' type item. -
?single
: The runner(s) may only execute one task from this queue at any given time. -
?multi
: The runner(s) may execute multiple tasks from this queue at any given time.
To run a queue, one could use an API like:
## Run any/all known queues
cv api4 Queue.run maxTime=300 maxTasks=20 +w 'name like "%"'
## Run only the import-related queues
cv api4 Queue.run maxTime=300 maxTasks=20 +w 'name like "import/%"'
Some tasks
- Add a variant of
CRM_Queue_Queue_Sql
which supports non-blocking/parallel-processing behavior (e.g.CRM_Queue_Queue_ParaSql
). @artfulrobot reported working on an implementation of htis. - Add a facade
Civi::queue($name)
to create-or-load a queue. - Add an API call and default cron job which can work on tasks from the queue
- Add an example of a dedicated worker script which runs continuously in the background
- Add some drivers that use dedicated queuing systems instead of MySQL
- For
CRM_Queue_Task
, add an on-error callback