497 lines
18 KiB
Python
497 lines
18 KiB
Python
import io
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from contextlib import redirect_stdout
|
|
|
|
from main import DtParam, FanCurve
|
|
|
|
|
|
def _write_config(path: str, content: str) -> None:
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write(content)
|
|
|
|
class _Cwd:
|
|
"""Small helper to run code with a temporary working directory."""
|
|
|
|
def __init__(self, path: str) -> None:
|
|
self._path = path
|
|
self._old = ""
|
|
|
|
def __enter__(self):
|
|
self._old = os.getcwd()
|
|
os.chdir(self._path)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
os.chdir(self._old)
|
|
return False
|
|
|
|
|
|
class TestDtParamAsserts(unittest.TestCase):
    """DtParam construction must raise AssertionError for out-of-range values.

    Every case starts from the same accepted argument set and overrides exactly
    one field with a value just outside what the test expects to be allowed.
    """

    def _assert_rejected(self, **override) -> None:
        """Build a DtParam with one overridden field and expect AssertionError."""
        kwargs = {"temp": 55_000, "hyst": 2000, "speed": 90, "n": 0}
        kwargs.update(override)
        with self.assertRaises(AssertionError):
            DtParam(**kwargs)

    def test_temp_must_be_in_sensible_range(self) -> None:
        for bad_temp in (0, 100_000):
            self._assert_rejected(temp=bad_temp)

    def test_hyst_must_be_in_sensible_range(self) -> None:
        for bad_hyst in (0, 100_000):
            self._assert_rejected(hyst=bad_hyst)

    def test_speed_must_be_in_range(self) -> None:
        for bad_speed in (-1, 256):
            self._assert_rejected(speed=bad_speed)

    def test_threshold_index_must_be_supported(self) -> None:
        for bad_index in (-1, 4):
            self._assert_rejected(n=bad_index)
|
|
|
|
|
|
class TestFanCurveAsserts(unittest.TestCase):
    """Checks for FanCurve._assert_valid_cfg() on hand-assembled settings lists.

    The FanCurve under test is created with ``__new__`` so its ``__init__`` is
    not run; ``config_path`` and ``settings`` are assigned directly.
    """

    def setUp(self) -> None:
        self.tmpdir = tempfile.TemporaryDirectory()
        self.fc = FanCurve.__new__(FanCurve)
        self.fc.config_path = os.path.join(self.tmpdir.name, "config.txt")
        seed_lines = [
            "# test config",
            "[all]",
            "dtoverlay=dwc2,dr_mode=host",
            "",
        ]
        _write_config(self.fc.config_path, "\n".join(seed_lines) + "\n")

    def tearDown(self) -> None:
        self.tmpdir.cleanup()

    @staticmethod
    def _curve(*rows):
        """Turn (temp, hyst, speed, n) tuples into a list of DtParam objects."""
        return [DtParam(temp=t, hyst=h, speed=s, n=n) for t, h, s, n in rows]

    def _assert_rejected(self, settings) -> None:
        """Install *settings* on the fixture and expect validation to fail."""
        self.fc.settings = settings
        with self.assertRaises(AssertionError):
            self.fc._assert_valid_cfg()

    def test_assert_valid_cfg_allows_1_to_4_thresholds(self) -> None:
        # Lower bound: a single threshold.
        self.fc.settings = self._curve((55_000, 2000, 90, 0))
        self.fc._assert_valid_cfg()

        # Upper bound: four thresholds, temps rising with the index.
        self.fc.settings = self._curve(
            (55_000, 2000, 90, 0),
            (65_000, 3000, 160, 1),
            (70_000, 4000, 200, 2),
            (72_000, 5000, 255, 3),
        )
        self.fc._assert_valid_cfg()

    def test_assert_valid_cfg_rejects_0_or_more_than_4_thresholds(self) -> None:
        self._assert_rejected([])

        # Five entries; the fifth reuses index 0 because DtParam is only
        # constructed here with indices 0..3.
        self._assert_rejected(
            self._curve(
                (51_000, 2000, 1, 0),
                (52_000, 2000, 2, 1),
                (53_000, 2000, 3, 2),
                (54_000, 2000, 4, 3),
                (55_000, 2000, 5, 0),
            )
        )

    def test_assert_valid_cfg_requires_temp_order_matches_threshold_number_order(self) -> None:
        # Temps rise through the list while the indices fall.
        self._assert_rejected(
            self._curve(
                (55_000, 2000, 90, 3),
                (65_000, 3000, 160, 2),
                (70_000, 4000, 200, 1),
                (72_000, 5000, 255, 0),
            )
        )

    def test_assert_valid_cfg_fails_when_temps_not_increasing_by_threshold_number(self) -> None:
        # Indices are 0..3 in order, but index 1 is cooler than index 0.
        self._assert_rejected(
            self._curve(
                (65_000, 2000, 90, 0),
                (55_000, 3000, 160, 1),
                (70_000, 4000, 200, 2),
                (72_000, 5000, 255, 3),
            )
        )
|
|
|
|
|
|
class TestFanCurveWriteBehavior(unittest.TestCase):
    """Exercise FanCurve.update() against a throwaway config file.

    The FanCurve under test is created with ``__new__`` so its ``__init__`` is
    not run; only ``config_path`` is assigned before each test.
    """

    def setUp(self) -> None:
        self.tmpdir = tempfile.TemporaryDirectory()
        self.fc = FanCurve.__new__(FanCurve)
        self.fc.config_path = os.path.join(self.tmpdir.name, "config.txt")
        seed_lines = [
            "# test config",
            "[all]",
            "dtoverlay=dwc2,dr_mode=host",
            "",
        ]
        _write_config(self.fc.config_path, "\n".join(seed_lines) + "\n")

    def tearDown(self) -> None:
        self.tmpdir.cleanup()

    def _update_capturing(self, settings=None) -> str:
        """Run update() with stdout captured and return what was printed.

        When *settings* is None the default settings are fetched inside the
        capture, so anything printed while fetching them is captured as well.
        """
        sink = io.StringIO()
        with redirect_stdout(sink):
            if settings is None:
                settings = self.fc._get_default_settings()
            self.fc.update(settings)
        return sink.getvalue()

    def _config_text(self) -> str:
        """Return the whole config file as one string."""
        with open(self.fc.config_path, "r", encoding="utf-8") as f:
            return f.read()

    def _config_lines(self):
        """Return the config file as a list of lines, newlines kept."""
        with open(self.fc.config_path, "r", encoding="utf-8") as f:
            return f.readlines()

    @staticmethod
    def _threshold_and_temp(line: str):
        """Parse "dtparam=fan_tempN=temp,..." into the pair (N, temp)."""
        tail = line[len("dtparam=fan_temp") :]  # "N=temp,..."
        index_text, remainder = tail.split("=", 1)  # "N", "temp,..."
        return int(index_text), int(remainder.split(",", 1)[0])

    def test_update_writes_fan_curve_and_is_quiet_when_unchanged(self) -> None:
        # First run: the fan-curve block must end up in the file.
        self._update_capturing()
        written = self._config_text()
        for fragment in (
            "# Auto-Generated Fan Curve parameters by PiFanUI\n",
            "dtparam=fan_temp0=",
            "dtparam=fan_temp3=",
        ):
            self.assertIn(fragment, written)

        # Second run with the same settings must not announce updated values.
        second_output = self._update_capturing()
        self.assertNotIn("Updated values:", second_output)

    def test_update_rewrites_when_settings_change(self) -> None:
        # Baseline: write the defaults and remember the file content.
        self._update_capturing()
        before = self._config_text()

        # Replace the first threshold with one whose speed is 91.
        changed = self.fc._get_default_settings()
        changed[0] = DtParam(temp=55_000, hyst=2000, speed=91, n=0)
        output = self._update_capturing(changed)
        after = self._config_text()

        self.assertNotEqual(before, after)
        self.assertIn("Updated values:", output)
        self.assertIn("fan_temp0_speed=91", after)

    def test_update_renumbers_thresholds_by_temperature(self) -> None:
        # Thresholds arrive out of temperature order with mismatched indices.
        settings = [
            DtParam(temp=72_000, hyst=5000, speed=255, n=2),
            DtParam(temp=55_000, hyst=2000, speed=90, n=3),
            DtParam(temp=70_000, hyst=4000, speed=200, n=0),
            DtParam(temp=65_000, hyst=3000, speed=160, n=1),
        ]
        self._update_capturing(settings)

        pairs = [
            self._threshold_and_temp(line.rstrip("\n"))
            for line in self._config_lines()
            if line.startswith("dtparam=fan_temp")
        ]

        self.assertEqual(sorted(index for index, _ in pairs), [0, 1, 2, 3])
        # Walking the written lines by index must give non-decreasing temps.
        temps_by_n = [temp for _, temp in sorted(pairs, key=lambda pair: pair[0])]
        self.assertEqual(temps_by_n, sorted(temps_by_n))

        # The objects handed to update() must carry the new numbering too.
        self.assertEqual(sorted(s.N for s in settings), [0, 1, 2, 3])
        by_temperature = sorted(settings, key=lambda s: s.temp)
        self.assertEqual([s.N for s in by_temperature], [0, 1, 2, 3])

    def test_written_dtparam_lines_match_dtparam_str(self) -> None:
        settings = self.fc._get_default_settings()
        # Seed the file with stale and duplicated fan_temp lines that update()
        # is expected to remove.
        stale_lines = [
            "# test config",
            "[all]",
            "# Auto-Generated Fan Curve parameters by PiFanUI",
            "dtparam=fan_temp0=1,fan_temp0_hyst=1,fan_temp0_speed=1",
            "dtparam=fan_temp1=2,fan_temp1_hyst=2,fan_temp1_speed=2",
            "dtparam=fan_temp2=3,fan_temp2_hyst=3,fan_temp2_speed=3",
            "dtparam=fan_temp3=4,fan_temp3_hyst=4,fan_temp3_speed=4",
            # duplicates that should be removed
            "dtparam=fan_temp0=999,fan_temp0_hyst=999,fan_temp0_speed=999",
            "dtparam=fan_temp3=999,fan_temp3_hyst=999,fan_temp3_speed=999",
            "",
        ]
        _write_config(self.fc.config_path, "\n".join(stale_lines) + "\n")
        self._update_capturing(settings)

        all_lines = self._config_lines()

        # Exactly four fan_temp dtparam lines may remain anywhere in the file.
        remaining = sum(1 for line in all_lines if "dtparam=fan_temp" in line)
        self.assertEqual(remaining, 4)

        written_lines = [
            line.rstrip("\n") for line in all_lines if line.startswith("dtparam=fan_temp")
        ]
        expected_lines = [str(s) for s in settings]
        self.assertEqual(written_lines, expected_lines)

    def test_update_preserves_unrelated_config_lines(self) -> None:
        original_lines = [
            "# test config",
            "dtparam=audio=on",
            "",
            "[pi5]",
            "dtoverlay=foo",
            "",
            "[all]",
            "dtoverlay=dwc2,dr_mode=host",
            "dtparam=i2c_arm=on",
            "",
            "[extra]",
            "somekey=somevalue",
            "",
        ]
        original = "\n".join(original_lines) + "\n"
        _write_config(self.fc.config_path, original)

        self._update_capturing()

        # Drop the generated fan-curve block; what is left must equal the input.
        generated_prefixes = ("dtparam=fan_temp", "# Auto-Generated Fan Curve")
        after_stripped = [
            line for line in self._config_lines() if not line.startswith(generated_prefixes)
        ]
        self.assertEqual(after_stripped, original.splitlines(keepends=True))
|
|
|
|
|
|
class TestFanCurveDryRunPatchEquivalence(unittest.TestCase):
    """A dry-run patch, once applied, must equal the result of a direct write."""

    @staticmethod
    def _make_curve(config_path: str) -> "FanCurve":
        """Build a FanCurve on *config_path* with validated default settings.

        ``__new__`` is used so FanCurve.__init__ is not run.
        """
        fc = FanCurve.__new__(FanCurve)
        fc.config_path = config_path
        fc.settings = fc._get_default_settings()
        fc._assert_valid_cfg()
        return fc

    def test_dry_run_patch_applies_to_same_result_as_write(self) -> None:
        """Write one copy directly, patch the other from the dry run, compare.

        Skipped when no ``patch`` executable is on PATH.
        """
        if shutil.which("patch") is None:
            self.skipTest("system 'patch' command not available")

        base_cfg = "\n".join(
            [
                "# test config",
                "[all]",
                "dtoverlay=dwc2,dr_mode=host",
                "",
            ]
        ) + "\n"

        # Context managers remove both directories even if creating the second
        # one, or anything in the body, raises.
        with tempfile.TemporaryDirectory() as cfg_dir, tempfile.TemporaryDirectory() as work_dir:
            cfg_a = os.path.join(cfg_dir, "a.txt")
            cfg_b = os.path.join(cfg_dir, "b.txt")
            _write_config(cfg_a, base_cfg)
            _write_config(cfg_b, base_cfg)

            # Two FanCurve instances, one per copy of the same base config.
            fc_a = self._make_curve(cfg_a)
            fc_b = self._make_curve(cfg_b)

            # Non-dry-run: writes the config directly.
            with redirect_stdout(io.StringIO()):
                fc_a._write_cfg(dry_run=False)

            # Dry-run: the patch file is expected in the current working
            # directory, so run it from the scratch directory.
            with _Cwd(work_dir):
                with redirect_stdout(io.StringIO()):
                    fc_b._write_cfg(dry_run=True)
            patch_path = os.path.join(work_dir, "PIFanTUI.patch")
            self.assertTrue(os.path.exists(patch_path), "dry-run did not generate PIFanTUI.patch")

            # Patch output includes absolute paths, which GNU patch treats as unsafe.
            # Provide the file-to-patch explicitly to avoid filename resolution issues.
            # Options precede the file operand so implementations that do not
            # permute arguments still see "-i".
            result = subprocess.run(
                ["patch", "--batch", "-i", patch_path, cfg_b],
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Report patch's own output on failure instead of a bare exit status.
            self.assertEqual(
                result.returncode,
                0,
                f"patch exited with {result.returncode}\n"
                f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}",
            )

            with open(cfg_a, "r", encoding="utf-8") as f:
                a = f.read()
            with open(cfg_b, "r", encoding="utf-8") as f:
                b = f.read()

            self.assertEqual(a, b)
|
|
|
|
|
|
class TestFanCurveGetCurrentSettings(unittest.TestCase):
    """FanCurve._get_current_settings() against several config layouts.

    The FanCurve under test is created with ``__new__`` so its ``__init__`` is
    not run; each test writes its own config file before reading settings.
    """

    def setUp(self) -> None:
        self.tmpdir = tempfile.TemporaryDirectory()
        self.fc = FanCurve.__new__(FanCurve)
        self.fc.config_path = os.path.join(self.tmpdir.name, "config.txt")

    def tearDown(self) -> None:
        self.tmpdir.cleanup()

    def _seed_config(self, *lines: str) -> None:
        """Write *lines* to the config file, newline-joined plus a final newline."""
        _write_config(self.fc.config_path, "\n".join(lines) + "\n")

    def _read_settings_as_tuples(self) -> list[tuple[int, int, int, int]]:
        """Return the parsed settings as (N, temp, hyst, speed) tuples."""
        return [
            (entry.N, entry.temp, entry.hyst, entry.speed)
            for entry in self.fc._get_current_settings()
        ]

    def test_get_current_settings_parses_separate_lines_per_param(self) -> None:
        self._seed_config(
            "[all]",
            "dtparam=fan_temp0=65000",
            "dtparam=fan_temp0_hyst=5000",
            "dtparam=fan_temp0_speed=75",
            "",
            "dtparam=fan_temp1=70000",
            "dtparam=fan_temp1_hyst=5000",
            "dtparam=fan_temp1_speed=128",
            "",
            "dtparam=fan_temp2=75000",
            "dtparam=fan_temp2_hyst=5000",
            "dtparam=fan_temp2_speed=192",
            "",
        )
        expected = [
            (0, 65000, 5000, 75),
            (1, 70000, 5000, 128),
            (2, 75000, 5000, 192),
        ]
        self.assertEqual(self._read_settings_as_tuples(), expected)

    def test_get_current_settings_parses_comma_separated_dtparam_lines(self) -> None:
        self._seed_config(
            "[all]",
            "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
            "dtparam=fan_temp1=65000,fan_temp1_hyst=3000,fan_temp1_speed=160",
            "dtparam=fan_temp2=70000,fan_temp2_hyst=4000,fan_temp2_speed=200",
            "dtparam=fan_temp3=72000,fan_temp3_hyst=5000,fan_temp3_speed=255",
            "",
        )
        expected = [
            (0, 54000, 2000, 90),
            (1, 65000, 3000, 160),
            (2, 70000, 4000, 200),
            (3, 72000, 5000, 255),
        ]
        self.assertEqual(self._read_settings_as_tuples(), expected)

    def test_get_current_settings_parses_three_thresholds_set(self) -> None:
        self._seed_config(
            "[all]",
            "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
            "dtparam=fan_temp1=65000,fan_temp1_hyst=3000,fan_temp1_speed=160",
            "dtparam=fan_temp2=70000,fan_temp2_hyst=4000,fan_temp2_speed=200",
            "",
        )
        expected = [
            (0, 54000, 2000, 90),
            (1, 65000, 3000, 160),
            (2, 70000, 4000, 200),
        ]
        self.assertEqual(self._read_settings_as_tuples(), expected)

    def test_get_current_settings_rejects_partially_specified_threshold(self) -> None:
        # Only the temperature of threshold 0 is given (no hyst, no speed);
        # parsing is expected to fail instead of returning a partial entry.
        self._seed_config(
            "[all]",
            "dtparam=fan_temp0=65000",
            "",
        )
        with self.assertRaises(AssertionError):
            self.fc._get_current_settings()

    def test_get_current_settings_parses_mixed_styles(self) -> None:
        self._seed_config(
            "[all]",
            # fan_temp0: all in one line
            "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
            # fan_temp1: split across 3 lines
            "dtparam=fan_temp1=65000",
            "dtparam=fan_temp1_hyst=3000",
            "dtparam=fan_temp1_speed=160",
            # fan_temp2: split (temp+speed) then hyst
            "dtparam=fan_temp2=70000,fan_temp2_speed=200",
            "dtparam=fan_temp2_hyst=4000",
            # fan_temp3: all in one line
            "dtparam=fan_temp3=72000,fan_temp3_hyst=5000,fan_temp3_speed=255",
            "",
        )
        expected = [
            (0, 54000, 2000, 90),
            (1, 65000, 3000, 160),
            (2, 70000, 4000, 200),
            (3, 72000, 5000, 255),
        ]
        self.assertEqual(self._read_settings_as_tuples(), expected)
|
|
|
|
|
|
# Allow running this file directly as a script; verbosity=2 makes unittest
# print one line per test.
if __name__ == "__main__":
    unittest.main(verbosity=2)
|