隊列
隊列
隊列
簡介
{tip} Laravel 現(xiàn)在為你的 Redis 隊列 提供了 Horizon,一個漂亮的儀表盤和配置系統(tǒng)。查看完整的 Horizon documentation 文檔 了解更多信息。
Laravel 隊列為不同的后臺隊列服務提供統(tǒng)一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于關系型數(shù)據(jù)庫的隊列。隊列的目的是將耗時的任務延時處理,比如發(fā)送郵件,從而大幅度縮短 Web 請求和響應的時間。
隊列配置文件存放在 config/queue.php
文件中。每一種隊列驅動的配置都可以在該文件中找到,包括數(shù)據(jù)庫, Beanstalkd, Amazon SQS, Redis,以及同步(本地使用)驅動。其中還包含了一個 null
隊列驅動用于那些放棄隊列的任務。
連接 Vs. 隊列
在開始使用 Laravel 隊列前,弄明白 「連接」 和 「隊列」 的區(qū)別是很重要的。在你的 config/queue.php
配置文件里,有一個 connections
配置選項。這個選項給 Amazon SQS,Beanstalk,或者 Redis 這樣的后端服務定義了一個特有的連接。不管是哪一種,一個給定的連接可能會有多個 「隊列」,而 「隊列」 可以被認為是不同的?;蛘叽罅康年犃腥蝿?。
要注意的是,queue
配置文件中每個連接的配置示例中都包含一個 queue
屬性。這是默認隊列任務被發(fā)給指定連接的時候會被分發(fā)到這個隊列中。換句話說,如果你分發(fā)任務的時候沒有顯式定義隊列,那么它就會被放到連接配置中 queue
屬性所定義的隊列中:
// 這個任務將被分發(fā)到默認隊列... Job::dispatch(); // 這個任務將被發(fā)送到「emails」隊列... Job::dispatch()->onQueue('emails');
有些應用可能不需要把任務發(fā)到不同的隊列,而只發(fā)到一個簡單的隊列中就行了。但是把任務推到不同的隊列仍然是非常有用的,因為 Laravel 隊列處理器允許你定義隊列的優(yōu)先級,所以你能給不同的隊列劃分不同的優(yōu)先級或者區(qū)分不同任務的不同處理方式了。比如說,如果你把任務推到 high
隊列中,你就能讓隊列處理器優(yōu)先處理這些任務了:
php artisan queue:work --queue=high,default
驅動的必要設置
Database
為了使用 database
隊列驅動,你需要一張數(shù)據(jù)表來存儲任務。運行 queue:table
Artisan 命令來創(chuàng)建這張表的遷移文件。當遷移文件創(chuàng)建好后,你就可以使用 migrate
命令來進行遷移:
php artisan queue:table php artisan migrate
Redis
為了使用 redis
隊列驅動,你需要在 config/database.php
配置文件中配置 Redis 的數(shù)據(jù)庫連接。
Redis 集群
如果你的 Redis 隊列驅動使用了 Redis 集群,你的隊列名必須包含一個 key hash tag 。這是為了確保所有的 Redis 鍵對于一個隊列都被放在同一哈希中。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90, ],
阻塞
當使用 Redis 隊列時,你可以用 block_for
配置項來具體說明驅動應該在將任務重新放入 Redis 數(shù)據(jù)庫以及處理器輪詢之前阻塞多久。
基于你的隊列加載來調整這個值比把新任務放入 Redis 數(shù)據(jù)庫輪詢要更有效率的多。例如,你可以將這個值設置為 5
來表明這個驅動應該在等待任務可用時阻塞 5 秒。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 5, ],
其它隊列驅動的依賴擴展包
在使用列表里的隊列服務前,必須安裝以下依賴擴展包:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
創(chuàng)建任務
生成任務類
在你的應用程序中,隊列的任務類都默認放在 app/Jobs
目錄下。如果這個目錄不存在,那當你運行 make:job
Artisan 命令時目錄就會被自動創(chuàng)建。你可以用以下的 Artisan 命令來生成一個新的隊列任務:
php artisan make:job ProcessPodcast
生成的類實現(xiàn)了 Illuminate\Contracts\Queue\ShouldQueue
接口,這意味著這個任務將會被推送到隊列中,而不是同步執(zhí)行。
任務類結構
任務類的結構很簡單,一般來說只會包含一個讓隊列用來調用此任務的 handle
方法。我們來看一個示例的任務類。這個示例里,假設我們管理著一個播客發(fā)布服務,在發(fā)布之前需要處理上傳播客文件:
<?php namespace App\Jobs; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 創(chuàng)建一個新的任務實例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 運行任務。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } }
注意,在這個例子中,我們在任務類的構造器中直接傳遞了一個 Eloquent 模型 。因為我們在任務類里引用了 SerializesModels
這個 trait,使得 Eloquent 模型在處理任務時可以被優(yōu)雅地序列化和反序列化。如果你的隊列任務類在構造器中接收了一個 Eloquent 模型,那么只有可識別出該模型的屬性會被序列化到隊列里。當任務被實際運行時,隊列系統(tǒng)便會自動從數(shù)據(jù)庫中重新取回完整的模型。這整個過程對你的應用程序來說是完全透明的,這樣可以避免在序列化完整的 Eloquent 模式實例時所帶來的一些問題。
在隊列處理任務時,會調用 handle
方法,而這里我們也可以通過 handle
方法的參數(shù)類型提示,讓 Laravel 的 服務容器 自動注入依賴對象。
如果你想完全控制容器如何將依賴對象注入至 handle
方法,可以使用容器的 bindMethod
方法。bindMethod
方法接受一個任務和容器的回調。雖然可以直接在回調中可以調用 handle
方法,但建議應該從 service provider 調用為佳:
use App\Jobs\ProcessPodcast; $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) { return $job->handle($app->make(AudioProcessor::class)); });
{note} 像圖片內(nèi)容這種二進制數(shù)據(jù),在放入隊列任務之前必須使用
base64_encode
方法轉換一下。否則,當這項任務放置到隊列中時,可能無法正確序列化為 JSON。
分發(fā)任務
一旦你寫完了你的任務類你就可以使用它自帶的 dispatch
方法分發(fā)它。傳遞給 dispatch
方法的參數(shù)將會被傳遞給任務的構造函數(shù):
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲一個新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast); } }
延遲分發(fā)
如果你想延遲你的隊列任務的執(zhí)行,你可以在分發(fā)任務的時候使用 delay
方法。例如,讓我們詳細說明一個十分鐘之后才會執(zhí)行的任務:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 存儲一個新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); } }
{note} Amazon SQS 隊列服務最大延遲 15 分鐘的時間。
同步調度
如果您想立即(同步)執(zhí)行隊列任務,可以使用 dispatchNow
方法。 使用此方法時,隊列任務將不會排隊,并立即在當前進程中運行:
<?php namespace App\Http\Controllers; use Illuminate\Http\Request; use App\Jobs\ProcessPodcast; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * Store a new podcast. * * @param Request $request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatchNow($podcast); } }
工作鏈
工作鏈允許你具體定義一個按序列執(zhí)行隊列任務的列表。一旦序列中的任務失敗了,剩余的工作將不會執(zhí)行。要運行一個工作鏈,你可以對可分發(fā)的任務使用 withChain
方法:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch();
{note} 使用
$this->delete()
方法刪除隊列任務不會阻止工作鏈任務執(zhí)行。只有當工作鏈中的任務執(zhí)行失敗時,工作鏈才會停止執(zhí)行。
工作鏈連接 & 隊列
如果你想定義用于工作鏈的默認連接和隊列,你可以使用 allOnConnection
和 allOnQueue
方法。 這些方法指定了所需隊列的連接和隊列 —— 除非隊列任務被明確指定給了不同的連接 / 隊列:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
自定義連接 & 隊列
分發(fā)任務到指定隊列
通過將任務分發(fā)到不同隊列,你可以將你的隊列任務「分類」,甚至指定給不同隊列分配的任務數(shù)量。記住,這不是推送任務到你定義的隊列配置文件的不同的連接里,而是一個單一的連接。要指定隊列,在分發(fā)任務時使用 onQueue
方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲一個新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast)->onQueue('processing'); } }
分發(fā)任務到指定連接
如果你在多隊列連接中工作,你可以指定將任務分發(fā)到哪個連接。要指定連接,在分發(fā)任務時使用 onConnection
方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲一個新播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); } }
當然,你可以鏈式調用 onConnection
和 onQueue
方法來指定連接和隊列。
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
指定最大任務嘗試次數(shù) / 超時值
最大嘗試次數(shù)
在一個任務重指定最大嘗試次數(shù)可以通過 Artisan 命令的 --tries
選項 指定:
php artisan queue:work --tries=3
你可能想通過任務類自身對最大任務嘗試次數(shù)進行一個更顆?;奶幚?。如果最大嘗試次數(shù)是在任務類中定義的,它將優(yōu)先于命令行中的值提供:
<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任務可以嘗試的最大次數(shù)。 * * @var int */ public $tries = 5;}
基于時間的嘗試
作為另外一個選擇來定義任務在失敗前會嘗試多少次,你可以定義一個任務超時時間。這樣的話,在給定的時間范圍內(nèi),任務可以無限次嘗試。要定義一個任務的超時時間,在你的任務類中新增一個 retryUntil
方法:
/** * 定義任務超時時間 * * @return \DateTime */ public function retryUntil(){ return now()->addSeconds(5); }
{tip} 你也可以在你的隊列事件監(jiān)聽器中使用
retryUntil
方法。
超時
{note}
timeout
特性對于 PHP 7.1+ 和pcntl
PHP 擴展進行了優(yōu)化.
同樣的,任務執(zhí)行最大秒數(shù)的數(shù)值可以通過 Artisan 命令行的 --timeout
選項指定。
php artisan queue:work --timeout=30
然而,你可能也想在任務類自身定義一個超時時間。如果在任務類中指定,優(yōu)先級將會高于命令行:
<?php namespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任務可以執(zhí)行的最大秒數(shù) (超時時間)。 * * @var int */ public $timeout = 120;}
頻率限制
{note} 這個特性要求你的應用可以使用 Redis 服務器.
如果你的應用使用了 Redis,你可以通過時間或并發(fā)限制你的隊列任務。當你的隊列任務通過同樣有速率限制的 API 使用時,這個特性將很有幫助。
例如,使用 throttle
方法,你可以限制一個給定類型的任務每 60 秒只執(zhí)行 10 次。如果沒有獲得鎖,一般情況下你應該將任務放回隊列以使其可以被稍后重試。
Redis::throttle('key')->allow(10)->every(60)->then(function () { // 任務邏輯... }, function () { // 無法獲得鎖... return $this->release(10); });
{tip} 在上述的例子里,
key
可以是任何你想要限制頻率的任務類型的唯一識別字符串。例如,使用構件基于任務類名的 key,或它操作的 Eloquent 模型的 ID。{note} 將受限制的作業(yè)釋放回隊列,仍然會增加工作的總數(shù)
attempts
。
或者,你可以指定一個任務可以同時執(zhí)行的最大數(shù)量。在如下情況時這會很有用處:當一個隊列中的任務正在修改資源時,一次只能被一個任務修改。例如,使用 funnel
方法,你可以限制一個給定類型的任務一次只能執(zhí)行一個處理器:
Redis::funnel('key')->limit(1)->then(function () { // 任務邏輯...}, function () { // 無法獲得鎖... return $this->release(10); });
{tip} 當使用頻率限制時,任務執(zhí)行成功的嘗試的次數(shù)可能會難以確定。所以,將頻率限制與 時間限制 組合是很有作用的。
錯誤處理
如果在任務執(zhí)行的時候出現(xiàn)異常,任務會被自動釋放到隊列中以再次嘗試。任務將會一直被釋放直到達到應用允許的最大重試次數(shù)。最大重試的數(shù)值由 queue:work
Artisan 命令的 --tries
選項定義,或者在任務類中定義。更多執(zhí)行隊列處理器的信息可以 在以下找到 。
排隊閉包
你也可以直接調用閉包,而不是將任務類調度到隊列中。這對于需要執(zhí)行的快速、簡單的任務非常有用:
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish(); });
將閉包分派給隊列時,閉包的代碼內(nèi)容將以加密方式簽名,因此無法在傳輸過程中對其進行修改。
運行隊列處理器
Laravel 包含了一個隊列處理器以將推送到隊列中的任務執(zhí)行。你可以使用 queue:work
Artisan 命令運行處理器。 注意一旦 queue:work
命令開始執(zhí)行,它會一直運行直到它被手動停止或終端被關閉。
php artisan queue:work
{tip} 要使
queue:work
進程一直在后臺運行,你應該使用進程管理器比如 Supervisor 來確保隊列處理器不會停止運行
記住,隊列處理器是一個常駐的進程并且在內(nèi)存中保存著已經(jīng)啟動的應用狀態(tài)。因此,它們并不會在啟動后注意到你代碼的更改。所以,在你的重新部署過程中,請記得 重啟你的隊列處理器.
指定連接 & 隊列
你也可以具體說明隊列處理器應該使用哪個隊列連接。 傳遞給 work
的連接名應該與你的 config/queue.php
配置文件中定義的連接之一相符。
php artisan queue:work redis
你甚至可以自定義你的隊列處理器使其只執(zhí)行連接中指定的隊列。例如,如果你的所有郵件都由 redis
連接的 emails
隊列處理,你可以使用如下的命令啟動一個僅執(zhí)行此隊列的處理器:
php artisan queue:work redis --queue=emails
執(zhí)行單一任務
--once
選項用于使隊列處理器只處理隊列中的單一任務。
php artisan queue:work --once
處理所有隊列的任務然后退出
--stop-when-empty
選項可用于處理隊列處理器處理所有作業(yè)然后優(yōu)雅地退出。如果您希望在隊列為空后關閉容器,則在 Docker 容器中運行 Laravel 隊列時,此選項很有用:
php artisan queue:work --stop-when-empty
資源注意事項
后臺駐留的隊列處理器不會在執(zhí)行完每個任務后「重啟」框架。因此,你應該在每個任務完成后釋放任何占用過大的資源。例如,如果你正在用 GD 庫執(zhí)行圖像處理,你應該在完成后使用 imagedestroy
釋放內(nèi)存。
隊列優(yōu)先級
有時你可能想確定隊列執(zhí)行的優(yōu)先順序。例如在 config/queue.php
中你可以將 redis
連接的 queue
隊列的優(yōu)先級從 default
設置為 low
。然而, 偶爾你也想像如下方式將一個任務推送到 high
隊列:
dispatch((new Job)->onQueue('high'));
要運行一個處理器來確認 low
隊列中的任務在全部的 high
隊列任務完成后才繼續(xù)執(zhí)行,你可以傳遞一個逗號分隔的隊列名列表作為 work
命令的參數(shù)。
php artisan queue:work --queue=high,low
隊列處理器 & 部署
因為隊列處理器是常駐進程,他們在重啟前不會應用你代碼的更改。因此,部署使用隊列處理器的應用最簡單的方法是在部署進程中重啟隊列處理器。你可以平滑地重啟所有隊列處理器通過使用 queue:restart
方法:
php artisan queue:restart
這個命令將會引導所有的隊列處理器在完成當前任務后平滑「中止」,這樣不會有丟失的任務。由于在執(zhí)行 queue:restart
后隊列處理器將會中止,所以你應該運行一個進程管理器例如 Supervisor 來自動重啟隊列處理器。
{tip} 隊列使用 緩存 存儲重啟信號,所以你應該確定在使用這個功能之前配置好緩存驅動。
任務過期 & 超時
任務過期
在你的 config/queue.php
配置文件中,每個隊列連接都定義了一個 retry_after
選項。這個選項指定了隊列連接在重試一個任務前應該等它執(zhí)行多久。例如,如果 retry_after
的值設置為 90
,那么任務在執(zhí)行了 90 秒后將會被放回隊列而不是刪除它。一般情況下,你應該將 retry_after
的值設置為你認為你的任務可能會執(zhí)行需要最長時間的值。
{note} 只有在 Amazon SQS 中不存在
retry_after
這個值。 SQS 將會以 AWS 控制臺配置的 默認可見超時值 作為重試任務的依據(jù)。
處理器超時
queue:work
Artisan 命令包含一個 --timeout
選項。 --timeout
選項指定了 Laravel 的隊列主進程在中止一個執(zhí)行任務的子進程之前需要等到多久。有時一個子進程可能會因為各種原因「凍結」,比如一個外部的 HTTP 請求失去響應。 --timeout
選項會移除那些超過指定時間被凍結的進程。
php artisan queue:work --timeout=60
retry_after
配置項和 --timeout
命令行配置并不同,但將它們同時使用可以確保任務不會丟失并且任務只會成功執(zhí)行一次。
{note}
--timeout
的值應該比你在retry_after
中配置的值至少短幾秒。這會確保處理器永遠會在一個任務被重試之前中止。如果你的--timeout
值比retry_after
的值長的話,你的任務可能會被執(zhí)行兩次。
隊列進程睡眠時間
當任務在隊列中可用時,處理器將會一直無間隔地處理任務。 然而, sleep
選項定義了如果沒有新任務的時候處理器將會「睡眠」多長時間。在處理器睡眠時,它不會處理任何新任務 —— 任務將會在隊列處理器再次啟動后執(zhí)行。
php artisan queue:work --sleep=3
Supervisor 配置
安裝 Supervisor
Supervisor 是 Linux 操作系統(tǒng)下中的一個進程監(jiān)控器,它可以在 queue:work
掛掉時自動重啟之。在 Ubuntu 上安裝 Supervisor,你可以使用如下命令:
sudo apt-get install supervisor
{小提醒} 如果覺得配置 Supervisor 難于登天,可以考慮使用 Laravel Forge,它將自動為你的 Laravel 項目安裝和配置 Supervisor。
配置 Supervisor
Supervisor 的配置文件通常位于 /etc/supervisor/conf.d
目錄下。在該目錄中,你可以創(chuàng)建任意數(shù)量的配置文件,用來控制 supervisor 將如何監(jiān)控你的進程。例如,創(chuàng)建一個 laravel-worker.conf
文件使之啟動和監(jiān)控一個 queue:work
進程:
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log
在這個例子中,numprocs
指令將指定 Supervisor 運行 8 個 queue:work
進程并對其進行監(jiān)控,如果它們掛掉就自動重啟它們。你應該更改 command
選項中的 queue:work sqs
部分以表示你所需的隊列連接。
啟動 Supervisor
配置文件創(chuàng)建完畢后,你就可以使用如下命令更新 Supervisor 配置并啟動進程了:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
獲取關于 Supervisor 的更多信息,可以查閱 Supervisor 文檔.
處理失敗的任務
有時你的隊列化任務會執(zhí)行失敗。放平心態(tài),好事多磨。 Laravel 包含了一種方便的方法來指定任務應該嘗試的最大次數(shù)。如果一個任務已經(jīng)到達了最大嘗試次數(shù),它就會被插入到 failed_jobs
數(shù)據(jù)庫表中。要創(chuàng)建 failed_jobs
數(shù)據(jù)庫遷移表,你可以使用 queue:failed-table
命令:
php artisan queue:failed-table php artisan migrate
然后,當你運行 queue worker,你應該使用 queue:work
命令中的 --tries
開關指定應嘗試運行任務的最大次數(shù)。 如果沒有為 --tries
選項指定值,則將死循環(huán)嘗試運行任務:
php artisan queue:work redis --tries=3
任務失敗后清理
你可以直接在任務類中定義 failed
方法,允許你在任務失敗時執(zhí)行針對于該任務的清理工作。 這是向用戶發(fā)送警報或恢復任務執(zhí)行的任何操作的絕佳位置。導致任務失敗的 Exception
將被傳遞給 failed
方法:
<?php namespace App\Jobs; use Exception;use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 創(chuàng)建任務實例 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 執(zhí)行任務 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 上傳播客…… } /** * 任務失敗的處理過程 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 給用戶發(fā)送任務失敗的通知,等等…… } }
任務失敗事件
如果你想在任務失敗時注冊一個可調用的事件,你可以使用 Queue::failing
方法。該事件是通過 email 或 Slack 通知你團隊的絕佳時機。例如,我們可以在 Laravel 中的 AppServiceProvider
中附加一個回調事件:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobFailed; use Illuminate\Support\ServiceProvider; class AppServiceProvider extends ServiceProvider{ /** * 啟動任意服務。 * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 注冊服務提供者。 * * @return void */ public function register() { // } }
重試失敗的任務
要想查看所有被放入 failed_jobs
數(shù)據(jù)表中的任務,你可以使用 Artisan 命令 queue:failed
:
php artisan queue:failed
queue:failed
命令會列出任務 ID ,隊列,以及失敗的時間。任務 ID 可能會被用于重試失敗的任務。例如,要重試一個任務 ID 為 5
的任務,使用如下命令:
php artisan queue:retry 5
要重試所有失敗的任務,執(zhí)行 queue:retry
命令,將 all
作為 ID 傳入:
php artisan queue:retry all
如果你想刪除一個失敗的任務,使用 queue:forget
命令:
php artisan queue:forget 5
要清空所有失敗的任務,使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
在向任務中注入 Eloquent 模型時,模型被放入隊列前將被自動序列化并在執(zhí)行任務時還原。但是,如果在任務等待執(zhí)行時刪除了模型,任務可能會失敗并拋出 ModelNotFoundException
。
為了方便,你可以選擇設置任務的 deleteWhenMissingModels
屬性為 true
來自動地刪除缺失模型的任務。
/** * 如果模型缺失即刪除任務。 * * @var bool */ public $deleteWhenMissingModels = true;
任務事件
通過在 Queue
facade 中使用 before
和 after
方法,你可以指定一個隊列任務被執(zhí)行前后的回調。這些回調是添加額外的日志或增加統(tǒng)計的絕好時機。通常,你應該在 服務提供者中調用這些方法。例如,我們可以使用 Laravel 的 AppServiceProvider
:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * 引導啟動任意應用服務。 * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * 注冊服務提供者。 * * @return void */ public function register() { // } }
在 Queue
facade 使用 looping
方法可以在處理器嘗試獲取任務之前執(zhí)行回調。例如,你也許想用一個閉包來回滾之前失敗的任務尚未關閉的事務:
Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } });