feat: cache keys for online users and node versions

This commit is contained in:
kastov 2026-03-27 21:01:21 +03:00
parent 76a3f0372d
commit 4e0f41321f
No known key found for this signature in database
GPG key ID: 1B27BE29057F4C90
26 changed files with 328 additions and 194 deletions

View file

@ -10,6 +10,9 @@ export const CACHE_KEYS = {
REMNAWAVE_SETTINGS: 'remnawave_settings',
NODE_SYSTEM_INFO: (uuid: string) => `node_system_info:${uuid}`,
NODE_SYSTEM_STATS: (uuid: string) => `node_system_stats:${uuid}`,
NODE_USERS_ONLINE: (uuid: string) => `node_users_online:${uuid}`,
NODE_VERSIONS: (uuid: string) => `node_versions:${uuid}`,
NODE_XRAY_UPTIME: (uuid: string) => `node_xray_uptime:${uuid}`,
} as const;
export const CACHE_KEYS_TTL = {
@ -17,6 +20,8 @@ export const CACHE_KEYS_TTL = {
EXTERNAL_SQUAD_SETTINGS: 3_600, // 1 hour
SUBSCRIPTION_SETTINGS: 3_600, // 1 hour
NODE_SYSTEM_STATS: 30, // 30 seconds
NODE_USERS_ONLINE: 16, // 16 seconds
NODE_XRAY_UPTIME: 16, // 16 seconds
} as const;
export const INTERNAL_CACHE_KEYS = {

View file

@ -19,16 +19,11 @@ export const NodesSchema = z.object({
.transform((str) => new Date(str)),
),
lastStatusMessage: z.nullable(z.string()),
xrayVersion: z.nullable(z.string()),
nodeVersion: z.nullable(z.string()),
xrayUptime: z.string(),
isTrafficTrackingActive: z.boolean(),
trafficResetDay: z.nullable(z.number().int()),
trafficLimitBytes: z.nullable(z.number()),
trafficUsedBytes: z.nullable(z.number()),
notifyPercent: z.nullable(z.number().int()),
usersOnline: z.nullable(z.number().int()),
viewPosition: z.number().int(),
countryCode: z.string(),
consumptionMultiplier: z.number(),
@ -52,4 +47,12 @@ export const NodesSchema = z.object({
provider: z.nullable(PartialInfraProviderSchema),
activePluginUuid: z.nullable(z.string().uuid()),
system: z.nullable(NodeSystemSchema),
versions: z.nullable(
z.object({
xray: z.string(),
node: z.string(),
}),
),
xrayUptime: z.number(),
usersOnline: z.number(),
});

View file

@ -1,6 +1,6 @@
{
"name": "@remnawave/backend-contract",
"version": "2.6.55",
"version": "2.6.56",
"public": true,
"license": "AGPL-3.0-only",
"description": "A contract library for Remnawave Backend. It can be used in backend and frontend.",

View file

@ -0,0 +1,14 @@
/*
Warnings:
- You are about to drop the column `node_version` on the `nodes` table. All the data in the column will be lost.
- You are about to drop the column `users_online` on the `nodes` table. All the data in the column will be lost.
- You are about to drop the column `xray_uptime` on the `nodes` table. All the data in the column will be lost.
- You are about to drop the column `xray_version` on the `nodes` table. All the data in the column will be lost.
*/
-- AlterTable
ALTER TABLE "nodes" DROP COLUMN "node_version",
DROP COLUMN "users_online",
DROP COLUMN "xray_uptime",
DROP COLUMN "xray_version";

View file

@ -177,11 +177,7 @@ model Nodes {
lastStatusChange DateTime? @map("last_status_change")
lastStatusMessage String? @map("last_status_message")
xrayVersion String? @map("xray_version")
nodeVersion String? @map("node_version")
xrayUptime String @default("0") @map("xray_uptime")
usersOnline Int? @default(0) @map("users_online")
consumptionMultiplier BigInt @default(1000000000) @map("consumption_multiplier")
isTrafficTrackingActive Boolean @default(false) @map("is_traffic_tracking_active")

View file

@ -21,10 +21,28 @@ export class RawCacheService {
}
}
async getNumber(key: string): Promise<number> {
const raw = await this.redis.get(key);
return raw ? Number(raw) : 0;
}
async setNumber(key: string, value: number, ttlSeconds?: number): Promise<void> {
if (ttlSeconds) {
await this.redis.set(key, value, 'EX', ttlSeconds);
} else {
await this.redis.set(key, value);
}
}
async del(key: string): Promise<void> {
await this.redis.del(key);
}
async delMany(keys: string[]): Promise<void> {
if (keys.length === 0) return;
await this.redis.del(...keys);
}
async hgetallParsed<T>(key: string): Promise<T | null> {
const raw = await this.redis.hgetall(key);
if (!Object.keys(raw).length) return null;
@ -47,6 +65,22 @@ export class RawCacheService {
await this.redis.hset(key, field, JSON.stringify(value));
}
async setMany(entries: { key: string; value: unknown; ttlSeconds?: number }[]): Promise<void> {
const pipe = this.redis.pipeline();
for (const { key, value, ttlSeconds } of entries) {
const raw =
typeof value === 'number' || typeof value === 'string'
? value
: JSON.stringify(value);
if (ttlSeconds) {
pipe.set(key, raw, 'EX', ttlSeconds);
} else {
pipe.set(key, raw);
}
}
await pipe.exec();
}
createPipeline(): ChainableCommander {
return this.redis.pipeline();
}

View file

@ -20,9 +20,9 @@ import {
TorrentBlockerEvent,
} from '@integration-modules/notifications/interfaces';
import { INodeHotCache, INodeSystem, INodeVersions } from '@modules/nodes/interfaces';
import { GetFullUserResponseModel } from '@modules/users/models';
import { NodeResponseModel } from '@modules/nodes/models';
import { INodeSystem } from '@modules/nodes/interfaces';
import { WebhookLoggerQueueService } from '@queue/notifications/webhook-logger/webhook-logger.service';
@ -259,14 +259,20 @@ export class WebhookEvents {
}
}
private async getNodesSystemInfo(uuid: string): Promise<INodeSystem | null> {
const [rawInfo, rawHot] = await Promise.all([
private async getNodesSystemInfo(uuid: string): Promise<INodeHotCache> {
const [info, stats, onlineUsers, xrayUptime, versions] = await Promise.all([
this.rawCacheService.get<INodeSystem['info']>(CACHE_KEYS.NODE_SYSTEM_INFO(uuid)),
this.rawCacheService.get<INodeSystem['stats']>(CACHE_KEYS.NODE_SYSTEM_STATS(uuid)),
this.rawCacheService.getNumber(CACHE_KEYS.NODE_USERS_ONLINE(uuid)),
this.rawCacheService.getNumber(CACHE_KEYS.NODE_XRAY_UPTIME(uuid)),
this.rawCacheService.get<INodeVersions>(CACHE_KEYS.NODE_VERSIONS(uuid)),
]);
if (!rawInfo || !rawHot) return null;
return { info: rawInfo, stats: rawHot };
return {
system: info && stats ? { info, stats } : null,
onlineUsers,
versions,
xrayUptime,
};
}
}

View file

@ -17,12 +17,6 @@ export class NodesEntity implements Nodes {
public lastStatusChange: Date | null;
public lastStatusMessage: null | string;
public xrayVersion: null | string;
public nodeVersion: null | string;
public xrayUptime: string;
public usersOnline: null | number;
public isTrafficTrackingActive: boolean;
public trafficResetDay: null | number;
public trafficLimitBytes: bigint | null;

View file

@ -1,2 +1,3 @@
export * from './node-host-info.interface';
export * from './node-hot-cache.interface';
export * from './reorder-node.interface';

View file

@ -0,0 +1,13 @@
import { INodeSystem } from './node-host-info.interface';
export interface INodeVersions {
xray: string;
node: string;
}
export interface INodeHotCache {
system: INodeSystem | null;
versions: INodeVersions | null;
xrayUptime: number;
onlineUsers: number;
}

View file

@ -3,7 +3,7 @@ import { fromNanoToNumber } from '@common/utils/nano';
import { ConfigProfileInboundEntity } from '@modules/config-profiles/entities';
import { InfraProviderEntity } from '@modules/infra-billing/entities';
import { INodeSystem } from '../interfaces';
import { INodeHotCache, INodeSystem, INodeVersions } from '../interfaces';
import { NodesEntity } from '../entities';
export class NodeResponseModel {
@ -16,14 +16,9 @@ export class NodeResponseModel {
public isDisabled: boolean;
public lastStatusChange: Date | null;
public lastStatusMessage: null | string;
public xrayVersion: null | string;
public nodeVersion: null | string;
public xrayUptime: string;
public isTrafficTrackingActive: boolean;
public trafficResetDay: null | number;
public usersOnline: null | number;
public consumptionMultiplier: number;
public isTrafficTrackingActive: boolean;
public trafficLimitBytes: null | number;
public trafficUsedBytes: null | number;
public notifyPercent: null | number;
@ -42,9 +37,12 @@ export class NodeResponseModel {
public provider: InfraProviderEntity | null;
public activePluginUuid: string | null;
public xrayUptime: number;
public usersOnline: number;
public system: INodeSystem | null;
public versions: INodeVersions | null;
constructor(data: NodesEntity, system: INodeSystem | null) {
constructor(data: NodesEntity, hotCache: INodeHotCache) {
this.uuid = data.uuid;
this.name = data.name;
this.address = data.address;
@ -54,15 +52,11 @@ export class NodeResponseModel {
this.isDisabled = data.isDisabled;
this.lastStatusChange = data.lastStatusChange;
this.lastStatusMessage = data.lastStatusMessage;
this.xrayVersion = data.xrayVersion;
this.nodeVersion = data.nodeVersion;
this.xrayUptime = data.xrayUptime;
this.isTrafficTrackingActive = data.isTrafficTrackingActive;
this.trafficResetDay = data.trafficResetDay;
this.trafficLimitBytes = Number(data.trafficLimitBytes);
this.trafficUsedBytes = Number(data.trafficUsedBytes);
this.notifyPercent = data.notifyPercent;
this.usersOnline = data.usersOnline;
this.consumptionMultiplier = fromNanoToNumber(data.consumptionMultiplier);
@ -81,6 +75,10 @@ export class NodeResponseModel {
this.providerUuid = data.providerUuid;
this.provider = data.provider;
this.activePluginUuid = data.activePluginUuid;
this.system = system;
this.system = hotCache.system;
this.usersOnline = hotCache.onlineUsers;
this.versions = hotCache.versions;
this.xrayUptime = hotCache.xrayUptime;
}
}

View file

@ -4,57 +4,104 @@ import { Injectable } from '@nestjs/common';
import { RawCacheService } from '@common/raw-cache';
import { INodeSystem } from './interfaces';
import { INodeHotCache, INodeSystem, INodeVersions } from './interfaces';
@Injectable()
export class NodesSystemCacheService {
constructor(private readonly rawCacheService: RawCacheService) {}
async getMany(nodes: { uuid: string }[]): Promise<Map<string, INodeSystem | null>> {
async getMany(nodes: { uuid: string }[]): Promise<Map<string, INodeHotCache>> {
const pipe = this.rawCacheService.createPipeline();
for (const node of nodes) {
pipe.get(CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid));
pipe.get(CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid));
pipe.get(CACHE_KEYS.NODE_USERS_ONLINE(node.uuid));
pipe.get(CACHE_KEYS.NODE_VERSIONS(node.uuid));
pipe.get(CACHE_KEYS.NODE_XRAY_UPTIME(node.uuid));
}
const results = await pipe.exec();
const map = new Map<string, INodeSystem | null>();
const map = new Map<string, INodeHotCache>();
const KEYS_PER_NODE = 5;
if (!results) {
for (const node of nodes) {
map.set(node.uuid, null);
map.set(node.uuid, {
system: null,
versions: null,
xrayUptime: 0,
onlineUsers: 0,
});
}
return map;
}
for (let i = 0; i < nodes.length; i++) {
const [infoErr, rawInfo] = results[i * 2];
const [hotErr, rawHot] = results[i * 2 + 1];
const base = i * KEYS_PER_NODE;
const [infoErr, rawInfo] = results[base];
const [statsErr, rawStats] = results[base + 1];
const [onlineErr, rawOnline] = results[base + 2];
const [versionsErr, rawVersions] = results[base + 3];
const [uptimeErr, rawUptime] = results[base + 4];
map.set(
nodes[i].uuid,
!infoErr && !hotErr && rawInfo && rawHot
? { info: JSON.parse(rawInfo as string), stats: JSON.parse(rawHot as string) }
: null,
);
const system =
!infoErr && !statsErr && rawInfo && rawStats
? {
info: JSON.parse(rawInfo as string),
stats: JSON.parse(rawStats as string),
}
: null;
const versions = !versionsErr && rawVersions ? JSON.parse(rawVersions as string) : null;
const xrayUptime = !uptimeErr && rawUptime ? Number(rawUptime) : 0;
const onlineUsers = !onlineErr && rawOnline ? Number(rawOnline) : 0;
map.set(nodes[i].uuid, { system, versions, xrayUptime, onlineUsers });
}
return map;
}
async getOne(uuid: string): Promise<INodeSystem | null> {
const [rawInfo, rawHot] = await Promise.all([
async getOne(uuid: string): Promise<INodeHotCache> {
const [info, stats, versions, xrayUptime, onlineUsers] = await Promise.all([
this.rawCacheService.get<INodeSystem['info']>(CACHE_KEYS.NODE_SYSTEM_INFO(uuid)),
this.rawCacheService.get<INodeSystem['stats']>(CACHE_KEYS.NODE_SYSTEM_STATS(uuid)),
this.rawCacheService.get<INodeVersions>(CACHE_KEYS.NODE_VERSIONS(uuid)),
this.rawCacheService.getNumber(CACHE_KEYS.NODE_XRAY_UPTIME(uuid)),
this.rawCacheService.getNumber(CACHE_KEYS.NODE_USERS_ONLINE(uuid)),
]);
if (!rawInfo || !rawHot) return null;
let system: INodeSystem | null = null;
if (info && stats) {
system = {
info: info,
stats: stats,
};
}
return { info: rawInfo, stats: rawHot };
return { system, versions, xrayUptime, onlineUsers };
}
async delete(uuid: string): Promise<void> {
await this.rawCacheService.del(CACHE_KEYS.NODE_SYSTEM_INFO(uuid));
await this.rawCacheService.del(CACHE_KEYS.NODE_SYSTEM_STATS(uuid));
await this.rawCacheService.delMany([
CACHE_KEYS.NODE_SYSTEM_INFO(uuid),
CACHE_KEYS.NODE_SYSTEM_STATS(uuid),
CACHE_KEYS.NODE_USERS_ONLINE(uuid),
CACHE_KEYS.NODE_VERSIONS(uuid),
CACHE_KEYS.NODE_XRAY_UPTIME(uuid),
]);
}
async getTotalOnlineUsers(nodes: { uuid: string }[]): Promise<number> {
const pipe = this.rawCacheService.createPipeline();
for (const node of nodes) {
pipe.get(CACHE_KEYS.NODE_USERS_ONLINE(node.uuid));
}
const results = await pipe.exec();
if (!results) return 0;
return results.reduce((sum, [err, raw]) => sum + (!err && raw ? Number(raw) : 0), 0);
}
}

View file

@ -22,15 +22,11 @@ const entityToModel = (entity: NodesEntity): Nodes => {
isDisabled: entity.isDisabled,
lastStatusChange: entity.lastStatusChange,
lastStatusMessage: entity.lastStatusMessage,
xrayVersion: entity.xrayVersion,
nodeVersion: entity.nodeVersion,
xrayUptime: entity.xrayUptime,
isTrafficTrackingActive: entity.isTrafficTrackingActive,
trafficResetDay: entity.trafficResetDay,
trafficLimitBytes: entity.trafficLimitBytes,
trafficUsedBytes: entity.trafficUsedBytes,
notifyPercent: entity.notifyPercent,
usersOnline: entity.usersOnline,
createdAt: entity.createdAt,
updatedAt: entity.updatedAt,
viewPosition: entity.viewPosition,

View file

@ -133,7 +133,16 @@ export class NodesService {
return ok(
nodes.map(
(node) => new NodeResponseModel(node, systemInfoMap.get(node.uuid) ?? null),
(node) =>
new NodeResponseModel(
node,
systemInfoMap.get(node.uuid) ?? {
system: null,
onlineUsers: 0,
versions: null,
xrayUptime: 0,
},
),
),
);
} catch (error) {
@ -346,6 +355,8 @@ export class NodesService {
return fail(ERRORS.NODE_NOT_FOUND);
}
await this.nodesSystemCacheService.delete(node.uuid);
if (!node.activeConfigProfileUuid || node.activeInbounds.length === 0) {
const result = await this.nodesRepository.update({
uuid: node.uuid,
@ -355,7 +366,6 @@ export class NodesService {
isConnected: false,
lastStatusMessage: null,
lastStatusChange: new Date(),
usersOnline: 0,
});
if (!result) {
@ -411,6 +421,8 @@ export class NodesService {
});
}
await this.nodesSystemCacheService.delete(node.uuid);
const result = await this.nodesRepository.update({
uuid: node.uuid,
isDisabled: true,
@ -418,7 +430,6 @@ export class NodesService {
isConnected: false,
lastStatusMessage: null,
lastStatusChange: new Date(),
usersOnline: 0,
});
if (!result) {
@ -455,7 +466,16 @@ export class NodesService {
const systemInfoMap = await this.nodesSystemCacheService.getMany(nodes);
return ok(
nodes.map(
(node) => new NodeResponseModel(node, systemInfoMap.get(node.uuid) ?? null),
(node) =>
new NodeResponseModel(
node,
systemInfoMap.get(node.uuid) ?? {
system: null,
onlineUsers: 0,
versions: null,
xrayUptime: 0,
},
),
),
);
} catch (error) {

View file

@ -1,25 +1,28 @@
import { IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { Logger } from '@nestjs/common';
import { fail, ok, TResult } from '@common/types';
import { fail, ok } from '@common/types';
import { ERRORS } from '@libs/contracts/constants';
import { NodesSystemCacheService } from '@modules/nodes/nodes-system-cache.service';
import { NodesRepository } from '../../repositories/nodes.repository';
import { CountOnlineUsersQuery } from './count-online-users.query';
@QueryHandler(CountOnlineUsersQuery)
export class CountOnlineUsersHandler implements IQueryHandler<
CountOnlineUsersQuery,
TResult<{ usersOnline: number }>
> {
export class CountOnlineUsersHandler implements IQueryHandler<CountOnlineUsersQuery> {
private readonly logger = new Logger(CountOnlineUsersHandler.name);
constructor(private readonly nodesRepository: NodesRepository) {}
constructor(
private readonly nodesRepository: NodesRepository,
private readonly nodesSystemCacheService: NodesSystemCacheService,
) {}
async execute(): Promise<TResult<{ usersOnline: number }>> {
async execute() {
try {
const nodes = await this.nodesRepository.countOnlineUsers();
const nodes = await this.nodesRepository.findConnectedNodesPartial();
const usersOnline = await this.nodesSystemCacheService.getTotalOnlineUsers(nodes);
return ok({ usersOnline: nodes });
return ok({ usersOnline });
} catch (error) {
this.logger.error(error);
return fail(ERRORS.INTERNAL_SERVER_ERROR);

View file

@ -1,3 +1,9 @@
export class CountOnlineUsersQuery {
constructor() {}
import { Query } from '@nestjs/cqrs';
import { TResult } from '@common/types';
export class CountOnlineUsersQuery extends Query<TResult<{ usersOnline: number }>> {
constructor() {
super();
}
}

View file

@ -27,12 +27,12 @@ export class GetNodesRecapHandler implements IQueryHandler<GetNodesRecapQuery> {
const countries = new Set<string>();
for (const row of nodes) {
const systemInfo = systemInfoMap.get(row.uuid);
if (!systemInfo) {
const hotCache = systemInfoMap.get(row.uuid);
if (!hotCache || !hotCache.system) {
continue;
}
totalRamBytes += systemInfo.info.memoryTotal;
totalCpuCores += systemInfo.info.cpus;
totalRamBytes += hotCache.system.info.memoryTotal;
totalCpuCores += hotCache.system.info.cpus;
if (row.countryCode && row.countryCode !== 'XX') {
countries.add(row.countryCode);
}

View file

@ -2,9 +2,9 @@ import { Query } from '@nestjs/cqrs';
import { TResult } from '@common/types';
import { INodeSystem } from '@modules/nodes/interfaces';
import { INodeHotCache } from '@modules/nodes/interfaces';
export class GetNodesSystemStatsQuery extends Query<TResult<Map<string, INodeSystem | null>>> {
export class GetNodesSystemStatsQuery extends Query<TResult<Map<string, INodeHotCache>>> {
constructor(public readonly nodes: { uuid: string }[]) {
super();
}

View file

@ -215,19 +215,6 @@ export class NodesRepository implements ICrud<NodesEntity> {
return true;
}
public async countOnlineUsers(): Promise<number> {
const result = await this.prisma.tx.nodes.aggregate({
where: {
isConnected: true,
},
_sum: {
usersOnline: true,
},
});
return result._sum.usersOnline || 0;
}
public async removeInboundsFromNode(nodeUuid: string): Promise<boolean> {
const result = await this.qb.kysely
.deleteFrom('configProfileInboundsToNodes')

View file

@ -109,7 +109,7 @@ export class SystemService implements OnApplicationBootstrap {
public async getStats(): Promise<TResult<GetStatsResponseModel>> {
try {
const userStats = await this.getShortUserStats();
const onlineUsers = await this.getOnlineUsers();
const onlineUsers = await this.queryBus.execute(new CountOnlineUsersQuery());
const nodesSumLifetime = await this.queryBus.execute(new GetSumLifetimeQuery());
if (!userStats.isOk || !nodesSumLifetime.isOk || !onlineUsers.isOk) {
@ -133,7 +133,7 @@ export class SystemService implements OnApplicationBootstrap {
users: userStats.response.statusCounts,
onlineStats: userStats.response.onlineStats,
nodes: {
totalOnline: onlineUsers.response?.usersOnline || 0,
totalOnline: onlineUsers.response.usersOnline,
totalBytesLifetime: nodesSumLifetime.response.totalBytes,
},
}),
@ -378,12 +378,6 @@ export class SystemService implements OnApplicationBootstrap {
);
}
private async getOnlineUsers(): Promise<TResult<{ usersOnline: number }>> {
return this.queryBus.execute<CountOnlineUsersQuery, TResult<{ usersOnline: number }>>(
new CountOnlineUsersQuery(),
);
}
private async getLastSevenDaysNodesUsage(): Promise<TResult<IGet7DaysStats[]>> {
return this.queryBus.execute<Get7DaysStatsQuery, TResult<IGet7DaysStats[]>>(
new Get7DaysStatsQuery(),

View file

@ -102,22 +102,26 @@ export class NodeHealthCheckQueueProcessor extends WorkerHost {
new UpdateNodeCommand({
uuid: nodeUuid,
isConnected: true,
lastStatusChange: new Date(),
lastStatusMessage: '',
xrayUptime: stats.xrayInfo.uptime.toString(),
}),
);
await this.rawCacheService.set(
CACHE_KEYS.NODE_SYSTEM_STATS(nodeUuid),
stats.system.stats,
CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
);
if (!nodeUpdatedResponse.isOk) {
return;
}
await this.rawCacheService.setMany([
{
key: CACHE_KEYS.NODE_SYSTEM_STATS(nodeUuid),
value: stats.system.stats,
ttlSeconds: CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
},
{
key: CACHE_KEYS.NODE_XRAY_UPTIME(nodeUuid),
value: stats.xrayInfo.uptime,
ttlSeconds: CACHE_KEYS_TTL.NODE_XRAY_UPTIME,
},
]);
const reports = stats.plugins.torrentBlocker.reportsCount;
if (reports !== undefined && reports > 0) {
await this.nodesQueuesService.collectReports({
@ -146,7 +150,11 @@ export class NodeHealthCheckQueueProcessor extends WorkerHost {
isConnected: boolean,
message: string | undefined,
) {
await this.rawCacheService.del(CACHE_KEYS.NODE_SYSTEM_INFO(nodeUuid));
await this.rawCacheService.delMany([
CACHE_KEYS.NODE_SYSTEM_INFO(nodeUuid),
CACHE_KEYS.NODE_USERS_ONLINE(nodeUuid),
CACHE_KEYS.NODE_XRAY_UPTIME(nodeUuid),
]);
const newNodeEntity = await this.commandBus.execute(
new UpdateNodeCommand({
@ -154,8 +162,6 @@ export class NodeHealthCheckQueueProcessor extends WorkerHost {
isConnected: false,
lastStatusChange: new Date(),
lastStatusMessage: message,
usersOnline: 0,
xrayUptime: '0',
}),
);

View file

@ -12,9 +12,12 @@ import { GetUsersStatsCommand } from '@remnawave/node-contract';
import { fromNanoToNumber } from '@common/utils/nano';
import { RawCacheService } from '@common/raw-cache';
import { AxiosService } from '@common/axios';
import { INTERNAL_CACHE_KEYS, INTERNAL_CACHE_KEYS_TTL } from '@libs/contracts/constants';
import { UpdateNodeCommand } from '@modules/nodes/commands/update-node';
import {
CACHE_KEYS,
CACHE_KEYS_TTL,
INTERNAL_CACHE_KEYS,
INTERNAL_CACHE_KEYS_TTL,
} from '@libs/contracts/constants';
import { PushFromRedisQueueService } from '@queue/push-from-redis/push-from-redis.service';
import { UsersQueuesService } from '@queue/_users';
@ -66,11 +69,10 @@ export class RecordUserUsageQueueProcessor extends WorkerHost {
consumptionMultiplier,
);
case false:
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: nodeUuid,
usersOnline: 0,
}),
await this.rawCacheService.set(
CACHE_KEYS.NODE_USERS_ONLINE(nodeUuid),
0,
CACHE_KEYS_TTL.NODE_USERS_ONLINE,
);
this.logger.error(
@ -99,11 +101,10 @@ export class RecordUserUsageQueueProcessor extends WorkerHost {
try {
if (response.response.users.length === 0) {
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: nodeUuid,
usersOnline: 0,
}),
await this.rawCacheService.set(
CACHE_KEYS.NODE_USERS_ONLINE(nodeUuid),
0,
CACHE_KEYS_TTL.NODE_USERS_ONLINE,
);
return;
@ -140,25 +141,14 @@ export class RecordUserUsageQueueProcessor extends WorkerHost {
};
});
if (userUsageIndex === 0) {
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: nodeUuid,
usersOnline: 0,
}),
);
return;
}
pipeline.expire(nodeRedisKey, INTERNAL_CACHE_KEYS_TTL.NODE_USER_USAGE);
await pipeline.exec();
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: nodeUuid,
usersOnline: userUsageIndex,
}),
await this.rawCacheService.set(
CACHE_KEYS.NODE_USERS_ONLINE(nodeUuid),
userUsageIndex,
CACHE_KEYS_TTL.NODE_USERS_ONLINE,
);
await this.usersQueuesService.updateUserUsage(userUsageList.slice(0, userUsageIndex));

View file

@ -83,6 +83,12 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
const activeNodeTags = new Map<string, string[]>();
for (const node of nodes) {
await this.rawCacheService.delMany([
CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid),
CACHE_KEYS.NODE_USERS_ONLINE(node.uuid),
CACHE_KEYS.NODE_XRAY_UPTIME(node.uuid),
]);
if (node.activeInbounds.length === 0) {
this.logger.warn(
`No active inbounds found for node ${node.uuid} with profile ${payload.profileUuid}, disabling and clearing profile from node...`,
@ -97,7 +103,6 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
isConnected: false,
lastStatusMessage: null,
lastStatusChange: new Date(),
usersOnline: 0,
}),
);
@ -182,7 +187,6 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -205,7 +209,6 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -259,7 +262,6 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
isConnected: false,
lastStatusMessage: `Failed to sync node plugins: ${syncNodePluginsResponse.message}`,
lastStatusChange: new Date(),
usersOnline: 0,
}),
);
@ -305,7 +307,6 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -313,27 +314,35 @@ export class StartAllNodesByProfileQueueProcessor extends WorkerHost {
case true:
const nodeResponse = startXrayResponse.response.response;
await this.rawCacheService.set(
CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid),
nodeResponse.system.info,
);
await this.rawCacheService.set(
CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid),
nodeResponse.system.stats,
CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
);
await this.rawCacheService.setMany([
{
key: CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid),
value: nodeResponse.system.stats,
ttlSeconds: CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
},
{
key: CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid),
value: nodeResponse.system.info,
},
{
key: CACHE_KEYS.NODE_VERSIONS(node.uuid),
value:
nodeResponse.nodeInformation.version && nodeResponse.version
? {
xray: nodeResponse.version,
node: nodeResponse.nodeInformation.version,
}
: null,
},
]);
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: node.uuid,
xrayVersion: nodeResponse.version,
nodeVersion: nodeResponse.nodeInformation?.version || null,
isConnected: nodeResponse.isStarted,
lastStatusMessage: nodeResponse.error ?? null,
lastStatusChange: new Date(),
isConnecting: false,
usersOnline: 0,
}),
);

View file

@ -57,6 +57,12 @@ export class StartNodeProcessor extends WorkerHost {
return;
}
await this.rawCacheService.delMany([
CACHE_KEYS.NODE_SYSTEM_STATS(nodeUuid),
CACHE_KEYS.NODE_USERS_ONLINE(nodeUuid),
CACHE_KEYS.NODE_XRAY_UPTIME(nodeUuid),
]);
if (node.activeInbounds.length === 0 || !node.activeConfigProfileUuid) {
this.logger.warn(
`Node ${nodeUuid} has no active config profile or inbounds, disabling and clearing profile from node...`,
@ -71,7 +77,6 @@ export class StartNodeProcessor extends WorkerHost {
isConnected: false,
lastStatusMessage: null,
lastStatusChange: new Date(),
usersOnline: 0,
}),
);
@ -100,7 +105,6 @@ export class StartNodeProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -119,7 +123,6 @@ export class StartNodeProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -169,7 +172,6 @@ export class StartNodeProcessor extends WorkerHost {
isConnected: false,
lastStatusMessage: `Failed to sync node plugins: ${syncNodePluginsResponse.message}`,
lastStatusChange: new Date(),
usersOnline: 0,
}),
);
@ -207,8 +209,6 @@ export class StartNodeProcessor extends WorkerHost {
this.logger.log(`Started node in ${formatExecutionTime(reqStartTime)}`);
if (!startNodeResult.isOk) {
await this.rawCacheService.del(CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid));
await this.commandBus.execute(
new UpdateNodeCommand({
uuid: node.uuid,
@ -216,7 +216,6 @@ export class StartNodeProcessor extends WorkerHost {
lastStatusChange: new Date(),
isConnected: false,
isConnecting: false,
usersOnline: 0,
}),
);
@ -225,27 +224,35 @@ export class StartNodeProcessor extends WorkerHost {
const nodeResponse = startNodeResult.response.response;
await this.rawCacheService.set(
CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid),
nodeResponse.system.info,
);
await this.rawCacheService.set(
CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid),
nodeResponse.system.stats,
CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
);
await this.rawCacheService.setMany([
{
key: CACHE_KEYS.NODE_SYSTEM_INFO(node.uuid),
value: nodeResponse.system.info,
},
{
key: CACHE_KEYS.NODE_VERSIONS(node.uuid),
value:
nodeResponse.nodeInformation.version && nodeResponse.version
? {
xray: nodeResponse.version,
node: nodeResponse.nodeInformation.version,
}
: null,
},
{
key: CACHE_KEYS.NODE_SYSTEM_STATS(node.uuid),
value: nodeResponse.system.stats,
ttlSeconds: CACHE_KEYS_TTL.NODE_SYSTEM_STATS,
},
]);
const updateNodeResult = await this.commandBus.execute(
new UpdateNodeCommand({
uuid: node.uuid,
xrayVersion: nodeResponse.version,
nodeVersion: nodeResponse.nodeInformation?.version || null,
isConnected: nodeResponse.isStarted,
lastStatusMessage: nodeResponse.error ?? null,
lastStatusChange: new Date(),
isConnecting: false,
usersOnline: 0,
}),
);

View file

@ -57,7 +57,6 @@ export class StopNodeProcessor extends WorkerHost {
isConnected: false,
isConnecting: false,
isDisabled: true,
usersOnline: 0,
}),
);
}

View file

@ -189,54 +189,60 @@ export class ExportMetricsTask {
node_uuid: node.uuid,
} satisfies INodeMetricLabel;
this.nodeOnlineUsers.set(baseNodeLabels, node.usersOnline ?? 0);
this.nodeStatus.set(baseNodeLabels, node.isConnected ? 1 : 0);
if (nodesSystemStats.isOk && nodesSystemStats.response.get(node.uuid)) {
const nodeSystemStats = nodesSystemStats.response.get(node.uuid);
if (nodeSystemStats) {
this.nodeOnlineUsers.set(baseNodeLabels, nodeSystemStats.onlineUsers);
}
if (nodeSystemStats && nodeSystemStats.system) {
this.nodeSystemInfo.set(
{
node_uuid: node.uuid,
arch: nodeSystemStats.info.arch,
cpu_model: nodeSystemStats.info.cpuModel,
hostname: nodeSystemStats.info.hostname,
platform: nodeSystemStats.info.platform,
release: nodeSystemStats.info.release,
version: nodeSystemStats.info.version,
arch: nodeSystemStats.system.info.arch,
cpu_model: nodeSystemStats.system.info.cpuModel,
hostname: nodeSystemStats.system.info.hostname,
platform: nodeSystemStats.system.info.platform,
release: nodeSystemStats.system.info.release,
version: nodeSystemStats.system.info.version,
} satisfies INodeSystemMetricLabels,
1,
);
this.nodeMemoryTotalBytes.set(
baseNodeLabels,
nodeSystemStats.info.memoryTotal,
nodeSystemStats.system.info.memoryTotal,
);
this.nodeMemoryFreeBytes.set(
baseNodeLabels,
nodeSystemStats.stats.memoryFree,
nodeSystemStats.system.stats.memoryFree,
);
this.nodeUptimeSeconds.set(baseNodeLabels, nodeSystemStats.stats.uptime);
this.nodeCpuCount.set(baseNodeLabels, nodeSystemStats.info.cpus);
this.nodeUptimeSeconds.set(
baseNodeLabels,
nodeSystemStats.system.stats.uptime,
);
this.nodeCpuCount.set(baseNodeLabels, nodeSystemStats.system.info.cpus);
if (nodeSystemStats?.stats.interface) {
if (nodeSystemStats && nodeSystemStats.system.stats.interface) {
this.nodeNetworkRxBytesPerSec.set(
baseNodeLabels,
nodeSystemStats.stats.interface.rxBytesPerSec,
nodeSystemStats.system.stats.interface.rxBytesPerSec,
);
this.nodeNetworkTxBytesPerSec.set(
baseNodeLabels,
nodeSystemStats.stats.interface.txBytesPerSec,
nodeSystemStats.system.stats.interface.txBytesPerSec,
);
this.nodeNetworkRxBytesTotal.set(
baseNodeLabels,
nodeSystemStats.stats.interface.rxTotal,
nodeSystemStats.system.stats.interface.rxTotal,
);
this.nodeNetworkTxBytesTotal.set(
baseNodeLabels,
nodeSystemStats.stats.interface.txTotal,
nodeSystemStats.system.stats.interface.txTotal,
);
} else {
this.removeNodeSystemMetrics(baseNodeLabels);