Laravel - Digging Deeper - Queues (官方文件原子化翻譯筆記)

# Introduction

學習一個框架, Ray 的想法是, 在深入理解底層實作的原理之前, 應該先知道這個框架的 使用方法; 先學習怎麼使用這個前人造的輪子, 再學習怎麼樣一個輪子。
所以本篇文章重點在於細讀官方文件, 並將內容理解後以 Q&A 的方式記錄下來, 加速學習以及查詢。



# Connection Vs. Queues

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    use App\Jobs\ProcessPodcast;

    ProcessPodcast::dispatch();

    ProcessPodcast::dispatch()->onQueue('emails');
  • Answer:
    將 job 送到 connection 定義的 default queue
    將 job 送到 connection emails 定義的 default queue

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work --queue=high,default
  • Answer:
    啟動 queue daemon, 並排序 queue 的優先順序 high,default



# Driver Notes & Prerequisites

# Database

在 Laravel 中, 如果我使用的 driver 為 database, 該如何建立 queue table?
php artisan queue:table
php artisan migrate

# Redis

在 Laravel 的 QUEUE 當中, 如果我使用 redis 為 driver, 該如何設定 redis 的 database connection?

config/database.php 檔案中


# Redis Cluster

以下的位於 config/queue.php Laravel example code 的意思是?
  • Example:

    'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
    ],
  • Answer:
    定義 redis cluster, 因此 queue 需 enclosed { }


# Blocking

以下的位於 config/queue.php Laravel example code 的意思是?
  • Example:

    'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
    ],
  • Answer:
    retry_after 表示每個 job 最多維持 reserved state 90 秒, 以避免 worker crash 之後一個 job 永遠不被執行
    block_for 表示 worker 在進入下一輪的循環之前, 會先 block 5 秒看有沒有新的 job 進來, 可參考 Mohamed Said 的 video



# Creating Jobs

# Generating Job Classes

Laravel 中, 一般 job 都會放在哪?

app/Jobs

Laravel 中, 如何建立一個 Job?
php artisan make:job jobName
在 Laravel 中, 當我們注入一個 Model 到 Job, 是會整個 Model 都被注入, 還是只會注入該 Model 的一個辨識物?

Only an identifier will be stored in the job

在 Laravel 中, 當我在 job 中注入一個 model, 當這個 job 被在 queue 中被取出執行時, 請入該 model 會是我當初注入時的狀態, 還是目前資料庫中最新的狀態?

資料庫中的狀態

# Handling Relationships

以下位於 job 的 Laravel example code 的意思是?
  • Example:

    <?php
    public function __construct(Podcast $podcast)
    {
    $this->podcast = $podcast->withoutRelations();
    }
  • Answer:
    當注入 model 到 job 時, 預設會載入 relations, 可指定不載入



# Job Middleware

在 Laravel Queue 中, 可以建立一個 Job middleware 嗎?

可以

在 Laravel Queue 中, 如果我想要建立一個 Job Middleware, 可以建立在什麼位置?

什麼位置都可以, 官方範例位置為 app/Jobs/Middleware

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Jobs\Middleware;

    use Illuminate\Support\Facades\Redis;

    class RateLimited
    {
    public function handle($job, $next)
    {
    Redis::throttle('key')
    ->block(0)->allow(1)->every(5)
    ->then(function () use ($job, $next) {

    $next($job);
    }, function () use ($job) {

    $job->release(5);
    });
    }
    }

    // in job class
    public function middleware()
    {
    return [new RateLimited];
    }
  • Answer:
    建立一個 job middleware, 利用 redis throttle 限制 job 每 5 秒可以執行 1 次, 並在 job 中使用該 middleware



# Dispatching Jobs

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast);
    }
    }
  • Answer:
    dispatch 一個 job



# Delayed Dispatching

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)
    ->delay(now()->addMinutes(10));
    }
    }
  • Answer:
    delay dispatch 該 job 的時間, 10 分鐘後在 dispatch

在 Laravel Queue 中, AWS 的 SQS 最多支援到 delay 多久?

15 分鐘



# Synchronous Dispatching

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatchNow($podcast);
    }
    }
  • Answer:
    立即 dispatch 這個 job, 不使用 queue



# Job Chaining

在 Laravel Job chaining 中, 如果其中一個 job 失敗了, 後面的還會繼續執行嗎?

不會

在 Laravel Job chaining 中, 如果我使用 $this->delete(), 會終止後面的 job 執行嗎?

不會

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
    ])->dispatch();
  • Answer:
    使用 withChain() 實現 job chaining, job 會按照順序執行


# Chain Connection & Queue

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
    ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
  • Answer:
    使用 withChain() 實現 job chaining, 並指定該 chaining 的 connection 以及 queue



# Customizing The Queue & Connection

# Dispatching To A Particular Queue

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
    }
  • Answer:
    使用 onQueue() 來指定 queue


# Dispatching To A Particular Connection

以下的 Laravel example code 的意思是?
  • Example:

    <?Php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
    }
  • Answer:
    如果我使用不只一個 connection, 可使用 onConnection() 指定特定的 connection

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $connection = 'sqs';
    }
  • Answer:
    在 job class 中, 可使用 $connection property 指定該 job 的 connection

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');
  • Answer:
    dispatch 時指定該 job 的 connection 以及 queue



# Specifying Max Job Attempts / Timeout Values

# Max Attempts

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work --tries=3
  • Answer:
    指定 worker 的嘗試次數, 會嘗試執行 job 最多 3 次, 第一次也算一次, release() 也算一次, 如果 3 次都失敗, 才會把它放到 failed_job table 內

    以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $tries = 5;
    }
  • Answer:
    指定該 job 的最大嘗試次數, 如果 5 次都執行失敗, release() 也算 1 次, 第一次也算, 那 worker 就會將這個 job 放到 failed_job table 中, 不再嘗試執行

    Laravel job 當中, 當我同時在 job class 中, 以及 CLI 中都指令最大嘗試次數, Laravel 會以誰為準?

    job class 中指定的 property


# Time Based Attempts

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    public function retryUntil()
    {
    return now()->addSeconds(5);
    }
  • Answer:
    為這個 job 定義一個可被執行的時間限制, 超過 5 秒後, 這個 job 就會被放到 failed_table 中


# Timeout

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work --timeout=30
  • Answer:
    該 worker 最多執行一個 job 30 秒, 若超過 30 秒視為 timeout

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $timeout = 120;
    }
  • Answer:
    指定 timeout, 為該 job 可執行的最大秒數, 通過即視為失敗, 失敗超過 tries property 限制即視為 failed

Laravel job 當中, 如果我再 CLI 以及 job 的 class 同時定義的 timeout, 哪一個的優先權比較大?

job class



# Rate Limiting

Laravel queue 當中, 如果我想使用 rate limiting, 哪一個服務會是必要的?

redis

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    定義特定的 key, 可以是 job 的類型 + 該 Eloquent Model 的 id
    每 60 秒最大可執行 10 次, 若失敗的話, 10 秒後 release job

下面的 Laravel 程式碼中, 當我 release 該 job, 這樣還會增加該 job 的 attempt 次數嗎?
  • Example:

    <?php
    Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    會哦

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    Redis::funnel('yourKey')->limit(1)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    使用 redis 記錄一個 key, 並給予這個 key 一個 job, 限制有著該 key 的 job 每次只能由一個 worker 執行



# Queueing Closures

在 Laravel 中, 什麼情況我們可能會需要 dispatch 一個 closure?

當我們想要執行一個簡單的任務, 且需要它被在請求之外的週期執行

在 Laravel 中, 如何 dispatch 一個 closure?
<?php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
$podcast->publish();
});


# Running The Queue Worker

在 Laravel 中, 怎麼樣啟動 worker 來執行 queued job?
php artisan queue:work
在 Laravel 中, 要維持 worker 持續運行, 我們需要額外啟動什麼?

程序管理器, 像是 supervisor, 或 pm2

在 Laravel 中, 當我使用 php artisan queue:work, 當我代碼有變更時, 我需要重新啟動 queue 嗎?

需要

在 Laravel 中, 如果我運行 php artisan queue:listen, 然後我變更了代碼, 我需要再重新啟動 queue 嗎?

不需要

在 Laravel 中, 如果我不想要每次變更代碼就重新啟動 queue, 我可以使用哪一個 CLI?
php artisan queue:listen
在 Laravel 中, php artisan queue:workphp artisan queue:listen 何者較有效率?
php artisan queue:work

# Specifying The Connection & Queue

在 Laravel 中, 如何透過 CLI 指定 queue connection?
php artisan queue:work redis
在 Laravel 中, 如何透過 CLI 給特定的 queue 指定特定 connection?
php artisan queue:work redis --queue=emails

# Processing A Single Job

在 Laravel 中, 如何透過 CLI 指定 worker 只執行 queued job 一次
php artisan queue:work --once

# Processing All Queued Jobs & Then Exiting

以下的 Laravel example command 的意思是?
  • Example:

    php artisan queue:work --stop-when-empty
  • Answer:
    該 worker 執行完所有的 queue job 之後就會關閉

以下的 Laravel 指令什麼時候可能會用到?
  • Example:

    php artisan queue:work --stop-when-empty
  • Answer:
    當我們利用容器運行 worker, 而我們需要工作都完成後自動關閉 container 時


# Resource Considerations

在 Laravel 的 queue 中, 因為會緩存在 RAM 中運行, 所以如果我有執行到未釋放的資源, 像是 image 之類的, 那我是否要在 job 執行完成之後釋放掉這些資源?

要哦



# Queue Priorities

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work --queue=high,low
  • Answer:
    定義 queue 的 priority 順序



# Queue Workers & Deployment

# Job Expiration

在 Laravel 的 queue 監聽程序中, 我要如何重啟 queue?
php artisan queue:restart
在 Laravel 的 queue 監聽 CLI php artisan queue:restart, 會不會讓我丟失執行到一半的工作?

不會哦, 它會 gracefully restart, 完成目前手邊工作之後再重啟

Laravel 中, 在沒有使用 supervisor 的情況下, 如果我執行 php artisan queue:restart, 會發生什麼事?

worker 會關閉

Laravel 中, 若要使用 php artisan queue:restart, 務必先要確認什麼服務已經設置好?

cache

Laravel 中, 當我使用 php artisan queue:restart, queue 會將 restart 的 signal 存在什麼地方?

cache

Laravel queue 中, 如果我要 globally 定義 job 在執行之後多久時間之後才可以再被執行 (如果沒被刪除的話), 我可以怎麼做?

queue.php 的 config 檔案中, 可以設定 retry_after 的參數

Laravel queue 中, 哪一個服務無法在 queue.php config 中設定 retry_after 參數

Amazon SQS


# Worker Timeouts

Laravel queue 中, --timeoutretry_after 的差別在哪?

--timeout: queue 的 master 程序需等待多久的時間才可以殺掉一個執行同樣一個 job 的子程序
retry_after: queue 的 master 程序需要隔多久才可以重啟一個子程序來執行之前執行過的 job

Laravel queue 中, --timeoutretry_after 的時間, 哪個長哪個短?

--retry_after 需要比 --timeout 來得長

Laravel queue 中, 如果 retry_after--timeout 設定的時間還短, 可能會發生什麼事?

同一個 job 會被執行兩次


# Worker Sleep Duration

Laravel queue 中, 如果我要讓 worker 沒有新的 job 時會進入休眠狀態, 那我可以怎麼做?
php artisan queue:work --sleep=3
Laravel queue 中, 如果我有使用 --sleep 的 option, 當 queue 裡頭有很多 job 還沒有執行完畢時, worker 會進入 sleep 狀態嗎?

不會, 會等到所有 job 都處理完畢, 沒有新的 job 了 worker 才會進入 sleep 狀態

Laravel queue 中, 如果我有使用 --sleep 的 option, 當 worker 處於 sleep 的狀態時, 如果有新的 job 進來, worker 會等到 sleep 結束再處理, 還是會立即處理?

待 sleep 結束才會處理



# Supervisor Configuration

# Installing Supervisor

如何安裝 supervisor ?
sudo apt-get install supervisor
Supervisor 的設定檔位置在哪?

/etc/supervisor/conf.d

以下的 supervisor config example 的意思是?
  • Example:

    [program:laravel-worker]
    process_name=%(program_name)s_%(process_num)02d
    command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
    autostart=true
    autorestart=true
    user=forge
    numprocs=8
    redirect_stderr=true
    stdout_logfile=/home/forge/app.com/worker.log
    stopwaitsecs=3600
  • Answer:

    [program:laravel-worker]
    process_name=%(program_name)s_%(process_num)02d
    command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
    // 是否隨 supervisor 啟動一起啟動
    autostart=true
    // 程序異常退出後自動重啟
    autorestart=true
    user=forge
    numprocs=8
    // 是否教錯誤的 log 一併記錄到 stdout
    redirect_stderr=true
    // 一般輸出的記錄檔案位址
    stdout_logfile=/home/forge/app.com/worker.log
    // 刪除一個 job 的等待時間, 這邊需設定大於會執行最久的 job 的時間, 不然會在執行完畢前被砍掉
    stopwaitsecs=3600

# Starting Supervisor

以下的 supervisor example command 的意思是?
  • Example:

    sudo supervisorctl reread
  • Answer:
    重新讀取 config 檔, 但不重啟 process

以下的 supervisor example command 的意思是?
  • Example:

    sudo supervisorctl update
  • Answer:
    重新讀取 config 檔, 並依照 config 檔案重啟 process

以下的 supervisor example command 的意思是?
  • Example:

    sudo supervisorctl start laravel-worker:*
  • Answer:
    啟動 supervisor

Supervisor 官方文件位址?

Supervisor 官方文件



# Dealing With Failed Jobs

Laravel queue 中, 當一個 job 一直執行失敗, 超出了指定的 tries 的限制, 那這個 job 會跑到哪裡去?

failed_jobs

Laravel queue 中, 如何建立 failed_jobs table?
php artisan queue:failed-table
php artisan migrate
以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:work redis --tries=3 --delay=3
  • Answer:
    如果失敗, 最多嘗試 3 次, 超過 3 次的會被歸類到 fail job 中, 每次嘗試的間隔最少 3 秒

以下位於 job 的 Laravel example code 的意思是?
  • Example:

    <?php
    public $retryAfter = 3;
    public $backOff = 3;
  • Answer:
    定義該 job 如果執行失敗了, 或被 release 回 queue 中, 要間隔多久時間才會再被 worker pick up 執行。 有時失敗是因為第三方服務短時間內故障, 若不定義間隔時間, 很可能會在短時間內將 tries 的次數全部耗光, 如此便失去了給第三方服務時間恢復正常的空間
    $backOff 為 Laravel 8 的用法



# Cleaning Up After Failed Jobs

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    public function __construct(Podcast $podcast)
    {
    $this->podcast = $podcast;
    }

    public function handle(AudioProcessor $processor)
    {
    // Process uploaded podcast...
    }

    public function failed(Exception $exception)
    {
    // Send user notification of failure, etc...
    }
    }
  • Answer:
    handle() 內定義邏輯, failed() 內定義當 job failed 之後要做的事



# Failed Job Events

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobFailed;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::failing(function (JobFailed $event) {
    // $event->connectionName
    // $event->job
    // $event->exception
    });
    }
    }
  • Answer:
    在 AppServiceProvider 的 boot method 中, 使用 Queue class 的 failing method, 定義了當 job 失敗時, 就會觸發這個事件, 並執行 closure 內的邏輯



# Retrying Failed Jobs

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:failed
  • Answer:
    從 failed_jobs table 中檢視所有的 failed jobs

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:failed
  • Answer:
    取得 failed job id

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:retry 5
  • Answer:
    retry 特定 failed job, 5 為 failed job ID

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:retry all
  • Answer:
    retry 所有的 failed jobs

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:forget 5
  • Answer:
    刪除一個 failed job
    5 為 failed job ID

以下的 Laravel example code 的意思是?
  • Example:

    php artisan queue:flush
  • Answer:
    刪除所有的 failed jobs



# Ignoring Missing Models

Laravel Queue 中, 我們注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會發生什麼事?

這個 job 會失敗, ModelNotFoundException

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    public $deleteWhenMissingModels = true;
  • Answer:
    當注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會報錯, ModelNotFoundException, 可以使用 job class 中的 $deleteWhenMissingModel property, 當出現此狀況時會自動餐除該 job



# Job Events

以下的 Laravel example code 的意思是?
  • Example:

    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobProcessed;
    use Illuminate\Queue\Events\JobProcessing;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::before(function (JobProcessing $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });

    Queue::after(function (JobProcessed $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });
    }
    }
  • Answer:
    如果想在 job 開始或結束後做一些事, 可在 AppServiceProvider 的 boot() 中註冊 before() 以及 after() event, 並在 closure 內做事
    AppServiceProvider

Laravel Queue 中, 以下的程式碼通常可以應用在什麼情況?
  • Example:

    <?php
  • Answer:

    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobProcessed;
    use Illuminate\Queue\Events\JobProcessing;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::before(function (JobProcessing $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });

    Queue::after(function (JobProcessed $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });
    }
    }
  • Output:
    記 log
    統計特定 job 運行次數

以下的 Laravel example code 的意思是?
  • Example:

    <?php
    Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
    DB::rollBack();
    }
    });
  • Answer:
    在 AppServiceProvider boot() 中可以註冊 looping(), 在 worker fetch job 之前都會執行這個 closure, 將 active 的 transaction rollback

GKE Migrating to Containers Docker 實戰入門

留言

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×