Laravel - Digging Deeper - Queues

# Introduction

學習一個框架, Ray 的想法是, 在深入理解底層實作的原理之前, 應該先知道這個框架的 使用方法; 先學習怎麼使用這個前人造的輪子, 再學習怎麼樣一個輪子。
所以本篇文章重點在於細讀官方文件, 並將內容理解後以 Q&A 的方式記錄下來, 加速學習以及查詢。




# Connection Vs. Queues

在以下的 Laravel Queue 的程式碼, job 會被送往哪裡?

會被送往預設的 queue

<?php
Job::dispatch();

在以下的 Laravel Queue 的程式碼, job 會被送往哪裡?

會被送往自定義的 emails queue

<?php
Job::dispatch()->onQueue('emails');

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work --queue=high,default
  • Answer:
    啟動 queue daemon, 並排序 queue 的優先順序 high,default




# Driver Notes & Prerequisites

# Database

在 Laravel 中, 如果我使用的 driver 為 database, 該如何建立 queue table?
php artisan queue:table
php artisan migrate


# Redis

在 Laravel 的 QUEUE 當中, 如果我使用 redis 為 driver, 該如何設定 redis 的 database connection?

config/database.php 檔案中


# Redis Cluster

如果是使用 Redis Cluster, queue 的名稱需要包含什麼?
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],


# Blocking

在 Laravel Queue 當中, 如果我使用 redis, 且我想要讓 redis 在進去下一輪的 polling 之前等待數秒, 看有沒有新的 job 進來, 我可以怎麼做?
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],




# Creating Jobs

# Generating Job Classes

Laravel 中, 一般 job 都會放在哪?

app/Jobs

Laravel 中, 如何建立一個 Job?
php artisan make:job jobName
在 Laravel 中, 當我們注入一個 Model 到 Job, 是會整個 Model 都被注入, 還是只會注入該 Model 的一個辨識物?

Only an identifier will be stored in the job

在 Laravel 中, 當我在 job 中注入一個 model, 當這個 job 被在 queue 中被取出執行時, 請入該 model 會是我當初注入時的狀態, 還是目前資料庫中最新的狀態?

資料庫中的狀態

# Handling Relationships

Laravel queue 中, 當我注入 model 到 job 中時, 如果我不想要讓一個 model 的 relation 被載入, 我可以怎麼做?
<?php
/**
* Create a new job instance.
*
* @param \App\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}




# Job Middleware

在 Laravel Queue 中, 可以建立一個 Job middleware 嗎?

可以

在 Laravel Queue 中, 如果我想要建立一個 Job Middleware, 可以建立在什麼位置?

什麼位置都可以, 官方範例位置為 app/Jobs/Middleware

在 Laravel Queue 中, 如果我要建立一個 Job Middleware, 如何建立?
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
/**
* Process the queued job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...

$next($job);
}, function () use ($job) {
// Could not obtain lock...

$job->release(5);
});
}
}
在 Laravel Queue 中, 如果我要在一個 Job 中使用建立好的 Job Middleware, 我可以怎麼做?
<?php
use App\Jobs\Middleware\RateLimited;

/**
* Get the middleware the job should pass through.
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}






# Dispatching Jobs

在 Laravel Queue 中, 該如何 dispatch 一個 job?
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast);
}
}




# Delayed Dispatching

在 Laravel Queue 中, 如果我想要 delay 一個 dispatch, 我可以怎麼做?
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
在 Laravel Queue 中, AWS 的 SQS 最多支援到 delay 多久?

15 分鐘




# Synchronous Dispatching

在 Laravel Queue 中, 如果我立即 dispatch 一個 job, 不要 queue, 那我可以怎麼做?
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatchNow($podcast);
}
}




# Job Chaining

在 Laravel Job chaining 中, 如果其中一個 job 失敗了, 後面的還會繼續執行嗎?

不會

在 Laravel Job chaining 中, 如果我使用 $this->delete(), 會終止後面的 job 執行嗎?

不會

在 Laravel, 如何實作一個 job chaining?
<?php
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();


# Chain Connection & Queue

在 Laravel, 當我使用 job chaining 時, 如何指定這個 job chaining 使用的 connection 以及 queue?
<?php
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');




# Customizing The Queue & Connection

# Dispatching To A Particular Queue

在 Laravel job 當中, 如何指定特定的 queue?
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}


# Dispatching To A Particular Connection

在 Laravel job 當中, 如果我使用不只一個 connection, 如何指定特定的 connection?
<?Php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
/**
* Store a new podcast.
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// Create podcast...

ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
Laravel job 當中, 如果我想要從該 job 的 class 中指定該 job 使用的 connection, 我可以怎麼做?
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The queue connection that should handle the job.
*
* @var string
*/
public $connection = 'sqs';
}
Laravel job 當中, 如果我想要同時指定該 job 特定的 connection 以及 queue 時, 我可以怎麼做?
<?php
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');




# Specifying Max Job Attempts / Timeout Values

# Max Attempts

Laravel job 當中, 如何透過 CLI 指定該 job 的最大嘗試次數?
php artisan queue:work --tries=3
Laravel job 當中, 如何在該 job class 中指定該 job 的最大嘗試次數?
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
Laravel job 當中, 當我同時在 job class 中, 以及 CLI 中都指令最大嘗試次數, Laravel 會以誰為準?

job class 中指定的 property


# Time Based Attempts

Laravel job 當中, 如果我想要指定一段時間長度, Laravel 可在這段時間內嘗試執行這個 job, 但超過這段時間就不會再嘗試了, 那我該怎麼做?
<?php
/**
* Determine the time at which the job should timeout.
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}


# Timeout

Laravel job 當中, 如果我要指定任務可以執行的最大秒數, 通過 CLI, 我可以怎麼做?
php artisan queue:work --timeout=30
Laravel job 當中, 如果我要指定任務可以執行的最大秒數, 在 job class 中, 我可以怎麼做?
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
Laravel job 當中, 如果我再 CLI 以及 job 的 class 同時定義的 timeout, 哪一個的優先權比較大?

job class




# Rate Limiting

Laravel queue 當中, 如果我想使用 rate limiting, 哪一個服務會是必要的?

redis

下面的 Laravel 程式碼代表什麼意思?

每 60 秒最大可執行 10 次, 若失敗的話, 10 秒後 release job

<?php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...

return $this->release(10);
});

下面的 Laravel 程式碼中的 key, 代表什麼意思?

可以是任意 string, 官方範例中, 可以是 job 的類型 + 該 Eloquent Model 的 id

<?php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...

return $this->release(10);
});

下面的 Laravel 程式碼中, 當我 release 該 job, 這樣還會增加該 job 的 attempt 次數嗎?

會哦

<?php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...

return $this->release(10);
});

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    Redis::funnel('yourKey')->limit(1)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    使用 redis 記錄一個 key, 並給予這個 key 一個 job, 限制有著該 key 的 job 每次只能由一個 worker 執行




# Queueing Closures

在 Laravel 中, 什麼情況我們可能會需要 dispatch 一個 closure?

當我們想要執行一個簡單的任務, 且需要它被在請求之外的週期執行

在 Laravel 中, 如何 dispatch 一個 closure?
<?php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
$podcast->publish();
});




# Running The Queue Worker

在 Laravel 中, 怎麼樣啟動 worker 來執行 queued job?
php artisan queue:work
在 Laravel 中, 要維持 worker 持續運行, 我們需要額外啟動什麼?

程序管理器, 像是 supervisor, 或 pm2

在 Laravel 中, 當我使用 php artisan queue:work, 當我代碼有變更時, 我需要重新啟動 queue 嗎?

需要

在 Laravel 中, 如果我運行 php artisan queue:listen, 然後我變更了代碼, 我需要再重新啟動 queue 嗎?

不需要

在 Laravel 中, 如果我不想要每次變更代碼就重新啟動 queue, 我可以使用哪一個 CLI?
php artisan queue:listen
在 Laravel 中, php artisan queue:workphp artisan queue:listen 何者較有效率?
php artisan queue:work


# Specifying The Connection & Queue

在 Laravel 中, 如何透過 CLI 指定 queue connection?
php artisan queue:work redis
在 Laravel 中, 如何透過 CLI 給特定的 queue 指定特定 connection?
php artisan queue:work redis --queue=emails


# Processing A Single Job

在 Laravel 中, 如何透過 CLI 指定 worker 只執行 queued job 一次
php artisan queue:work --once


# Processing All Queued Jobs & Then Exiting

在 Laravel 中, 如果我想要讓 queue worker 執行完所有 job 之後就關閉, 我可以怎麼做?
php artisan queue:work --stop-when-empty
以下的 Laravel 指令什麼時候可能會用到?

當我們利用容器運行 worker, 而我們需要工作都完成後自動關閉 container 時

php artisan queue:work --stop-when-empty


# Resource Considerations

在 Laravel 的 queue 中, 因為會緩存在 RAM 中運行, 所以如果我有執行到未釋放的資源, 像是 image 之類的, 那我是否要在 job 執行完成之後釋放掉這些資源?

要哦




# Queue Priorities

在 Laravel 的 queue 中, 我要如何排列 queue 的優先順序?

可以使用逗號做分隔排序

php artisan queue:work --queue=high,low




# Queue Workers & Deployment

# Job Expiration

在 Laravel 的 queue 監聽程序中, 我要如何重啟 queue?
php artisan queue:restart
在 Laravel 的 queue 監聽 CLI php artisan queue:restart, 會不會讓我丟失執行到一半的工作?

不會哦, 它會 gracefully restart, 完成目前手邊工作之後再重啟

Laravel 中, 在沒有使用 supervisor 的情況下, 如果我執行 php artisan queue:restart, 會發生什麼事?

worker 會關閉

Laravel 中, 若要使用 php artisan queue:restart, 務必先要確認什麼服務已經設置好?

cache

Laravel 中, 當我使用 php artisan queue:restart, queue 會將 restart 的 signal 存在什麼地方?

cache

Laravel queue 中, 如果我要指定一個 job 在執行之後多久時間之後才可以再被執行 (如果沒被刪除的話), 我可以怎麼做?

queue.php 的 config 檔案中, 可以設定 retry_after 的參數

Laravel queue 中, 哪一個服務無法在 queue.php config 中設定 retry_after 參數

Amazon SQS


# Worker Timeouts

Laravel queue 中, --timeoutretry_after 的差別在哪?

--timeout: queue 的 master 程序需等待多久的時間才可以殺掉一個執行同樣一個 job 的子程序
retry_after: queue 的 master 程序需要隔多久才可以重啟一個子程序來執行之前執行過的 job

Laravel queue 中, --timeoutretry_after 的時間, 哪個長哪個短?

--retry_after 需要比 --timeout 來得長

Laravel queue 中, 如果 retry_after--timeout 設定的時間還短, 可能會發生什麼事?

同一個 job 會被執行兩次


# Worker Sleep Duration

Laravel queue 中, 如果我要讓 worker 沒有新的 job 時會進入休眠狀態, 那我可以怎麼做?
php artisan queue:work --sleep=3
Laravel queue 中, 如果我有使用 --sleep 的 option, 當 queue 裡頭有很多 job 還沒有執行完畢時, worker 會進入 sleep 狀態嗎?

不會, 會等到所有 job 都處理完畢, 沒有新的 job 了 worker 才會進入 sleep 狀態

Laravel queue 中, 如果我有使用 --sleep 的 option, 當 worker 處於 sleep 的狀態時, 如果有新的 job 進來, worker 會等到 sleep 結束再處理, 還是會立即處理?

待 sleep 結束才會處理




# Supervisor Configuration

# Installing Supervisor

如何安裝 supervisor ?
sudo apt-get install supervisor
Supervisor 的設定檔位置在哪?

/etc/supervisor/conf.d

我該如何設定 Supervisor 設定檔?
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
// 是否隨 supervisor 啟動一起啟動
autostart=true
// 程序異常退出後自動重啟
autorestart=true
user=forge
numprocs=8
// 是否教錯誤的 log 一併記錄到 stdout
redirect_stderr=true
// 一般輸出的記錄檔案位址
stdout_logfile=/home/forge/app.com/worker.log
// 刪除一個 job 的等待時間, 這邊需設定大於會執行最久的 job 的時間, 不然會在執行完畢前被砍掉
stopwaitsecs=3600


# Starting Supervisor

Supervisor 如何重新讀取 config 檔, 但不重啟 process?
sudo supervisorctl reread
Supervisor 如何重新讀取 config 檔, 並依照 config 檔案重啟 process?
sudo supervisorctl update
Supervisor 如何啟動?
sudo supervisorctl start laravel-worker:*
Supervisor 官方文件位址?

Supervisor 官方文件




# Dealing With Failed Jobs

Laravel queue 中, 當一個 job 一直執行失敗, 超出了指定的 tries 的限制, 那這個 job 會跑到哪裡去?

failed_jobs

Laravel queue 中, 如何建立 failed_jobs table?
php artisan queue:failed-table
php artisan migrate
以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work redis --tries=3 --delay=3
  • Answer:
    如果失敗, 最多嘗試 3 次, 超過 3 次的會被歸類到 fail job 中, 每次嘗試的間隔最少 3 秒

Laravel queue 中, 如果我要使用 job class 指定 retry 一個 failed job 的間隔時間, 我可以怎麼做?
<?php
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $retryAfter = 3;




# Cleaning Up After Failed Jobs

Laravel 中, 當我一個 job failed, 我想要在它 fail 的時候發通知, 我可以在哪一個地方寫這個邏輯?
<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;

protected $podcast;

/**
* Create a new job instance.
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}

/**
* Execute the job.
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// Process uploaded podcast...
}

/**
* The job failed to process.
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// Send user notification of failure, etc...
}
}




# Failed Job Events

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobFailed;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::failing(function (JobFailed $event) {
    // $event->connectionName
    // $event->job
    // $event->exception
    });
    }
    }
  • Answer:
    在 AppServiceProvider 的 boot method 中, 使用 Queue class 的 failing method, 定義了當 job 失敗時, 就會觸發這個事件, 並執行 closure 內的邏輯




# Retrying Failed Jobs

Laravel Queue 中, 如果我要從 failed_jobs table 中檢視所有 failed jobs, 我可以怎麼做?
php artisan queue:failed
Laravel Queue 中, 如果我要使用 CLI 獲取 failed jobs 的 ID, 我可以怎麼做?
php artisan queue:failed
Laravel Queue 中, 如果我要從 CLI retry 特定的 failed jobs, 我可以怎麼做?

5 為 failed job ID

php artisan queue:retry 5

Laravel Queue 中, 如果我要從 CLI retry 所有的 failed jobs, 我可以怎麼做?
php artisan queue:retry all
Laravel Queue 中, 如果我要從 CLI 刪除一個 failed job, 我可以怎麼做?

5 為 failed job ID

php artisan queue:forget 5

Laravel Queue 中, 如果我要從 CLI 刪除所有的 failed jobs, 我可以怎麼做?
php artisan queue:flush




# Ignoring Missing Models

Laravel Queue 中, 我們注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會發生什麼事?

這個 job 會失敗, ModelNotFoundException

Laravel Queue 中, 我們注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會報錯, ModelNotFoundException, 如何設定當遇到這種情況時, 自動刪除 job?

在 job class 當中

<?php
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;




# Job Events

Laravel Queue 中, 如果我想要在 job 事件執行前做一些事, 那麼我可以怎麼做?

AppServiceProvider

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*
* @return void
*/
public function register()
{
//
}

/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

Laravel Queue 中, 如果我想要在 job 事件執行後做一些事, 那麼我可以怎麼做?

AppServiceProvider

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*
* @return void
*/
public function register()
{
//
}

/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

Laravel Queue 中, 以下的程式碼通常可以應用在什麼情況?
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*
* @return void
*/
public function register()
{
//
}

/**
* Bootstrap any application services.
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});

Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}

記 log
統計特定 job 運行次數

Laravel Queue 中, 如果我想要設定一個 closure, 當我的 worker 要從 queue fetch 新的 job 回來執行之前, 執行這個 closure, 那我可以怎麼做?

AppServiceProvider 當中

<?php
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});

GKE Migrating to Containers Docker 實戰入門

留言

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×