隊(duì)列
隊(duì)列
隊(duì)列
簡(jiǎn)介
{tip} Laravel 現(xiàn)在為你的 Redis 隊(duì)列 提供了 Horizon,一個(gè)漂亮的儀表盤(pán)和配置系統(tǒng)。查看完整的 Horizon documentation 文檔 了解更多信息。
Laravel 隊(duì)列為不同的后臺(tái)隊(duì)列服務(wù)提供統(tǒng)一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于關(guān)系型數(shù)據(jù)庫(kù)的隊(duì)列。隊(duì)列的目的是將耗時(shí)的任務(wù)延時(shí)處理,比如發(fā)送郵件,從而大幅度縮短 Web 請(qǐng)求和響應(yīng)的時(shí)間。
隊(duì)列配置文件存放在 config/queue.php
文件中。每一種隊(duì)列驅(qū)動(dòng)的配置都可以在該文件中找到,包括數(shù)據(jù)庫(kù), Beanstalkd, Amazon SQS, Redis,以及同步(本地使用)驅(qū)動(dòng)。其中還包含了一個(gè) null
隊(duì)列驅(qū)動(dòng)用于那些放棄隊(duì)列的任務(wù)。
連接 Vs. 隊(duì)列
在開(kāi)始使用 Laravel 隊(duì)列前,弄明白 「連接」 和 「隊(duì)列」 的區(qū)別是很重要的。在你的 config/queue.php
配置文件里,有一個(gè) connections
配置選項(xiàng)。這個(gè)選項(xiàng)給 Amazon SQS,Beanstalk,或者 Redis 這樣的后端服務(wù)定義了一個(gè)特有的連接。不管是哪一種,一個(gè)給定的連接可能會(huì)有多個(gè) 「隊(duì)列」,而 「隊(duì)列」 可以被認(rèn)為是不同的?;蛘叽罅康年?duì)列任務(wù)。
要注意的是,queue
配置文件中每個(gè)連接的配置示例中都包含一個(gè) queue
屬性。這是默認(rèn)隊(duì)列任務(wù)被發(fā)給指定連接的時(shí)候會(huì)被分發(fā)到這個(gè)隊(duì)列中。換句話說(shuō),如果你分發(fā)任務(wù)的時(shí)候沒(méi)有顯式定義隊(duì)列,那么它就會(huì)被放到連接配置中 queue
屬性所定義的隊(duì)列中:
// 這個(gè)任務(wù)將被分發(fā)到默認(rèn)隊(duì)列... Job::dispatch(); // 這個(gè)任務(wù)將被發(fā)送到「emails」隊(duì)列... Job::dispatch()->onQueue('emails');
有些應(yīng)用可能不需要把任務(wù)發(fā)到不同的隊(duì)列,而只發(fā)到一個(gè)簡(jiǎn)單的隊(duì)列中就行了。但是把任務(wù)推到不同的隊(duì)列仍然是非常有用的,因?yàn)?Laravel 隊(duì)列處理器允許你定義隊(duì)列的優(yōu)先級(jí),所以你能給不同的隊(duì)列劃分不同的優(yōu)先級(jí)或者區(qū)分不同任務(wù)的不同處理方式了。比如說(shuō),如果你把任務(wù)推到 high
隊(duì)列中,你就能讓隊(duì)列處理器優(yōu)先處理這些任務(wù)了:
php artisan queue:work --queue=high,default
驅(qū)動(dòng)的必要設(shè)置
Database
為了使用 database
隊(duì)列驅(qū)動(dòng),你需要一張數(shù)據(jù)表來(lái)存儲(chǔ)任務(wù)。運(yùn)行 queue:table
Artisan 命令來(lái)創(chuàng)建這張表的遷移文件。當(dāng)遷移文件創(chuàng)建好后,你就可以使用 migrate
命令來(lái)進(jìn)行遷移:
php artisan queue:table php artisan migrate
Redis
為了使用 redis
隊(duì)列驅(qū)動(dòng),你需要在 config/database.php
配置文件中配置 Redis 的數(shù)據(jù)庫(kù)連接。
Redis 集群
如果你的 Redis 隊(duì)列驅(qū)動(dòng)使用了 Redis 集群,你的隊(duì)列名必須包含一個(gè) key hash tag 。這是為了確保所有的 Redis 鍵對(duì)于一個(gè)隊(duì)列都被放在同一哈希中。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90, ],
阻塞
當(dāng)使用 Redis 隊(duì)列時(shí),你可以用 block_for
配置項(xiàng)來(lái)具體說(shuō)明驅(qū)動(dòng)應(yīng)該在將任務(wù)重新放入 Redis 數(shù)據(jù)庫(kù)以及處理器輪詢之前阻塞多久。
基于你的隊(duì)列加載來(lái)調(diào)整這個(gè)值比把新任務(wù)放入 Redis 數(shù)據(jù)庫(kù)輪詢要更有效率的多。例如,你可以將這個(gè)值設(shè)置為 5
來(lái)表明這個(gè)驅(qū)動(dòng)應(yīng)該在等待任務(wù)可用時(shí)阻塞 5 秒。
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 5, ],
其它隊(duì)列驅(qū)動(dòng)的依賴擴(kuò)展包
在使用列表里的隊(duì)列服務(wù)前,必須安裝以下依賴擴(kuò)展包:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
創(chuàng)建任務(wù)
生成任務(wù)類(lèi)
在你的應(yīng)用程序中,隊(duì)列的任務(wù)類(lèi)都默認(rèn)放在 app/Jobs
目錄下。如果這個(gè)目錄不存在,那當(dāng)你運(yùn)行 make:job
Artisan 命令時(shí)目錄就會(huì)被自動(dòng)創(chuàng)建。你可以用以下的 Artisan 命令來(lái)生成一個(gè)新的隊(duì)列任務(wù):
php artisan make:job ProcessPodcast
生成的類(lèi)實(shí)現(xiàn)了 Illuminate\Contracts\Queue\ShouldQueue
接口,這意味著這個(gè)任務(wù)將會(huì)被推送到隊(duì)列中,而不是同步執(zhí)行。
任務(wù)類(lèi)結(jié)構(gòu)
任務(wù)類(lèi)的結(jié)構(gòu)很簡(jiǎn)單,一般來(lái)說(shuō)只會(huì)包含一個(gè)讓隊(duì)列用來(lái)調(diào)用此任務(wù)的 handle
方法。我們來(lái)看一個(gè)示例的任務(wù)類(lèi)。這個(gè)示例里,假設(shè)我們管理著一個(gè)播客發(fā)布服務(wù),在發(fā)布之前需要處理上傳播客文件:
<?php namespace App\Jobs; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 創(chuàng)建一個(gè)新的任務(wù)實(shí)例。 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 運(yùn)行任務(wù)。 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } }
注意,在這個(gè)例子中,我們?cè)谌蝿?wù)類(lèi)的構(gòu)造器中直接傳遞了一個(gè) Eloquent 模型 。因?yàn)槲覀冊(cè)谌蝿?wù)類(lèi)里引用了 SerializesModels
這個(gè) trait,使得 Eloquent 模型在處理任務(wù)時(shí)可以被優(yōu)雅地序列化和反序列化。如果你的隊(duì)列任務(wù)類(lèi)在構(gòu)造器中接收了一個(gè) Eloquent 模型,那么只有可識(shí)別出該模型的屬性會(huì)被序列化到隊(duì)列里。當(dāng)任務(wù)被實(shí)際運(yùn)行時(shí),隊(duì)列系統(tǒng)便會(huì)自動(dòng)從數(shù)據(jù)庫(kù)中重新取回完整的模型。這整個(gè)過(guò)程對(duì)你的應(yīng)用程序來(lái)說(shuō)是完全透明的,這樣可以避免在序列化完整的 Eloquent 模式實(shí)例時(shí)所帶來(lái)的一些問(wèn)題。
在隊(duì)列處理任務(wù)時(shí),會(huì)調(diào)用 handle
方法,而這里我們也可以通過(guò) handle
方法的參數(shù)類(lèi)型提示,讓 Laravel 的 服務(wù)容器 自動(dòng)注入依賴對(duì)象。
如果你想完全控制容器如何將依賴對(duì)象注入至 handle
方法,可以使用容器的 bindMethod
方法。bindMethod
方法接受一個(gè)任務(wù)和容器的回調(diào)。雖然可以直接在回調(diào)中可以調(diào)用 handle
方法,但建議應(yīng)該從 service provider 調(diào)用為佳:
use App\Jobs\ProcessPodcast; $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) { return $job->handle($app->make(AudioProcessor::class)); });
{note} 像圖片內(nèi)容這種二進(jìn)制數(shù)據(jù),在放入隊(duì)列任務(wù)之前必須使用
base64_encode
方法轉(zhuǎn)換一下。否則,當(dāng)這項(xiàng)任務(wù)放置到隊(duì)列中時(shí),可能無(wú)法正確序列化為 JSON。
分發(fā)任務(wù)
一旦你寫(xiě)完了你的任務(wù)類(lèi)你就可以使用它自帶的 dispatch
方法分發(fā)它。傳遞給 dispatch
方法的參數(shù)將會(huì)被傳遞給任務(wù)的構(gòu)造函數(shù):
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲(chǔ)一個(gè)新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast); } }
延遲分發(fā)
如果你想延遲你的隊(duì)列任務(wù)的執(zhí)行,你可以在分發(fā)任務(wù)的時(shí)候使用 delay
方法。例如,讓我們?cè)敿?xì)說(shuō)明一個(gè)十分鐘之后才會(huì)執(zhí)行的任務(wù):
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller;class PodcastController extends Controller{ /** * 存儲(chǔ)一個(gè)新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); } }
{note} Amazon SQS 隊(duì)列服務(wù)最大延遲 15 分鐘的時(shí)間。
同步調(diào)度
如果您想立即(同步)執(zhí)行隊(duì)列任務(wù),可以使用 dispatchNow
方法。 使用此方法時(shí),隊(duì)列任務(wù)將不會(huì)排隊(duì),并立即在當(dāng)前進(jìn)程中運(yùn)行:
<?php namespace App\Http\Controllers; use Illuminate\Http\Request; use App\Jobs\ProcessPodcast; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * Store a new podcast. * * @param Request $request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatchNow($podcast); } }
工作鏈
工作鏈允許你具體定義一個(gè)按序列執(zhí)行隊(duì)列任務(wù)的列表。一旦序列中的任務(wù)失敗了,剩余的工作將不會(huì)執(zhí)行。要運(yùn)行一個(gè)工作鏈,你可以對(duì)可分發(fā)的任務(wù)使用 withChain
方法:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch();
{note} 使用
$this->delete()
方法刪除隊(duì)列任務(wù)不會(huì)阻止工作鏈任務(wù)執(zhí)行。只有當(dāng)工作鏈中的任務(wù)執(zhí)行失敗時(shí),工作鏈才會(huì)停止執(zhí)行。
工作鏈連接 & 隊(duì)列
如果你想定義用于工作鏈的默認(rèn)連接和隊(duì)列,你可以使用 allOnConnection
和 allOnQueue
方法。 這些方法指定了所需隊(duì)列的連接和隊(duì)列 —— 除非隊(duì)列任務(wù)被明確指定給了不同的連接 / 隊(duì)列:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
自定義連接 & 隊(duì)列
分發(fā)任務(wù)到指定隊(duì)列
通過(guò)將任務(wù)分發(fā)到不同隊(duì)列,你可以將你的隊(duì)列任務(wù)「分類(lèi)」,甚至指定給不同隊(duì)列分配的任務(wù)數(shù)量。記住,這不是推送任務(wù)到你定義的隊(duì)列配置文件的不同的連接里,而是一個(gè)單一的連接。要指定隊(duì)列,在分發(fā)任務(wù)時(shí)使用 onQueue
方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲(chǔ)一個(gè)新的播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast)->onQueue('processing'); } }
分發(fā)任務(wù)到指定連接
如果你在多隊(duì)列連接中工作,你可以指定將任務(wù)分發(fā)到哪個(gè)連接。要指定連接,在分發(fā)任務(wù)時(shí)使用 onConnection
方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller{ /** * 存儲(chǔ)一個(gè)新播客節(jié)目。 * * @param Request $request * @return Response */ public function store(Request $request) { // 創(chuàng)建播客... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); } }
當(dāng)然,你可以鏈?zhǔn)秸{(diào)用 onConnection
和 onQueue
方法來(lái)指定連接和隊(duì)列。
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
指定最大任務(wù)嘗試次數(shù) / 超時(shí)值
最大嘗試次數(shù)
在一個(gè)任務(wù)重指定最大嘗試次數(shù)可以通過(guò) Artisan 命令的 --tries
選項(xiàng) 指定:
php artisan queue:work --tries=3
你可能想通過(guò)任務(wù)類(lèi)自身對(duì)最大任務(wù)嘗試次數(shù)進(jìn)行一個(gè)更顆?;奶幚?。如果最大嘗試次數(shù)是在任務(wù)類(lèi)中定義的,它將優(yōu)先于命令行中的值提供:
<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任務(wù)可以嘗試的最大次數(shù)。 * * @var int */ public $tries = 5;}
基于時(shí)間的嘗試
作為另外一個(gè)選擇來(lái)定義任務(wù)在失敗前會(huì)嘗試多少次,你可以定義一個(gè)任務(wù)超時(shí)時(shí)間。這樣的話,在給定的時(shí)間范圍內(nèi),任務(wù)可以無(wú)限次嘗試。要定義一個(gè)任務(wù)的超時(shí)時(shí)間,在你的任務(wù)類(lèi)中新增一個(gè) retryUntil
方法:
/** * 定義任務(wù)超時(shí)時(shí)間 * * @return \DateTime */ public function retryUntil(){ return now()->addSeconds(5); }
{tip} 你也可以在你的隊(duì)列事件監(jiān)聽(tīng)器中使用
retryUntil
方法。
超時(shí)
{note}
timeout
特性對(duì)于 PHP 7.1+ 和pcntl
PHP 擴(kuò)展進(jìn)行了優(yōu)化.
同樣的,任務(wù)執(zhí)行最大秒數(shù)的數(shù)值可以通過(guò) Artisan 命令行的 --timeout
選項(xiàng)指定。
php artisan queue:work --timeout=30
然而,你可能也想在任務(wù)類(lèi)自身定義一個(gè)超時(shí)時(shí)間。如果在任務(wù)類(lèi)中指定,優(yōu)先級(jí)將會(huì)高于命令行:
<?php namespace App\Jobs;class ProcessPodcast implements ShouldQueue{ /** * 任務(wù)可以執(zhí)行的最大秒數(shù) (超時(shí)時(shí)間)。 * * @var int */ public $timeout = 120;}
頻率限制
{note} 這個(gè)特性要求你的應(yīng)用可以使用 Redis 服務(wù)器.
如果你的應(yīng)用使用了 Redis,你可以通過(guò)時(shí)間或并發(fā)限制你的隊(duì)列任務(wù)。當(dāng)你的隊(duì)列任務(wù)通過(guò)同樣有速率限制的 API 使用時(shí),這個(gè)特性將很有幫助。
例如,使用 throttle
方法,你可以限制一個(gè)給定類(lèi)型的任務(wù)每 60 秒只執(zhí)行 10 次。如果沒(méi)有獲得鎖,一般情況下你應(yīng)該將任務(wù)放回隊(duì)列以使其可以被稍后重試。
Redis::throttle('key')->allow(10)->every(60)->then(function () { // 任務(wù)邏輯... }, function () { // 無(wú)法獲得鎖... return $this->release(10); });
{tip} 在上述的例子里,
key
可以是任何你想要限制頻率的任務(wù)類(lèi)型的唯一識(shí)別字符串。例如,使用構(gòu)件基于任務(wù)類(lèi)名的 key,或它操作的 Eloquent 模型的 ID。{note} 將受限制的作業(yè)釋放回隊(duì)列,仍然會(huì)增加工作的總數(shù)
attempts
。
或者,你可以指定一個(gè)任務(wù)可以同時(shí)執(zhí)行的最大數(shù)量。在如下情況時(shí)這會(huì)很有用處:當(dāng)一個(gè)隊(duì)列中的任務(wù)正在修改資源時(shí),一次只能被一個(gè)任務(wù)修改。例如,使用 funnel
方法,你可以限制一個(gè)給定類(lèi)型的任務(wù)一次只能執(zhí)行一個(gè)處理器:
Redis::funnel('key')->limit(1)->then(function () { // 任務(wù)邏輯...}, function () { // 無(wú)法獲得鎖... return $this->release(10); });
{tip} 當(dāng)使用頻率限制時(shí),任務(wù)執(zhí)行成功的嘗試的次數(shù)可能會(huì)難以確定。所以,將頻率限制與 時(shí)間限制 組合是很有作用的。
錯(cuò)誤處理
如果在任務(wù)執(zhí)行的時(shí)候出現(xiàn)異常,任務(wù)會(huì)被自動(dòng)釋放到隊(duì)列中以再次嘗試。任務(wù)將會(huì)一直被釋放直到達(dá)到應(yīng)用允許的最大重試次數(shù)。最大重試的數(shù)值由 queue:work
Artisan 命令的 --tries
選項(xiàng)定義,或者在任務(wù)類(lèi)中定義。更多執(zhí)行隊(duì)列處理器的信息可以 在以下找到 。
排隊(duì)閉包
你也可以直接調(diào)用閉包,而不是將任務(wù)類(lèi)調(diào)度到隊(duì)列中。這對(duì)于需要執(zhí)行的快速、簡(jiǎn)單的任務(wù)非常有用:
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish(); });
將閉包分派給隊(duì)列時(shí),閉包的代碼內(nèi)容將以加密方式簽名,因此無(wú)法在傳輸過(guò)程中對(duì)其進(jìn)行修改。
運(yùn)行隊(duì)列處理器
Laravel 包含了一個(gè)隊(duì)列處理器以將推送到隊(duì)列中的任務(wù)執(zhí)行。你可以使用 queue:work
Artisan 命令運(yùn)行處理器。 注意一旦 queue:work
命令開(kāi)始執(zhí)行,它會(huì)一直運(yùn)行直到它被手動(dòng)停止或終端被關(guān)閉。
php artisan queue:work
{tip} 要使
queue:work
進(jìn)程一直在后臺(tái)運(yùn)行,你應(yīng)該使用進(jìn)程管理器比如 Supervisor 來(lái)確保隊(duì)列處理器不會(huì)停止運(yùn)行
記住,隊(duì)列處理器是一個(gè)常駐的進(jìn)程并且在內(nèi)存中保存著已經(jīng)啟動(dòng)的應(yīng)用狀態(tài)。因此,它們并不會(huì)在啟動(dòng)后注意到你代碼的更改。所以,在你的重新部署過(guò)程中,請(qǐng)記得 重啟你的隊(duì)列處理器.
指定連接 & 隊(duì)列
你也可以具體說(shuō)明隊(duì)列處理器應(yīng)該使用哪個(gè)隊(duì)列連接。 傳遞給 work
的連接名應(yīng)該與你的 config/queue.php
配置文件中定義的連接之一相符。
php artisan queue:work redis
你甚至可以自定義你的隊(duì)列處理器使其只執(zhí)行連接中指定的隊(duì)列。例如,如果你的所有郵件都由 redis
連接的 emails
隊(duì)列處理,你可以使用如下的命令啟動(dòng)一個(gè)僅執(zhí)行此隊(duì)列的處理器:
php artisan queue:work redis --queue=emails
執(zhí)行單一任務(wù)
--once
選項(xiàng)用于使隊(duì)列處理器只處理隊(duì)列中的單一任務(wù)。
php artisan queue:work --once
處理所有隊(duì)列的任務(wù)然后退出
--stop-when-empty
選項(xiàng)可用于處理隊(duì)列處理器處理所有作業(yè)然后優(yōu)雅地退出。如果您希望在隊(duì)列為空后關(guān)閉容器,則在 Docker 容器中運(yùn)行 Laravel 隊(duì)列時(shí),此選項(xiàng)很有用:
php artisan queue:work --stop-when-empty
資源注意事項(xiàng)
后臺(tái)駐留的隊(duì)列處理器不會(huì)在執(zhí)行完每個(gè)任務(wù)后「重啟」框架。因此,你應(yīng)該在每個(gè)任務(wù)完成后釋放任何占用過(guò)大的資源。例如,如果你正在用 GD 庫(kù)執(zhí)行圖像處理,你應(yīng)該在完成后使用 imagedestroy
釋放內(nèi)存。
隊(duì)列優(yōu)先級(jí)
有時(shí)你可能想確定隊(duì)列執(zhí)行的優(yōu)先順序。例如在 config/queue.php
中你可以將 redis
連接的 queue
隊(duì)列的優(yōu)先級(jí)從 default
設(shè)置為 low
。然而, 偶爾你也想像如下方式將一個(gè)任務(wù)推送到 high
隊(duì)列:
dispatch((new Job)->onQueue('high'));
要運(yùn)行一個(gè)處理器來(lái)確認(rèn) low
隊(duì)列中的任務(wù)在全部的 high
隊(duì)列任務(wù)完成后才繼續(xù)執(zhí)行,你可以傳遞一個(gè)逗號(hào)分隔的隊(duì)列名列表作為 work
命令的參數(shù)。
php artisan queue:work --queue=high,low
隊(duì)列處理器 & 部署
因?yàn)殛?duì)列處理器是常駐進(jìn)程,他們?cè)谥貑⑶安粫?huì)應(yīng)用你代碼的更改。因此,部署使用隊(duì)列處理器的應(yīng)用最簡(jiǎn)單的方法是在部署進(jìn)程中重啟隊(duì)列處理器。你可以平滑地重啟所有隊(duì)列處理器通過(guò)使用 queue:restart
方法:
php artisan queue:restart
這個(gè)命令將會(huì)引導(dǎo)所有的隊(duì)列處理器在完成當(dāng)前任務(wù)后平滑「中止」,這樣不會(huì)有丟失的任務(wù)。由于在執(zhí)行 queue:restart
后隊(duì)列處理器將會(huì)中止,所以你應(yīng)該運(yùn)行一個(gè)進(jìn)程管理器例如 Supervisor 來(lái)自動(dòng)重啟隊(duì)列處理器。
{tip} 隊(duì)列使用 緩存 存儲(chǔ)重啟信號(hào),所以你應(yīng)該確定在使用這個(gè)功能之前配置好緩存驅(qū)動(dòng)。
任務(wù)過(guò)期 & 超時(shí)
任務(wù)過(guò)期
在你的 config/queue.php
配置文件中,每個(gè)隊(duì)列連接都定義了一個(gè) retry_after
選項(xiàng)。這個(gè)選項(xiàng)指定了隊(duì)列連接在重試一個(gè)任務(wù)前應(yīng)該等它執(zhí)行多久。例如,如果 retry_after
的值設(shè)置為 90
,那么任務(wù)在執(zhí)行了 90 秒后將會(huì)被放回隊(duì)列而不是刪除它。一般情況下,你應(yīng)該將 retry_after
的值設(shè)置為你認(rèn)為你的任務(wù)可能會(huì)執(zhí)行需要最長(zhǎng)時(shí)間的值。
{note} 只有在 Amazon SQS 中不存在
retry_after
這個(gè)值。 SQS 將會(huì)以 AWS 控制臺(tái)配置的 默認(rèn)可見(jiàn)超時(shí)值 作為重試任務(wù)的依據(jù)。
處理器超時(shí)
queue:work
Artisan 命令包含一個(gè) --timeout
選項(xiàng)。 --timeout
選項(xiàng)指定了 Laravel 的隊(duì)列主進(jìn)程在中止一個(gè)執(zhí)行任務(wù)的子進(jìn)程之前需要等到多久。有時(shí)一個(gè)子進(jìn)程可能會(huì)因?yàn)楦鞣N原因「凍結(jié)」,比如一個(gè)外部的 HTTP 請(qǐng)求失去響應(yīng)。 --timeout
選項(xiàng)會(huì)移除那些超過(guò)指定時(shí)間被凍結(jié)的進(jìn)程。
php artisan queue:work --timeout=60
retry_after
配置項(xiàng)和 --timeout
命令行配置并不同,但將它們同時(shí)使用可以確保任務(wù)不會(huì)丟失并且任務(wù)只會(huì)成功執(zhí)行一次。
{note}
--timeout
的值應(yīng)該比你在retry_after
中配置的值至少短幾秒。這會(huì)確保處理器永遠(yuǎn)會(huì)在一個(gè)任務(wù)被重試之前中止。如果你的--timeout
值比retry_after
的值長(zhǎng)的話,你的任務(wù)可能會(huì)被執(zhí)行兩次。
隊(duì)列進(jìn)程睡眠時(shí)間
當(dāng)任務(wù)在隊(duì)列中可用時(shí),處理器將會(huì)一直無(wú)間隔地處理任務(wù)。 然而, sleep
選項(xiàng)定義了如果沒(méi)有新任務(wù)的時(shí)候處理器將會(huì)「睡眠」多長(zhǎng)時(shí)間。在處理器睡眠時(shí),它不會(huì)處理任何新任務(wù) —— 任務(wù)將會(huì)在隊(duì)列處理器再次啟動(dòng)后執(zhí)行。
php artisan queue:work --sleep=3
Supervisor 配置
安裝 Supervisor
Supervisor 是 Linux 操作系統(tǒng)下中的一個(gè)進(jìn)程監(jiān)控器,它可以在 queue:work
掛掉時(shí)自動(dòng)重啟之。在 Ubuntu 上安裝 Supervisor,你可以使用如下命令:
sudo apt-get install supervisor
{小提醒} 如果覺(jué)得配置 Supervisor 難于登天,可以考慮使用 Laravel Forge,它將自動(dòng)為你的 Laravel 項(xiàng)目安裝和配置 Supervisor。
配置 Supervisor
Supervisor 的配置文件通常位于 /etc/supervisor/conf.d
目錄下。在該目錄中,你可以創(chuàng)建任意數(shù)量的配置文件,用來(lái)控制 supervisor 將如何監(jiān)控你的進(jìn)程。例如,創(chuàng)建一個(gè) laravel-worker.conf
文件使之啟動(dòng)和監(jiān)控一個(gè) queue:work
進(jìn)程:
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log
在這個(gè)例子中,numprocs
指令將指定 Supervisor 運(yùn)行 8 個(gè) queue:work
進(jìn)程并對(duì)其進(jìn)行監(jiān)控,如果它們掛掉就自動(dòng)重啟它們。你應(yīng)該更改 command
選項(xiàng)中的 queue:work sqs
部分以表示你所需的隊(duì)列連接。
啟動(dòng) Supervisor
配置文件創(chuàng)建完畢后,你就可以使用如下命令更新 Supervisor 配置并啟動(dòng)進(jìn)程了:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
獲取關(guān)于 Supervisor 的更多信息,可以查閱 Supervisor 文檔.
處理失敗的任務(wù)
有時(shí)你的隊(duì)列化任務(wù)會(huì)執(zhí)行失敗。放平心態(tài),好事多磨。 Laravel 包含了一種方便的方法來(lái)指定任務(wù)應(yīng)該嘗試的最大次數(shù)。如果一個(gè)任務(wù)已經(jīng)到達(dá)了最大嘗試次數(shù),它就會(huì)被插入到 failed_jobs
數(shù)據(jù)庫(kù)表中。要?jiǎng)?chuàng)建 failed_jobs
數(shù)據(jù)庫(kù)遷移表,你可以使用 queue:failed-table
命令:
php artisan queue:failed-table php artisan migrate
然后,當(dāng)你運(yùn)行 queue worker,你應(yīng)該使用 queue:work
命令中的 --tries
開(kāi)關(guān)指定應(yīng)嘗試運(yùn)行任務(wù)的最大次數(shù)。 如果沒(méi)有為 --tries
選項(xiàng)指定值,則將死循環(huán)嘗試運(yùn)行任務(wù):
php artisan queue:work redis --tries=3
任務(wù)失敗后清理
你可以直接在任務(wù)類(lèi)中定義 failed
方法,允許你在任務(wù)失敗時(shí)執(zhí)行針對(duì)于該任務(wù)的清理工作。 這是向用戶發(fā)送警報(bào)或恢復(fù)任務(wù)執(zhí)行的任何操作的絕佳位置。導(dǎo)致任務(wù)失敗的 Exception
將被傳遞給 failed
方法:
<?php namespace App\Jobs; use Exception;use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 創(chuàng)建任務(wù)實(shí)例 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 執(zhí)行任務(wù) * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 上傳播客…… } /** * 任務(wù)失敗的處理過(guò)程 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 給用戶發(fā)送任務(wù)失敗的通知,等等…… } }
任務(wù)失敗事件
如果你想在任務(wù)失敗時(shí)注冊(cè)一個(gè)可調(diào)用的事件,你可以使用 Queue::failing
方法。該事件是通過(guò) email 或 Slack 通知你團(tuán)隊(duì)的絕佳時(shí)機(jī)。例如,我們可以在 Laravel 中的 AppServiceProvider
中附加一個(gè)回調(diào)事件:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobFailed; use Illuminate\Support\ServiceProvider; class AppServiceProvider extends ServiceProvider{ /** * 啟動(dòng)任意服務(wù)。 * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 注冊(cè)服務(wù)提供者。 * * @return void */ public function register() { // } }
重試失敗的任務(wù)
要想查看所有被放入 failed_jobs
數(shù)據(jù)表中的任務(wù),你可以使用 Artisan 命令 queue:failed
:
php artisan queue:failed
queue:failed
命令會(huì)列出任務(wù) ID ,隊(duì)列,以及失敗的時(shí)間。任務(wù) ID 可能會(huì)被用于重試失敗的任務(wù)。例如,要重試一個(gè)任務(wù) ID 為 5
的任務(wù),使用如下命令:
php artisan queue:retry 5
要重試所有失敗的任務(wù),執(zhí)行 queue:retry
命令,將 all
作為 ID 傳入:
php artisan queue:retry all
如果你想刪除一個(gè)失敗的任務(wù),使用 queue:forget
命令:
php artisan queue:forget 5
要清空所有失敗的任務(wù),使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
在向任務(wù)中注入 Eloquent 模型時(shí),模型被放入隊(duì)列前將被自動(dòng)序列化并在執(zhí)行任務(wù)時(shí)還原。但是,如果在任務(wù)等待執(zhí)行時(shí)刪除了模型,任務(wù)可能會(huì)失敗并拋出 ModelNotFoundException
。
為了方便,你可以選擇設(shè)置任務(wù)的 deleteWhenMissingModels
屬性為 true
來(lái)自動(dòng)地刪除缺失模型的任務(wù)。
/** * 如果模型缺失即刪除任務(wù)。 * * @var bool */ public $deleteWhenMissingModels = true;
任務(wù)事件
通過(guò)在 Queue
facade 中使用 before
和 after
方法,你可以指定一個(gè)隊(duì)列任務(wù)被執(zhí)行前后的回調(diào)。這些回調(diào)是添加額外的日志或增加統(tǒng)計(jì)的絕好時(shí)機(jī)。通常,你應(yīng)該在 服務(wù)提供者中調(diào)用這些方法。例如,我們可以使用 Laravel 的 AppServiceProvider
:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * 引導(dǎo)啟動(dòng)任意應(yīng)用服務(wù)。 * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * 注冊(cè)服務(wù)提供者。 * * @return void */ public function register() { // } }
在 Queue
facade 使用 looping
方法可以在處理器嘗試獲取任務(wù)之前執(zhí)行回調(diào)。例如,你也許想用一個(gè)閉包來(lái)回滾之前失敗的任務(wù)尚未關(guān)閉的事務(wù):
Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } });