【Laravel】 Redis Queue Worker のソースコードリーディング

./artisan queue:work redis

というコマンドを実行したとする。(Supervisor などでデーモナイズすることを想定)

WorkCommand の実行

WorkCommand::handle()WorkCommand::runWorker() という流れでワーカーのループ準備に入る。 --once オプションを渡していないので,Worker::daemon() メソッドが呼ばれることになる。(ここから無限ループ処理に入る)

以下に示す daemon() メソッドがループの中核となる。ここからこの処理を1つずつ追っていく。

 * Listen to the given queue in a loop.
 * @param  string  $connectionName
 * @param  string  $queue
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
public function daemon($connectionName, $queue, WorkerOptions $options)
    if ($this->supportsAsyncSignals()) {

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $this->pauseWorker($options, $lastRestart);


        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue

        if ($this->supportsAsyncSignals()) {
            $this->registerTimeoutHandler($job, $options);

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $this->runJob($job, $connectionName, $options);
        } else {

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $this->stopIfNecessary($options, $lastRestart, $job);


  1. SIGTERM シグナルハンドラを登録する。
  2. 最も古いジョブをキューから取得する。そして,ジョブの $timeout プロパティまたはワーカーの --timeout 設定値に合わせて SIGALRM シグナルハンドラを登録する。(2回目以降は上書き)
  3. ジョブを取得できたかどうかで分岐。
    • ジョブが取得できた場合, 4 に進む。
    • ジョブが取得できなかった場合, ワーカーの --sleep 設定値ぶんだけスリープし, 5 に進む。
  4. ジョブを実行し,成功したかどうかで分岐。
    • 成功したら 5 に進む。
    • 最大試行回数内で例外が発生もしくはタイムアウトしたら,ジョブをもう一度キューに投入する。例外があればスローする。そして 5 に進む。
    • 最大試行回数を超えて例外が発生したら,その例外をスローし, 5 に進む。
    • 最大試行回数を超えてタイムアウトしたら, MaxAttemptsExceededException をスローし, 5 に進む。
  5. ワーカーの終了判定を行う。SIGTERMを受け取るなどして終了すべきと判定された場合はここで終了し,それ以外は 2 に戻って次のループに入る。



if ($this->supportsAsyncSignals()) {
 * Enable async signals for the process.
 * @return void
protected function listenForSignals()

    pcntl_signal(SIGTERM, function () {
        $this->shouldQuit = true;

    pcntl_signal(SIGUSR2, function () {
        $this->paused = true;

    pcntl_signal(SIGCONT, function () {
        $this->paused = false;
  • SIGTERMkill コマンドなどのデフォルトの終了処理であるが,この登録を行っておくことによって, 強制終了シグナルの SIGKILL が飛んでこない限りは Graceful にシャットダウンすることができるようになる。設定された $this->shouldQuit は再起動判定時に利用される。
  • SIGUSR2 はユーザ定義シグナルである。なにやらポーズ処理が入っているように見えるが,これは Laravel Horizon 用のシグナルなので,ワーカーを単独で使用する分には無関係だ。
  • SIGCONT は本来は SIGSTOP のあとにプロセスを再開するためのシグナルだが, ここでは SIGUSR2 に対応する Laravel Horizon 用のシグナル として使われている。


$lastRestart = $this->getTimestampOfLastQueueRestart();
 * Get the last queue restart timestamp, or null.
 * @return int|null
protected function getTimestampOfLastQueueRestart()
    if ($this->cache) {
        return $this->cache->get('illuminate:queue:restart');

何やらキャッシュから値を取得しているが,これは RestartCommand によってセットされる値である。要するに,プロセス間通信の手段としてキャッシュストレージを利用しているわけだ。この $lastRestart も再起動判定時に利用される。


namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Support\InteractsWithTime;

class RestartCommand extends Command
    use InteractsWithTime;

     * The console command name.
     * @var string
    protected $name = 'queue:restart';

     * The console command description.
     * @var string
    protected $description = 'Restart queue worker daemons after their current job';

     * Execute the console command.
     * @return void
    public function handle()
        $this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());

        $this->info('Broadcasting queue restart signal.');



// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

 * Determine if the daemon should process on this iteration.
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  string  $connectionName
 * @param  string  $queue
 * @return bool
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
    return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
        $this->paused ||
        $this->events->until(new Events\Looping($connectionName, $queue)) === false);

以下の場合に pauseWorker() が実行されることがわかる。大きく注意すべきはメンテナンスモードではオプションを渡さない限りワーカーが動かないことであろうか。

  • ./artisan down によってメンテナンスモードになっていて,かつ起動時に --force オプションが渡されていない場合
  • Laravel Horizon 制御下において SIGUSR2 を受け取った場合
  • Queue::looping() で登録されたリスナーのどれかが false をリターンして以後のリスナー実行を中断した場合
 * Pause the worker for the current loop.
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  int  $lastRestart
 * @return void
protected function pauseWorker(WorkerOptions $options, $lastRestart)
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);

pauseWorker() の実行は,スリープ処理が入る点を除けば stopIfNecessary() の実装そのままである。このメソッドはループの最後でも使用されており,後ほど確認する。


// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue

もし ---queue=foo,bar のように優先度をつけて対象キューを複数設定していた場合,ここで優先度順にポップ処理が走る。

 * Get the next job from the queue connection.
 * @param  \Illuminate\Contracts\Queue\Queue  $connection
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
protected function getNextJob($connection, $queue)
    try {
        foreach (explode(',', $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */

$connection->pop($queue) は, QueueManager::__call() を通じて RedisQueue::pop() に委譲される。

 * Pop the next job off of the queue.
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
public function pop($queue = null)
    $this->migrate($prefixed = $this->getQueue($queue));

    if (empty($nextJob = $this->retrieveNextJob($prefixed))) {

    [$job, $reserved] = $nextJob;

    if ($reserved) {
        return new RedisJob(
            $this->container, $this, $job,
            $reserved, $this->connectionName, $queue ?: $this->default


 * Migrate any delayed or expired jobs onto the primary queue.
 * @param  string  $queue
 * @return void
protected function migrate($queue)
    $this->migrateExpiredJobs($queue.':delayed', $queue);

    if (! is_null($this->retryAfter)) {
        $this->migrateExpiredJobs($queue.':reserved', $queue);


-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', "{$queue}", '-inf', "{$currentTimestamp}")

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', "{$queue}:delayed", 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', "{$queue}", unpack(val, i, math.min(i+99, #val)))

return val

queues:キュー名:delayed は発火時刻をスコア,値をジョブペイロードとする ZSET(ソート済みセット型)であり,以下のような処理が行われている。

  1. ZRANGEBYSCORE で現在時刻までにスケジュールされたジョブを全件取得し,変数に入れたあとは ZRANGEBYRANK でクリアする
  2. 変数に入れたジョブを100件ずつ RPUSH でキューに移動する

queues:キュー名:reserved に関しても同様で,こちらは遅延ジョブではなくリトライジョブが対象になる。発火時刻を算出するパラメータとして queue.connections.redis.retry_after で設定した秒数が用いられる。


 * Retrieve the next job from the queue.
 * @param  string  $queue
 * @return array
protected function retrieveNextJob($queue)
    if (! is_null($this->blockFor)) {
        return $this->blockingPop($queue);

    return $this->getConnection()->eval(
        LuaScripts::pop(), 2, $queue, $queue.':reserved',

queue.connections.redis.block_for の設定があるかどうかによって処理が分岐される。

 * Retrieve the next job by blocking-pop.
 * @param  string  $queue
 * @return array
protected function blockingPop($queue)
    $rawBody = $this->getConnection()->blpop($queue, $this->blockFor);

    if (! empty($rawBody)) {
        $payload = json_decode($rawBody[1], true);


        $reserved = json_encode($payload);

        $this->getConnection()->zadd($queue.':reserved', [
            $reserved => $this->availableAt($this->retryAfter),

        return [$rawBody[1], $reserved];

    return [null, null];
-- Pop the first job off of the queue...
local job = redis.call('lpop', "{$queue}")
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved['attempts'] = reserved['attempts'] + 1
    reserved = cjson.encode(reserved)
    redis.call('zadd', "{$queue}:reserved", "{$this->availableAt($this->retryAfter)}", reserved)

return {job, reserved}


さて,ここでまた queues:キュー名:reserved セットが登場している。

  1. ペイロードに含まれる試行回数 "attempts" をインクリメント
  2. リトライ時刻を算出し,それをスコアとしてペイロードを queues:キュー名:reserved セットに追加

リトライ発火時刻は以下のように算出される。要するに, queue.connections.redis.retry_after の値に現在時刻を足しているだけだ。

 * Get the "available at" UNIX timestamp.
 * @param  \DateTimeInterface|\DateInterval|int  $delay
 * @return int
protected function availableAt($delay = 0)
    $delay = $this->parseDateInterval($delay);

    return $delay instanceof DateTimeInterface
                        ? $delay->getTimestamp()
                        : Carbon::now()->addSeconds($delay)->getTimestamp();
 * If the given value is an interval, convert it to a DateTime instance.
 * @param  \DateTimeInterface|\DateInterval|int  $delay
 * @return \DateTimeInterface|int
protected function parseDateInterval($delay)
    if ($delay instanceof DateInterval) {
        $delay = Carbon::now()->add($delay);

    return $delay;




 * Get the next job from the queue connection.
 * @param  \Illuminate\Contracts\Queue\Queue  $connection
 * @param  string  $queue
 * @return \Illuminate\Contracts\Queue\Job|null
protected function getNextJob($connection, $queue)
    try {
        /* ... */
    } catch (Exception $e) {


    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));


 * Stop the worker if we have lost connection to a database.
 * @param  \Throwable  $e
 * @return void
protected function stopWorkerIfLostConnection($e)
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;


  1. エラーレポートを行う
  2. データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約
    (Redis ドライバを使っている今回は無関係)
  3. 1秒間スリープ

SIGALRM を利用してタイムアウトハンドラを登録

if ($this->supportsAsyncSignals()) {
    $this->registerTimeoutHandler($job, $options);

最初に設定した SIGTERM に加えて,サイクル1回ごとに SIGALRM ハンドラも繰り返し上書き登録されていく。ジョブの $timeout 設定またはワーカーの --timeout オプションからタイムアウト秒数を拾い,ハンドラとして SIGKILL による強制終了処理を登録している。

 * Register the worker timeout handler.
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
protected function registerTimeoutHandler($job, WorkerOptions $options)
    // We will register a signal handler for the alarm signal so that we can kill this
    // process if it is running too long because it has frozen. This uses the async
    // signals supported in recent versions of PHP to accomplish it conveniently.
    pcntl_signal(SIGALRM, function () {

        max($this->timeoutForJob($job, $options), 0)
 * Kill the process.
 * @param  int  $status
 * @return void
public function kill($status = 0)
    $this->events->dispatch(new Events\WorkerStopping($status));

    if (extension_loaded('posix')) {
        posix_kill(getmypid(), SIGKILL);

 * Get the appropriate timeout for the given job.
 * @param  \Illuminate\Contracts\Queue\Job|null  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return int
protected function timeoutForJob($job, WorkerOptions $options)
    return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;

ジョブの実行 (またはスリープ)


// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
    $this->runJob($job, $connectionName, $options);
} else {
 * Process the given job.
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
protected function runJob($job, $connectionName, WorkerOptions $options)
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
 * Process the given job from the queue.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 * @throws \Throwable
public function process($connectionName, $job, WorkerOptions $options)
    try {
        // First we will raise the before job event and determine if the job has already ran
        // over its maximum attempt limits, which could primarily happen when this job is
        // continually timing out and not actually throwing any exceptions from itself.
        $this->raiseBeforeJobEvent($connectionName, $job);

            $connectionName, $job, (int) $options->maxTries

        // Here we will fire off the job and let it process. We will catch any exceptions so
        // they can be reported to the developers logs, etc. Once the job is finished the
        // proper events will be fired to let any listeners know this job has finished.

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */



の部分であるが,これは キュー・イベント・ブロードキャストに関する補足資料 で既に解説済みであるため大部分は割愛する。


CallQueuedHandler の実装を見ると,ジョブを実行し終わったときに $job->delete() が実行されているのがわかる。

 * Handle the queued job.
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  array  $data
 * @return void
public function call(Job $job, array $data)
    try {
        $command = $this->setJobInstanceIfNecessary(
            $job, unserialize($data['command'])
    } catch (ModelNotFoundException $e) {
        return $this->handleModelNotFound($job, $e);

        $command, $this->resolveHandler($job, $command)

    if (! $job->hasFailed() && ! $job->isReleased()) {

    if (! $job->isDeletedOrReleased()) {

基底抽象クラス Job を見ると

 * Delete the job from the queue.
 * @return void
public function delete()
    $this->deleted = true;

という実装になっているが, これは RedisJob によって以下のように継承されている。

 * Delete the job from the queue.
 * @return void
public function delete()

    $this->redis->deleteReserved($this->queue, $this);
 * Delete a reserved job from the queue.
 * @param  string  $queue
 * @param  \Illuminate\Queue\Jobs\RedisJob  $job
 * @return void
public function deleteReserved($queue, $job)
    $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());




 * Process the given job.
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
protected function runJob($job, $connectionName, WorkerOptions $options)
    try {
        return $this->process($connectionName, $job, $options);
    } catch (Exception $e) {

    } catch (Throwable $e) {
        $this->exceptions->report($e = new FatalThrowableError($e));

 * Process the given job from the queue.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 * @throws \Throwable
public function process($connectionName, $job, WorkerOptions $options)
    try {
        /* ... */
        /* ... */
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    } catch (Throwable $e) {
            $connectionName, $job, $options, new FatalThrowableError($e)


  1. handleJobException() メソッドで例外をハンドルする
  2. データベースコネクションが切断されたことが原因の例外であれば,次のループでの終了を予約

では handleJobException() メソッドを見ていこう。ここから先は複雑なので,気合を入れて読んでいきたい。

 * Handle an exception that occurred while the job was running.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  \Exception  $e
 * @return void
 * @throws \Exception
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
    try {
        // First, we will go ahead and mark the job as failed if it will exceed the maximum
        // attempts it is allowed to run the next time we process it. If so we will just
        // go ahead and mark it as failed now so we do not have to release this again.
        if (! $job->hasFailed()) {
                $connectionName, $job, (int) $options->maxTries, $e

            $connectionName, $job, $e
    } finally {
        // If we catch an exception, we will attempt to release the job back onto the queue
        // so it is not lost entirely. This'll let the job be retried at a later time by
        // another listener (or this same one). We will re-throw this exception after.
        if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {

    throw $e;
 * Mark the given job as failed if it has exceeded the maximum allowed attempts.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @param  \Exception  $e
 * @return void
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
        $this->failJob($connectionName, $job, $e);

    if ($maxTries > 0 && $job->attempts() >= $maxTries) {
        $this->failJob($connectionName, $job, $e);
 * Mark the given job as failed and raise the relevant event.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Exception  $e
 * @return void
protected function failJob($connectionName, $job, $e)
    return FailingJob::handle($connectionName, $job, $e);

FailingJob::handle() メソッドは以下のように実装されている。

 * Delete the job, call the "failed" method, and raise the failed job event.
 * @param  string  $connectionName
 * @param  \Illuminate\Queue\Jobs\Job  $job
 * @param  \Exception $e
 * @return void
public static function handle($connectionName, $job, $e = null)

    if ($job->isDeleted()) {

    try {
        // If the job has failed, we will delete it, call the "failed" method and then call
        // an event indicating the job has failed so it can be logged if needed. This is
        // to allow every developer to better keep monitor of their failed queue jobs.

    } finally {
        static::events()->dispatch(new JobFailed(
            $connectionName, $job, $e ?: new ManuallyFailedException


  1. 以下に該当する場合にジョブに失敗フラグを立てて,キューからジョブを削除する。

    • 今回の試行が最大試行回数目となるとき
    • アプリケーションジョブの $timeoutAt プロパティまたは retryUntil() メソッドで返された時刻がきてしまい,これ以上の再試行がアプリケーション的に「無意味」「有効期限が過ぎているので必ず失敗」などと判断されるとき
      時間を指定する $timeout あるいは --timeout とは別物)
  2. Queue::exceptionOccurred() で登録されたリスナーを実行する。ここでジョブの削除,強制的に失敗フラグを立てる,といったことも行うことができる。

  3. ジョブがまだ失敗しておらず,キューからも削除されておらず, queues:キュー名:reserved セットに残っている場合に,queue.connections.redis.retry_after 秒数ぶんだけ遅延させるために queues:キュー名:delayed セットに移動する。

$job->release($options->delay) が初見だと何をやっているのか分かりづらいが,Luaスクリプトの中身を見れば一目瞭然である。Laravel はこの処理を「リリース」と呼んでいるらしい。

-- Remove the job from the current queue...
redis.call('zrem', "{$queue}:reserved", "{$job->getReservedJob()}")

-- Add the job onto the "delayed" queue...
redis.call('zadd', "{$queue}:delayed", "{$job->availableAt($options->delay)}", "{$job->getReservedJob()}")

return true


SIGALRM ハンドラにより


が実行され,ワーカーはそのまま強制終了される。ポイントは再起動時にタイムアウトジョブがどうなるか?という点である。注目すべきは process() メソッドにあるこの記述だ。

 * Process the given job from the queue.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return void
 * @throws \Throwable
public function process($connectionName, $job, WorkerOptions $options)
    try {

        /* ... */

            $connectionName, $job, (int) $options->maxTries


        /* ... */

    } catch (Exception $e) {
        /* ... */
    } catch (Throwable $e) {
        /* ... */
 * Mark the given job as failed if it has exceeded the maximum allowed attempts.
 * This will likely be because the job previously exceeded a timeout.
 * @param  string  $connectionName
 * @param  \Illuminate\Contracts\Queue\Job  $job
 * @param  int  $maxTries
 * @return void
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
    $timeoutAt = $job->timeoutAt();

    if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {

    if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        $job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.'

    throw $e;


  • markJobAsFailedIfWillExceedMaxAttempts()
    → 今回の試行が最大試行回数目となるとき,ジョブに失敗フラグを立てる。例外スローは呼び出し元に任せる。
  • markJobAsFailedIfAlreadyExceedsMaxAttempts()
    前回の試行で最大試行回数目であったとき,ジョブに失敗フラグを立てて, MaxAttemptsExceededException をスローする。




// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
 * Stop the process if necessary.
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @param  int  $lastRestart
 * @param  mixed  $job
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
    if ($this->shouldQuit) {
    } elseif ($this->memoryExceeded($options->memory)) {
    } elseif ($this->queueShouldRestart($lastRestart)) {
    } elseif ($options->stopWhenEmpty && is_null($job)) {


  • SIGTERM を受け取っていたとき
  • データベースコネクションが切断されたとき
    • プロセスを起動したまま長時間データベース通信を何も行わないと,TCPコネクションが切断されてしまう。
    • とはいえ,Laravel はデータベースサービス側で切断を検知して再接続処理はやってくれていたはず。なぜこっちで再起動する必要が…?(謎
  • メモリ使用量がワーカーの --memory 設定値を超えたとき
    • php.ini の memory_limit よりも余裕を持たせておくと,メモリ不足で身動きできなくなる前に余裕を持って再起動できる。メモリリーク対策として非常に有用。
    • プロセス終了ステータスに 12 を使っている理由は不明。 Laravel Horizon 関係か?
  • ./artisan illuminate:queue:restart が実行されていたとき
    • 最初に取得しておいた $lastRestart と違う値が取得され,終了すべきと判定される。
  • --stop-when-empty が有効でかつ,すべてのジョブ消化が終わったとき
    • 特殊な環境でのバッチ処理向けか?動かし続けるWebサービス向けではない気がする。
 * Determine if the queue worker should restart.
 * @param  int|null  $lastRestart
 * @return bool
protected function queueShouldRestart($lastRestart)
    return $this->getTimestampOfLastQueueRestart() != $lastRestart;
 * Determine if the memory limit has been exceeded.
 * @param  int   $memoryLimit
 * @return bool
public function memoryExceeded($memoryLimit)
    return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
 * Stop listening and bail out of the script.
 * @param  int  $status
 * @return void
public function stop($status = 0)
    $this->events->dispatch(new Events\WorkerStopping($status));




