Remove tests

This commit is contained in:
Ingmar Stein
2025-12-13 13:55:00 +01:00
parent 7eb7307132
commit 18dab2134b
19 changed files with 1 additions and 2412 deletions

View File

@@ -2,13 +2,6 @@
tronbyt-server
go.work
# Python (Legacy)
__pycache__
.venv
.mypy_cache
.pytest_cache
.ruff_cache
# IDE / Editor
.vscode
.idea

View File

@@ -2,7 +2,7 @@
## Project Overview
This project is the **Tronbyt Server** (v2), a Go-based rewrite of the original Python/FastAPI application. It manages Tronbyt devices (flashed Tidbyts) locally, providing a web UI and API for app management and rendering.
This project is the **Tronbyt Server**, a Go-based application which manages Tronbyt devices (flashed Tidbyts) locally, providing a web UI and API for app management and rendering.
The core functionality involves serving WebP images to devices, generated by rendering Pixlet apps (Starlark scripts). The server integrates with the native Go `pixlet` library.
@@ -67,4 +67,3 @@ The core functionality involves serving WebP images to devices, generated by ren
* **Logging:** Use `log/slog` for structured logging.
* **Templates:** Use `{{ t .Localizer "MessageID" }}` for translated strings.
* **Database:** Use GORM for DB interactions. Ensure migrations are updated in `cmd/migrate` or auto-migrated in `server.go` if appropriate.

View File

@@ -277,14 +277,6 @@ func (s *Server) handleEditUserGet(w http.ResponseWriter, r *http.Request) {
return
}
// Get System Repo Info if admin (Stub for now or implement) // Python: system_apps.get_system_repo_info
// I'll leave it empty for now or implement later if critical.
// Template expects 'system_repo_info' but I don't pass it in TemplateData explicitly,
// unless I extend TemplateData or pass map.
// Go TemplateData has User.
// I need to add fields to TemplateData if I want to pass extra info.
// 'FirmwareVersion' is there.
firmwareVersion := "unknown"
firmwareFile := filepath.Join(s.DataDir, "firmware", "firmware_version.txt")
if bytes, err := os.ReadFile(firmwareFile); err == nil {

View File

View File

@@ -1,111 +0,0 @@
from pathlib import Path
from typing import Iterator
import sqlite3
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import shutil
from tronbyt_server import db
from tronbyt_server.main import app as fastapi_app
from tronbyt_server.dependencies import get_db
from tronbyt_server.config import get_settings
settings = get_settings()
@pytest.fixture(scope="session")
def db_connection(
tmp_path_factory: pytest.TempPathFactory,
) -> Iterator[sqlite3.Connection]:
tmp_path = tmp_path_factory.mktemp("data")
db_path = tmp_path / "testdb.sqlite"
db_path.unlink(missing_ok=True)
settings.DB_FILE = str(db_path)
settings.DATA_DIR = str(tmp_path / "data")
settings.USERS_DIR = str(tmp_path / "users")
settings.ENABLE_USER_REGISTRATION = "1"
with sqlite3.connect(settings.DB_FILE, check_same_thread=False) as conn:
# Initialize the database schema immediately after connection.
db.init_db(conn)
yield conn
@pytest.fixture(scope="session")
def app(db_connection: sqlite3.Connection) -> Iterator[FastAPI]:
def get_db_override() -> sqlite3.Connection:
return db_connection
fastapi_app.dependency_overrides[get_db] = get_db_override
yield fastapi_app
fastapi_app.dependency_overrides.clear()
@pytest.fixture(autouse=True)
def db_cleanup(db_connection: sqlite3.Connection) -> Iterator[None]:
yield
cursor = db_connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [row[0] for row in cursor.fetchall()]
for table in tables:
if table != "sqlite_sequence":
cursor.execute(f'DELETE FROM "{table}"')
db_connection.commit()
@pytest.fixture(autouse=True)
def settings_cleanup() -> Iterator[None]:
original_enable_user_registration = settings.ENABLE_USER_REGISTRATION
original_max_users = settings.MAX_USERS
yield
settings.ENABLE_USER_REGISTRATION = original_enable_user_registration
settings.MAX_USERS = original_max_users
@pytest.fixture()
def client(app: FastAPI) -> TestClient:
return TestClient(app)
@pytest.fixture()
def auth_client(app: FastAPI) -> Iterator[TestClient]:
with TestClient(app) as client:
# Create owner
response = client.post(
"/auth/register_owner",
data={"password": "password"},
follow_redirects=False,
)
assert response.status_code == 302
# Register testuser
response = client.post(
"/auth/register",
data={"username": "testuser", "password": "password"},
follow_redirects=False,
)
assert response.status_code == 302 or response.status_code == 409
# Login as testuser
response = client.post(
"/auth/login",
data={"username": "testuser", "password": "password"},
follow_redirects=False,
)
assert response.status_code == 302
yield client
@pytest.fixture()
def clean_app() -> Iterator[TestClient]:
users_dir = Path("tests/users")
if users_dir.exists():
shutil.rmtree(users_dir)
users_dir.mkdir()
yield TestClient(fastapi_app)
if users_dir.exists():
shutil.rmtree(users_dir)

View File

@@ -1,167 +0,0 @@
import sqlite3
from operator import attrgetter
from pathlib import Path
from fastapi.testclient import TestClient
from tronbyt_server import db
from tronbyt_server.models.app import App
from . import utils
def test_api(auth_client: TestClient, db_connection: sqlite3.Connection) -> None:
# Create a device
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
assert response.headers["location"] == "http://testserver/"
# Get user to find device_id
user = db.get_user(db_connection, "testuser")
assert user
device_id = list(user.devices.keys())[0]
# Push base64 image via call to push
data = """UklGRsYAAABXRUJQVlA4TLkAAAAvP8AHABcw/wKBJH/ZERYIJEHtr/b8B34K3DbbHievrd+SlSqA3btETOGfo881kEXFGJQRa+biGiCi/xPAXywwVqenXXoCj+L90gO4ryqALawrJOwGX1iVsGnVMRX8irHyqbzGagksXy0zsmlldlEbgotNM1Nfaw04UbmahSFTi0pgml3UgIvaNDNA4JMikAFTQ16YXYhDNk1jbiaGoTEgsnO5vqJ1KwpcpWXOiQrUoqbZyc3FIEb5PAA="""
push_data = {"image": data}
# Send the POST request
url = f"/v0/devices/{device_id}/push"
# Assert push fails with bad key
response = auth_client.post(
url,
headers={"Authorization": "badkey", "Content-Type": "application/json"},
json=push_data,
)
assert response.status_code == 401
push_path = Path(db.get_device_webp_dir(device_id)) / "pushed"
assert not push_path.exists()
# Assert push succeeds with good key
response = auth_client.post(
url,
headers={"Authorization": "TESTKEY", "Content-Type": "application/json"},
json=push_data,
)
assert response.status_code == 200
file_list = [
f for f in push_path.iterdir() if f.is_file() and f.name.startswith("__")
]
assert len(file_list) > 0
# Call next and assert the pushed file is deleted
auth_client.get(f"/{device_id}/next")
file_list = [
f for f in push_path.iterdir() if f.is_file() and f.name.startswith("__")
]
assert len(file_list) == 0
# Cleanup
db.delete_device_dirs(device_id)
assert not Path(f"tronbyt_server/webp/{device_id}").is_dir()
class TestMoveApp:
def _setup_device_with_apps(
self,
auth_client: TestClient,
db_connection: sqlite3.Connection,
num_apps: int = 4,
) -> str:
"""Sets up a user, device, and apps for testing."""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
assert response.headers["location"] == "http://testserver/"
user = db.get_user(db_connection, "testuser")
assert user
device_id = list(user.devices.keys())[0]
for i in range(1, num_apps + 1):
response = auth_client.post(
f"/{device_id}/addapp",
data={
"name": "NOAA Tides",
"iname": f"app{i}",
"uinterval": "10",
"display_time": "10",
},
follow_redirects=False,
)
assert response.status_code == 302
return device_id
def _get_sorted_apps_from_db(
self, db_connection: sqlite3.Connection, device_id: str
) -> list[App]:
"""Retrieves and sorts apps by order for a given device from the DB."""
user = utils.get_testuser(db_connection)
apps_dict = user.devices[device_id].apps
apps_list = sorted(apps_dict.values(), key=attrgetter("order"))
return apps_list
def test_move_app_scenarios(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
device_id = self._setup_device_with_apps(auth_client, db_connection, 4)
apps = self._get_sorted_apps_from_db(db_connection, device_id)
app1, app2, app3, app4 = (
apps[0].iname,
apps[1].iname,
apps[2].iname,
apps[3].iname,
)
# Move app2 down
auth_client.post(f"/{device_id}/{app2}/moveapp", params={"direction": "down"})
apps = self._get_sorted_apps_from_db(db_connection, device_id)
assert [app.iname for app in apps] == [app1, app3, app2, app4]
for i, app in enumerate(apps):
assert app.order == i
# Move app2 up
auth_client.post(f"/{device_id}/{app2}/moveapp", params={"direction": "up"})
apps = self._get_sorted_apps_from_db(db_connection, device_id)
assert [app.iname for app in apps] == [app1, app2, app3, app4]
for i, app in enumerate(apps):
assert app.order == i
# Move app1 up (should not change order)
auth_client.post(f"/{device_id}/{app1}/moveapp", params={"direction": "up"})
apps = self._get_sorted_apps_from_db(db_connection, device_id)
assert [app.iname for app in apps] == [app1, app2, app3, app4]
# Move app4 down (should not change order)
auth_client.post(f"/{device_id}/{app4}/moveapp", params={"direction": "down"})
apps = self._get_sorted_apps_from_db(db_connection, device_id)
assert [app.iname for app in apps] == [app1, app2, app3, app4]
# Move app1 down twice
auth_client.post(f"/{device_id}/{app1}/moveapp", params={"direction": "down"})
auth_client.post(f"/{device_id}/{app1}/moveapp", params={"direction": "down"})
apps = self._get_sorted_apps_from_db(db_connection, device_id)
assert [app.iname for app in apps] == [app2, app3, app1, app4]
for i, app in enumerate(apps):
assert app.order == i

View File

@@ -1,280 +0,0 @@
import sqlite3
import string
import pytest
from fastapi.testclient import TestClient
from tronbyt_server.models.user import User
import tronbyt_server.db as db
from . import utils
class TestApiKeyGeneration:
"""Test cases for API key generation functionality"""
def test_migrate_user_api_keys_generates_key_for_user_without_key(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that migrate_user_api_keys generates API key for users without one"""
user = utils.get_testuser(db_connection)
user.api_key = ""
db.save_user(db_connection, user)
user_without_key = db.get_user(db_connection, "testuser")
assert user_without_key is not None
assert not user_without_key.api_key
db.migrate_user_api_keys(db_connection)
migrated_user = db.get_user(db_connection, "testuser")
assert migrated_user is not None
assert migrated_user.api_key
assert len(migrated_user.api_key) == 32
assert all(
c in string.ascii_letters + string.digits for c in migrated_user.api_key
)
def test_migrate_user_api_keys_preserves_existing_key(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that migrate_user_api_keys doesn't overwrite existing API keys"""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
user = utils.get_testuser(db_connection)
original_api_key = user.api_key
assert original_api_key
db.migrate_user_api_keys(db_connection)
user_after_migration = db.get_user(db_connection, "testuser")
assert user_after_migration is not None
assert user_after_migration.api_key == original_api_key
def test_migrate_user_api_keys_handles_multiple_users(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that migrate_user_api_keys handles multiple users correctly"""
auth_client.post(
"/auth/register",
data={"username": "user1", "password": "password"},
)
auth_client.post(
"/auth/register",
data={"username": "user2", "password": "password"},
)
user1 = db.get_user(db_connection, "user1")
assert user1 is not None
user1.api_key = ""
db.save_user(db_connection, user1)
user2 = db.get_user(db_connection, "user2")
assert user2 is not None
original_key_user2 = user2.api_key
db.migrate_user_api_keys(db_connection)
user1_after = db.get_user(db_connection, "user1")
assert user1_after is not None
assert user1_after.api_key
assert len(user1_after.api_key) == 32
user2_after = db.get_user(db_connection, "user2")
assert user2_after is not None
assert user2_after.api_key == original_key_user2
def test_migrate_user_api_keys_generates_valid_key(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that migrate_user_api_keys generates a valid API key"""
user = utils.get_testuser(db_connection)
user.api_key = ""
db.save_user(db_connection, user)
db.migrate_user_api_keys(db_connection)
migrated_user = db.get_user(db_connection, "testuser")
assert migrated_user is not None
assert migrated_user.api_key
assert len(migrated_user.api_key) == 32
assert all(
c in string.ascii_letters + string.digits for c in migrated_user.api_key
)
class TestApiKeyRetrieval:
"""Test cases for API key retrieval functionality"""
def test_get_user_by_api_key_success(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test successful user retrieval by API key"""
user = utils.get_testuser(db_connection)
api_key = user.api_key
retrieved_user = db.get_user_by_api_key(db_connection, api_key)
assert retrieved_user is not None
assert retrieved_user.username == "testuser"
assert retrieved_user.api_key == api_key
def test_get_user_by_api_key_not_found(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test user retrieval with non-existent API key"""
retrieved_user = db.get_user_by_api_key(db_connection, "nonexistent_key")
assert retrieved_user is None
def test_get_user_by_api_key_empty_string(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test user retrieval with empty API key"""
retrieved_user = db.get_user_by_api_key(db_connection, "")
assert retrieved_user is None
def test_get_user_by_api_key_multiple_users(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test user retrieval when multiple users exist"""
auth_client.post(
"/auth/register",
data={"username": "user1", "password": "password"},
)
auth_client.post(
"/auth/register",
data={"username": "user2", "password": "password"},
)
user1 = db.get_user(db_connection, "user1")
user2 = db.get_user(db_connection, "user2")
assert user1 is not None and user2 is not None
api_key1 = user1.api_key
api_key2 = user2.api_key
assert api_key1 != api_key2
retrieved_user1 = db.get_user_by_api_key(db_connection, api_key1)
assert retrieved_user1 is not None
assert retrieved_user1.username == "user1"
retrieved_user2 = db.get_user_by_api_key(db_connection, api_key2)
assert retrieved_user2 is not None
assert retrieved_user2.username == "user2"
def test_get_user_by_api_key_case_sensitive(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that API key lookup is case sensitive"""
user = utils.get_testuser(db_connection)
api_key = user.api_key
if any(c.islower() for c in api_key):
wrong_case_key = api_key.upper()
else:
wrong_case_key = api_key.lower()
retrieved_user = db.get_user_by_api_key(db_connection, wrong_case_key)
assert retrieved_user is None
retrieved_user = db.get_user_by_api_key(db_connection, api_key)
assert retrieved_user is not None
@pytest.fixture
def api_key_user(auth_client: TestClient, db_connection: sqlite3.Connection) -> User:
"""Fixture to create a user with a device and an API key."""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
return utils.get_testuser(db_connection)
class TestApiKeyIntegration:
"""Integration tests for API key functionality"""
def test_user_registration_creates_api_key(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that user registration automatically creates an API key"""
auth_client.post(
"/auth/register",
data={"username": "newuser", "password": "password"},
)
user = db.get_user(db_connection, "newuser")
assert user is not None
assert user.api_key
assert len(user.api_key) == 32
def test_api_key_authentication_flow(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test complete API key authentication flow"""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
user = utils.get_testuser(db_connection)
device_id = list(user.devices.keys())[0]
api_key = user.api_key
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {api_key}"},
)
assert response.status_code == 200
data = response.json()
assert "id" in data
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": "Bearer invalid_key"},
)
assert response.status_code == 401
response = auth_client.get(f"/v0/devices/{device_id}")
assert response.status_code == 401
def test_api_key_uniqueness(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test that generated API keys are unique across users"""
api_keys: list[str] = []
for i in range(5):
username = f"user{i}"
response = auth_client.post(
"/auth/register",
data={"username": username, "password": "password"},
follow_redirects=False,
)
assert response.status_code == 302 or response.status_code == 409
user = db.get_user(db_connection, username)
assert user is not None
api_keys.append(user.api_key)
assert len(api_keys) == len(set(api_keys)), "API keys should be unique"

View File

@@ -1,100 +0,0 @@
import sqlite3
from fastapi.testclient import TestClient
from tests import utils
from tronbyt_server import db
def test_app_create_edit_config_delete(
auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
user = utils.get_testuser(db_connection)
device_id = list(user.devices.keys())[0]
r = auth_client.get(f"/{device_id}/addapp")
assert r.status_code == 200
r = auth_client.post(
f"/{device_id}/addapp",
data={
"name": "NOAA Tides",
"uinterval": "69",
"display_time": "10",
"notes": "",
},
follow_redirects=False,
)
assert r.status_code == 302
user = utils.get_testuser(db_connection)
assert len(user.devices[device_id].apps) == 1
device_id = list(user.devices.keys())[0]
r = auth_client.get(f"/{device_id}/addapp")
assert r.status_code == 200
r = auth_client.post(
f"/{device_id}/addapp",
data={
"name": "NOAA Tides",
"uinterval": "69",
"display_time": "10",
"notes": "",
},
)
assert r.status_code == 200
user = utils.get_testuser(db_connection)
app_id = list(user.devices[device_id].apps.keys())[0]
test_app = user.devices[device_id].apps[app_id]
assert test_app.name == "NOAA Tides"
r = auth_client.get(f"/{device_id}/{app_id}/configapp?delete_on_cancel=true")
assert r.status_code == 200
# Retrieve current config to include in submission
user = utils.get_testuser(db_connection)
test_app = user.devices[device_id].apps[app_id]
r = auth_client.post(
f"/{device_id}/{app_id}/configapp",
json={
"iname": app_id,
"name": "NOAA Tides",
"uinterval": 69,
"display_time": 69,
"notes": "69",
"enabled": True,
"start_time": "",
"end_time": "",
"config": test_app.config,
},
follow_redirects=False,
)
assert r.status_code == 302
user = utils.get_testuser(db_connection)
test_app = user.devices[device_id].apps[app_id]
assert test_app.uinterval == 69
assert test_app.display_time == 69
assert test_app.notes == "69"
auth_client.post(f"/{device_id}/{app_id}/delete")
user = utils.get_testuser(db_connection)
assert app_id not in user.devices[device_id].apps
db.delete_device_dirs(device_id)

View File

@@ -1,72 +0,0 @@
import sqlite3
from pathlib import Path
from fastapi.testclient import TestClient
from tests import utils
from tronbyt_server.config import get_settings
def test_device_operations(
auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
r = auth_client.get("/create")
assert r.status_code == 200
r = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert r.status_code == 302
user = utils.get_testuser(db_connection)
device_id = list(user.devices.keys())[0]
assert user.devices[device_id].name == "TESTDEVICE"
r = auth_client.get(f"/{device_id}/firmware")
assert r.status_code == 200
# Create dummy firmware file for testing
settings = get_settings()
firmware_dir = Path(settings.DATA_DIR) / "firmware"
firmware_dir.mkdir(parents=True, exist_ok=True)
firmware_file = firmware_dir / "tidbyt-gen1.bin"
content = (
b"A" * 1000
+ b"XplaceholderWIFISSID____________\x00"
+ b"XplaceholderWIFIPASSWORD________________________________________\x00"
+ b"XplaceholderREMOTEURL___________________________________________________________________________________________________________\x00"
)
firmware_file.write_bytes(content)
data = {
"id": device_id,
"img_url": f"http://m1Pro.local:8000/{device_id}/next",
"wifi_ap": "Blah",
"wifi_password": "Blah",
}
r = auth_client.post(f"/{device_id}/firmware", data=data)
assert r.status_code == 200
assert r.headers["content-type"] == "application/octet-stream"
assert (
r.headers["content-disposition"]
== f"attachment;filename=firmware_tidbyt_gen1_{device_id}.bin"
)
assert len(r.content) > 0
auth_client.post(
f"/{device_id}/addapp",
data={
"name": "NOAA Tides",
"iname": "noaa-tides",
"uinterval": "10",
"display_time": "10",
},
follow_redirects=False,
)
assert auth_client.get(f"/{device_id}/next").status_code == 200

View File

@@ -1,548 +0,0 @@
import sqlite3
import pytest
from fastapi.testclient import TestClient
from tronbyt_server import db
from tronbyt_server.models import App, DeviceType, Brightness
from tronbyt_server.models.user import User
from tests import utils
@pytest.fixture
def device_user(auth_client: TestClient, db_connection: sqlite3.Connection) -> User:
"""Fixture to create a user with a device."""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
)
assert response.status_code == 200
return utils.get_testuser(db_connection)
def _add_app_to_device(
db_connection: sqlite3.Connection,
user: User,
device_id: str,
*,
iname: str = "test-app",
name: str = "Test App",
enabled: bool = True,
uinterval: int = 10,
display_time: int = 30,
pushed: bool = False,
last_render: int = 1234,
empty_last_render: bool = False,
) -> App:
device = user.devices[device_id]
app = App(
iname=iname,
name=name,
enabled=enabled,
uinterval=uinterval,
display_time=display_time,
pushed=pushed,
last_render=last_render,
empty_last_render=empty_last_render,
)
device.apps[iname] = app
user.devices[device_id] = device
db.save_user(db_connection, user)
return app
class TestDevicesEndpoint:
"""Test cases for the /v0/devices endpoint"""
def test_list_devices_success(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test successful listing of devices"""
api_key = device_user.api_key
response = auth_client.get(
"/v0/devices", headers={"Authorization": f"Bearer {api_key}"}
)
assert response.status_code == 200
data = response.json()
assert "devices" in data
assert len(data["devices"]) == 1
assert data["devices"][0]["displayName"] == "TESTDEVICE"
def test_list_devices_with_direct_auth_header(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test listing devices with direct Authorization header (no Bearer prefix)"""
api_key = device_user.api_key
response = auth_client.get("/v0/devices", headers={"Authorization": api_key})
assert response.status_code == 200
data = response.json()
assert "devices" in data
assert len(data["devices"]) == 1
def test_list_devices_missing_auth_header(self, client: TestClient) -> None:
"""Test listing devices without Authorization header"""
response = client.get("/v0/devices")
assert response.status_code == 401
assert "Invalid credentials" in response.text
def test_list_devices_invalid_api_key(self, auth_client: TestClient) -> None:
"""Test listing devices with invalid API key"""
response = auth_client.get(
"/v0/devices", headers={"Authorization": "Bearer invalid_key"}
)
assert response.status_code == 401
assert "Invalid credentials" in response.text
def test_list_devices_empty_devices(
self, auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
"""Test listing devices when user has no devices"""
user = utils.get_testuser(db_connection)
api_key = user.api_key
response = auth_client.get(
"/v0/devices", headers={"Authorization": f"Bearer {api_key}"}
)
assert response.status_code == 200
data = response.json()
assert "devices" in data
assert len(data["devices"]) == 0
class TestDeviceEndpoint:
"""Test cases for the /v0/devices/<device_id> endpoint"""
def test_get_device_with_user_api_key(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test successful retrieval of device info"""
device_id = list(device_user.devices.keys())[0]
api_key = device_user.api_key
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {api_key}"},
)
assert response.status_code == 200
data = response.json()
assert data["id"] == device_id
assert data["displayName"] == "TESTDEVICE"
def test_get_device_with_device_api_key(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device retrieval using device-specific API key"""
device_id = list(device_user.devices.keys())[0]
device_api_key = device_user.devices[device_id].api_key
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_api_key}"},
)
assert response.status_code == 200
data = response.json()
assert data["id"] == device_id
def test_get_device_not_found(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Test device retrieval for non-existent device"""
# Create a second user to get a different API key
response = auth_client.post(
"/auth/register",
data={"username": "testuser2", "password": "password"},
follow_redirects=False,
)
assert response.status_code == 302
user2 = utils.get_user_by_username(db_connection, "testuser2")
assert user2 is not None
device_id = list(device_user.devices.keys())[0]
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {user2.api_key}"},
)
assert response.status_code == 404
def test_get_device_invalid_id_format(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device retrieval with invalid ID format"""
api_key = device_user.api_key
response = auth_client.get(
"/v0/devices/invalid-id",
headers={"Authorization": f"Bearer {api_key}"},
)
assert response.status_code == 422
def test_get_device_unauthorized_api_key(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device retrieval with wrong API key"""
device_id = list(device_user.devices.keys())[0]
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": "Bearer wrong_key"},
)
assert response.status_code == 401
def test_patch_device_with_user_api_key(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device update with user API key"""
device_id = list(device_user.devices.keys())[0]
api_key = device_user.api_key
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {api_key}"},
json={"brightness": 50, "autoDim": True},
)
assert response.status_code == 200
data = response.json()
assert data["brightness"] == 50
assert data["autoDim"] is True
def test_patch_device_with_device_api_key(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device update with device-specific API key"""
device_id = list(device_user.devices.keys())[0]
device_api_key = device_user.devices[device_id].api_key
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_api_key}"},
json={"brightness": 64},
)
assert response.status_code == 200
data = response.json()
assert data["brightness"] == 64
def test_patch_device_invalid_brightness(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device update with invalid brightness value"""
device_id = list(device_user.devices.keys())[0]
api_key = device_user.api_key
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {api_key}"},
json={"brightness": 300},
)
assert response.status_code == 400
def test_patch_device_missing_auth(
self, auth_client: TestClient, device_user: User
) -> None:
"""Test device update without authorization"""
device_id = list(device_user.devices.keys())[0]
response = auth_client.patch(
f"/v0/devices/{device_id}", json={"brightness": 100}
)
assert response.status_code == 401
def test_get_device_returns_extended_fields(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure device payload exposes new metadata fields."""
device_id = list(device_user.devices.keys())[0]
device = device_user.devices[device_id]
app = _add_app_to_device(
db_connection,
device_user,
device_id,
iname="night-app",
name="Night App",
pushed=True,
empty_last_render=True,
)
device.type = DeviceType.TRONBYT_S3
device.notes = "Extra info"
device.default_interval = 42
device.night_mode_enabled = True
device.night_mode_app = app.iname
device.night_start = "21:30"
device.night_end = "06:00"
device.night_brightness = Brightness(64)
device.dim_time = "05:15"
device.dim_brightness = Brightness(22)
device.pinned_app = app.iname
device_user.devices[device_id] = device
db.save_user(db_connection, device_user)
response = auth_client.get(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
)
assert response.status_code == 200
payload = response.json()
assert payload["type"] == "tronbyt_s3"
assert payload["notes"] == "Extra info"
assert payload["intervalSec"] == 42
assert payload["pinnedApp"] == app.iname
assert payload["nightMode"]["enabled"] is True
assert payload["nightMode"]["app"] == app.iname
assert payload["nightMode"]["startTime"] == "21:30"
assert payload["nightMode"]["endTime"] == "06:00"
assert payload["nightMode"]["brightness"] == 64
assert payload["dimMode"]["startTime"] == "05:15"
assert payload["dimMode"]["brightness"] == 22
assert payload["autoDim"] is True
def test_patch_device_updates_extended_fields(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure device updates support new fields and value validation."""
device_id = list(device_user.devices.keys())[0]
app = _add_app_to_device(
db_connection, device_user, device_id, iname="evening", name="Evening App"
)
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={
"intervalSec": 15,
"nightModeEnabled": True,
"nightModeApp": app.iname,
"nightModeBrightness": 50,
"nightModeStartTime": "2130",
"nightModeEndTime": "6:45",
"dimModeStartTime": "930",
"dimModeBrightness": 30,
"pinnedApp": app.iname,
},
)
assert response.status_code == 200
payload = response.json()
assert payload["intervalSec"] == 15
assert payload["nightMode"]["enabled"] is True
assert payload["nightMode"]["app"] == app.iname
assert payload["nightMode"]["startTime"] == "21:30"
assert payload["nightMode"]["endTime"] == "06:45"
assert payload["nightMode"]["brightness"] == 50
assert payload["dimMode"]["startTime"] == "09:30"
assert payload["dimMode"]["brightness"] == 30
assert payload["pinnedApp"] == app.iname
def test_patch_device_rejects_unknown_pinned_app(
self,
auth_client: TestClient,
device_user: User,
) -> None:
"""Ensure pinnedApp validation rejects unknown installations."""
device_id = list(device_user.devices.keys())[0]
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={"pinnedApp": "missing-app"},
)
assert response.status_code == 400
assert "Pinned app not found" in response.text
def test_patch_device_clears_dim_mode(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Empty dimModeStartTime should clear stored dim time."""
device_id = list(device_user.devices.keys())[0]
device = device_user.devices[device_id]
device.dim_time = "04:00"
device.dim_brightness = Brightness(40)
device_user.devices[device_id] = device
db.save_user(db_connection, device_user)
response = auth_client.patch(
f"/v0/devices/{device_id}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={"dimModeStartTime": ""},
)
assert response.status_code == 200
payload = response.json()
assert payload["dimMode"]["startTime"] is None
class TestDeviceInstallationsEndpoint:
"""Test cases for device installations endpoints."""
def test_list_installations_returns_extended_fields(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure installation payload includes new metadata fields."""
device_id = list(device_user.devices.keys())[0]
app = _add_app_to_device(
db_connection,
device_user,
device_id,
iname="install-1",
name="Sample App",
enabled=False,
pushed=True,
uinterval=7,
display_time=12,
last_render=999,
empty_last_render=True,
)
device = device_user.devices[device_id]
device.pinned_app = app.iname
device_user.devices[device_id] = device
db.save_user(db_connection, device_user)
response = auth_client.get(
f"/v0/devices/{device_id}/installations",
headers={"Authorization": f"Bearer {device_user.api_key}"},
)
assert response.status_code == 200
payload = response.json()
assert "installations" in payload
assert len(payload["installations"]) == 1
installation = payload["installations"][0]
assert installation["id"] == app.iname
assert installation["appID"] == "Sample App"
assert installation["enabled"] is False
assert installation["pinned"] is True
assert installation["pushed"] is True
assert installation["renderIntervalMin"] == 7
assert installation["displayTimeSec"] == 12
assert installation["lastRenderAt"] == 999
assert installation["isInactive"] is True
def test_get_installation_returns_payload(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure single installation lookup exposes new fields."""
device_id = list(device_user.devices.keys())[0]
app = _add_app_to_device(
db_connection,
device_user,
device_id,
iname="install-2",
name="Lookup App",
uinterval=3,
display_time=9,
)
device = device_user.devices[device_id]
device.pinned_app = app.iname
device_user.devices[device_id] = device
db.save_user(db_connection, device_user)
response = auth_client.get(
f"/v0/devices/{device_id}/installations/{app.iname}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
)
assert response.status_code == 200
payload = response.json()
assert payload["id"] == app.iname
assert payload["appID"] == "Lookup App"
assert payload["enabled"] is True
assert payload["pinned"] is True
assert payload["renderIntervalMin"] == 3
assert payload["displayTimeSec"] == 9
def test_patch_installation_updates_fields(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure installation updates support new fields and return payload."""
device_id = list(device_user.devices.keys())[0]
app = _add_app_to_device(
db_connection,
device_user,
device_id,
iname="install-3",
name="Update App",
)
response = auth_client.patch(
f"/v0/devices/{device_id}/installations/{app.iname}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={
"enabled": False,
"pinned": True,
"renderIntervalMin": 5,
"displayTimeSec": 20,
},
)
assert response.status_code == 200
payload = response.json()
assert payload["enabled"] is False
assert payload["pinned"] is True
assert payload["renderIntervalMin"] == 5
assert payload["displayTimeSec"] == 20
# Confirm persistence by fetching installation again
refreshed = auth_client.get(
f"/v0/devices/{device_id}/installations/{app.iname}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
)
assert refreshed.status_code == 200
refreshed_payload = refreshed.json()
assert refreshed_payload["pinned"] is True
assert refreshed_payload["renderIntervalMin"] == 5
def test_patch_installation_rejects_negative_values(
self,
auth_client: TestClient,
device_user: User,
db_connection: sqlite3.Connection,
) -> None:
"""Ensure validation rejects negative interval/time values."""
device_id = list(device_user.devices.keys())[0]
response_interval = auth_client.patch(
f"/v0/devices/{device_id}/installations/unknown",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={"renderIntervalMin": -1},
)
assert response_interval.status_code == 404
app = _add_app_to_device(
db_connection,
device_user,
device_id,
iname="install-4",
name="Validation App",
)
response_negative = auth_client.patch(
f"/v0/devices/{device_id}/installations/{app.iname}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={"renderIntervalMin": -5},
)
assert response_negative.status_code == 400
assert "Render interval" in response_negative.text
response_display = auth_client.patch(
f"/v0/devices/{device_id}/installations/{app.iname}",
headers={"Authorization": f"Bearer {device_user.api_key}"},
json={"displayTimeSec": -3},
)
assert response_display.status_code == 400
assert "Display time" in response_display.text

View File

@@ -1,33 +0,0 @@
from fastapi.testclient import TestClient
def test_dots_svg_defaults(client: TestClient) -> None:
response = client.get("/v0/dots")
assert response.status_code == 200
assert response.headers["content-type"] == "image/svg+xml"
body = response.text
assert 'width="64"' in body
assert 'height="32"' in body
assert body.count("<circle") == 64 * 32
assert '<circle cx="0.5" cy="0.5" r="0.3"/>' in body
def test_dots_svg_custom_parameters(client: TestClient) -> None:
response = client.get("/v0/dots?w=2&h=1&r=0.75")
assert response.status_code == 200
body = response.text
assert 'width="2"' in body
assert 'height="1"' in body
assert body.count("<circle") == 2
assert '<circle cx="0.5" cy="0.5" r="0.75"/>' in body
assert '<circle cx="1.5" cy="0.5" r="0.75"/>' in body
def test_dots_svg_rejects_invalid_dimension(client: TestClient) -> None:
response = client.get("/v0/dots?w=0")
assert response.status_code == 400

View File

@@ -1,76 +0,0 @@
from io import BytesIO
import os
import sqlite3
from fastapi.testclient import TestClient
from tronbyt_server import db
from tronbyt_server.config import get_settings
settings = get_settings()
def test_upload_and_delete(auth_client: TestClient) -> None:
files = {"file": ("report.star", BytesIO(b"my file contents"))}
auth_client.get("/create")
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
with db.get_db() as db_conn:
user = db.get_user(db_conn, "testuser")
assert user
device_id = list(user.devices.keys())[0]
response = auth_client.post(
f"/{device_id}/uploadapp", files=files, follow_redirects=False
)
assert response.status_code == 302
assert response.headers["location"] == f"/{device_id}/addapp"
response = auth_client.get(
f"/{device_id}/deleteupload/report.star", follow_redirects=False
)
assert response.status_code == 302
assert response.headers["location"] == f"/{device_id}/addapp"
def test_upload_bad_extension(
auth_client: TestClient, db_connection: sqlite3.Connection
) -> None:
files = {"file": ("report.exe", BytesIO(b"my file contents"))}
auth_client.get("/create")
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
user = db.get_user(db_connection, "testuser")
assert user
device_id = list(user.devices.keys())[0]
response = auth_client.post(
f"/{device_id}/uploadapp", files=files, follow_redirects=False
)
assert response.status_code == 400
assert "File type not allowed" in response.text
user_apps_path = os.path.join(settings.USERS_DIR, user.username, "apps")
assert not os.path.exists(os.path.join(user_apps_path, "report.exe"))

View File

@@ -1,68 +0,0 @@
from fastapi.testclient import TestClient
from tronbyt_server import db
def test_register_login_logout(auth_client: TestClient) -> None:
with db.get_db() as db_conn:
db.delete_user(db_conn, "testuser")
response = auth_client.get("/auth/register")
assert response.status_code == 200
response = auth_client.post(
"/auth/register",
data={"username": "testuser", "password": "password"},
)
assert response.status_code == 200
response = auth_client.get("/auth/login")
assert response.status_code == 200
response = auth_client.post(
"/auth/login",
data={"username": "testuser", "password": "password"},
follow_redirects=False,
)
assert response.status_code == 302
assert response.headers["location"] == "http://testserver/"
response = auth_client.get("/auth/logout", follow_redirects=False)
assert response.status_code == 302
assert response.headers["location"] == "http://testserver/auth/login"
def test_login_with_wrong_password(client: TestClient) -> None:
# Create owner
response = client.post(
"/auth/register_owner",
data={"password": "password"},
follow_redirects=False,
)
assert response.status_code == 302
# Register testuser
response = client.post(
"/auth/register",
data={"username": "testuser", "password": "password"},
follow_redirects=False,
)
assert response.status_code in [302, 409]
# Login as testuser with bad password
response = client.post(
"/auth/login",
data={"username": "testuser", "password": "BADDPASSWORD"},
follow_redirects=False,
)
assert response.status_code == 401
assert "Incorrect username/password." in response.text
def test_unauth_index_with_users(client: TestClient) -> None:
client.post("/auth/register_owner", data={"password": "adminpassword"})
response = client.get("/", follow_redirects=False)
assert response.status_code in [302, 307]
assert response.headers["location"].endswith("/auth/login")
def test_unauth_index_no_users(client: TestClient) -> None:
response = client.get("/", follow_redirects=False)
assert response.status_code in [302, 307]
assert response.headers["location"].endswith("/auth/register_owner")

View File

@@ -1,521 +0,0 @@
import datetime
from zoneinfo import ZoneInfo
from tronbyt_server import db
from tronbyt_server.models.app import App, RecurrencePattern, RecurrenceType, Weekday
def test_get_is_app_schedule_active() -> None:
assert db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(18, 0),
end_time=datetime.time(22, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=19,
minute=0,
second=0,
microsecond=0,
tzinfo=ZoneInfo("America/New_York"),
),
)
assert db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(18, 0),
end_time=datetime.time(22, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=19,
minute=0,
second=0,
),
)
assert not db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(18, 0),
end_time=datetime.time(22, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=12,
minute=0,
),
)
assert db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(22, 0),
end_time=datetime.time(6, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=23,
minute=10,
),
)
assert not db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(22, 0),
end_time=datetime.time(6, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=8,
minute=10,
),
)
assert db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(22, 0),
end_time=datetime.time(6, 0),
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=23,
minute=59,
second=10,
microsecond=1000,
),
)
assert db.get_is_app_schedule_active_at_time(
App(name="testing", iname="testing", id="testing", path="/testing"),
datetime.datetime(year=2025, month=1, day=1, hour=10, minute=0, second=10),
)
assert db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(18, 0),
end_time=datetime.time(22, 0),
days=[Weekday.WEDNESDAY],
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=19,
minute=0,
second=0,
),
)
assert not db.get_is_app_schedule_active_at_time(
App(
name="testing",
iname="testing",
id="testing",
path="/testing",
start_time=datetime.time(18, 0),
end_time=datetime.time(22, 0),
days=[Weekday.MONDAY, Weekday.TUESDAY],
),
datetime.datetime(
year=2025,
month=1,
day=1,
hour=19,
minute=0,
second=0,
),
)
def test_daily_recurrence() -> None:
"""Test daily recurrence patterns."""
# Every day
app = App(
name="daily_test",
iname="daily_test",
id="daily_test",
path="/daily_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.DAILY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 1),
)
# Should be active every day
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 1, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 2, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 15, 12, 0)
)
# Every 3 days
app.recurrence_interval = 3
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 1, 12, 0)
) # Start date
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 2, 12, 0)
) # +1 day
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 3, 12, 0)
) # +2 days
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 4, 12, 0)
) # +3 days
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 7, 12, 0)
) # +6 days
def test_weekly_recurrence() -> None:
"""Test weekly recurrence patterns."""
# Every week on Monday and Wednesday
app = App(
name="weekly_test",
iname="weekly_test",
id="weekly_test",
path="/weekly_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.WEEKLY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 6), # Monday, Jan 6, 2025
recurrence_pattern=RecurrencePattern(
weekdays=[Weekday.MONDAY, Weekday.WEDNESDAY]
),
)
# Test first week
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 12, 0)
) # Monday
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 7, 12, 0)
) # Tuesday
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 8, 12, 0)
) # Wednesday
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 9, 12, 0)
) # Thursday
# Test next week
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 13, 12, 0)
) # Monday
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 15, 12, 0)
) # Wednesday
# Every 2 weeks (bi-weekly) on Monday
app.recurrence_interval = 2
app.recurrence_pattern = RecurrencePattern(weekdays=[Weekday.MONDAY])
# Week 1: Should be active
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 12, 0)
) # Monday week 1
# Week 2: Should NOT be active (skip week)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 13, 12, 0)
) # Monday week 2
# Week 3: Should be active again
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 20, 12, 0)
) # Monday week 3
def test_monthly_recurrence_day_of_month() -> None:
"""Test monthly recurrence on specific day of month."""
# 1st of every month
app = App(
name="monthly_test",
iname="monthly_test",
id="monthly_test",
path="/monthly_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.MONTHLY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 1),
recurrence_pattern=RecurrencePattern(day_of_month=1),
)
# Should be active on 1st of each month
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 1, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 2, 1, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 3, 1, 12, 0)
)
# Should NOT be active on other days
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 2, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 15, 12, 0)
)
# 15th of every 2 months
app.recurrence_interval = 2
app.recurrence_pattern = RecurrencePattern(day_of_month=15)
app.recurrence_start_date = datetime.date(2025, 1, 15)
# Should be active on 15th of Jan, Mar, May, etc.
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 15, 12, 0)
) # Month 0
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 2, 15, 12, 0)
) # Month 1 (skip)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 3, 15, 12, 0)
) # Month 2
def test_monthly_recurrence_day_of_week() -> None:
"""Test monthly recurrence on specific day of week pattern."""
# First Monday of every month
app = App(
name="monthly_dow_test",
iname="monthly_dow_test",
id="monthly_dow_test",
path="/monthly_dow_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.MONTHLY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 1),
recurrence_pattern=RecurrencePattern(day_of_week="first_monday"),
)
# January 2025: First Monday is Jan 6
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 13, 12, 0)
) # Second Monday
# February 2025: First Monday is Feb 3
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 2, 3, 12, 0)
)
# Last Friday of every month
app.recurrence_pattern = RecurrencePattern(day_of_week="last_friday")
# January 2025: Last Friday is Jan 31
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 31, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 24, 12, 0)
) # Not the last Friday
def test_yearly_recurrence() -> None:
"""Test yearly recurrence patterns."""
# Every year on the same date
app = App(
name="yearly_test",
iname="yearly_test",
id="yearly_test",
path="/yearly_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.YEARLY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 9, 22), # Today's date
)
# Should be active on Sep 22 of each year
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 9, 22, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2026, 9, 22, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2027, 9, 22, 12, 0)
)
# Should NOT be active on other dates
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 9, 21, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 9, 23, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 10, 22, 12, 0)
)
# Every 2 years
app.recurrence_interval = 2
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 9, 22, 12, 0)
) # Year 0
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2026, 9, 22, 12, 0)
) # Year 1 (skip)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2027, 9, 22, 12, 0)
) # Year 2
def test_recurrence_with_end_date() -> None:
"""Test recurrence patterns with end dates."""
app = App(
name="end_date_test",
iname="end_date_test",
id="end_date_test",
path="/end_date_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.DAILY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 1),
recurrence_end_date=datetime.date(2025, 1, 5),
)
# Should be active within the date range
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 1, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 3, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 5, 12, 0)
)
# Should NOT be active after end date
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 12, 0)
)
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 10, 12, 0)
)
def test_recurrence_with_time_range() -> None:
"""Test recurrence patterns combined with time ranges."""
app = App(
name="time_range_test",
iname="time_range_test",
id="time_range_test",
path="/time_range_test",
use_custom_recurrence=True,
start_time=datetime.time(9, 0),
end_time=datetime.time(17, 0),
recurrence_type=RecurrenceType.WEEKLY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 6), # Monday
recurrence_pattern=RecurrencePattern(
weekdays=[Weekday.MONDAY, Weekday.WEDNESDAY, Weekday.FRIDAY]
),
)
# Should be active on weekdays within time range
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 10, 0)
) # Monday 10:00
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 8, 14, 0)
) # Wednesday 14:00
# Should NOT be active on weekdays outside time range
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 8, 0)
) # Monday 8:00
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 6, 18, 0)
) # Monday 18:00
# Should NOT be active on non-matching days
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 7, 10, 0)
) # Tuesday 10:00
def test_legacy_days_compatibility() -> None:
"""Test that legacy days field still works for backward compatibility."""
app = App(
name="legacy_test",
iname="legacy_test",
id="legacy_test",
path="/legacy_test",
start_time=datetime.time(10, 0),
end_time=datetime.time(16, 0),
days=[Weekday.MONDAY, Weekday.WEDNESDAY, Weekday.FRIDAY],
# No recurrence_type field - should fall back to legacy behavior
)
# Should work with legacy days field (Jan 1, 2025 is a Wednesday)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 1, 12, 0)
) # Wednesday
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 2, 12, 0)
) # Thursday
def test_edge_cases() -> None:
"""Test edge cases and error conditions."""
# Test before start date
app = App(
name="before_start_test",
iname="before_start_test",
id="before_start_test",
path="/before_start_test",
use_custom_recurrence=True,
recurrence_type=RecurrenceType.DAILY,
recurrence_interval=1,
recurrence_start_date=datetime.date(2025, 1, 10),
)
# Should not be active before start date
assert not db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 5, 12, 0)
)
assert db.get_is_app_schedule_active_at_time(
app, datetime.datetime(2025, 1, 10, 12, 0)
)

View File

@@ -1,47 +0,0 @@
from fastapi.testclient import TestClient
from tronbyt_server.config import get_settings
settings = get_settings()
def test_registration_disabled(auth_client: TestClient) -> None:
settings.ENABLE_USER_REGISTRATION = "0"
response = auth_client.get("/auth/register")
assert response.status_code == 200
def test_registration_enabled(auth_client: TestClient) -> None:
settings.ENABLE_USER_REGISTRATION = "1"
response = auth_client.get("/auth/register")
assert response.status_code == 200
assert "Register" in response.text
response = auth_client.post(
"/auth/register",
data={"username": "newuser", "password": "password123"},
)
assert response.status_code == 200
def test_max_users_limit_with_open_registration(auth_client: TestClient) -> None:
settings.MAX_USERS = 2
response = auth_client.post(
"/auth/register",
data={"username": "user1", "password": "password123"},
follow_redirects=True,
)
assert response.status_code == 200
response = auth_client.post(
"/auth/register",
data={"username": "user2", "password": "password123"},
follow_redirects=True,
)
assert response.status_code == 200
response = auth_client.post(
"/auth/register",
data={"username": "user3", "password": "password123"},
follow_redirects=True,
)
assert response.status_code == 200
assert "Maximum number of users reached" in response.text

View File

@@ -1,78 +0,0 @@
from unittest.mock import patch, MagicMock
import requests
from tronbyt_server import version
from tronbyt_server.version import VersionInfo
def test_check_for_updates_dev_version() -> None:
"""Test checking for updates with a dev version."""
v_info = VersionInfo(version="dev")
assert version.check_for_updates(v_info) == (False, None)
@patch("tronbyt_server.version.requests.get")
def test_check_for_updates_newer_available(mock_get: MagicMock) -> None:
"""Test checking for updates when a newer version is available."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"tag_name": "v1.0.1",
"html_url": "http://example.com/v1.0.1",
}
mock_get.return_value = mock_response
v_info = VersionInfo(version="1.0.0", tag="v1.0.0")
update_available, url = version.check_for_updates(v_info)
assert update_available is True
assert url == "http://example.com/v1.0.1"
@patch("tronbyt_server.version.requests.get")
def test_check_for_updates_no_update(mock_get: MagicMock) -> None:
"""Test checking for updates when no newer version is available."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"tag_name": "v1.0.0",
"html_url": "http://example.com/v1.0.0",
}
mock_get.return_value = mock_response
v_info = VersionInfo(version="1.0.0", tag="v1.0.0")
update_available, url = version.check_for_updates(v_info)
assert update_available is False
assert url is None
@patch("tronbyt_server.version.requests.get")
def test_check_for_updates_older_remote(mock_get: MagicMock) -> None:
"""Test checking for updates when remote version is older (should not happen usually but testing logic)."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"tag_name": "v0.9.9",
"html_url": "http://example.com/v0.9.9",
}
mock_get.return_value = mock_response
v_info = VersionInfo(version="1.0.0", tag="v1.0.0")
update_available, url = version.check_for_updates(v_info)
assert update_available is False
assert url is None
@patch("tronbyt_server.version.requests.get")
def test_check_for_updates_error(mock_get: MagicMock) -> None:
"""Test checking for updates when an error occurs."""
mock_get.side_effect = requests.exceptions.RequestException("Network error")
v_info = VersionInfo(version="1.0.0", tag="v1.0.0")
update_available, url = version.check_for_updates(v_info)
assert update_available is False
assert url is None
def test_check_for_updates_no_tag() -> None:
"""Test checking for updates when no tag is present."""
v_info = VersionInfo(version="main-1234567", tag=None)
assert version.check_for_updates(v_info) == (False, None)

View File

@@ -1,80 +0,0 @@
import shutil
from io import BytesIO
from pathlib import Path
from fastapi.testclient import TestClient
from tronbyt_server import db
from tronbyt_server.utils import possibly_render
def test_webp_upload_and_app_creation(auth_client: TestClient) -> None:
# 1. Create a device
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE_WEBP",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=False,
)
assert response.status_code == 302
with db.get_db() as db_conn:
user = db.get_user(db_conn, "testuser")
assert user
device_id = list(user.devices.keys())[0]
device = user.devices[device_id]
# 2. Upload a .webp file
webp_content = (
b"RIFF\x0c\x00\x00\x00WEBPVP8 \x02\x00\x00\x00\x9d\x01*" # Minimal valid WebP
)
files = {"file": ("test.webp", BytesIO(webp_content), "image/webp")}
response = auth_client.post(
f"/{device_id}/uploadapp", files=files, follow_redirects=False
)
assert response.status_code == 302
assert response.headers["location"] == f"/{device_id}/addapp"
# Check that the preview has been created
preview_path = db.get_data_dir() / "apps" / "test.webp"
assert preview_path.exists()
assert preview_path.read_bytes() == webp_content
# 3. Add the uploaded .webp app to the device
response = auth_client.post(
f"/{device_id}/addapp",
data={"name": "test"},
follow_redirects=False,
)
assert response.status_code == 302
assert response.headers["location"] == "http://testserver/"
# 4. Check that the app is added and file is copied
with db.get_db() as db_conn:
user = db.get_user(db_conn, "testuser")
assert user
device = user.devices[device_id]
app = next((app for app in device.apps.values() if app.name == "test"), None)
assert app is not None
assert app.enabled is True
assert Path(str(app.path)).suffix == ".webp"
device_webp_path = (
db.get_device_webp_dir(device_id) / f"{app.name}-{app.iname}.webp"
)
assert device_webp_path.exists()
assert device_webp_path.read_bytes() == webp_content
# 5. Check that possibly_render works correctly
assert possibly_render(db_conn, user, device_id, app) is True
# Cleanup
preview_path.unlink()
user_apps_path = db.get_users_dir() / user.username / "apps" / "test"
if user_apps_path.exists():
shutil.rmtree(user_apps_path)

View File

@@ -1,157 +0,0 @@
import sqlite3
import pytest
from fastapi.testclient import TestClient
from starlette.websockets import WebSocketDisconnect
from tronbyt_server.models.user import User
from tests import utils
from datetime import datetime
from typing import Any
@pytest.fixture
def device_user_ws(auth_client: TestClient, db_connection: sqlite3.Connection) -> User:
"""Fixture to create a user with a device for websocket tests."""
response = auth_client.post(
"/create",
data={
"name": "TESTDEVICE_WS",
"img_url": "TESTID",
"api_key": "TESTKEY",
"notes": "TESTNOTES",
"brightness": "3",
},
follow_redirects=True,
)
assert response.status_code == 200
return utils.get_testuser(db_connection)
def test_websocket_invalid_device_id_format(auth_client: TestClient) -> None:
"""Test websocket connection with an invalid device ID format."""
with pytest.raises(WebSocketDisconnect) as excinfo:
with auth_client.websocket_connect("/invalid-id/ws"):
pass
assert excinfo.value.code == 1008
def test_websocket_nonexistent_device_id(auth_client: TestClient) -> None:
"""Test websocket connection with a non-existent but valid device ID."""
with pytest.raises(WebSocketDisconnect) as excinfo:
with auth_client.websocket_connect("/12345678/ws"):
pass
assert excinfo.value.code == 1008
def test_websocket_success_connection_and_data(
auth_client: TestClient, device_user_ws: User
) -> None:
"""Test successful websocket connection and receiving data."""
device_id = list(device_user_ws.devices.keys())[0]
with auth_client.websocket_connect(f"/{device_id}/ws") as websocket:
# It should send dwell time and brightness first
data = websocket.receive_json()
assert "dwell_secs" in data
assert isinstance(data["dwell_secs"], int)
data = websocket.receive_json()
assert "brightness" in data
assert isinstance(data["brightness"], int)
# Then it should send the default image or an error message
message = websocket.receive()
if "bytes" in message:
image_data = message["bytes"]
assert image_data is not None
assert len(image_data) > 0
elif "text" in message:
json_data = message["text"]
assert "status" in json_data
assert "message" in json_data
def test_websocket_client_messages(
auth_client: TestClient, device_user_ws: User, db_connection: sqlite3.Connection
) -> None:
"""Test that the server correctly handles client messages."""
device_id = list(device_user_ws.devices.keys())[0]
with auth_client.websocket_connect(f"/{device_id}/ws") as websocket:
# It should send dwell time and brightness first
_ = websocket.receive_json()
_ = websocket.receive_json()
_ = websocket.receive()
# The client can send "queued"
websocket.send_json({"queued": 1})
# After sending "queued", the protocol_version should be updated if it was None
def get_protocol_version() -> int | None:
device = utils.get_device_by_id(db_connection, device_id)
return device.info.protocol_version if device else None
utils.poll_for_change(get_protocol_version, 1)
device = utils.get_device_by_id(db_connection, device_id)
assert device is not None
assert device.info.protocol_version == 1
# The client can send "displaying"
websocket.send_json({"displaying": 1})
# The client can send client_info
client_info: dict[str, Any] = {
"client_info": {
"firmware_version": "1.25.0",
"firmware_type": "ESP32",
"protocol_version": 1,
"mac": "xx:xx:xx:xx:xx:xx",
}
}
websocket.send_json(client_info)
def check_full_client_info_update() -> bool:
device = utils.get_device_by_id(db_connection, device_id)
if not device:
return False
return (
device.last_seen is not None
and device.info.firmware_version == "1.25.0"
and device.info.firmware_type == "ESP32"
and device.info.protocol_version == 1
and device.info.mac_address == "xx:xx:xx:xx:xx:xx"
)
utils.poll_for_change(check_full_client_info_update, True)
device = utils.get_device_by_id(db_connection, device_id)
assert device is not None
assert isinstance(device.last_seen, datetime)
assert device.info.firmware_version == "1.25.0"
assert device.info.firmware_type == "ESP32"
assert device.info.protocol_version == 1
assert device.info.mac_address == "xx:xx:xx:xx:xx:xx"
# The client can send partial client_info
partial_client_info = {"client_info": {"firmware_version": "1.26.0"}}
websocket.send_json(partial_client_info)
def check_partial_client_info_update() -> bool:
device = utils.get_device_by_id(db_connection, device_id)
if not device:
return False
return (
device.info.firmware_version == "1.26.0"
and device.info.firmware_type == "ESP32"
and device.info.protocol_version == 1
and device.info.mac_address == "xx:xx:xx:xx:xx:xx"
)
utils.poll_for_change(check_partial_client_info_update, True)
device = utils.get_device_by_id(db_connection, device_id)
assert device is not None
assert device.info.firmware_version == "1.26.0"
assert device.info.firmware_type == "ESP32"
assert device.info.protocol_version == 1
assert device.info.mac_address == "xx:xx:xx:xx:xx:xx"

View File

@@ -1,57 +0,0 @@
import sqlite3
import time
from typing import Any, Callable, TypeVar
from tronbyt_server import db
from tronbyt_server.models.device import Device
from tronbyt_server.models.user import User
T = TypeVar("T")
def get_testuser(conn: sqlite3.Connection) -> User:
user = db.get_user(conn, "testuser")
if not user:
raise Exception("testuser not found")
return user
def get_user_by_username(conn: sqlite3.Connection, username: str) -> User | None:
return db.get_user(conn, username)
def get_device_by_id(conn: sqlite3.Connection, device_id: str) -> Device | None:
return db.get_device_by_id(conn, device_id)
def poll_for_change(
func: Callable[[], T],
expected_value: Any,
timeout: float = 5.0,
interval: float = 0.1,
) -> T:
"""
Poll a function until its return value matches the expected value or a timeout is reached.
Args:
func: The function to poll.
expected_value: The expected return value of the function.
timeout: The maximum time to wait in seconds.
interval: The time to wait between polls in seconds.
Returns:
The final return value of the function.
Raises:
TimeoutError: If the timeout is reached before the condition is met.
"""
start_time = time.monotonic()
result = None
while time.monotonic() - start_time < timeout:
result = func()
if result == expected_value:
return result
time.sleep(interval)
raise TimeoutError(
f"Timeout reached while polling for value '{expected_value}'. Last value was '{result}'."
)