add vibetui
This commit is contained in:
parent
0f74d899fc
commit
721cb6a9bf
3
.gitignore
vendored
3
.gitignore
vendored
@ -8,3 +8,6 @@ wheels/
|
|||||||
|
|
||||||
# Virtual environments
|
# Virtual environments
|
||||||
.venv
|
.venv
|
||||||
|
|
||||||
|
# Patch file
|
||||||
|
PIFanTUI.patch
|
||||||
19
main.py
19
main.py
@ -2,8 +2,7 @@
|
|||||||
import difflib
|
import difflib
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
|
import os
|
||||||
from tui import TUI
|
|
||||||
|
|
||||||
class DtParam:
|
class DtParam:
|
||||||
__slots__ = ('temp', 'hyst', 'speed', 'N')
|
__slots__ = ('temp', 'hyst', 'speed', 'N')
|
||||||
@ -38,7 +37,7 @@ class FanCurve:
|
|||||||
self._assert_valid_cfg()
|
self._assert_valid_cfg()
|
||||||
self._write_cfg()
|
self._write_cfg()
|
||||||
|
|
||||||
def _write_cfg(self):
|
def _write_cfg(self, dry_run: bool = False):
|
||||||
with open(self.config_path, "r") as f:
|
with open(self.config_path, "r") as f:
|
||||||
lines = f.readlines()
|
lines = f.readlines()
|
||||||
old_cfg = [line for line in lines if not (line.startswith("dtparam=fan_temp") or line.startswith("# Auto-Generated Fan Curve"))]
|
old_cfg = [line for line in lines if not (line.startswith("dtparam=fan_temp") or line.startswith("# Auto-Generated Fan Curve"))]
|
||||||
@ -61,11 +60,20 @@ class FanCurve:
|
|||||||
new_conf.writelines(new_cfg)
|
new_conf.writelines(new_cfg)
|
||||||
|
|
||||||
diff = difflib.unified_diff(lines, new_cfg, fromfile=self.config_path, tofile=new_conf.name)
|
diff = difflib.unified_diff(lines, new_cfg, fromfile=self.config_path, tofile=new_conf.name)
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
user_dir = os.getcwd()
|
||||||
|
with open(os.path.join(user_dir, 'PIFanTUI.patch'), 'w') as patch_file:
|
||||||
|
patch_file.writelines(diff)
|
||||||
|
print(f"Patch file generated at {os.path.join(user_dir, 'PIFanTUI.patch')}\nrun '/boot/firmware/config.txt < PIFanTUI.patch' to apply the patch")
|
||||||
|
|
||||||
|
|
||||||
print(f"Updated values:")
|
print(f"Updated values:")
|
||||||
for line in diff:
|
for line in diff:
|
||||||
print(line)
|
print(line)
|
||||||
|
|
||||||
shutil.move(new_conf.name, self.config_path)
|
if not dry_run:
|
||||||
|
shutil.move(new_conf.name, self.config_path)
|
||||||
|
|
||||||
def _assert_valid_cfg(self):
|
def _assert_valid_cfg(self):
|
||||||
assert len(self.settings) <= 4 and len(self.settings) >= 1, f"Only up to 4 thresholds supported, found {len(self.settings)}"
|
assert len(self.settings) <= 4 and len(self.settings) >= 1, f"Only up to 4 thresholds supported, found {len(self.settings)}"
|
||||||
@ -97,9 +105,10 @@ class FanCurve:
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
from tui import TUI
|
||||||
sc = FanCurve()
|
sc = FanCurve()
|
||||||
tui = TUI(sc)
|
tui = TUI(sc)
|
||||||
print("Bye from pifantui!")
|
tui.run()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
83
test.py
83
test.py
@ -1,5 +1,7 @@
|
|||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
from contextlib import redirect_stdout
|
from contextlib import redirect_stdout
|
||||||
@ -12,6 +14,22 @@ def _write_config(path: str, content: str) -> None:
|
|||||||
with open(path, "w", encoding="utf-8") as f:
|
with open(path, "w", encoding="utf-8") as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
|
|
||||||
|
class _Cwd:
|
||||||
|
"""Small helper to run code with a temporary working directory."""
|
||||||
|
|
||||||
|
def __init__(self, path: str) -> None:
|
||||||
|
self._path = path
|
||||||
|
self._old = ""
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self._old = os.getcwd()
|
||||||
|
os.chdir(self._path)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
os.chdir(self._old)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class TestDtParamAsserts(unittest.TestCase):
|
class TestDtParamAsserts(unittest.TestCase):
|
||||||
def test_temp_must_be_in_sensible_range(self) -> None:
|
def test_temp_must_be_in_sensible_range(self) -> None:
|
||||||
@ -268,6 +286,71 @@ class TestFanCurveWriteBehavior(unittest.TestCase):
|
|||||||
self.assertEqual(after_stripped, original.splitlines(keepends=True))
|
self.assertEqual(after_stripped, original.splitlines(keepends=True))
|
||||||
|
|
||||||
|
|
||||||
|
class TestFanCurveDryRunPatchEquivalence(unittest.TestCase):
|
||||||
|
def test_dry_run_patch_applies_to_same_result_as_write(self) -> None:
|
||||||
|
if shutil.which("patch") is None:
|
||||||
|
self.skipTest("system 'patch' command not available")
|
||||||
|
|
||||||
|
base_cfg = "\n".join(
|
||||||
|
[
|
||||||
|
"# test config",
|
||||||
|
"[all]",
|
||||||
|
"dtoverlay=dwc2,dr_mode=host",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
) + "\n"
|
||||||
|
|
||||||
|
tmpcfg = tempfile.TemporaryDirectory()
|
||||||
|
tmpwork = tempfile.TemporaryDirectory()
|
||||||
|
try:
|
||||||
|
cfg_a = os.path.join(tmpcfg.name, "a.txt")
|
||||||
|
cfg_b = os.path.join(tmpcfg.name, "b.txt")
|
||||||
|
_write_config(cfg_a, base_cfg)
|
||||||
|
_write_config(cfg_b, base_cfg)
|
||||||
|
|
||||||
|
# Prepare two FanCurve instances pointing at the two files.
|
||||||
|
fc_a = FanCurve.__new__(FanCurve)
|
||||||
|
fc_a.config_path = cfg_a
|
||||||
|
fc_a.settings = fc_a._get_default_settings()
|
||||||
|
fc_a._assert_valid_cfg()
|
||||||
|
|
||||||
|
fc_b = FanCurve.__new__(FanCurve)
|
||||||
|
fc_b.config_path = cfg_b
|
||||||
|
fc_b.settings = fc_b._get_default_settings()
|
||||||
|
fc_b._assert_valid_cfg()
|
||||||
|
|
||||||
|
# Non-dry-run: directly writes config.
|
||||||
|
with redirect_stdout(io.StringIO()):
|
||||||
|
fc_a._write_cfg(dry_run=False)
|
||||||
|
|
||||||
|
# Dry-run: generates patch in CWD. Apply patch to cfg_b and compare to cfg_a.
|
||||||
|
with _Cwd(tmpwork.name):
|
||||||
|
with redirect_stdout(io.StringIO()):
|
||||||
|
fc_b._write_cfg(dry_run=True)
|
||||||
|
patch_path = os.path.join(tmpwork.name, "PIFanTUI.patch")
|
||||||
|
self.assertTrue(os.path.exists(patch_path), "dry-run did not generate PIFanTUI.patch")
|
||||||
|
|
||||||
|
# Patch output includes absolute paths, which GNU patch treats as unsafe.
|
||||||
|
# Provide the file-to-patch explicitly to avoid filename resolution issues.
|
||||||
|
subprocess.run(
|
||||||
|
["patch", "--batch", cfg_b, "-i", patch_path],
|
||||||
|
check=True,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with open(cfg_a, "r", encoding="utf-8") as f:
|
||||||
|
a = f.read()
|
||||||
|
with open(cfg_b, "r", encoding="utf-8") as f:
|
||||||
|
b = f.read()
|
||||||
|
|
||||||
|
self.assertEqual(a, b)
|
||||||
|
finally:
|
||||||
|
tmpcfg.cleanup()
|
||||||
|
tmpwork.cleanup()
|
||||||
|
|
||||||
|
|
||||||
class TestFanCurveGetCurrentSettings(unittest.TestCase):
|
class TestFanCurveGetCurrentSettings(unittest.TestCase):
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
self.tmpdir = tempfile.TemporaryDirectory()
|
self.tmpdir = tempfile.TemporaryDirectory()
|
||||||
|
|||||||
603
tui.py
Normal file
603
tui.py
Normal file
@ -0,0 +1,603 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import sys
|
||||||
|
import select
|
||||||
|
import os
|
||||||
|
import termios
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.console import Group
|
||||||
|
from rich.layout import Layout
|
||||||
|
from rich.live import Live
|
||||||
|
from rich.panel import Panel
|
||||||
|
from rich.table import Table
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class _LayoutSpec:
|
||||||
|
top_ratio: int = 60
|
||||||
|
bottom_ratio: int = 40
|
||||||
|
bottom_left_ratio: int = 65
|
||||||
|
bottom_right_ratio: int = 35
|
||||||
|
|
||||||
|
|
||||||
|
class TUI:
|
||||||
|
"""Rich-only TUI that avoids setting any background color.
|
||||||
|
|
||||||
|
This keeps the terminal emulator theme (including alpha transparency) intact.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, fan_curve) -> None:
|
||||||
|
self._fan_curve = fan_curve
|
||||||
|
self._console = Console()
|
||||||
|
self._spec = _LayoutSpec()
|
||||||
|
self._stop = threading.Event()
|
||||||
|
self._selected_bar_idx: int | None = None
|
||||||
|
self._mode: str = "nav" # "nav" | "edit"
|
||||||
|
self._notice: str = ""
|
||||||
|
|
||||||
|
def _sorted_settings(self):
|
||||||
|
return sorted(
|
||||||
|
getattr(self._fan_curve, "settings", []) or [],
|
||||||
|
key=lambda s: getattr(s, "temp", 0),
|
||||||
|
)
|
||||||
|
|
||||||
|
def _build_layout(self) -> Layout:
|
||||||
|
root = Layout(name="root")
|
||||||
|
root.split_column(
|
||||||
|
Layout(name="status", size=1),
|
||||||
|
Layout(name="top", ratio=self._spec.top_ratio),
|
||||||
|
Layout(name="bottom", ratio=self._spec.bottom_ratio),
|
||||||
|
)
|
||||||
|
root["bottom"].split_row(
|
||||||
|
Layout(name="bottom_left", ratio=self._spec.bottom_left_ratio),
|
||||||
|
Layout(name="bottom_right", ratio=self._spec.bottom_right_ratio),
|
||||||
|
)
|
||||||
|
return root
|
||||||
|
|
||||||
|
def _settings_table(self, settings, selected_idx: int | None) -> Table:
|
||||||
|
table = Table(title="Fan curve", expand=True)
|
||||||
|
table.add_column("N", justify="right", no_wrap=True)
|
||||||
|
table.add_column("Temp (°C)", justify="right", no_wrap=True)
|
||||||
|
table.add_column("Hyst (°C)", justify="right", no_wrap=True)
|
||||||
|
table.add_column("Speed", justify="right", no_wrap=True)
|
||||||
|
|
||||||
|
for i, s in enumerate(settings or []):
|
||||||
|
# Backend stores milli-Celsius for temp and hyst.
|
||||||
|
table.add_row(
|
||||||
|
str(getattr(s, "N", "?")),
|
||||||
|
f"{getattr(s, 'temp', 0) / 1000:.1f}",
|
||||||
|
f"{getattr(s, 'hyst', 0) / 1000:.1f}",
|
||||||
|
str(getattr(s, "speed", "?")),
|
||||||
|
style=("reverse" if (selected_idx is not None and i == selected_idx) else None),
|
||||||
|
)
|
||||||
|
return table
|
||||||
|
|
||||||
|
def _icefire_rgb(self, t: float) -> tuple[int, int, int]:
|
||||||
|
"""Approximate 'icefire' diverging colormap (0..1 -> RGB).
|
||||||
|
|
||||||
|
We keep a small set of control points and linearly interpolate for a compact,
|
||||||
|
dependency-free implementation.
|
||||||
|
"""
|
||||||
|
tt = 0.0 if t < 0.0 else 1.0 if t > 1.0 else t
|
||||||
|
stops: list[tuple[float, tuple[int, int, int]]] = [
|
||||||
|
(0.00, (0, 7, 26)), # near-black blue
|
||||||
|
(0.12, (0, 53, 124)), # deep blue
|
||||||
|
(0.25, (0, 119, 182)), # cyan-blue
|
||||||
|
(0.38, (82, 181, 214)), # light cyan
|
||||||
|
(0.50, (245, 245, 245)), # near-white midpoint
|
||||||
|
(0.62, (250, 199, 120)), # warm light
|
||||||
|
(0.75, (242, 120, 45)), # orange
|
||||||
|
(0.88, (189, 45, 38)), # red
|
||||||
|
(1.00, (64, 0, 8)), # near-black red
|
||||||
|
]
|
||||||
|
|
||||||
|
for (a, ca), (b, cb) in zip(stops, stops[1:]):
|
||||||
|
if tt <= b:
|
||||||
|
u = 0.0 if b == a else (tt - a) / (b - a)
|
||||||
|
r = round(ca[0] + (cb[0] - ca[0]) * u)
|
||||||
|
g = round(ca[1] + (cb[1] - ca[1]) * u)
|
||||||
|
b2 = round(ca[2] + (cb[2] - ca[2]) * u)
|
||||||
|
return (int(r), int(g), int(b2))
|
||||||
|
return stops[-1][1]
|
||||||
|
|
||||||
|
def _help_text(self) -> Text:
|
||||||
|
t = Text()
|
||||||
|
|
||||||
|
def _line(parts: list[tuple[str, str]]) -> None:
|
||||||
|
for s, style in parts:
|
||||||
|
t.append(s, style=style)
|
||||||
|
t.append("\n")
|
||||||
|
|
||||||
|
key = "bright_yellow"
|
||||||
|
key2 = "bright_cyan"
|
||||||
|
key_save = "bright_green"
|
||||||
|
key_dry = "bright_blue"
|
||||||
|
dim = "bright_black"
|
||||||
|
white = "white"
|
||||||
|
|
||||||
|
_line([("NAV:", f"{white} bold")])
|
||||||
|
_line([(" ", white), ("←/→/↑/↓", key2), (" select bar", white)])
|
||||||
|
_line([(" ", white), ("Enter", key), (" ", white), ("edit mode", white)])
|
||||||
|
_line([(" ", white), ("Ctrl+R", key2), (" reset defaults", white)])
|
||||||
|
_line([(" ", white), ("Ctrl+S", key_save), (" save", white)])
|
||||||
|
_line([(" ", white), ("Ctrl+D", key_dry), (" save (dry-run diff)", white)])
|
||||||
|
_line([("", white)])
|
||||||
|
_line([("EDIT:", f"{white} bold")])
|
||||||
|
_line([(" ", white), ("←/→", key2), (" temp +/-", white)])
|
||||||
|
_line([(" ", white), ("↑/↓", key2), (" speed +/-", white)])
|
||||||
|
_line([(" ", white), ("Shift+←/→", key2), (" hyst +/-", white), (" (Left increases)", dim)])
|
||||||
|
_line([(" ", white), ("Esc", key), (" ", white), ("exit edit mode", white)])
|
||||||
|
_line([("", white)])
|
||||||
|
_line([("Ctrl+C", key), (" to exit", white)])
|
||||||
|
|
||||||
|
# Drop trailing newline.
|
||||||
|
if t.plain.endswith("\n"):
|
||||||
|
t = t[:-1]
|
||||||
|
return t
|
||||||
|
|
||||||
|
def _axes(self, width: int, height: int, settings, selected_idx: int | None) -> Text:
|
||||||
|
"""Draw just X/Y axes for a fan curve plot.
|
||||||
|
|
||||||
|
- X axis: temperature from 10..100 (°C)
|
||||||
|
- Y axis: fan speed from 0..100 (%)
|
||||||
|
"""
|
||||||
|
# Keep at least one blank column on the right to avoid terminal edge clipping.
|
||||||
|
safe_right = 2
|
||||||
|
|
||||||
|
total_w = max(14, width)
|
||||||
|
total_h = max(8, height)
|
||||||
|
|
||||||
|
# Space for Y tick labels (e.g. "100%").
|
||||||
|
y_label_w = max(len("100%"), len(" 90%"), len(" 0%")) + 1 # right-aligned + padding
|
||||||
|
# Two rows below the x-axis: one for tick numbers, one for range endpoints.
|
||||||
|
x_label_h = 2
|
||||||
|
|
||||||
|
# Plot area (including the axes themselves).
|
||||||
|
plot_w = max(6, total_w - y_label_w - safe_right)
|
||||||
|
plot_h = max(4, total_h - x_label_h)
|
||||||
|
|
||||||
|
# Build a fixed-size character grid (no background styling).
|
||||||
|
grid = [[" " for _ in range(total_w)] for _ in range(total_h)]
|
||||||
|
|
||||||
|
# Origin at bottom-left of plot area (after y-label margin).
|
||||||
|
origin_x = y_label_w
|
||||||
|
origin_y = plot_h - 1
|
||||||
|
|
||||||
|
# Y axis.
|
||||||
|
for y in range(0, plot_h):
|
||||||
|
grid[y][origin_x] = "│"
|
||||||
|
|
||||||
|
# X axis.
|
||||||
|
x_axis_end = min(total_w - 1 - safe_right, origin_x + plot_w - 1)
|
||||||
|
for x in range(origin_x, x_axis_end + 1):
|
||||||
|
grid[origin_y][x] = "─"
|
||||||
|
|
||||||
|
# Corner.
|
||||||
|
grid[origin_y][origin_x] = "└"
|
||||||
|
|
||||||
|
# Axis end caps (ASCII-safe; avoids width quirks on some terminals/fonts).
|
||||||
|
grid[0][origin_x] = "^"
|
||||||
|
grid[origin_y][x_axis_end] = ">"
|
||||||
|
|
||||||
|
# Y range labels (to the left of the y-axis, vertically).
|
||||||
|
def _put(y: int, x: int, s: str) -> None:
|
||||||
|
if y < 0 or y >= total_h:
|
||||||
|
return
|
||||||
|
for i, ch in enumerate(s):
|
||||||
|
xx = x + i
|
||||||
|
if 0 <= xx < total_w:
|
||||||
|
grid[y][xx] = ch
|
||||||
|
|
||||||
|
# Ticks every 10% on Y axis and every 10C on X axis + labels next to ticks.
|
||||||
|
plot_span_y = max(1, plot_h - 1)
|
||||||
|
for pct in range(0, 101, 10):
|
||||||
|
y = origin_y - round((pct / 100) * plot_span_y)
|
||||||
|
if 0 <= y < plot_h:
|
||||||
|
if grid[y][origin_x] not in ("└", "^"):
|
||||||
|
grid[y][origin_x] = "├"
|
||||||
|
# Label to the left of the axis, right-aligned.
|
||||||
|
label = f"{pct:>3}%"
|
||||||
|
_put(y, max(0, y_label_w - 1 - len(label)), label)
|
||||||
|
|
||||||
|
plot_span_x = max(1, (x_axis_end - origin_x))
|
||||||
|
for temp_c in range(10, 101, 10):
|
||||||
|
frac = (temp_c - 10) / 90 if 90 else 0.0
|
||||||
|
x = origin_x + round(frac * plot_span_x)
|
||||||
|
x = min(x_axis_end, max(origin_x, x))
|
||||||
|
# Tick mark on axis
|
||||||
|
if grid[origin_y][x] not in ("└", ">"):
|
||||||
|
grid[origin_y][x] = "┬"
|
||||||
|
|
||||||
|
# Numeric label below the axis, centered-ish on tick.
|
||||||
|
label = str(temp_c)
|
||||||
|
label_y = origin_y + 1
|
||||||
|
if label_y < total_h:
|
||||||
|
start_x = max(origin_x, min(x_axis_end - len(label) + 1, x - (len(label) // 2)))
|
||||||
|
_put(label_y, start_x, label)
|
||||||
|
|
||||||
|
# X range endpoint labels on the second row below the axis.
|
||||||
|
x_range_y = origin_y + 2
|
||||||
|
if x_range_y < total_h:
|
||||||
|
_put(x_range_y, origin_x, "10C")
|
||||||
|
_put(x_range_y, max(origin_x, x_axis_end - len("100C") + 1), "100C")
|
||||||
|
|
||||||
|
# Plot a filled rectangle per setting:
|
||||||
|
# x in [temp-hyst, temp], y in [0, speed%]; 255 == 100%.
|
||||||
|
plot_span_y = max(1, plot_h - 1)
|
||||||
|
plot_span_x = max(1, x_axis_end - origin_x)
|
||||||
|
|
||||||
|
def _clamp(v: int, lo: int, hi: int) -> int:
|
||||||
|
return lo if v < lo else hi if v > hi else v
|
||||||
|
|
||||||
|
plotted: list[tuple[int, int, str, bool]] = []
|
||||||
|
for i, s in enumerate(settings or []):
|
||||||
|
try:
|
||||||
|
temp_c = float(getattr(s, "temp")) / 1000.0
|
||||||
|
speed = float(getattr(s, "speed"))
|
||||||
|
hyst_c = float(getattr(s, "hyst")) / 1000.0
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Clamp to axis domain.
|
||||||
|
temp_c = max(10.0, min(100.0, temp_c))
|
||||||
|
speed = max(0.0, min(255.0, speed))
|
||||||
|
hyst_c = max(0.0, hyst_c)
|
||||||
|
pct = (speed / 255.0) * 100.0
|
||||||
|
|
||||||
|
# Map to plot coordinates.
|
||||||
|
frac_x = (temp_c - 10.0) / 90.0 if 90.0 else 0.0
|
||||||
|
x = origin_x + round(frac_x * plot_span_x)
|
||||||
|
x = _clamp(x, origin_x, x_axis_end)
|
||||||
|
|
||||||
|
y = origin_y - round((pct / 100.0) * plot_span_y)
|
||||||
|
y = _clamp(y, 0, origin_y)
|
||||||
|
|
||||||
|
# Rectangle x-span from (temp-hyst) .. temp.
|
||||||
|
rect_start_c = max(10.0, temp_c - hyst_c)
|
||||||
|
rect_end_c = temp_c
|
||||||
|
frac_start = (rect_start_c - 10.0) / 90.0 if 90.0 else 0.0
|
||||||
|
frac_end = (rect_end_c - 10.0) / 90.0 if 90.0 else 0.0
|
||||||
|
x0 = origin_x + round(frac_start * plot_span_x)
|
||||||
|
x1 = origin_x + round(frac_end * plot_span_x)
|
||||||
|
x0 = _clamp(x0, origin_x, x_axis_end)
|
||||||
|
x1 = _clamp(x1, origin_x, x_axis_end)
|
||||||
|
if x0 > x1:
|
||||||
|
x0, x1 = x1, x0
|
||||||
|
|
||||||
|
# Rectangle y-span from axis baseline (0%) up to the point.
|
||||||
|
y0 = y
|
||||||
|
y1 = origin_y - 1
|
||||||
|
if y0 > y1:
|
||||||
|
y0, y1 = y1, y0
|
||||||
|
|
||||||
|
# Color by fan speed (constant per setting): 0..255 -> 0..1.
|
||||||
|
speed_norm = speed / 255.0 if 255.0 else 0.0
|
||||||
|
r, g, b2 = self._icefire_rgb(speed_norm)
|
||||||
|
fill_style = f"rgb({r},{g},{b2})"
|
||||||
|
|
||||||
|
# Fill with a foreground-colored block; do not set background.
|
||||||
|
# Avoid overwriting axes/labels: only fill into blank cells.
|
||||||
|
for yy in range(y0, y1 + 1):
|
||||||
|
if not (0 <= yy < origin_y):
|
||||||
|
continue
|
||||||
|
for xx in range(x0, x1 + 1):
|
||||||
|
if xx <= origin_x:
|
||||||
|
continue
|
||||||
|
if grid[yy][xx] == " ":
|
||||||
|
# Use a shaded block to look "semi-transparent" while still
|
||||||
|
# rendering via foreground color only (no background fill).
|
||||||
|
grid[yy][xx] = ("▒", fill_style)
|
||||||
|
|
||||||
|
plotted.append((x, y, fill_style, (selected_idx is not None and i == selected_idx)))
|
||||||
|
|
||||||
|
# ASCII-safe point marker (draw last).
|
||||||
|
for x, y, style, is_selected in plotted:
|
||||||
|
# Match the rectangle's color (foreground only) so the point "belongs" to it.
|
||||||
|
# If selected, add a background highlight (point only).
|
||||||
|
point_style = style
|
||||||
|
if is_selected:
|
||||||
|
point_style = f"{style} on rgb(255,255,0)"
|
||||||
|
grid[y][x] = ("o", point_style)
|
||||||
|
|
||||||
|
# Convert grid into a styled Text. Cells may be a plain char or (char, style).
|
||||||
|
out = Text()
|
||||||
|
for row in grid:
|
||||||
|
for cell in row:
|
||||||
|
if isinstance(cell, tuple):
|
||||||
|
ch, style = cell
|
||||||
|
out.append(ch, style=style)
|
||||||
|
else:
|
||||||
|
out.append(cell)
|
||||||
|
out.append("\n")
|
||||||
|
# Drop the final newline to avoid Rich adding an extra blank line at the bottom.
|
||||||
|
if out and out.plain.endswith("\n"):
|
||||||
|
out = out[:-1]
|
||||||
|
out.no_wrap = True
|
||||||
|
return out
|
||||||
|
|
||||||
|
def _render(self, layout: Layout) -> None:
|
||||||
|
# Status line (top-left): current mode.
|
||||||
|
mode_label = "EDIT" if self._mode == "edit" else "NAV"
|
||||||
|
layout["status"].update(Text(f"MODE: {mode_label}", style="bold"))
|
||||||
|
|
||||||
|
# Draw axes in the available inner area (panel takes 2 columns/rows for border).
|
||||||
|
top_size = layout["top"].size
|
||||||
|
if top_size is None:
|
||||||
|
dims = self._console.size
|
||||||
|
top_w = dims.width
|
||||||
|
top_h = max(3, int(dims.height * (self._spec.top_ratio / 100)))
|
||||||
|
else:
|
||||||
|
top_w = top_size.width
|
||||||
|
top_h = top_size.height
|
||||||
|
|
||||||
|
# Panel border is 2 cols/rows; plus we add horizontal padding to keep content away
|
||||||
|
# from terminal edges (helps avoid right-edge clipping).
|
||||||
|
panel_pad_x = 1
|
||||||
|
inner_w = max(3, top_w - 2 - (panel_pad_x * 2))
|
||||||
|
inner_h = max(3, top_h - 2)
|
||||||
|
|
||||||
|
settings = self._sorted_settings()
|
||||||
|
if self._selected_bar_idx is None and settings:
|
||||||
|
self._selected_bar_idx = 0
|
||||||
|
|
||||||
|
layout["top"].update(
|
||||||
|
Panel(
|
||||||
|
self._axes(inner_w, inner_h, settings, self._selected_bar_idx),
|
||||||
|
title="Fan curve axes (Temp 10–100°C, Speed 0–100%)",
|
||||||
|
border_style="yellow",
|
||||||
|
padding=(0, panel_pad_x),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
layout["bottom_left"].update(
|
||||||
|
Panel(
|
||||||
|
Group(
|
||||||
|
*(
|
||||||
|
[Text(f"Last action: {self._notice}", style="yellow")]
|
||||||
|
if self._notice
|
||||||
|
else []
|
||||||
|
),
|
||||||
|
self._settings_table(settings, self._selected_bar_idx),
|
||||||
|
),
|
||||||
|
title="Current settings",
|
||||||
|
border_style="green",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
layout["bottom_right"].update(
|
||||||
|
Panel(
|
||||||
|
self._help_text(),
|
||||||
|
title="Help",
|
||||||
|
border_style="magenta",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _save(self, dry_run: bool) -> None:
|
||||||
|
"""Persist current settings to config (or write a patch when dry_run=True)."""
|
||||||
|
try:
|
||||||
|
settings = getattr(self._fan_curve, "settings", []) or []
|
||||||
|
settings.sort(key=lambda s: getattr(s, "temp", 0))
|
||||||
|
for i, s in enumerate(settings):
|
||||||
|
setattr(s, "N", i)
|
||||||
|
# Validate and write using backend implementation.
|
||||||
|
if hasattr(self._fan_curve, "_assert_valid_cfg"):
|
||||||
|
self._fan_curve._assert_valid_cfg()
|
||||||
|
if hasattr(self._fan_curve, "_write_cfg"):
|
||||||
|
self._fan_curve._write_cfg(dry_run=dry_run)
|
||||||
|
except Exception as e:
|
||||||
|
self._notice = f"Save failed: {e}"
|
||||||
|
return
|
||||||
|
|
||||||
|
cfg_path = getattr(self._fan_curve, "config_path", "/boot/firmware/config.txt")
|
||||||
|
if dry_run:
|
||||||
|
patch_file = os.path.join(os.getcwd(), "PIFanTUI.patch")
|
||||||
|
self._notice = (
|
||||||
|
f"Wrote {os.path.basename(patch_file)}. Apply then reboot: "
|
||||||
|
f"sudo patch {cfg_path} -i {os.path.basename(patch_file)} && sudo reboot"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._notice = f"Saved. Reboot for settings to take effect: sudo reboot"
|
||||||
|
|
||||||
|
def _on_arrow(self, direction: str) -> None:
|
||||||
|
"""Arrow key handler.
|
||||||
|
|
||||||
|
For now:
|
||||||
|
- arrows cycle the selected bar (point highlight moves)
|
||||||
|
"""
|
||||||
|
settings = self._sorted_settings()
|
||||||
|
if not settings:
|
||||||
|
self._selected_bar_idx = None
|
||||||
|
return
|
||||||
|
if self._selected_bar_idx is None:
|
||||||
|
self._selected_bar_idx = 0
|
||||||
|
return
|
||||||
|
|
||||||
|
n = len(settings)
|
||||||
|
if direction in ("left", "up"):
|
||||||
|
self._selected_bar_idx = (self._selected_bar_idx - 1) % n
|
||||||
|
elif direction in ("right", "down"):
|
||||||
|
self._selected_bar_idx = (self._selected_bar_idx + 1) % n
|
||||||
|
|
||||||
|
def _apply_edit(self, direction: str, shift: bool) -> None:
|
||||||
|
"""Edit-mode mutations of the selected setting."""
|
||||||
|
settings = self._sorted_settings()
|
||||||
|
if not settings:
|
||||||
|
self._selected_bar_idx = None
|
||||||
|
return
|
||||||
|
if self._selected_bar_idx is None:
|
||||||
|
self._selected_bar_idx = 0
|
||||||
|
idx = max(0, min(len(settings) - 1, self._selected_bar_idx))
|
||||||
|
s = settings[idx]
|
||||||
|
|
||||||
|
# Step sizes (simple + predictable).
|
||||||
|
TEMP_STEP = 1000 # 1C (milli-C)
|
||||||
|
HYST_STEP = 1000 # 1C (milli-C)
|
||||||
|
SPEED_STEP = 5 # PWM units (0..255)
|
||||||
|
|
||||||
|
temp = int(getattr(s, "temp", 10000))
|
||||||
|
hyst = int(getattr(s, "hyst", 1000))
|
||||||
|
speed = int(getattr(s, "speed", 0))
|
||||||
|
|
||||||
|
if shift and direction in ("left", "right"):
|
||||||
|
# In edit mode, Left increases / Right decreases (user preference).
|
||||||
|
hyst += (HYST_STEP if direction == "left" else -HYST_STEP)
|
||||||
|
else:
|
||||||
|
if direction == "left":
|
||||||
|
temp -= TEMP_STEP
|
||||||
|
elif direction == "right":
|
||||||
|
temp += TEMP_STEP
|
||||||
|
elif direction == "up":
|
||||||
|
speed += SPEED_STEP
|
||||||
|
elif direction == "down":
|
||||||
|
speed -= SPEED_STEP
|
||||||
|
|
||||||
|
# Clamp to backend-validated ranges.
|
||||||
|
temp = max(10_000, min(99_000, temp)) # DtParam asserts temp//1000 < 100
|
||||||
|
speed = max(0, min(255, speed))
|
||||||
|
hyst = max(1_000, min(99_000, hyst)) # DtParam asserts hyst//1000 > 0 and < 100
|
||||||
|
# Keep hyst from exceeding temp-10C, so temp-hyst stays in chart domain.
|
||||||
|
hyst = min(hyst, max(1_000, temp - 10_000))
|
||||||
|
|
||||||
|
setattr(s, "temp", temp)
|
||||||
|
setattr(s, "hyst", hyst)
|
||||||
|
setattr(s, "speed", speed)
|
||||||
|
|
||||||
|
def _read_keys(self) -> None:
|
||||||
|
"""Read keys (arrow keys, enter, esc, shift+arrows) without breaking Rich output."""
|
||||||
|
fd = sys.stdin.fileno()
|
||||||
|
old = termios.tcgetattr(fd)
|
||||||
|
try:
|
||||||
|
# IMPORTANT: don't use tty.setraw() here — it disables output post-processing
|
||||||
|
# on the shared TTY (e.g. \n no longer implies CR), which breaks Rich layout
|
||||||
|
# rendering. We only need non-canonical, no-echo input.
|
||||||
|
new = termios.tcgetattr(fd)
|
||||||
|
new[0] &= ~(termios.IXON | termios.IXOFF) # no flow control
|
||||||
|
new[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG) # bytes, no echo, no signals
|
||||||
|
# Non-blocking reads with small timeout so ESC can be handled as "escape" alone.
|
||||||
|
new[6][termios.VMIN] = 0
|
||||||
|
new[6][termios.VTIME] = 1 # 0.1s
|
||||||
|
termios.tcsetattr(fd, termios.TCSADRAIN, new)
|
||||||
|
buf = b""
|
||||||
|
|
||||||
|
def _handle_arrow(final: int, params: bytes) -> None:
|
||||||
|
direction = {ord("A"): "up", ord("B"): "down", ord("C"): "right", ord("D"): "left"}.get(final)
|
||||||
|
if direction is None:
|
||||||
|
return
|
||||||
|
shift = b";2" in params or params.startswith(b"2;") or b";2;" in params
|
||||||
|
if self._mode == "edit":
|
||||||
|
self._apply_edit(direction, shift=shift)
|
||||||
|
else:
|
||||||
|
self._on_arrow(direction)
|
||||||
|
|
||||||
|
while not self._stop.is_set():
|
||||||
|
try:
|
||||||
|
chunk = os.read(fd, 64)
|
||||||
|
except OSError:
|
||||||
|
chunk = b""
|
||||||
|
if not chunk:
|
||||||
|
continue
|
||||||
|
buf += chunk
|
||||||
|
|
||||||
|
# Parse as much as possible from the buffer.
|
||||||
|
while buf:
|
||||||
|
b0 = buf[0]
|
||||||
|
if b0 == 0x03: # Ctrl+C
|
||||||
|
self._stop.set()
|
||||||
|
return
|
||||||
|
if b0 == 0x12: # Ctrl+R
|
||||||
|
# NAV-only: reset the in-memory curve to firmware defaults.
|
||||||
|
if self._mode == "nav":
|
||||||
|
getter = getattr(self._fan_curve, "_get_default_settings", None)
|
||||||
|
if callable(getter):
|
||||||
|
try:
|
||||||
|
self._fan_curve.settings = getter()
|
||||||
|
self._selected_bar_idx = 0
|
||||||
|
self._notice = "Reset to defaults"
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
if b0 == 0x13: # Ctrl+S
|
||||||
|
if self._mode == "nav":
|
||||||
|
self._save(dry_run=False)
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
if b0 == 0x04: # Ctrl+D
|
||||||
|
if self._mode == "nav":
|
||||||
|
self._save(dry_run=True)
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
if b0 in (0x0D, 0x0A): # Enter
|
||||||
|
self._mode = "edit" if self._mode != "edit" else "nav"
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
|
||||||
|
if b0 != 0x1B: # not ESC
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
|
||||||
|
# ESC: could be alone, or CSI sequence.
|
||||||
|
if len(buf) == 1:
|
||||||
|
if not select.select([fd], [], [], 0.03)[0]:
|
||||||
|
if self._mode == "edit":
|
||||||
|
self._mode = "nav"
|
||||||
|
buf = b""
|
||||||
|
break
|
||||||
|
|
||||||
|
if buf[1:2] != b"[":
|
||||||
|
# Unknown escape sequence; drop ESC and keep going.
|
||||||
|
buf = buf[1:]
|
||||||
|
continue
|
||||||
|
|
||||||
|
# CSI: ESC [ ... final
|
||||||
|
# Simple arrows: ESC [ A/B/C/D
|
||||||
|
if len(buf) >= 3 and buf[2] in (ord("A"), ord("B"), ord("C"), ord("D")):
|
||||||
|
_handle_arrow(buf[2], b"")
|
||||||
|
buf = buf[3:]
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Extended CSI: e.g. ESC [ 1 ; 2 C (shift+right)
|
||||||
|
final_idx = None
|
||||||
|
for i in range(2, len(buf)):
|
||||||
|
if buf[i] in (ord("A"), ord("B"), ord("C"), ord("D")):
|
||||||
|
final_idx = i
|
||||||
|
break
|
||||||
|
if i > 16:
|
||||||
|
break
|
||||||
|
if final_idx is None:
|
||||||
|
# Need more bytes.
|
||||||
|
break
|
||||||
|
|
||||||
|
params = buf[2:final_idx]
|
||||||
|
final = buf[final_idx]
|
||||||
|
_handle_arrow(final, params)
|
||||||
|
buf = buf[final_idx + 1 :]
|
||||||
|
finally:
|
||||||
|
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
||||||
|
|
||||||
|
def run(self, refresh_per_second: float = 10.0) -> None:
|
||||||
|
layout = self._build_layout()
|
||||||
|
key_thread = threading.Thread(target=self._read_keys, name="pifantui-keys", daemon=True)
|
||||||
|
key_thread.start()
|
||||||
|
try:
|
||||||
|
# NOTE: `screen=True` uses the alternate screen buffer which some terminals render
|
||||||
|
# with an opaque background. Keep this False to preserve emulator transparency.
|
||||||
|
with Live(layout, console=self._console, screen=False, refresh_per_second=refresh_per_second):
|
||||||
|
while not self._stop.is_set():
|
||||||
|
self._render(layout)
|
||||||
|
time.sleep(0.1)
|
||||||
|
finally:
|
||||||
|
self._stop.set()
|
||||||
|
key_thread.join(timeout=0.2)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Minimal standalone preview.
|
||||||
|
# When run via main.py, we pass the real FanCurve instance.
|
||||||
|
class _Dummy:
|
||||||
|
settings = []
|
||||||
|
|
||||||
|
TUI(_Dummy()).run()
|
||||||
Loading…
x
Reference in New Issue
Block a user