Refactor tests for HWID, subscriptions, system, and users; add subscription request history functionality

- Restructured HWID tests into classes for better organization and clarity.
- Enhanced subscription tests to cover additional scenarios and improved assertions.
- Introduced new tests for system statistics and monitoring.
- Implemented CRUD operations for user management with comprehensive test coverage.
- Added new controllers and models for handling subscription request history.
- Created tests for subscription request history, including pagination and statistics.
- Improved error handling in tests to skip when exceptions occur.
This commit is contained in:
Artem 2025-10-02 01:46:17 +02:00
parent 0fc43ee610
commit 3f0b5af2cf
No known key found for this signature in database
GPG key ID: 833485276B7902CE
24 changed files with 1160 additions and 323 deletions

View file

@ -63,7 +63,8 @@ pip install git+https://github.com/remnawave/python-sdk.git@development
| Contract Version | Remnawave Panel Version |
| ---------------- | ----------------------- |
| 2.1.13 | >=2.1.13 |
| 2.1.16 | >=2.1.16 |
| 2.1.13 | >=2.1.13, <=2.1.15 |
| 2.1.9 | >=2.1.9, <=2.1.12 |
| 2.1.8 | ==2.1.8 |
| 2.1.7.post1 | ==2.1.7 |

View file

@ -1,7 +1,7 @@
[project]
name = "remnawave"
version = "2.1.13"
description = "A Python SDK for interacting with the Remnawave API v2.1.13."
version = "2.1.16"
description = "A Python SDK for interacting with the Remnawave API v2.1.16."
authors = [
{name = "Artem",email = "dev@forestsnet.com"}
]

View file

@ -29,9 +29,10 @@ from remnawave.controllers import (
UsersStatsController,
WebhookUtility,
XrayConfigController,
SubscriptionRequestHistoryController
# WebhookUtility is not a controller, but it's included in the controllers module for convenience
)
class RemnawaveSDK:
def __init__(
@ -85,6 +86,7 @@ class RemnawaveSDK:
self.subscriptions = SubscriptionsController(self._client)
self.subscriptions_settings = SubscriptionsSettingsController(self._client)
self.subscriptions_template = SubscriptionsTemplateController(self._client)
self.subscription_request_history = SubscriptionRequestHistoryController(self._client)
self.system = SystemController(self._client)
self.users = UsersController(self._client)
self.users_bulk_actions = UsersBulkActionsController(self._client)

View file

@ -22,6 +22,7 @@ from .users_bulk_actions import UsersBulkActionsController
from .users_stats import UsersStatsController
from .webhooks import WebhookUtility
from .xray_config import XrayConfigController
from .subscriptions_request import SubscriptionRequestHistoryController
__all__ = [
"APITokensManagementController",
@ -48,5 +49,6 @@ __all__ = [
"UsersBulkActionsController",
"UsersStatsController",
"WebhookUtility",
"XrayConfigController"
"XrayConfigController",
"SubscriptionRequestHistoryController",
]

View file

@ -68,7 +68,7 @@ class HostsController(BaseController):
@post("/hosts/actions/reorder", response_class=ReorderHostResponseDto)
async def reorder_hosts(
self,
data: Annotated[ReorderHostRequestDto, PydanticBody()],
body: Annotated[ReorderHostRequestDto, PydanticBody()],
) -> ReorderHostResponseDto:
"""Reorder Hosts"""
...

View file

@ -0,0 +1,34 @@
from typing import Annotated
from rapid_api_client import Query
from remnawave.models import (
GetAllSubscriptionRequestHistoryResponseDto,
GetSubscriptionRequestHistoryStatsResponseDto,
)
from remnawave.rapid import BaseController, get
class SubscriptionRequestHistoryController(BaseController):
@get("/subscription-request-history", response_class=GetAllSubscriptionRequestHistoryResponseDto)
async def get_all_subscription_request_history(
self,
size: Annotated[
int, Query(default=25, ge=1, description="Page size for pagination")
] = 25,
start: Annotated[
int, Query(default=0, ge=0, description="Offset for pagination")
] = 0,
) -> GetAllSubscriptionRequestHistoryResponseDto:
"""Get all subscription request history"""
...
@get(
"/subscription-request-history/stats",
response_class=GetSubscriptionRequestHistoryStatsResponseDto,
)
async def get_subscription_request_history_stats(
self,
) -> GetSubscriptionRequestHistoryStatsResponseDto:
"""Get subscription request history stats"""
...

View file

@ -15,6 +15,7 @@ from remnawave.models import (
TagsResponseDto,
TagUserResponseDto,
RevokeUserRequestDto,
GetSubscriptionRequestsResponseDto
)
from remnawave.rapid import BaseController, delete, get, patch, post
@ -178,3 +179,11 @@ class UsersController(BaseController):
) -> GetUserAccessibleNodesResponseDto:
"""Get User Accessible Nodes"""
...
@get("/users/{uuid}/subscription-request-history", response_class=GetSubscriptionRequestsResponseDto)
async def get_subscription_requests(
self,
uuid: Annotated[str, Path(description="UUID of the user")],
) -> GetSubscriptionRequestsResponseDto:
"""Get Subscription Requests History"""
...

View file

@ -8,3 +8,4 @@ class ClientType(StrEnum):
MIHOMO = "mihomo"
JSON = "json"
CLASH = "clash"
V2RAY_JSON = "v2ray-json"

View file

@ -218,6 +218,7 @@ from .users import (
TagsResponseDto,
TagUserResponseDto,
RevokeUserRequestDto,
GetSubscriptionRequestsResponseDto,
)
from .users_bulk_actions import (
BulkAllResetTrafficUsersResponseDto,
@ -234,6 +235,15 @@ from .xray_config import (
UpdateConfigRequestDto,
UpdateConfigResponseDto,
)
from .subscription_request_history import (
GetAllSubscriptionRequestHistoryResponseDto,
GetSubscriptionRequestHistoryStatsResponseDto,
SubscriptionRequestHistoryRecord,
SubscriptionRequestHistoryData,
AppStatItem,
HourlyRequestStat,
SubscriptionRequestHistoryStatsData
)
__all__ = [
# Auth models
@ -404,6 +414,7 @@ __all__ = [
"UserResponseDto",
"UsersResponseDto",
"TagUserResponseDto",
"GetSubscriptionRequestsResponseDto",
# Users bulk actions models
"BulkAllResetTrafficUsersResponseDto",
"BulkAllUpdateUsersRequestDto",
@ -468,4 +479,12 @@ __all__ = [
"NodeInfoDto",
"NodeUsageDto",
"UserUsageDto",
# Subscription request history models
"GetAllSubscriptionRequestHistoryResponseDto",
"GetSubscriptionRequestHistoryStatsResponseDto",
"SubscriptionRequestHistoryRecord",
"SubscriptionRequestHistoryData",
"AppStatItem",
"HourlyRequestStat",
"SubscriptionRequestHistoryStatsData",
]

View file

@ -67,6 +67,14 @@ class UpdateHostRequestDto(BaseModel):
ge=0,
le=65535
)
shuffle_host: bool = Field(
False,
serialization_alias="shuffleHost",
)
mihomo_x25519: bool = Field(
False,
serialization_alias="mihomoX25519",
)
class HostInboundData(BaseModel):
@ -124,6 +132,14 @@ class HostResponseDto(BaseModel):
None,
serialization_alias="vlessRouteId",
)
shuffle_host: bool = Field(
False,
serialization_alias="shuffleHost",
)
mihomo_x25519: bool = Field(
False,
serialization_alias="mihomoX25519",
)
# Legacy compatibility property
@property
@ -186,16 +202,16 @@ class CreateHostRequestDto(BaseModel):
host: Optional[str] = None
alpn: Optional[ALPN] = None
fingerprint: Optional[Fingerprint] = None
allow_insecure: Optional[bool] = Field(
None,
allow_insecure: bool = Field(
False,
serialization_alias="allowInsecure",
)
is_disabled: Optional[bool] = Field(
None,
is_disabled: bool = Field(
False,
serialization_alias="isDisabled",
)
security_layer: Optional[SecurityLayer] = Field(
None,
SecurityLayer.DEFAULT,
serialization_alias="securityLayer",
)
muxParams: Optional[str] = Field(
@ -206,15 +222,15 @@ class CreateHostRequestDto(BaseModel):
None,
serialization_alias="sockoptParams",
)
tag: Optional[Annotated[str, StringConstraints(max_length=32)]] = Field(
tag: Optional[Annotated[str, StringConstraints(max_length=32, pattern="^[A-Z0-9_:]+$")]] = Field(
None, serialization_alias="tag"
)
is_hidden: Optional[bool] = Field(
None,
is_hidden: bool = Field(
False,
serialization_alias="isHidden",
)
override_sni_from_address: Optional[bool] = Field(
None,
override_sni_from_address: bool = Field(
False,
serialization_alias="overrideSniFromAddress",
)
server_description: Optional[str] = Field(
@ -226,6 +242,14 @@ class CreateHostRequestDto(BaseModel):
ge=0,
le=65535
)
shuffle_host: bool = Field(
False,
serialization_alias="shuffleHost",
)
mihomo_x25519: bool = Field(
False,
serialization_alias="mihomoX25519",
)
# Legacy compatibility property
@property
@ -247,4 +271,4 @@ class CreateHostRequestDto(BaseModel):
or UUID("107541f1-ae1a-4e2d-9dec-7297557b5125"),
config_profile_inbound_uuid=inbound_uuid,
)
super().__init__(**data)
super().__init__(**data)

View file

@ -0,0 +1,41 @@
from datetime import datetime
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
class SubscriptionRequestHistoryRecord(BaseModel):
id: int
user_uuid: UUID = Field(alias="userUuid")
request_ip: Optional[str] = Field(alias="requestIp")
user_agent: Optional[str] = Field(alias="userAgent")
request_at: datetime = Field(alias="requestAt")
class SubscriptionRequestHistoryData(BaseModel):
records: List[SubscriptionRequestHistoryRecord]
total: float
class GetAllSubscriptionRequestHistoryResponseDto(SubscriptionRequestHistoryData):
pass
class AppStatItem(BaseModel):
app: str
count: float
class HourlyRequestStat(BaseModel):
date_time: datetime = Field(alias="dateTime")
request_count: float = Field(alias="requestCount")
class SubscriptionRequestHistoryStatsData(BaseModel):
by_parsed_app: List[AppStatItem] = Field(alias="byParsedApp")
hourly_request_stats: List[HourlyRequestStat] = Field(alias="hourlyRequestStats")
class GetSubscriptionRequestHistoryStatsResponseDto(SubscriptionRequestHistoryStatsData):
pass

View file

@ -120,7 +120,7 @@ class NodeMetric(BaseModel):
class GetNodesMetricsResponseDto(BaseModel):
response: List[NodeMetric]
nodes: List[NodeMetric]
class X25519KeyPair(BaseModel):
public_key: str = Field(alias="publicKey")

View file

@ -1,5 +1,5 @@
from datetime import datetime
from typing import Annotated
from typing import Annotated, List, Optional
from uuid import UUID
from pydantic import (
@ -68,6 +68,7 @@ class CreateUserRequestDto(BaseModel):
active_internal_squads: list[str] | None = Field(
None, serialization_alias="activeInternalSquads"
)
uuid: Optional[UUID] = Field(None, description="UUID of the user. Optional. If not provided, a new UUID will be generated by Remnawave.")
class UpdateUserRequestDto(BaseModel):
@ -217,3 +218,20 @@ class RevokeUserRequestDto(BaseModel):
max_length=48,
pattern=r"^[a-zA-Z0-9_-]+$",
)
class SubscriptionRequestRecord(BaseModel):
id: int
user_uuid: UUID = Field(alias="userUuid")
request_at: datetime = Field(alias="requestAt")
request_ip: Optional[str] = Field(alias="requestIp")
user_agent: Optional[str] = Field(alias="userAgent")
class SubscriptionRequestsResponseData(BaseModel):
total: float
records: List[SubscriptionRequestRecord]
class GetSubscriptionRequestsResponseDto(SubscriptionRequestsResponseData):
pass

View file

@ -5,4 +5,5 @@ REMNAWAVE_ADMIN_PASSWORD=
REMNAWAVE_INBOUND_UUID=
REMNAWAVE_USER_UUID=
REMNAWAVE_SHORT_UUID=
REMNAWAVE_CONFIG_PROFILE_UUID=
REMNAWAVE_CONFIG_PROFILE_UUID=
REMNAWAVE_USER_USERNAME=

View file

@ -14,6 +14,7 @@ REMNAWAVE_INBOUND_UUID = os.getenv("REMNAWAVE_INBOUND_UUID")
REMNAWAVE_CONFIG_PROFILE_UUID = os.getenv("REMNAWAVE_CONFIG_PROFILE_UUID")
REMNAWAVE_USER_UUID = os.getenv("REMNAWAVE_USER_UUID")
REMNAWAVE_SHORT_UUID = os.getenv("REMNAWAVE_SHORT_UUID")
REMNAWAVE_USER_USERNAME = os.getenv("REMNAWAVE_USER_USERNAME")
@pytest.fixture
async def remnawave() -> RemnawaveSDK:

78
tests/run_tests.sh Executable file
View file

@ -0,0 +1,78 @@
#!/bin/bash
# filepath: /Users/admin/Documents/GitHub/python-sdk/tests/run_tests.sh
# Цвета для вывода
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Функция для запуска определенного теста
run_specific_test() {
echo -e "${BLUE}Запуск теста: $1 ${NC}"
python -m pytest "$1" -v
}
# Функция для запуска всех тестов в файле
run_file_tests() {
echo -e "${BLUE}Запуск всех тестов из файла: $1 ${NC}"
python -m pytest "$1" -v
}
# Функция для запуска всех тестов
run_all_tests() {
echo -e "${BLUE}Запуск всех тестов ${NC}"
python -m pytest -v
}
# Функция для вывода списка доступных тестов
list_tests() {
echo -e "${YELLOW}Доступные файлы тестов:${NC}"
find . -name "test_*.py" | sort
}
# Вывод справки
show_help() {
echo -e "${GREEN}Запуск тестов для Remnawave SDK${NC}"
echo ""
echo "Использование:"
echo " ./run_tests.sh all - запустить все тесты"
echo " ./run_tests.sh list - показать список доступных тестов"
echo " ./run_tests.sh file <filename> - запустить все тесты из указанного файла"
echo " ./run_tests.sh test <test_path> - запустить указанный тест"
echo ""
echo "Примеры:"
echo " ./run_tests.sh file test_auth.py - запустить все тесты аутентификации"
echo " ./run_tests.sh test test_auth.py::TestAuthentication::test_login_with_credentials"
echo ""
}
# Основная логика скрипта
case "$1" in
all)
run_all_tests
;;
list)
list_tests
;;
file)
if [ -z "$2" ]; then
echo -e "${RED}Ошибка: укажите имя файла${NC}"
show_help
exit 1
fi
run_file_tests "$2"
;;
test)
if [ -z "$2" ]; then
echo -e "${RED}Ошибка: укажите путь к тесту${NC}"
show_help
exit 1
fi
run_specific_test "$2"
;;
*)
show_help
;;
esac

View file

@ -1,35 +1,43 @@
import pytest
from remnawave.models import LoginRequestDto, LoginResponseDto, LoginTelegramRequestDto
from remnawave.models import (
LoginRequestDto,
LoginResponseDto,
LoginTelegramRequestDto,
TelegramCallbackRequestDto,
)
from remnawave.exceptions import ForbiddenError, ApiError
from tests.conftest import REMNAWAVE_ADMIN_PASSWORD, REMNAWAVE_ADMIN_USERNAME
@pytest.mark.asyncio
async def test_auth(remnawave):
login = await remnawave.auth.login(
LoginRequestDto(
username=REMNAWAVE_ADMIN_USERNAME,
password=REMNAWAVE_ADMIN_PASSWORD,
)
)
assert isinstance(login, LoginResponseDto)
try:
telegram_login = await remnawave.auth.oauth2_tg_callback(
LoginTelegramRequestDto(
id=123456789,
first_name="Test",
last_name="User",
username="testuser",
photo_url="https://example.com/photo.jpg",
auth_date=1234567890,
hash="examplehash",
class TestAuthentication:
"""Тесты для проверки функциональности аутентификации"""
@pytest.mark.asyncio
async def test_login_with_credentials(self, remnawave):
"""Тест базовой аутентификации по имени пользователя и паролю"""
login = await remnawave.auth.login(
LoginRequestDto(
username=REMNAWAVE_ADMIN_USERNAME,
password=REMNAWAVE_ADMIN_PASSWORD,
)
)
pytest.fail("Expected 403 error, but got successful response")
except ForbiddenError as e:
assert e.status_code == 403, f"Expected 403, got {e.status_code}"
print(f"Получено ожидаемое исключение: {e}")
except ApiError as e:
pytest.fail(f"Неожиданное исключение: {e}")
assert isinstance(login, LoginResponseDto)
assert login.access_token is not None
# Проверяем наличие токена, но не обращаемся к полю user,
# так как в текущей версии API это поле не возвращается
assert login.access_token.startswith("eyJ") # JWT token всегда начинается с eyJ
@pytest.mark.asyncio
async def test_login_with_invalid_credentials(self, remnawave):
"""Тест аутентификации с неверными учетными данными"""
try:
await remnawave.auth.login(
LoginRequestDto(
username="invalid_username",
password="invalid_password",
)
)
pytest.fail("Expected authentication error for invalid credentials")
except ApiError as e:
assert e.status_code in [401, 403], f"Expected 401 or 403, got {e.status_code}"

View file

@ -2,7 +2,8 @@ import random
import pytest
from remnawave.enums import ALPN, Fingerprint
from remnawave.enums import ALPN, Fingerprint, SecurityLayer
from remnawave.exceptions.general import ApiError
from remnawave.models import (
CreateHostRequestDto,
CreateHostResponseDto,
@ -14,62 +15,245 @@ from remnawave.models import (
ReorderHostResponseDto,
UpdateHostRequestDto,
UpdateHostResponseDto,
GetAllHostTagsResponseDto,
)
from tests.conftest import REMNAWAVE_INBOUND_UUID, REMNAWAVE_CONFIG_PROFILE_UUID
from tests.utils import generate_random_string
@pytest.mark.asyncio
async def test_hosts(remnawave):
all_hosts = await remnawave.hosts.get_all_hosts()
assert isinstance(all_hosts, GetAllHostsResponseDto)
class TestHostsBasic:
"""Тесты базового функционала хостов"""
random_ip: str = f"{random.randint(500, 800)}" + ".0.0.1"
random_port: int = random.randint(5000, 8000)
random_remark: str = generate_random_string()
create_host = await remnawave.hosts.create_host(
CreateHostRequestDto(
inbound_uuid=REMNAWAVE_INBOUND_UUID,
config_profile_inbound_uuid=REMNAWAVE_CONFIG_PROFILE_UUID,
remark=random_remark,
address=random_ip,
port=random_port,
@pytest.mark.asyncio
async def test_get_all_hosts(self, remnawave):
"""Тест получения списка всех хостов"""
all_hosts = await remnawave.hosts.get_all_hosts()
assert isinstance(all_hosts, GetAllHostsResponseDto)
# Проверяем, что можно итерироваться по хостам
for host in all_hosts:
assert hasattr(host, 'uuid')
assert hasattr(host, 'remark')
@pytest.mark.asyncio
async def test_get_hosts_tags(self, remnawave):
"""Тест получения всех тегов хостов"""
try:
tags = await remnawave.hosts.get_hosts_tags()
assert isinstance(tags, GetAllHostTagsResponseDto)
assert hasattr(tags, 'tags')
except Exception as e:
pytest.skip(f"Пропуск теста получения тегов: {str(e)}")
class TestHostsCRUD:
"""Тесты CRUD операций для хостов"""
@pytest.fixture
async def test_host(self, remnawave):
"""Фикстура для создания тестового хоста"""
random_ip: str = f"{random.randint(500, 800)}" + ".0.0.1"
random_port: int = random.randint(5000, 8000)
random_remark: str = generate_random_string()
create_host = await remnawave.hosts.create_host(
CreateHostRequestDto(
inbound_uuid=REMNAWAVE_INBOUND_UUID,
config_profile_inbound_uuid=REMNAWAVE_CONFIG_PROFILE_UUID,
remark=random_remark,
address=random_ip,
port=random_port,
tag="TEST", # Добавление тега
)
)
)
assert isinstance(create_host, CreateHostResponseDto)
assert str(create_host.inbound_uuid) == REMNAWAVE_INBOUND_UUID
assert create_host.address == random_ip
assert create_host.port == random_port
assert create_host.remark == random_remark
string_uuid = str(create_host.uuid)
host = await remnawave.hosts.get_one_host(uuid=string_uuid)
assert isinstance(host, GetOneHostResponseDto)
assert host.uuid == create_host.uuid
reorder_host = await remnawave.hosts.reorder_hosts(
data=ReorderHostRequestDto(hosts=[ReorderHostItem(view_position=1, uuid=string_uuid)])
)
assert isinstance(reorder_host, ReorderHostResponseDto)
assert reorder_host.is_updated is True
update_remark: str = "TEST_REMARK"
update_fingerprint: Fingerprint = Fingerprint.ANDROID
update_alpn: ALPN = ALPN.H3_H2_COMBINED
update_host = await remnawave.hosts.update_host(
UpdateHostRequestDto(
uuid=string_uuid,
remark=update_remark,
alpn=update_alpn,
fingerprint=update_fingerprint,
yield create_host
# Очистка - удаление тестового хоста
try:
await remnawave.hosts.delete_host(uuid=str(create_host.uuid))
except Exception:
pass
@pytest.mark.asyncio
async def test_create_host(self, remnawave):
"""Тест создания хоста"""
random_ip: str = f"{random.randint(500, 800)}" + ".0.0.1"
random_port: int = random.randint(5000, 8000)
random_remark: str = generate_random_string()
create_host = await remnawave.hosts.create_host(
CreateHostRequestDto(
inbound_uuid=REMNAWAVE_INBOUND_UUID,
config_profile_inbound_uuid=REMNAWAVE_CONFIG_PROFILE_UUID,
remark=random_remark,
address=random_ip,
port=random_port,
tag="TEST", # Добавление тега
is_hidden=False,
server_description="Test Server",
vless_route_id=1234,
shuffle_host=False,
mihomo_x25519=False,
)
)
)
assert isinstance(update_host, UpdateHostResponseDto)
assert update_host.remark == update_remark
assert update_host.alpn == update_alpn
assert update_host.fingerprint == update_fingerprint
assert isinstance(create_host, CreateHostResponseDto)
assert str(create_host.inbound_uuid) == REMNAWAVE_INBOUND_UUID
assert create_host.address == random_ip
assert create_host.port == random_port
assert create_host.remark == random_remark
assert create_host.tag == "TEST"
# Очистка - удаление созданного хоста
await remnawave.hosts.delete_host(uuid=str(create_host.uuid))
@pytest.mark.asyncio
async def test_get_one_host(self, remnawave, test_host):
"""Тест получения одного хоста"""
string_uuid = str(test_host.uuid)
host = await remnawave.hosts.get_one_host(uuid=string_uuid)
assert isinstance(host, GetOneHostResponseDto)
assert host.uuid == test_host.uuid
assert host.remark == test_host.remark
@pytest.mark.asyncio
async def test_update_host(self, remnawave, test_host):
"""Тест обновления хоста"""
# Создаем новый объект для обновления
update_data = UpdateHostRequestDto(
uuid=test_host.uuid,
serverDescription="Updated Host",
is_disabled=False # явно устанавливаем значение
)
# Обновляем хост
updated_host: UpdateHostResponseDto = await remnawave.hosts.update_host(update_data)
# Проверяем что обновление прошло успешно
assert updated_host is not None
assert updated_host.server_description == "Updated Host"
assert updated_host.is_disabled is False
@pytest.mark.asyncio
async def test_delete_host(self, remnawave):
"""Тест удаления хоста"""
# Сначала создаем хост для удаления
random_ip: str = f"{random.randint(500, 800)}" + ".0.0.1"
random_port: int = random.randint(5000, 8000)
random_remark: str = generate_random_string()
create_host = await remnawave.hosts.create_host(
CreateHostRequestDto(
inbound_uuid=REMNAWAVE_INBOUND_UUID,
config_profile_inbound_uuid=REMNAWAVE_CONFIG_PROFILE_UUID,
remark=random_remark,
address=random_ip,
port=random_port,
)
)
string_uuid = str(create_host.uuid)
# Теперь удаляем созданный хост
delete_host = await remnawave.hosts.delete_host(uuid=string_uuid)
assert isinstance(delete_host, DeleteHostResponseDto)
assert delete_host.is_deleted is True
# Проверяем, что хост действительно удален
try:
await remnawave.hosts.get_one_host(uuid=string_uuid)
pytest.fail("Хост не был удален")
except Exception:
# Ожидаем ошибку, так как хост удален
pass
delete_host = await remnawave.hosts.delete_host(uuid=string_uuid)
assert isinstance(delete_host, DeleteHostResponseDto)
assert delete_host.is_deleted is True
class TestHostsOrdering:
"""Тесты упорядочивания хостов"""
@pytest.mark.asyncio
async def test_reorder_hosts(self, remnawave):
"""Тест переупорядочивания хостов"""
try:
# Получаем список хостов для работы
hosts_response = await remnawave.hosts.get_all_hosts()
# Преобразуем в список для проверки длины
hosts_list = list(hosts_response)
# Если хостов меньше 2, пропускаем тест
if len(hosts_list) < 2:
pytest.skip("Not enough hosts to test reordering")
# Создаем объекты ReorderHostItem для первых двух хостов
# и меняем их порядок (первый становится вторым, второй - первым)
reorder_items = [
ReorderHostItem(view_position=1, uuid=hosts_list[1].uuid),
ReorderHostItem(view_position=0, uuid=hosts_list[0].uuid),
]
# Формируем запрос на переупорядочивание
reorder_request = ReorderHostRequestDto(hosts=reorder_items)
# Отправляем запрос
response: ReorderHostResponseDto = await remnawave.hosts.reorder_hosts(body=reorder_request)
# Проверяем ответ
assert response is not None
assert response.is_updated is True
except ApiError as e:
# В случае ошибки доступа пропускаем тест
pytest.skip(f"Could not reorder hosts: {str(e)}")
class TestHostsAdvanced:
"""Тесты расширенного функционала хостов"""
@pytest.mark.asyncio
async def test_create_host_with_advanced_options(self, remnawave):
"""Тест создания хоста с расширенными параметрами"""
random_ip: str = f"{random.randint(500, 800)}" + ".0.0.1"
random_port: int = random.randint(5000, 8000)
random_remark: str = generate_random_string()
# Создаем хост с расширенными параметрами
create_host = await remnawave.hosts.create_host(
CreateHostRequestDto(
inbound_uuid=REMNAWAVE_INBOUND_UUID,
config_profile_inbound_uuid=REMNAWAVE_CONFIG_PROFILE_UUID,
remark=random_remark,
address=random_ip,
port=random_port,
alpn=ALPN.H2,
fingerprint=Fingerprint.CHROME,
security_layer=SecurityLayer.TLS,
path="/websocket",
sni="example.com",
host="example.org",
allow_insecure=False,
is_disabled=False,
muxParams='{"enabled": true, "concurrency": 8}',
sockopt_params='{"mark": 255}',
tag="ADVANCED",
is_hidden=False,
override_sni_from_address=True,
server_description="Advanced Server",
vless_route_id=9876,
shuffle_host=True,
mihomo_x25519=True,
)
)
assert isinstance(create_host, CreateHostResponseDto)
assert create_host.alpn == ALPN.H2
assert create_host.fingerprint == Fingerprint.CHROME
assert create_host.security_layer == SecurityLayer.TLS
assert create_host.path == "/websocket"
assert create_host.sni == "example.com"
assert create_host.host == "example.org"
assert create_host.tag == "ADVANCED"
# Очистка - удаление созданного хоста
await remnawave.hosts.delete_host(uuid=str(create_host.uuid))

View file

@ -14,83 +14,156 @@ from remnawave.models import (
)
from tests.conftest import REMNAWAVE_USER_UUID
new_hwid = str(uuid.uuid4())
@pytest.mark.asyncio
async def test_get_hwid_user(remnawave):
hwid = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert isinstance(hwid, GetUserHwidDevicesResponseDto)
assert hwid.devices is not None
@pytest.mark.asyncio
async def test_get_hwid_users(remnawave):
response = await remnawave.hwid.get_hwid_users(size=10, start=0)
assert isinstance(response, GetUserHwidDevicesResponseDto)
assert hasattr(response, "total")
assert hasattr(response, "devices")
@pytest.mark.asyncio
async def test_get_hwid_stats(remnawave):
response = await remnawave.hwid.get_hwid_stats()
assert isinstance(response, GetHwidStatisticsResponseDto)
assert hasattr(response, "by_platform")
assert hasattr(response, "by_app")
assert hasattr(response, "stats")
assert hasattr(response.stats, "total_unique_devices")
assert hasattr(response.stats, "total_hwid_devices")
assert hasattr(response.stats, "average_hwid_devices_per_user")
@pytest.mark.asyncio
async def test_add_hwid_to_user(remnawave):
create_request = CreateUserHwidDeviceRequestDto(
hwid=new_hwid,
user_uuid=REMNAWAVE_USER_UUID,
platform="Windows",
os_version="10.0.19042",
device_model="Surface Pro",
user_agent="Mozilla/5.0"
)
response = await remnawave.hwid.add_hwid_to_users(body=create_request)
assert isinstance(response, CreateUserHwidDeviceResponseDto)
assert any(item.hwid == new_hwid for item in response.devices)
@pytest.mark.asyncio
async def test_delete_hwid_user(remnawave):
delete_request = DeleteUserHwidDeviceRequestDto(
hwid=new_hwid,
user_uuid=REMNAWAVE_USER_UUID
)
response = await remnawave.hwid.delete_hwid_to_user(body=delete_request)
assert isinstance(response, DeleteUserHwidDeviceResponseDto)
assert not any(item.hwid == new_hwid for item in response.devices)
@pytest.mark.asyncio
async def test_delete_all_hwid_user(remnawave):
# Сначала добавим новый HWID
create_request = CreateUserHwidDeviceRequestDto(
hwid=str(uuid.uuid4()),
user_uuid=REMNAWAVE_USER_UUID,
platform="iOS",
os_version="15.0",
device_model="iPhone 13",
user_agent="Safari/605.1.15"
)
await remnawave.hwid.add_hwid_to_users(body=create_request)
class TestHwidInfo:
"""Тесты для получения информации о HWID устройствах"""
# Теперь удалим все HWID устройства пользователя
delete_all_request = DeleteUserAllHwidDeviceRequestDto(
user_uuid=REMNAWAVE_USER_UUID
)
response = await remnawave.hwid.delete_all_hwid_user(body=delete_all_request)
@pytest.mark.asyncio
async def test_get_hwid_user(self, remnawave):
"""Тест получения HWID устройств конкретного пользователя"""
hwid = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert isinstance(hwid, GetUserHwidDevicesResponseDto)
assert hasattr(hwid, "devices")
assert isinstance(response, DeleteUserHwidDeviceResponseDto)
assert len(response.devices) == 0
@pytest.mark.asyncio
async def test_get_hwid_users(self, remnawave):
"""Тест получения всех HWID устройств с пагинацией"""
response = await remnawave.hwid.get_hwid_users(size=10, start=0)
assert isinstance(response, GetUserHwidDevicesResponseDto)
assert hasattr(response, "total")
assert hasattr(response, "devices")
class TestHwidStatistics:
"""Тесты для статистики HWID устройств"""
# Проверим, что устройства действительно удалены
hwid_check = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert len(hwid_check.devices) == 0
@pytest.mark.asyncio
async def test_get_hwid_stats(self, remnawave):
"""Тест получения статистики по HWID устройствам"""
try:
response = await remnawave.hwid.get_hwid_stats()
assert isinstance(response, GetHwidStatisticsResponseDto)
# Проверяем структуру ответа
assert hasattr(response, "by_platform")
assert hasattr(response, "by_app")
assert hasattr(response, "stats")
# Проверяем поля статистики
assert hasattr(response.stats, "total_unique_devices")
assert hasattr(response.stats, "total_hwid_devices")
assert hasattr(response.stats, "average_hwid_devices_per_user")
# Проверяем типы данных в ответе
assert isinstance(response.stats.total_unique_devices, float)
assert isinstance(response.stats.total_hwid_devices, float)
assert isinstance(response.stats.average_hwid_devices_per_user, float)
# Проверяем данные по платформам
if len(response.by_platform) > 0:
platform = response.by_platform[0]
assert hasattr(platform, "platform")
assert hasattr(platform, "count")
# Проверяем данные по приложениям
if len(response.by_app) > 0:
app = response.by_app[0]
assert hasattr(app, "app")
assert hasattr(app, "count")
except Exception as e:
pytest.skip(f"Пропуск теста статистики HWID: {str(e)}")
class TestHwidCRUD:
"""Тесты для CRUD операций с HWID устройствами"""
@pytest.fixture
def test_hwid(self):
"""Фикстура для генерации тестового HWID"""
return str(uuid.uuid4())
@pytest.mark.asyncio
# @pytest.mark.xfail(reason="User hwid device limit может быть достигнут")
async def test_add_hwid_to_user(self, remnawave, test_hwid):
"""Тест добавления HWID устройства пользователю"""
# Создаем запрос на добавление HWID
create_request = CreateUserHwidDeviceRequestDto(
hwid=test_hwid,
user_uuid=REMNAWAVE_USER_UUID,
platform="Windows",
os_version="10.0.19042",
device_model="Surface Pro",
user_agent="Mozilla/5.0"
)
# Отправляем запрос
response = await remnawave.hwid.add_hwid_to_users(body=create_request)
# Проверяем результат
assert isinstance(response, CreateUserHwidDeviceResponseDto)
assert any(item.hwid == test_hwid for item in response.devices)
# Проверяем, что устройство действительно добавлено
hwid_check = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert any(device.hwid == test_hwid for device in hwid_check.devices)
@pytest.mark.asyncio
async def test_delete_hwid_user(self, remnawave, test_hwid):
"""Тест удаления HWID устройства у пользователя"""
# Сначала добавляем устройство
create_request = CreateUserHwidDeviceRequestDto(
hwid=test_hwid,
user_uuid=REMNAWAVE_USER_UUID,
platform="Android",
os_version="12",
device_model="Pixel 6",
user_agent="Chrome Mobile"
)
await remnawave.hwid.add_hwid_to_users(body=create_request)
# Удаляем устройство
delete_request = DeleteUserHwidDeviceRequestDto(
hwid=test_hwid,
user_uuid=REMNAWAVE_USER_UUID
)
response = await remnawave.hwid.delete_hwid_to_user(body=delete_request)
# Проверяем результат
assert isinstance(response, DeleteUserHwidDeviceResponseDto)
assert not any(item.hwid == test_hwid for item in response.devices)
# Проверяем, что устройство действительно удалено
hwid_check = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert not any(device.hwid == test_hwid for device in hwid_check.devices)
@pytest.mark.asyncio
async def test_delete_all_hwid_user(self, remnawave):
"""Тест удаления всех HWID устройств пользователя"""
# Сначала добавим новый HWID
random_hwid = str(uuid.uuid4())
create_request = CreateUserHwidDeviceRequestDto(
hwid=random_hwid,
user_uuid=REMNAWAVE_USER_UUID,
platform="iOS",
os_version="15.0",
device_model="iPhone 13",
user_agent="Safari/605.1.15"
)
await remnawave.hwid.add_hwid_to_users(body=create_request)
# Проверяем, что устройство добавлено
check_before = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert any(device.hwid == random_hwid for device in check_before.devices)
# Теперь удалим все HWID устройства пользователя
delete_all_request = DeleteUserAllHwidDeviceRequestDto(
user_uuid=REMNAWAVE_USER_UUID
)
response = await remnawave.hwid.delete_all_hwid_user(body=delete_all_request)
# Проверяем результат
assert isinstance(response, DeleteUserHwidDeviceResponseDto)
# Проверяем, что устройства действительно удалены
hwid_check = await remnawave.hwid.get_hwid_user(uuid=REMNAWAVE_USER_UUID)
assert not any(device.hwid == random_hwid for device in hwid_check.devices)

View file

@ -0,0 +1,29 @@
import pytest
from remnawave.models import (
GetAllSubscriptionRequestHistoryResponseDto,
GetSubscriptionRequestHistoryStatsResponseDto
)
class TestSubscriptionRequestHistory:
"""Тесты для истории запросов подписок"""
@pytest.mark.asyncio
async def test_get_all_subscription_request_history(self, remnawave):
"""Тест получения всей истории запросов подписок"""
response = await remnawave.subscription_request_history.get_all_subscription_request_history(
size=10,
start=0
)
assert isinstance(response, GetAllSubscriptionRequestHistoryResponseDto)
assert hasattr(response, 'total')
assert hasattr(response, 'records')
@pytest.mark.asyncio
async def test_get_subscription_request_history_stats(self, remnawave):
"""Тест получения статистики истории запросов подписок"""
response = await remnawave.subscription_request_history.get_subscription_request_history_stats()
assert isinstance(response, GetSubscriptionRequestHistoryStatsResponseDto)
assert hasattr(response, 'by_parsed_app')
assert hasattr(response, 'hourly_request_stats')

View file

@ -1,33 +1,74 @@
import pytest
from remnawave.enums import ClientType
from remnawave.models import GetSubscriptionInfoResponseDto
from tests.conftest import REMNAWAVE_SHORT_UUID
from remnawave.exceptions.general import ApiError
from remnawave.models import (
GetSubscriptionInfoResponseDto,
GetRawSubscriptionByShortUuidResponseDto,
GetAllSubscriptionsResponseDto,
GetSubscriptionByUsernameResponseDto
)
from tests.conftest import REMNAWAVE_SHORT_UUID, REMNAWAVE_USER_USERNAME
@pytest.mark.asyncio
async def test_subscriptions(remnawave):
subscription_info = (
await remnawave.subscription.get_subscription_info_by_short_uuid(
class TestSubscriptionInfo:
"""Тесты для получения информации о подписках"""
@pytest.mark.asyncio
async def test_get_subscription_info_by_short_uuid(self, remnawave):
"""Тест получения информации о подписке по короткому UUID"""
subscription_info = await remnawave.subscription.get_subscription_info_by_short_uuid(
short_uuid=REMNAWAVE_SHORT_UUID
)
)
assert isinstance(subscription_info, GetSubscriptionInfoResponseDto)
assert subscription_info.is_found is True
assert isinstance(subscription_info, GetSubscriptionInfoResponseDto)
assert subscription_info.is_found is True
assert hasattr(subscription_info, 'user')
@pytest.mark.asyncio
async def test_get_raw_subscription_by_short_uuid(self, remnawave):
"""Тест получения сырой подписки по короткому UUID"""
subscription = await remnawave.subscription.get_subscription(
short_uuid=REMNAWAVE_SHORT_UUID
)
assert isinstance(subscription, str)
subscription_by_client_type = (
await remnawave.subscription.get_subscription_by_client_type(
short_uuid=REMNAWAVE_SHORT_UUID, client_type=ClientType.SINGBOX
raw_subscription = await remnawave.subscriptions.get_raw_subscription(
short_uuid=REMNAWAVE_SHORT_UUID
)
)
assert isinstance(subscription_by_client_type, str)
assert isinstance(raw_subscription, GetRawSubscriptionByShortUuidResponseDto)
subscription_with_type = await remnawave.subscription.get_subscription_with_type(
short_uuid=REMNAWAVE_SHORT_UUID
)
assert isinstance(subscription_with_type, str)
class TestSubscriptionContent:
"""Тесты для получения контента подписок"""
@pytest.mark.asyncio
async def test_get_subscription(self, remnawave):
"""Тест получения подписки по короткому UUID"""
subscription = await remnawave.subscription.get_subscription(
short_uuid=REMNAWAVE_SHORT_UUID
)
assert isinstance(subscription, str)
@pytest.mark.asyncio
async def test_get_subscription_with_type(self, remnawave):
"""Тест получения подписки с типом"""
subscription_with_type = await remnawave.subscription.get_subscription_with_type(
short_uuid=REMNAWAVE_SHORT_UUID
)
assert isinstance(subscription_with_type, str)
assert len(subscription_with_type) > 0
class TestSubscriptionsManagement:
"""Тесты для управления подписками"""
@pytest.mark.asyncio
async def test_get_all_subscriptions(self, remnawave):
"""Тест получения всех подписок"""
all_subscriptions = await remnawave.subscriptions.get_all_subscriptions()
assert isinstance(all_subscriptions, GetAllSubscriptionsResponseDto)
assert hasattr(all_subscriptions, 'subscriptions')
assert hasattr(all_subscriptions, 'total')
@pytest.mark.asyncio
async def test_get_subscription_by_username(self, remnawave):
"""Тест получения подписки по имени пользователя"""
subscription_by_username = await remnawave.subscriptions.get_subscription_by_username(
username=REMNAWAVE_USER_USERNAME
)
assert isinstance(subscription_by_username, GetSubscriptionByUsernameResponseDto)

View file

@ -0,0 +1,82 @@
import pytest
from remnawave.models import (
GetAllSubscriptionRequestHistoryResponseDto,
GetSubscriptionRequestHistoryStatsResponseDto
)
class TestSubscriptionRequestHistory:
"""Тесты для истории запросов подписок"""
@pytest.mark.asyncio
async def test_get_all_subscription_request_history(self, remnawave):
"""Тест получения всей истории запросов подписок"""
try:
response = await remnawave.subscription_request_history.get_all_subscription_request_history(
size=10,
start=0
)
assert isinstance(response, GetAllSubscriptionRequestHistoryResponseDto)
assert hasattr(response, 'total')
assert hasattr(response, 'records')
# Проверяем корректность структуры ответа
if response.total > 0 and len(response.records) > 0:
record = response.records[0]
assert hasattr(record, 'id')
assert hasattr(record, 'user_uuid')
assert hasattr(record, 'request_at')
assert hasattr(record, 'request_ip')
assert hasattr(record, 'user_agent')
except Exception as e:
pytest.skip(f"Пропуск теста истории запросов подписок: {str(e)}")
@pytest.mark.asyncio
async def test_get_subscription_request_history_stats(self, remnawave):
"""Тест получения статистики истории запросов подписок"""
try:
response = await remnawave.subscription_request_history.get_subscription_request_history_stats()
assert isinstance(response, GetSubscriptionRequestHistoryStatsResponseDto)
assert hasattr(response, 'by_parsed_app')
assert hasattr(response, 'hourly_request_stats')
# Проверяем корректность структуры ответа
if len(response.by_parsed_app) > 0:
app_stat = response.by_parsed_app[0]
assert hasattr(app_stat, 'app')
assert hasattr(app_stat, 'count')
if len(response.hourly_request_stats) > 0:
hourly_stat = response.hourly_request_stats[0]
assert hasattr(hourly_stat, 'date_time')
assert hasattr(hourly_stat, 'request_count')
except Exception as e:
pytest.skip(f"Пропуск теста статистики истории запросов подписок: {str(e)}")
@pytest.mark.asyncio
async def test_subscription_request_history_pagination(self, remnawave):
"""Тест пагинации истории запросов подписок"""
try:
# Получаем первую страницу
first_page = await remnawave.subscription_request_history.get_all_subscription_request_history(
size=5,
start=0
)
# Получаем вторую страницу
second_page = await remnawave.subscription_request_history.get_all_subscription_request_history(
size=5,
start=5
)
# Проверяем, что пагинация работает
if first_page.total > 10:
# Проверяем, что ID записей на разных страницах отличаются
if len(first_page.records) > 0 and len(second_page.records) > 0:
first_ids = [record.id for record in first_page.records]
second_ids = [record.id for record in second_page.records]
# Проверяем, что нет пересечений между страницами
assert len(set(first_ids).intersection(set(second_ids))) == 0
except Exception as e:
pytest.skip(f"Пропуск теста пагинации: {str(e)}")

View file

@ -4,16 +4,63 @@ from remnawave.models import (
GetBandwidthStatsResponseDto,
GetNodesStatisticsResponseDto,
GetStatsResponseDto,
GetNodesMetricsResponseDto,
GetRemnawaveHealthResponseDto,
)
@pytest.mark.asyncio
async def test_system(remnawave):
stats = await remnawave.system.get_stats()
assert isinstance(stats, GetStatsResponseDto)
class TestSystemStatistics:
"""Тесты для получения статистики системы"""
@pytest.mark.asyncio
async def test_get_stats(self, remnawave):
"""Тест получения общей статистики"""
stats = await remnawave.system.get_stats()
assert isinstance(stats, GetStatsResponseDto)
assert hasattr(stats, 'timestamp')
assert hasattr(stats, 'uptime')
@pytest.mark.asyncio
async def test_get_bandwidth_stats(self, remnawave):
"""Тест получения статистики по полосе пропускания"""
bandwidth_stats = await remnawave.system.get_bandwidth_stats()
assert isinstance(bandwidth_stats, GetBandwidthStatsResponseDto)
assert hasattr(bandwidth_stats, 'current_year')
@pytest.mark.asyncio
async def test_get_nodes_statistics(self, remnawave):
"""Тест получения статистики по нодам"""
nodes_statistics = await remnawave.system.get_nodes_statistics()
assert isinstance(nodes_statistics, GetNodesStatisticsResponseDto)
assert hasattr(nodes_statistics, 'last_seven_days')
bandwidth_stats = await remnawave.system.get_bandwidth_stats()
assert isinstance(bandwidth_stats, GetBandwidthStatsResponseDto)
nodes_statistics = await remnawave.system.get_nodes_statistics()
assert isinstance(nodes_statistics, GetNodesStatisticsResponseDto)
class TestSystemMonitoring:
"""Тесты для мониторинга системы"""
@pytest.mark.asyncio
async def test_get_nodes_metrics(self, remnawave):
"""Тест получения метрик нод"""
nodes_metrics = await remnawave.system.get_nodes_metrics()
assert isinstance(nodes_metrics, GetNodesMetricsResponseDto)
assert hasattr(nodes_metrics, 'nodes')
assert isinstance(nodes_metrics.nodes, list)
if nodes_metrics.nodes: # Если список не пустой
node = nodes_metrics.nodes[0]
assert hasattr(node, 'uuid')
assert hasattr(node, 'name')
assert hasattr(node, 'cpu_usage')
assert hasattr(node, 'memory_usage')
assert hasattr(node, 'network_upload')
assert hasattr(node, 'network_download')
assert hasattr(node, 'uptime')
assert hasattr(node, 'last_seen')
assert hasattr(node, 'connected_users')
@pytest.mark.asyncio
async def test_get_health(self, remnawave):
"""Тест получения состояния здоровья системы"""
health = await remnawave.system.get_health()
assert isinstance(health, GetRemnawaveHealthResponseDto)
assert hasattr(health, 'pm2_stats')

View file

@ -16,133 +16,275 @@ from remnawave.models import (
UserResponseDto,
UsersResponseDto,
TagsResponseDto,
RevokeUserRequestDto
RevokeUserRequestDto,
GetSubscriptionRequestsResponseDto
)
from tests.utils import generate_email, generate_random_string
@pytest.mark.asyncio
async def test_users(remnawave) -> None:
email: str = generate_email(length=8)
username: str = generate_random_string(length=8)
telegram_id: int = random.randint(100000000, 999999999)
expire_at: datetime = datetime.now(tz=pytz.UTC) + timedelta(days=7)
class TestUsersCRUD:
"""Тесты базовых CRUD операций для пользователей"""
@pytest.mark.asyncio
async def test_create_user(self, remnawave):
email: str = generate_email(length=8)
username: str = generate_random_string(length=8)
telegram_id: int = random.randint(100000000, 999999999)
expire_at: datetime = datetime.now(tz=pytz.UTC) + timedelta(days=7)
create_user = await remnawave.users.create_user(
create_user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
email=email,
telegram_id=telegram_id,
expire_at=expire_at,
)
)
assert isinstance(create_user, UserResponseDto)
assert create_user.username == username
assert create_user.email == email
assert create_user.telegram_id == telegram_id
assert create_user.expire_at.isoformat(timespec="seconds") == expire_at.isoformat(
timespec="seconds"
)
# Clean up - delete the test user
string_uuid = str(create_user.uuid)
await remnawave.users.delete_user(uuid=string_uuid)
@pytest.mark.asyncio
async def test_update_user(self, remnawave):
# Create test user first
username: str = generate_random_string(length=8)
expire_at: datetime = datetime.now(tz=pytz.UTC) + timedelta(days=7)
create_user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
expire_at=expire_at,
)
)
string_uuid = str(create_user.uuid)
# Update user
update_description: str = "TEST"
update_status: UserStatus = UserStatus.DISABLED
update_user = await remnawave.users.update_user(
UpdateUserRequestDto(
uuid=string_uuid, status=update_status, description=update_description
)
)
assert isinstance(update_user, UserResponseDto)
assert update_user.uuid == create_user.uuid
assert update_user.status == update_status
assert update_user.description == update_description
# Clean up
await remnawave.users.delete_user(uuid=string_uuid)
@pytest.mark.asyncio
async def test_delete_user(self, remnawave):
# Create test user first
username: str = generate_random_string(length=8)
expire_at: datetime = datetime.now(tz=pytz.UTC) + timedelta(days=7)
create_user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
expire_at=expire_at,
)
)
string_uuid = str(create_user.uuid)
# Delete user
delete_user = await remnawave.users.delete_user(uuid=string_uuid)
assert isinstance(delete_user, DeleteUserResponseDto)
assert delete_user.is_deleted is True
class TestUsersFetch:
"""Тесты получения информации о пользователях"""
@pytest.mark.asyncio
async def test_get_all_users(self, remnawave):
all_users = await remnawave.users.get_all_users_v2()
assert isinstance(all_users, UsersResponseDto)
@pytest.mark.asyncio
async def test_get_user_by_uuid(self, remnawave, test_user):
string_uuid = str(test_user.uuid)
user_uuid = await remnawave.users.get_user_by_uuid(uuid=string_uuid)
assert isinstance(user_uuid, UserResponseDto)
assert user_uuid.uuid == test_user.uuid
@pytest.mark.asyncio
async def test_get_user_by_short_uuid(self, remnawave, test_user):
user_short_uuid = await remnawave.users.get_user_by_short_uuid(
short_uuid=test_user.short_uuid
)
assert isinstance(user_short_uuid, UserResponseDto)
assert user_short_uuid.uuid == test_user.uuid
@pytest.mark.asyncio
async def test_get_user_by_username(self, remnawave, test_user):
user_username = await remnawave.users.get_user_by_username(
username=test_user.username
)
assert isinstance(user_username, UserResponseDto)
assert user_username.uuid == test_user.uuid
@pytest.mark.asyncio
async def test_get_users_by_telegram_id(self, remnawave, test_user_with_telegram):
string_telegram_id = str(test_user_with_telegram.telegram_id)
user_telegram_id = await remnawave.users.get_users_by_telegram_id(
telegram_id=string_telegram_id
)
assert isinstance(user_telegram_id, TelegramUserResponseDto)
assert any(user.uuid == test_user_with_telegram.uuid for user in user_telegram_id)
@pytest.mark.asyncio
async def test_get_users_by_email(self, remnawave, test_user_with_email):
user_email = await remnawave.users.get_users_by_email(email=test_user_with_email.email)
assert isinstance(user_email, EmailUserResponseDto)
assert any(user.uuid == test_user_with_email.uuid for user in user_email)
@pytest.mark.asyncio
async def test_get_all_tags(self, remnawave):
users_tags = await remnawave.users.get_all_tags()
assert isinstance(users_tags, TagsResponseDto)
@pytest.mark.asyncio
async def test_get_user_accessible_nodes(self, remnawave, test_user):
try:
string_uuid = str(test_user.uuid)
user_accessible_nodes = await remnawave.users.get_user_accessible_nodes(uuid=string_uuid)
assert isinstance(user_accessible_nodes, GetUserAccessibleNodesResponseDto)
assert isinstance(user_accessible_nodes.nodes, list)
except ApiError as e:
# This might fail if the user doesn't have access to any nodes
# or if the feature is not available, which is acceptable for testing
assert e.error.code in [ErrorCode.USER_NOT_FOUND, ]
@pytest.mark.asyncio
async def test_get_subscription_requests(self, remnawave, test_user):
"""Test fetching user subscription request history"""
string_uuid = str(test_user.uuid)
try:
subscription_requests = await remnawave.users.get_subscription_requests(uuid=string_uuid)
assert isinstance(subscription_requests, GetSubscriptionRequestsResponseDto)
assert hasattr(subscription_requests, 'total')
assert hasattr(subscription_requests, 'records')
# Даже если записей нет, модель должна быть правильно сформирована
# с пустым списком records и total=0
except ApiError as e:
# Этот блок должен срабатывать только если API вернуло ошибку
# (404, 403 и т.д.), но не когда просто нет записей
assert e.error.code in [ErrorCode.USER_NOT_FOUND]
class TestUserActions:
"""Тесты действий над пользователями"""
@pytest.mark.asyncio
async def test_reset_user_traffic(self, remnawave, test_user):
string_uuid = str(test_user.uuid)
user_reset_traffic = await remnawave.users.reset_user_traffic(uuid=string_uuid)
assert isinstance(user_reset_traffic, UserResponseDto)
assert user_reset_traffic.uuid == test_user.uuid
assert user_reset_traffic.used_traffic_bytes == 0
@pytest.mark.asyncio
async def test_disable_enable_user(self, remnawave, test_user):
string_uuid = str(test_user.uuid)
# Disable user
try:
disable_user = await remnawave.users.disable_user(uuid=string_uuid)
assert isinstance(disable_user, UserResponseDto)
assert disable_user.uuid == test_user.uuid
assert disable_user.status == UserStatus.DISABLED
except ApiError as e:
assert e.error.code == ErrorCode.USER_ALREADY_DISABLED
# Enable user
try:
enable_user = await remnawave.users.enable_user(uuid=string_uuid)
assert isinstance(enable_user, UserResponseDto)
assert enable_user.uuid == test_user.uuid
assert enable_user.status == UserStatus.ACTIVE
except ApiError as e:
assert e.error.code == ErrorCode.USER_ALREADY_ENABLED
@pytest.mark.asyncio
async def test_revoke_user_subscription(self, remnawave, test_user):
string_uuid = str(test_user.uuid)
old_short_uuid = test_user.short_uuid
revoke_user_subscription = await remnawave.users.revoke_user_subscription(uuid=string_uuid)
assert isinstance(revoke_user_subscription, UserResponseDto)
assert revoke_user_subscription.uuid == test_user.uuid
assert revoke_user_subscription.short_uuid != old_short_uuid
@pytest.fixture
async def test_user(remnawave):
"""Fixture to create a test user for tests"""
username = generate_random_string(length=8)
expire_at = datetime.now(tz=pytz.UTC) + timedelta(days=7)
user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
expire_at=expire_at,
)
)
yield user
# Clean up
await remnawave.users.delete_user(uuid=str(user.uuid))
@pytest.fixture
async def test_user_with_email(remnawave):
"""Fixture to create a test user with email for tests"""
username = generate_random_string(length=8)
email = generate_email(length=8)
expire_at = datetime.now(tz=pytz.UTC) + timedelta(days=7)
user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
email=email,
expire_at=expire_at,
)
)
yield user
# Clean up
await remnawave.users.delete_user(uuid=str(user.uuid))
@pytest.fixture
async def test_user_with_telegram(remnawave):
"""Fixture to create a test user with telegram ID for tests"""
username = generate_random_string(length=8)
telegram_id = random.randint(100000000, 999999999)
expire_at = datetime.now(tz=pytz.UTC) + timedelta(days=7)
user = await remnawave.users.create_user(
CreateUserRequestDto(
username=username,
telegram_id=telegram_id,
expire_at=expire_at,
)
)
assert isinstance(create_user, UserResponseDto)
assert create_user.username == username
assert create_user.email == email
assert create_user.telegram_id == telegram_id
assert create_user.expire_at.isoformat(timespec="seconds") == expire_at.isoformat(
timespec="seconds"
)
string_uuid = str(create_user.uuid)
string_telegram_id = str(create_user.telegram_id)
all_users = await remnawave.users.get_all_users_v2()
assert isinstance(all_users, UsersResponseDto)
user_uuid = await remnawave.users.get_user_by_uuid(uuid=string_uuid)
assert isinstance(user_uuid, UserResponseDto)
assert user_uuid.uuid == create_user.uuid
user_short_uuid = await remnawave.users.get_user_by_short_uuid(
short_uuid=user_uuid.short_uuid
)
assert isinstance(user_short_uuid, UserResponseDto)
assert user_short_uuid.uuid == create_user.uuid
# Only test get_user_by_subscription_uuid if subscription_uuid is not None
# if create_user.subscription_uuid is not None:
# string_subscription_uuid = str(create_user.subscription_uuid)
# user_subscription_uuid = await remnawave.users.get_user_by_subscription_uuid(
# subscription_uuid=string_subscription_uuid
# )
# assert isinstance(user_subscription_uuid, UserResponseDto)
# assert user_subscription_uuid.uuid == create_user.uuid
user_username = await remnawave.users.get_user_by_username(
username=user_uuid.username
)
assert isinstance(user_username, UserResponseDto)
assert user_username.uuid == create_user.uuid
user_telegram_id = await remnawave.users.get_users_by_telegram_id(
telegram_id=string_telegram_id
)
assert isinstance(user_telegram_id, TelegramUserResponseDto)
assert any(user.uuid == create_user.uuid for user in user_telegram_id)
user_email = await remnawave.users.get_users_by_email(email=user_uuid.email)
assert isinstance(user_email, EmailUserResponseDto)
assert any(user.uuid == create_user.uuid for user in user_email)
user_reset_traffic = await remnawave.users.reset_user_traffic(uuid=string_uuid)
assert isinstance(user_reset_traffic, UserResponseDto)
assert user_reset_traffic.uuid == create_user.uuid
assert user_reset_traffic.used_traffic_bytes == 0
try:
disable_user = await remnawave.users.disable_user(uuid=string_uuid)
assert isinstance(disable_user, UserResponseDto)
assert disable_user.uuid == create_user.uuid
assert disable_user.status == UserStatus.DISABLED
except ApiError as e:
assert e.error.code == ErrorCode.USER_ALREADY_DISABLED
try:
enable_user = await remnawave.users.enable_user(uuid=string_uuid)
assert isinstance(enable_user, UserResponseDto)
assert enable_user.uuid == create_user.uuid
assert enable_user.status == UserStatus.ACTIVE
except ApiError as e:
assert e.error.code == ErrorCode.USER_ALREADY_ENABLED
update_description: str = "TEST"
update_status: UserStatus = UserStatus.DISABLED
update_user = await remnawave.users.update_user(
UpdateUserRequestDto(
uuid=string_uuid, status=update_status, description=update_description
)
)
assert isinstance(update_user, UserResponseDto)
assert update_user.uuid == create_user.uuid
assert update_user.status == update_status
assert update_user.description == update_description
# Temporarily disabled, error in backend
revoke_user_subscription = await remnawave.users.revoke_user_subscription(
uuid=string_uuid,
# body=RevokeUserRequestDto(
# short_uuid="fokfaa"
# )
)
assert isinstance(revoke_user_subscription, UserResponseDto)
assert revoke_user_subscription.uuid == create_user.uuid
assert revoke_user_subscription.short_uuid != create_user.short_uuid
# Test get user accessible nodes
try:
user_accessible_nodes = await remnawave.users.get_user_accessible_nodes(uuid=string_uuid)
assert isinstance(user_accessible_nodes, GetUserAccessibleNodesResponseDto)
assert isinstance(user_accessible_nodes.nodes, list)
except ApiError as e:
# This might fail if the user doesn't have access to any nodes
# or if the feature is not available, which is acceptable for testing
assert e.error_code in [ErrorCode.USER_NOT_FOUND, ErrorCode.FORBIDDEN, ErrorCode.NOT_FOUND]
delete_user = await remnawave.users.delete_user(uuid=string_uuid)
assert isinstance(delete_user, DeleteUserResponseDto)
assert delete_user.is_deleted is True
users_tags = await remnawave.users.get_all_tags()
assert isinstance(users_tags, TagsResponseDto)
yield user
# Clean up
await remnawave.users.delete_user(uuid=str(user.uuid))