#!/usr/bin/env python3
# -*- coding: iso-8859-1 -*-
# GPL v2 - see License.txt. Derived from KeyJ's rebuild_db (Martin Fiedler).
"""Rebuild the iPod shuffle database files below ``iPod_Control/iTunes``.

The script walks the current directory tree (expected to be the iPod's root),
writes one iTunesSD entry per playable audio file and then regenerates the
iTunesPState, iTunesStats and iTunesShuffle files.  Per-file properties
(``type``, ``shuffle``, ``bookmark``, ...) are decided by ``DEFAULT_RULES``
plus an optional ``rebuild_db.rules`` file in the working directory.
"""

import array
import fnmatch
import getopt
import os
import random
import sys
from contextlib import contextmanager
from functools import cmp_to_key

__title__ = "iPod shuffle Database Builder"
# by Sylvain
__version__ = "1.0-rc1"

# Property names that rules may test or set; anything else triggers a warning.
KNOWN_PROPS = frozenset(
    ("filename", "size", "ignore", "type", "shuffle", "reuse", "bookmark")
)

# Built-in rules as (conditions, actions) pairs.  Every rule whose conditions
# all match is applied in order, so later rules override earlier ones.
DEFAULT_RULES = [
    ([("filename", "~", "*.mp3")], {"type": 1, "shuffle": 1, "bookmark": 0}),
    ([("filename", "~", "*.m4?")], {"type": 2, "shuffle": 1, "bookmark": 0}),
    ([("filename", "~", "*.m4b")], {"shuffle": 0, "bookmark": 1}),
    ([("filename", "~", "*.aa")], {"type": 1, "shuffle": 0, "bookmark": 1, "reuse": 1}),
    # type 4 = WAV to the firmware; KeyJ used shuffle=0 (voice-clips style). Use shuffle=1
    # so WAVs join the normal playlist like MP3/M4A.
    ([("filename", "~", "*.wav")], {"type": 4, "shuffle": 1, "bookmark": 0}),
    ([("filename", "~", "*.book.???")], {"shuffle": 0, "bookmark": 1}),
    ([("filename", "~", "*.announce.???")], {"shuffle": 0, "bookmark": 0}),
    ([("filename", "~", "/recycled/*")], {"ignore": 1}),
]

# File extensions (lower case) that are considered playable.
AUDIO_EXT = frozenset((".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav"))

# Characters kept as-is by --rename; everything else becomes "_".
_SAFE = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")

GETOPT_LONG = (
    "help",
    "interactive",
    "volume=",
    "nosmart",
    "nochdir",
    "nolog",
    "force",
    "logfile=",
    "rename",
    "ipod-path=",
    "shuffle-all",
    "verbose",
)

HELP = (
    "Rebuild iPod shuffle database.\n\n"
    "Mandatory arguments to long options are mandatory for short options too.\n"
    " -h, --help display this help text\n"
    " -i, --interactive prompt before browsing each directory\n"
    " -v, --volume=VOL set playback volume to a value between 0 and 38\n"
    " -s, --nosmart do not use smart shuffle\n"
    " -n, --nochdir do not change directory to this scripts directory first\n"
    " -p, --ipod-path=DIR use DIR as the iPod root (mount point); skip --nochdir behavior\n"
    " -l, --nolog do not create a log file\n"
    " -f, --force always rebuild database entries, do not re-use old ones\n"
    " -L, --logfile set log file name\n"
    " -r, --rename rename files and directories so their names use only safe characters\n"
    " --shuffle-all force shuffle=1 on every track (overrides audiobook-style rules)\n"
    " --verbose log each track as it is written (path, type, shuffle) like KeyJ\n\n"
    "Without --ipod-path, run from the iPod's root directory (or rely on --nochdir /\n"
    "script location as before). By default, the whole volume is searched for playable\n"
    "files, unless at least one DIRECTORY is specified."
)

ERR_NO_CTRL = (
    "ERROR: No iPod control directory found!\n"
    "Please make sure that:\n"
    " (*) this program's working directory is the iPod's root directory\n"
    " (*) the iPod was correctly initialized with iTunes"
)

ERR_NO_SD = (
    "ERROR: Cannot write to the iPod database file (iTunesSD)!\n"
    "Please make sure that:\n"
    " (*) you have sufficient permissions to write to the iPod volume\n"
    " (*) you are actually using an iPod shuffle, and not some other iPod model :)"
)

# Database file locations, relative to the iPod root (the working directory).
_ITUNES_DIR = "iPod_Control/iTunes"
_SD_PATH = _ITUNES_DIR + "/iTunesSD"
_PSTATE_PATH = _ITUNES_DIR + "/iTunesPState"
_STATS_PATH = _ITUNES_DIR + "/iTunesStats"
_SHUFFLE_PATH = _ITUNES_DIR + "/iTunesShuffle"
_RULES_FILE = "rebuild_db.rules"

# iTunesSD layout as produced by this script (sizes and offsets in bytes).
_MAIN_HEADER_LEN = 18  # file header written once at offset 0
_ENTRY_HEADER_LEN = 33  # fixed part at the start of every entry
_ENTRY_LEN = 558  # entry header + file name field + flags
_TYPE_INDEX = 29  # position of the "type" byte inside the entry header
_FLAGS_OFFSET = 555  # shuffle flag; the bookmark flag follows at 556
_MAX_NAME_CHARS = 261  # file names are cut to this many characters


def _u24(i: int) -> bytes:
    """Return the low 24 bits of *i* as three little-endian bytes.

    Negative values are wrapped by adding 2**24 first.
    """
    if i < 0:
        i += 0x1000000
    return bytes((i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF))


def _circ(s, u, sc):
    """Return the distance between positions *s* and *u* on a ring of size *sc*."""
    return min(abs(s - u), abs(s - u + sc), abs(s - u - sc))


class ShuffleRebuild:
    """Scan the working directory and rewrite the iPod shuffle database."""

    def __init__(self) -> None:
        # Runtime options; parse_argv() overwrites them from the command line.
        self.opts = {
            "volume": None,
            "interactive": False,
            "smart": True,
            "home": True,
            "logging": True,
            "reuse": 1,
            "logfile": "rebuild_db.log.txt",
            "rename": False,
            "ipod_path": None,
            "shuffle_all": False,
            "verbose": False,
        }
        # (conditions, actions) pairs, applied in order to every track.
        self.rules = list(DEFAULT_RULES)
        # One list of track indices (shuffle=1 tracks only) per directory.
        self.domains = []
        # Number of entries written to iTunesSD so far.
        self.total_count = 0
        # Raw 558-byte entries of the previous database, keyed by file name.
        self.known_entries = {}
        self._log_fp = None
        self._sd = None
        self._entry_template = array.array("B")

    def log(self, *a, end="\n") -> None:
        """Print *a* to stdout and, when a log file is open, to that file too.

        Errors while writing the log file are ignored: logging is best effort
        and must never abort a rebuild.
        """
        try:
            print(*a, end=end)
        except UnicodeEncodeError:
            # File names may contain characters the console cannot encode;
            # print an escaped form instead of aborting with a half-written
            # database.
            enc = getattr(sys.stdout, "encoding", None) or "ascii"
            text = " ".join(str(x) for x in a) + end
            print(text.encode(enc, "backslashreplace").decode(enc), end="")
        if self._log_fp:
            try:
                print(*a, end=end, file=self._log_fp)
            except OSError:
                pass

    @contextmanager
    def log_to_file(self):
        """Context manager that mirrors log() output into the log file.

        The file named by ``opts["logfile"]`` is opened only when
        ``opts["logging"]`` is set; failure to open it is silently ignored.
        The file is always closed on exit.
        """
        self._log_fp = None
        if self.opts["logging"]:
            try:
                # backslashreplace keeps unencodable file names from raising.
                self._log_fp = open(
                    self.opts["logfile"],
                    "w",
                    encoding="utf-8",
                    errors="backslashreplace",
                )
            except OSError:
                pass
        try:
            yield
        finally:
            if self._log_fp:
                self._log_fp.close()
                self._log_fp = None

    def chdir_for_run(self, argv0: str) -> None:
        """Change into the directory the rebuild should run in.

        With ``--ipod-path`` the given directory is used and the process exits
        with status 1 when it is missing or cannot be entered.  Otherwise,
        unless ``--nochdir`` was given, the directory part of *argv0* is tried
        and any failure is ignored.
        """
        ip = self.opts["ipod_path"]
        if ip is not None:
            p = os.path.abspath(os.path.expanduser(ip))
            if not os.path.isdir(p):
                print("error: --ipod-path is not a directory: %s" % p, file=sys.stderr)
                sys.exit(1)
            try:
                os.chdir(p)
            except OSError as e:
                print(
                    "error: cannot change to directory %s: %s" % (p, e), file=sys.stderr
                )
                sys.exit(1)
        elif self.opts["home"]:
            try:
                os.chdir(os.path.split(argv0)[0])
            except OSError:
                pass

    def _load_user_rules_file(self) -> None:
        """Append the rules found in ``rebuild_db.rules`` (if readable)."""
        try:
            with open(_RULES_FILE, "r", encoding="iso-8859-1") as f:
                lines = f.read().split("\n")
        except OSError:
            return
        for line in lines:
            p = self._parse_rule_line(line)
            if p is not None:
                self.rules.append(p)

    def _parse_rule_line(self, line: str):
        """Parse one ``conditions: actions`` line of the rules file.

        Returns a ``(conditions, actions)`` pair, or ``None`` for blank lines,
        ``#`` comments and malformed lines (a warning is logged for the
        latter).  A line with no conditions yields a rule that always applies.
        """
        line = line.strip()
        if not line or line[0] == "#":
            return None
        try:
            # Everything after the last ":" is the action list.
            conditions, _, action_text = line.rpartition(":")
            ruleset = [x.strip() for x in conditions.split(",")]
            actions = dict(map(self._parse_action, action_text.split(",")))
            if len(ruleset) == 1 and not ruleset[0]:
                return ([], actions)
            return (list(map(self._parse_rule, ruleset)), actions)
        except (ValueError, TypeError, KeyError):
            self.log("WARNING: rule `%s' is malformed, ignoring" % line)
            return None

    def _parse_rule(self, rule: str):
        """Parse one condition such as ``size > 1000`` into (prop, op, value).

        The operator is the first of ``~ = < >`` that appears after at least
        one character.  Raises ``ValueError`` when there is none.
        """
        candidates = [p for p in (rule.find(sep) for sep in "~=<>") if p > 0]
        if not candidates:
            raise ValueError("no operator in rule")
        sep_pos = min(candidates)
        sep = rule[sep_pos]
        prop = rule[:sep_pos].strip()
        if prop not in KNOWN_PROPS:
            self.log("WARNING: unknown property `%s'" % prop)
        return (prop, sep, self._parse_value(rule[sep_pos + 1 :].strip()))

    def _parse_action(self, action: str):
        """Parse one ``prop = value`` action into a ``(prop, value)`` pair.

        Raises ``ValueError`` when *action* contains no ``=``.
        """
        prop, value = [x.strip() for x in action.split("=", 1)]
        if prop not in KNOWN_PROPS:
            self.log("WARNING: unknown property `%s'" % prop)
        return (prop, self._parse_value(value))

    @staticmethod
    def _parse_value(val: str):
        """Convert a rule value: quoted text -> str, digits -> int, else str."""
        if len(val) >= 2 and (
            (val[0] == "'" and val[-1] == "'") or (val[0] == '"' and val[-1] == '"')
        ):
            return val[1:-1]
        try:
            return int(val)
        except ValueError:
            return val

    @staticmethod
    def _match_rule(props: dict, rule) -> bool:
        """Return whether the single condition *rule* holds for *props*.

        *rule* is a ``(property, operator, reference)`` triple.  ``~`` does a
        case-insensitive glob match; ``=``, ``<`` and ``>`` compare values.
        Unknown properties, unavailable values (``None``) and values that
        cannot be compared with the reference never match.
        """
        try:
            prop, op, ref = props[rule[0]], rule[1], rule[2]
        except KeyError:
            return False
        if prop is None:
            # e.g. "size" when the file could not be stat'ed
            return False
        if op == "~":
            # Rule values may have been parsed as int; compare as text.
            return fnmatch.fnmatchcase(str(prop).lower(), str(ref).lower())
        try:
            c = (prop > ref) - (prop < ref)
        except TypeError:
            # Mixed types (say int against str) have no ordering.
            return op == "=" and prop == ref
        return (op == "=" and c == 0) or (op == ">" and c > 0) or (op == "<" and c < 0)

    @staticmethod
    def _filesize(path: str):
        """Return the size of *path* in bytes, or ``None`` if stat fails."""
        try:
            return os.stat(path).st_size
        except OSError:
            return None

    def _rename_safely(self, path: str, name: str) -> str:
        """Rename ``path/name`` so its base name uses only ``_SAFE`` characters.

        A numeric suffix is appended when the sanitized name already exists.
        Returns the name the entry has afterwards: the new name on success,
        the unchanged *name* when nothing had to change or the rename failed.
        """
        base, ext = os.path.splitext(name)
        newname = "".join(c if c in _SAFE else "_" for c in base)
        if name == newname + ext:
            return name
        if os.path.exists("%s/%s%s" % (path, newname, ext)):
            i = 0
            while os.path.exists("%s/%s_%d%s" % (path, newname, i, ext)):
                i += 1
            newname += "_%d" % i
        newname += ext
        try:
            os.rename("%s/%s" % (path, name), "%s/%s" % (path, newname))
        except OSError:
            # The file still carries its old name; report that one so the
            # database does not point at a file that does not exist.
            return name
        return newname

    @staticmethod
    def _make_sort_key(s: str):
        """Build a natural-sort key from *s*.

        A string without digits is returned lower-cased.  Otherwise the result
        is ``(text_before, number, key_of_rest)`` so that "track2" sorts
        before "track10".
        """
        if not s:
            return s
        s = s.lower()
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as superscripts that int() cannot convert.
        start = next((i for i, ch in enumerate(s) if ch.isdecimal()), None)
        if start is None:
            return s
        end = start
        while end < len(s) and s[end].isdecimal():
            end += 1
        return (s[:start], int(s[start:end]), ShuffleRebuild._make_sort_key(s[end:]))

    @staticmethod
    def _key_repr(x):
        """Flatten a sort key back into a plain string."""
        if isinstance(x, tuple):
            return "%s%d%s" % (x[0], x[1], ShuffleRebuild._key_repr(x[2]))
        return x

    @classmethod
    def _cmp_sort_key(cls, a, b) -> int:
        """Three-way compare two sort keys; returns -1, 0 or 1."""
        if isinstance(a, tuple) and isinstance(b, tuple):
            for xa, xb in (a[0], b[0]), (a[1], b[1]):
                c = (xa > xb) - (xa < xb)
                if c:
                    return c
            return cls._cmp_sort_key(a[2], b[2])
        # A tuple cannot be ordered against a string, so compare as text.
        ka, kb = cls._key_repr(a), cls._key_repr(b)
        return (ka > kb) - (ka < kb)

    @classmethod
    def _cmp_file_entry(cls, a, b) -> int:
        """Three-way compare two _file_entry() results.

        Orders by kind (directories first), then sort key, then name.
        """
        c = (a[0] > b[0]) - (a[0] < b[0])
        if c:
            return c
        c = cls._cmp_sort_key(a[1], b[1])
        if c:
            return c
        pa, pb = a[2], b[2]
        return (pa > pb) - (pa < pb)

    def _file_entry(self, path: str, name: str, prefix: str = ""):
        """Classify the directory entry ``path/name``.

        Returns ``(kind, sort_key, prefix + name)`` where *kind* is 0 for a
        directory and 1 for a file with an ``AUDIO_EXT`` extension, or
        ``None`` for hidden entries, symlinks and everything else.  With
        ``--rename``, entries outside ``./iPod_Control`` are renamed first.
        """
        if not name or name[0] == ".":
            return None
        fullname = "%s/%s" % (path, name)
        may_rename = not fullname.startswith("./iPod_Control") and self.opts["rename"]
        try:
            if os.path.islink(fullname):
                return None
            if os.path.isdir(fullname):
                kind = 0
            elif os.path.splitext(name)[1].lower() in AUDIO_EXT:
                kind = 1
            else:
                return None
            if may_rename:
                name = self._rename_safely(path, name)
            return (kind, self._make_sort_key(name), prefix + name)
        except OSError:
            return None

    def _ask_include(self, displaypath: str) -> str:
        """Ask whether to browse *displaypath*; returns "y", "n" or "a".

        Repeats the prompt until a recognized answer is given.  End of input
        is treated like Ctrl-C.
        """
        while True:
            try:
                choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[
                    :1
                ].lower()
            except EOFError:
                raise KeyboardInterrupt from None
            if not choice:
                continue
            if choice in "at":
                return "a"
            if choice in "yjos":
                return "y"
            if choice == "n":
                return "n"

    def _browse(self, path: str, interactive: bool) -> None:
        """Recursively add the playable files below *path* to the database.

        *path* starts with "." (the iPod root).  Sub-directories are visited
        before the directory's own files; the sub-directories directly below
        ``./iPod_Control/Music`` are merged into that directory instead of
        being browsed separately.  When *interactive* is true the user is
        asked before each directory.
        """
        if path.endswith("/"):
            path = path[:-1]
        displaypath = path[1:] or "/"
        if interactive:
            answer = self._ask_include(displaypath)
            if answer == "n":
                return
            if answer == "a":
                interactive = False
        try:
            files = [
                x for x in (self._file_entry(path, n) for n in os.listdir(path)) if x
            ]
        except OSError:
            return
        if path == "./iPod_Control/Music":
            subdirs = [x[2] for x in files if not x[0]]
            files = [x for x in files if x[0]]
            for sub in subdirs:
                sp = "%s/%s" % (path, sub)
                try:
                    files.extend(
                        x
                        for x in (
                            self._file_entry(sp, n, sub + "/") for n in os.listdir(sp)
                        )
                        if x and x[0]
                    )
                except OSError:
                    pass
        files.sort(key=cmp_to_key(self._cmp_file_entry))
        nfiles = sum(1 for x in files if x[0])
        real = 0
        domain_open = False
        for kind, _key, rel in files:
            fp = "%s/%s" % (path, rel)
            if not kind:
                self._browse(fp, interactive)
                continue
            if not domain_open:
                # Directories sort first, so every sub-directory has already
                # appended its own domain.  Opening this directory's domain
                # only now keeps its tracks out of the last sub-directory's
                # domain.
                self.domains.append([])
                domain_open = True
            real += self._write_track(fp[1:])
        self.log(
            "%s: %d files%s"
            % (displaypath, real, "" if real == nfiles else " (out of %d)" % nfiles)
        )

    def _write_track(self, filename: str) -> int:
        """Append the entry for *filename* to iTunesSD.

        *filename* is the path relative to the iPod root with a leading "/".
        The rules decide the track's properties; an entry from the previous
        database is re-used when ``reuse`` is set and one exists.  Returns 1
        when an entry was written and 0 when the track is ignored.
        """
        props = {
            "filename": filename,
            "size": self._filesize(filename[1:]),
            "ignore": 0,
            "type": 1,
            "shuffle": 1,
            "reuse": self.opts["reuse"],
            "bookmark": 0,
        }
        for ruleset, action in self.rules:
            if all(self._match_rule(props, r) for r in ruleset):
                props.update(action)
        if self.opts["shuffle_all"]:
            props["shuffle"] = 1
        if props["ignore"]:
            return 0
        if self.opts["verbose"]:
            self.log(
                "=> %s type=%d shuffle=%d bookmark=%d"
                % (filename, props["type"], props["shuffle"], props["bookmark"])
            )
        entry = props["reuse"] and self.known_entries.get(filename)
        if not entry:
            self._entry_template[_TYPE_INDEX] = props["type"]
            fn = filename[:_MAX_NAME_CHARS].encode("latin-1", "replace")
            # The name is stored two bytes per character: the latin-1 byte
            # followed by a zero byte, padded with zeros to the entry length.
            part = b"".join(bytes((b, 0)) for b in fn)
            pad = b"\0" * (_ENTRY_LEN - _ENTRY_HEADER_LEN - 2 * len(fn))
            entry = self._entry_template.tobytes() + part + pad
        # The flags are always rewritten, also for re-used entries.
        self._sd.write(
            entry[:_FLAGS_OFFSET]
            + bytes((props["shuffle"], props["bookmark"]))
            + entry[_FLAGS_OFFSET + 2 :]
        )
        if props["shuffle"]:
            if not self.domains:
                # Only reachable when called outside _browse().
                self.domains.append([])
            self.domains[-1].append(self.total_count)
        self.total_count += 1
        return 1

    def _smart_shuffle(self) -> list:
        """Return a shuffled track order that spreads each domain out.

        The sequence is divided into as many slices as the largest domain has
        tracks.  Each track goes to a random slice that is far from the
        slices already used by its domain and not fuller than average; the
        slices are then shuffled internally and concatenated.
        """
        try:
            sc = max(map(len, self.domains))
        except ValueError:
            return []
        slices = [[] for _ in range(sc)]
        fill = [0] * sc
        for d, domain in enumerate(self.domains):
            used = []
            for n in domain:
                # Slices where the nearest track of the same domain is far away.
                metric = [
                    min([sc] + [_circ(s, u, sc) for u in used]) for s in range(sc)
                ]
                th = (max(metric) + 1) // 2
                far = [s for s in range(sc) if metric[s] >= th]
                # Of those, the emptier half.
                th = (min(fill) + max(fill) + 1) // 2
                emp = [s for s in far if fill[s] <= th]
                s = random.choice(emp or far or list(range(sc)))
                slices[s].append((n, d))
                fill[s] += 1
                used.append(s)
        seq, last = [], -1
        for sl in slices:
            if not sl:
                continue
            random.shuffle(sl)
            # Avoid two tracks of the same domain meeting at a slice boundary.
            if len(sl) > 2 and sl[0][1] == last:
                sl.append(sl.pop(0))
            seq += [x[0] for x in sl]
            last = sl[-1][1]
        return seq

    def _write_binary(self, path: str, data: bytes) -> bool:
        """Write *data* to *path*, log "OK." or "FAILED.", return success."""
        try:
            with open(path, "wb") as f:
                f.write(data)
        except OSError:
            self.log("FAILED.")
            return False
        self.log("OK.")
        return True

    def _write_sidecars(self) -> bool:
        """Rewrite iTunesPState, iTunesStats and iTunesShuffle.

        Returns ``True`` only if all three files were written.
        """
        v, n = self.opts["volume"], self.total_count

        self.log("Setting playback state ...", end=" ")
        ps = []
        try:
            with open(_PSTATE_PATH, "rb") as f:
                ps = list(f.read())
        except OSError:
            pass
        if len(ps) != 21:
            # No usable state file: start from a default one (volume 29).
            ps = list(_u24(29)) + [0] * 15 + list(_u24(1))
        # NOTE(review): bytes 3..14 are reset on every run; presumably the
        # playback position - confirm against the file format description.
        ps[3:15] = [0] * 6 + [1] + [0] * 5
        if v is not None:
            ps[:3] = list(_u24(v))
        ok = self._write_binary(_PSTATE_PATH, bytes(ps))

        self.log("Creating statistics file ...", end=" ")
        # Track count, three zero bytes, then one 18-byte record per track.
        stats = _u24(n) + b"\0" * 3 + (_u24(18) + b"\xff" * 3 + b"\0" * 12) * n
        ok = self._write_binary(_STATS_PATH, stats) and ok

        random.seed()
        if self.opts["smart"]:
            self.log("Generating smart shuffle sequence ...", end=" ")
            seq = self._smart_shuffle()
        else:
            self.log("Generating shuffle sequence ...", end=" ")
            # NOTE(review): unlike the smart sequence this one also contains
            # shuffle=0 tracks, which contradicts the NOTE below - confirm
            # which behavior the firmware expects before changing it.
            seq = list(range(n))
            random.shuffle(seq)
        # KeyJ: only shuffle=1 tracks join smart-shuffle domains; shuffle=0 (audiobooks, etc.)
        # is stored per entry. iTunesShuffle may be shorter than n; do not pad with fake indices.
        shuf_tracks = sum(len(d) for d in self.domains)
        if n and shuf_tracks < n and not self.opts["shuffle_all"]:
            self.log(
                "\nNOTE: %d of %d playable tracks use shuffle=1; the rest have shuffle=0 "
                "(e.g. *.book.??? / *.m4b rules). Only shuffle=1 tracks appear in the shuffle "
                "playlist. Use --shuffle-all, rename files, or rebuild_db.rules if that is wrong."
                % (shuf_tracks, n)
            )
        ok = self._write_binary(_SHUFFLE_PATH, b"".join(_u24(x) for x in seq)) and ok
        return ok

    def _read_existing_database(self) -> None:
        """Load the headers and, with ``reuse``, the entries of the old iTunesSD.

        Afterwards ``_entry_template`` holds up to 51 bytes (main header plus
        the first entry's header) and ``known_entries`` maps file names to
        their raw entries.  A missing or short file is not an error.
        """
        self._entry_template = array.array("B")
        try:
            with open(_SD_PATH, "rb") as sd_read:
                # fromfile() keeps what it could read before raising EOFError.
                self._entry_template.fromfile(
                    sd_read, _MAIN_HEADER_LEN + _ENTRY_HEADER_LEN
                )
                if not self.opts["reuse"]:
                    return
                sd_read.seek(_MAIN_HEADER_LEN)
                while True:
                    ent = sd_read.read(_ENTRY_LEN)
                    if len(ent) != _ENTRY_LEN:
                        break
                    # Every second byte of the name field, up to the first
                    # NUL.  The slice stops before the flag bytes so that a
                    # maximum-length name is not polluted by them.
                    raw = ent[_ENTRY_HEADER_LEN:_FLAGS_OFFSET:2].split(b"\0", 1)[0]
                    self.known_entries[raw.decode("latin-1", "replace")] = ent
        except (OSError, EOFError):
            pass

    def _prepare_headers(self) -> None:
        """Complete ``_entry_template`` to main header + entry header (51 bytes)."""
        et = self._entry_template
        if len(et) == _MAIN_HEADER_LEN + _ENTRY_HEADER_LEN:
            self.log("Using iTunesSD headers from existing database.")
            if self.known_entries:
                self.log(
                    "Collected %d entries from existing database."
                    % len(self.known_entries)
                )
            return
        del et[_MAIN_HEADER_LEN:]
        if len(et) == _MAIN_HEADER_LEN:
            self.log("Using iTunesSD main header from existing database.")
        else:
            del et[:]
            self.log("Rebuilding iTunesSD main header from scratch.")
            et.fromlist([0, 0, 0, 1, 6, 0, 0, 0, 18] + [0] * 9)
        self.log("Rebuilding iTunesSD entry header from scratch.")
        et.fromlist([0, 2, 46, 90, 165, 1] + [0] * 20 + [100, 0, 0, 1, 0, 2, 0])

    def run(self, dirs: list) -> None:
        """Rebuild the database from *dirs* (or the whole volume if empty).

        Must be called with the iPod root as working directory.  Exits with
        status 1 when the control directory is missing or iTunesSD cannot be
        written.
        """
        self.log("Welcome to %s, version %s" % (__title__, __version__))
        self.log()
        self._load_user_rules_file()
        if not os.path.isdir(_ITUNES_DIR):
            self.log(ERR_NO_CTRL)
            sys.exit(1)

        self._read_existing_database()
        self._prepare_headers()
        et = self._entry_template
        self.log()

        try:
            self._sd = open(_SD_PATH, "wb")
            et[:_MAIN_HEADER_LEN].tofile(self._sd)
        except OSError:
            if self._sd:
                self._sd.close()
                self._sd = None
            self.log(ERR_NO_SD)
            sys.exit(1)
        # From here on the template holds only the per-entry header.
        del et[:_MAIN_HEADER_LEN]

        self.log("Searching for files on your iPod.")
        try:
            try:
                if dirs:
                    for d in dirs:
                        self._browse("./" + d, self.opts["interactive"])
                else:
                    self._browse(".", self.opts["interactive"])
                self.log("%d playable files were found on your iPod." % self.total_count)
                self.log()
                self.log("Fixing iTunesSD header.")
                n = self.total_count
                self._sd.seek(0)
                # The entry count is stored big-endian in the first 3 bytes.
                self._sd.write(bytes(((n >> 16) & 0xFF, (n >> 8) & 0xFF, n & 0xFF)))
            finally:
                # Close on every path, including Ctrl-C during browsing.
                sd, self._sd = self._sd, None
                sd.close()
        except OSError:
            self.log("ERROR: Some strange errors occured while writing iTunesSD.")
            self.log(" You may have to re-initialize the iPod using iTunes.")
            sys.exit(1)

        ok = self._write_sidecars()
        self.log()
        if ok:
            self.log("The iPod shuffle database was rebuilt successfully.")
            self.log("Have fun listening to your music!")
        else:
            self.log(
                "WARNING: The main database file was rebuilt successfully, but there were errors\n"
                " while resetting the other files. However, playback MAY work correctly."
            )


def _opterr(msg) -> None:
    """Report a command-line error and exit with status 1."""
    print("parse error:", msg)
    print("use `%s -h' to get help" % sys.argv[0])
    sys.exit(1)


def parse_argv(argv: list, s: ShuffleRebuild) -> list:
    """Apply the options in *argv* to ``s.opts`` and return the directories.

    *argv* includes the program name at index 0.  ``--help`` prints the usage
    text and exits with status 0; invalid options or values exit with
    status 1.
    """
    try:
        opts, args = getopt.getopt(argv[1:], "hiv:snlfL:rp:", GETOPT_LONG)
    except getopt.GetoptError as e:
        _opterr(e)
    o = s.opts
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print("Usage: %s [OPTION]... [DIRECTORY]...\n%s" % (argv[0], HELP))
            sys.exit(0)
        if opt in ("-i", "--interactive"):
            o["interactive"] = True
        elif opt in ("-v", "--volume"):
            try:
                volume = int(arg)
            except ValueError:
                _opterr("invalid volume")
            # Enforce the range promised by the help text instead of writing
            # an arbitrary value into the playback state file.
            if not 0 <= volume <= 38:
                _opterr("invalid volume (must be between 0 and 38)")
            o["volume"] = volume
        elif opt in ("-s", "--nosmart"):
            o["smart"] = False
        elif opt in ("-n", "--nochdir"):
            o["home"] = False
        elif opt in ("-p", "--ipod-path"):
            o["ipod_path"] = arg
        elif opt in ("-l", "--nolog"):
            o["logging"] = False
        elif opt in ("-f", "--force"):
            o["reuse"] = 0
        elif opt in ("-L", "--logfile"):
            o["logfile"] = arg
        elif opt in ("-r", "--rename"):
            o["rename"] = True
        elif opt == "--shuffle-all":
            o["shuffle_all"] = True
        elif opt == "--verbose":
            o["verbose"] = True
    return args


def cli() -> None:
    """Command-line entry point: parse options, chdir, rebuild."""
    s = ShuffleRebuild()
    dirs = parse_argv(sys.argv, s)
    s.chdir_for_run(sys.argv[0])
    with s.log_to_file():
        try:
            s.run(dirs)
        except KeyboardInterrupt:
            s.log()
            s.log("You decided to cancel processing. This is OK, but please note that")
            s.log("the iPod database is now corrupt and the iPod won't play!")


if __name__ == "__main__":
    cli()