File "RedisIngest.php"

Full Path: /var/www/drive/laravel/pulse/src/Ingests/RedisIngest.php
File size: 2.85 KB
MIME-type: text/x-php
Charset: utf-8

<?php

namespace Laravel\Pulse\Ingests;

use Carbon\CarbonImmutable;
use Carbon\CarbonInterval;
use Illuminate\Contracts\Config\Repository;
use Illuminate\Redis\RedisManager;
use Illuminate\Support\Collection;
use Laravel\Pulse\Contracts\Ingest;
use Laravel\Pulse\Contracts\Storage;
use Laravel\Pulse\Entry;
use Laravel\Pulse\Support\RedisAdapter;
use Laravel\Pulse\Value;

/**
 * @internal
 */
class RedisIngest implements Ingest
{
    /**
     * The redis stream.
     */
    protected string $stream = 'laravel:pulse:ingest';

    /**
     * Create a new Redis Ingest instance.
     */
    public function __construct(
        protected RedisManager $redis,
        protected Repository $config,
    ) {
        //
    }

    /**
     * Ingest the items.
     *
     * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry|\Laravel\Pulse\Value>  $items
     */
    public function ingest(Collection $items): void
    {
        if ($items->isEmpty()) {
            return;
        }

        $this->connection()->pipeline(function (RedisAdapter $pipeline) use ($items) {
            $items->each(fn (Entry|Value $entry) => $pipeline->xadd($this->stream, [
                'data' => serialize($entry),
            ]));
        });
    }

    /**
     * Trim the ingest.
     */
    public function trim(): void
    {
        $keep = $this->config->get('pulse.ingest.trim.keep');

        $this->connection()->xtrim(
            $this->stream,
            is_int($keep) ? 'MAXLEN' : 'MINID',
            '~',
            is_int($keep)
                ? $keep
                : CarbonImmutable::now()->subMilliseconds(
                    (int) CarbonInterval::fromString($keep)->totalMilliseconds
                )->getTimestampMs(),
        );
    }

    /**
     * Digest the ingested items.
     */
    public function digest(Storage $storage): int
    {
        $total = 0;

        while (true) {
            $entries = collect($this->connection()->xrange(
                $this->stream,
                '-',
                '+',
                $chunk = $this->config->get('pulse.ingest.redis.chunk')
            ));

            if ($entries->isEmpty()) {
                return $total;
            }

            $keys = $entries->keys();

            $storage->store(
                $entries->map(fn (array $payload): Entry|Value => unserialize($payload['data']))->values()
            );

            $this->connection()->xdel($this->stream, $keys);

            if ($entries->count() < $chunk) {
                return $total + $entries->count();
            }

            $total = $total + $entries->count();
        }
    }

    /**
     * Resolve the redis connection.
     */
    protected function connection(): RedisAdapter
    {
        return new RedisAdapter($this->redis->connection(
            $this->config->get('pulse.ingest.redis.connection')
        ), $this->config);
    }
}