from __future__ import annotations import time import threading import sys import select import os import termios from dataclasses import dataclass from rich.console import Console from rich.console import Group from rich.layout import Layout from rich.live import Live from rich.panel import Panel from rich.table import Table from rich.text import Text @dataclass(frozen=True) class _LayoutSpec: top_ratio: int = 60 bottom_ratio: int = 40 bottom_left_ratio: int = 65 bottom_right_ratio: int = 35 class TUI: """Rich-only TUI that avoids setting any background color. This keeps the terminal emulator theme (including alpha transparency) intact. """ def __init__(self, fan_curve) -> None: self._fan_curve = fan_curve self._console = Console() self._spec = _LayoutSpec() self._stop = threading.Event() self._selected_bar_idx: int | None = None self._mode: str = "nav" # "nav" | "edit" self._notice: str = "" def _sorted_settings(self): return sorted( getattr(self._fan_curve, "settings", []) or [], key=lambda s: getattr(s, "temp", 0), ) def _build_layout(self) -> Layout: root = Layout(name="root") root.split_column( Layout(name="status", size=1), Layout(name="top", ratio=self._spec.top_ratio), Layout(name="bottom", ratio=self._spec.bottom_ratio), ) root["bottom"].split_row( Layout(name="bottom_left", ratio=self._spec.bottom_left_ratio), Layout(name="bottom_right", ratio=self._spec.bottom_right_ratio), ) return root def _settings_table(self, settings, selected_idx: int | None) -> Table: table = Table(title="Fan curve", expand=True) table.add_column("N", justify="right", no_wrap=True) table.add_column("Temp (°C)", justify="right", no_wrap=True) table.add_column("Hyst (°C)", justify="right", no_wrap=True) table.add_column("Speed", justify="right", no_wrap=True) for i, s in enumerate(settings or []): # Backend stores milli-Celsius for temp and hyst. table.add_row( str(getattr(s, "N", "?")), f"{getattr(s, 'temp', 0) / 1000:.1f}", f"{getattr(s, 'hyst', 0) / 1000:.1f}", str(getattr(s, "speed", "?")), style=("reverse" if (selected_idx is not None and i == selected_idx) else None), ) return table def _icefire_rgb(self, t: float) -> tuple[int, int, int]: """Approximate 'icefire' diverging colormap (0..1 -> RGB). We keep a small set of control points and linearly interpolate for a compact, dependency-free implementation. """ tt = 0.0 if t < 0.0 else 1.0 if t > 1.0 else t stops: list[tuple[float, tuple[int, int, int]]] = [ (0.00, (0, 7, 26)), # near-black blue (0.12, (0, 53, 124)), # deep blue (0.25, (0, 119, 182)), # cyan-blue (0.38, (82, 181, 214)), # light cyan (0.50, (245, 245, 245)), # near-white midpoint (0.62, (250, 199, 120)), # warm light (0.75, (242, 120, 45)), # orange (0.88, (189, 45, 38)), # red (1.00, (64, 0, 8)), # near-black red ] for (a, ca), (b, cb) in zip(stops, stops[1:]): if tt <= b: u = 0.0 if b == a else (tt - a) / (b - a) r = round(ca[0] + (cb[0] - ca[0]) * u) g = round(ca[1] + (cb[1] - ca[1]) * u) b2 = round(ca[2] + (cb[2] - ca[2]) * u) return (int(r), int(g), int(b2)) return stops[-1][1] def _help_text(self) -> Text: t = Text() def _line(parts: list[tuple[str, str]]) -> None: for s, style in parts: t.append(s, style=style) t.append("\n") key = "bright_yellow" key2 = "bright_cyan" key_save = "bright_green" key_dry = "bright_blue" dim = "bright_black" white = "white" _line([("NAV:", f"{white} bold")]) _line([(" ", white), ("←/→/↑/↓", key2), (" select bar", white)]) _line([(" ", white), ("Enter", key), (" ", white), ("edit mode", white)]) _line([(" ", white), ("Ctrl+R", key2), (" reset defaults", white)]) _line([(" ", white), ("Ctrl+S", key_save), (" save", white)]) _line([(" ", white), ("Ctrl+D", key_dry), (" save (dry-run diff)", white)]) _line([("", white)]) _line([("EDIT:", f"{white} bold")]) _line([(" ", white), ("←/→", key2), (" temp +/-", white)]) _line([(" ", white), ("↑/↓", key2), (" speed +/-", white)]) _line([(" ", white), ("Shift+←/→", key2), (" hyst +/-", white), (" (Left increases)", dim)]) _line([(" ", white), ("Esc", key), (" ", white), ("exit edit mode", white)]) _line([("", white)]) _line([("Ctrl+C", key), (" to exit", white)]) # Drop trailing newline. if t.plain.endswith("\n"): t = t[:-1] return t def _axes(self, width: int, height: int, settings, selected_idx: int | None) -> Text: """Draw just X/Y axes for a fan curve plot. - X axis: temperature from 10..100 (°C) - Y axis: fan speed from 0..100 (%) """ # Keep at least one blank column on the right to avoid terminal edge clipping. safe_right = 2 total_w = max(14, width) total_h = max(8, height) # Space for Y tick labels (e.g. "100%"). y_label_w = max(len("100%"), len(" 90%"), len(" 0%")) + 1 # right-aligned + padding # Two rows below the x-axis: one for tick numbers, one for range endpoints. x_label_h = 2 # Plot area (including the axes themselves). plot_w = max(6, total_w - y_label_w - safe_right) plot_h = max(4, total_h - x_label_h) # Build a fixed-size character grid (no background styling). grid = [[" " for _ in range(total_w)] for _ in range(total_h)] # Origin at bottom-left of plot area (after y-label margin). origin_x = y_label_w origin_y = plot_h - 1 # Y axis. for y in range(0, plot_h): grid[y][origin_x] = "│" # X axis. x_axis_end = min(total_w - 1 - safe_right, origin_x + plot_w - 1) for x in range(origin_x, x_axis_end + 1): grid[origin_y][x] = "─" # Corner. grid[origin_y][origin_x] = "└" # Axis end caps (ASCII-safe; avoids width quirks on some terminals/fonts). grid[0][origin_x] = "^" grid[origin_y][x_axis_end] = ">" # Y range labels (to the left of the y-axis, vertically). def _put(y: int, x: int, s: str) -> None: if y < 0 or y >= total_h: return for i, ch in enumerate(s): xx = x + i if 0 <= xx < total_w: grid[y][xx] = ch # Ticks every 10% on Y axis and every 10C on X axis + labels next to ticks. plot_span_y = max(1, plot_h - 1) for pct in range(0, 101, 10): y = origin_y - round((pct / 100) * plot_span_y) if 0 <= y < plot_h: if grid[y][origin_x] not in ("└", "^"): grid[y][origin_x] = "├" # Label to the left of the axis, right-aligned. label = f"{pct:>3}%" _put(y, max(0, y_label_w - 1 - len(label)), label) plot_span_x = max(1, (x_axis_end - origin_x)) for temp_c in range(10, 101, 10): frac = (temp_c - 10) / 90 if 90 else 0.0 x = origin_x + round(frac * plot_span_x) x = min(x_axis_end, max(origin_x, x)) # Tick mark on axis if grid[origin_y][x] not in ("└", ">"): grid[origin_y][x] = "┬" # Numeric label below the axis, centered-ish on tick. label = str(temp_c) label_y = origin_y + 1 if label_y < total_h: start_x = max(origin_x, min(x_axis_end - len(label) + 1, x - (len(label) // 2))) _put(label_y, start_x, label) # X range endpoint labels on the second row below the axis. x_range_y = origin_y + 2 if x_range_y < total_h: _put(x_range_y, origin_x, "10C") _put(x_range_y, max(origin_x, x_axis_end - len("100C") + 1), "100C") # Plot a filled rectangle per setting: # x in [temp-hyst, temp], y in [0, speed%]; 255 == 100%. plot_span_y = max(1, plot_h - 1) plot_span_x = max(1, x_axis_end - origin_x) def _clamp(v: int, lo: int, hi: int) -> int: return lo if v < lo else hi if v > hi else v plotted: list[tuple[int, int, str, bool]] = [] for i, s in enumerate(settings or []): try: temp_c = float(getattr(s, "temp")) / 1000.0 speed = float(getattr(s, "speed")) hyst_c = float(getattr(s, "hyst")) / 1000.0 except Exception: continue # Clamp to axis domain. temp_c = max(10.0, min(100.0, temp_c)) speed = max(0.0, min(255.0, speed)) hyst_c = max(0.0, hyst_c) pct = (speed / 255.0) * 100.0 # Map to plot coordinates. frac_x = (temp_c - 10.0) / 90.0 if 90.0 else 0.0 x = origin_x + round(frac_x * plot_span_x) x = _clamp(x, origin_x, x_axis_end) y = origin_y - round((pct / 100.0) * plot_span_y) y = _clamp(y, 0, origin_y) # Rectangle x-span from (temp-hyst) .. temp. rect_start_c = max(10.0, temp_c - hyst_c) rect_end_c = temp_c frac_start = (rect_start_c - 10.0) / 90.0 if 90.0 else 0.0 frac_end = (rect_end_c - 10.0) / 90.0 if 90.0 else 0.0 x0 = origin_x + round(frac_start * plot_span_x) x1 = origin_x + round(frac_end * plot_span_x) x0 = _clamp(x0, origin_x, x_axis_end) x1 = _clamp(x1, origin_x, x_axis_end) if x0 > x1: x0, x1 = x1, x0 # Rectangle y-span from axis baseline (0%) up to the point. y0 = y y1 = origin_y - 1 if y0 > y1: y0, y1 = y1, y0 # Color by fan speed (constant per setting): 0..255 -> 0..1. speed_norm = speed / 255.0 if 255.0 else 0.0 r, g, b2 = self._icefire_rgb(speed_norm) fill_style = f"rgb({r},{g},{b2})" # Fill with a foreground-colored block; do not set background. # Avoid overwriting axes/labels: only fill into blank cells. for yy in range(y0, y1 + 1): if not (0 <= yy < origin_y): continue for xx in range(x0, x1 + 1): if xx <= origin_x: continue if grid[yy][xx] == " ": # Use a shaded block to look "semi-transparent" while still # rendering via foreground color only (no background fill). grid[yy][xx] = ("▒", fill_style) plotted.append((x, y, fill_style, (selected_idx is not None and i == selected_idx))) # ASCII-safe point marker (draw last). for x, y, style, is_selected in plotted: # Match the rectangle's color (foreground only) so the point "belongs" to it. # If selected, add a background highlight (point only). point_style = style if is_selected: point_style = f"{style} on rgb(255,255,0)" grid[y][x] = ("o", point_style) # Convert grid into a styled Text. Cells may be a plain char or (char, style). out = Text() for row in grid: for cell in row: if isinstance(cell, tuple): ch, style = cell out.append(ch, style=style) else: out.append(cell) out.append("\n") # Drop the final newline to avoid Rich adding an extra blank line at the bottom. if out and out.plain.endswith("\n"): out = out[:-1] out.no_wrap = True return out def _render(self, layout: Layout) -> None: # Status line (top-left): current mode. mode_label = "EDIT" if self._mode == "edit" else "NAV" layout["status"].update(Text(f"MODE: {mode_label}", style="bold")) # Draw axes in the available inner area (panel takes 2 columns/rows for border). top_size = layout["top"].size if top_size is None: dims = self._console.size top_w = dims.width top_h = max(3, int(dims.height * (self._spec.top_ratio / 100))) else: top_w = top_size.width top_h = top_size.height # Panel border is 2 cols/rows; plus we add horizontal padding to keep content away # from terminal edges (helps avoid right-edge clipping). panel_pad_x = 1 inner_w = max(3, top_w - 2 - (panel_pad_x * 2)) inner_h = max(3, top_h - 2) settings = self._sorted_settings() if self._selected_bar_idx is None and settings: self._selected_bar_idx = 0 layout["top"].update( Panel( self._axes(inner_w, inner_h, settings, self._selected_bar_idx), title="Fan curve axes (Temp 10–100°C, Speed 0–100%)", border_style="yellow", padding=(0, panel_pad_x), ) ) layout["bottom_left"].update( Panel( Group( *( [Text(f"Last action: {self._notice}", style="yellow")] if self._notice else [] ), self._settings_table(settings, self._selected_bar_idx), ), title="Current settings", border_style="green", ) ) layout["bottom_right"].update( Panel( self._help_text(), title="Help", border_style="magenta", ) ) def _save(self, dry_run: bool) -> None: """Persist current settings to config (or write a patch when dry_run=True).""" try: settings = getattr(self._fan_curve, "settings", []) or [] settings.sort(key=lambda s: getattr(s, "temp", 0)) for i, s in enumerate(settings): setattr(s, "N", i) # Validate and write using backend implementation. if hasattr(self._fan_curve, "_assert_valid_cfg"): self._fan_curve._assert_valid_cfg() if hasattr(self._fan_curve, "_write_cfg"): self._fan_curve._write_cfg(dry_run=dry_run) except Exception as e: self._notice = f"Save failed: {e}" return cfg_path = getattr(self._fan_curve, "config_path", "/boot/firmware/config.txt") if dry_run: patch_file = os.path.join(os.getcwd(), "PIFanTUI.patch") self._notice = ( f"Wrote {os.path.basename(patch_file)}. Apply then reboot: " f"sudo patch {cfg_path} -i {os.path.basename(patch_file)} && sudo reboot" ) else: self._notice = f"Saved. Reboot for settings to take effect: sudo reboot" def _on_arrow(self, direction: str) -> None: """Arrow key handler. For now: - arrows cycle the selected bar (point highlight moves) """ settings = self._sorted_settings() if not settings: self._selected_bar_idx = None return if self._selected_bar_idx is None: self._selected_bar_idx = 0 return n = len(settings) if direction in ("left", "up"): self._selected_bar_idx = (self._selected_bar_idx - 1) % n elif direction in ("right", "down"): self._selected_bar_idx = (self._selected_bar_idx + 1) % n def _apply_edit(self, direction: str, shift: bool) -> None: """Edit-mode mutations of the selected setting.""" settings = self._sorted_settings() if not settings: self._selected_bar_idx = None return if self._selected_bar_idx is None: self._selected_bar_idx = 0 idx = max(0, min(len(settings) - 1, self._selected_bar_idx)) s = settings[idx] # Step sizes (simple + predictable). TEMP_STEP = 1000 # 1C (milli-C) HYST_STEP = 1000 # 1C (milli-C) SPEED_STEP = 5 # PWM units (0..255) temp = int(getattr(s, "temp", 10000)) hyst = int(getattr(s, "hyst", 1000)) speed = int(getattr(s, "speed", 0)) if shift and direction in ("left", "right"): # In edit mode, Left increases / Right decreases (user preference). hyst += (HYST_STEP if direction == "left" else -HYST_STEP) else: if direction == "left": temp -= TEMP_STEP elif direction == "right": temp += TEMP_STEP elif direction == "up": speed += SPEED_STEP elif direction == "down": speed -= SPEED_STEP # Clamp to backend-validated ranges. temp = max(10_000, min(99_000, temp)) # DtParam asserts temp//1000 < 100 speed = max(0, min(255, speed)) hyst = max(1_000, min(99_000, hyst)) # DtParam asserts hyst//1000 > 0 and < 100 # Keep hyst from exceeding temp-10C, so temp-hyst stays in chart domain. hyst = min(hyst, max(1_000, temp - 10_000)) setattr(s, "temp", temp) setattr(s, "hyst", hyst) setattr(s, "speed", speed) def _read_keys(self) -> None: """Read keys (arrow keys, enter, esc, shift+arrows) without breaking Rich output.""" fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: # IMPORTANT: don't use tty.setraw() here — it disables output post-processing # on the shared TTY (e.g. \n no longer implies CR), which breaks Rich layout # rendering. We only need non-canonical, no-echo input. new = termios.tcgetattr(fd) new[0] &= ~(termios.IXON | termios.IXOFF) # no flow control new[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG) # bytes, no echo, no signals # Non-blocking reads with small timeout so ESC can be handled as "escape" alone. new[6][termios.VMIN] = 0 new[6][termios.VTIME] = 1 # 0.1s termios.tcsetattr(fd, termios.TCSADRAIN, new) buf = b"" def _handle_arrow(final: int, params: bytes) -> None: direction = {ord("A"): "up", ord("B"): "down", ord("C"): "right", ord("D"): "left"}.get(final) if direction is None: return shift = b";2" in params or params.startswith(b"2;") or b";2;" in params if self._mode == "edit": self._apply_edit(direction, shift=shift) else: self._on_arrow(direction) while not self._stop.is_set(): try: chunk = os.read(fd, 64) except OSError: chunk = b"" if not chunk: continue buf += chunk # Parse as much as possible from the buffer. while buf: b0 = buf[0] if b0 == 0x03: # Ctrl+C self._stop.set() return if b0 == 0x12: # Ctrl+R # NAV-only: reset the in-memory curve to firmware defaults. if self._mode == "nav": getter = getattr(self._fan_curve, "_get_default_settings", None) if callable(getter): try: self._fan_curve.settings = getter() self._selected_bar_idx = 0 self._notice = "Reset to defaults" except Exception: pass buf = buf[1:] continue if b0 == 0x13: # Ctrl+S if self._mode == "nav": self._save(dry_run=False) buf = buf[1:] continue if b0 == 0x04: # Ctrl+D if self._mode == "nav": self._save(dry_run=True) buf = buf[1:] continue if b0 in (0x0D, 0x0A): # Enter self._mode = "edit" if self._mode != "edit" else "nav" buf = buf[1:] continue if b0 != 0x1B: # not ESC buf = buf[1:] continue # ESC: could be alone, or CSI sequence. if len(buf) == 1: if not select.select([fd], [], [], 0.03)[0]: if self._mode == "edit": self._mode = "nav" buf = b"" break if buf[1:2] != b"[": # Unknown escape sequence; drop ESC and keep going. buf = buf[1:] continue # CSI: ESC [ ... final # Simple arrows: ESC [ A/B/C/D if len(buf) >= 3 and buf[2] in (ord("A"), ord("B"), ord("C"), ord("D")): _handle_arrow(buf[2], b"") buf = buf[3:] continue # Extended CSI: e.g. ESC [ 1 ; 2 C (shift+right) final_idx = None for i in range(2, len(buf)): if buf[i] in (ord("A"), ord("B"), ord("C"), ord("D")): final_idx = i break if i > 16: break if final_idx is None: # Need more bytes. break params = buf[2:final_idx] final = buf[final_idx] _handle_arrow(final, params) buf = buf[final_idx + 1 :] finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) def run(self, refresh_per_second: float = 10.0) -> None: layout = self._build_layout() key_thread = threading.Thread(target=self._read_keys, name="pifantui-keys", daemon=True) key_thread.start() try: # NOTE: `screen=True` uses the alternate screen buffer which some terminals render # with an opaque background. Keep this False to preserve emulator transparency. with Live(layout, console=self._console, screen=False, refresh_per_second=refresh_per_second): while not self._stop.is_set(): self._render(layout) time.sleep(0.1) finally: self._stop.set() key_thread.join(timeout=0.2) if __name__ == "__main__": # Minimal standalone preview. # When run via main.py, we pass the real FanCurve instance. class _Dummy: settings = [] TUI(_Dummy()).run()