Laravel - Digging Deeper - Queues (官方文件原子化翻譯筆記)

# Introduction

學習一個框架, Ray 的想法是, 在深入理解底層實作的原理之前, 應該先知道這個框架的 使用方法; 先學習怎麼使用這個前人造的輪子, 再學習怎麼樣一個輪子。
所以本篇文章重點在於細讀官方文件, 並將內容理解後以 Q&A 的方式記錄下來, 加速學習以及查詢。



# Connection Vs. Queues

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    use App\Jobs\ProcessPodcast;

    ProcessPodcast::dispatch();

    ProcessPodcast::dispatch()->onQueue('emails');
  • Answer:
    將 job 送到 connection 定義的 default queue
    將 job 送到 connection emails 定義的 default queue
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --queue=high,default
  • Answer:
    啟動 queue daemon, 並排序 queue 的優先順序 high,default


# Driver Notes & Prerequisites

# Database

在 Laravel 中, 如果我使用的 driver 為 database, 該如何建立 queue table?
php artisan queue:table
php artisan migrate

# Redis

在 Laravel 的 QUEUE 當中, 如果我使用 redis 為 driver, 該如何設定 redis 的 database connection?

config/database.php 檔案中


# Redis Cluster

以下的位於 config/queue.php Laravel example code 的意思是?
  • Example:
    'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
    ],
  • Answer:
    定義 redis cluster, 因此 queue 需 enclosed { }

# Blocking

以下的位於 config/queue.php Laravel example code 的意思是?
  • Example:
    'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
    ],
  • Answer:
    retry_after 表示每個 job 最多維持 reserved state 90 秒, 以避免 worker crash 之後一個 job 永遠不被執行
    block_for 表示 worker 在進入下一輪的循環之前, 會先 block 5 秒看有沒有新的 job 進來, 可參考 Mohamed Said 的 video


# Creating Jobs

# Generating Job Classes

Laravel 中, 一般 job 都會放在哪?

app/Jobs

Laravel 中, 如何建立一個 Job?
php artisan make:job jobName
在 Laravel 中, 當我們注入一個 Model 到 Job, 是會整個 Model 都被注入, 還是只會注入該 Model 的一個辨識物?

Only an identifier will be stored in the job

在 Laravel 中, 當我在 job 中注入一個 model, 當這個 job 被在 queue 中被取出執行時, 請入該 model 會是我當初注入時的狀態, 還是目前資料庫中最新的狀態?

資料庫中的狀態

# Handling Relationships

以下位於 job 的 Laravel example code 的意思是?
  • Example:
    <?php
    public function __construct(Podcast $podcast)
    {
    $this->podcast = $podcast->withoutRelations();
    }
  • Answer:
    當注入 model 到 job 時, 預設會載入 relations, 可指定不載入


# Job Middleware

在 Laravel Queue 中, 可以建立一個 Job middleware 嗎?

可以

在 Laravel Queue 中, 如果我想要建立一個 Job Middleware, 可以建立在什麼位置?

什麼位置都可以, 官方範例位置為 app/Jobs/Middleware

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Jobs\Middleware;

    use Illuminate\Support\Facades\Redis;

    class RateLimited
    {
    public function handle($job, $next)
    {
    Redis::throttle('key')
    ->block(0)->allow(1)->every(5)
    ->then(function () use ($job, $next) {

    $next($job);
    }, function () use ($job) {

    $job->release(5);
    });
    }
    }

    // in job class
    public function middleware()
    {
    return [new RateLimited];
    }
  • Answer:
    建立一個 job middleware, 利用 redis throttle 限制 job 每 5 秒可以執行 1 次, 並在 job 中使用該 middleware


# Dispatching Jobs

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast);
    }
    }
  • Answer:
    dispatch 一個 job


# Delayed Dispatching

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)
    ->delay(now()->addMinutes(10));
    }
    }
  • Answer:
    delay dispatch 該 job 的時間, 10 分鐘後在 dispatch
在 Laravel Queue 中, AWS 的 SQS 最多支援到 delay 多久?

15 分鐘



# Synchronous Dispatching

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatchNow($podcast);
    }
    }
  • Answer:
    立即 dispatch 這個 job, 不使用 queue


# Job Chaining

在 Laravel Job chaining 中, 如果其中一個 job 失敗了, 後面的還會繼續執行嗎?

不會

在 Laravel Job chaining 中, 如果我使用 $this->delete(), 會終止後面的 job 執行嗎?

不會

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
    ])->dispatch();
  • Answer:
    使用 withChain() 實現 job chaining, job 會按照順序執行

# Chain Connection & Queue

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
    ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
  • Answer:
    使用 withChain() 實現 job chaining, 並指定該 chaining 的 connection 以及 queue


# Customizing The Queue & Connection

# Dispatching To A Particular Queue

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
    }
  • Answer:
    使用 onQueue() 來指定 queue

# Dispatching To A Particular Connection

以下的 Laravel example code 的意思是?
  • Example:
    <?Php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
    public function store(Request $request)
    {
    // Create podcast...

    ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
    }
  • Answer:
    如果我使用不只一個 connection, 可使用 onConnection() 指定特定的 connection
以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $connection = 'sqs';
    }
  • Answer:
    在 job class 中, 可使用 $connection property 指定該 job 的 connection
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');
  • Answer:
    dispatch 時指定該 job 的 connection 以及 queue


# Specifying Max Job Attempts / Timeout Values

# Max Attempts

以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --tries=3
  • Answer:
    指定 worker 的嘗試次數, 會嘗試執行 job 最多 3 次, 第一次也算一次, release() 也算一次, 如果 3 次都失敗, 才會把它放到 failed_job table 內
以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $tries = 5;
    }
  • Answer:
    指定該 job 的最大嘗試次數, 如果 5 次都執行失敗, release() 也算 1 次, 第一次也算, 那 worker 就會將這個 job 放到 failed_job table 中, 不再嘗試執行
Laravel job 當中, 當我同時在 job class 中, 以及 CLI 中都指令最大嘗試次數, Laravel 會以誰為準?

job class 中指定的 property


# Time Based Attempts

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function retryUntil()
    {
    return now()->addSeconds(5);
    }
  • Answer:
    為這個 job 定義一個可被執行的時間限制, 超過 5 秒後, 這個 job 就會被放到 failed_table 中

# Timeout

以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --timeout=30
  • Answer:
    該 worker 最多執行一個 job 30 秒, 若超過 30 秒視為 timeout
以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    public $timeout = 120;
    }
  • Answer:
    指定 timeout, 為該 job 可執行的最大秒數, 通過即視為失敗, 失敗超過 tries property 限制即視為 failed
Laravel job 當中, 如果我再 CLI 以及 job 的 class 同時定義的 timeout, 哪一個的優先權比較大?

job class



# Rate Limiting

Laravel queue 當中, 如果我想使用 rate limiting, 哪一個服務會是必要的?

redis

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    定義特定的 key, 可以是 job 的類型 + 該 Eloquent Model 的 id
    每 60 秒最大可執行 10 次, 若失敗的話, 10 秒後 release job
下面的 Laravel 程式碼中, 當我 release 該 job, 這樣還會增加該 job 的 attempt 次數嗎?
  • Example:
    <?php
    Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    會哦
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Redis::funnel('yourKey')->limit(1)->then(function () {
    // Job logic...
    }, function () {
    // Could not obtain lock...

    return $this->release(10);
    });
  • Answer:
    使用 redis 記錄一個 key, 並給予這個 key 一個 job, 限制有著該 key 的 job 每次只能由一個 worker 執行


# Queueing Closures

在 Laravel 中, 什麼情況我們可能會需要 dispatch 一個 closure?

當我們想要執行一個簡單的任務, 且需要它被在請求之外的週期執行

在 Laravel 中, 如何 dispatch 一個 closure?
<?php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
$podcast->publish();
});


# Running The Queue Worker

在 Laravel 中, 怎麼樣啟動 worker 來執行 queued job?
php artisan queue:work
在 Laravel 中, 要維持 worker 持續運行, 我們需要額外啟動什麼?

程序管理器, 像是 supervisor, 或 pm2

在 Laravel 中, 當我使用 php artisan queue:work, 當我代碼有變更時, 我需要重新啟動 queue 嗎?

需要

在 Laravel 中, 如果我運行 php artisan queue:listen, 然後我變更了代碼, 我需要再重新啟動 queue 嗎?

不需要

在 Laravel 中, 如果我不想要每次變更代碼就重新啟動 queue, 我可以使用哪一個 CLI?
php artisan queue:listen
在 Laravel 中, php artisan queue:workphp artisan queue:listen 何者較有效率?
php artisan queue:work

# Specifying The Connection & Queue

在 Laravel 中, 如何透過 CLI 指定 queue connection?
php artisan queue:work redis
在 Laravel 中, 如何透過 CLI 給特定的 queue 指定特定 connection?
php artisan queue:work redis --queue=emails

# Processing A Single Job

在 Laravel 中, 如何透過 CLI 指定 worker 只執行 queued job 一次
php artisan queue:work --once

# Processing All Queued Jobs & Then Exiting

以下的 Laravel example command 的意思是?
  • Example:
    php artisan queue:work --stop-when-empty
  • Answer:
    該 worker 執行完所有的 queue job 之後就會關閉
以下的 Laravel 指令什麼時候可能會用到?
  • Example:
    php artisan queue:work --stop-when-empty
  • Answer:
    當我們利用容器運行 worker, 而我們需要工作都完成後自動關閉 container 時

# Resource Considerations

在 Laravel 的 queue 中, 因為會緩存在 RAM 中運行, 所以如果我有執行到未釋放的資源, 像是 image 之類的, 那我是否要在 job 執行完成之後釋放掉這些資源?

要哦



# Queue Priorities

以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --queue=high,low
  • Answer:
    定義 queue 的 priority 順序


# Queue Workers & Deployment

# Job Expiration

在 Laravel 的 queue 監聽程序中, 我要如何重啟 queue?
php artisan queue:restart
在 Laravel 的 queue 監聽 CLI php artisan queue:restart, 會不會讓我丟失執行到一半的工作?

不會哦, 它會 gracefully restart, 完成目前手邊工作之後再重啟

Laravel 中, 在沒有使用 supervisor 的情況下, 如果我執行 php artisan queue:restart, 會發生什麼事?

worker 會關閉

Laravel 中, 若要使用 php artisan queue:restart, 務必先要確認什麼服務已經設置好?

cache

Laravel 中, 當我使用 php artisan queue:restart, queue 會將 restart 的 signal 存在什麼地方?

cache

Laravel queue 中, 如果我要 globally 定義 job 在執行之後多久時間之後才可以再被執行 (如果沒被刪除的話), 我可以怎麼做?

queue.php 的 config 檔案中, 可以設定 retry_after 的參數

Laravel queue 中, 哪一個服務無法在 queue.php config 中設定 retry_after 參數

Amazon SQS


# Worker Timeouts

Laravel queue 中, --timeoutretry_after 的差別在哪?

--timeout: queue 的 master 程序需等待多久的時間才可以殺掉一個執行同樣一個 job 的子程序
retry_after: queue 的 master 程序需要隔多久才可以重啟一個子程序來執行之前執行過的 job

Laravel queue 中, --timeoutretry_after 的時間, 哪個長哪個短?

--retry_after 需要比 --timeout 來得長

Laravel queue 中, 如果 retry_after--timeout 設定的時間還短, 可能會發生什麼事?

同一個 job 會被執行兩次


# Worker Sleep Duration

Laravel queue 中, 如果我要讓 worker 沒有新的 job 時會進入休眠狀態, 那我可以怎麼做?
php artisan queue:work --sleep=3
Laravel queue 中, 如果我有使用 --sleep 的 option, 當 queue 裡頭有很多 job 還沒有執行完畢時, worker 會進入 sleep 狀態嗎?

不會, 會等到所有 job 都處理完畢, 沒有新的 job 了 worker 才會進入 sleep 狀態

Laravel queue 中, 如果我有使用 --sleep 的 option, 當 worker 處於 sleep 的狀態時, 如果有新的 job 進來, worker 會等到 sleep 結束再處理, 還是會立即處理?

待 sleep 結束才會處理



# Supervisor Configuration

# Installing Supervisor

如何安裝 supervisor ?
sudo apt-get install supervisor
Supervisor 的設定檔位置在哪?

/etc/supervisor/conf.d

以下的 supervisor config example 的意思是?
  • Example:
    [program:laravel-worker]
    process_name=%(program_name)s_%(process_num)02d
    command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
    autostart=true
    autorestart=true
    user=forge
    numprocs=8
    redirect_stderr=true
    stdout_logfile=/home/forge/app.com/worker.log
    stopwaitsecs=3600
  • Answer:
    [program:laravel-worker]
    process_name=%(program_name)s_%(process_num)02d
    command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
    // 是否隨 supervisor 啟動一起啟動
    autostart=true
    // 程序異常退出後自動重啟
    autorestart=true
    user=forge
    numprocs=8
    // 是否教錯誤的 log 一併記錄到 stdout
    redirect_stderr=true
    // 一般輸出的記錄檔案位址
    stdout_logfile=/home/forge/app.com/worker.log
    // 刪除一個 job 的等待時間, 這邊需設定大於會執行最久的 job 的時間, 不然會在執行完畢前被砍掉
    stopwaitsecs=3600

# Starting Supervisor

以下的 supervisor example command 的意思是?
  • Example:
    sudo supervisorctl reread
  • Answer:
    重新讀取 config 檔, 但不重啟 process
以下的 supervisor example command 的意思是?
  • Example:
    sudo supervisorctl update
  • Answer:
    重新讀取 config 檔, 並依照 config 檔案重啟 process
以下的 supervisor example command 的意思是?
  • Example:
    sudo supervisorctl start laravel-worker:*
  • Answer:
    啟動 supervisor
Supervisor 官方文件位址?

Supervisor 官方文件



# Dealing With Failed Jobs

Laravel queue 中, 當一個 job 一直執行失敗, 超出了指定的 tries 的限制, 那這個 job 會跑到哪裡去?

failed_jobs

Laravel queue 中, 如何建立 failed_jobs table?
php artisan queue:failed-table
php artisan migrate
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work redis --tries=3 --delay=3
  • Answer:
    如果失敗, 最多嘗試 3 次, 超過 3 次的會被歸類到 fail job 中, 每次嘗試的間隔最少 3 秒
以下位於 job 的 Laravel example code 的意思是?
  • Example:
    <?php
    public $retryAfter = 3;
    public $backOff = 3;
  • Answer:
    定義該 job 如果執行失敗了, 或被 release 回 queue 中, 要間隔多久時間才會再被 worker pick up 執行。 有時失敗是因為第三方服務短時間內故障, 若不定義間隔時間, 很可能會在短時間內將 tries 的次數全部耗光, 如此便失去了給第三方服務時間恢復正常的空間
    $backOff 為 Laravel 8 的用法


# Cleaning Up After Failed Jobs

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    public function __construct(Podcast $podcast)
    {
    $this->podcast = $podcast;
    }

    public function handle(AudioProcessor $processor)
    {
    // Process uploaded podcast...
    }

    public function failed(Exception $exception)
    {
    // Send user notification of failure, etc...
    }
    }
  • Answer:
    handle() 內定義邏輯, failed() 內定義當 job failed 之後要做的事


# Failed Job Events

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobFailed;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::failing(function (JobFailed $event) {
    // $event->connectionName
    // $event->job
    // $event->exception
    });
    }
    }
  • Answer:
    在 AppServiceProvider 的 boot method 中, 使用 Queue class 的 failing method, 定義了當 job 失敗時, 就會觸發這個事件, 並執行 closure 內的邏輯


# Retrying Failed Jobs

以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:failed
  • Answer:
    從 failed_jobs table 中檢視所有的 failed jobs
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:failed
  • Answer:
    取得 failed job id
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:retry 5
  • Answer:
    retry 特定 failed job, 5 為 failed job ID
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:retry all
  • Answer:
    retry 所有的 failed jobs
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:forget 5
  • Answer:
    刪除一個 failed job
    5 為 failed job ID
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:flush
  • Answer:
    刪除所有的 failed jobs


# Ignoring Missing Models

Laravel Queue 中, 我們注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會發生什麼事?

這個 job 會失敗, ModelNotFoundException

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public $deleteWhenMissingModels = true;
  • Answer:
    當注入一個 Eloquent model 到一個 job 當中, 而當執行該 job 時, 該 model 已遭刪除, 這時會報錯, ModelNotFoundException, 可以使用 job class 中的 $deleteWhenMissingModel property, 當出現此狀況時會自動餐除該 job


# Job Events

以下的 Laravel example code 的意思是?
  • Example:
    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobProcessed;
    use Illuminate\Queue\Events\JobProcessing;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::before(function (JobProcessing $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });

    Queue::after(function (JobProcessed $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });
    }
    }
  • Answer:
    如果想在 job 開始或結束後做一些事, 可在 AppServiceProvider 的 boot() 中註冊 before() 以及 after() event, 並在 closure 內做事
    AppServiceProvider
Laravel Queue 中, 以下的程式碼通常可以應用在什麼情況?
  • Example:
    <?php
  • Answer:
    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobProcessed;
    use Illuminate\Queue\Events\JobProcessing;

    class AppServiceProvider extends ServiceProvider
    {
    public function register()
    {
    //
    }

    public function boot()
    {
    Queue::before(function (JobProcessing $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });

    Queue::after(function (JobProcessed $event) {
    // $event->connectionName
    // $event->job
    // $event->job->payload()
    });
    }
    }
  • Output:
    記 log
    統計特定 job 運行次數
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
    DB::rollBack();
    }
    });
  • Answer:
    在 AppServiceProvider boot() 中可以註冊 looping(), 在 worker fetch job 之前都會執行這個 closure, 將 active 的 transaction rollback

# Additional

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class DeployProject implements ShouldQueue, ShouldBeUniqueUntilProcessing
    {
    public $uniqueFor = 60;

    public function uniqueId()
    {
    return $this->project->id;
    }
    }
  • Answer:
    若 implement ShouldBeUniqueUntilProcessing interface, 則當該 job 被 processing 時, 就允許相同 unique id 的 job 被 dispatch 到 queue 中, 適用場景為, 如果說這個 job 是用來 deploy 最新的 commit, 只要該 queue 中有這個 job, 那就不需要有額外的 job 存在於該 queue, 但一旦該 job starts be processed, 可能就會出現新的 commit, 因此這時就允許這個 job 再度被 dispatch 到 queue 來
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class UpdateBalanceJob implements ShouldQueue, ShouldBeUnique
    {
    public $uniqueId = 'products';
    public $uniqueFor = 60;

    public function uniqueId()
    {
    return $this->category->id;
    }
    }

  • Answer:
    Implement ShouldBeUnique, Laravel 會確保 queue 中, 該 $uniqueId 只會有這麼一個 job, 重複的都會被 ignore, ShouldBeUnique 跟 withoutOverlapping 的差異在於, 前者限制的是該 job 在 queue 中必須為 unique, 後者允許 multiple unique job 同時出現在 queue 中, 但只會一次執行一個
    $uniqueFor 代表 atomic lock 鎖住的時間, 單位為秒
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function boot()
    {
    RateLimiter::for('reports', function ($job) {
    return $job->customer->onPremiumPlan()
    ? Limit::perHour(100)->by($job->customer->id) : Limit::perHour(10)->by($job->customer->id);
    });
    }

    public function middleware()
    {
    return [
    new RateLimited('reports')
    ];
    }
  • Answer:
    先在 Service Provider 中定義 名為 report 的 RateLimiter middleware, 然後可以在 job 中使用該 middleware
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class UpdateBalanceJob implements ShouldQueue
    {
    // The job handle method...

    public function middleware()
    {
    return [
    new WithoutOverlapping($this->customer->id)->expireAfter(10)
    ];
    }
    }
  • Answer:
    UpdateBalanceJob 將不會 run customer->id concurrently, 且在 10 秒後釋放 lock, 這樣即使該 job crashes 也不會讓該 lock 一直鎖住
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class UpdateBalanceJob implements ShouldQueue
    {
    // The job handle method...

    public function middleware()
    {
    return [
    new WithoutOverlapping($this->customer->id)->dontRelease()
    ];
    }
    }
  • Answer:
    UpdateBalanceJob 將不會 run customer->id concurrently, 且會刪除 concurrent job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class UpdateBalanceJob implements ShouldQueue
    {
    // The job handle method...

    public function middleware()
    {
    return [
    new WithoutOverlapping($this->customer->id)->releaseAfter(10)
    ];
    }
    }
  • Answer:
    UpdateBalanceJob 將不會 run customer->id concurrently, 且會 delay 10 秒後在 release 到 queue 當中讓 worker attempts picking up
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class UpdateBalanceJob implements ShouldQueue
    {
    // The job handle method...

    public function middleware()
    {
    return [
    new WithoutOverlapping()
    ];
    }
    }
  • Answer:
    UpdateBalanceJob 將不會 run concurrently
以下的 Laravel Horizon example configuration 的意思是?
  • Example:
    'environments' => [
    'environment' => [
    'supervisor-1' => [
    'nice' => 5,
    ],
    ],
    ]
  • Answer:
    設定 worker 的 priority 不要高於其他的 server process
以下的 Laravel Horizon example configuration 的意思是?
  • Example:
    'fast_termination' => false,
  • Answer:
    php artisan horizon process 預設會等到所有的 worker 都退出, 自己才會退出, 即 Supervisor 會等到所有的 worker 結束才會啟動新的 Horizon process, 但如果此時正好有 long running worker 的話, 可能就會造成 delay, 如果設定 fast_termination = true, 那 Horizon 在送出 exit signal 給所有 worker 後自己就會退出並重啟, 所以會出現新舊 worker process 共存, old worder process 處理完後會退出
以下的 Laravel Horizon example configuration 的意思是?
  • Example:
    'trim' => [
    'recent' => 10080,
    'completed' => 10080,
    'pending' => 10080,
    'recent_failed' => 10080,
    'failed' => 10080,
    'monitored' => 10080,
    ];
  • Answer:
    Horizon 會將一些 metrics 存在 Redis 中, 即 server memory 中, 可定義多久之後的 metrics 該被釋放
以下的 Laravel Horizon example configuration 的意思是?
  • Example:
    'memory_limit' => 64,
  • Answer:
    定義 Horizon process 的 memory limit
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public $shouldBeEncrypted = true;
  • Answer:
    在存到 store 之前, encrypt, 只有 worker 可以 decrypt 這些 job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function uniqueVia()
    {
    return Cache::store('redis');
    }

  • Answer:
    Laravel 預設會使用 default cache 來取得 atomic lock, 可指定其他的 cache instance
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    php artisan queue:work --once
  • Answer:
    worker 只執行一次就重啟, 通常用於需要執行完後立即釋放 memory, 但會消耗較多的 CPU, 因為每次都要重啟一個 application process
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Worker::popUsing('notifications', function ($pop) {
    $queues = time()->atNight()
    ? ['mail', 'webhooks']
    : ['push-notifications', 'sms', 'mail', 'webhooks'];
    foreach ($queues as $queue) {
    if (!is_null($job = $pop($queue))) {
    return $job;
    }
    }
    });
  • Answer:
    使用 Worker::popUsing(), customize 名為 notifications 的 worker, 如果時間是晚上, 就從 mail, webhooks queue 中 consume jobs, 反之則從其他的 queue 中 consume job
以下的 Laravel example code 的意思是?
  • Example:
    sudo supervisorctl stop group-name:* 

    cd /home/forge/mysite.com
    # ...
    $FORGE_PHP artisan migrate --force
    sudo supervisorctl start group-name:*
  • Answer:
    當更新部署時, 若 database schema 更新了, 但 worker 的 code 沒有更新, 那會造成 worker 的 last job fails, 所以先關掉所有的 worker, 等到 migration 更新完畢, 在啟動新的 worker, 不過如果 database schema 沒有更動的話, 不建議每次都 stop workers
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    php artisan queue:work --tries=3 --backoff=30,90,300

    class SendInvoice implements ShouldQueue
    {
    public $backoff = [30, 90, 300];

    public function backoff()
    {
    return [30, 90, 300]; }
    }
    }
  • Answer:
    使用不同的方式來設定 backoff, 即 job fails 之後, 要經過多久時間才 retry, 第一次 30 秒, 第二次 90 秒, 第三次 300 秒
Laravel Vapor 中, 如果我有一些 job 需要 high memory, 而有些 job 只需要比較低的 memory, 建議的做法是?

建立一個 environment configuration 來啟動 high memory worker, 並將 heavy memory job 放到一個特別的 queue, 這個 worker 就專門 pick up 這個 queue 中的 job

Laravel Vapor 中, 以什麼計費?

container 啟動並執行 job 的時間

Laravel Vapor 中, 什麼是 cold start?

當 Vapor 呼叫 AWS Lambda 啟動 container as worker, 會需要一些時間來啟動, 取決於 project size, 這個步驟又稱為 cold start

SQS 可以設定哪個選項來避免 duplicated job?

VisibilityTimeout

Laravel Vapor 的最大 timeout 是多久?

900 秒

為避免 memory leaking, Vapor 會在處理幾個 job 後重啟 worker container?

250 jobs

AWS 的 default concurrency limit 是多少?

1000 per region

以下的 vapor configuration 意思是?
  • Example:
    <?php
    id: 1
    name: laravel-queues-in-action
    environments:
    production:
    queue-concurrency: 10
    queue-memory: 1024
    queue-timeout: 300
  • Answer:
    使用 vapor 時, 可定義可同時執行的 queue job 的數量, 等同於啟動 10 個 workers
    一個 job 最多使用 1024 mb
    一個 job 最多執行 300 秒, 之後 worker container 會關閉
如果要使用 Redis 在不同的用途, 假如是同一個 Redis instance, 那如果有一個應用的 traffic 特別高, 會影響其他的應用嗎?

會, 因為 single thread nature

如果要使用 Redis 在不同的用途, 例如不同應用的 cache 以及 queue, 建議 practice 是?

分別使用不同的資料庫

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Queue::connection('database')->size('invoices')
  • Answer:
    取得 queue 中的 job 數量
Laravel 中, 如果使用 sync driver, 是跑在 memory 還是 database?

memory

Laravel 中, 如果使用 SQS 當作 queue driver, 可以使用 retry_after configuration 嗎?

不行, 需到 AWS console 設定 “Default Visibility Timeout”

Laravel 中, 如果使用 Sever Redis 當作 queue driver, 可以確保 job 只被 dispatch 一次嗎?

可以, 因為 single thread

Laravel 中, 如果使用 SQS 當作 queue driver, 可以確保 job 只被 dispatch 一次嗎?

不可以

Laravel 中, 如果使用 SQS 當作 queue driver, job 的最長生命週期多長?

12 小時

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // controller
    $job = new SendOrderToSupplier($order);
    try {
    retry(2, function () use ($job) {
    dispatch($job)->onQueue('high');
    }, 5000);
    } catch (Throwable $e) {
    DB::table('dead_letter_queue')->insert([
    'message' => serialize(clone $job),
    'failed_at' => now()
    ]);
    }

    // failed handling
    DB::table('dead_letter_queue')->take(50)->each(function($record) {
    try {
    dispatch(
    unserialize($record->message)
    );
    } catch (Throwable $e) {
    return;
    }

    DB::table('dead_letter_queue')->where('id', $record->id)->delete();
    })
  • Answer:
    失敗時, 將 failed job serialize 並放到 table 中, 使用 cron 固定從該 table 中取出, 再次執行, 若執行成功則刪除該筆紀錄
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    retry(2, function() {
    GenerateReport::dispatch();
    }, 5000);
  • Answer:
    避免因為 network 的關係 dispatch 到 queue store 失敗, 所以使用 retry dispatch 兩次, 這邊要記得設定 unique
SQS 每個 message 的 size 限制是多少?

256 kb

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    $queuedMessage = [
    'job' => serialize(clone $job), 'payload' => [
    // attempts, timeout, backoff, retryUntil, ...
    ]
    ];
    return json_encode($queuedMessage);
  • Answer:
    Laravel 會先把 queued job serialized 並放到 array, 在發送到 queue store 之前, json encoded, 所以務必確保 job instance 以及所有的 dependency 都是 serializable
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    0 * * * * forge php /home/forge/laravel.com/artisan horizon:terminate
  • Answer:
    每小時關閉 horizon, 防止 memory leak
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    'environments' => [
    'production' => [
    'supervisor-1' => [
    'queue' => ['deployments', 'notifications'],
    'balance' => 'auto',
    'min_processes' => 1,
    'max_processes' => 10,
    'balanceMaxShift' => 3,
    'balanceCooldown' => 1
    ],
    ],
    ]
  • Answer:
    使用 Horizon 的 auto balance strategy, 自動依照 queue busy 的程度來啟動 worker, 最小 1 個 worker, 最多 10 個
    每 1 秒 (balanceCooldown) 可以增加或減少 3 個 (balanceMaxShift) worker
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    'environments' => [
    'production' => [
    'supervisor-1' => [
    'connection' => 'redis',
    'queue' => ['deployments', 'notifications'],
    'balance' => 'simple',
    'processes' => 10,
    'tries' => 1,
    ],
    ],
    ]
  • Answer:
    使用 simple balance strategy, Horizon 會依照 queue priority 來分配 worker
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    'environments' => [
    'production' => [
    'deployments' => [
    // ...
    'timeout' => 300,
    'processes' => 7,
    'queue' => 'deployments'
    // ...
    ],
    'notifications' => [
    // ...
    'timeout' => 60,
    'processes' => 3,
    'queue' => 'notifications'
    // ...
    ],
    ],
    ]
  • Answer:
    使用 Horizon 在 production 下, 啟動 7 個 worker for queue deployments, 3 個 worker for notifications
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    'environments' => [
    'production' => [
    'supervisor-1' => [
    'connection' => 'redis',
    'queue' => ['deployments', 'notifications'],
    'balance' => 'off',
    'processes' => 10,
    'tries' => 1,
    ],
    ],
    ]
  • Answer:
    Horizon 會執行如下指令, 並確保有 10 個 worker, 如果其中有的掛了, 會自動啟動, 若有其他要設定的, 例如 backoff, 也可以加到 config 檔中
    artisan horizon:work redis --name=default 
    --supervisor=host-tdjk:supervisor-1
    --backoff=0
    --memory=128 --queue=deployments,notifications --sleep=3
    --timeout=60
    --tries=1
以下的 Laravel example code 的意思是?
  • Example:
    // command
    php artisan horizon

    // configuration
    [program:horizon] command=php artisan horizon
    process_name=%(program_name)s
    autostart=true
    autorestart=true
    stopasgroup=true
    user=forge
    stdout_logfile=/home/forge/.forge/notifications-workers.log
    stopwaitsecs=3600

    // 位置
    /etc/supervisor/conf.d/horizon.conf
  • Answer:
    將 configuration 置於 .../conf.d/horizon.conf, 由 horizon 來統一控管所有的 worker, 不需要指定 numprocess, 這些參數將設定在 Horizon 的 config file, 由 Horizon 統一控管
以下的 Laravel example code 的意思是?
  • Example:
    */5 * * * * forge php /home/forge/laravel.com/artisan queue:work --stop-when-empty --max-time=240
  • Answer:
    每 5 分鐘起動一個 worker, 結束工作後即自動退出, 若 server 的記憶體有限, 為避免 worker 越起越多 (如果 queue 非常 busy 的話), 可定義最長多少時間要退出
以下的 Laravel example code 的意思是?
  • Example:
    */5 * * * * forge php /home/forge/laravel.com/artisan queue:work --stop-when-empty
    */5 * * * * forge php /home/forge/laravel.com/artisan queue:work --stop-when-empty
    */5 * * * * forge php /home/forge/laravel.com/artisan queue:work --stop-when-empty
  • Answer:
    每 5 分鐘啟動三個 worker, 工作結束後自動關閉
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // on schedule
    $schedule->call(function () {
    if (Queue::size('orders') < 30) {
    return $this->scaleDown();
    }
    Cache::increment('timer');
    if (Cache::get('timer') == 4) {
    return $this->scaleUp();
    }
    })->everyMinute();

    public function scaleDown()
    {
    Cache::forget('timer');
    Process::fromShellCommandline(
    'sudo supervisorctl stop extra-workers:*'
    )->run();
    }

    public function scaleUp()
    {
    Cache::forget('timer');
    Process::fromShellCommandline(
    'sudo supervisorctl start extra-workers:*'
    )->run();
    }
  • Answer:
    如果連續四次, 即四分鐘檢測, 如果 queue orders 中的 job 數量都保持在 30 個以上的話, 即啟動 extra worker, 如果 job 數量低於 30, 則關閉 extra workers
以下的 Laravel example code 的意思是?
  • Example:
    // supervisor
    [program:extra-workers]
    command=php artisan queue:work notifications -- queue=orders,notifications
    process_name=%(program_name)s_%(process_num)02d autostart=false
    autorestart=true
    stopasgroup=true
    user=forge
    numprocs=3 stdout_logfile=/home/forge/.forge/extra-workers.log stopwaitsecs=3600

    // suvisor command
    supervisorctl reread
    supervisorctl update

    // cron
    0 7 * * * root supervisorctl start extra-workers:*
    0 11 * * * root supervisorctl stop extra-workers:*
  • Answer:
    rush hour scaling, 少了 autostart, 所以當 restart supervisor 時, 不會自動啟動這個 group
    reread 並 update, 將這個 group 更新到 supervisor memory
    使用 cron 來管理 start and stop
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --max-jobs=1000 --max-time=3600
  • Answer:
    定義 worker 最多執行 1000 個 job, 以及最長的執行一小時, 達到任一時, 會自動重啟, 以避免 memory leak
    worker 會在每次完成一個 job 後, 檢查是否達到條件來決定退出或繼續 pick up job
以下的 cron example 的意思是?
  • Example:
    0 * * * * forge php /home/forge/laravel.com/artisan queue:restart
  • Answer:
    每小時 restart worker 來避免 memory leak
以下的 supervisor 的意思是?
  • Example:
    supervisorctl restart notification-workers:* 
    supervisorctl restart reports-workers:*
  • Answer:
    當 code 更新時, worker 的 code 並未更新, 所以必須 restart
當使用 supervisor stop 時, 會中斷執行到一半的 task 嗎?

不會, 會送出 stop signal, 讓 worker 先完成手邊的工作, 並停止 pick up new task, 然後退出

以下的 supervisor example 的意思是?
  • Example:
    [program:notification-workers]
    command=php artisan queue:work notifications --tries=3 --memory=256
    process_name=%(program_name)s_%(process_num)02d autostart=true
    autorestart=true
    stopasgroup=true
    user=forge
    numprocs=4 stdout_logfile=/home/forge/.forge/notifications-workers.log stopwaitsecs=3600
  • Answer:
    stopasgroup=true, 如果這個選項開啟, 當要求 supervisor stop this group 時, signal 會被發送到 every process in this group
Laravel 中, 如果一個 worker 吃光了 PHP 可用的所有 memory, 那會發生什麼事?

該 worker 會 exit with a fetal error

以下的 Laravel Testing example code 的意思是?
  • Example:
    <?php
    php artisan queue:work --memory=256
  • Answer:
    指定每個 worker 可以使用的最大 memory size, worker-level 使用的 memory 不可超過定義在 php.ini 中, 分配給 PHP 的 memory limit
以下的 Laravel Queue example code 的意思是?
  • Example:
    <?php
    SomeClass::$property[] = $item;
    SomeClass::$property = [];
  • Answer:
    當使用 singleton 方法 cache value 時, 在 job 結束時別忘了 clean resource
Laravel 中, 如果 server 是一台 single CPU server, and jobs involve a lot of waiting, 那建議啟動幾個 worker?

可以啟用多個, 因為當一個 worker waiting 時, 其他 worker 可以使用 CPU

Laravel 中, 如果 server 是一台 single CPU server, 而 job 是 CPU intensive job, 那建議啟動幾個 worker?

一個, 因為該 worker 將會使用掉大部分的 CPU

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    while (true) {
    $job = $this->getNextJob();
    if ($job) {
    $this->process($job);
    } else {
    sleep(3);
    }
    }
  • Answer:
    worker 實際的運作模式
以下的 Laravel Testing example code 的意思是?
  • Example:
    <?php
    // Event listener
    class EventServiceProvider extends ServiceProvider
    {

    protected $listen = [
    NewOrderSubmitted::class => [
    UpdateCustomerMetrics::class, SendInvoice::class, SendGiftCoupon::class,
    ],
    ];
    }

    // Controller
    class OrderController
    {

    public function store()
    {
    // ...
    event(new NewOrderSubmitted($order));

    return redirect('/thanks');
    }
    }

    // Job
    class SendGiftCoupon implements ShouldQueue
    {

    public function handle(NewOrderSubmitted $event)
    {
    $customer = $event->customer;
    $coupon = Coupon::createForCustomer($customer);
    Mail::to($customer)->send(new CouponGift($coupon)
    );
    }

    public function shouldQueue(NewOrderSubmitted $event)
    {
    return $event->customer->eligibleForRewards();
    }
    }
  • Answer:
    使用 shouldQueue 來判斷 customer 是否 eligibleForRewards, 如果 return false, 則該 job 不會被放到 queue 中
    如果不使用 shouldQueue 而在 handle() 中寫 if 判斷, 一是無法掌握判斷的時機點, 因為 job 何時會被 pick up 是無法掌握的, 二是萬一是 false 的話, 會浪費資源執行一個沒有動作的 job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    nice -n 10 php artisan queue:work
    php artisan queue:work --rest=0.5 --sleep=5
  • Answer:
    給 worker 指派較低的 priority, 這樣 CPU 會讓 request 優先的被處理, 像是 php-fpm, nginx
    可指定 0 到 19, 越高則 priority 越低
    rest 0.5 秒, 可讓 worker 在處理完 job 後, 停頓 0.5s, 讓 OS 有機會分配更多的 CPU 給 high priority 的 process
Laravel queue 中, 使用 queue 的要點是什麼?

job payload 越小越好

Laravel queue 中, 當使用 Redis 為 queue driver 時, 務必要讓 job 越可能得越快被執行越好, 為什麼?

因為未處理的 job 會堆積在 redis 中

以下的 Laravel queue example code 的意思是?
  • Example:
    <?php
    public function handle()
    {
    app('queue.worker')->shouldQuit = 1;
    }
  • Answer:
    如果有某些 job 會耗費大量的 memory, 那麼極有可能在該 worker 完成該 job 後, 會累積大量 PHP garbage collector 無法偵測到的 reference 在 server memory 中, 可在 handle method 的結尾, 指定該 worker restart
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    php artisan queue:work --max-jobs=1000 --max-time=3600
  • Answer:
    隨著 worker 處理 job, 會慢慢地堆積一些 reference, 這些 reference 不會被 PHP garbage collector 偵測並回收, 會累積在 server memory, 直到累積到某個點, 造成 server crash, 因此可定義讓 worker 執行到達某個量的 job 或時間便自動釋放重啟
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // In controller
    public function store()
    {
    Messenger::dispatch($customer);
    }

    // In Messenger Job
    public $tries = 0;

    public function handle()
    {
    if (!$this->customer->is_active) {
    return $this->fail(
    new \Exception('Customer account is not active!')
    );
    }

    $messages = Messages::where('customer', $this->customer->id)->where('status', 'pending')
    ->orderBy('timestamp')->limit(10)->get();

    if (!$messages->count()) {
    return $this->release(5);
    }

    foreach ($messages as $message) {
    $this->chained[] = $this->serializeJob(new ProcessMessage($message));
    }

    $this->chained[] = $this->serializeJob(new self($this->customer));

    Bus::chain($this->chained)->dispatch();
    }

    // In ProcessMessage Job
    public $tries = 0;

    public function handle()
    {
    if ($this->attemps() > 5) {
    Log::error(...);

    return;
    }

    Intercom::send($this->message);

    $this->message->update([
    'status' => 'sent'
    ]);
    }
  • Answer:
    <?php
    // In controller
    // 當建立新的 customer 時, 啟動該 job
    public function store()
    {
    Messenger::dispatch($customer);
    }

    // In Messenger Job
    // $treis 設為 0, 因為這個 job 將會無限制次數的一直跑在背景, 監聽著 database
    public $tries = 0;

    public function handle()
    {
    // 如果該 customer 帳號為啟動, 則 return 該 job, 啟動時需 dispatch messenger job
    if (!$this->customer->is_active) {
    return $this->fail(
    new \Exception('Customer account is not active!')
    );
    }

    // Laravel 會將 chain's jobs payloads 存在第一個 the payload of the first job in the chain
    // 所以使用 limit() 以避免 payload size 過大
    $messages = Messages::where('customer', $this->customer->id)->where('status', 'pending')
    ->orderBy('timestamp')->limit(10)->get();

    // 如果目前資料庫中沒有未處理的 message, 則 5 秒後再看一次
    if (!$messages->count()) {
    return $this->release(5);
    }

    // 取得將會用到的 job
    foreach ($messages as $message) {
    $this->chained[] = $this->serializeJob(new ProcessMessage($message));
    }

    // 最後, 將 messenger 這個 job 放在最後, 這樣當所有的 message 都處理完成
    // 會再次執行 messenger job, 再檢查一次資料庫
    $this->chained[] = $this->serializeJob(new self($this->customer));

    // dispatch chained jobs
    Bus::chain($this->chained)->dispatch();
    }

    // In ProcessMessage Job
    // $tries 需設為 0, 這樣才不會因為失敗被 mark as failed
    // 才不會讓後面的 job 都無法被執行
    public $tries = 0;

    public function handle()
    {
    // 因為 $tries 設為 0, 所以這邊要手動的處理 failure
    // 如果嘗試超過 5 次, 就 log 並 return, 執行下一個 message job
    if ($this->attemps() > 5) {
    Log::error(...);

    return;
    }

    Intercom::send($this->message);

    $this->message->update([
    'status' => 'sent'
    ]);
    }
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class RefundAttendee implements ShouldQueue, ShouldBeUnique
    {

    public $uniqueFor = 10; // ...

    public function __construct($attendee)
    {
    $this->attendee = $attendee;
    }

    public function handle()
    {
    if (!$this->attendee->invoice->wasRefunded()) {
    $this->attendee->invoice->refund();
    }
    Mail::to($this->attendee)->send(...);
    }

    public function uniqueId()
    {
    return $this->attendee->invoice->id;
    }

    }
  • Answer:
    Laravel 8 的新功能, 如果擔心一個 job 會被 dispatch 多次, 那可以 implement ShouldBeUnique interface, 當 queue 內已有該 instance, 後面 dispatch 的都會被忽略
    如果需要 dispatch 多個相同的 job class, 但根據帶入的 parameter 而不同的話, 可使用 uniqueId() 來作為 unique key
    Laravel 在底層會嘗試取得該 job 該 unique key 的 lock, 只要該 lock 存在, 後面 dispatch 的 job 都會被忽略
    通常 lock 會在該 job fails 或 finish 之後 release, 若要自定義釋放時間, 可使用 $uniqueFor property
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class ReportGenerationPlan
    {

    public static function start($report)
    {
    Bus::chain([
    new ExtractData($report),
    function () use ($report) {
    static::step2($report);
    }
    ])->catch(function () use ($report) {
    static::failed($report);
    })->dispatch();
    }

    private static function step2($report)
    {
    $jobs = $report->chunks->mapInto(TransformChunk::class);
    Bus::batch($jobs)
    ->then(function () use ($report) {
    static::step3($report);
    })->dispatch();
    }

    private static function step3($report)
    {
    Bus::chain([
    new StoreData($report),
    new GenerateSummary($report)
    ])->dispatch();
    }

    private static function failed($report)
    {
    // Run any cleaning work ...
    $report->update([
    'status' => 'failed'
    ]);
    }
    }

    // in controller
    $report = Report::create([...]);
    ReportGenerationPlan:::start($report);
  • Answer:
    START
    -> RUN ExtractData THEN
    -> DISPATCH multiple TransformChunk jobs THEN
    -> RUN all TransformChunk jobs from the batch THEN
    -> DISPATCH a chain THEN
    -> RUN StoreData THEN
    -> RUN GenerateSummary END
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function store()
    {
    $database = Database::create([
    'status' => 'provisioning'
    ]);

    Bus::chain(
    new EnsureANetworkExists(),
    new EnsureNetworkHasInternetAccess(),
    new CreateDatabase($database)
    )->catch(function () {
    $network = Network::first();
    if (!$network->usedByOtherResources()) {
    $network->delete();

    return;
    }
    if (!$network->activeDatabases()->count()) {
    $network->removeInternetAccess();
    }
    })->dispatch();
    }
  • Answer:
    多個 job 會按照 chain() 內的順序執行, 當第一個成功了才會執行下一個, 如果第一個一直 retry 直到 attempt 好耗盡而失敗, 那便會 invoke cache() 內的 closure
    先確定 network 存在, 否則則建立, 在確認 network 有 internet access, 否則則建立, 最後確認 database 是否有被 attached, 若無, 則 attach 到 network
    若失敗, 先確認 network 是否有被其他 resource 使用, 若無, 則直接刪除該 network, 若有被其他資源使用, 但無連接 active database, 則移除 internet access
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // in controller
    public function store()
    {
    $temporaryFilePath = request()->file('video')
    ->store('uploaded-videos');
    CompressAndStoreVideo::dispatch($temporaryFilePath);
    }

    // in job
    public function handle()
    {
    if (!app('files')->exists($this->temporaryFilePath)) {
    report(new \Exception('Temporary file not found!'));

    return $this->delete();
    }

    $newFile = VideoProcessesor::compress($this->temporaryFilePath);

    app('files')->delete($this->temporaryFilePath);
    }


    public function failed(Exception $e)
    {
    // 如果之後不重試, 也可立即刪掉
    DeleteFile::dispatch($this->temporaryFilePath)->delay(now()->addHours(24));
    }
  • Answer:
    在 controller 中, 將 file 先存到 default disk, 並取得 path, dispatch path 到 job
    job handle method 中, 檢查該 path file 是否依然存在, 如果不存在則 report 並刪掉該 job
    如果存在, 則 compress 該 file, 完成後刪掉該 file
    如果失敗了, 延遲 24 小時後刪除, 在這期間可以手動嘗試
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class ModelIdentifier
    {

    public function __construct(
    $class,
    $id,
    $relations,
    $connection
    ) {
    $this->id = $id;
    $this->class = $class;
    $this->relations = $relations;
    $this->connection = $connection;
    }
    }
  • Answer:
    Laravel 為了減少 job payload size, 當 queued job 如果其中有 model, 只會記住該 model identifier, 待真正執行該 job 時才會依照 identifier 取出 model
以下的 Laravel job middleware example code 的意思是?
  • Example:
    <?php
    public function handle($job, $next)
    {
    $this->lastFailureTimestamp = Cache::get('circuit:open');
    // Check if the circuit is open and release the job.
    if (!$this->shouldRun()) {
    return $job->release(
    $this->lastFailureTimestamp + $this->secondsToCloseCircuit + rand(1, 120)
    );
    }
    // If the circuit is closed or half-open, we will try // running the job and catch exceptions.
    try {
    $next($job);
    // If the job passes, we'll close the circuit if it's // open and reset the failures counter. $this->closeCircuit();
    } catch (RequestException $e) {
    if ($e->response->serverError()) {
    $this->handleFailure($job);
    }
    } catch (ConnectionException $e) {
    $this->handleFailure($job);
    }
    }
  • Answer:
    將 circuit pattern 的概念寫成一個 job middleware, 當 error 達到一定程度時打開 circuit breaker, 打開一段時間後啟動 half open 狀態, 放一個 job 過去, 要是該 job 成功, 則關掉 circuit breaker, 要是失敗則重新計算 circuit breaker 時間, 同時根據 exception 的不同使用不同的方式來 handle failure, request exception 或 connection exception
以下的 Laravel job example code 的意思是?
  • Example:
    <?php
    class RateLimitingJobMiddleware
    {
    public $key;

    public function __construct($key)
    {
    $this->key = $key;
    }

    public function handle($job, $next)
    {
    Redis::throttle(
    $this->key
    )
    // ...
    }
    }

    // job class
    public function middleware()
    {
    return [
    new BulkHeadingJobMiddleware('slow_service'),
    new CircuitBreakerJobMiddleware('unstable_service')
    ];
    }
  • Answer:
    可 pass parameter 到 job middleware, 以 limit 特定的 job
以下位於 job 中的 Laravel example code 的意思是?
  • Example:
    <?php
    public function middleware()
    {
    return [
    new RateLimitingJobMiddleware($this->customer->id)
    ];
    }
  • Answer:
    在 job class 中的 middleware method 中可定義該 job 會經過哪一些 middleware, 並且可 pass parameter 到 middleware
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class RateLimitingJobMiddleware
    {
    public function handle($job, $next)
    {
    Redis::throttle('job-limiter')
    ->allow(5)
    ->every(60)
    ->then(function () use ($job, $next) {
    $next($job);
    }, function () use ($job) {
    $job->release(60);
    });
    }
    }
  • Answer:
    使用 job middleware 來限制最多可以 concurrent 執行的 job 數量, Redis 的 throttle method 會鎖住 job-limiter 字段, 使該字段只能最多 5 個 current job 被 worker pick up, $next($job) 代表執行該 job 的 handle(), 也可透過 property 來取代字段, 這樣就可以限制同類型的 job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function handle()
    {
    if ($lastFailureTimestamp = Cache::get('circuit:open')) {
    if (time() - $lastFailureTimestamp < 8 * 60) {
    return $this->release(
    $lastFailureTimestamp + 600 + rand(1, 120)
    );
    } else {
    $halfOpen = true;
    }
    }
    $response = Http::acceptJson()->timeout(10)
    ->get('...');
    if ($response->serverError()) {
    if (isset($halfOpen)) {
    Cache::put('circuit:open', time(), 600);

    return $this->release(600);
    }
    if (!$failures = Cache::get('failures')) {
    Cache::put('failures', 1, 60);
    } else {
    Cache::increment('failures');
    }
    if (Cache::get('failures') > 10) {
    Cache::put('circuit:open', time(), 600);
    }

    return $this->release(600);
    }
    Cache::forget('failures');
    Cache::forget('circuit:open');
    // Use the response to run the business logic.
    }
  • Answer:
    circuit pattern
    如果第三方服務失敗超過 10 次, 則開啟斷路器, 10 分鐘後再嘗試
    若斷路器已開啟超過 8 分鐘, 先放一個 job 過去試試看, 若成功則關閉斷路器, 失敗則更新斷路器開啟時間, 重新計算 10 分鐘
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function handle()
    {
    Redis::throttle($this->report->customer_id)->allow(5)
    ->every(60 * 60)
    ->then(function () {
    $results = ReportGenerator::generate($this->report);
    $this->report->update([
    'status' => 'done', 'results' => $results
    ]);
    }, function () {
    return $this->release(60 * 10);
    });
    }
  • Answer:
    每 1 個小時只允許每個 customer_id 可以最多產生 5 份 report, 如果 5 個 slots 都滿了, 則 acquire lock 失敗的 job 會被在 10 分鐘後 release, 60 分鐘從第一個 slot 被佔據之後開始計算時間
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Redis::funnel($this->report->customer_id)->releaseAfter(5 * 60)
    ->block(...)
    ->limit(...)
    ->then(...)
  • Answer:
    funnel limiter 預設會在 60 秒後清掉 slot, 若該 job 執行時間超過 60 秒, limiter 會清掉該 slot, 但原本的 job 還在執行中, 這樣會變成又有一個新的 job 可被執行, 這樣就超出了我們預期 limiter 的最大限制, 可使用 releaseAfter 定義 timeout 時間
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Redis::funnel($this->report->customer_id)->block(5)
    ->limit(...)->then(...);
  • Answer:
    funnel 預設會嘗試 acquire lock 3 秒, 可使用 block() 指定嘗試時間
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function handle()
    {
    public $tries = 10;
    public $maxExceptions = 2;
    Redis::funnel($this->report->customer_id)->limit(5)
    ->then(function () {
    $results = ReportGenerator::generate($this->report);
    $this->report->update([
    'status' => 'done', 'results' => $results
    ]);
    }, function () {
    return $this->release(10);
    });
    }
  • Answer:
    限制同一個 customer_id 最多只能同時有 5 筆 job 被執行, 若成功取得 lock 則會執行 closure, 若無法取得 lock (預設 3 秒), 則執行 second closure, 10 秒後可重新被 worker pick up
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function handle()
    {
    if ($timestamp = Cache::get('api-limit')) {
    return $this->release(
    $timestamp - time());
    }
    $response = Http::acceptJson()->timeout(10)
    ->withToken('...')->get('https://...');
    if ($response->failed() && $response->status() == 429) {
    $secondsRemaining = $response->header('Retry-After');
    Cache::put('api-limit',
    now()->addSeconds($secondsRemaining)->timestamp,
    $secondsRemaining
    );

    return $this->release($secondsRemaining
    );
    }
    // ...
    }
  • Answer:
    <?php
    public function handle()
    {
    // 所有會呼叫此外部服務的 job 會在第一個判斷重新被 release, 如果該外部服務
    // 目前處於 429 的狀態的話
    // 如果 cache 中存在 api-limit 這個 key, 代表還處於 429 狀態
    // api-limit 的 value 為實際上允許再被存取的時間, 所以這個時間扣掉當前
    // 的時間就會是距離可再被存取還需經過的秒數
    if ($timestamp = Cache::get('api-limit')) {
    return $this->release(
    $timestamp - time());
    }
    $response = Http::acceptJson()->timeout(10)
    ->withToken('...')->get('https://...');

    // 如果 response failed, 且 status 為 429, 代表 too many attempt
    // 得出可再被存取的時間點, 並將這個時間點存去 cache
    if ($response->failed() && $response->status() == 429) {
    $secondsRemaining = $response->header('Retry-After');
    Cache::put('api-limit',
    now()->addSeconds($secondsRemaining)->timestamp,
    $secondsRemaining
    );

    return $this->release($secondsRemaining
    );
    }
    // ...
    }
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // Controller
    foreach (User::all() as $user) {
    $exchangeRate = Currency::exchangeRateFor($user->currency);
    GenerateInvoice::dispatch($user, $month, $exchangeRate);
    }

    // Currency Class
    public static $rates = [];

    public static function exchangeRateFor($currency)
    {
    if (!isset(static::$rates[$currency])) {
    static::$rates[$currency] = ExchangeRateApi::get('USD', $currency);
    }

    return static::$rates[$currency];
    }

    // Job handle method
    public function handle()
    {
    $amountInLocalCurrency = $amount * $this->exchangeRate;
    $taxInLocalCurrency = ($amount * 14 / 100) * $this->exchangeRate;

    $total = $amountInLocalCurrency + $taxInLocalCurrency;
    Mail::to($this->user)->send("Your usage last month was {$total}{$this->user->currency}");
    }
  • Answer:
    發送月帳單給客戶
    使用 memorization tech 取得 exchange rate, 避免重複呼叫 API
    取得 exchange rate 之後在 pass 到 job, 因為 job 運行在 memory 中的 single instance, 若是在 job 中使用 memorization tech, 那將會一直使用該 instance 中的初始得到的 exchange rate
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // controller
    foreach (Site::all() as $site) {
    if ($site->current_visitors >= $site->threshold) {
    SendSpikeDetectionNotification::dispatch(
    $site, $site->current_visitors);
    }
    }

    // job class
    private $site;
    private $visitors;

    public function __construct(Conference $site, $visitors)
    {
    $this->site = $site;
    $this->visitors = $visitors;
    }

    public function handle()
    {
    SMS::send(
    $this->site->owner,
    "Spike detected on {$this->site->name}! Current visitors: {$this->visitors}"
    );
    }
  • Answer:
    當偵測到某個 site 的 visitor 已達 threshold, 發送 notification, 且 pass 當時的 value
    因為 job execute 可能會有 delay, 若是在 job 中使用 model, serialization 會讓 job 取得最新的 model 狀態, 因此可能會出現 notification 中的資訊並非當時觸發 threshold 的數字, 而是 delay 之後可能降下來的數字
以下兩個 job class example, 差異處在於?
  • Job Example 1:
    <?php
    private $site;

    public function __construct(Conference $site)
    {
    $this->site = $site;
    }

    public function handle()
    {
    SMS::send(
    $this->site->owner,
    "Spike detected on {$this->site->name}! Current visitors: {$this->site->current_visitors}"
    );
    }
  • Job Example 2:
    <?php
    private $site;
    private $visitors;

    public function __construct(Conference $site, $visitors)
    {
    $this->site = $site;
    $this->visitors = $visitors;
    }

    public function handle()
    {
    SMS::send(
    $this->site->owner,
    "Spike detected on {$this->site->name}! Current visitors: {$this->visitors}"
    );
    }
  • Answer:
    當 job 被 queue 時, serialization 會記下 model identifier 而非 model instance, 因此
    example 1 中, 因為是在 job 中 fetch model attribute, 會取得 model attribute 最新的狀態, 值可能會跟 dispatch 時不同
    example 2 中, 因為是在 job 中取得 job class property, 因此值會跟 dispatch 時一樣
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function store()
    {
    DB::transaction(function () use ($conference) {
    $attendee = Attendee::create([
    'conference_id' => $conference->id, 'name' => request('name'),
    'reference' => $reference = Str::uuid() // ...
    ]);
    $invoice = BillingProvider::invoice([
    'customer_reference' => $reference, 'card_token' => request('card_token'), // ...
    ]);
    });
    SendTicketInformation::dispatch($attendee);
    }
  • Answer:
    在 transaction 內, 建立 attendee 並 bill, 如果失敗則回滾並 throw exception, 如果都成功, 則 dispatch SendTicketInformation job, 若要在 transaction 內 dispatch 也可, 只要設定 delay 就可, 確保 transaction commit 完成之後 worker 才去 pick up job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Bus::batch($jobs)->allowFailures()
    ->then(...)
    ->catch(...)->finally(function ($batch) {
    $conference = Conference::firstWhere('refunds_batch', '=', $batch->id);
    Mail::to($conference->organizer)->send(
    'Refunding attendees completed!'
    );
    })->dispatch();
  • Answer:
    當所有 job 都執行完畢, 儘管有些 fail, 有些 succeed, 那便會觸發 finally(), 通知 organizer
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Bus::batch($jobs)
    ->allowFailures()
    ->then(...)
    ->catch(function ($batch, $e) {
    $conference = Conference::firstWhere('refunds_batch', '=', $batch->id);
    Mail::to($conference->organizer)->send(
    'We failed to refund some of the attendees!'
    );
    })
    ->dispatch();
  • Answer:
    當首次有 job fails 時, 會執行 cache() 內的 closure, 通知 organizer, 如果有 job fails 就不會觸發 then()
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    Bus::batch($jobs)->allowFailures()->then(function ($batch) {
    $conference = Conference::firstWhere('refunds_batch', '=', $batch->id);
    Mail::to($conference->organizer)->send(
    'All attendees were refunded successfully!'
    );
    })->dispatch();
  • Answer:
    當 batch 內所有的 job 都 successfully executed, 會執行 then() 內的 closure, 發 mail 通知 organizer
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    $batch = Bus::findBatch($conference->refunds_batch);

    return [
    'progress' => $batch->progress().'%',
    'remaining_refunds' => $batch->pendingJobs,
    'has_failures' => $batch->hasFailures(),
    'is_cancelled' => $batch->canceled()
    ];
  • Answer:
    取得 batch 目前的整體進度 (1-100, int)
    取得目前 pending jobs 的數量 (int)
    取得是否有 failed job (boolean)
    取得該 batch 是否 cancelled (boolean)
以下的 Laravel example command 的意思是?
  • Example:
    php artisan queue:retry-batch {batch_id}
  • Answer:
    使用 CLI retry 指定 batch 中的 failed jobs
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    $batch = Bus::batch($jobs)->allowFailures()->dispatch();
  • Answer:
    當使用 batch dispatch jobs, 預設如果其中一個 job fails, 那該 batch 就會終止, 使用 allowFailures(), 即使有 job fails 該 batch 也會 dispatch 其他 job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    $batch = Bus::findBatch($conference->refunds_batch);
    $batch->cancel();

    // in job class
    use Batchable;

    public function handle()
    {
    if ($this->batch()->canceled()) {
    return;
    }
    // Actual refunding code here ...
    $this->attendee->update([
    'refunded' => true
    ]);
    }
  • Answer:
    從存在資料庫的 batch_id 取得該 batch, 並執行 cancel(), 該 batch 會被 marked as cancelled
    在 job handle() 中, 判斷如果該 job 的 batch 已經被 marked as cancelled, 那立即 return 該 job, 若否則繼續執行
    藉此當我們 mark batch as cancelled 之後, 所以在 queue 中尚未執行的 job 都會被 cancelled
以下的 Laravel example command 的意思是?
  • Example:
    php artisan queue:batches-table
  • Answer:
    Laravel 會將 batch 資料存在 table, 因此需要建立該 table
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    use Illuminate\Bus\Batchable;

    class RefundAttendee implements ShouldQueue
    {
    use Batchable;
    }

  • Answer:
    若要 job 可被 batch dispatch, 需 use Batchable trait
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    $jobs = $this->conference->attendees->map(function ($attendee) {
    return new RefundAttendee($attendee);
    });
    $batch = Bus::batch($jobs)->dispatch();
    $this->conference->update([
    'refunds_batch' => $batch->id
    ]);
  • Answer:
    使用 batch(), 一次性的 queue 複數的 jobs, 相當於用一句 command 將複數的 job push 到 queue, 而不是分成多句 command
    並將 batch id 儲存在資料庫
以下的 Laravel job example 中, 要是 worker 在取得 lock 之後 crash 了, 那會發生什麼事?
  • Example:
    <?php
    public function handle()
    {
    $invoice = $this->attendee->invoice;

    Cache::lock('refund.'.$invoice->id)
    ->get(function() {
    if (! $invoice->wasRefunded()) {
    $invoice->refund();
    }

    Mail::to($this->attendee)->send(...);
    });
    }
  • Answer:
    因為 lock 並沒有設定持有秒數, 所以會永遠持有, 那當該 job reserved tag 被移除後, 其他 worker 會嘗試 pick up 該 job, 但會卡在 lock 處, 直到 $timeout, 最後耗盡 $tries 次數
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class RefundAttendee implements ShouldQueue
    {

    public $tries = 3;
    public $timeout = 60;
    public $backoff = 11;

    private $attendee;

    public function __construct(Attendee $attendee)
    {
    $this->attendee = $attendee;
    }

    public function handle()
    {
    $invoice = $this->attendee->invoice;

    Cache::lock('refund.'.$invoice->id, 10)
    ->get(function() {
    if (! $invoice->wasRefunded()) {
    $invoice->refund();
    }

    Mail::to($this->attendee)->send(...);
    });
    }
    }

    // wasRefunded
    public function wasRefunded()
    {
    $response = HTTP::timeout(5)->get('../invoice/'.$id)->throw()
    ->json();

    return $response['invoice']['status'] == 'refunded';
    }
  • Answer:
    使用 lock, 這樣當不小心 dispatch 同一個 job 兩次時, 也不用擔心 worker 會執行兩次
    設定 lock 秒數, 如果沒設定, 假如 worker obtain lock 之後 crash, 那該 job 將永遠不會被執行, 直到 $tries 耗盡, 因為 lock 並沒有釋放
    $backoff 設定 11 秒, 以避免 lock 10 秒期間 worker 不停的嘗試 obtain lock
    在真正 refund 之前, 都呼叫 billing provider 以確定該 invoice 是否已經 refunded, 以避免 refund 兩次
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // on controller
    $this->conference->attendees->each(function ($attendee) {
    RefundAttendee::dispatch($attendee);
    });

    // on job class
    class RefundAttendee implements ShouldQueue
    {

    public $tries = 3;
    public $timeout = 60;
    private $attendee;

    public function __construct(Attendee $attendee)
    {
    $this->attendee = $attendee;
    }

    public function handle()
    {
    $this->attendee->invoice->refund();
    Mail::to($this->attendee)->send(...);
    }
    }

    // worker
    php artisan queue:work --queue=cancelations,default
    php artisan queue:work --queue=default,cancelations
  • Answer:
    假如一場 conference 忽然要取消, 我取消 refund 這場 conference 中的每一個 attendee, 可將 refund 作業分拆成多個 small job
    使用兩個 worker, 定義不同的 priority 以避免 starvation 的狀況
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    'connections' => [
    'database' => [
    'driver' => 'database',
    'retry_after' => 60,
    ],
    'database_long_running' => [
    'driver' => 'database',
    'retry_after' => 18060,
    ],
    ],
  • Answer:
    retry_after 會將 job 的 reserved tag 移除, 以避免 worker crash 後該 job 始終保持 reserved 狀態而永遠不被執行
    但一個 connection 只能設定一個 retry_after, 所以如果有 job 運行時間會比較久 的需求時, 很可能需要第二個 connection
    或是盡可能地避免這種狀況, 將需要運行比較久的 job 分拆成多個 job
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    // job
    class CancelConference implements ShouldQueue
    {

    public $timeout = 18000;
    }

    // config/queue.php
    'connections' => [
    'database' => [
    'driver' => 'database',
    'table' => 'jobs',
    'queue' => 'default',
    'retry_after' => 18060,
    ],
    ],
  • Answer:
    假如該 job timeout, 給 worker 60 秒的時間完成清理工作, 不管如何, 18060 秒之後, 該 job 都會被移除 reserved tag
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class ProvisionServer implements ShouldQueue
    {

    private $server;
    private $payload;

    public $tries = 20;
    public $maxExceptions = 3;

    public function __construct(Server $server, $payload)
    {
    $this->server = $server;
    $this->payload = $payload;
    }

    public function handle()
    {
    if (!$this->server->forge_server_id) {
    $response = Http::timeout(5)->post(
    '.../servers', $this->payload)->throw()->json();
    $this->server->update([
    'forge_server_id' => $response['id']
    ]);

    return $this->release(120);
    }
    if ($this->server->stillProvisioning($this->server)) {
    return $this->release(60);
    }
    $this->server->update([
    'is_ready' => true
    ]);
    }

    public function failed(Exception $e)
    {
    Alert::create([
    // ...
    'message' => "Provisioning failed!"
    ]);

    $this->server->delete();
    }

    }
  • Answer:
    如果沒有 forge_server_id, 代表 Forge server 尚未建立, 所以呼叫 Forge API, 建立 server 並取得 server id, 將 job 丟回 queue, 指定 120 秒後再執行一次
    如果 server 處於 provisioning 狀態, 丟回 queue, 60 秒後再執行一次
    建立完成後, 更新 Server model 中的 id_ready 為 true
    該 job 可被執行最多達 20 次, 但如果是 exception, 最多 3 次
    如果失敗, 建立 Alert 並刪除此筆 server record
以下位於 config/queue.php 的 Laravel example code 的意思是?
  • Example:
    <?php
    'connections' => [
    'database' => [
    'driver' => 'database',
    // ...
    'retry_after' => 90,
    ],
    ],
  • Answer:
    當一個 worker pick up 一個 job 時, 會在這個 job mark reserved, 這樣其他 worker 就不會嘗試 pick up 這個 job, 但當一個 worker 在執行這個 job 時 crash 了, 會導致這個 job 的 reserved mark 不會被清除, 導致這個 job 永遠不會被執行。 Laravel 為了預防這一點, 定義了一個 job 最長可以處於 reserved state 多久, 就是 retry_after
Laravel Queue 中, 當一個 worker 執行一個 job 時, 為何其他 worker 不會執行同一個 job?

因為當一個 worker pick up 一個 job 時, 會 mark job as reserved

Laravel Queue 中, 當使用 job 的 retry_until = 120, 代表該 job 在 120 秒後就會立即被執行嗎?

不是哦, 這只代表該 job 在 release 或 dispatch 後的 120 秒內不會被執行, 如果 worker 都很忙, 10 分鐘後才被執行也是有可能的

以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public $tries = 0;

    public function handle()
    {
    $response = Http::timeout(...)->post(...);

    if ($response->failed()) {
    $this->release(
    now()->addMinutes(15 * $this->attempts()));
    }
    }

    public function retryUntil()
    {
    return now()->addDay();
    }

    public function failed(Exception $e)
    {
    Mail::to($this->integration->developer_email
    )->send(...);
    }
  • Answer:
    如果 request 失敗的話, 逐次的遞增再次執行該 job 的時間間隔, 最多嘗試一天(會從第一次 dispatch 時開始計算時間), 在一天內可以無限次數嘗試(依照定義的時間間隔頻率), 若超過一天則視為 failed job
    失敗的話, 採取相對應動作
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    class CheckoutController
    {

    public function store()
    {
    $order = Order::create([
    'status' => Order::PENDING, // ...
    ]);
    MonitorPendingOrder::dispatch($order)->delay(900);
    // also ... delay(now()->addMinutes(15))
    }
    }

    // MonitorPendingOrder class
    public $tries = 4;

    public function handle()
    {
    if ($this->order->status == Order::CONFIRMED || $this->order->status == Order::CANCELED) {
    return;
    }
    if ($this->order->olderThan(59, 'minutes')) {
    $this->order->markAsCanceled();

    return;
    }
    SMS::send(...);
    $this->release(now()->addMinutes(15)
    );
    }
  • Answer:
    常常使用者建立訂單後, 忽然決定不買了, 但也不會手動取消這張訂單
    如上 example 邏輯
    訂單建立後每 15 分鐘提醒使用者有這張訂單存在
    若過了 59 分鐘後, 訂單並未被 confirmed, 也未 cancelled, 這時 job 就取消這張訂單
    public $tries 確保這個 job 能被執行 4 次, 這邊 4 次還是有點太少了, 可以在設大一點
以下的 Laravel example command 的意思是?
  • Example:
    php artisan queue:work --backoff=60,120
  • Answer:
    定義該 worker 在嘗試 job 的時間間隔, 比如說嘗試執行 job A 失敗了, 第一次要間隔 60 秒之後才可嘗試再次執行, 第二次之後都要間隔至少 120 秒
    Laravel 8 之前又叫做 –delay
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    namespace App\Jobs;

    class SendVerificationMessage implements ShouldQueue
    {

    public $tries = 3;
    public $backoff = [60, 120];
    }
  • Answer:
    $tries 定義該 job 最多會被執行 3 次, release() 也算一次, 如果 3 次過了還在 queue 當中, 那就會被放到 failed_job 當中
    $backoff 定義每次嘗試執行 job 的時間間隔, 第一次為 60 秒, 之後都間隔 120 秒, Laravel 8.0 之前叫做 retryAfter
以下的 Laravel example code 的意思是?
  • Example:
    php artisan queue:work --queue=payments,default 
    php artisan queue:work --queue=default,payments
  • Answer:
    讓兩個 worker 分別執行兩個 queue, 以避免一個執行完就處於 idle 狀態
以下的 Laravel example code 的意思是?
  • Example:
    <?php
    public function boot()
    {
    Event::listen(function (QueueBusy $event) {
    Notification::route('mail', 'dev@example.com')
    ->notify(new QueueHasLongWaitTime(
    $event->connection,
    $event->queue,
    $event->size
    ));
    });
    }
  • Answer:
    當使用 queue:monitor 檢查到 queue 的 job size 超過 threshold, 且觸發 QueueBusy event, 可監聽此事件來做通知
以下的 Laravel example command 的意思是?
  • Example:
    php artisan queue:monitor redis:default,redis:deployments --max=100 --threshold=8
  • Answer:
    列出指定 queue 中的 job size, 若超過 threshold 則觸發 QueueBusy event
GKE Migrating to Containers Docker 實戰入門

留言

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×