"""Unit tests for ``DtParam`` and ``FanCurve`` from the ``main`` module.

Every ``FanCurve`` test runs against a throwaway ``config.txt`` inside a
temporary directory, so no real configuration file is touched.
"""

import io
import os
import tempfile
import unittest
from contextlib import redirect_stdout

from main import DtParam, FanCurve

# Prefix shared by every fan-curve dtparam line the tests look for.
_FAN_TEMP_PREFIX = "dtparam=fan_temp"

# Minimal config used to seed the tests that need a pre-existing file.
_BASE_CONFIG = "\n".join(
    [
        "# test config",
        "[all]",
        "dtoverlay=dwc2,dr_mode=host",
        "",
    ]
) + "\n"


def _write_config(path: str, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories first."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)


class _FanCurveTestCase(unittest.TestCase):
    """Shared fixture: a ``FanCurve`` pointed at a temporary ``config.txt``.

    Subclasses set ``initial_config`` to have the file written before each
    test; leaving it as ``None`` means the file does not exist after setUp.
    """

    # Text written to config.txt during setUp, or None to leave it absent.
    initial_config = None

    def setUp(self) -> None:
        self.tmpdir = tempfile.TemporaryDirectory()
        # Registered right away: unittest skips tearDown when setUp raises,
        # but cleanups added before the failure still run.
        self.addCleanup(self.tmpdir.cleanup)

        # __new__ skips FanCurve.__init__; only config_path is set by hand.
        self.fc = FanCurve.__new__(FanCurve)
        self.fc.config_path = os.path.join(self.tmpdir.name, "config.txt")

        if self.initial_config is not None:
            _write_config(self.fc.config_path, self.initial_config)

    def _read_config(self) -> str:
        """Return the full text of the config file under test."""
        with open(self.fc.config_path, "r", encoding="utf-8") as f:
            return f.read()

    def _read_config_lines(self) -> list[str]:
        """Return the config file's lines, each keeping its trailing newline."""
        with open(self.fc.config_path, "r", encoding="utf-8") as f:
            return f.readlines()

    def _run_update(self, settings=None) -> str:
        """Call ``self.fc.update`` with stdout captured; return what it printed.

        When *settings* is ``None`` the value of
        ``self.fc._get_default_settings()`` is used, and it is fetched inside
        the capture so that any output it produces is captured as well.
        """
        buf = io.StringIO()
        with redirect_stdout(buf):
            if settings is None:
                settings = self.fc._get_default_settings()
            self.fc.update(settings)
        return buf.getvalue()


class TestDtParamAsserts(unittest.TestCase):
    """``DtParam`` must raise ``AssertionError`` for out-of-range arguments."""

    def test_temp_must_be_in_sensible_range(self) -> None:
        with self.assertRaises(AssertionError):
            DtParam(temp=0, hyst=2000, speed=90, n=0)
        with self.assertRaises(AssertionError):
            DtParam(temp=100_000, hyst=2000, speed=90, n=0)

    def test_hyst_must_be_in_sensible_range(self) -> None:
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=0, speed=90, n=0)
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=100_000, speed=90, n=0)

    def test_speed_must_be_in_range(self) -> None:
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=2000, speed=-1, n=0)
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=2000, speed=256, n=0)

    def test_threshold_index_must_be_supported(self) -> None:
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=2000, speed=90, n=-1)
        with self.assertRaises(AssertionError):
            DtParam(temp=55_000, hyst=2000, speed=90, n=4)


class TestFanCurveAsserts(_FanCurveTestCase):
    """Checks on ``FanCurve._assert_valid_cfg`` for various ``settings``."""

    initial_config = _BASE_CONFIG

    def test_assert_valid_cfg_allows_1_to_4_thresholds(self) -> None:
        self.fc.settings = [DtParam(temp=55_000, hyst=2000, speed=90, n=0)]
        self.fc._assert_valid_cfg()

        self.fc.settings = [
            DtParam(temp=55_000, hyst=2000, speed=90, n=0),
            DtParam(temp=65_000, hyst=3000, speed=160, n=1),
            DtParam(temp=70_000, hyst=4000, speed=200, n=2),
            DtParam(temp=72_000, hyst=5000, speed=255, n=3),
        ]
        self.fc._assert_valid_cfg()

    def test_assert_valid_cfg_rejects_0_or_more_than_4_thresholds(self) -> None:
        self.fc.settings = []
        with self.assertRaises(AssertionError):
            self.fc._assert_valid_cfg()

        # DtParam itself rejects n=4, so the fifth entry reuses n=0.
        # NOTE(review): the failure could therefore come from the duplicate
        # index rather than from the count; confirm against
        # _assert_valid_cfg.
        self.fc.settings = [
            DtParam(temp=51_000, hyst=2000, speed=1, n=0),
            DtParam(temp=52_000, hyst=2000, speed=2, n=1),
            DtParam(temp=53_000, hyst=2000, speed=3, n=2),
            DtParam(temp=54_000, hyst=2000, speed=4, n=3),
            DtParam(temp=55_000, hyst=2000, speed=5, n=0),
        ]
        with self.assertRaises(AssertionError):
            self.fc._assert_valid_cfg()

    def test_assert_valid_cfg_requires_temp_order_matches_threshold_number_order(self) -> None:
        # Temperatures rise while the threshold numbers fall.
        a = DtParam(temp=55_000, hyst=2000, speed=90, n=3)
        b = DtParam(temp=65_000, hyst=3000, speed=160, n=2)
        c = DtParam(temp=70_000, hyst=4000, speed=200, n=1)
        d = DtParam(temp=72_000, hyst=5000, speed=255, n=0)
        self.fc.settings = [a, b, c, d]
        with self.assertRaises(AssertionError):
            self.fc._assert_valid_cfg()

    def test_assert_valid_cfg_fails_when_temps_not_increasing_by_threshold_number(self) -> None:
        # Threshold 0 is hotter than threshold 1.
        a = DtParam(temp=65_000, hyst=2000, speed=90, n=0)
        b = DtParam(temp=55_000, hyst=3000, speed=160, n=1)
        c = DtParam(temp=70_000, hyst=4000, speed=200, n=2)
        d = DtParam(temp=72_000, hyst=5000, speed=255, n=3)
        self.fc.settings = [a, b, c, d]
        with self.assertRaises(AssertionError):
            self.fc._assert_valid_cfg()


class TestFanCurveWriteBehavior(_FanCurveTestCase):
    """Checks on what ``FanCurve.update`` writes to the config file."""

    initial_config = _BASE_CONFIG

    def test_update_writes_fan_curve_and_is_quiet_when_unchanged(self) -> None:
        # First run should write the fan-curve block; its output is ignored.
        self._run_update()

        content = self._read_config()
        self.assertIn("# Auto-Generated Fan Curve parameters by PiFanUI\n", content)
        self.assertIn("dtparam=fan_temp0=", content)
        self.assertIn("dtparam=fan_temp3=", content)

        # Second run with identical settings must not print the diff header.
        # Only that header is checked; any other output is not asserted on.
        second_output = self._run_update()
        self.assertNotIn("Updated values:", second_output)

    def test_update_rewrites_when_settings_change(self) -> None:
        # Write defaults.
        self._run_update()
        before = self._read_config()

        # Change one setting (speed) and update again.
        changed = self.fc._get_default_settings()
        changed[0] = DtParam(temp=55_000, hyst=2000, speed=91, n=0)
        output = self._run_update(changed)
        after = self._read_config()

        self.assertNotEqual(before, after)
        self.assertIn("Updated values:", output)
        self.assertIn("fan_temp0_speed=91", after)

    def test_update_renumbers_thresholds_by_temperature(self) -> None:
        # Provide thresholds out of temp order and with incorrect N values.
        s0 = DtParam(temp=72_000, hyst=5000, speed=255, n=2)
        s1 = DtParam(temp=55_000, hyst=2000, speed=90, n=3)
        s2 = DtParam(temp=70_000, hyst=4000, speed=200, n=0)
        s3 = DtParam(temp=65_000, hyst=3000, speed=160, n=1)
        settings = [s0, s1, s2, s3]

        self._run_update(settings)

        dtparam_lines = [
            line.rstrip("\n")
            for line in self._read_config_lines()
            if line.startswith(_FAN_TEMP_PREFIX)
        ]

        # Extract (N, temp) from "dtparam=fan_tempN=temp,...".
        pairs: list[tuple[int, int]] = []
        for line in dtparam_lines:
            # The line contains two '=' characters; drop the prefix first.
            tail = line[len(_FAN_TEMP_PREFIX):]  # "N=temp,..."
            n_str, rest = tail.split("=", 1)  # "N", "temp,..."
            pairs.append((int(n_str), int(rest.split(",", 1)[0])))

        self.assertEqual(sorted(n for n, _ in pairs), [0, 1, 2, 3])

        # N order must correspond to increasing temps.
        temps_by_n = [temp for _, temp in sorted(pairs, key=lambda p: p[0])]
        self.assertEqual(temps_by_n, sorted(temps_by_n))

        # Also assert the passed-in objects were renumbered in-place by update().
        self.assertEqual(sorted(s.N for s in settings), [0, 1, 2, 3])
        self.assertEqual(
            [s.N for s in sorted(settings, key=lambda s: s.temp)],
            [0, 1, 2, 3],
        )

    def test_written_dtparam_lines_match_dtparam_str(self) -> None:
        settings = self.fc._get_default_settings()

        # Pre-seed the config with stale/duplicate fan_temp lines to ensure
        # update deletes them.
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "# test config",
                    "[all]",
                    "# Auto-Generated Fan Curve parameters by PiFanUI",
                    "dtparam=fan_temp0=1,fan_temp0_hyst=1,fan_temp0_speed=1",
                    "dtparam=fan_temp1=2,fan_temp1_hyst=2,fan_temp1_speed=2",
                    "dtparam=fan_temp2=3,fan_temp2_hyst=3,fan_temp2_speed=3",
                    "dtparam=fan_temp3=4,fan_temp3_hyst=4,fan_temp3_speed=4",
                    # duplicates that should be removed
                    "dtparam=fan_temp0=999,fan_temp0_hyst=999,fan_temp0_speed=999",
                    "dtparam=fan_temp3=999,fan_temp3_hyst=999,fan_temp3_speed=999",
                    "",
                ]
            )
            + "\n",
        )

        self._run_update(settings)
        all_lines = self._read_config_lines()

        # Ensure deletion worked: the only remaining fan_temp dtparam lines
        # are the 4 expected ones.
        self.assertEqual(
            sum(1 for line in all_lines if _FAN_TEMP_PREFIX in line),
            4,
        )

        written_lines = [
            line.rstrip("\n")
            for line in all_lines
            if line.startswith(_FAN_TEMP_PREFIX)
        ]
        expected_lines = [str(s) for s in settings]
        self.assertEqual(written_lines, expected_lines)

    def test_update_preserves_unrelated_config_lines(self) -> None:
        original = "\n".join(
            [
                "# test config",
                "dtparam=audio=on",
                "",
                "[pi5]",
                "dtoverlay=foo",
                "",
                "[all]",
                "dtoverlay=dwc2,dr_mode=host",
                "dtparam=i2c_arm=on",
                "",
                "[extra]",
                "somekey=somevalue",
                "",
            ]
        ) + "\n"
        _write_config(self.fc.config_path, original)

        self._run_update()
        after_lines = self._read_config_lines()

        # Strip out the auto-generated fan curve block; remaining content
        # should match exactly.
        generated_prefixes = (_FAN_TEMP_PREFIX, "# Auto-Generated Fan Curve")
        after_stripped = [
            line for line in after_lines if not line.startswith(generated_prefixes)
        ]
        self.assertEqual(after_stripped, original.splitlines(keepends=True))


class TestFanCurveGetCurrentSettings(_FanCurveTestCase):
    """Checks on how ``FanCurve._get_current_settings`` parses the config.

    No config file is seeded in setUp; each test writes the one it needs.
    """

    def _read_settings_as_tuples(self) -> list[tuple[int, int, int, int]]:
        """Return the parsed settings as ``(N, temp, hyst, speed)`` tuples."""
        settings = self.fc._get_current_settings()
        return [(s.N, s.temp, s.hyst, s.speed) for s in settings]

    def test_get_current_settings_parses_separate_lines_per_param(self) -> None:
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "[all]",
                    "dtparam=fan_temp0=65000",
                    "dtparam=fan_temp0_hyst=5000",
                    "dtparam=fan_temp0_speed=75",
                    "",
                    "dtparam=fan_temp1=70000",
                    "dtparam=fan_temp1_hyst=5000",
                    "dtparam=fan_temp1_speed=128",
                    "",
                    "dtparam=fan_temp2=75000",
                    "dtparam=fan_temp2_hyst=5000",
                    "dtparam=fan_temp2_speed=192",
                    "",
                ]
            )
            + "\n",
        )

        self.assertEqual(
            self._read_settings_as_tuples(),
            [
                (0, 65000, 5000, 75),
                (1, 70000, 5000, 128),
                (2, 75000, 5000, 192),
            ],
        )

    def test_get_current_settings_parses_comma_separated_dtparam_lines(self) -> None:
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "[all]",
                    "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
                    "dtparam=fan_temp1=65000,fan_temp1_hyst=3000,fan_temp1_speed=160",
                    "dtparam=fan_temp2=70000,fan_temp2_hyst=4000,fan_temp2_speed=200",
                    "dtparam=fan_temp3=72000,fan_temp3_hyst=5000,fan_temp3_speed=255",
                    "",
                ]
            )
            + "\n",
        )

        self.assertEqual(
            self._read_settings_as_tuples(),
            [
                (0, 54000, 2000, 90),
                (1, 65000, 3000, 160),
                (2, 70000, 4000, 200),
                (3, 72000, 5000, 255),
            ],
        )

    def test_get_current_settings_parses_three_thresholds_set(self) -> None:
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "[all]",
                    "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
                    "dtparam=fan_temp1=65000,fan_temp1_hyst=3000,fan_temp1_speed=160",
                    "dtparam=fan_temp2=70000,fan_temp2_hyst=4000,fan_temp2_speed=200",
                    "",
                ]
            )
            + "\n",
        )

        self.assertEqual(
            self._read_settings_as_tuples(),
            [
                (0, 54000, 2000, 90),
                (1, 65000, 3000, 160),
                (2, 70000, 4000, 200),
            ],
        )

    def test_get_current_settings_rejects_partially_specified_threshold(self) -> None:
        # fan_temp0 is missing hyst and speed -> must fail rather than
        # silently mis-parsing.
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "[all]",
                    "dtparam=fan_temp0=65000",
                    "",
                ]
            )
            + "\n",
        )

        with self.assertRaises(AssertionError):
            self.fc._get_current_settings()

    def test_get_current_settings_parses_mixed_styles(self) -> None:
        _write_config(
            self.fc.config_path,
            "\n".join(
                [
                    "[all]",
                    # fan_temp0: all in one line
                    "dtparam=fan_temp0=54000,fan_temp0_hyst=2000,fan_temp0_speed=90",
                    # fan_temp1: split across 3 lines
                    "dtparam=fan_temp1=65000",
                    "dtparam=fan_temp1_hyst=3000",
                    "dtparam=fan_temp1_speed=160",
                    # fan_temp2: split (temp+speed) then hyst
                    "dtparam=fan_temp2=70000,fan_temp2_speed=200",
                    "dtparam=fan_temp2_hyst=4000",
                    # fan_temp3: all in one line
                    "dtparam=fan_temp3=72000,fan_temp3_hyst=5000,fan_temp3_speed=255",
                    "",
                ]
            )
            + "\n",
        )

        self.assertEqual(
            self._read_settings_as_tuples(),
            [
                (0, 54000, 2000, 90),
                (1, 65000, 3000, 160),
                (2, 70000, 4000, 200),
                (3, 72000, 5000, 255),
            ],
        )


if __name__ == "__main__":
    unittest.main(verbosity=2)