File "DatabaseStorage.php"
Full Path: /var/www/drive/laravel/pulse/src/Storage/DatabaseStorage.php
File size: 32.73 KB
MIME-type: text/x-php
Charset: utf-8
<?php
namespace Laravel\Pulse\Storage;
use Carbon\CarbonImmutable;
use Carbon\CarbonInterval;
use Closure;
use Illuminate\Contracts\Config\Repository;
use Illuminate\Database\Connection;
use Illuminate\Database\DatabaseManager;
use Illuminate\Database\Query\Builder;
use Illuminate\Database\Query\Expression;
use Illuminate\Support\Collection;
use InvalidArgumentException;
use Laravel\Pulse\Contracts\Storage;
use Laravel\Pulse\Entry;
use Laravel\Pulse\Value;
use RuntimeException;
/**
* @phpstan-type AggregateRow array{bucket: int, period: int, type: string, aggregate: string, key: string, value: int|float, count?: int}
*
* @internal
*/
class DatabaseStorage implements Storage
{
/**
 * Build the storage driver around the database manager and config repository.
 *
 * @param  \Illuminate\Database\DatabaseManager  $db  Used to resolve the connection the driver talks to.
 * @param  \Illuminate\Contracts\Config\Repository  $config  Source of the "pulse.storage.database.*" options read by this class.
 */
public function __construct(
    protected DatabaseManager $db,
    protected Repository $config,
) {
}
/**
 * Persist a batch of recorded items.
 *
 * Entries are inserted into "pulse_entries" (except those reporting
 * isOnlyBuckets()) and folded into "pulse_aggregates" once for every
 * aggregation they declare. Values are collapsed per key/type and upserted
 * into "pulse_values". Every write runs inside a single transaction; the
 * second argument given to transaction() is 3 attempts.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry|\Laravel\Pulse\Value>  $items
 */
public function store(Collection $items): void
{
    if ($items->isEmpty()) {
        return;
    }

    [$entries, $values] = $items->partition(fn (Entry|Value $item) => $item instanceof Entry);

    $chunkSize = $this->config->get('pulse.storage.database.chunk');
    $manualKeyHash = $this->requiresManualKeyHash();

    // Converts an Entry or Value into its row and appends the md5 of its key.
    $toHashedRow = function ($item) {
        $attributes = $item->attributes();

        return [...$attributes, 'key_hash' => md5($attributes['key'])];
    };

    $storableEntries = $entries->reject(fn ($entry) => $entry->isOnlyBuckets());

    $entryChunks = ($manualKeyHash
        ? $storableEntries->map($toHashedRow)
        : $storableEntries->map->attributes()
    )->chunk($chunkSize);

    // Group the entries by the aggregations each one asks for; an entry may
    // appear in several groups.
    $byAggregation = $entries->reduce(
        function ($carry, $entry) {
            foreach ($entry->aggregations() as $aggregation) {
                $carry[$aggregation][] = $entry;
            }

            return $carry;
        },
        ['count' => [], 'min' => [], 'max' => [], 'sum' => [], 'avg' => []]
    );

    [$counts, $minimums, $maximums, $sums, $averages] = array_values($byAggregation);

    $countChunks = $this->preaggregateCounts(collect($counts)) // @phpstan-ignore argument.templateType, argument.templateType
        ->chunk($chunkSize);

    $minimumChunks = $this->preaggregateMinimums(collect($minimums)) // @phpstan-ignore argument.templateType, argument.templateType
        ->chunk($chunkSize);

    $maximumChunks = $this->preaggregateMaximums(collect($maximums)) // @phpstan-ignore argument.templateType, argument.templateType
        ->chunk($chunkSize);

    $sumChunks = $this->preaggregateSums(collect($sums)) // @phpstan-ignore argument.templateType, argument.templateType
        ->chunk($chunkSize);

    $averageChunks = $this->preaggregateAverages(collect($averages)) // @phpstan-ignore argument.templateType, argument.templateType
        ->chunk($chunkSize);

    $collapsedValues = $this->collapseValues($values); // @phpstan-ignore argument.templateType

    $valueChunks = ($manualKeyHash
        ? $collapsedValues->map($toHashedRow)
        : $collapsedValues->map->attributes() // @phpstan-ignore method.notFound
    )->chunk($chunkSize);

    $this->connection()->transaction(function () use ($entryChunks, $countChunks, $minimumChunks, $maximumChunks, $sumChunks, $averageChunks, $valueChunks) {
        $entryChunks->each(
            fn ($rows) => $this->connection()->table('pulse_entries')->insert($rows->all())
        );

        $countChunks->each(fn ($rows) => $this->upsertCount($rows->all()));
        $minimumChunks->each(fn ($rows) => $this->upsertMin($rows->all()));
        $maximumChunks->each(fn ($rows) => $this->upsertMax($rows->all()));
        $sumChunks->each(fn ($rows) => $this->upsertSum($rows->all()));
        $averageChunks->each(fn ($rows) => $this->upsertAvg($rows->all()));

        // A value row is unique per type + key_hash; newer data replaces the
        // stored timestamp and value.
        $valueChunks->each(
            fn ($rows) => $this->connection()
                ->table('pulse_values')
                ->upsert($rows->all(), ['type', 'key_hash'], ['timestamp', 'value'])
        );
    }, 3);
}
/**
 * Delete data that has aged out of the storage.
 *
 * Rows in "pulse_values" and "pulse_entries" are removed once their
 * timestamp is a week old or older. For "pulse_aggregates", each distinct
 * period found in the table is trimmed separately: buckets at or before
 * "now minus <period> minutes" are removed.
 */
public function trim(): void
{
    $now = CarbonImmutable::now();
    $oneWeekAgo = $now->subWeek()->getTimestamp();

    foreach (['pulse_values', 'pulse_entries'] as $table) {
        $this->connection()
            ->table($table)
            ->where('timestamp', '<=', $oneWeekAgo)
            ->delete();
    }

    $storedPeriods = $this->connection()
        ->table('pulse_aggregates')
        ->distinct()
        ->pluck('period');

    $storedPeriods->each(function (int $period) use ($now): void {
        $this->connection()
            ->table('pulse_aggregates')
            ->where('period', $period)
            ->where('bucket', '<=', $now->subMinutes($period)->getTimestamp())
            ->delete();
    });
}
/**
 * Remove stored data, either everything or only the given types.
 *
 * With no types, the values, entries and aggregates tables are truncated.
 * Otherwise only rows whose "type" is in the list are deleted from each
 * of those tables.
 *
 * @param  list<string>|null  $types
 */
public function purge(?array $types = null): void
{
    $tables = ['pulse_values', 'pulse_entries', 'pulse_aggregates'];

    foreach ($tables as $table) {
        $query = $this->connection()->table($table);

        if ($types === null) {
            $query->truncate();
        } else {
            $query->whereIn('type', $types)->delete();
        }
    }
}
/**
 * Upsert count aggregates, adding the incoming value to any stored value.
 *
 * Rows are matched on bucket, period, type, aggregate and key_hash.
 *
 * @param  list<AggregateRow>  $values
 * @return int  Whatever the query builder's upsert() call returns.
 *
 * @throws \RuntimeException When the connection's driver is not mariadb, mysql, pgsql or sqlite.
 */
protected function upsertCount(array $values): int
{
    $driver = $this->connection()->getDriverName();

    // The "add to the existing value" expression is spelled per SQL dialect.
    $increment = match ($driver) {
        'mariadb', 'mysql' => new Expression('`value` + values(`value`)'),
        'pgsql', 'sqlite' => new Expression('"pulse_aggregates"."value" + "excluded"."value"'),
        default => throw new RuntimeException("Unsupported database driver [{$driver}]"),
    };

    $uniqueBy = ['bucket', 'period', 'type', 'aggregate', 'key_hash'];

    return $this->connection()
        ->table('pulse_aggregates')
        ->upsert($values, $uniqueBy, ['value' => $increment]);
}
/**
 * Upsert minimum aggregates, keeping the smaller of the stored and incoming value.
 *
 * Rows are matched on bucket, period, type, aggregate and key_hash.
 *
 * @param  list<AggregateRow>  $values
 * @return int  Whatever the query builder's upsert() call returns.
 *
 * @throws \RuntimeException When the connection's driver is not mariadb, mysql, pgsql or sqlite.
 */
protected function upsertMin(array $values): int
{
    $driver = $this->connection()->getDriverName();

    // sqlite uses the two-argument min() where the other drivers use least().
    $smallest = match ($driver) {
        'mariadb', 'mysql' => new Expression('least(`value`, values(`value`))'),
        'pgsql' => new Expression('least("pulse_aggregates"."value", "excluded"."value")'),
        'sqlite' => new Expression('min("pulse_aggregates"."value", "excluded"."value")'),
        default => throw new RuntimeException("Unsupported database driver [{$driver}]"),
    };

    $uniqueBy = ['bucket', 'period', 'type', 'aggregate', 'key_hash'];

    return $this->connection()
        ->table('pulse_aggregates')
        ->upsert($values, $uniqueBy, ['value' => $smallest]);
}
/**
 * Upsert maximum aggregates, keeping the larger of the stored and incoming value.
 *
 * Rows are matched on bucket, period, type, aggregate and key_hash.
 *
 * @param  list<AggregateRow>  $values
 * @return int  Whatever the query builder's upsert() call returns.
 *
 * @throws \RuntimeException When the connection's driver is not mariadb, mysql, pgsql or sqlite.
 */
protected function upsertMax(array $values): int
{
    $driver = $this->connection()->getDriverName();

    // sqlite uses the two-argument max() where the other drivers use greatest().
    $largest = match ($driver) {
        'mariadb', 'mysql' => new Expression('greatest(`value`, values(`value`))'),
        'pgsql' => new Expression('greatest("pulse_aggregates"."value", "excluded"."value")'),
        'sqlite' => new Expression('max("pulse_aggregates"."value", "excluded"."value")'),
        default => throw new RuntimeException("Unsupported database driver [{$driver}]"),
    };

    $uniqueBy = ['bucket', 'period', 'type', 'aggregate', 'key_hash'];

    return $this->connection()
        ->table('pulse_aggregates')
        ->upsert($values, $uniqueBy, ['value' => $largest]);
}
/**
 * Upsert sum aggregates, adding the incoming value to any stored value.
 *
 * Rows are matched on bucket, period, type, aggregate and key_hash.
 *
 * @param  list<AggregateRow>  $values
 * @return int  Whatever the query builder's upsert() call returns.
 *
 * @throws \RuntimeException When the connection's driver is not mariadb, mysql, pgsql or sqlite.
 */
protected function upsertSum(array $values): int
{
    $driver = $this->connection()->getDriverName();

    // The "add to the existing value" expression is spelled per SQL dialect.
    $runningTotal = match ($driver) {
        'mariadb', 'mysql' => new Expression('`value` + values(`value`)'),
        'pgsql', 'sqlite' => new Expression('"pulse_aggregates"."value" + "excluded"."value"'),
        default => throw new RuntimeException("Unsupported database driver [{$driver}]"),
    };

    $uniqueBy = ['bucket', 'period', 'type', 'aggregate', 'key_hash'];

    return $this->connection()
        ->table('pulse_aggregates')
        ->upsert($values, $uniqueBy, ['value' => $runningTotal]);
}
/**
 * Upsert average aggregates, merging the stored and incoming averages.
 *
 * On conflict the new value is the count-weighted mean of the stored and
 * incoming averages, and the stored count grows by the incoming count.
 * Rows are matched on bucket, period, type, aggregate and key_hash.
 *
 * @param  list<AggregateRow>  $values
 * @return int  Whatever the query builder's upsert() call returns.
 *
 * @throws \RuntimeException When the connection's driver is not mariadb, mysql, pgsql or sqlite.
 */
protected function upsertAvg(array $values): int
{
    $driver = $this->connection()->getDriverName();

    // "value" is listed before "count" on purpose: the value expression reads
    // the stored count. NOTE(review): MySQL/MariaDB are understood to apply
    // these assignments left to right, so keep this order — confirm before
    // reordering.
    $update = match ($driver) {
        'mariadb', 'mysql' => [
            'value' => new Expression('(`value` * `count` + (values(`value`) * values(`count`))) / (`count` + values(`count`))'),
            'count' => new Expression('`count` + values(`count`)'),
        ],
        'pgsql', 'sqlite' => [
            'value' => new Expression('("pulse_aggregates"."value" * "pulse_aggregates"."count" + ("excluded"."value" * "excluded"."count")) / ("pulse_aggregates"."count" + "excluded"."count")'),
            'count' => new Expression('"pulse_aggregates"."count" + "excluded"."count"'),
        ],
        default => throw new RuntimeException("Unsupported database driver [{$driver}]"),
    };

    $uniqueBy = ['bucket', 'period', 'type', 'aggregate', 'key_hash'];

    return $this->connection()
        ->table('pulse_aggregates')
        ->upsert($values, $uniqueBy, $update);
}
/**
 * Fold entries into "count" aggregate rows, one increment per entry.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregateCounts(Collection $entries): Collection
{
    // A row that has no value yet starts from zero.
    $increment = function ($aggregate) {
        $aggregate['value'] = ($aggregate['value'] ?? 0) + 1;

        return $aggregate;
    };

    return $this->preaggregate($entries, 'count', $increment);
}
/**
 * Fold entries into "min" aggregate rows, keeping the smallest value per row.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregateMinimums(Collection $entries): Collection
{
    $keepSmallest = function ($aggregate, $entry) {
        // The first value for a row is taken as-is; later ones are compared
        // and the result is cast to int.
        // NOTE(review): the cast would truncate a float value — confirm
        // Entry::$value is always an integer.
        $aggregate['value'] = isset($aggregate['value'])
            ? (int) min($aggregate['value'], $entry->value)
            : $entry->value;

        return $aggregate;
    };

    return $this->preaggregate($entries, 'min', $keepSmallest);
}
/**
 * Fold entries into "max" aggregate rows, keeping the largest value per row.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregateMaximums(Collection $entries): Collection
{
    $keepLargest = function ($aggregate, $entry) {
        // The first value for a row is taken as-is; later ones are compared
        // and the result is cast to int.
        // NOTE(review): the cast would truncate a float value — confirm
        // Entry::$value is always an integer.
        $aggregate['value'] = isset($aggregate['value'])
            ? (int) max($aggregate['value'], $entry->value)
            : $entry->value;

        return $aggregate;
    };

    return $this->preaggregate($entries, 'max', $keepLargest);
}
/**
 * Fold entries into "sum" aggregate rows, adding each entry's value to its row.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregateSums(Collection $entries): Collection
{
    // A row that has no value yet starts from zero.
    $accumulate = function ($aggregate, $entry) {
        $aggregate['value'] = ($aggregate['value'] ?? 0) + $entry->value;

        return $aggregate;
    };

    return $this->preaggregate($entries, 'sum', $accumulate);
}
/**
 * Fold entries into "avg" aggregate rows, tracking a running mean and its count.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregateAverages(Collection $entries): Collection
{
    $runningMean = function ($aggregate, $entry) {
        // The mean must be computed from the previous count, so "value" is
        // updated before "count" is incremented.
        if (isset($aggregate['value'])) {
            $aggregate['value'] = ($aggregate['value'] * $aggregate['count'] + $entry->value) / ($aggregate['count'] + 1);
        } else {
            $aggregate['value'] = $entry->value;
        }

        $aggregate['count'] = ($aggregate['count'] ?? 0) + 1;

        return $aggregate;
    };

    return $this->preaggregate($entries, 'avg', $runningMean);
}
/**
 * Collapse the given values so that only one remains per key and type.
 *
 * The collection is reversed first, so for each key/type pair the value
 * that appears last in $values is the one that is kept.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Value>  $values
 * @return \Illuminate\Support\Collection<int, \Laravel\Pulse\Value>
 */
protected function collapseValues(Collection $values): Collection
{
    // Compare the [key, type] pairs strictly. With the default loose
    // comparison PHP treats numeric-looking strings such as "1000" and "1e3"
    // (or "1" and "01") as equal, which would merge values that belong to
    // different keys and silently drop one of them.
    return $values->reverse()->unique(
        fn (Value $value) => [$value->key, $value->type],
        strict: true,
    );
}
/**
 * Pre-aggregate entries with a callback.
 *
 * Every entry is assigned to one bucket per period returned by periods().
 * The callback receives the row built so far (or a fresh row holding
 * bucket, period, type, aggregate and key) together with the entry, and
 * returns the updated row. Entries older than "now minus <period> minutes"
 * are skipped for that period.
 *
 * @param  \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry>  $entries
 * @param  string  $aggregate  Name stored in each row's "aggregate" column.
 * @param  \Closure  $callback  Called as $callback(array $row, Entry $entry) and must return the row.
 * @return \Illuminate\Support\Collection<int, AggregateRow>
 */
protected function preaggregate(Collection $entries, string $aggregate, Closure $callback): Collection
{
    if ($entries->isEmpty()) {
        return collect([]);
    }

    // The cut-off per period, the period list and the key-hash requirement do
    // not depend on the entry, so resolve them once instead of once per
    // entry and period. This also measures the whole batch against the same
    // "now".
    $now = CarbonImmutable::now();

    $windows = array_map(
        fn ($period) => [$period, $now->subMinutes($period)->getTimestamp()],
        $this->periods()
    );

    $requiresManualKeyHash = $this->requiresManualKeyHash();

    $aggregates = [];

    foreach ($entries as $entry) {
        foreach ($windows as [$period, $cutoff]) {
            // Exclude entries that would be trimmed.
            if ($entry->timestamp < $cutoff) {
                continue;
            }

            // Round the timestamp down to the start of its bucket.
            $bucket = (int) (floor($entry->timestamp / $period) * $period);

            $key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key;

            if (! isset($aggregates[$key])) {
                $aggregates[$key] = $callback([
                    'bucket' => $bucket,
                    'period' => $period,
                    'type' => $entry->type,
                    'aggregate' => $aggregate,
                    'key' => $entry->key,
                ], $entry);

                if ($requiresManualKeyHash) {
                    $aggregates[$key]['key_hash'] = md5($entry->key);
                }
            } else {
                $aggregates[$key] = $callback($aggregates[$key], $entry);
            }
        }
    }

    return collect(array_values($aggregates));
}
/**
 * The periods to aggregate for.
 *
 * Each period is an interval's total seconds divided by 60, which gives
 * 60, 360, 1440 and 10080 for one hour, six hours, 24 hours and seven days.
 *
 * @return list<int>
 */
protected function periods(): array
{
    $intervals = [
        CarbonInterval::hour(),
        CarbonInterval::hours(6),
        CarbonInterval::hours(24),
        CarbonInterval::days(7),
    ];

    return array_map(
        fn (CarbonInterval $interval) => (int) ($interval->totalSeconds / 60),
        $intervals
    );
}
/**
 * Retrieve the stored values of a type, indexed by key.
 *
 * @param  list<string>|null  $keys  Restricts the result to these keys when non-empty.
 * @return \Illuminate\Support\Collection<string, object{
 *     timestamp: int,
 *     key: string,
 *     value: string
 * }>
 */
public function values(string $type, ?array $keys = null): Collection
{
    $query = $this->connection()
        ->table('pulse_values')
        ->select('timestamp', 'key', 'value')
        ->where('type', $type);

    // The key filter is applied only for a truthy $keys, so an empty array
    // behaves like null and returns every value of the type.
    // NOTE(review): confirm callers rely on that for [].
    if ($keys) {
        $query->whereIn('key', $keys);
    }

    return $query->get()->keyBy('key');
}
/**
 * Retrieve aggregate values for plotting on a graph.
 *
 * The interval is divided into 60 data points. The result is keyed by
 * entry key, then by type, then by the bucket's date-time string; buckets
 * without a stored reading hold null, and every requested type is present
 * for every key.
 *
 * @param  list<string>  $types
 * @param  'count'|'min'|'max'|'sum'|'avg'  $aggregate
 * @return \Illuminate\Support\Collection<string, \Illuminate\Support\Collection<string, \Illuminate\Support\Collection<string, int|null>>>
 *
 * @throws \InvalidArgumentException When $aggregate is not one of count, min, max, sum, avg.
 */
public function graph(array $types, string $aggregate, CarbonInterval $interval): Collection
{
    if (! in_array($aggregate, $allowed = ['count', 'min', 'max', 'sum', 'avg'])) {
        throw new InvalidArgumentException("Invalid aggregate type [$aggregate], allowed types: [".implode(', ', $allowed).'].');
    }

    $maxDataPoints = 60;

    $now = CarbonImmutable::now();
    $period = $interval->totalSeconds / 60;
    $secondsPerPeriod = $interval->totalSeconds / $maxDataPoints;

    // Start of the bucket containing "now", and of the bucket 59 steps before it.
    $currentBucket = (int) (floor($now->getTimestamp() / $secondsPerPeriod) * $secondsPerPeriod);
    $firstBucket = $currentBucket - (($maxDataPoints - 1) * $secondsPerPeriod);

    // Buckets are labelled with their date-time string.
    $label = fn ($timestamp) => CarbonImmutable::createFromTimestamp($timestamp)->toDateTimeString();

    // One null placeholder per data point, so gaps in the data still show up.
    $padding = Collection::range(0, $maxDataPoints - 1)
        ->mapWithKeys(fn ($i) => [$label($firstBucket + $i * $secondsPerPeriod) => null]);

    // Every requested type starts out as an all-null series.
    $structure = collect($types)->mapWithKeys(fn ($type) => [$type => $padding]);

    $rows = $this->connection()->table('pulse_aggregates')
        ->select(['bucket', 'type', 'key', 'value'])
        ->whereIn('type', $types)
        ->where('aggregate', $aggregate)
        ->where('period', $period)
        ->where('bucket', '>=', $firstBucket)
        ->orderBy('bucket')
        ->get();

    return $rows
        ->groupBy('key')
        ->sortKeys()
        ->map(function ($keyReadings) use ($structure, $padding, $label) {
            $series = $keyReadings
                ->groupBy('type')
                ->map(fn ($typeReadings) => $padding->merge(
                    $typeReadings->mapWithKeys(
                        fn ($reading) => [$label($reading->bucket) => $reading->value]
                    )
                ));

            return $structure->merge($series);
        });
}
/**
* Retrieve aggregate values for the given type, grouped by key.
*
* The result combines two sources: raw rows from "pulse_entries" for the
* part of the window that precedes the oldest bucket (the "tail"), and
* pre-computed rows from "pulse_aggregates" for the buckets themselves.
* Per key hash, counts and sums are added together, minimums and maximums
* are reduced with min()/max(), and averages are averaged again without
* weighting by count.
*
* @param 'count'|'min'|'max'|'sum'|'avg'|list<'count'|'min'|'max'|'sum'|'avg'> $aggregates
* @param string|null $orderBy Column to sort by; defaults to the first entry of $aggregates.
* @param string $direction Sort direction handed to the query builder's orderBy().
* @param int $limit Maximum number of keys returned.
* @return \Illuminate\Support\Collection<int, object{
*     key: string,
*     min?: int,
*     max?: int,
*     sum?: int,
*     avg?: int,
*     count?: int
* }>
*
* @throws \InvalidArgumentException When $aggregates contains a name other than count, min, max, sum or avg.
*/
public function aggregate(
string $type,
array|string $aggregates,
CarbonInterval $interval,
?string $orderBy = null,
string $direction = 'desc',
int $limit = 101,
): Collection {
$aggregates = is_array($aggregates) ? $aggregates : [$aggregates];
if ($invalid = array_diff($aggregates, $allowed = ['count', 'min', 'max', 'sum', 'avg'])) {
throw new InvalidArgumentException('Invalid aggregate type(s) ['.implode(', ', $invalid).'], allowed types: ['.implode(', ', $allowed).'].');
}
// NOTE(review): an empty $aggregates array passes the validation above and
// reaches this line without an index 0 — confirm callers never pass [].
$orderBy ??= $aggregates[0];
return $this->connection()
->query()
->select([
// Resolve the readable key from any entry row that shares the key hash.
'key' => fn (Builder $query) => $query
->select('key')
->from('pulse_entries', as: 'keys')
->whereColumn('keys.key_hash', 'aggregated.key_hash')
->limit(1),
...$aggregates,
])
->fromSub(function (Builder $query) use ($type, $aggregates, $interval, $orderBy, $direction, $limit) {
// Combine the tail row and the bucket rows into one row per key hash.
$query->select('key_hash');
foreach ($aggregates as $aggregate) {
$query->selectRaw(match ($aggregate) {
'count' => "sum({$this->wrap('count')})",
'min' => "min({$this->wrap('min')})",
'max' => "max({$this->wrap('max')})",
'sum' => "sum({$this->wrap('sum')})",
'avg' => "avg({$this->wrap('avg')})",
}." as {$this->wrap($aggregate)}");
}
$query->fromSub(function (Builder $query) use ($type, $aggregates, $interval) {
$now = CarbonImmutable::now();
$period = $interval->totalSeconds / 60;
// First second that still belongs to the requested window.
$windowStart = (int) ($now->getTimestamp() - $interval->totalSeconds + 1);
// Start of the bucket containing "now", and of the oldest bucket read below.
$currentBucket = (int) (floor($now->getTimestamp() / $period) * $period);
$oldestBucket = $currentBucket - $interval->totalSeconds + $period;
// Tail: raw entries between the window start and the second before the oldest bucket.
$query->select('key_hash');
foreach ($aggregates as $aggregate) {
$query->selectRaw(match ($aggregate) {
'count' => 'count(*)',
'min' => "min({$this->wrap('value')})",
'max' => "max({$this->wrap('value')})",
'sum' => "sum({$this->wrap('value')})",
'avg' => "avg({$this->wrap('value')})",
}." as {$this->wrap($aggregate)}");
}
$query
->from('pulse_entries')
->where('type', $type)
->where('timestamp', '>=', $windowStart)
->where('timestamp', '<=', $oldestBucket - 1)
->groupBy('key_hash');
// Buckets: one union per requested aggregate, because each aggregate is
// stored in its own rows; the other aggregate columns are filled with null.
foreach ($aggregates as $currentAggregate) {
$query->unionAll(function (Builder $query) use ($type, $aggregates, $currentAggregate, $period, $oldestBucket) {
$query->select('key_hash');
foreach ($aggregates as $aggregate) {
if ($aggregate === $currentAggregate) {
$query->selectRaw(match ($aggregate) {
'count' => "sum({$this->wrap('value')})",
'min' => "min({$this->wrap('value')})",
'max' => "max({$this->wrap('value')})",
'sum' => "sum({$this->wrap('value')})",
'avg' => "avg({$this->wrap('value')})",
}." as {$this->wrap($aggregate)}");
} else {
$query->selectRaw("null as {$this->wrap($aggregate)}");
}
}
$query
->from('pulse_aggregates')
->where('period', $period)
->where('type', $type)
->where('aggregate', $currentAggregate)
->where('bucket', '>=', $oldestBucket)
->groupBy('key_hash');
});
}
}, as: 'results')
->groupBy('key_hash')
->orderBy($orderBy, $direction)
->limit($limit);
}, as: 'aggregated')
->get();
}
/**
* Retrieve one aggregate for several types at once, grouped by key.
*
* Each returned row holds the key plus one column per requested type,
* named after the type. As in aggregate(), the numbers combine raw rows
* from "pulse_entries" for the tail of the window with pre-computed rows
* from "pulse_aggregates" for the buckets.
*
* @param string|list<string> $types
* @param 'count'|'min'|'max'|'sum'|'avg' $aggregate
* @param string|null $orderBy Column to sort by; defaults to the first entry of $types.
* @param string $direction Sort direction handed to the query builder's orderBy().
* @param int $limit Maximum number of keys returned.
* @return \Illuminate\Support\Collection<int, object>
*
* @throws \InvalidArgumentException When $aggregate is not one of count, min, max, sum, avg.
*/
public function aggregateTypes(
string|array $types,
string $aggregate,
CarbonInterval $interval,
?string $orderBy = null,
string $direction = 'desc',
int $limit = 101,
): Collection {
if (! in_array($aggregate, $allowed = ['count', 'min', 'max', 'sum', 'avg'])) {
throw new InvalidArgumentException("Invalid aggregate type [$aggregate], allowed types: [".implode(', ', $allowed).'].');
}
$types = is_array($types) ? $types : [$types];
// NOTE(review): an empty $types array reaches this line without an
// index 0 — confirm callers never pass [].
$orderBy ??= $types[0];
return $this->connection()
->query()
->select([
// Resolve the readable key from any entry row that shares the key hash.
'key' => fn (Builder $query) => $query
->select('key')
->from('pulse_entries', as: 'keys')
->whereColumn('keys.key_hash', 'aggregated.key_hash')
->limit(1),
...$types,
])
->fromSub(function (Builder $query) use ($types, $aggregate, $interval, $orderBy, $direction, $limit) {
// Combine the tail row and the bucket row into one row per key hash.
$query->select('key_hash');
foreach ($types as $type) {
$query->selectRaw(match ($aggregate) {
'count' => "sum({$this->wrap($type)})",
'min' => "min({$this->wrap($type)})",
'max' => "max({$this->wrap($type)})",
'sum' => "sum({$this->wrap($type)})",
'avg' => "avg({$this->wrap($type)})",
}." as {$this->wrap($type)}");
}
$query->fromSub(function (Builder $query) use ($types, $aggregate, $interval) {
$now = CarbonImmutable::now();
$period = $interval->totalSeconds / 60;
// First second that still belongs to the requested window.
$windowStart = (int) ($now->getTimestamp() - $interval->totalSeconds + 1);
// Start of the bucket containing "now", and of the oldest bucket read below.
$currentBucket = (int) (floor($now->getTimestamp() / $period) * $period);
$oldestBucket = $currentBucket - $interval->totalSeconds + $period;
// Tail: raw entries between the window start and the second before the
// oldest bucket. The "case when" keeps each type in its own column; the
// type is passed as a binding rather than interpolated.
$query->select('key_hash');
foreach ($types as $type) {
$query->selectRaw(match ($aggregate) {
'count' => "count(case when ({$this->wrap('type')} = ?) then true else null end)",
'min' => "min(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'max' => "max(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'sum' => "sum(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'avg' => "avg(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
}." as {$this->wrap($type)}", [$type]);
}
$query
->from('pulse_entries')
->whereIn('type', $types)
->where('timestamp', '>=', $windowStart)
->where('timestamp', '<=', $oldestBucket - 1)
->groupBy('key_hash');
// Buckets: pre-computed rows for the requested aggregate; stored counts are
// summed rather than counted.
$query->unionAll(function (Builder $query) use ($types, $aggregate, $period, $oldestBucket) {
$query->select('key_hash');
foreach ($types as $type) {
$query->selectRaw(match ($aggregate) {
'count' => "sum(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'min' => "min(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'max' => "max(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'sum' => "sum(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
'avg' => "avg(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)",
}." as {$this->wrap($type)}", [$type]);
}
$query
->from('pulse_aggregates')
->where('period', $period)
->whereIn('type', $types)
->where('aggregate', $aggregate)
->where('bucket', '>=', $oldestBucket)
->groupBy('key_hash');
});
}, as: 'results')
->groupBy('key_hash')
->orderBy($orderBy, $direction)
->limit($limit);
}, as: 'aggregated')
->get();
}
/**
* Retrieve an aggregate total for the given types.
*
* When $types is an array the result is a collection of totals keyed by
* type; when it is a single string the result is that type's total cast
* to float. The total combines raw rows from "pulse_entries" for the tail
* of the window with pre-computed rows from "pulse_aggregates" for the
* buckets.
*
* @param string|list<string> $types
* @param 'count'|'min'|'max'|'sum'|'avg' $aggregate
* @return float|\Illuminate\Support\Collection<string, int>
*
* @throws \InvalidArgumentException When $aggregate is not one of count, min, max, sum, avg.
*/
public function aggregateTotal(
array|string $types,
string $aggregate,
CarbonInterval $interval,
): float|Collection {
if (! in_array($aggregate, $allowed = ['count', 'min', 'max', 'sum', 'avg'])) {
throw new InvalidArgumentException("Invalid aggregate type [$aggregate], allowed types: [".implode(', ', $allowed).'].');
}
$now = CarbonImmutable::now();
$period = $interval->totalSeconds / 60;
// First second that still belongs to the requested window.
$windowStart = (int) ($now->getTimestamp() - $interval->totalSeconds + 1);
// Start of the bucket containing "now", and of the oldest bucket read below.
$currentBucket = (int) (floor($now->getTimestamp() / $period) * $period);
$oldestBucket = $currentBucket - $interval->totalSeconds + $period;
// The tail spans from the window start up to the second before the oldest bucket.
$tailStart = $windowStart;
$tailEnd = $oldestBucket - 1;
return $this->connection()->query() // @phpstan-ignore return.type
// The type column is only selected when totals for several types are requested.
->when(is_array($types), fn ($query) => $query->addSelect('type'))
->selectRaw(match ($aggregate) {
'count' => "sum({$this->wrap('count')})",
'min' => "min({$this->wrap('min')})",
'max' => "max({$this->wrap('max')})",
'sum' => "sum({$this->wrap('sum')})",
'avg' => "avg({$this->wrap('avg')})",
}." as {$this->wrap($aggregate)}")
->fromSub(fn (Builder $query) => $query
// Tail: raw entries, aggregated per type.
->addSelect('type')
->selectRaw(match ($aggregate) {
'count' => 'count(*)',
'min' => "min({$this->wrap('value')})",
'max' => "max({$this->wrap('value')})",
'sum' => "sum({$this->wrap('value')})",
'avg' => "avg({$this->wrap('value')})",
}." as {$this->wrap($aggregate)}")
->from('pulse_entries')
->when(
is_array($types),
fn ($query) => $query->whereIn('type', $types),
fn ($query) => $query->where('type', $types)
)
->where('timestamp', '>=', $tailStart)
->where('timestamp', '<=', $tailEnd)
->groupBy('type')
// Buckets: pre-computed rows, aggregated per type; stored counts are summed.
->unionAll(fn (Builder $query) => $query
->select('type')
->selectRaw(match ($aggregate) {
'count' => "sum({$this->wrap('value')})",
'min' => "min({$this->wrap('value')})",
'max' => "max({$this->wrap('value')})",
'sum' => "sum({$this->wrap('value')})",
'avg' => "avg({$this->wrap('value')})",
}." as {$this->wrap($aggregate)}")
->from('pulse_aggregates')
->where('period', $period)
->when(
is_array($types),
fn ($query) => $query->whereIn('type', $types),
fn ($query) => $query->where('type', $types)
)
->where('aggregate', $aggregate)
->where('bucket', '>=', $oldestBucket)
->groupBy('type')
), as: 'child'
)
->groupBy('type')
// The query is executed here: a type => total map for an array of types,
// otherwise the single total as a float.
->when(
is_array($types),
fn ($query) => $query->pluck($aggregate, 'type'),
fn ($query) => (float) $query->value($aggregate)
);
}
/**
 * Resolve the database connection named by "pulse.storage.database.connection".
 */
protected function connection(): Connection
{
    $name = $this->config->get('pulse.storage.database.connection');

    return $this->db->connection($name);
}
/**
 * Wrap an identifier using the query grammar of the storage connection.
 */
protected function wrap(string $value): string
{
    $grammar = $this->connection()->getQueryGrammar();

    return $grammar->wrap($value);
}
/**
 * Determine whether a manually generated key hash is required.
 *
 * This is the case only for the sqlite driver; for it, rows are given a
 * "key_hash" holding the md5 of their key before they are written.
 * NOTE(review): presumably the other drivers derive key_hash inside the
 * database — confirm against the migration.
 */
protected function requiresManualKeyHash(): bool
{
    $driver = $this->connection()->getDriverName();

    return $driver === 'sqlite';
}
}