slskd-musicsearch/slskdsearch.py
2023-09-20 23:20:00 +10:00

123 lines
5 KiB
Python

#!/usr/bin/env python3
import musicbrainzngs
from slskd_api import SlskdClient
from requests import get, post
from time import sleep
from os import environ
# Register the user agent with musicbrainzngs before any MusicBrainz request is made.
musicbrainzngs.set_useragent("slskd-musicsearch", "0.1", "https://github.com/alyssadev/slskd-musicsearch")
# slskd client configured from the environment (SLSKD_HOST / SLSKD_KEY),
# defaulting to a local instance on port 5030 and an empty key.
slskd = SlskdClient(environ.get("SLSKD_HOST","http://localhost:5030"), environ.get("SLSKD_KEY",""))
# (base URL, API key) tuple for Lidarr, read from LIDARR_HOST / LIDARR_KEY;
# index 0 is prefixed to endpoints and index 1 is sent as the X-Api-Key header.
lidarr = environ.get("LIDARR_HOST","http://localhost:8686"), environ.get("LIDARR_KEY","")
def print_json(d):
    """Pretty-print *d* to stdout as JSON with a four-space indent."""
    import json

    rendered = json.dumps(d, indent=4)
    print(rendered)
def get_album(release_id):
    """Fetch a MusicBrainz release and reduce it to the fields used for matching.

    Args:
        release_id: Release identifier passed to
            ``musicbrainzngs.get_release_by_id``.

    Returns:
        A dict with ``title`` (release title), ``artist`` (the release's
        ``artist-credit-phrase``) and ``tracks``. Each track is a dict with
        ``title`` (recording title), ``alttitle`` (title of the first related
        work, or ``None`` when there is none) and ``length`` (the track's
        ``length`` value, or ``None`` when the track has no such entry).
    """
    result = musicbrainzngs.get_release_by_id(
        release_id,
        includes=["artists", "recordings", "recording-level-rels", "work-rels", "work-level-rels"],
    )["release"]

    tracks = []
    for medium in result["medium-list"]:
        for track in medium["track-list"]:
            recording = track["recording"]
            # `or [{}]` covers both a missing and a present-but-empty relation
            # list, so indexing [0] can never raise IndexError.
            work_relations = recording.get("work-relation-list") or [{}]
            tracks.append({
                "title": recording["title"],
                "alttitle": work_relations[0].get("work", {}).get("title"),
                # A track without a "length" entry must not abort the whole
                # album lookup with a KeyError.
                "length": track.get("length"),
            })

    return {
        "title": result["title"],
        "artist": result["artist-credit-phrase"],
        "tracks": tracks,
    }
def san(inp):
    """Strip right single quotes (U+2019), commas and apostrophes from *inp*.

    Returns ``None`` when *inp* is falsy (``None`` or an empty string);
    otherwise returns the string with those three characters removed.
    """
    if not inp:
        return None
    # One translation pass deletes every character listed in the third argument.
    return inp.translate(str.maketrans("", "", "\u2019,'"))
def _find_existing_search(query):
    """Return the last slskd search whose text equals *query*, or ``None``."""
    found = None
    for previous in slskd.searches.get_all():
        if previous["searchText"] == query:
            found = previous
    return found


def _match_files(files, track_names):
    """Pair each track with at most one file named like ``<title>.flac``.

    Args:
        files: File dicts from one search response; each has a ``filename``
            whose path components are separated by backslashes.
        track_names: Mapping of track index to ``(title, alttitle)``; either
            entry may be ``None``.

    Returns:
        The matched file dicts, one per matched track, in response order.
    """
    remaining = dict(track_names)
    matched = []
    for f in files:
        if not remaining:
            break
        fn = f["filename"].split("\\")[-1]
        # When several remaining titles occur in the file name (e.g. "Home"
        # inside "Coming Home.flac"), keep only the longest one so a single
        # file is never counted for more than one track.
        best = None  # (matched title length, track index)
        for n, titles in remaining.items():
            for title in titles:
                # Skip None/empty titles; formatting them would look for the
                # literal text "None.flac".
                if title and f"{title}.flac" in fn:
                    if best is None or len(title) > best[0]:
                        best = (len(title), n)
        if best is not None:
            del remaining[best[1]]
            matched.append(f)
    return matched


def search(album):
    """Search slskd for a FLAC copy of *album* and enqueue a complete match.

    An existing search with the same text is reused; otherwise a new one is
    started. The function polls until the search reports completion (Ctrl-C
    stops the wait early), then looks for a response containing one file per
    album track.

    Args:
        album: Dict with ``artist``, ``title`` and ``tracks``; each track has
            ``title`` and ``alttitle`` (the shape returned by ``get_album``).

    Returns:
        ``True`` if one user offered every track: those files are enqueued for
        download and the search is deleted. Otherwise the id of the search,
        which is left in place.
    """
    query = f"{album['artist']} - {album['title']} flac"
    current = _find_existing_search(query) or slskd.searches.search_text(query)

    try:
        while not slskd.searches.state(current["id"])["isComplete"]:
            sleep(0.5)
    except KeyboardInterrupt:
        # Deliberate: interrupting only stops the wait; whatever responses
        # have arrived so far are still evaluated below.
        pass

    # Sanitise titles once; _match_files works on its own copy per response.
    track_names = {
        n: (san(track["title"]), san(track["alttitle"]))
        for n, track in enumerate(album["tracks"])
    }
    total_tracks = len(track_names)

    for result in slskd.searches.search_responses(current["id"]):
        files = _match_files(result["files"], track_names)
        # `total_tracks` guard: an album without tracks must not enqueue an
        # empty transfer from the first responder.
        if total_tracks and len(files) == total_tracks:
            slskd.transfers.enqueue(username=result["username"], files=files)
            slskd.searches.delete(current["id"])
            return True

    return current["id"]
def lidarr_get(endpoint, *args, **kwargs):
    """GET ``lidarr[0] + endpoint`` with the API key header and return the parsed JSON.

    Extra positional and keyword arguments are forwarded to ``requests.get``.
    A ``headers`` dict supplied by the caller is reused and gains the
    ``X-Api-Key`` entry.
    """
    base_url, api_key = lidarr[0], lidarr[1]
    kwargs.setdefault("headers", {})["X-Api-Key"] = api_key
    response = get(base_url + endpoint, *args, **kwargs)
    return response.json()
def lidarr_post(endpoint, *args, **kwargs):
    """POST to ``lidarr[0] + endpoint`` with the API key header and return the parsed JSON.

    Extra positional and keyword arguments are forwarded to ``requests.post``.
    A ``headers`` dict supplied by the caller is reused and gains the
    ``X-Api-Key`` entry.
    """
    base_url, api_key = lidarr[0], lidarr[1]
    kwargs.setdefault("headers", {})["X-Api-Key"] = api_key
    response = post(base_url + endpoint, *args, **kwargs)
    return response.json()
def lidarr_get_root_folder(index: int = 0):
    """Return one root folder from Lidarr's ``/api/v1/rootFolder`` endpoint.

    Args:
        index: Position of the wanted folder when the endpoint returns a list.

    Returns:
        The element at *index* when the response is a list; otherwise the
        response itself, unchanged.

    Raises:
        IndexError: If the response is a list and *index* is out of range.
    """
    root_folder = lidarr_get("/api/v1/rootFolder")
    if not isinstance(root_folder, list):
        return root_folder
    try:
        return root_folder[index]
    except IndexError:
        # Same exception type as before, but say what actually went wrong.
        raise IndexError(
            f"Lidarr returned {len(root_folder)} root folder(s); "
            f"index {index} is out of range"
        ) from None
def add_album_to_lidarr(release_id, root_folder_index: int = 0):
    """Add the album behind a MusicBrainz release to Lidarr, monitored.

    Looks up the release group of *release_id*, searches Lidarr for it with a
    ``lidarr:<release group id>`` term, marks the first hit's album and artist
    as monitored without triggering Lidarr's own searches, copies the profile
    ids and path from the chosen root folder, and posts the album.

    Args:
        release_id: Release identifier passed to
            ``musicbrainzngs.get_release_by_id``.
        root_folder_index: Passed to ``lidarr_get_root_folder``.

    Returns:
        The parsed JSON response of the ``/api/v1/album`` POST.
    """
    release = musicbrainzngs.get_release_by_id(release_id, includes=["release-groups"])["release"]
    release_group_id = release["release-group"]["id"]

    hits = lidarr_get("/api/v1/search", params={"term": f"lidarr:{release_group_id}"})
    album = hits[0]["album"]
    album.update({
        "monitored": True,
        "addOptions": {"searchForNewAlbum": False},
    })

    artist = album["artist"]
    artist.update({
        "monitored": True,
        "addOptions": {"monitor": "missing", "searchForMissingAlbums": False},
    })

    # Bring in the values configured on the root folder.
    folder = lidarr_get_root_folder(root_folder_index)
    artist.update({
        "metadataProfileId": folder["defaultMetadataProfileId"],
        "qualityProfileId": folder["defaultQualityProfileId"],
        "rootFolderPath": folder["path"],
    })

    return lidarr_post("/api/v1/album", json=album)
def lidarr_import(root_folder):
    """Post a ``DownloadedAlbumsScan`` command for ``root_folder["path"]``.

    NOTE: marked by the original author as currently not working.

    Returns:
        The parsed JSON response of the ``/api/v1/command`` POST.
    """
    command = {
        "name": "DownloadedAlbumsScan",
        "path": root_folder["path"],
    }
    return lidarr_post("/api/v1/command", json=command)
def lidarr_retag(artist_id):
    """Post a ``RetagFiles`` command for every file Lidarr lists for retagging.

    Args:
        artist_id: Sent as ``artistId`` both when fetching the retag list and
            in the command payload.

    Returns:
        The parsed JSON response of the ``/api/v1/command`` POST.
    """
    pending = lidarr_get("/api/v1/retag", params={"artistId": artist_id})
    track_file_ids = []
    for entry in pending:
        track_file_ids.append(entry["trackFileId"])
    command = {
        "artistId": artist_id,
        "files": track_file_ids,
        "name": "RetagFiles",
    }
    return lidarr_post("/api/v1/command", json=command)