diff --git a/.gitignore b/.gitignore index 505a3b1..a6e959b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ wheels/ # Virtual environments .venv + +# Patch file +PIFanTUI.patch \ No newline at end of file diff --git a/main.py b/main.py index 07af50c..fdf50e3 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,7 @@ import difflib import tempfile import shutil - -from tui import TUI +import os class DtParam: __slots__ = ('temp', 'hyst', 'speed', 'N') @@ -38,7 +37,7 @@ class FanCurve: self._assert_valid_cfg() self._write_cfg() - def _write_cfg(self): + def _write_cfg(self, dry_run: bool = False): with open(self.config_path, "r") as f: lines = f.readlines() old_cfg = [line for line in lines if not (line.startswith("dtparam=fan_temp") or line.startswith("# Auto-Generated Fan Curve"))] @@ -61,11 +60,20 @@ class FanCurve: new_conf.writelines(new_cfg) diff = difflib.unified_diff(lines, new_cfg, fromfile=self.config_path, tofile=new_conf.name) + + if dry_run: + user_dir = os.getcwd() + with open(os.path.join(user_dir, 'PIFanTUI.patch'), 'w') as patch_file: + patch_file.writelines(diff) + print(f"Patch file generated at {os.path.join(user_dir, 'PIFanTUI.patch')}\nrun '/boot/firmware/config.txt < PIFanTUI.patch' to apply the patch") + + print(f"Updated values:") for line in diff: print(line) - shutil.move(new_conf.name, self.config_path) + if not dry_run: + shutil.move(new_conf.name, self.config_path) def _assert_valid_cfg(self): assert len(self.settings) <= 4 and len(self.settings) >= 1, f"Only up to 4 thresholds supported, found {len(self.settings)}" @@ -97,9 +105,10 @@ class FanCurve: def main(): + from tui import TUI sc = FanCurve() tui = TUI(sc) - print("Bye from pifantui!") + tui.run() if __name__ == "__main__": diff --git a/test.py b/test.py index a220a41..528f441 100644 --- a/test.py +++ b/test.py @@ -1,5 +1,7 @@ import io import os +import shutil +import subprocess import tempfile import unittest from contextlib import redirect_stdout @@ -12,6 +14,22 @@ def _write_config(path: str, content: str) -> None: with open(path, "w", encoding="utf-8") as f: f.write(content) +class _Cwd: + """Small helper to run code with a temporary working directory.""" + + def __init__(self, path: str) -> None: + self._path = path + self._old = "" + + def __enter__(self): + self._old = os.getcwd() + os.chdir(self._path) + return self + + def __exit__(self, exc_type, exc, tb): + os.chdir(self._old) + return False + class TestDtParamAsserts(unittest.TestCase): def test_temp_must_be_in_sensible_range(self) -> None: @@ -268,6 +286,71 @@ class TestFanCurveWriteBehavior(unittest.TestCase): self.assertEqual(after_stripped, original.splitlines(keepends=True)) +class TestFanCurveDryRunPatchEquivalence(unittest.TestCase): + def test_dry_run_patch_applies_to_same_result_as_write(self) -> None: + if shutil.which("patch") is None: + self.skipTest("system 'patch' command not available") + + base_cfg = "\n".join( + [ + "# test config", + "[all]", + "dtoverlay=dwc2,dr_mode=host", + "", + ] + ) + "\n" + + tmpcfg = tempfile.TemporaryDirectory() + tmpwork = tempfile.TemporaryDirectory() + try: + cfg_a = os.path.join(tmpcfg.name, "a.txt") + cfg_b = os.path.join(tmpcfg.name, "b.txt") + _write_config(cfg_a, base_cfg) + _write_config(cfg_b, base_cfg) + + # Prepare two FanCurve instances pointing at the two files. + fc_a = FanCurve.__new__(FanCurve) + fc_a.config_path = cfg_a + fc_a.settings = fc_a._get_default_settings() + fc_a._assert_valid_cfg() + + fc_b = FanCurve.__new__(FanCurve) + fc_b.config_path = cfg_b + fc_b.settings = fc_b._get_default_settings() + fc_b._assert_valid_cfg() + + # Non-dry-run: directly writes config. + with redirect_stdout(io.StringIO()): + fc_a._write_cfg(dry_run=False) + + # Dry-run: generates patch in CWD. Apply patch to cfg_b and compare to cfg_a. + with _Cwd(tmpwork.name): + with redirect_stdout(io.StringIO()): + fc_b._write_cfg(dry_run=True) + patch_path = os.path.join(tmpwork.name, "PIFanTUI.patch") + self.assertTrue(os.path.exists(patch_path), "dry-run did not generate PIFanTUI.patch") + + # Patch output includes absolute paths, which GNU patch treats as unsafe. + # Provide the file-to-patch explicitly to avoid filename resolution issues. + subprocess.run( + ["patch", "--batch", cfg_b, "-i", patch_path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + with open(cfg_a, "r", encoding="utf-8") as f: + a = f.read() + with open(cfg_b, "r", encoding="utf-8") as f: + b = f.read() + + self.assertEqual(a, b) + finally: + tmpcfg.cleanup() + tmpwork.cleanup() + + class TestFanCurveGetCurrentSettings(unittest.TestCase): def setUp(self) -> None: self.tmpdir = tempfile.TemporaryDirectory() diff --git a/tui.py b/tui.py new file mode 100644 index 0000000..0d84141 --- /dev/null +++ b/tui.py @@ -0,0 +1,603 @@ +from __future__ import annotations + +import time +import threading +import sys +import select +import os +import termios +from dataclasses import dataclass + +from rich.console import Console +from rich.console import Group +from rich.layout import Layout +from rich.live import Live +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +@dataclass(frozen=True) +class _LayoutSpec: + top_ratio: int = 60 + bottom_ratio: int = 40 + bottom_left_ratio: int = 65 + bottom_right_ratio: int = 35 + + +class TUI: + """Rich-only TUI that avoids setting any background color. + + This keeps the terminal emulator theme (including alpha transparency) intact. + """ + + def __init__(self, fan_curve) -> None: + self._fan_curve = fan_curve + self._console = Console() + self._spec = _LayoutSpec() + self._stop = threading.Event() + self._selected_bar_idx: int | None = None + self._mode: str = "nav" # "nav" | "edit" + self._notice: str = "" + + def _sorted_settings(self): + return sorted( + getattr(self._fan_curve, "settings", []) or [], + key=lambda s: getattr(s, "temp", 0), + ) + + def _build_layout(self) -> Layout: + root = Layout(name="root") + root.split_column( + Layout(name="status", size=1), + Layout(name="top", ratio=self._spec.top_ratio), + Layout(name="bottom", ratio=self._spec.bottom_ratio), + ) + root["bottom"].split_row( + Layout(name="bottom_left", ratio=self._spec.bottom_left_ratio), + Layout(name="bottom_right", ratio=self._spec.bottom_right_ratio), + ) + return root + + def _settings_table(self, settings, selected_idx: int | None) -> Table: + table = Table(title="Fan curve", expand=True) + table.add_column("N", justify="right", no_wrap=True) + table.add_column("Temp (°C)", justify="right", no_wrap=True) + table.add_column("Hyst (°C)", justify="right", no_wrap=True) + table.add_column("Speed", justify="right", no_wrap=True) + + for i, s in enumerate(settings or []): + # Backend stores milli-Celsius for temp and hyst. + table.add_row( + str(getattr(s, "N", "?")), + f"{getattr(s, 'temp', 0) / 1000:.1f}", + f"{getattr(s, 'hyst', 0) / 1000:.1f}", + str(getattr(s, "speed", "?")), + style=("reverse" if (selected_idx is not None and i == selected_idx) else None), + ) + return table + + def _icefire_rgb(self, t: float) -> tuple[int, int, int]: + """Approximate 'icefire' diverging colormap (0..1 -> RGB). + + We keep a small set of control points and linearly interpolate for a compact, + dependency-free implementation. + """ + tt = 0.0 if t < 0.0 else 1.0 if t > 1.0 else t + stops: list[tuple[float, tuple[int, int, int]]] = [ + (0.00, (0, 7, 26)), # near-black blue + (0.12, (0, 53, 124)), # deep blue + (0.25, (0, 119, 182)), # cyan-blue + (0.38, (82, 181, 214)), # light cyan + (0.50, (245, 245, 245)), # near-white midpoint + (0.62, (250, 199, 120)), # warm light + (0.75, (242, 120, 45)), # orange + (0.88, (189, 45, 38)), # red + (1.00, (64, 0, 8)), # near-black red + ] + + for (a, ca), (b, cb) in zip(stops, stops[1:]): + if tt <= b: + u = 0.0 if b == a else (tt - a) / (b - a) + r = round(ca[0] + (cb[0] - ca[0]) * u) + g = round(ca[1] + (cb[1] - ca[1]) * u) + b2 = round(ca[2] + (cb[2] - ca[2]) * u) + return (int(r), int(g), int(b2)) + return stops[-1][1] + + def _help_text(self) -> Text: + t = Text() + + def _line(parts: list[tuple[str, str]]) -> None: + for s, style in parts: + t.append(s, style=style) + t.append("\n") + + key = "bright_yellow" + key2 = "bright_cyan" + key_save = "bright_green" + key_dry = "bright_blue" + dim = "bright_black" + white = "white" + + _line([("NAV:", f"{white} bold")]) + _line([(" ", white), ("←/→/↑/↓", key2), (" select bar", white)]) + _line([(" ", white), ("Enter", key), (" ", white), ("edit mode", white)]) + _line([(" ", white), ("Ctrl+R", key2), (" reset defaults", white)]) + _line([(" ", white), ("Ctrl+S", key_save), (" save", white)]) + _line([(" ", white), ("Ctrl+D", key_dry), (" save (dry-run diff)", white)]) + _line([("", white)]) + _line([("EDIT:", f"{white} bold")]) + _line([(" ", white), ("←/→", key2), (" temp +/-", white)]) + _line([(" ", white), ("↑/↓", key2), (" speed +/-", white)]) + _line([(" ", white), ("Shift+←/→", key2), (" hyst +/-", white), (" (Left increases)", dim)]) + _line([(" ", white), ("Esc", key), (" ", white), ("exit edit mode", white)]) + _line([("", white)]) + _line([("Ctrl+C", key), (" to exit", white)]) + + # Drop trailing newline. + if t.plain.endswith("\n"): + t = t[:-1] + return t + + def _axes(self, width: int, height: int, settings, selected_idx: int | None) -> Text: + """Draw just X/Y axes for a fan curve plot. + + - X axis: temperature from 10..100 (°C) + - Y axis: fan speed from 0..100 (%) + """ + # Keep at least one blank column on the right to avoid terminal edge clipping. + safe_right = 2 + + total_w = max(14, width) + total_h = max(8, height) + + # Space for Y tick labels (e.g. "100%"). + y_label_w = max(len("100%"), len(" 90%"), len(" 0%")) + 1 # right-aligned + padding + # Two rows below the x-axis: one for tick numbers, one for range endpoints. + x_label_h = 2 + + # Plot area (including the axes themselves). + plot_w = max(6, total_w - y_label_w - safe_right) + plot_h = max(4, total_h - x_label_h) + + # Build a fixed-size character grid (no background styling). + grid = [[" " for _ in range(total_w)] for _ in range(total_h)] + + # Origin at bottom-left of plot area (after y-label margin). + origin_x = y_label_w + origin_y = plot_h - 1 + + # Y axis. + for y in range(0, plot_h): + grid[y][origin_x] = "│" + + # X axis. + x_axis_end = min(total_w - 1 - safe_right, origin_x + plot_w - 1) + for x in range(origin_x, x_axis_end + 1): + grid[origin_y][x] = "─" + + # Corner. + grid[origin_y][origin_x] = "└" + + # Axis end caps (ASCII-safe; avoids width quirks on some terminals/fonts). + grid[0][origin_x] = "^" + grid[origin_y][x_axis_end] = ">" + + # Y range labels (to the left of the y-axis, vertically). + def _put(y: int, x: int, s: str) -> None: + if y < 0 or y >= total_h: + return + for i, ch in enumerate(s): + xx = x + i + if 0 <= xx < total_w: + grid[y][xx] = ch + + # Ticks every 10% on Y axis and every 10C on X axis + labels next to ticks. + plot_span_y = max(1, plot_h - 1) + for pct in range(0, 101, 10): + y = origin_y - round((pct / 100) * plot_span_y) + if 0 <= y < plot_h: + if grid[y][origin_x] not in ("└", "^"): + grid[y][origin_x] = "├" + # Label to the left of the axis, right-aligned. + label = f"{pct:>3}%" + _put(y, max(0, y_label_w - 1 - len(label)), label) + + plot_span_x = max(1, (x_axis_end - origin_x)) + for temp_c in range(10, 101, 10): + frac = (temp_c - 10) / 90 if 90 else 0.0 + x = origin_x + round(frac * plot_span_x) + x = min(x_axis_end, max(origin_x, x)) + # Tick mark on axis + if grid[origin_y][x] not in ("└", ">"): + grid[origin_y][x] = "┬" + + # Numeric label below the axis, centered-ish on tick. + label = str(temp_c) + label_y = origin_y + 1 + if label_y < total_h: + start_x = max(origin_x, min(x_axis_end - len(label) + 1, x - (len(label) // 2))) + _put(label_y, start_x, label) + + # X range endpoint labels on the second row below the axis. + x_range_y = origin_y + 2 + if x_range_y < total_h: + _put(x_range_y, origin_x, "10C") + _put(x_range_y, max(origin_x, x_axis_end - len("100C") + 1), "100C") + + # Plot a filled rectangle per setting: + # x in [temp-hyst, temp], y in [0, speed%]; 255 == 100%. + plot_span_y = max(1, plot_h - 1) + plot_span_x = max(1, x_axis_end - origin_x) + + def _clamp(v: int, lo: int, hi: int) -> int: + return lo if v < lo else hi if v > hi else v + + plotted: list[tuple[int, int, str, bool]] = [] + for i, s in enumerate(settings or []): + try: + temp_c = float(getattr(s, "temp")) / 1000.0 + speed = float(getattr(s, "speed")) + hyst_c = float(getattr(s, "hyst")) / 1000.0 + except Exception: + continue + + # Clamp to axis domain. + temp_c = max(10.0, min(100.0, temp_c)) + speed = max(0.0, min(255.0, speed)) + hyst_c = max(0.0, hyst_c) + pct = (speed / 255.0) * 100.0 + + # Map to plot coordinates. + frac_x = (temp_c - 10.0) / 90.0 if 90.0 else 0.0 + x = origin_x + round(frac_x * plot_span_x) + x = _clamp(x, origin_x, x_axis_end) + + y = origin_y - round((pct / 100.0) * plot_span_y) + y = _clamp(y, 0, origin_y) + + # Rectangle x-span from (temp-hyst) .. temp. + rect_start_c = max(10.0, temp_c - hyst_c) + rect_end_c = temp_c + frac_start = (rect_start_c - 10.0) / 90.0 if 90.0 else 0.0 + frac_end = (rect_end_c - 10.0) / 90.0 if 90.0 else 0.0 + x0 = origin_x + round(frac_start * plot_span_x) + x1 = origin_x + round(frac_end * plot_span_x) + x0 = _clamp(x0, origin_x, x_axis_end) + x1 = _clamp(x1, origin_x, x_axis_end) + if x0 > x1: + x0, x1 = x1, x0 + + # Rectangle y-span from axis baseline (0%) up to the point. + y0 = y + y1 = origin_y - 1 + if y0 > y1: + y0, y1 = y1, y0 + + # Color by fan speed (constant per setting): 0..255 -> 0..1. + speed_norm = speed / 255.0 if 255.0 else 0.0 + r, g, b2 = self._icefire_rgb(speed_norm) + fill_style = f"rgb({r},{g},{b2})" + + # Fill with a foreground-colored block; do not set background. + # Avoid overwriting axes/labels: only fill into blank cells. + for yy in range(y0, y1 + 1): + if not (0 <= yy < origin_y): + continue + for xx in range(x0, x1 + 1): + if xx <= origin_x: + continue + if grid[yy][xx] == " ": + # Use a shaded block to look "semi-transparent" while still + # rendering via foreground color only (no background fill). + grid[yy][xx] = ("▒", fill_style) + + plotted.append((x, y, fill_style, (selected_idx is not None and i == selected_idx))) + + # ASCII-safe point marker (draw last). + for x, y, style, is_selected in plotted: + # Match the rectangle's color (foreground only) so the point "belongs" to it. + # If selected, add a background highlight (point only). + point_style = style + if is_selected: + point_style = f"{style} on rgb(255,255,0)" + grid[y][x] = ("o", point_style) + + # Convert grid into a styled Text. Cells may be a plain char or (char, style). + out = Text() + for row in grid: + for cell in row: + if isinstance(cell, tuple): + ch, style = cell + out.append(ch, style=style) + else: + out.append(cell) + out.append("\n") + # Drop the final newline to avoid Rich adding an extra blank line at the bottom. + if out and out.plain.endswith("\n"): + out = out[:-1] + out.no_wrap = True + return out + + def _render(self, layout: Layout) -> None: + # Status line (top-left): current mode. + mode_label = "EDIT" if self._mode == "edit" else "NAV" + layout["status"].update(Text(f"MODE: {mode_label}", style="bold")) + + # Draw axes in the available inner area (panel takes 2 columns/rows for border). + top_size = layout["top"].size + if top_size is None: + dims = self._console.size + top_w = dims.width + top_h = max(3, int(dims.height * (self._spec.top_ratio / 100))) + else: + top_w = top_size.width + top_h = top_size.height + + # Panel border is 2 cols/rows; plus we add horizontal padding to keep content away + # from terminal edges (helps avoid right-edge clipping). + panel_pad_x = 1 + inner_w = max(3, top_w - 2 - (panel_pad_x * 2)) + inner_h = max(3, top_h - 2) + + settings = self._sorted_settings() + if self._selected_bar_idx is None and settings: + self._selected_bar_idx = 0 + + layout["top"].update( + Panel( + self._axes(inner_w, inner_h, settings, self._selected_bar_idx), + title="Fan curve axes (Temp 10–100°C, Speed 0–100%)", + border_style="yellow", + padding=(0, panel_pad_x), + ) + ) + + layout["bottom_left"].update( + Panel( + Group( + *( + [Text(f"Last action: {self._notice}", style="yellow")] + if self._notice + else [] + ), + self._settings_table(settings, self._selected_bar_idx), + ), + title="Current settings", + border_style="green", + ) + ) + + layout["bottom_right"].update( + Panel( + self._help_text(), + title="Help", + border_style="magenta", + ) + ) + + def _save(self, dry_run: bool) -> None: + """Persist current settings to config (or write a patch when dry_run=True).""" + try: + settings = getattr(self._fan_curve, "settings", []) or [] + settings.sort(key=lambda s: getattr(s, "temp", 0)) + for i, s in enumerate(settings): + setattr(s, "N", i) + # Validate and write using backend implementation. + if hasattr(self._fan_curve, "_assert_valid_cfg"): + self._fan_curve._assert_valid_cfg() + if hasattr(self._fan_curve, "_write_cfg"): + self._fan_curve._write_cfg(dry_run=dry_run) + except Exception as e: + self._notice = f"Save failed: {e}" + return + + cfg_path = getattr(self._fan_curve, "config_path", "/boot/firmware/config.txt") + if dry_run: + patch_file = os.path.join(os.getcwd(), "PIFanTUI.patch") + self._notice = ( + f"Wrote {os.path.basename(patch_file)}. Apply then reboot: " + f"sudo patch {cfg_path} -i {os.path.basename(patch_file)} && sudo reboot" + ) + else: + self._notice = f"Saved. Reboot for settings to take effect: sudo reboot" + + def _on_arrow(self, direction: str) -> None: + """Arrow key handler. + + For now: + - arrows cycle the selected bar (point highlight moves) + """ + settings = self._sorted_settings() + if not settings: + self._selected_bar_idx = None + return + if self._selected_bar_idx is None: + self._selected_bar_idx = 0 + return + + n = len(settings) + if direction in ("left", "up"): + self._selected_bar_idx = (self._selected_bar_idx - 1) % n + elif direction in ("right", "down"): + self._selected_bar_idx = (self._selected_bar_idx + 1) % n + + def _apply_edit(self, direction: str, shift: bool) -> None: + """Edit-mode mutations of the selected setting.""" + settings = self._sorted_settings() + if not settings: + self._selected_bar_idx = None + return + if self._selected_bar_idx is None: + self._selected_bar_idx = 0 + idx = max(0, min(len(settings) - 1, self._selected_bar_idx)) + s = settings[idx] + + # Step sizes (simple + predictable). + TEMP_STEP = 1000 # 1C (milli-C) + HYST_STEP = 1000 # 1C (milli-C) + SPEED_STEP = 5 # PWM units (0..255) + + temp = int(getattr(s, "temp", 10000)) + hyst = int(getattr(s, "hyst", 1000)) + speed = int(getattr(s, "speed", 0)) + + if shift and direction in ("left", "right"): + # In edit mode, Left increases / Right decreases (user preference). + hyst += (HYST_STEP if direction == "left" else -HYST_STEP) + else: + if direction == "left": + temp -= TEMP_STEP + elif direction == "right": + temp += TEMP_STEP + elif direction == "up": + speed += SPEED_STEP + elif direction == "down": + speed -= SPEED_STEP + + # Clamp to backend-validated ranges. + temp = max(10_000, min(99_000, temp)) # DtParam asserts temp//1000 < 100 + speed = max(0, min(255, speed)) + hyst = max(1_000, min(99_000, hyst)) # DtParam asserts hyst//1000 > 0 and < 100 + # Keep hyst from exceeding temp-10C, so temp-hyst stays in chart domain. + hyst = min(hyst, max(1_000, temp - 10_000)) + + setattr(s, "temp", temp) + setattr(s, "hyst", hyst) + setattr(s, "speed", speed) + + def _read_keys(self) -> None: + """Read keys (arrow keys, enter, esc, shift+arrows) without breaking Rich output.""" + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + # IMPORTANT: don't use tty.setraw() here — it disables output post-processing + # on the shared TTY (e.g. \n no longer implies CR), which breaks Rich layout + # rendering. We only need non-canonical, no-echo input. + new = termios.tcgetattr(fd) + new[0] &= ~(termios.IXON | termios.IXOFF) # no flow control + new[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG) # bytes, no echo, no signals + # Non-blocking reads with small timeout so ESC can be handled as "escape" alone. + new[6][termios.VMIN] = 0 + new[6][termios.VTIME] = 1 # 0.1s + termios.tcsetattr(fd, termios.TCSADRAIN, new) + buf = b"" + + def _handle_arrow(final: int, params: bytes) -> None: + direction = {ord("A"): "up", ord("B"): "down", ord("C"): "right", ord("D"): "left"}.get(final) + if direction is None: + return + shift = b";2" in params or params.startswith(b"2;") or b";2;" in params + if self._mode == "edit": + self._apply_edit(direction, shift=shift) + else: + self._on_arrow(direction) + + while not self._stop.is_set(): + try: + chunk = os.read(fd, 64) + except OSError: + chunk = b"" + if not chunk: + continue + buf += chunk + + # Parse as much as possible from the buffer. + while buf: + b0 = buf[0] + if b0 == 0x03: # Ctrl+C + self._stop.set() + return + if b0 == 0x12: # Ctrl+R + # NAV-only: reset the in-memory curve to firmware defaults. + if self._mode == "nav": + getter = getattr(self._fan_curve, "_get_default_settings", None) + if callable(getter): + try: + self._fan_curve.settings = getter() + self._selected_bar_idx = 0 + self._notice = "Reset to defaults" + except Exception: + pass + buf = buf[1:] + continue + if b0 == 0x13: # Ctrl+S + if self._mode == "nav": + self._save(dry_run=False) + buf = buf[1:] + continue + if b0 == 0x04: # Ctrl+D + if self._mode == "nav": + self._save(dry_run=True) + buf = buf[1:] + continue + if b0 in (0x0D, 0x0A): # Enter + self._mode = "edit" if self._mode != "edit" else "nav" + buf = buf[1:] + continue + + if b0 != 0x1B: # not ESC + buf = buf[1:] + continue + + # ESC: could be alone, or CSI sequence. + if len(buf) == 1: + if not select.select([fd], [], [], 0.03)[0]: + if self._mode == "edit": + self._mode = "nav" + buf = b"" + break + + if buf[1:2] != b"[": + # Unknown escape sequence; drop ESC and keep going. + buf = buf[1:] + continue + + # CSI: ESC [ ... final + # Simple arrows: ESC [ A/B/C/D + if len(buf) >= 3 and buf[2] in (ord("A"), ord("B"), ord("C"), ord("D")): + _handle_arrow(buf[2], b"") + buf = buf[3:] + continue + + # Extended CSI: e.g. ESC [ 1 ; 2 C (shift+right) + final_idx = None + for i in range(2, len(buf)): + if buf[i] in (ord("A"), ord("B"), ord("C"), ord("D")): + final_idx = i + break + if i > 16: + break + if final_idx is None: + # Need more bytes. + break + + params = buf[2:final_idx] + final = buf[final_idx] + _handle_arrow(final, params) + buf = buf[final_idx + 1 :] + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + def run(self, refresh_per_second: float = 10.0) -> None: + layout = self._build_layout() + key_thread = threading.Thread(target=self._read_keys, name="pifantui-keys", daemon=True) + key_thread.start() + try: + # NOTE: `screen=True` uses the alternate screen buffer which some terminals render + # with an opaque background. Keep this False to preserve emulator transparency. + with Live(layout, console=self._console, screen=False, refresh_per_second=refresh_per_second): + while not self._stop.is_set(): + self._render(layout) + time.sleep(0.1) + finally: + self._stop.set() + key_thread.join(timeout=0.2) + +if __name__ == "__main__": + # Minimal standalone preview. + # When run via main.py, we pass the real FanCurve instance. + class _Dummy: + settings = [] + + TUI(_Dummy()).run()