feat(server): add connection stats to the access key metrics (#1634)

* Make it clear that the first value is a timestamp.

* Use `Promise.all()` to send the network requests concurrently.

* Add a function to send a range query.

* Add `lastConnected` and `lastTrafficSeen` metrics.

* Add `peakDevices` metrics.

* Account for missing ASN or ASOrg data.

* Align `getServerMetricsAccessKeyEntry()` with `getServerMetricsServerEntry()`.

* Update test cases for manager metrics.

* Fix comment "down" -> "up".

* Remove `@throws` JSDoc comments.

* Remove duplication across query functions.

* Change from `Date` to `number` type.

* Remove the use of a heap in `findPeak()`.
This commit is contained in:
Sander Bruens 2025-02-10 11:50:21 -05:00 committed by GitHub
parent 13f62390bf
commit dbca08fce3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 463 additions and 156 deletions

View file

@ -21,15 +21,54 @@ import * as path from 'path';
import * as logging from '../infrastructure/logging';
/**
 * Represents a Unix timestamp in seconds.
 */
type Timestamp = number;
/**
 * Represents a Prometheus metric's labels.
 * Each key in the object is a label name, and the corresponding value is the
 * label's value.
 */
export type PrometheusMetric = {[labelName: string]: string};
/**
 * Represents a Prometheus sample: a tuple of a timestamp and a
 * string-encoded value.
 */
export type PrometheusValue = [Timestamp, string];
/**
 * Represents a single Prometheus result entry.
 *
 * Carries the labels of the series in `metric`, plus either a time series
 * (`values`, populated by range queries) or a single sample (`value`,
 * populated by instant queries).
 */
export type PrometheusResult = {
  metric: PrometheusMetric;
  values?: PrometheusValue[];
  value?: PrometheusValue;
};
/**
 * Represents the data part of a Prometheus query result.
 */
export interface QueryResultData {
  resultType: 'matrix' | 'vector' | 'scalar' | 'string';
  result: PrometheusResult[];
}
// From https://prometheus.io/docs/prometheus/latest/querying/api/
/**
* Represents the full JSON response from a Prometheus query. This interface
* is based on the Prometheus API documentation:
* https://prometheus.io/docs/prometheus/latest/querying/api/
* @interface QueryResult
*/
interface QueryResult {
status: 'success' | 'error';
data: QueryResultData;
@ -37,16 +76,36 @@ interface QueryResult {
error: string;
}
/**
 * A client for querying a Prometheus server.
 */
export interface PrometheusClient {
  /**
   * Performs an instant query against the Prometheus API.
   * @param query The PromQL query string.
   * @returns A Promise that resolves to the query result data.
   */
  query(query: string): Promise<QueryResultData>;

  /**
   * Performs a range query against the Prometheus API.
   * @param query The PromQL query string.
   * @param start The start time for the query range.
   * @param end The end time for the query range.
   * @param step The step size for the query range (e.g., "1m", "5m"). This
   *     controls the resolution of the returned data.
   * @returns A Promise that resolves to the query result data.
   */
  queryRange(query: string, start: number, end: number, step: string): Promise<QueryResultData>;
}
export class ApiPrometheusClient implements PrometheusClient {
constructor(private address: string) {}
query(query: string): Promise<QueryResultData> {
private request(url: string): Promise<QueryResultData> {
return new Promise<QueryResultData>((fulfill, reject) => {
const url = `${this.address}/api/v1/query?query=${encodeURIComponent(query)}`;
http
.get(url, (response) => {
if (response.statusCode < 200 || response.statusCode > 299) {
@ -71,6 +130,18 @@ export class ApiPrometheusClient implements PrometheusClient {
});
});
}
query(query: string): Promise<QueryResultData> {
const url = `${this.address}/api/v1/query?query=${encodeURIComponent(query)}`;
return this.request(url);
}
queryRange(query: string, start: number, end: number, step: string): Promise<QueryResultData> {
const url = `${this.address}/api/v1/query_range?query=${encodeURIComponent(
query
)}&start=${start}&end=${end}&step=${step}`;
return this.request(url);
}
}
export async function startPrometheus(

View file

@ -17,68 +17,112 @@ import {PrometheusClient, QueryResultData} from '../infrastructure/prometheus_sc
import {FakePrometheusClient} from './mocks/mocks';
export class QueryMapPrometheusClient implements PrometheusClient {
constructor(private queryMap: {[query: string]: QueryResultData}) {}
constructor(
private queryMap: {[query: string]: QueryResultData},
private queryRangeMap: {[query: string]: QueryResultData}
) {}
async query(_query: string): Promise<QueryResultData> {
return this.queryMap[_query];
async query(query: string): Promise<QueryResultData> {
return this.queryMap[query];
}
async queryRange(
query: string,
_start: number,
_end: number,
_step: string
): Promise<QueryResultData> {
return this.queryRangeMap[query];
}
}
describe('PrometheusManagerMetrics', () => {
it('getServerMetrics', async (done) => {
const managerMetrics = new PrometheusManagerMetrics(
new QueryMapPrometheusClient({
'sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[0s])) by (location, asn, asorg)':
{
new QueryMapPrometheusClient(
{
'sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[0s])) by (location, asn, asorg)':
{
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: 'Test AS Org',
},
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)':
{
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: 'Test AS Org',
},
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: null,
access_key: '0',
},
value: [null, '1000'],
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)':
{
'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: null,
access_key: '0',
},
value: [null, '1000'],
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
access_key: '0',
},
value: [null, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
access_key: '0',
{
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[300s])) by (access_key)': {
resultType: 'matrix',
result: [
{
metric: {
access_key: '0',
},
values: [
[1738959398, '1000'],
[1738959398, '2000'],
],
},
value: [null, '1000'],
},
],
},
})
],
},
'sum(increase(shadowsocks_tunnel_time_seconds[300s])) by (access_key)': {
resultType: 'matrix',
result: [
{
metric: {
access_key: '0',
},
values: [
[1738959398, '1000'],
[1738959398, '0'],
],
},
],
},
}
)
);
const serverMetrics = await managerMetrics.getServerMetrics({seconds: 0});
@ -88,23 +132,31 @@ describe('PrometheusManagerMetrics', () => {
{
"location": "US",
"asn": 49490,
"asOrg": "null",
"tunnelTime": {
"seconds": 1000
},
"asOrg": "Test AS Org",
"dataTransferred": {
"bytes": 1000
},
"tunnelTime": {
"seconds": 1000
}
}
],
"accessKeys": [
{
"accessKeyId": 0,
"dataTransferred": {
"bytes": 1000
},
"tunnelTime": {
"seconds": 1000
},
"dataTransferred": {
"bytes": 1000
"connection": {
"lastConnected": 1738959398,
"lastTrafficSeen": 1738959398,
"peakDevices": {
"count": 4,
"timestamp": 1738959398
}
}
}
]
@ -114,58 +166,88 @@ describe('PrometheusManagerMetrics', () => {
it('getServerMetrics - does a full outer join on metric data', async (done) => {
const managerMetrics = new PrometheusManagerMetrics(
new QueryMapPrometheusClient({
'sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[0s])) by (location, asn, asorg)':
{
new QueryMapPrometheusClient(
{
'sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[0s])) by (location, asn, asorg)':
{
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: 'Test AS Org',
},
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)':
{
resultType: 'vector',
result: [
{
metric: {
location: 'CA',
},
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
location: 'US',
asn: '49490',
asorg: null,
access_key: '0',
},
value: [null, '1000'],
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)':
{
'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
location: 'CA',
asn: '53520',
asorg: null,
access_key: '1',
},
value: [null, '1000'],
value: [1738959398, '1000'],
},
],
},
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
access_key: '0',
},
value: [null, '1000'],
},
],
},
'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': {
resultType: 'vector',
result: [
{
metric: {
access_key: '1',
{
'sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[300s])) by (access_key)': {
resultType: 'matrix',
result: [
{
metric: {
access_key: '0',
},
values: [
[1738959398, '1000'],
[1738959398, '2000'],
],
},
value: [null, '1000'],
},
],
},
})
],
},
'sum(increase(shadowsocks_tunnel_time_seconds[300s])) by (access_key)': {
resultType: 'matrix',
result: [
{
metric: {
access_key: '0',
},
values: [
[1738959398, '1000'],
[1738959398, '0'],
],
},
],
},
}
)
);
const serverMetrics = await managerMetrics.getServerMetrics({seconds: 0});
@ -174,8 +256,11 @@ describe('PrometheusManagerMetrics', () => {
"server": [
{
"location": "CA",
"asn": 53520,
"asOrg": "null",
"asn": null,
"asOrg": null,
"dataTransferred": {
"bytes": 0
},
"tunnelTime": {
"seconds": 1000
}
@ -183,23 +268,48 @@ describe('PrometheusManagerMetrics', () => {
{
"location": "US",
"asn": 49490,
"asOrg": "null",
"asOrg": "Test AS Org",
"dataTransferred": {
"bytes": 1000
},
"tunnelTime": {
"seconds": 0
}
}
],
"accessKeys": [
{
"accessKeyId": 1,
"dataTransferred": {
"bytes": 0
},
"tunnelTime": {
"seconds": 1000
},
"connection": {
"lastConnected": null,
"lastTrafficSeen": null,
"peakDevices": {
"count": 0,
"timestamp": null
}
}
},
{
"accessKeyId": 0,
"dataTransferred": {
"bytes": 1000
},
"tunnelTime": {
"seconds": 0
},
"connection": {
"lastConnected": 1738959398,
"lastTrafficSeen": 1738959398,
"peakDevices": {
"count": 4,
"timestamp": 1738959398
}
}
}
]

View file

@ -12,9 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import {PrometheusClient} from '../infrastructure/prometheus_scraper';
import {
PrometheusClient,
PrometheusMetric,
PrometheusValue,
} from '../infrastructure/prometheus_scraper';
import {DataUsageByUser, DataUsageTimeframe} from '../model/metrics';
const PROMETHEUS_RANGE_QUERY_STEP_SECONDS = 5 * 60;
/** A length of time, in seconds. */
interface Duration {
  seconds: number;
}
@ -23,10 +29,21 @@ interface Data {
bytes: number;
}
/** The peak device count observed for an access key, and when it occurred. */
interface PeakDevices {
  count: number;
  // Timestamp of the peak, or null when no peak was observed.
  timestamp: number | null;
}
/** Connection statistics for a single access key. */
interface ConnectionStats {
  // Timestamp of the most recent activity, or null if none was observed.
  lastConnected: number | null;
  // Timestamp of the most recent traffic, or null if none was observed.
  lastTrafficSeen: number | null;
  peakDevices: PeakDevices;
}
/**
 * Per-location server metrics. `asn` and `asOrg` are null when the
 * underlying Prometheus series lacks those labels.
 */
interface ServerMetricsServerEntry {
  location: string;
  asn: number | null;
  asOrg: string | null;
  tunnelTime: Duration;
  dataTransferred: Data;
}
@ -35,6 +52,7 @@ interface ServerMetricsAccessKeyEntry {
accessKeyId: number;
tunnelTime: Duration;
dataTransferred: Data;
connection: ConnectionStats;
}
interface ServerMetrics {
@ -70,81 +88,180 @@ export class PrometheusManagerMetrics implements ManagerMetrics {
}
async getServerMetrics(timeframe: Duration): Promise<ServerMetrics> {
const dataTransferredByLocation = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[${timeframe.seconds}s])) by (location, asn, asorg)`
);
const tunnelTimeByLocation = await this.prometheusClient.query(
`sum(increase(shadowsocks_tunnel_time_seconds_per_location[${timeframe.seconds}s])) by (location, asn, asorg)`
);
const dataTransferredByAccessKey = await this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[${timeframe.seconds}s])) by (access_key)`
);
const tunnelTimeByAccessKey = await this.prometheusClient.query(
`sum(increase(shadowsocks_tunnel_time_seconds[${timeframe.seconds}s])) by (access_key)`
);
const now = new Date().getTime();
// We need to calculate consistent start and end times for Prometheus range
// queries. Rounding the end time *up* to the nearest multiple of the step
// prevents time "drift" between queries, which is crucial for reliable step
// alignment and consistent data retrieval, especially when using
// aggregations like increase() or rate(). This ensures that the same time
// windows are queried each time, leading to more stable and predictable
// results.
const end =
Math.ceil(now / (PROMETHEUS_RANGE_QUERY_STEP_SECONDS * 1000)) *
PROMETHEUS_RANGE_QUERY_STEP_SECONDS;
const start = end - timeframe.seconds;
const serverMap = new Map();
const serverMapKey = (entry) =>
`${entry.metric['location']},${entry.metric['asn']},${entry.metric['asorg']}`;
for (const entry of tunnelTimeByLocation.result) {
serverMap.set(serverMapKey(entry), {
tunnelTime: {
seconds: parseFloat(entry.value[1]),
},
});
const [
dataTransferredByLocation,
tunnelTimeByLocation,
dataTransferredByAccessKey,
tunnelTimeByAccessKey,
dataTransferredByAccessKeyRange,
tunnelTimeByAccessKeyRange,
] = await Promise.all([
this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes_per_location{dir=~"c<p|p>t"}[${timeframe.seconds}s])) by (location, asn, asorg)`
),
this.prometheusClient.query(
`sum(increase(shadowsocks_tunnel_time_seconds_per_location[${timeframe.seconds}s])) by (location, asn, asorg)`
),
this.prometheusClient.query(
`sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[${timeframe.seconds}s])) by (access_key)`
),
this.prometheusClient.query(
`sum(increase(shadowsocks_tunnel_time_seconds[${timeframe.seconds}s])) by (access_key)`
),
this.prometheusClient.queryRange(
`sum(increase(shadowsocks_data_bytes{dir=~"c<p|p>t"}[${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s])) by (access_key)`,
start,
end,
`${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s`
),
this.prometheusClient.queryRange(
`sum(increase(shadowsocks_tunnel_time_seconds[${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s])) by (access_key)`,
start,
end,
`${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s`
),
]);
const serverMap = new Map<string, ServerMetricsServerEntry>();
for (const result of tunnelTimeByLocation.result) {
const entry = getServerMetricsServerEntry(serverMap, result.metric);
entry.tunnelTime.seconds = result.value ? parseFloat(result.value[1]) : 0;
}
for (const entry of dataTransferredByLocation.result) {
if (!serverMap.has(serverMapKey(entry))) {
serverMap.set(serverMapKey(entry), {});
for (const result of dataTransferredByLocation.result) {
const entry = getServerMetricsServerEntry(serverMap, result.metric);
entry.dataTransferred.bytes = result.value ? parseFloat(result.value[1]) : 0;
}
const accessKeyMap = new Map<string, ServerMetricsAccessKeyEntry>();
for (const result of tunnelTimeByAccessKey.result) {
const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric);
entry.tunnelTime.seconds = result.value ? parseFloat(result.value[1]) : 0;
}
for (const result of dataTransferredByAccessKey.result) {
const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric);
entry.dataTransferred.bytes = result.value ? parseFloat(result.value[1]) : 0;
}
for (const result of tunnelTimeByAccessKeyRange.result) {
const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric);
const lastConnected = findLastNonZero(result.values ?? []);
entry.connection.lastConnected = lastConnected ? Math.min(now, lastConnected[0]) : null;
const peakTunnelTimeSec = findPeak(result.values ?? []);
if (peakTunnelTimeSec !== null) {
const peakValue = parseFloat(peakTunnelTimeSec[1]);
if (peakValue > 0) {
const peakTunnelTimeOverTime = peakValue / PROMETHEUS_RANGE_QUERY_STEP_SECONDS;
entry.connection.peakDevices.count = Math.ceil(peakTunnelTimeOverTime);
entry.connection.peakDevices.timestamp = Math.min(now, peakTunnelTimeSec[0]);
}
}
serverMap.get(serverMapKey(entry)).dataTransferred = {
bytes: parseFloat(entry.value[1]),
};
}
const server = [];
for (const [key, metrics] of serverMap.entries()) {
const [location, asn, asOrg] = key.split(',');
server.push({
location,
asn: parseInt(asn),
asOrg,
...metrics,
});
}
const accessKeyMap = new Map();
for (const entry of tunnelTimeByAccessKey.result) {
accessKeyMap.set(entry.metric['access_key'], {
tunnelTime: {
seconds: parseFloat(entry.value[1]),
},
});
}
for (const entry of dataTransferredByAccessKey.result) {
if (!accessKeyMap.has(entry.metric['access_key'])) {
accessKeyMap.set(entry.metric['access_key'], {});
}
accessKeyMap.get(entry.metric['access_key']).dataTransferred = {
bytes: parseFloat(entry.value[1]),
};
}
const accessKeys = [];
for (const [key, metrics] of accessKeyMap.entries()) {
accessKeys.push({
accessKeyId: parseInt(key),
...metrics,
});
for (const result of dataTransferredByAccessKeyRange.result) {
const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric);
const lastTrafficSeen = findLastNonZero(result.values ?? []);
entry.connection.lastTrafficSeen = lastTrafficSeen ? Math.min(now, lastTrafficSeen[0]) : null;
}
return {
server,
accessKeys,
server: Array.from(serverMap.values()),
accessKeys: Array.from(accessKeyMap.values()),
};
}
}
/**
 * Looks up the server-level entry for the given metric labels, creating and
 * registering a zeroed entry on first access.
 */
function getServerMetricsServerEntry(
  map: Map<string, ServerMetricsServerEntry>,
  metric: PrometheusMetric
): ServerMetricsServerEntry {
  const location = metric['location'];
  const asn = metric['asn'];
  const asorg = metric['asorg'];
  // Key on the full (location, asn, asorg) tuple so that distinct networks in
  // the same location get distinct entries.
  const key = String(location) + ',' + String(asn) + ',' + String(asorg);
  const existing = map.get(key);
  if (existing !== undefined) {
    return existing;
  }
  const created: ServerMetricsServerEntry = {
    location,
    asn: asn ? parseInt(asn) : null,
    asOrg: asorg ?? null,
    dataTransferred: {bytes: 0},
    tunnelTime: {seconds: 0},
  };
  map.set(key, created);
  return created;
}
/**
 * Looks up the access-key entry for the given metric labels, creating and
 * registering a zeroed entry on first access.
 */
function getServerMetricsAccessKeyEntry(
  map: Map<string, ServerMetricsAccessKeyEntry>,
  metric: PrometheusMetric
): ServerMetricsAccessKeyEntry {
  const accessKey = metric['access_key'];
  const existing = map.get(accessKey);
  if (existing !== undefined) {
    return existing;
  }
  const created: ServerMetricsAccessKeyEntry = {
    accessKeyId: parseInt(accessKey),
    dataTransferred: {bytes: 0},
    tunnelTime: {seconds: 0},
    connection: {
      lastConnected: null,
      lastTrafficSeen: null,
      peakDevices: {count: 0, timestamp: null},
    },
  };
  map.set(accessKey, created);
  return created;
}
/**
 * Finds the peak PrometheusValue in an array of PrometheusValues.
 *
 * The peak is determined by the highest value. If values are equal, the
 * PrometheusValue with the latest timestamp is considered the peak.
 * Samples whose value does not parse to a number are skipped.
 */
function findPeak(values: PrometheusValue[]): PrometheusValue | null {
  let peak: PrometheusValue | null = null;
  let maxValue = -Infinity;
  for (const value of values) {
    const currentValue = parseFloat(value[1]);
    if (Number.isNaN(currentValue)) {
      continue; // Unparseable sample; ignore it.
    }
    // Guard on `peak === null` so the first parsed sample always becomes the
    // initial peak, even when it equals -Infinity (previously this case fell
    // into the tie branch and dereferenced a null `peak`).
    if (peak === null || currentValue > maxValue) {
      maxValue = currentValue;
      peak = value;
    } else if (currentValue === maxValue && value[0] > peak[0]) {
      peak = value;
    }
  }
  return peak;
}
/**
 * Finds the last PrometheusValue in an array that has a value greater than
 * zero.
 */
function findLastNonZero(values: PrometheusValue[]): PrometheusValue | null {
  // Walk a reversed copy so the most recent qualifying sample wins; the
  // input array is left untouched.
  for (const sample of [...values].reverse()) {
    if (parseFloat(sample[1]) > 0) {
      return sample;
    }
  }
  return null;
}

View file

@ -62,4 +62,13 @@ export class FakePrometheusClient implements PrometheusClient {
}
return queryResultData;
}
/**
 * Range queries are not supported by this fake.
 *
 * NOTE(review): this throws synchronously instead of returning a rejected
 * Promise, so callers that invoke it without an immediate `await` will see a
 * synchronous throw — confirm that is the intended contract.
 */
queryRange(
  _query: string,
  _start: number,
  _end: number,
  _step: string
): Promise<QueryResultData> {
  throw new Error('unsupported');
}
}