File "RedisAdapter.php"
Full Path: /var/www/drive/laravel/pulse/src/Support/RedisAdapter.php
File size: 4.6 KB
MIME-type: text/x-php
Charset: utf-8
<?php
namespace Laravel\Pulse\Support;
use Illuminate\Contracts\Config\Repository;
use Illuminate\Redis\Connections\Connection;
use Illuminate\Support\Collection;
use Predis\Client as Predis;
use Predis\Command\RawCommand;
use Predis\Pipeline\Pipeline;
use Predis\Response\ServerException as PredisServerException;
use Redis as PhpRedis;
use Relay\Relay;
/**
* @internal
*/
class RedisAdapter
{
/**
* Create a new Redis instance.
*/
public function __construct(
protected Connection $connection,
protected Repository $config,
protected Pipeline|PhpRedis|Relay|null $client = null,
) {
//
}
/**
* Add an entry to the stream.
*
* @param array<string, string> $dictionary
*/
public function xadd(string $key, array $dictionary): string|Pipeline|PhpRedis|Relay
{
return $this->handle([
'XADD',
$this->config->get('database.redis.options.prefix').$key,
'*',
...collect($dictionary)->keys()->zip($dictionary)->flatten()->all(),
]);
}
/**
* Read a range of items from the stream.
*
* @return array<string, array<string, string>>
*/
public function xrange(string $key, string $start, string $end, ?int $count = null): array
{
return collect($this->handle([ // @phpstan-ignore return.type, argument.templateType, argument.templateType
'XRANGE',
$this->config->get('database.redis.options.prefix').$key,
$start,
$end,
...$count !== null ? ['COUNT', "$count"] : [],
]))->mapWithKeys(fn ($value, $key) => [
$value[0] => collect($value[1]) // @phpstan-ignore argument.templateType, argument.templateType
->chunk(2)
->map->values()
->mapWithKeys(fn ($value, $key) => [$value[0] => $value[1]])
->all(),
])->all();
}
/**
* Trim the stream.
*/
public function xtrim(string $key, string $strategy, string $strategyModifier, string|int $threshold): int
{
return $this->handle([
'XTRIM',
$this->config->get('database.redis.options.prefix').$key,
$strategy,
$strategyModifier,
(string) $threshold,
]);
}
/**
* Delete the items from the stream.
*
* @param \Illuminate\Support\Collection<int, string>|array<int, string> $keys
*/
public function xdel(string $stream, Collection|array $keys): int
{
return $this->handle([
'XDEL',
$this->config->get('database.redis.options.prefix').$stream,
...$keys,
]);
}
/**
* Run commands within a pipeline.
*
* @param (callable(self): void) $closure
* @return array<int, mixed>
*/
public function pipeline(callable $closure): array
{
// Create a pipeline and wrap the Redis client in an instance of this class to ensure our wrapper methods are used within the pipeline...
return $this->connection->pipeline(fn (Pipeline|PhpRedis|Relay $client) => $closure(new self($this->connection, $this->config, $client))); // @phpstan-ignore method.notFound
}
/**
* Run the given command.
*
* @param list<string> $args
*/
protected function handle(array $args): mixed
{
try {
return tap($this->run($args), function ($result) use ($args) {
if ($result === false && ($this->client() instanceof PhpRedis || $this->client() instanceof Relay)) {
throw RedisServerException::whileRunningCommand(implode(' ', $args), $this->client()->getLastError() ?? 'An unknown error occurred.');
}
});
} catch (PredisServerException $e) {
throw RedisServerException::whileRunningCommand(implode(' ', $args), $e->getMessage(), previous: $e);
}
}
/**
* Run the given command.
*
* @param list<string> $args
*/
protected function run(array $args): mixed
{
return match (true) {
$this->client() instanceof PhpRedis => $this->client()->rawCommand(...$args),
$this->client() instanceof Relay => $this->client()->rawCommand(...$args),
$this->client() instanceof Predis,
$this->client() instanceof Pipeline => $this->client()->executeCommand(RawCommand::create(...$args)),
};
}
/**
* Retrieve the Redis client.
*/
protected function client(): PhpRedis|Predis|Pipeline|Relay
{
return $this->client ?? $this->connection->client();
}
}