105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
import warnings
|
|
import httpx
|
|
import functools
|
|
import asyncio
|
|
|
|
|
|
def retry_request(n_retries=3, timeout=5.0, backoff=1.5):
|
|
def decorator(func):
|
|
@functools.wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
last_exc = None
|
|
|
|
for attempt in range(n_retries):
|
|
try:
|
|
# inject timeout if function supports it
|
|
if "timeout" not in kwargs:
|
|
kwargs["timeout"] = timeout
|
|
|
|
return await func(*args, **kwargs)
|
|
|
|
except httpx.TimeoutException as e:
|
|
last_exc = e
|
|
warnings.warn(f"Timeout (attempt {attempt + 1}/{n_retries})")
|
|
|
|
except httpx.RequestError as e:
|
|
last_exc = e
|
|
warnings.warn(
|
|
f"Request error (attempt {attempt + 1}/{n_retries}): {e}"
|
|
)
|
|
|
|
# backoff before retrying
|
|
if attempt < n_retries - 1:
|
|
await asyncio.sleep(backoff * (attempt + 1))
|
|
|
|
warnings.warn(f"All retries failed: {last_exc}")
|
|
return None
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
class Api:
|
|
def __init__(self, refresh_token: str, base_64: str):
|
|
self.refresh_token = refresh_token
|
|
self.base_64 = base_64
|
|
self.access_token = None
|
|
self.header = None
|
|
self._client = httpx.AsyncClient(timeout=httpx.Timeout(5.0, connect=5.0))
|
|
|
|
@retry_request(n_retries=10, timeout=5.0)
|
|
async def refreshAuth(self) -> None:
|
|
uri = "https://accounts.spotify.com/api/token"
|
|
data = {
|
|
"grant_type": "refresh_token",
|
|
"refresh_token": self.refresh_token,
|
|
}
|
|
res = await self._client.post(
|
|
uri,
|
|
data=data,
|
|
headers={"Authorization": "Basic " + self.base_64},
|
|
)
|
|
req = res.json()
|
|
self.access_token = req["access_token"]
|
|
self.header = {"Authorization": f"Bearer {self.access_token}"}
|
|
return req["expires_in"]
|
|
|
|
@retry_request(n_retries=4, timeout=3.0)
|
|
async def getPlaying(self):
|
|
url = "https://api.spotify.com/v1/me/player/currently-playing"
|
|
req = await self._client.get(url, headers=self.header)
|
|
if req.status_code == 204:
|
|
return "not-playing"
|
|
if req.status_code == 401:
|
|
warnings.warn("API Error: Bad or Expired token")
|
|
elif req.status_code == 403:
|
|
warnings.warn("API Error: Bad OAuth request, re-authenticating won't help")
|
|
elif req.status_code == 429:
|
|
warnings.warn("API Error: API rate limit exceeded")
|
|
elif req.status_code != 200:
|
|
warnings.warn(f"{req.status_code},\n{req.content}")
|
|
return None
|
|
return self._format_req(req.json())
|
|
|
|
def _format_req(self, r):
|
|
if not r["is_playing"] or r["currently_playing_type"] != "track":
|
|
return "not-playing"
|
|
item, album = r["item"], r["item"]["album"]
|
|
res = {
|
|
"progress_ms": r["progress_ms"],
|
|
"duration_ms": item["duration_ms"],
|
|
"track": item["name"],
|
|
"album": album["name"],
|
|
"artists": [artist["name"] for artist in item["artists"]],
|
|
"img_url": None,
|
|
}
|
|
img_urls = album.get("images", [])
|
|
if img_urls:
|
|
res["img_url"] = img_urls.pop()["url"]
|
|
else:
|
|
warnings.warn(
|
|
f"{res['track']} - {res['artists']}\nAlbum art can't be found\n{img_urls}"
|
|
)
|
|
return res
|