iPodUtils/main.py
2026-04-08 13:56:30 +02:00

570 lines
20 KiB
Python

#!/usr/bin/env python3
# -*- coding: iso-8859-1 -*-
# GPL v2 — see License.txt. Derived from KeyJ's rebuild_db (Martin Fiedler).
import array
import fnmatch
import getopt
import os
import random
import sys
from contextlib import contextmanager
from functools import cmp_to_key
# Program identification, printed in the welcome banner.
__title__ = "KeyJ's iPod shuffle Database Builder"
__version__ = "1.0-rc1"

# Property names that may appear in rules; anything else only triggers a warning.
KNOWN_PROPS = frozenset(
    {"filename", "size", "ignore", "type", "shuffle", "reuse", "bookmark"}
)

# Built-in rules. Every rule is ``(conditions, actions)``; each built-in rule
# has exactly one condition, a case-insensitive glob on the file name.
DEFAULT_RULES = [
    ([("filename", "~", pattern)], actions)
    for pattern, actions in (
        ("*.mp3", {"type": 1, "shuffle": 1, "bookmark": 0}),
        ("*.m4?", {"type": 2, "shuffle": 1, "bookmark": 0}),
        ("*.m4b", {"shuffle": 0, "bookmark": 1}),
        ("*.aa", {"type": 1, "shuffle": 0, "bookmark": 1, "reuse": 1}),
        ("*.wav", {"type": 4, "shuffle": 0, "bookmark": 0}),
        ("*.book.???", {"shuffle": 0, "bookmark": 1}),
        ("*.announce.???", {"shuffle": 0, "bookmark": 0}),
        ("/recycled/*", {"ignore": 1}),
    )
]

# File extensions (lower case, with dot) that are treated as playable.
AUDIO_EXT = frozenset({".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav"})

# Characters kept as-is when renaming; everything else becomes "_".
_SAFE = frozenset(
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
    "-_"
)

# Long option names accepted by getopt ("=" marks options taking a value).
GETOPT_LONG = (
    "help",
    "interactive",
    "volume=",
    "nosmart",
    "nochdir",
    "nolog",
    "force",
    "logfile=",
    "rename",
    "ipod-path=",
)

# Text printed after the usage line for -h/--help.
HELP = (
    "Rebuild iPod shuffle database.\n"
    "\n"
    "Mandatory arguments to long options are mandatory for short options too.\n"
    " -h, --help display this help text\n"
    " -i, --interactive prompt before browsing each directory\n"
    " -v, --volume=VOL set playback volume to a value between 0 and 38\n"
    " -s, --nosmart do not use smart shuffle\n"
    " -n, --nochdir do not change directory to this scripts directory first\n"
    " -p, --ipod-path=DIR use DIR as the iPod root (mount point); skip --nochdir behavior\n"
    " -l, --nolog do not create a log file\n"
    " -f, --force always rebuild database entries, do not re-use old ones\n"
    " -L, --logfile set log file name\n"
    "\n"
    "Without --ipod-path, run from the iPod's root directory (or rely on --nochdir /\n"
    "script location as before). By default, the whole volume is searched for playable\n"
    "files, unless at least one DIRECTORY is specified."
)

# Shown when the iPod_Control/iTunes directory is missing.
ERR_NO_CTRL = (
    "ERROR: No iPod control directory found!\n"
    "Please make sure that:\n"
    " (*) this program's working directory is the iPod's root directory\n"
    " (*) the iPod was correctly initialized with iTunes"
)

# Shown when iTunesSD cannot be opened for writing.
ERR_NO_SD = (
    "ERROR: Cannot write to the iPod database file (iTunesSD)!\n"
    "Please make sure that:\n"
    " (*) you have sufficient permissions to write to the iPod volume\n"
    " (*) you are actually using an iPod shuffle, and not some other iPod model :)"
)
def _u24(i: int) -> bytes:
if i < 0:
i += 0x1000000
return bytes((i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF))
def _circ(s, u, sc):
return min(abs(s - u), abs(s - u + sc), abs(s - u - sc))
class ShuffleRebuild:
    """Rebuild the iPod shuffle database files below ``iPod_Control/iTunes``.

    State kept on the instance:

    * ``opts`` -- run options (filled in by the command-line parser).
    * ``rules`` -- list of ``(conditions, actions)`` applied to every track.
    * ``domains`` -- one list of track indices per browsed directory, used
      by the smart shuffle.
    * ``total_count`` -- number of tracks written to ``iTunesSD`` so far.
    * ``known_entries`` -- raw 558-byte entries of an existing ``iTunesSD``,
      keyed by the file name stored in them.
    """

    def __init__(self) -> None:
        self.opts = {
            "volume": None,  # None keeps the volume stored in iTunesPState
            "interactive": False,
            "smart": True,
            "home": True,  # chdir to the script's directory before running
            "logging": True,
            "reuse": 1,  # default value of the per-track "reuse" property
            "logfile": "rebuild_db.log.txt",
            "rename": False,
            "ipod_path": None,
        }
        self.rules = list(DEFAULT_RULES)
        self.domains, self.total_count, self.known_entries = [], 0, {}
        self._log_fp = self._sd = None
        self._entry_template = array.array("B")

    # ------------------------------------------------------------------ logging

    def log(self, *a, end="\n") -> None:
        """Print to stdout and, when a log file is open, to the log file too."""
        print(*a, end=end)
        if self._log_fp:
            try:
                print(*a, end=end, file=self._log_fp)
            except OSError:
                pass  # logging is best effort; never abort the rebuild for it

    @contextmanager
    def log_to_file(self):
        """Context manager that keeps the log file open for the ``with`` body.

        Nothing is opened when the ``logging`` option is off; a log file that
        cannot be opened is silently skipped.
        """
        self._log_fp = None
        if self.opts["logging"]:
            try:
                self._log_fp = open(self.opts["logfile"], "w", encoding="utf-8")
            except OSError:
                pass
        try:
            yield
        finally:
            if self._log_fp:
                self._log_fp.close()
            self._log_fp = None

    # ---------------------------------------------------------------- start-up

    def chdir_for_run(self, argv0: str) -> None:
        """Change the working directory to the iPod root.

        With ``ipod_path`` set, that directory is used and the process exits
        with status 1 if it is not a directory or cannot be entered.
        Otherwise, if the ``home`` option is on, the directory part of
        *argv0* is tried and failures are ignored.
        """
        ip = self.opts["ipod_path"]
        if ip is not None:
            p = os.path.abspath(os.path.expanduser(ip))
            if not os.path.isdir(p):
                print("error: --ipod-path is not a directory: %s" % p, file=sys.stderr)
                sys.exit(1)
            try:
                os.chdir(p)
            except OSError as e:
                print(
                    "error: cannot change to directory %s: %s" % (p, e), file=sys.stderr
                )
                sys.exit(1)
        elif self.opts["home"]:
            try:
                os.chdir(os.path.split(argv0)[0])
            except OSError:
                pass

    # ------------------------------------------------------------------- rules

    def _load_user_rules_file(self) -> None:
        """Append the rules found in ``rebuild_db.rules`` (if readable)."""
        try:
            # "with" closes the handle; the original left it to the GC.
            with open("rebuild_db.rules", "r", encoding="iso-8859-1") as f:
                lines = f.read().split("\n")
        except OSError:
            return
        for line in lines:
            p = self._parse_rule_line(line)
            if p is not None:
                self.rules.append(p)

    def _parse_rule_line(self, line: str):
        """Parse one rules-file line into ``(conditions, actions)``.

        The text after the last ``:`` is a comma-separated list of
        ``prop=value`` actions; everything before it is a comma-separated
        list of conditions (empty means "always"). Blank lines and lines
        starting with ``#`` yield ``None``, as do malformed lines (after a
        warning).
        """
        line = line.strip()
        if not line or line[0] == "#":
            return None
        try:
            tmp = line.split(":")
            ruleset = [x.strip() for x in ":".join(tmp[:-1]).split(",")]
            actions = dict(map(self._parse_action, tmp[-1].split(",")))
            if len(ruleset) == 1 and not ruleset[0]:
                return ([], actions)
            return (list(map(self._parse_rule, ruleset)), actions)
        except (ValueError, TypeError, KeyError):
            self.log("WARNING: rule `%s' is malformed, ignoring" % line)
            return None

    def _parse_rule(self, rule: str):
        """Parse a condition ``prop<op>value`` into ``(prop, op, value)``.

        ``<op>`` is the first of ``~ = < >`` found after position 0.
        Raises ``ValueError`` when there is no operator.
        """
        positions = [rule.find(sep) for sep in "~=<>"]
        candidates = [p for p in positions if p > 0]
        if not candidates:
            raise ValueError("no operator in rule")
        sep_pos = min(candidates)
        sep = rule[sep_pos]
        prop = rule[:sep_pos].strip()
        if prop not in KNOWN_PROPS:
            self.log("WARNING: unknown property `%s'" % prop)
        return (prop, sep, self._parse_value(rule[sep_pos + 1 :].strip()))

    def _parse_action(self, action: str):
        """Parse an action ``prop=value`` into ``(prop, value)``.

        Raises ``ValueError`` (from the unpacking) when ``=`` is missing.
        """
        prop, value = [x.strip() for x in action.split("=", 1)]
        if prop not in KNOWN_PROPS:
            self.log("WARNING: unknown property `%s'" % prop)
        return (prop, self._parse_value(value))

    @staticmethod
    def _parse_value(val: str):
        """Convert rule text to a value.

        Quoted text (single or double quotes) is returned without the quotes,
        text accepted by ``int()`` becomes an int, anything else stays a str.
        """
        if len(val) >= 2 and (
            (val[0] == "'" and val[-1] == "'") or (val[0] == '"' and val[-1] == '"')
        ):
            return val[1:-1]
        try:
            return int(val)
        except ValueError:
            return val

    @staticmethod
    def _match_rule(props: dict, rule) -> bool:
        """Return whether the condition *rule* holds for the track *props*.

        ``~`` is a case-insensitive glob match, ``=``, ``<`` and ``>`` are
        plain comparisons. A missing property, or a value whose type does not
        support the operation (e.g. ``size`` is ``None`` because the file
        could not be stat'ed), counts as "no match" instead of raising.
        """
        try:
            prop, op, ref = props[rule[0]], rule[1], rule[2]
        except KeyError:
            return False
        try:
            if op == "~":
                return fnmatch.fnmatchcase(prop.lower(), ref.lower())
            if op == "=":
                return bool(prop == ref)
            if op == ">":
                return bool(prop > ref)
            if op == "<":
                return bool(prop < ref)
        except (TypeError, AttributeError):
            # Incomparable types (int vs. str, None vs. int, glob on an int).
            return False
        return False

    # ------------------------------------------------------------ file helpers

    @staticmethod
    def _filesize(path: str):
        """Return the size of *path* in bytes, or ``None`` if it cannot be stat'ed."""
        try:
            return os.stat(path).st_size
        except OSError:
            return None

    def _rename_safely(self, path: str, name: str) -> str:
        """Rename ``path/name`` so its base name only uses safe characters.

        Unsafe characters become ``_``; a numeric suffix is added when the
        target already exists. Returns the name the file has afterwards:
        the new name on success, the unchanged *name* when nothing had to be
        done or the rename failed.
        """
        base, ext = os.path.splitext(name)
        newname = "".join(c if c in _SAFE else "_" for c in base)
        if name == newname + ext:
            return name
        if os.path.exists("%s/%s%s" % (path, newname, ext)):
            i = 0
            while os.path.exists("%s/%s_%d%s" % (path, newname, i, ext)):
                i += 1
            newname += "_%d" % i
        newname += ext
        try:
            os.rename("%s/%s" % (path, name), "%s/%s" % (path, newname))
        except OSError:
            # The file still has its old name; reporting the new one would
            # make the database point at a file that does not exist.
            return name
        return newname

    # ------------------------------------------------------------ natural sort

    @staticmethod
    def _make_sort_key(s: str):
        """Build a natural-sort key for *s*.

        Text without decimal digits is returned lower-cased. Otherwise the
        result is ``(text_before, number, key_of_rest)``, nested once per
        digit run, e.g. ``"Track10.mp3"`` -> ``("track", 10, (".mp", 3, ""))``.
        """
        if not s:
            return s
        s = s.lower()
        # isdecimal() (not isdigit()) so that every matched character is
        # accepted by int(); isdigit() is also true for e.g. superscripts.
        start = next((i for i, ch in enumerate(s) if ch.isdecimal()), None)
        if start is None:
            return s
        end = start
        while end < len(s) and s[end].isdecimal():
            end += 1
        return (s[:start], int(s[start:end]), ShuffleRebuild._make_sort_key(s[end:]))

    @staticmethod
    def _key_repr(x):
        """Turn a sort key back into text (non-tuples are returned unchanged)."""
        if isinstance(x, tuple):
            return "%s%d%s" % (x[0], x[1], ShuffleRebuild._key_repr(x[2]))
        return x

    @classmethod
    def _cmp_sort_key(cls, a, b) -> int:
        """Three-way comparison for file entries and for sort keys.

        Works on the ``(kind, sort_key, name)`` entries produced by
        ``_file_entry`` as well as on the keys from ``_make_sort_key``.
        Tuples are compared element by element, recursively. A plain string
        compared with a key tuple is compared with the tuple's text prefix;
        on a tie the plain string (which has no number following) sorts
        first. This never mixes ``str`` and ``tuple`` in a ``<``/``>``
        comparison, which would raise ``TypeError``.

        Returns a negative, zero or positive int.
        """
        a_is_tuple, b_is_tuple = isinstance(a, tuple), isinstance(b, tuple)
        if a_is_tuple and b_is_tuple:
            for xa, xb in zip(a, b):
                c = cls._cmp_sort_key(xa, xb)
                if c:
                    return c
            return (len(a) > len(b)) - (len(a) < len(b))
        if a_is_tuple:
            return -cls._cmp_sort_key(b, a)
        if b_is_tuple:
            head = b[0]
            return ((a > head) - (a < head)) or -1
        return (a > b) - (a < b)

    # ---------------------------------------------------------------- browsing

    def _file_entry(self, path: str, name: str, prefix: str = ""):
        """Classify the directory entry ``path/name``.

        Returns ``(0, sort_key, prefix + name)`` for a directory,
        ``(1, sort_key, prefix + name)`` for a file with a known audio
        extension, and ``None`` for hidden names, symlinks and everything
        else. With the ``rename`` option the entry is renamed first, except
        below ``./iPod_Control``.
        """
        if not name or name[0] == ".":
            return None
        fullname = "%s/%s" % (path, name)
        may_rename = not fullname.startswith("./iPod_Control") and self.opts["rename"]
        try:
            if os.path.islink(fullname):
                return None
            if os.path.isdir(fullname):
                if may_rename:
                    name = self._rename_safely(path, name)
                return (0, self._make_sort_key(name), prefix + name)
            if os.path.splitext(name)[1].lower() in AUDIO_EXT:
                if may_rename:
                    name = self._rename_safely(path, name)
                return (1, self._make_sort_key(name), prefix + name)
        except OSError:
            pass
        return None

    def _browse(self, path: str, interactive: bool) -> None:
        """Add the tracks below *path* (a ``./``-relative directory) to the database.

        In interactive mode the user is asked per directory; "all" turns the
        prompt off for this directory and everything below it. Entries are
        processed in natural order with subdirectories first. For
        ``./iPod_Control/Music`` the tracks of the direct subdirectories are
        merged into one list instead of being browsed separately.

        Raises ``KeyboardInterrupt`` when stdin ends during a prompt.
        """
        if path.endswith("/"):
            path = path[:-1]
        displaypath = path[1:] or "/"
        if interactive:
            while True:
                try:
                    choice = input(
                        "include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath
                    )[:1].lower()
                except EOFError:
                    raise KeyboardInterrupt
                if not choice:
                    continue
                if choice in "at":
                    interactive = False
                    break
                if choice in "yjos":
                    break
                if choice in "n":
                    return
        try:
            files = [
                x for x in (self._file_entry(path, n) for n in os.listdir(path)) if x
            ]
        except OSError:
            return
        if path == "./iPod_Control/Music":
            subdirs = [x[2] for x in files if not x[0]]
            files = [x for x in files if x[0]]
            for sub in subdirs:
                sp = "%s/%s" % (path, sub)
                try:
                    files.extend(
                        x
                        for x in (
                            self._file_entry(sp, n, sub + "/") for n in os.listdir(sp)
                        )
                        if x and x[0]
                    )
                except OSError:
                    pass
        files.sort(key=cmp_to_key(self._cmp_sort_key))
        nfiles = sum(1 for x in files if x[0])
        real = 0
        domain_open = False
        for kind, _key, name in files:
            fp = "%s/%s" % (path, name)
            if kind:
                if not domain_open:
                    # Directories sort before files, so every subdirectory
                    # has already appended its own domain. Opening this
                    # directory's domain only now keeps its tracks out of
                    # the last subdirectory's domain.
                    self.domains.append([])
                    domain_open = True
                real += self._write_track(fp[1:])
            else:
                self._browse(fp, interactive)
        self.log(
            "%s: %d files%s"
            % (displaypath, real, "" if real == nfiles else " (out of %d)" % nfiles)
        )

    def _write_track(self, filename: str) -> int:
        """Append the entry for *filename* (starting with ``/``) to ``iTunesSD``.

        The track's properties start from defaults and are updated by every
        matching rule. An existing entry is reused when ``reuse`` is set;
        otherwise a 558-byte entry is built from the 33-byte template
        followed by the file name, two bytes per character, zero padded.
        Bytes 555 and 556 always receive the shuffle and bookmark flags.

        Returns 1 if an entry was written, 0 if the track is ignored.
        """
        props = {
            "filename": filename,
            "size": self._filesize(filename[1:]),
            "ignore": 0,
            "type": 1,
            "shuffle": 1,
            "reuse": self.opts["reuse"],
            "bookmark": 0,
        }
        for ruleset, action in self.rules:
            if all(self._match_rule(props, r) for r in ruleset):
                props.update(action)
        if props["ignore"]:
            return 0
        entry = props["reuse"] and self.known_entries.get(filename)
        if not entry:
            self._entry_template[29] = props["type"]
            fn = filename[:261].encode("latin-1", "replace")
            part = b"".join(bytes((b, 0)) for b in fn)
            pad = b"\0" * (525 - 2 * len(fn))
            entry = self._entry_template.tobytes() + part + pad
        self._sd.write(
            entry[:555] + bytes((props["shuffle"], props["bookmark"])) + entry[557:]
        )
        if props["shuffle"]:
            self.domains[-1].append(self.total_count)
        self.total_count += 1
        return 1

    # ----------------------------------------------------------------- shuffle

    def _smart_shuffle(self) -> list:
        """Return a play order that spreads each domain's tracks over the sequence.

        The sequence is split into as many slices as the largest domain has
        tracks. Each track goes to a random slice that is far (ring distance)
        from the slices its domain already uses, preferring sparsely filled
        slices. Each slice is then shuffled, and its first track is moved to
        the end when it belongs to the same domain as the previous slice's
        last track.
        """
        try:
            sc = max(map(len, self.domains))
        except ValueError:
            return []
        slices = [[] for _ in range(sc)]
        fill = [0] * sc
        for d, domain in enumerate(self.domains):
            used = []
            for n in domain:
                # distance of every slice to the nearest slice already used
                metric = [
                    min([sc] + [_circ(s, u, sc) for u in used]) for s in range(sc)
                ]
                th = (max(metric) + 1) // 2
                far = [s for s in range(sc) if metric[s] >= th]
                th = (min(fill) + max(fill) + 1) // 2
                emp = [s for s in far if fill[s] <= th]
                s = random.choice(emp or far)
                slices[s].append((n, d))
                fill[s] += 1
                used.append(s)
        seq, last = [], -1
        for sl in slices:
            random.shuffle(sl)
            if len(sl) > 2 and sl[0][1] == last:
                sl.append(sl.pop(0))
            seq += [x[0] for x in sl]
            last = sl[-1][1]
        return seq

    def _write_sidecars(self) -> bool:
        """Write ``iTunesPState``, ``iTunesStats`` and ``iTunesShuffle``.

        Returns ``True`` when all three files were written.
        """
        v, n = self.opts["volume"], self.total_count
        self.log("Setting playback state ...", end=" ")
        ps = []
        try:
            with open("iPod_Control/iTunes/iTunesPState", "rb") as f:
                a = array.array("B")
                a.frombytes(f.read())
                ps = a.tolist()
        except (OSError, EOFError):
            pass
        if len(ps) != 21:
            ps = list(_u24(29)) + [0] * 15 + list(_u24(1))
        ps[3:15] = [0] * 6 + [1] + [0] * 5
        if v is not None:
            ps[:3] = list(_u24(v))
        ok = True
        try:
            with open("iPod_Control/iTunes/iTunesPState", "wb") as f:
                array.array("B", ps).tofile(f)
            self.log("OK.")
        except OSError:
            self.log("FAILED.")
            ok = False
        self.log("Creating statistics file ...", end=" ")
        try:
            with open("iPod_Control/iTunes/iTunesStats", "wb") as f:
                f.write(_u24(n) + b"\0" * 3 + (_u24(18) + b"\xff" * 3 + b"\0" * 12) * n)
            self.log("OK.")
        except OSError:
            self.log("FAILED.")
            ok = False
        random.seed()
        if self.opts["smart"]:
            self.log("Generating smart shuffle sequence ...", end=" ")
            seq = self._smart_shuffle()
        else:
            self.log("Generating shuffle sequence ...", end=" ")
            seq = list(range(n))
            random.shuffle(seq)
        try:
            with open("iPod_Control/iTunes/iTunesShuffle", "wb") as f:
                f.write(b"".join(_u24(x) for x in seq))
            self.log("OK.")
        except OSError:
            self.log("FAILED.")
            ok = False
        return ok

    # --------------------------------------------------------------------- run

    def _load_existing_sd(self) -> None:
        """Read the headers (and, with ``reuse``, the entries) of an existing iTunesSD.

        Fills ``_entry_template`` with up to 51 bytes (18-byte main header
        plus 33-byte entry header) and ``known_entries`` with the 558-byte
        entries. A missing or short file is not an error.
        """
        self._entry_template = array.array("B")
        try:
            with open("iPod_Control/iTunes/iTunesSD", "rb") as sd_read:
                # fromfile() raises EOFError on a short file but keeps the
                # bytes that were available.
                self._entry_template.fromfile(sd_read, 51)
                if self.opts["reuse"]:
                    sd_read.seek(18)
                    ent = sd_read.read(558)
                    while len(ent) == 558:
                        # every other byte from offset 33 up to the first NUL
                        name = ent[33::2].split(b"\0", 1)[0]
                        self.known_entries[name.decode("latin-1", "replace")] = ent
                        ent = sd_read.read(558)
        except (OSError, EOFError):
            pass

    def _close_sd(self) -> None:
        """Close the iTunesSD output file if it is still open (best effort)."""
        if self._sd is not None:
            try:
                self._sd.close()
            except OSError:
                pass
            self._sd = None

    def run(self, dirs: list) -> None:
        """Rebuild the database for *dirs* (or the whole volume when empty).

        Expects the working directory to be the iPod root. Exits with
        status 1 when the control directory is missing or ``iTunesSD``
        cannot be written.
        """
        self.log("Welcome to %s, version %s" % (__title__, __version__))
        self.log()
        self._load_user_rules_file()
        if not os.path.isdir("iPod_Control/iTunes"):
            self.log(ERR_NO_CTRL)
            sys.exit(1)
        self._load_existing_sd()
        et = self._entry_template
        if len(et) == 51:
            self.log("Using iTunesSD headers from existing database.")
            if self.known_entries:
                self.log(
                    "Collected %d entries from existing database."
                    % len(self.known_entries)
                )
        else:
            del et[18:]
            if len(et) == 18:
                self.log("Using iTunesSD main header from existing database.")
            else:
                del et[:]
                self.log("Rebuilding iTunesSD main header from scratch.")
                et.fromlist([0, 0, 0, 1, 6, 0, 0, 0, 18] + [0] * 9)
            self.log("Rebuilding iTunesSD entry header from scratch.")
            et.fromlist([0, 2, 46, 90, 165, 1] + [0] * 20 + [100, 0, 0, 1, 0, 2, 0])
        self.log()
        try:
            try:
                self._sd = open("iPod_Control/iTunes/iTunesSD", "wb")
                et[:18].tofile(self._sd)
            except OSError:
                self.log(ERR_NO_SD)
                sys.exit(1)
            del et[:18]  # keep only the 33-byte entry header as template
            self.log("Searching for files on your iPod.")
            try:
                if dirs:
                    for d in dirs:
                        self._browse("./" + d, self.opts["interactive"])
                else:
                    self._browse(".", self.opts["interactive"])
                self.log(
                    "%d playable files were found on your iPod." % self.total_count
                )
                self.log()
                self.log("Fixing iTunesSD header.")
                self._sd.seek(0)
                self._sd.write(
                    bytes((0, (self.total_count >> 8) & 0xFF, self.total_count & 0xFF))
                )
                self._sd.close()
                self._sd = None
            except OSError:
                self.log("ERROR: Some strange errors occured while writing iTunesSD.")
                self.log(" You may have to re-initialize the iPod using iTunes.")
                sys.exit(1)
        finally:
            # Also reached on sys.exit() and KeyboardInterrupt.
            self._close_sd()
        ok = self._write_sidecars()
        if ok:
            self.log()
            self.log("The iPod shuffle database was rebuilt successfully.")
            self.log("Have fun listening to your music!")
        else:
            self.log()
            self.log(
                "WARNING: The main database file was rebuilt successfully, but there were errors\n"
                " while resetting the other files. However, playback MAY work correctly."
            )
def _opterr(msg) -> None:
print("parse error:", msg)
print("use `%s -h' to get help" % sys.argv[0])
sys.exit(1)
def parse_argv(argv: list, s: ShuffleRebuild) -> list:
    """Apply the command-line options in *argv* to ``s.opts``.

    ``argv[0]`` is the program name. ``-h``/``--help`` prints the usage text
    and exits with status 0; an unknown option or a non-integer volume is
    reported through ``_opterr``, which exits with status 1.

    Returns the remaining positional arguments (the directories to browse).
    """
    try:
        parsed, remaining = getopt.getopt(argv[1:], "hiv:snlfL:rp:", GETOPT_LONG)
    except getopt.GetoptError as err:
        _opterr(err)
    settings = s.opts
    # Options without a value: spelling -> (option key, value to store).
    switches = {
        "-i": ("interactive", True),
        "--interactive": ("interactive", True),
        "-s": ("smart", False),
        "--nosmart": ("smart", False),
        "-n": ("home", False),
        "--nochdir": ("home", False),
        "-l": ("logging", False),
        "--nolog": ("logging", False),
        "-f": ("reuse", 0),
        "--force": ("reuse", 0),
        "-r": ("rename", True),
        "--rename": ("rename", True),
    }
    # Options whose value is stored unchanged: spelling -> option key.
    valued = {
        "-p": "ipod_path",
        "--ipod-path": "ipod_path",
        "-L": "logfile",
        "--logfile": "logfile",
    }
    for name, value in parsed:
        if name in ("-h", "--help"):
            print("Usage: %s [OPTION]... [DIRECTORY]...\n%s" % (argv[0], HELP))
            sys.exit(0)
        if name in switches:
            key, stored = switches[name]
            settings[key] = stored
        elif name in valued:
            settings[valued[name]] = value
        elif name in ("-v", "--volume"):
            try:
                settings["volume"] = int(value)
            except ValueError:
                _opterr("invalid volume")
    return remaining
def cli() -> None:
    """Command-line entry point: parse options, enter the iPod root, rebuild.

    A ``KeyboardInterrupt`` during the rebuild is caught and answered with a
    warning that the database is left incomplete.
    """
    rebuilder = ShuffleRebuild()
    directories = parse_argv(sys.argv, rebuilder)
    rebuilder.chdir_for_run(sys.argv[0])
    with rebuilder.log_to_file():
        try:
            rebuilder.run(directories)
        except KeyboardInterrupt:
            rebuilder.log()
            rebuilder.log(
                "You decided to cancel processing. This is OK, but please note that"
            )
            rebuilder.log("the iPod database is now corrupt and the iPod won't play!")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    cli()