# Listing header from the file viewer: 604 lines, 24 KiB, Python.
from __future__ import annotations

import os
import select
import shlex
import sys
import termios
import threading
import time
from dataclasses import dataclass

from rich.console import Console
from rich.console import Group
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text


@dataclass(frozen=True)
class _LayoutSpec:
    """Immutable split ratios for the screen layout.

    The values are passed to Rich ``Layout(ratio=...)``; only their relative
    size within each split matters.
    """

    # Vertical split: plot pane (top) vs. settings/help row (bottom).
    top_ratio: int = 60
    bottom_ratio: int = 40

    # Horizontal split of the bottom row: settings table (left) vs. help (right).
    bottom_left_ratio: int = 65
    bottom_right_ratio: int = 35


class TUI:
    """Rich-only TUI that avoids setting any background color.

    This keeps the terminal emulator theme (including alpha transparency) intact.

    The UI has two modes. In "nav" mode the arrow keys move the selection and
    Ctrl+R / Ctrl+S / Ctrl+D reset, save and dry-run-save the curve. In "edit"
    mode the arrow keys change the selected setting. Keys are read on a
    background thread started by :meth:`run`; that thread mutates the same
    attributes the render loop reads, without any locking.
    """

    # Height (rows) of the fixed status line at the top of the layout.
    _STATUS_ROWS = 1

    # Longest unterminated CSI sequence we buffer before resynchronising.
    _MAX_CSI_LEN = 32

    # Control points of the approximate "icefire" colormap: (position, (r, g, b)).
    _ICEFIRE_STOPS = (
        (0.00, (0, 7, 26)),  # near-black blue
        (0.12, (0, 53, 124)),  # deep blue
        (0.25, (0, 119, 182)),  # cyan-blue
        (0.38, (82, 181, 214)),  # light cyan
        (0.50, (245, 245, 245)),  # near-white midpoint
        (0.62, (250, 199, 120)),  # warm light
        (0.75, (242, 120, 45)),  # orange
        (0.88, (189, 45, 38)),  # red
        (1.00, (64, 0, 8)),  # near-black red
    )

    def __init__(self, fan_curve) -> None:
        """Create the UI for *fan_curve*.

        *fan_curve* is accessed duck-typed: ``settings`` (items with ``N``,
        ``temp``, ``hyst``, ``speed``), and optionally ``config_path``,
        ``_assert_valid_cfg``, ``_write_cfg`` and ``_get_default_settings``.
        """
        self._fan_curve = fan_curve
        self._console = Console()
        self._spec = _LayoutSpec()
        self._stop = threading.Event()
        # Index into the temp-sorted settings list, or None when nothing is selected.
        self._selected_bar_idx: int | None = None
        self._mode: str = "nav"  # "nav" | "edit"
        self._notice: str = ""

    def _sorted_settings(self):
        """Return a new list of the curve's settings sorted by ascending ``temp``."""
        settings = getattr(self._fan_curve, "settings", []) or []
        return sorted(settings, key=lambda s: getattr(s, "temp", 0))

    def _build_layout(self) -> Layout:
        """Build the layout tree: status row, plot pane, and a split bottom row."""
        root = Layout(name="root")
        root.split_column(
            Layout(name="status", size=self._STATUS_ROWS),
            Layout(name="top", ratio=self._spec.top_ratio),
            Layout(name="bottom", ratio=self._spec.bottom_ratio),
        )
        root["bottom"].split_row(
            Layout(name="bottom_left", ratio=self._spec.bottom_left_ratio),
            Layout(name="bottom_right", ratio=self._spec.bottom_right_ratio),
        )
        return root

    def _settings_table(self, settings, selected_idx: int | None) -> Table:
        """Render *settings* as a table, showing row *selected_idx* in reverse video."""
        table = Table(title="Fan curve", expand=True)
        table.add_column("N", justify="right", no_wrap=True)
        table.add_column("Temp (°C)", justify="right", no_wrap=True)
        table.add_column("Hyst (°C)", justify="right", no_wrap=True)
        table.add_column("Speed", justify="right", no_wrap=True)

        for i, s in enumerate(settings or []):
            is_selected = selected_idx is not None and i == selected_idx
            # Backend stores milli-Celsius for temp and hyst.
            table.add_row(
                str(getattr(s, "N", "?")),
                f"{getattr(s, 'temp', 0) / 1000:.1f}",
                f"{getattr(s, 'hyst', 0) / 1000:.1f}",
                str(getattr(s, "speed", "?")),
                style="reverse" if is_selected else None,
            )
        return table

    def _icefire_rgb(self, t: float) -> tuple[int, int, int]:
        """Approximate 'icefire' diverging colormap (0..1 -> RGB).

        *t* is clamped to [0, 1] and linearly interpolated between the control
        points in ``_ICEFIRE_STOPS`` (a compact, dependency-free implementation).
        """
        tt = 0.0 if t < 0.0 else 1.0 if t > 1.0 else t
        stops = self._ICEFIRE_STOPS
        for (a, ca), (b, cb) in zip(stops, stops[1:]):
            if tt <= b:
                u = 0.0 if b == a else (tt - a) / (b - a)
                return (
                    round(ca[0] + (cb[0] - ca[0]) * u),
                    round(ca[1] + (cb[1] - ca[1]) * u),
                    round(ca[2] + (cb[2] - ca[2]) * u),
                )
        return stops[-1][1]

    def _help_text(self) -> Text:
        """Build the colour-coded key-binding help shown in the Help panel."""
        key = "bright_yellow"
        key2 = "bright_cyan"
        key_save = "bright_green"
        key_dry = "bright_blue"
        dim = "bright_black"
        white = "white"

        # One entry per output line; each line is a list of (text, style) parts.
        rows = [
            [("NAV:", f"{white} bold")],
            [(" ", white), ("←/→/↑/↓", key2), (" select bar", white)],
            [(" ", white), ("Enter", key), (" ", white), ("edit mode", white)],
            [(" ", white), ("Ctrl+R", key2), (" reset defaults", white)],
            [(" ", white), ("Ctrl+S", key_save), (" save", white)],
            [(" ", white), ("Ctrl+D", key_dry), (" save (dry-run diff)", white)],
            [("", white)],
            [("EDIT:", f"{white} bold")],
            [(" ", white), ("←/→", key2), (" temp +/-", white)],
            [(" ", white), ("↑/↓", key2), (" speed +/-", white)],
            [(" ", white), ("Shift+←/→", key2), (" hyst +/-", white), (" (Left increases)", dim)],
            [(" ", white), ("Esc", key), (" ", white), ("exit edit mode", white)],
            [("", white)],
            [("Ctrl+C", key), (" to exit", white)],
        ]

        t = Text()
        for parts in rows:
            for s, style in parts:
                t.append(s, style=style)
            t.append("\n")

        # Drop trailing newline.
        if t.plain.endswith("\n"):
            t = t[:-1]
        return t

    def _axes(self, width: int, height: int, settings, selected_idx: int | None) -> Text:
        """Draw the fan-curve plot as a fixed-size character grid.

        - X axis: temperature from 10..100 (°C)
        - Y axis: fan speed from 0..100 (%); a ``speed`` of 255 maps to 100%.

        Each setting is drawn as a shaded bar spanning ``temp - hyst .. temp``
        from the baseline up to its speed, plus an ``o`` marker at
        ``(temp, speed)``. Only foreground colours are used, except for the
        selected marker, which gets a yellow background highlight.

        *width* / *height* are the available cells; they are raised to at
        least 14 x 8. Settings whose ``temp``/``speed``/``hyst`` cannot be
        converted to float are skipped.
        """
        # Blank columns kept on the right to avoid terminal edge clipping.
        safe_right = 2

        total_w = max(14, width)
        total_h = max(8, height)

        # Space for Y tick labels (e.g. "100%"), right-aligned plus padding.
        y_label_w = len("100%") + 1
        # Two rows below the x-axis: one for tick numbers, one for range endpoints.
        x_label_h = 2

        # Plot area (including the axes themselves).
        plot_w = max(6, total_w - y_label_w - safe_right)
        plot_h = max(4, total_h - x_label_h)

        # Cells are either a plain char or a (char, style) tuple.
        grid = [[" " for _ in range(total_w)] for _ in range(total_h)]

        # Origin at bottom-left of plot area (after y-label margin).
        origin_x = y_label_w
        origin_y = plot_h - 1
        x_axis_end = min(total_w - 1 - safe_right, origin_x + plot_w - 1)
        plot_span_y = max(1, plot_h - 1)
        plot_span_x = max(1, x_axis_end - origin_x)

        def _clamp(v: int, lo: int, hi: int) -> int:
            return lo if v < lo else hi if v > hi else v

        def _col(temp_c: float) -> int:
            # Map a temperature in the 10..100 °C domain to a grid column.
            frac = (temp_c - 10.0) / 90.0
            return _clamp(origin_x + round(frac * plot_span_x), origin_x, x_axis_end)

        def _put(y: int, x: int, s: str) -> None:
            # Write a label, silently clipping anything outside the grid.
            if y < 0 or y >= total_h:
                return
            for i, ch in enumerate(s):
                xx = x + i
                if 0 <= xx < total_w:
                    grid[y][xx] = ch

        # Axes, corner and end caps (ASCII-safe caps avoid width quirks on some fonts).
        for y in range(plot_h):
            grid[y][origin_x] = "│"
        for x in range(origin_x, x_axis_end + 1):
            grid[origin_y][x] = "─"
        grid[origin_y][origin_x] = "└"
        grid[0][origin_x] = "^"
        grid[origin_y][x_axis_end] = ">"

        # Y ticks every 10% with a right-aligned label to the left of the axis.
        for pct in range(0, 101, 10):
            y = origin_y - round((pct / 100) * plot_span_y)
            if 0 <= y < plot_h:
                if grid[y][origin_x] not in ("└", "^"):
                    grid[y][origin_x] = "├"
                label = f"{pct:>3}%"
                _put(y, max(0, y_label_w - 1 - len(label)), label)

        # X ticks every 10C with a numeric label centred-ish below the tick.
        label_y = origin_y + 1
        for tick_c in range(10, 101, 10):
            x = _col(tick_c)
            if grid[origin_y][x] not in ("└", ">"):
                grid[origin_y][x] = "┬"
            label = str(tick_c)
            if label_y < total_h:
                start_x = max(origin_x, min(x_axis_end - len(label) + 1, x - (len(label) // 2)))
                _put(label_y, start_x, label)

        # X range endpoint labels on the second row below the axis.
        x_range_y = origin_y + 2
        if x_range_y < total_h:
            _put(x_range_y, origin_x, "10C")
            _put(x_range_y, max(origin_x, x_axis_end - len("100C") + 1), "100C")

        # One filled rectangle per setting:
        # x in [temp-hyst, temp], y in [0, speed%]; 255 == 100%.
        plotted: list[tuple[int, int, str, bool]] = []
        for i, s in enumerate(settings or []):
            try:
                temp_c = float(getattr(s, "temp")) / 1000.0
                speed = float(getattr(s, "speed"))
                hyst_c = float(getattr(s, "hyst")) / 1000.0
            except Exception:
                continue

            # Clamp to axis domain.
            temp_c = max(10.0, min(100.0, temp_c))
            speed = max(0.0, min(255.0, speed))
            hyst_c = max(0.0, hyst_c)
            pct = (speed / 255.0) * 100.0

            # Point position in grid coordinates.
            x = _col(temp_c)
            y = _clamp(origin_y - round((pct / 100.0) * plot_span_y), 0, origin_y)

            # Rectangle x-span from (temp-hyst) .. temp.
            x0 = _col(max(10.0, temp_c - hyst_c))
            x1 = x
            if x0 > x1:
                x0, x1 = x1, x0

            # Rectangle y-span from just above the axis baseline up to the point.
            y0 = y
            y1 = origin_y - 1
            if y0 > y1:
                y0, y1 = y1, y0

            # Color by fan speed (constant per setting): 0..255 -> 0..1.
            r, g, b2 = self._icefire_rgb(speed / 255.0)
            fill_style = f"rgb({r},{g},{b2})"

            # Fill with a foreground-colored shaded block; do not set background.
            # Avoid overwriting axes/labels: only fill into blank cells.
            for yy in range(y0, y1 + 1):
                if not (0 <= yy < origin_y):
                    continue
                for xx in range(x0, x1 + 1):
                    if xx <= origin_x:
                        continue
                    if grid[yy][xx] == " ":
                        grid[yy][xx] = ("▒", fill_style)

            plotted.append((x, y, fill_style, (selected_idx is not None and i == selected_idx)))

        # ASCII-safe point markers, drawn last so bars never hide them.
        for x, y, style, is_selected in plotted:
            # Match the rectangle's color so the point "belongs" to it;
            # the selected point additionally gets a background highlight.
            point_style = f"{style} on rgb(255,255,0)" if is_selected else style
            grid[y][x] = ("o", point_style)

        # Convert grid into a styled Text.
        out = Text()
        for row in grid:
            for cell in row:
                if isinstance(cell, tuple):
                    ch, style = cell
                    out.append(ch, style=style)
                else:
                    out.append(cell)
            out.append("\n")
        # Drop the final newline to avoid Rich adding an extra blank line at the bottom.
        if out and out.plain.endswith("\n"):
            out = out[:-1]
        out.no_wrap = True
        return out

    def _render(self, layout: Layout) -> None:
        """Refresh every pane of *layout* from the current UI state."""
        # Status line (top-left): current mode.
        mode_label = "EDIT" if self._mode == "edit" else "NAV"
        layout["status"].update(Text(f"MODE: {mode_label}", style="bold"))

        # Size of the "top" pane. Layout.size is only the optional *fixed* size
        # (None for ratio-sized panes), so derive the height the way Rich
        # distributes ratios: subtract the fixed status row, then take the
        # floor of this pane's share of the remaining rows.
        dims = self._console.size
        ratio_sum = max(1, self._spec.top_ratio + self._spec.bottom_ratio)
        flexible_rows = max(0, dims.height - self._STATUS_ROWS)
        top_w = dims.width
        top_h = max(3, (flexible_rows * self._spec.top_ratio) // ratio_sum)

        # Panel border is 2 cols/rows; plus we add horizontal padding to keep content
        # away from terminal edges (helps avoid right-edge clipping).
        panel_pad_x = 1
        inner_w = max(3, top_w - 2 - (panel_pad_x * 2))
        inner_h = max(3, top_h - 2)

        settings = self._sorted_settings()
        if self._selected_bar_idx is None and settings:
            self._selected_bar_idx = 0

        layout["top"].update(
            Panel(
                self._axes(inner_w, inner_h, settings, self._selected_bar_idx),
                title="Fan curve axes (Temp 10–100°C, Speed 0–100%)",
                border_style="yellow",
                padding=(0, panel_pad_x),
            )
        )

        notice_lines = [Text(f"Last action: {self._notice}", style="yellow")] if self._notice else []
        layout["bottom_left"].update(
            Panel(
                Group(
                    *notice_lines,
                    self._settings_table(settings, self._selected_bar_idx),
                ),
                title="Current settings",
                border_style="green",
            )
        )

        layout["bottom_right"].update(
            Panel(
                self._help_text(),
                title="Help",
                border_style="magenta",
            )
        )

    def _save(self, dry_run: bool) -> None:
        """Persist current settings to config (or write a patch when dry_run=True).

        Sorts the backend's settings list in place by ``temp``, renumbers ``N``
        from 0, then calls the backend's ``_assert_valid_cfg`` and
        ``_write_cfg(dry_run=...)`` when they exist. The outcome (success hint
        or ``Save failed: ...``) is stored in the notice line; nothing is raised.
        """
        try:
            settings = getattr(self._fan_curve, "settings", []) or []
            settings.sort(key=lambda s: getattr(s, "temp", 0))
            for i, s in enumerate(settings):
                setattr(s, "N", i)
            # Validate and write using backend implementation.
            if hasattr(self._fan_curve, "_assert_valid_cfg"):
                self._fan_curve._assert_valid_cfg()
            if hasattr(self._fan_curve, "_write_cfg"):
                self._fan_curve._write_cfg(dry_run=dry_run)
        except Exception as e:
            self._notice = f"Save failed: {e}"
            return

        if not dry_run:
            self._notice = "Saved. Reboot for settings to take effect: sudo reboot"
            return

        cfg_path = getattr(self._fan_curve, "config_path", "/boot/firmware/config.txt")
        # NOTE(review): assumes the backend writes this file into the current
        # directory on dry-run — confirm against _write_cfg.
        patch_name = "PIFanTUI.patch"
        # Quote the path so the suggested command survives spaces/metacharacters.
        self._notice = (
            f"Wrote {patch_name}. Apply then reboot: "
            f"sudo patch {shlex.quote(str(cfg_path))} -i {patch_name} && sudo reboot"
        )

    def _on_arrow(self, direction: str) -> None:
        """Nav-mode arrow handler: cycle the selected bar.

        Left/up select the previous setting, right/down the next, wrapping
        around. With no settings the selection is cleared; with no current
        selection the first setting is selected.
        """
        settings = self._sorted_settings()
        if not settings:
            self._selected_bar_idx = None
            return
        if self._selected_bar_idx is None:
            self._selected_bar_idx = 0
            return

        n = len(settings)
        if direction in ("left", "up"):
            self._selected_bar_idx = (self._selected_bar_idx - 1) % n
        elif direction in ("right", "down"):
            self._selected_bar_idx = (self._selected_bar_idx + 1) % n

    def _apply_edit(self, direction: str, shift: bool) -> None:
        """Edit-mode mutations of the selected setting.

        Left/right change ``temp`` by 1 °C, up/down change ``speed`` by 5 PWM
        units, and Shift+left/right change ``hyst`` by 1 °C (left increases).
        Values are clamped to the ranges noted below. The selection follows
        the edited setting if the change alters its position in temp order.
        """
        settings = self._sorted_settings()
        if not settings:
            self._selected_bar_idx = None
            return
        if self._selected_bar_idx is None:
            self._selected_bar_idx = 0
        idx = max(0, min(len(settings) - 1, self._selected_bar_idx))
        s = settings[idx]

        # Step sizes (simple + predictable).
        temp_step = 1000  # 1C (milli-C)
        hyst_step = 1000  # 1C (milli-C)
        speed_step = 5  # PWM units (0..255)

        temp = int(getattr(s, "temp", 10000))
        hyst = int(getattr(s, "hyst", 1000))
        speed = int(getattr(s, "speed", 0))

        if shift and direction in ("left", "right"):
            # In edit mode, Left increases / Right decreases (user preference).
            hyst += hyst_step if direction == "left" else -hyst_step
        elif direction == "left":
            temp -= temp_step
        elif direction == "right":
            temp += temp_step
        elif direction == "up":
            speed += speed_step
        elif direction == "down":
            speed -= speed_step

        # Clamp to backend-validated ranges.
        temp = max(10_000, min(99_000, temp))  # DtParam asserts temp//1000 < 100
        speed = max(0, min(255, speed))
        hyst = max(1_000, min(99_000, hyst))  # DtParam asserts hyst//1000 > 0 and < 100
        # Keep hyst from exceeding temp-10C, so temp-hyst stays in chart domain.
        hyst = min(hyst, max(1_000, temp - 10_000))

        setattr(s, "temp", temp)
        setattr(s, "hyst", hyst)
        setattr(s, "speed", speed)

        # The selection is an index into the temp-sorted list, so a temp change
        # can move the edited setting; re-locate it so later keys keep editing
        # the same setting rather than whichever one took over its old slot.
        for new_idx, item in enumerate(self._sorted_settings()):
            if item is s:
                self._selected_bar_idx = new_idx
                break

    def _handle_arrow(self, final: int, params: bytes) -> None:
        """Dispatch a CSI sequence with final byte *final*; non-arrows are ignored."""
        direction = {ord("A"): "up", ord("B"): "down", ord("C"): "right", ord("D"): "left"}.get(final)
        if direction is None:
            return
        # e.g. ESC [ 1 ; 2 C is Shift+Right.
        shift = b";2" in params or params.startswith(b"2;")
        if self._mode == "edit":
            self._apply_edit(direction, shift=shift)
        else:
            self._on_arrow(direction)

    def _reset_to_defaults(self) -> None:
        """Replace the in-memory curve with the backend's defaults, if it offers them."""
        getter = getattr(self._fan_curve, "_get_default_settings", None)
        if not callable(getter):
            return
        try:
            self._fan_curve.settings = getter()
        except Exception as e:
            # Report the failure in the notice line instead of hiding it.
            self._notice = f"Reset failed: {e}"
            return
        self._selected_bar_idx = 0
        self._notice = "Reset to defaults"

    def _consume_keys(self, buf: bytes, fd: int) -> bytes:
        """Process as many complete keys from *buf* as possible.

        Returns the unparsed tail (an escape sequence still waiting for more
        bytes), which the caller must prepend to the next read. *fd* is only
        polled to tell a lone Esc key from the start of an escape sequence.
        Ctrl+C sets the stop event and discards the rest of the buffer.
        """
        while buf:
            b0 = buf[0]
            if b0 == 0x03:  # Ctrl+C
                self._stop.set()
                return b""
            if b0 == 0x12:  # Ctrl+R (NAV-only)
                if self._mode == "nav":
                    self._reset_to_defaults()
                buf = buf[1:]
                continue
            if b0 == 0x13:  # Ctrl+S (NAV-only)
                if self._mode == "nav":
                    self._save(dry_run=False)
                buf = buf[1:]
                continue
            if b0 == 0x04:  # Ctrl+D (NAV-only)
                if self._mode == "nav":
                    self._save(dry_run=True)
                buf = buf[1:]
                continue
            if b0 in (0x0D, 0x0A):  # Enter toggles edit mode
                self._mode = "edit" if self._mode != "edit" else "nav"
                buf = buf[1:]
                continue
            if b0 != 0x1B:  # not ESC: ignore
                buf = buf[1:]
                continue

            # ESC: could be alone, or the start of a CSI sequence.
            if len(buf) == 1:
                if select.select([fd], [], [], 0.03)[0]:
                    # More bytes are already pending: keep the ESC and let the
                    # next read deliver the rest of the sequence.
                    return buf
                if self._mode == "edit":
                    self._mode = "nav"
                return b""

            if buf[1] != 0x5B:  # not "[": unknown escape; drop ESC and keep going
                buf = buf[1:]
                continue

            # CSI: ESC [ <parameter/intermediate bytes 0x20-0x3F> <final 0x40-0x7E>
            end = 2
            while end < len(buf) and 0x20 <= buf[end] <= 0x3F:
                end += 1
            if end == len(buf):
                # No final byte yet.
                if len(buf) > self._MAX_CSI_LEN:
                    # Runaway sequence: resynchronise instead of buffering forever.
                    buf = buf[1:]
                    continue
                return buf
            if not (0x40 <= buf[end] <= 0x7E):
                # Malformed: drop the prefix but re-process the offending byte,
                # so e.g. Ctrl+C is never swallowed by a broken sequence.
                buf = buf[end:]
                continue

            # Complete sequence: arrows are handled, anything else is discarded.
            self._handle_arrow(buf[end], buf[2:end])
            buf = buf[end + 1 :]
        return buf

    def _read_keys(self) -> None:
        """Read keys (arrow keys, enter, esc, shift+arrows) without breaking Rich output.

        Runs until the stop event is set. Puts stdin into non-canonical,
        no-echo, no-signal mode and restores the previous settings on exit.
        If stdin is not a usable terminal the problem is shown in the notice
        line and the method returns.
        """
        try:
            fd = sys.stdin.fileno()
            old = termios.tcgetattr(fd)
        except (termios.error, OSError, ValueError) as e:
            self._notice = f"Keyboard input unavailable: {e}"
            return

        try:
            # IMPORTANT: don't use tty.setraw() here — it disables output post-processing
            # on the shared TTY (e.g. \n no longer implies CR), which breaks Rich layout
            # rendering. We only need non-canonical, no-echo input.
            new = termios.tcgetattr(fd)
            new[0] &= ~(termios.IXON | termios.IXOFF)  # no flow control
            new[3] &= ~(termios.ICANON | termios.ECHO | termios.ISIG)  # bytes, no echo, no signals
            # Reads return after at most 0.1s so the stop event is polled regularly.
            new[6][termios.VMIN] = 0
            new[6][termios.VTIME] = 1  # 0.1s
            termios.tcsetattr(fd, termios.TCSADRAIN, new)

            buf = b""
            while not self._stop.is_set():
                try:
                    chunk = os.read(fd, 64)
                except OSError:
                    # Back off so a persistent read error cannot busy-spin.
                    self._stop.wait(0.1)
                    continue
                if not chunk:
                    continue
                buf = self._consume_keys(buf + chunk, fd)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old)

    def run(self, refresh_per_second: float = 10.0) -> None:
        """Run the UI until Ctrl+C is pressed.

        *refresh_per_second* sets both Rich's Live refresh rate and how often
        the layout is rebuilt from state.
        """
        layout = self._build_layout()
        interval = 1.0 / refresh_per_second if refresh_per_second > 0 else 0.1
        key_thread = threading.Thread(target=self._read_keys, name="pifantui-keys", daemon=True)
        key_thread.start()
        try:
            # NOTE: `screen=True` uses the alternate screen buffer which some terminals render
            # with an opaque background. Keep this False to preserve emulator transparency.
            with Live(layout, console=self._console, screen=False, refresh_per_second=refresh_per_second):
                while not self._stop.is_set():
                    self._render(layout)
                    # Waiting on the event (not sleeping) lets Ctrl+C end the loop promptly.
                    self._stop.wait(interval)
        finally:
            self._stop.set()
            key_thread.join(timeout=0.2)


if __name__ == "__main__":
    # Minimal standalone preview: an object with an empty curve is enough to
    # draw the UI. When run via main.py, the real FanCurve instance is passed.
    class _Dummy:
        settings = []

    preview_curve = _Dummy()
    TUI(preview_curve).run()