Source code for lavaplay.node_manager

import asyncio
import typing as t
from .exceptions import TrackLoadFailed
from .emitter import Emitter
from .ws import WS
from .rest import RestApi
from .objects import Stats, Track, ConnectionInfo, PlayList, Info
from .events import Event
from . import __version__
from .utlits import get_event_loop, prossing_tracks , prossing_single_track
import logging
from .player import Player

_LOG = logging.getLogger("lavaplay.node")

class Node:
    """
    Represents a Lavalink client used to manage nodes and connections.

    Parameters
    ---------
    host: :class:`str`
        ip address for lavalink server, default ip address for lavalink is ``127.0.0.1``
    port: :class:`int`
        The port to use for websocket and REST connections.
    password: :class:`str`
        The password used for authentication.
    user_id: :class:`int | None`
        The bot id. It has no default and must be passed explicitly; pass
        ``None`` when the id is only known later and assign ``node.user_id``
        before calling :meth:`connect`.
    resume_key: :class:`str | None`
        Stored on the node as ``_resume_key``. This class does not forward it
        to the websocket itself.
    resume_timeout: :class:`int`
        Stored on the node as ``_resume_timeout`` (default ``180``). This class
        does not forward it to the websocket itself.
    shards_count: :class:`int`
        The count shards for websocket
    ssl: :class:`bool`
        Is server using ssl
    loop: :class:`asyncio.AbstractEventLoop | None`
        Event loop used for the event emitter and the websocket task. When
        omitted, the loop returned by ``get_event_loop()`` is used.
    **kwargs
        Accepted for compatibility and ignored.
    """

    def __init__(
        self,
        *,
        host: t.Optional[str] = "127.0.0.1",
        port: int,
        password: str,
        user_id: t.Optional[int],
        resume_key: t.Optional[str] = None,
        resume_timeout: int = 180,
        shards_count: int = 1,
        ssl: bool = False,
        loop: t.Optional[asyncio.AbstractEventLoop] = None,
        **kwargs
    ) -> None:
        self.host = host
        self.port = port
        self.password = password
        self.user_id = user_id
        self.shards_count = shards_count
        self.ssl = ssl
        self.loop = loop or get_event_loop()
        self.event_manager = Emitter(self.loop)
        # The websocket wrapper is only created by connect().
        self._ws: t.Optional[WS] = None
        # Strong reference to the task scheduled by connect().
        self._connect_task: t.Optional[asyncio.Future] = None
        self._resume_key = resume_key
        self._resume_timeout = resume_timeout
        self.rest = RestApi(host=self.host, port=self.port, password=self.password, ssl=self.ssl)
        self.stats: t.Optional[Stats] = None
        self._voice_handlers: t.Dict[int, ConnectionInfo] = {}
        self.session_id: t.Optional[str] = None
        # One Player per guild id.
        self.players: t.Dict[int, Player] = {}

    def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """
        Set the event loop used by the node and by its event emitter.

        :meth:`connect` schedules the websocket task on ``self.loop``, so call
        this before :meth:`connect` for the new loop to be used by that task.

        Parameters
        ---------
        loop: :class:`asyncio.AbstractEventLoop`
            The event loop to use.
        """
        self.loop = loop
        self.event_manager._loop = loop

    def create_player(self, guild_id: int) -> Player:
        """
        Create a player for guild id.

        Any player already registered for the same guild id is replaced.

        Parameters
        ---------
        guild_id: :class:`int`
            The guild id for player.
        """
        player = Player(self, guild_id)
        self.players[guild_id] = player
        return player

    def destroy_player(self, guild_id: int) -> None:
        """
        Destroy a player for guild id.

        Parameters
        ---------
        guild_id: :class:`int`
            The guild id for player.

        Exceptions
        ----------
        :class:`KeyError`
            If no player is registered for ``guild_id``.
        """
        del self.players[guild_id]

    def get_player(self, guild_id: int) -> t.Optional[Player]:
        """
        Get player for guild id.

        Parameters
        ---------
        guild_id: :class:`int`
            The guild id for player, if not found return ``None``
        """
        return self.players.get(guild_id)

    def get_players(self) -> t.List[Player]:
        """
        Get all players as a new list.
        """
        return list(self.players.values())

    def change_player(self, guild_id: int, player: Player) -> None:
        """
        Change player for guild id.

        Parameters
        ---------
        guild_id: :class:`int`
            The guild id for player.
        player: :class:`Player`
            The player to change.
        """
        self.players[guild_id] = player

    @staticmethod
    def _load_failed(error: t.Dict[str, t.Any]) -> TrackLoadFailed:
        """
        Build a :class:`TrackLoadFailed` from the ``data`` object of an ``error`` load result.
        """
        # A missing "causeStackTrace" must not turn into a KeyError that hides
        # the real load failure, so that key alone is read leniently.
        return TrackLoadFailed(
            error["message"],
            error["severity"],
            error["cause"],
            error.get("causeStackTrace"),
        )

    async def _search(self, prefix: str, query: str) -> t.List[Track]:
        """
        Run a ``<prefix>:<query>`` search and convert the result to tracks.
        """
        result = await self.rest.load_tracks(f"{prefix}:{query}")
        load_type = result["loadType"]
        if load_type == "empty":
            return []
        if load_type == "error":
            raise self._load_failed(result["data"])
        return prossing_tracks(result["data"], result)

    async def search_youtube(self, query: str) -> t.Optional[t.Union[t.List[Track], TrackLoadFailed]]:
        """
        Search for tracks with youtube.

        Parameters
        ---------
        query: :class:`str`
            words for search with youtube. if not found result return empty :class:`list`

        Exceptions
        ----------
        :class:`lavaplay.exceptions.TrackLoadFailed`
            If the track could not be loaded.
        """
        return await self._search("ytsearch", query)

    async def search_soundcloud(self, query: str) -> t.Optional[t.Union[t.List[Track], TrackLoadFailed]]:
        """
        Search for tracks with soundcloud.

        Parameters
        ---------
        query: :class:`str`
            words for search with soundcloud. if not found result return empty :class:`list`

        Exceptions
        ----------
        :class:`lavaplay.exceptions.TrackLoadFailed`
            If the track could not be loaded.
        """
        return await self._search("scsearch", query)

    async def search_youtube_music(self, query: str) -> t.Optional[t.Union[t.List[Track], PlayList, TrackLoadFailed]]:
        """
        Search for tracks with youtube music.

        Parameters
        ---------
        query: :class:`str`
            words for search with youtube music. if not found result return empty :class:`list`

        Exceptions
        ----------
        :class:`lavaplay.exceptions.TrackLoadFailed`
            If the track could not be loaded.
        """
        return await self._search("ytmsearch", query)

    async def get_tracks(self, query: str) -> t.Optional[t.Union[t.List[Track], PlayList, TrackLoadFailed]]:
        """
        Load tracks for unknown sites or youtube or soundcloud or radio.

        A ``playlist`` result is returned as :class:`PlayList`, a ``track``
        result is passed through ``prossing_single_track`` and any other
        non-empty result through ``prossing_tracks``.

        Parameters
        ---------
        query: :class:`str`
            track url, if not found result return empty :class:`list`

        Exceptions
        ----------
        :class:`lavaplay.exceptions.TrackLoadFailed`
            If the track could not be loaded.
        """
        result = await self.rest.load_tracks(query)
        load_type = result["loadType"]
        if load_type == "empty":
            return []
        data = result["data"]
        if load_type == "error":
            raise self._load_failed(data)
        if load_type == "playlist":
            playlist_info = data["info"]
            tracks = prossing_tracks(data["tracks"], result)
            return PlayList(playlist_info["name"], playlist_info["selectedTrack"], tracks)
        if load_type == "track":
            return prossing_single_track(data, result)
        return prossing_tracks(data, result)

    async def decodetrack(self, track: str) -> Track:
        """
        Decode a base64 encoded track, which only the server can resolve,
        into a :class:`Track` carrying readable information.

        Parameters
        ---------
        track: :class:`str`
            track result from base64
        """
        result = await self.rest.decode_track(track)
        info = result["info"]
        return Track(
            track,
            identifier=info["identifier"],
            is_seekable=info["isSeekable"],
            author=info["author"],
            length=info["length"],
            is_stream=info["isStream"],
            position=info["position"],
            source_name=info.get("sourceName", None),
            title=info.get("title", None),
            uri=info["uri"],
            artworkUrl=info.get("artworkUrl", None),
            isrc=info.get("isrc", None),
            load_type=result.get("loadType", None),
            # ``track`` is the encoded string, so the plugin data has to be
            # read from the decoded response instead of being indexed on it.
            plugin_info=result.get("pluginInfo"),
        )

    async def decodetracks(self, tracks: t.List[t.Dict]) -> t.List[Track]:
        """
        Decode several base64 encoded tracks, which only the server can
        resolve, into readable track objects.

        Parameters
        ---------
        tracks: :class:`list`
            tracks result from base64
        """
        result = await self.rest.decode_tracks(tracks)
        # NOTE(review): the decoded payload is passed as both arguments of
        # prossing_tracks, exactly as before; confirm against that helper.
        return prossing_tracks(result, result)

    async def auto_search_tracks(self, query: str) -> t.Union[t.Optional[t.List[Track]], t.Optional[PlayList]]:
        """
        Load tracks for youtube search or other urls.

        A query starting with ``http://`` or ``https://`` (case-insensitive,
        leading whitespace ignored) is loaded with :meth:`get_tracks`; anything
        else is searched with :meth:`search_youtube`.

        Parameters
        ---------
        query: :class:`str`
            url or words to search, if not found result return empty :class:`list`

        Exceptions
        ----------
        :class:`lavaplay.exceptions.TrackLoadFailed`
            If the track could not be loaded.
        """
        # Test the scheme prefix rather than a substring, so that a search
        # phrase which merely contains "http" is still searched.
        if query.lstrip().lower().startswith(("http://", "https://")):
            return await self.get_tracks(query)
        return await self.search_youtube(query)

    def listen(self, event: t.Union[str, Event]) -> t.Callable[..., t.Awaitable]:
        """
        The register function for listener handler

        Use as a decorator; the decorated function is registered on the event
        manager and returned unchanged.

        Parameters
        ---------
        event: :class:`Any` | :class:`str`
            event name or class for event
        """
        def deco(func: t.Callable[..., t.Awaitable]) -> t.Callable[..., t.Awaitable]:
            self.event_manager.add_listener(event, func)
            # Hand the function back, otherwise the decorated name becomes None.
            return func
        return deco

    @property
    def is_connect(self) -> bool:
        """
        Report the websocket's ``is_connect`` flag, or ``False`` when
        :meth:`connect` has not created a websocket yet.
        """
        return self._ws.is_connect if self._ws else False

    async def info(self) -> Info:
        """
        Get info from lavalink server.
        """
        data = await self.rest.info()
        return Info.from_kwargs(**data)

    async def version(self) -> str:
        """
        Get version from lavalink server.
        """
        data = await self.rest.version()
        return data["version"]

    def connect(self):
        """
        Connect to the lavalink websocket

        Creates the websocket wrapper and schedules its connection on
        ``self.loop``. This method returns immediately and does not wait for
        the connection to be established.
        """
        self._ws = WS(
            node=self,
            host=self.host,
            port=self.port,
            ssl=self.ssl,
            password=self.password,
            user_id=self.user_id,
            shards_count=self.shards_count
        )
        # asyncio keeps only weak references to tasks, so hold one here to stop
        # the connection task from being garbage collected while it runs.
        self._connect_task = asyncio.ensure_future(self._ws._connect(), loop=self.loop)

    async def close(self):
        """
        Disconnect from the lavalink websocket

        Does nothing when :meth:`connect` was never called or no underlying
        socket is available.
        """
        if self._ws is None:
            return
        socket = getattr(self._ws, "ws", None)
        if socket is not None:
            await socket.close()