abs_client.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
from typing import List, Dict, Any
from urllib.parse import quote

import httpx

from app.config import get_settings
  4. class AudiobookshelfClient:
  5. """Client for interacting with Audiobookshelf API."""
  6. def __init__(self):
  7. settings = get_settings()
  8. self.base_url = settings.abs_url.rstrip("/")
  9. self.api_token = settings.abs_api_token
  10. self.headers = {"Authorization": f"Bearer {self.api_token}"}
  11. async def get_libraries(self) -> List[Dict[str, Any]]:
  12. """Get all libraries from Audiobookshelf."""
  13. async with httpx.AsyncClient() as client:
  14. response = await client.get(
  15. f"{self.base_url}/api/libraries", headers=self.headers
  16. )
  17. response.raise_for_status()
  18. return response.json().get("libraries", [])
  19. async def get_library_items(self, library_id: str) -> List[Dict[str, Any]]:
  20. """Get all items in a library."""
  21. async with httpx.AsyncClient() as client:
  22. response = await client.get(
  23. f"{self.base_url}/api/libraries/{library_id}/items",
  24. headers=self.headers,
  25. )
  26. response.raise_for_status()
  27. return response.json().get("results", [])
  28. async def get_user_listening_sessions(self) -> List[Dict[str, Any]]:
  29. """Get user's listening sessions."""
  30. async with httpx.AsyncClient() as client:
  31. response = await client.get(
  32. f"{self.base_url}/api/me/listening-sessions", headers=self.headers
  33. )
  34. response.raise_for_status()
  35. return response.json().get("sessions", [])
  36. async def get_user_progress(self) -> List[Dict[str, Any]]:
  37. """Get user's media progress for all items."""
  38. async with httpx.AsyncClient() as client:
  39. response = await client.get(
  40. f"{self.base_url}/api/me/progress", headers=self.headers
  41. )
  42. response.raise_for_status()
  43. return response.json().get("libraryItems", [])
  44. async def get_item_details(self, item_id: str) -> Dict[str, Any]:
  45. """Get detailed information about a specific library item."""
  46. async with httpx.AsyncClient() as client:
  47. response = await client.get(
  48. f"{self.base_url}/api/items/{item_id}", headers=self.headers
  49. )
  50. response.raise_for_status()
  51. return response.json()
  52. async def get_user_info(self) -> Dict[str, Any]:
  53. """Get current user information."""
  54. async with httpx.AsyncClient() as client:
  55. response = await client.get(
  56. f"{self.base_url}/api/me", headers=self.headers
  57. )
  58. response.raise_for_status()
  59. return response.json()