File "Queues.php"

Full Path: /var/www/drive/laravel/pulse/src/Recorders/Queues.php
File size: 5.08 KB
MIME-type: text/x-php
Charset: utf-8

<?php

namespace Laravel\Pulse\Recorders;

use Carbon\CarbonImmutable;
use Illuminate\Contracts\Config\Repository;
use Illuminate\Events\CallQueuedListener;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Queue\Events\JobReleasedAfterException;
use Laravel\Pulse\Pulse;
use ReflectionClass;

/**
 * @internal
 */
class Queues
{
    use Concerns\Ignores, Concerns\Sampling;

    /**
     * The events to listen for.
     *
     * @var list<class-string>
     */
    public array $listen = [
        JobReleasedAfterException::class,
        JobFailed::class,
        JobProcessed::class,
        JobProcessing::class,
        JobQueued::class,
    ];

    /**
     * Create a new recorder instance.
     */
    public function __construct(
        protected Pulse $pulse,
        protected Repository $config,
    ) {
        //
    }

    /**
     * Record the job.
     */
    public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): void
    {
        if ($event->connectionName === 'sync') {
            return;
        }

        [$timestamp, $class, $connection, $queue, $uuid, $name] = [
            CarbonImmutable::now()->getTimestamp(),
            $class = $event::class,
            match ($class) {
                JobQueued::class => $event->connectionName,
                default => $event->job->getConnectionName(), // @phpstan-ignore method.nonObject
            },
            $this->resolveQueue($event),
            match ($class) {
                JobQueued::class => $event->payload()['uuid'], // @phpstan-ignore method.notFound
                default => $event->job->uuid(), // @phpstan-ignore method.nonObject
            },
            match ($class) {
                JobQueued::class => match (true) {
                    is_string($event->job) => $event->job,
                    method_exists($event->job, 'displayName') => $event->job->displayName(),
                    default => $event->job::class,
                },
                default => $event->job->resolveName(), // @phpstan-ignore method.nonObject
            },
        ];

        $this->pulse->lazy(function () use ($timestamp, $class, $connection, $queue, $uuid, $name) {
            if (! $this->shouldSampleDeterministically($uuid) || $this->shouldIgnore($name)) {
                return;
            }

            $queue = $queue === null
                ? $this->getDefaultQueue($connection)
                : $this->normalizeSqsQueue($connection, $queue);

            $this->pulse->record(
                type: match ($class) { // @phpstan-ignore match.unhandled
                    JobQueued::class => 'queued',
                    JobProcessing::class => 'processing',
                    JobProcessed::class => 'processed',
                    JobReleasedAfterException::class => 'released',
                    JobFailed::class => 'failed',
                },
                key: "{$connection}:{$queue}",
                timestamp: $timestamp,
            )->count()->onlyBuckets();
        });
    }

    /**
     * Get the default queue for the connection
     */
    protected function getDefaultQueue(string $connection): string
    {
        return $this->config->get('queue.connections.'.$connection.'.queue', 'default');
    }

    /**
     * Normalize the SQS queue name.
     */
    protected function normalizeSqsQueue(string $connection, string $queue): string
    {
        $config = $this->config->get("queue.connections.{$connection}") ?? [];

        if (($config['driver'] ?? null) !== 'sqs') {
            return $queue;
        }

        if ($config['prefix'] ?? null) {
            $prefix = preg_quote($config['prefix'], '#');

            $queue = preg_replace("#^{$prefix}/#", '', $queue) ?? $queue;
        }

        if ($config['suffix'] ?? null) {
            $suffix = preg_quote($config['suffix'], '#');

            $queue = preg_replace("#{$suffix}$#", '', $queue) ?? $queue;
        }

        return $queue;
    }

    /**
     * Resolve the queue.
     */
    protected function resolveQueue(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): ?string
    {
        return match ($event::class) {
            JobQueued::class => match (is_object($event->job) ? $event->job::class : $event->job) {
                CallQueuedListener::class => $this->resolveQueuedListenerQueue($event),
                default => $event->job->queue ?? null,
            },
            default => $event->job->getQueue(), // @phpstan-ignore method.nonObject
        };
    }

    /**
     * Resolve the queued listener's queue.
     */
    protected function resolveQueuedListenerQueue(JobQueued $event): ?string
    {
        return with(
            (new ReflectionClass($event->job->class))->newInstanceWithoutConstructor(), // @phpstan-ignore property.nonObject
            fn ($listener) => method_exists($listener, 'viaQueue')
                ? $listener->viaQueue($event->job->data[0] ?? null)
                : ($listener->queue ?? null)
        );
    }
}