570 lines
20 KiB
Python
570 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: iso-8859-1 -*-
|
|
# GPL v2 — see License.txt. Derived from KeyJ's rebuild_db (Martin Fiedler).
|
|
import array
|
|
import fnmatch
|
|
import getopt
|
|
import os
|
|
import random
|
|
import sys
|
|
from contextlib import contextmanager
|
|
from functools import cmp_to_key
|
|
|
|
__title__ = "KeyJ's iPod shuffle Database Builder"
|
|
__version__ = "1.0-rc1"
|
|
KNOWN_PROPS = frozenset(
|
|
("filename", "size", "ignore", "type", "shuffle", "reuse", "bookmark")
|
|
)
|
|
DEFAULT_RULES = [
|
|
([("filename", "~", "*.mp3")], {"type": 1, "shuffle": 1, "bookmark": 0}),
|
|
([("filename", "~", "*.m4?")], {"type": 2, "shuffle": 1, "bookmark": 0}),
|
|
([("filename", "~", "*.m4b")], {"shuffle": 0, "bookmark": 1}),
|
|
([("filename", "~", "*.aa")], {"type": 1, "shuffle": 0, "bookmark": 1, "reuse": 1}),
|
|
([("filename", "~", "*.wav")], {"type": 4, "shuffle": 0, "bookmark": 0}),
|
|
([("filename", "~", "*.book.???")], {"shuffle": 0, "bookmark": 1}),
|
|
([("filename", "~", "*.announce.???")], {"shuffle": 0, "bookmark": 0}),
|
|
([("filename", "~", "/recycled/*")], {"ignore": 1}),
|
|
]
|
|
AUDIO_EXT = frozenset((".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav"))
|
|
_SAFE = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
|
|
GETOPT_LONG = (
|
|
"help",
|
|
"interactive",
|
|
"volume=",
|
|
"nosmart",
|
|
"nochdir",
|
|
"nolog",
|
|
"force",
|
|
"logfile=",
|
|
"rename",
|
|
"ipod-path=",
|
|
)
|
|
HELP = "Rebuild iPod shuffle database.\n\nMandatory arguments to long options are mandatory for short options too.\n -h, --help display this help text\n -i, --interactive prompt before browsing each directory\n -v, --volume=VOL set playback volume to a value between 0 and 38\n -s, --nosmart do not use smart shuffle\n -n, --nochdir do not change directory to this scripts directory first\n -p, --ipod-path=DIR use DIR as the iPod root (mount point); skip --nochdir behavior\n -l, --nolog do not create a log file\n -f, --force always rebuild database entries, do not re-use old ones\n -L, --logfile set log file name\n\nWithout --ipod-path, run from the iPod's root directory (or rely on --nochdir /\nscript location as before). By default, the whole volume is searched for playable\nfiles, unless at least one DIRECTORY is specified."
|
|
ERR_NO_CTRL = "ERROR: No iPod control directory found!\nPlease make sure that:\n (*) this program's working directory is the iPod's root directory\n (*) the iPod was correctly initialized with iTunes"
|
|
ERR_NO_SD = "ERROR: Cannot write to the iPod database file (iTunesSD)!\nPlease make sure that:\n (*) you have sufficient permissions to write to the iPod volume\n (*) you are actually using an iPod shuffle, and not some other iPod model :)"
|
|
|
|
|
|
def _u24(i: int) -> bytes:
|
|
if i < 0:
|
|
i += 0x1000000
|
|
return bytes((i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF))
|
|
|
|
|
|
def _circ(s, u, sc):
|
|
return min(abs(s - u), abs(s - u + sc), abs(s - u - sc))
|
|
|
|
|
|
class ShuffleRebuild:
|
|
def __init__(self) -> None:
|
|
self.opts = {
|
|
"volume": None,
|
|
"interactive": False,
|
|
"smart": True,
|
|
"home": True,
|
|
"logging": True,
|
|
"reuse": 1,
|
|
"logfile": "rebuild_db.log.txt",
|
|
"rename": False,
|
|
"ipod_path": None,
|
|
}
|
|
self.rules = list(DEFAULT_RULES)
|
|
self.domains, self.total_count, self.known_entries = [], 0, {}
|
|
self._log_fp = self._sd = None
|
|
self._entry_template = array.array("B")
|
|
|
|
def log(self, *a, end="\n") -> None:
|
|
print(*a, end=end)
|
|
if self._log_fp:
|
|
try:
|
|
print(*a, end=end, file=self._log_fp)
|
|
except OSError:
|
|
pass
|
|
|
|
@contextmanager
|
|
def log_to_file(self):
|
|
self._log_fp = None
|
|
if self.opts["logging"]:
|
|
try:
|
|
self._log_fp = open(self.opts["logfile"], "w", encoding="utf-8")
|
|
except OSError:
|
|
pass
|
|
try:
|
|
yield
|
|
finally:
|
|
if self._log_fp:
|
|
self._log_fp.close()
|
|
self._log_fp = None
|
|
|
|
def chdir_for_run(self, argv0: str) -> None:
|
|
ip = self.opts["ipod_path"]
|
|
if ip is not None:
|
|
p = os.path.abspath(os.path.expanduser(ip))
|
|
if not os.path.isdir(p):
|
|
print("error: --ipod-path is not a directory: %s" % p, file=sys.stderr)
|
|
sys.exit(1)
|
|
try:
|
|
os.chdir(p)
|
|
except OSError as e:
|
|
print(
|
|
"error: cannot change to directory %s: %s" % (p, e), file=sys.stderr
|
|
)
|
|
sys.exit(1)
|
|
elif self.opts["home"]:
|
|
try:
|
|
os.chdir(os.path.split(argv0)[0])
|
|
except OSError:
|
|
pass
|
|
|
|
def _load_user_rules_file(self) -> None:
|
|
try:
|
|
lines = (
|
|
open("rebuild_db.rules", "r", encoding="iso-8859-1").read().split("\n")
|
|
)
|
|
except OSError:
|
|
return
|
|
for line in lines:
|
|
p = self._parse_rule_line(line)
|
|
if p is not None:
|
|
self.rules.append(p)
|
|
|
|
def _parse_rule_line(self, line: str):
|
|
line = line.strip()
|
|
if not line or line[0] == "#":
|
|
return None
|
|
try:
|
|
tmp = line.split(":")
|
|
ruleset = [x.strip() for x in ":".join(tmp[:-1]).split(",")]
|
|
actions = dict(map(self._parse_action, tmp[-1].split(",")))
|
|
if len(ruleset) == 1 and not ruleset[0]:
|
|
return ([], actions)
|
|
return (list(map(self._parse_rule, ruleset)), actions)
|
|
except (ValueError, TypeError, KeyError):
|
|
self.log("WARNING: rule `%s' is malformed, ignoring" % line)
|
|
return None
|
|
|
|
def _parse_rule(self, rule: str):
|
|
positions = [rule.find(sep) for sep in "~=<>"]
|
|
candidates = [p for p in positions if p > 0]
|
|
if not candidates:
|
|
raise ValueError("no operator in rule")
|
|
sep_pos = min(candidates)
|
|
sep = rule[sep_pos]
|
|
prop = rule[:sep_pos].strip()
|
|
if prop not in KNOWN_PROPS:
|
|
self.log("WARNING: unknown property `%s'" % prop)
|
|
return (prop, sep, self._parse_value(rule[sep_pos + 1 :].strip()))
|
|
|
|
def _parse_action(self, action: str):
|
|
prop, value = [x.strip() for x in action.split("=", 1)]
|
|
if prop not in KNOWN_PROPS:
|
|
self.log("WARNING: unknown property `%s'" % prop)
|
|
return (prop, self._parse_value(value))
|
|
|
|
@staticmethod
|
|
def _parse_value(val: str):
|
|
if len(val) >= 2 and (
|
|
(val[0] == "'" and val[-1] == "'") or (val[0] == '"' and val[-1] == '"')
|
|
):
|
|
return val[1:-1]
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
return val
|
|
|
|
@staticmethod
|
|
def _match_rule(props: dict, rule) -> bool:
|
|
try:
|
|
prop, op, ref = props[rule[0]], rule[1], rule[2]
|
|
except KeyError:
|
|
return False
|
|
if op == "~":
|
|
return fnmatch.fnmatchcase(prop.lower(), ref.lower())
|
|
c = (prop > ref) - (prop < ref)
|
|
return (op == "=" and c == 0) or (op == ">" and c > 0) or (op == "<" and c < 0)
|
|
|
|
@staticmethod
|
|
def _filesize(path: str):
|
|
try:
|
|
return os.stat(path).st_size
|
|
except OSError:
|
|
return None
|
|
|
|
def _rename_safely(self, path: str, name: str) -> str:
|
|
base, ext = os.path.splitext(name)
|
|
newname = "".join(c if c in _SAFE else "_" for c in base)
|
|
if name == newname + ext:
|
|
return name
|
|
if os.path.exists("%s/%s%s" % (path, newname, ext)):
|
|
i = 0
|
|
while os.path.exists("%s/%s_%d%s" % (path, newname, i, ext)):
|
|
i += 1
|
|
newname += "_%d" % i
|
|
newname += ext
|
|
try:
|
|
os.rename("%s/%s" % (path, name), "%s/%s" % (path, newname))
|
|
except OSError:
|
|
pass
|
|
return newname
|
|
|
|
@staticmethod
|
|
def _make_sort_key(s: str):
|
|
if not s:
|
|
return s
|
|
s = s.lower()
|
|
for i in range(len(s)):
|
|
if s[i].isdigit():
|
|
break
|
|
if not s[i].isdigit():
|
|
return s
|
|
for j in range(i, len(s)):
|
|
if not s[j].isdigit():
|
|
break
|
|
if s[j].isdigit():
|
|
j += 1
|
|
return (s[:i], int(s[i:j]), ShuffleRebuild._make_sort_key(s[j:]))
|
|
|
|
@staticmethod
|
|
def _key_repr(x):
|
|
if isinstance(x, tuple):
|
|
return "%s%d%s" % (x[0], x[1], ShuffleRebuild._key_repr(x[2]))
|
|
return x
|
|
|
|
@classmethod
|
|
def _cmp_sort_key(cls, a, b) -> int:
|
|
if isinstance(a, tuple) and isinstance(b, tuple):
|
|
for xa, xb in (a[0], b[0]), (a[1], b[1]):
|
|
c = (xa > xb) - (xa < xb)
|
|
if c:
|
|
return c
|
|
return cls._cmp_sort_key(a[2], b[2])
|
|
ka, kb = cls._key_repr(a), cls._key_repr(b)
|
|
return (ka > kb) - (ka < kb)
|
|
|
|
def _file_entry(self, path: str, name: str, prefix: str = ""):
|
|
if not name or name[0] == ".":
|
|
return None
|
|
fullname = "%s/%s" % (path, name)
|
|
may_rename = not fullname.startswith("./iPod_Control") and self.opts["rename"]
|
|
try:
|
|
if os.path.islink(fullname):
|
|
return None
|
|
if os.path.isdir(fullname):
|
|
if may_rename:
|
|
name = self._rename_safely(path, name)
|
|
return (0, self._make_sort_key(name), prefix + name)
|
|
if os.path.splitext(name)[1].lower() in AUDIO_EXT:
|
|
if may_rename:
|
|
name = self._rename_safely(path, name)
|
|
return (1, self._make_sort_key(name), prefix + name)
|
|
except OSError:
|
|
pass
|
|
return None
|
|
|
|
def _browse(self, path: str, interactive: bool) -> None:
|
|
if path.endswith("/"):
|
|
path = path[:-1]
|
|
displaypath = path[1:] or "/"
|
|
if interactive:
|
|
while True:
|
|
try:
|
|
choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[
|
|
:1
|
|
].lower()
|
|
except EOFError:
|
|
raise KeyboardInterrupt
|
|
if not choice:
|
|
continue
|
|
if choice in "at":
|
|
interactive = False
|
|
break
|
|
if choice in "yjos":
|
|
break
|
|
if choice in "n":
|
|
return
|
|
try:
|
|
files = [
|
|
x for x in (self._file_entry(path, n) for n in os.listdir(path)) if x
|
|
]
|
|
except OSError:
|
|
return
|
|
if path == "./iPod_Control/Music":
|
|
subdirs = [x[2] for x in files if not x[0]]
|
|
files = [x for x in files if x[0]]
|
|
for sub in subdirs:
|
|
sp = "%s/%s" % (path, sub)
|
|
try:
|
|
files.extend(
|
|
x
|
|
for x in (
|
|
self._file_entry(sp, n, sub + "/") for n in os.listdir(sp)
|
|
)
|
|
if x and x[0]
|
|
)
|
|
except OSError:
|
|
pass
|
|
files.sort(key=cmp_to_key(self._cmp_sort_key))
|
|
nfiles = sum(1 for x in files if x[0])
|
|
if nfiles:
|
|
self.domains.append([])
|
|
real = 0
|
|
for it in files:
|
|
fp = "%s/%s" % (path, it[2])
|
|
if it[0]:
|
|
real += self._write_track(fp[1:])
|
|
else:
|
|
self._browse(fp, interactive)
|
|
self.log(
|
|
"%s: %d files%s"
|
|
% (displaypath, real, "" if real == nfiles else " (out of %d)" % nfiles)
|
|
)
|
|
|
|
def _write_track(self, filename: str) -> int:
|
|
props = {
|
|
"filename": filename,
|
|
"size": self._filesize(filename[1:]),
|
|
"ignore": 0,
|
|
"type": 1,
|
|
"shuffle": 1,
|
|
"reuse": self.opts["reuse"],
|
|
"bookmark": 0,
|
|
}
|
|
for ruleset, action in self.rules:
|
|
if all(self._match_rule(props, r) for r in ruleset):
|
|
props.update(action)
|
|
if props["ignore"]:
|
|
return 0
|
|
entry = props["reuse"] and self.known_entries.get(filename)
|
|
if not entry:
|
|
self._entry_template[29] = props["type"]
|
|
fn = filename[:261].encode("latin-1", "replace")
|
|
part = b"".join(bytes((b, 0)) for b in fn)
|
|
pad = b"\0" * (525 - 2 * len(fn))
|
|
entry = self._entry_template.tobytes() + part + pad
|
|
self._sd.write(
|
|
entry[:555] + bytes((props["shuffle"], props["bookmark"])) + entry[557:]
|
|
)
|
|
if props["shuffle"]:
|
|
self.domains[-1].append(self.total_count)
|
|
self.total_count += 1
|
|
return 1
|
|
|
|
def _smart_shuffle(self) -> list:
|
|
try:
|
|
sc = max(map(len, self.domains))
|
|
except ValueError:
|
|
return []
|
|
slices = [[] for _ in range(sc)]
|
|
fill = [0] * sc
|
|
for d in range(len(self.domains)):
|
|
if not self.domains[d]:
|
|
continue
|
|
used = []
|
|
for n in self.domains[d]:
|
|
metric = [
|
|
min([sc] + [_circ(s, u, sc) for u in used]) for s in range(sc)
|
|
]
|
|
th = (max(metric) + 1) // 2
|
|
far = [s for s in range(sc) if metric[s] >= th]
|
|
th = (min(fill) + max(fill) + 1) // 2
|
|
emp = [s for s in range(sc) if fill[s] <= th and s in far]
|
|
s = random.choice(emp or far)
|
|
slices[s].append((n, d))
|
|
fill[s] += 1
|
|
used.append(s)
|
|
seq, last = [], -1
|
|
for sl in slices:
|
|
random.shuffle(sl)
|
|
if len(sl) > 2 and sl[0][1] == last:
|
|
sl.append(sl.pop(0))
|
|
seq += [x[0] for x in sl]
|
|
last = sl[-1][1]
|
|
return seq
|
|
|
|
def _write_sidecars(self) -> bool:
|
|
v, n = self.opts["volume"], self.total_count
|
|
self.log("Setting playback state ...", end=" ")
|
|
ps = []
|
|
try:
|
|
with open("iPod_Control/iTunes/iTunesPState", "rb") as f:
|
|
a = array.array("B")
|
|
a.frombytes(f.read())
|
|
ps = a.tolist()
|
|
except OSError, EOFError:
|
|
pass
|
|
if len(ps) != 21:
|
|
ps = list(_u24(29)) + [0] * 15 + list(_u24(1))
|
|
ps[3:15] = [0] * 6 + [1] + [0] * 5
|
|
if v is not None:
|
|
ps[:3] = list(_u24(v))
|
|
ok = True
|
|
try:
|
|
with open("iPod_Control/iTunes/iTunesPState", "wb") as f:
|
|
array.array("B", ps).tofile(f)
|
|
self.log("OK.")
|
|
except OSError:
|
|
self.log("FAILED.")
|
|
ok = False
|
|
self.log("Creating statistics file ...", end=" ")
|
|
try:
|
|
with open("iPod_Control/iTunes/iTunesStats", "wb") as f:
|
|
f.write(_u24(n) + b"\0" * 3 + (_u24(18) + b"\xff" * 3 + b"\0" * 12) * n)
|
|
self.log("OK.")
|
|
except OSError:
|
|
self.log("FAILED.")
|
|
ok = False
|
|
random.seed()
|
|
if self.opts["smart"]:
|
|
self.log("Generating smart shuffle sequence ...", end=" ")
|
|
seq = self._smart_shuffle()
|
|
else:
|
|
self.log("Generating shuffle sequence ...", end=" ")
|
|
seq = list(range(n))
|
|
random.shuffle(seq)
|
|
try:
|
|
with open("iPod_Control/iTunes/iTunesShuffle", "wb") as f:
|
|
f.write(b"".join(_u24(x) for x in seq))
|
|
self.log("OK.")
|
|
except OSError:
|
|
self.log("FAILED.")
|
|
ok = False
|
|
return ok
|
|
|
|
def run(self, dirs: list) -> None:
|
|
self.log("Welcome to %s, version %s" % (__title__, __version__))
|
|
self.log()
|
|
self._load_user_rules_file()
|
|
if not os.path.isdir("iPod_Control/iTunes"):
|
|
self.log(ERR_NO_CTRL)
|
|
sys.exit(1)
|
|
self._entry_template = array.array("B")
|
|
sd_read = None
|
|
try:
|
|
sd_read = open("iPod_Control/iTunes/iTunesSD", "rb")
|
|
self._entry_template.fromfile(sd_read, 51)
|
|
if self.opts["reuse"]:
|
|
sd_read.seek(18)
|
|
ent = sd_read.read(558)
|
|
while len(ent) == 558:
|
|
self.known_entries[
|
|
ent[33::2].split(b"\0", 1)[0].decode("latin-1", "replace")
|
|
] = ent
|
|
ent = sd_read.read(558)
|
|
except OSError, EOFError:
|
|
pass
|
|
if sd_read:
|
|
sd_read.close()
|
|
et = self._entry_template
|
|
if len(et) == 51:
|
|
self.log("Using iTunesSD headers from existing database.")
|
|
if self.known_entries:
|
|
self.log(
|
|
"Collected %d entries from existing database."
|
|
% len(self.known_entries)
|
|
)
|
|
else:
|
|
del et[18:]
|
|
if len(et) == 18:
|
|
self.log("Using iTunesSD main header from existing database.")
|
|
else:
|
|
del et[:]
|
|
self.log("Rebuilding iTunesSD main header from scratch.")
|
|
et.fromlist([0, 0, 0, 1, 6, 0, 0, 0, 18] + [0] * 9)
|
|
self.log("Rebuilding iTunesSD entry header from scratch.")
|
|
et.fromlist([0, 2, 46, 90, 165, 1] + [0] * 20 + [100, 0, 0, 1, 0, 2, 0])
|
|
self.log()
|
|
try:
|
|
self._sd = open("iPod_Control/iTunes/iTunesSD", "wb")
|
|
et[:18].tofile(self._sd)
|
|
except OSError:
|
|
self.log(ERR_NO_SD)
|
|
sys.exit(1)
|
|
del et[:18]
|
|
self.log("Searching for files on your iPod.")
|
|
try:
|
|
if dirs:
|
|
for d in dirs:
|
|
self._browse("./" + d, self.opts["interactive"])
|
|
else:
|
|
self._browse(".", self.opts["interactive"])
|
|
self.log("%d playable files were found on your iPod." % self.total_count)
|
|
self.log()
|
|
self.log("Fixing iTunesSD header.")
|
|
self._sd.seek(0)
|
|
self._sd.write(
|
|
bytes((0, (self.total_count >> 8) & 0xFF, self.total_count & 0xFF))
|
|
)
|
|
self._sd.close()
|
|
self._sd = None
|
|
except OSError:
|
|
self.log("ERROR: Some strange errors occured while writing iTunesSD.")
|
|
self.log(" You may have to re-initialize the iPod using iTunes.")
|
|
sys.exit(1)
|
|
ok = self._write_sidecars()
|
|
if ok:
|
|
self.log()
|
|
self.log("The iPod shuffle database was rebuilt successfully.")
|
|
self.log("Have fun listening to your music!")
|
|
else:
|
|
self.log()
|
|
self.log(
|
|
"WARNING: The main database file was rebuilt successfully, but there were errors\n"
|
|
" while resetting the other files. However, playback MAY work correctly."
|
|
)
|
|
|
|
|
|
def _opterr(msg) -> None:
|
|
print("parse error:", msg)
|
|
print("use `%s -h' to get help" % sys.argv[0])
|
|
sys.exit(1)
|
|
|
|
|
|
def parse_argv(argv: list, s: ShuffleRebuild) -> list:
|
|
try:
|
|
opts, args = getopt.getopt(argv[1:], "hiv:snlfL:rp:", GETOPT_LONG)
|
|
except getopt.GetoptError as e:
|
|
_opterr(e)
|
|
o = s.opts
|
|
for opt, arg in opts:
|
|
if opt in ("-h", "--help"):
|
|
print("Usage: %s [OPTION]... [DIRECTORY]...\n%s" % (argv[0], HELP))
|
|
sys.exit(0)
|
|
if opt in ("-i", "--interactive"):
|
|
o["interactive"] = True
|
|
elif opt in ("-v", "--volume"):
|
|
try:
|
|
o["volume"] = int(arg)
|
|
except ValueError:
|
|
_opterr("invalid volume")
|
|
elif opt in ("-s", "--nosmart"):
|
|
o["smart"] = False
|
|
elif opt in ("-n", "--nochdir"):
|
|
o["home"] = False
|
|
elif opt in ("-p", "--ipod-path"):
|
|
o["ipod_path"] = arg
|
|
elif opt in ("-l", "--nolog"):
|
|
o["logging"] = False
|
|
elif opt in ("-f", "--force"):
|
|
o["reuse"] = 0
|
|
elif opt in ("-L", "--logfile"):
|
|
o["logfile"] = arg
|
|
elif opt in ("-r", "--rename"):
|
|
o["rename"] = True
|
|
return args
|
|
|
|
|
|
def cli() -> None:
|
|
s = ShuffleRebuild()
|
|
dirs = parse_argv(sys.argv, s)
|
|
s.chdir_for_run(sys.argv[0])
|
|
with s.log_to_file():
|
|
try:
|
|
s.run(dirs)
|
|
except KeyboardInterrupt:
|
|
s.log()
|
|
s.log("You decided to cancel processing. This is OK, but please note that")
|
|
s.log("the iPod database is now corrupt and the iPod won't play!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|