Queues

Some features of your application may be time-intensive. Running them during a web request is rarely ideal, because the user is kept waiting for a response. Formidable lets you create queued jobs that are processed in the background, which keeps your web requests fast.

Getting Started

Formidable Queues don't come pre-configured with Formidable applications, but they are easy to set up.

Prerequisites

Configuration

Out of the box, a new Formidable application comes with a config/queue.ts or config/queue.imba config file. This file contains the default configuration for the queue. You can change the default configuration to suit your needs:

config/queue.ts
import { env } from '@formidablejs/framework'

export default {

  /**
   * --------------------------------------------------------------------------
   * Default Queue Connection Name
   * --------------------------------------------------------------------------
   *
   * Here you may specify which of the queue connections below you wish
   * to use as your default connection for all queue workers.
   */

  default: env('QUEUE_CONNECTION', 'sync'),

  /**
   * --------------------------------------------------------------------------
   * Queue Connections
   * --------------------------------------------------------------------------
   *
   * Here are each of the queue connections setup for your application.
   * Feel free to add more.
   *
   * Drivers: "sync", "redis"
   */

  connections: {
    sync: {
      driver: 'sync',
      queue: 'sync'
    },

    redis: {
      driver: 'redis',
      queue: 'default',
      redis: env('REDIS_QUEUE', 'queue'),
      timeout: 3000,
      retries: 3
    }
  }

}

The config/queue.ts or config/queue.imba config file also sets the default queue connection. By default, Formidable Queues ships with a sync driver, which runs jobs synchronously in the same process as the server. This is convenient for development but not suited to production. In production, use a queue driver such as redis so that your jobs run in the background.
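As a rough illustration of how the default key selects one of the configured connections, the sketch below models the lookup in plain TypeScript. The QueueConfig type and the buildConfig and resolveConnection helpers are hypothetical and are not part of Formidable; in a real application the framework reads QUEUE_CONNECTION through env() for you.

```typescript
// Hypothetical types and helpers for illustration only; not part of Formidable.
type Connection = { driver: 'sync' | 'redis'; queue: string }

type QueueConfig = {
  default: string
  connections: Record<string, Connection>
}

// Mirrors env('QUEUE_CONNECTION', 'sync'): fall back to 'sync' when unset.
function buildConfig(env: Record<string, string | undefined>): QueueConfig {
  return {
    default: env.QUEUE_CONNECTION ?? 'sync',
    connections: {
      sync: { driver: 'sync', queue: 'sync' },
      redis: { driver: 'redis', queue: 'default' }
    }
  }
}

// Return the connection named by `default`, failing loudly on a typo.
function resolveConnection(config: QueueConfig): Connection {
  const connection = config.connections[config.default]
  if (!connection) {
    throw new Error(`Unknown queue connection: ${config.default}`)
  }
  return connection
}
```

Under this model, leaving QUEUE_CONNECTION unset selects the sync connection, and setting it to redis selects the redis connection.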

Database Consideration

When using the redis driver, Formidable Queues will attempt to use the queue redis connection defined in the config/database.ts or config/database.imba config file. If you wish to use a different redis connection, add a new connection under the redis key in that file:

config/database.ts
export default {
  ...

  redis: {
    ...

    queue: {
      url: helpers.env('REDIS_URL'),
      host: helpers.env('REDIS_HOST', '127.0.0.1'),
      password: helpers.env('REDIS_PASSWORD', null),
      port: helpers.env('REDIS_PORT', '6379'),
      database: helpers.env('REDIS_CACHE_DB', '2')
    }
  }
}

Creating Jobs

Generating Job Classes

By default, all of the queueable jobs for your application are stored in the app/Jobs directory. If the app/Jobs directory doesn't exist, it will be created when you run the make:job Craftsman command:

node craftsman make:job ProcessAudio

The generated class will extend the Queueable class from the @formidablejs/queues package.

Class Structure

Job classes are very simple, normally containing only a handle method that is invoked when the job is processed by the queue. To get started, let's take a look at an example job class. In this example, we'll pretend we manage an audio publishing service and need to process uploaded audio files before they are published:

app/Jobs/ProcessAudio.ts
import { Queueable } from '@formidablejs/queues'

export class ProcessAudio extends Queueable {
  /**
   * Handle job.
   */
  handle(audioId: number): any {
    console.log('Process uploaded audio...')
  }
}

Custom Queue

By default, jobs run on the default queue. To run a specific job on a different queue, add a queue getter to the job class:

app/Jobs/ProcessAudio.ts
import { Queueable } from '@formidablejs/queues'

export class ProcessAudio extends Queueable {
  /**
   * Queue to run job on.
   */
  get queue(): string {
    return 'custom_queue'
  }
}

Timeout

You can also define a timeout for a specific job by adding a timeout getter that returns the timeout as a human-readable string:

app/Jobs/ProcessAudio.ts
import { Queueable } from '@formidablejs/queues'

export class ProcessAudio extends Queueable {
  /**
   * The timeout time for the job.
   */
  get timeout(): string {
    return '2 hours'
  }
}
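To give a feel for what a human-readable value such as '2 hours' amounts to, here is a minimal sketch that converts such strings to milliseconds. The toMilliseconds helper and its list of units are assumptions made for illustration; this is not the parser Formidable Queues uses, and the formats the package actually accepts may differ.

```typescript
// Hypothetical helper for illustration; not the parser used by Formidable Queues.
const UNIT_MS: Record<string, number> = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000
}

// Convert strings such as '2 hours' or '30 minutes' to milliseconds.
function toMilliseconds(duration: string): number {
  // Capture the amount and the unit, ignoring an optional plural "s".
  const match = /^(\d+)\s+([a-z]+?)s?$/i.exec(duration.trim())
  if (!match) {
    throw new Error(`Unrecognised duration: ${duration}`)
  }

  const unit = UNIT_MS[match[2].toLowerCase()]
  if (unit === undefined) {
    throw new Error(`Unknown unit: ${match[2]}`)
  }

  return Number(match[1]) * unit
}
```

With these assumptions, toMilliseconds('2 hours') evaluates to 7200000.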

Registering Jobs

Once you have created your first job, you will need to register it for the framework to know which job to run when the queue is running.

To register a job, simply import it in the Kernel.ts or Kernel.imba file under the app/Console directory and add it in the jobs getter:

app/Console/Kernel.ts
import { Queueable } from '@formidablejs/queues'
import { ProcessAudio } from '../Jobs/ProcessAudio'
...
export class Kernel extends ConsoleKernel {
  ...
  get jobs(): Array<typeof Queueable> {
    return [
      ProcessAudio
    ]
  }
  ...
}

Dispatching Jobs

Once you have written your job class, you may dispatch it using the dispatch method on the job itself. The arguments passed to the dispatch method will be given to the job's handle method:

app/Http/Controllers/AudioController.ts
import { Request } from '@formidablejs/framework'
import { strRandom } from '@formidablejs/framework'
import { AudioRepository } from '../../Repositories/AudioRepository'
import { Controller } from './Controller'
import { ProcessAudio } from '../../Jobs/ProcessAudio'

export class AudioController extends Controller {
  private readonly audio = new AudioRepository

  /**
   * Store a new audio.
   */
  async store(request: Request): Promise<any> {
    const file = request.file('audio').first!

    const path = `storage/framework/audio/${strRandom(20)}.${file.ext}`

    const audio = await this.audio.create({
      ...,
      path: path
    })

    file.move(path)

    await ProcessAudio.dispatch(audio.id)
  }
}
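The relationship between registering a job, dispatching it, and the arguments reaching handle can be modelled with a small in-memory queue. This is only a conceptual sketch: SketchJob, SketchQueue and ProcessAudioSketch are hypothetical names, and the real @formidablejs/queues package may store and resolve jobs differently.

```typescript
// Conceptual model only; none of these names come from @formidablejs/queues.
type Payload = { job: string; args: unknown[] }

abstract class SketchJob {
  abstract handle(...args: unknown[]): unknown
}

class ProcessAudioSketch extends SketchJob {
  handle(audioId: unknown): string {
    return `processed audio ${audioId}`
  }
}

class SketchQueue {
  private readonly registry = new Map<string, new () => SketchJob>()
  private readonly pending: Payload[] = []

  // Registration lets the worker map a stored job name back to a class.
  register(job: new () => SketchJob): void {
    this.registry.set(job.name, job)
  }

  // Dispatching stores only the job name and its arguments.
  dispatch(job: new () => SketchJob, ...args: unknown[]): void {
    this.pending.push({ job: job.name, args })
  }

  // The worker resolves each stored name and passes the arguments to handle.
  work(): unknown[] {
    const results: unknown[] = []
    while (this.pending.length > 0) {
      const payload = this.pending.shift()!
      const JobClass = this.registry.get(payload.job)
      if (!JobClass) {
        throw new Error(`Job not registered: ${payload.job}`)
      }
      results.push(new JobClass().handle(...payload.args))
    }
    return results
  }
}
```

In this model, a job that was dispatched but never registered cannot be resolved by the worker, which is one plausible reason a registration step is needed.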

Delayed Dispatching

You may also delay a job with the delay method:

app/Http/Controllers/AudioController.ts
export class AudioController extends Controller {
  private readonly audio = new AudioRepository

  /**
   * Store a new audio.
   */
  async store(request: Request): Promise<any> {
    ...

    await ProcessAudio
      .delay('30 minutes')
      .dispatch(audio.id)
  }
}

Processing Jobs

Running Queue

By default, the queue worker processes jobs from the default queue:

node craftsman queue:work

But if you wish to use a different queue, you may pass the --queue flag with the name of the queue you wish to run:

node craftsman queue:work --queue=custom_queue

You may also pass as many --queue flags as you wish.

Retrying Failed Jobs

To retry failed jobs, you may use the queue:retry command:

node craftsman queue:retry

This will retry failed jobs from the default queue. To retry failed jobs from a different queue, pass the --queue flag with the name of the queue:

node craftsman queue:retry --queue=custom_queue

You may also pass as many --queue flags as you wish.

Removing Failed Jobs

If you wish to remove failed jobs that will never be run again, you can use the queue:flush command:

node craftsman queue:flush

Similar to other commands we've already looked at, you can also pass the --queue flag with the name of the queue to delete failed jobs from:

node craftsman queue:flush --queue=custom_queue

Queue Information

Should you wish to know more about your application's jobs, you may use the queue:about command:

node craftsman queue:about

Production Consideration

When running your application in production, it is recommended to run your queue workers under a process manager such as pm2:

ecosystem.config.js
module.exports = {
  apps: [
    {
      name: "queue",
      script: "node craftsman queue:work --no-ansi",
      max_memory_restart: "100M",
      time: false,
      error_file: "./storage/logs/queue/error.log",
      out_file: "./storage/logs/queue/log.log"
    }
  ]
}

Events

Formidable Queues come with handy events that you can tap into during the lifecycle of your jobs.

onReady

The onReady event gets called when the queue worker is ready:

import { Queue } from '@formidablejs/queues'

Queue.onReady((queueName: string) => {
  // do something
})

onError

The onError event gets called when a job fails:

import { Queue } from '@formidablejs/queues'

Queue.onError((queueName: string, job, error: Error) => {
  // do something
})
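As one possible use of the onError callback, the sketch below collects failure messages per queue. Only the (queueName, job, error) parameter order is taken from the example above; the recordFailure handler and the failures map are hypothetical, and the registration call is left as a comment so the snippet stays self-contained.

```typescript
// Hypothetical handler; only the (queueName, job, error) order comes from the docs above.
const failures = new Map<string, string[]>()

// Append the error message to the list kept for the failing queue.
function recordFailure(queueName: string, _job: unknown, error: Error): void {
  const messages = failures.get(queueName) ?? []
  messages.push(error.message)
  failures.set(queueName, messages)
}

// In an application this would be registered with:
// Queue.onError(recordFailure)
```

Keeping the handler as a plain function makes it easy to exercise without a running queue worker.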

onLog

The onLog event gets called when the queue logs the current state of jobs:

import { Queue } from '@formidablejs/queues'

Queue.onLog((queueName: string, job, type: string) => {
  // do something
})