import asyncio
from abc import ABC, abstractmethod, abstractproperty
from typing import Set, Union, Optional, Awaitable, Dict, Type, Tuple
import attr
from async_property import AwaitableOnly
from privex.helpers import cached, Tuple, List, AttribDictable, Decimal, empty, camel_to_snake, PrivexException, awaitable, await_if_needed
from privex.helpers.cache import adapter_get, adapter_set, AsyncCacheAdapter, AsyncMemoryCache
import logging
import importlib
from decimal import getcontext
# Raise the precision of the current decimal context to 30 significant digits for rate arithmetic.
getcontext().prec = 30

log = logging.getLogger(__name__)

# The adapters in this module ``await`` their cache calls, so the globally configured
# privex.helpers cache adapter must be an async one. If it is not, replace it with AsyncMemoryCache.
_curr_adapter = adapter_get()
if not isinstance(_curr_adapter, AsyncCacheAdapter):
    log.warning("WARNING: Current privex.helpers cache adapter is not an Async adapter.")
    log.warning("Current privex.helpers adapter: %s", _curr_adapter.__class__.__name__)
    log.warning("Setting privex.helpers cache adapter to 'AsyncMemoryCache'.")
    adapter_set(AsyncMemoryCache())
class ExchangeException(PrivexException):
    """Root of the exception hierarchy used throughout :mod:`privex.exchange`."""
class PairNotFound(ExchangeException):
    """The requested coin pair is not supported by the exchange adapter that was asked for it."""
class ProxyMissingPair(PairNotFound):
    """A coin pair needed to route a conversion through a proxy coin (see ``try_proxy``) does not exist."""
class ExchangeDown(ExchangeException):
    """An exchange appears to be unavailable, e.g. it is timing out or answering with 4xx / 5xx errors."""
class ExchangeRateLimited(ExchangeDown):
    """An exchange adapter hit a rate limit while querying its exchange."""
def empty_decimal(obj: Optional[Decimal]):
    """
    Converter used by the :class:`PriceData` fields.

    :param obj: The raw value to convert.
    :return: ``None`` when privex.helpers' ``empty`` reports ``obj`` as empty, otherwise ``Decimal(obj)``.
    """
    return None if empty(obj) else Decimal(obj)
@attr.s
class PriceData(AttribDictable):
    """
    Ticker data for a single coin pair, as returned by exchange adapters.

    Every price field is passed through :func:`empty_decimal` on assignment at construction time.
    Only ``from_coin``, ``to_coin`` and ``last`` are required; the remaining fields default to ``None``.
    """
    # The two symbols making up the pair.
    from_coin = attr.ib(type=str)
    to_coin = attr.ib(type=str)

    # Mandatory price field - has no default, so it must always be supplied.
    last = attr.ib(converter=empty_decimal, type=Decimal)

    # Optional ticker fields.
    bid = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    ask = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    open = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    close = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    high = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    low = attr.ib(converter=empty_decimal, type=Decimal, default=None)
    volume = attr.ib(converter=empty_decimal, type=Decimal, default=None)
# A collection of ``(from_coin, to_coin)`` pairs - either a list or a set of 2-tuples.
# (Previously the set variant was spelled ``Set[List[Tuple[str, str]]]``, i.e. a set of unhashable lists.)
PairListOrSet = Union[List[Tuple[str, str]], Set[Tuple[str, str]]]
class ExchangeAdapter(ABC):
    """
    Abstract base class used by all exchange adapters for interoperability.

    Concrete adapters must override :attr:`name`, :attr:`code`, :attr:`provides`, :meth:`has_pair` and
    :meth:`_get_pair`. All except :meth:`_get_pair` ship with a default implementation here which a
    subclass may delegate to via ``super()``.
    """
    # Timeout passed to ``cached.set`` when :meth:`get_pair` caches a ticker (presumably seconds - confirm
    # against the privex.helpers cache adapter in use).
    cache_timeout: int = 120
    # Pairs supported by the adapter class itself, as (from_coin, to_coin) tuples.
    _provides: Set[Tuple[str, str]] = set()
    # Additional pairs supplied per-instance through ``extra_provides``.
    _extra_provides: Set[Tuple[str, str]]
    # When True, :meth:`get_pair` checks :meth:`has_pair` before querying.
    validate_provides: bool
    # Any extra keyword arguments handed to the constructor.
    ex_settings: dict

    def __init__(self, extra_provides: PairListOrSet = None, validate_provides: bool = True, **ex_settings):
        """
        :param extra_provides: Optional extra ``(from_coin, to_coin)`` pairs this instance should advertise
                               in addition to the class-level ``_provides``.
        :param validate_provides: If True (default), :meth:`get_pair` raises :class:`PairNotFound` for pairs
                                  that :meth:`has_pair` does not report.
        :param ex_settings: Arbitrary adapter settings, stored as a dict on ``self.ex_settings``.
        """
        self.validate_provides = validate_provides
        self.ex_settings = dict(ex_settings)
        # Always give the instance its own set (seeded from any set a subclass declared on the class),
        # so ``provides`` never hits a missing attribute and instances never share mutable state.
        self._extra_provides = set(getattr(self, '_extra_provides', ()))

        if empty(extra_provides, itr=True):
            return

        # has_pair() is a coroutine and cannot be awaited inside __init__, so de-duplicate synchronously
        # (case-insensitively) against the pairs that are already known.
        known = {(f.upper(), t.upper()) for f, t in list(self._provides) + list(self._extra_provides)}
        for pair in extra_provides:
            frm, to = pair
            key = (frm.upper(), to.upper())
            if key in known:
                continue
            known.add(key)
            # Store as a tuple so that pairs passed as lists remain hashable.
            self._extra_provides.add((frm, to))

    def __str__(self):
        return f"[Exchange '{self.name}' (code: '{self.code}')]"

    def __repr__(self):
        return self.__str__()

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the exchange. The default implementation returns the class name."""
        return self.__class__.__name__

    @property
    @abstractmethod
    def code(self) -> str:
        """Short code for the exchange. The default implementation is :attr:`name` run through ``camel_to_snake``."""
        return camel_to_snake(self.name)

    @property
    @abstractmethod
    def provides(self) -> Union[Set[Tuple[str, str]], Awaitable[Set[Tuple[str, str]]]]:
        """
        The set of ``(from_coin, to_coin)`` pairs supported by this adapter.

        The default implementation merges ``_provides`` with ``_extra_provides``. Per the return annotation,
        an overriding implementation may return an awaitable instead of a plain set.
        """
        return set(self._provides) | set(self._extra_provides)

    @abstractmethod
    async def has_pair(self, from_coin: str, to_coin: str) -> bool:
        """
        Return True if ``from_coin``/``to_coin`` is listed in :attr:`provides` (compared case-insensitively).

        Note: this default implementation iterates :attr:`provides` directly, so it only works when
        :attr:`provides` is a plain (non-awaitable) set.
        """
        wanted = (from_coin.upper(), to_coin.upper())
        return any((f.upper(), t.upper()) == wanted for f, t in self.provides)

    @abstractmethod
    async def _get_pair(self, from_coin: str, to_coin: str) -> PriceData:
        """Fetch fresh ticker data for the pair from the exchange. Must be implemented by subclasses."""
        # NotImplemented is a comparison sentinel, not an exception - raising it yields a TypeError.
        raise NotImplementedError

    @staticmethod
    async def _get_cache_pricedata(key: str) -> Optional[PriceData]:
        """
        Read ``key`` from the privex.helpers cache and coerce it into :class:`PriceData`.

        :param key: The cache key to read.
        :return: The cached :class:`PriceData`, or ``None`` if the key is empty or holds unusable data.
        """
        # Awaited for consistency with get_pair(): this module installs an async cache adapter at import time.
        cdata = await cached.get(key)
        if empty(cdata):
            return None
        if isinstance(cdata, PriceData):
            return cdata
        if isinstance(cdata, dict):
            return PriceData(**cdata)
        log.warning("Error reading cache key '%s' - Data was neither PriceData nor dict. Data was: %s", key, cdata)
        return None

    async def get_pair(self, from_coin: str, to_coin: str) -> PriceData:
        """
        Get ticker data for ``from_coin``/``to_coin``, served from cache when available.

        **Instantiate an exchange class and get a ticker asynchronously**::

            >>> from privex.exchange import Binance
            >>> b = Binance()
            >>> ticker = await b.get_pair('LTC', 'BTC')
            >>> ticker.last
            Decimal('0.00607100')
            >>> ticker.volume
            Decimal('89557.57000000')

        **From synchronous code, drive the coroutine with an event loop**::

            >>> import asyncio
            >>> from privex.exchange import Binance
            >>> ticker = asyncio.run(Binance().get_pair('LTC', 'BTC'))
            >>> ticker.last
            Decimal('0.00607100')

        :param from_coin: Symbol of the coin being priced.
        :param to_coin: Symbol of the coin the price is expressed in.
        :raises PairNotFound: If ``validate_provides`` is set and :meth:`has_pair` rejects the pair.
        :return: A :class:`PriceData` object for the pair.
        """
        if self.validate_provides and not await self.has_pair(from_coin=from_coin, to_coin=to_coin):
            raise PairNotFound(f"The coin pair '{from_coin}/{to_coin}' is not supported by {self.name}")

        ckey = f"pvxex:{self.code}:{from_coin}_{to_coin}"
        cached_data = await self._get_cache_pricedata(ckey)
        if cached_data is not None:
            return cached_data

        # Cache miss (or unreadable cache entry, already logged by the helper) - query the exchange
        # and (re-)write the cache key.
        data = await self._get_pair(from_coin=from_coin, to_coin=to_coin)
        await cached.set(ckey, data, self.cache_timeout)
        return data
class ExchangeManager:
    """
    Registry of exchange adapters that resolves an exchange rate for a coin pair - directly, by inverting
    the opposite pair, or by chaining two pairs through a proxy coin.

    All state is held in class attributes, so every instance and every classmethod call shares the same
    loaded adapters and pair maps.

    Basic usage::

        >>> from privex.exchange import ExchangeManager
        >>> exm = ExchangeManager()
        >>> await exm.get_pair('btc', 'usd')
        Decimal('6694.53000000')
        >>> await exm.get_pair('ltc', 'usd')
        Decimal('40.15000000')

    Converting arbitrary cryptos between each other, seamlessly::

        >>> await exm.get_pair('eos', 'ltc')     # LTC per 1 EOS
        Decimal('0.05957304869913275517011340894')
        >>> await exm.get_pair('hive', 'eos')    # EOS per 1 HIVE
        Decimal('0.04325307950727883538633818590')

    """
    # (module path, object name) of every adapter that load_exchanges() will try to load.
    available_adapters: List[Tuple[str, str]] = [
        ('privex.exchange.Binance', 'Binance'),
        ('privex.exchange.Bittrex', 'Bittrex'),
        ('privex.exchange.Kraken', 'Kraken'),
        ('privex.exchange.CoinGecko', 'CoinGecko'),
    ]
    # 'package.Name' object path -> adapter instance
    ex_instances: Dict[str, ExchangeAdapter] = {}
    # adapter .code -> object path
    ex_code_map: Dict[str, str] = {}
    # adapter .name -> object path
    ex_name_map: Dict[str, str] = {}
    # 'FROM_TO' (upper case) -> adapters which can serve that pair, in registration order
    ex_pair_map: Dict[str, List[ExchangeAdapter]] = {}
    # adapter instance -> the pairs its .provides reported
    ex_pair_map_inv: Dict[ExchangeAdapter, Set[Tuple[str, str]]] = {}
    # When True, pairs involving a coin in tether_map are additionally mapped under the mapped symbol.
    map_tether_real: bool = True
    tether_map: Dict[str, str] = {
        'USDT': 'USD',
        'USDC': 'USD',
    }
    # Candidate intermediate coins for proxied conversions, tried in this order by _find_proxy().
    proxy_coins: List[str] = ['BTC', 'USD', 'USDT']
    # Set to True once load_exchanges() has run.
    loaded: bool = False

    @classmethod
    async def load_exchange(cls, package: str, name: str) -> ExchangeAdapter:
        """
        Import ``name`` from module ``package``, instantiate it and register it (including its pairs).

        If the adapter is already registered, the existing instance is returned without re-importing.

        :param package: Dotted module path, e.g. ``'privex.exchange.Binance'``.
        :param name: Name of the adapter class inside that module.
        :return: The registered adapter instance.
        """
        obj_path = '.'.join([package, name])
        log.debug("Checking if adapter '%s' is already registered...", obj_path)
        if obj_path not in cls.ex_instances:
            log.debug("Importing module '%s'", package)
            pkg = importlib.import_module(package)
            log.debug("Extracting object '%s' from module '%s'", name, package)
            obj: Type[ExchangeAdapter] = pkg.__dict__[name]
            log.debug("Instantiating object '%s' and mapping it...", obj)
            inst: ExchangeAdapter = obj()
            cls.ex_instances[obj_path] = inst
            cls.ex_code_map[inst.code] = obj_path
            cls.ex_name_map[inst.name] = obj_path
            # Populate ExchangeManager.ex_pair_map
            await cls._import_pairs(inst)

        return cls.ex_instances[obj_path]

    @classmethod
    async def _import_pairs(cls, inst: ExchangeAdapter):
        """
        Register every pair reported by ``inst.provides`` in :attr:`ex_pair_map` / :attr:`ex_pair_map_inv`.

        :param inst: The adapter instance whose pairs should be mapped.
        :return: The (awaited, if necessary) value of ``inst.provides``.
        """
        log.debug("Importing pairs for exchange adapter: %s", inst)
        _prov: Union[Set[Tuple[str, str]], Awaitable[Set[Tuple[str, str]]]] = inst.provides
        # Some classes may have an async_property .provides, so it has to be awaited before it can be scanned.
        # noinspection PyTypeChecker
        if asyncio.iscoroutine(_prov) or asyncio.iscoroutinefunction(_prov) or isinstance(_prov, AwaitableOnly):
            log.debug(f"{inst.__class__.__name__}.provides is async. Awaiting it...")
            _prov = await _prov

        cls.ex_pair_map_inv[inst] = _prov

        for frm_coin, to_coin in _prov:
            cls._map_pair(frm_coin, to_coin, inst)
            if not cls.map_tether_real:
                continue
            # tether_map keys are upper case and _map_pair() upper-cases its keys, so look the symbols up
            # case-insensitively as well. Only one side is substituted per pair (from_coin takes priority).
            frm_up, to_up = frm_coin.upper(), to_coin.upper()
            if frm_up in cls.tether_map:
                log.debug(
                    "Detected frm_coin '%s' is a tethered coin. Adding extra pair map for %s/%s",
                    frm_coin, cls.tether_map[frm_up], to_coin
                )
                cls._map_pair(cls.tether_map[frm_up], to_coin, inst)
            elif to_up in cls.tether_map:
                log.debug(
                    "Detected to_coin '%s' is a tethered coin. Adding extra pair map for %s/%s",
                    to_coin, frm_coin, cls.tether_map[to_up]
                )
                cls._map_pair(frm_coin, cls.tether_map[to_up], inst)
        return _prov

    @classmethod
    def list_pairs_from(cls, frm_coin: str) -> Dict[str, List[ExchangeAdapter]]:
        """Return the subset of :attr:`ex_pair_map` whose pair key starts with ``frm_coin`` (case-insensitive)."""
        wanted = frm_coin.upper()
        return {
            pair: adapters for pair, adapters in cls.ex_pair_map.items()
            if pair.split('_')[0] == wanted
        }

    @classmethod
    def list_pairs_to(cls, to_coin: str) -> Dict[str, List[ExchangeAdapter]]:
        """Return the subset of :attr:`ex_pair_map` whose pair key ends with ``to_coin`` (case-insensitive)."""
        wanted = to_coin.upper()
        return {
            pair: adapters for pair, adapters in cls.ex_pair_map.items()
            if pair.split('_')[1] == wanted
        }

    @classmethod
    def _map_pair(cls, frm_coin: str, to_coin: str, inst: ExchangeAdapter):
        """Append ``inst`` to the adapter list for ``FROM_TO`` in :attr:`ex_pair_map`, skipping duplicates."""
        k = f"{frm_coin.upper()}_{to_coin.upper()}"
        log.debug("Mapping pair %s for exchange: %s", k, inst)
        if k in cls.ex_pair_map:
            if inst in cls.ex_pair_map[k]:
                log.debug("Exchange already exists in pair map for %s. Exchange: %s", k, inst)
                return
            log.debug("Pair %s exists in ex_pair_map. Appending exchange: %s", k, inst)
            cls.ex_pair_map[k].append(inst)
        else:
            log.debug("Pair %s did not exist in ex_pair_map. Creating pair with exchange: %s", k, inst)
            cls.ex_pair_map[k] = [inst]

    @classmethod
    async def load_exchanges(cls):
        """
        Load every adapter in :attr:`available_adapters` that is not registered yet, then mark the manager
        as loaded. An adapter that fails to load is logged and skipped (best effort).
        """
        for pkg_path, obj_name in cls.available_adapters:
            obj_path = '.'.join([pkg_path, obj_name])
            if obj_path in cls.ex_instances:
                continue
            try:
                await cls.load_exchange(pkg_path, obj_name)
            except Exception:
                # Deliberately broad: one broken adapter must not prevent the others from loading.
                # (Not a bare ``except`` - task cancellation / KeyboardInterrupt must still propagate.)
                log.exception("Skipping adapter %s due to exception ...", obj_path)
        cls.loaded = True

    @classmethod
    def exchange_by_code(cls, code: str) -> ExchangeAdapter:
        """Return the registered adapter whose ``.code`` is ``code``. Raises ``KeyError`` if unknown."""
        return cls.ex_instances[cls.ex_code_map[code]]

    @classmethod
    def exchange_by_name(cls, name: str) -> ExchangeAdapter:
        """Return the registered adapter whose ``.name`` is ``name``. Raises ``KeyError`` if unknown."""
        return cls.ex_instances[cls.ex_name_map[name]]

    @classmethod
    def exchange_by_path(cls, obj_path: str) -> ExchangeAdapter:
        """Return the registered adapter for a ``'package.Name'`` object path. Raises ``KeyError`` if unknown."""
        return cls.ex_instances[obj_path]

    @classmethod
    def pair_exists(cls, from_coin: str, to_coin: str, should_raise=False) -> bool:
        """
        Check whether at least one adapter is mapped for ``from_coin``/``to_coin``.

        :param should_raise: If True, raise :class:`PairNotFound` instead of returning False.
        """
        pair = f"{from_coin.upper()}_{to_coin.upper()}"
        if pair in cls.ex_pair_map and len(cls.ex_pair_map[pair]) > 0:
            return True
        if should_raise:
            raise PairNotFound(f"Pair '{pair}' does not exist / no usable exchanges in {cls.__name__}.ex_pair_map")
        return False

    @classmethod
    def _pair_exchanges(cls, from_coin: str, to_coin: str) -> List[ExchangeAdapter]:
        """Return the adapters mapped for the pair, raising :class:`PairNotFound` if there are none."""
        pair = f"{from_coin.upper()}_{to_coin.upper()}"
        if pair not in cls.ex_pair_map:
            raise PairNotFound(f"Pair '{pair}' does not exist in {cls.__name__}.ex_pair_map")
        if len(cls.ex_pair_map[pair]) == 0:
            raise PairNotFound(f"Pair '{pair}' has no listed exchanges in {cls.__name__}.ex_pair_map")
        return cls.ex_pair_map[pair]

    @classmethod
    async def try_proxy(cls, from_coin: str, to_coin: str, proxy: str = "BTC", rate='last',
                        adapter: ExchangeAdapter = None) -> Decimal:
        """
        Price ``from_coin`` in ``to_coin`` by multiplying the ``from_coin/proxy`` rate with the
        ``proxy/to_coin`` rate. If only ``to_coin/proxy`` exists, that ticker is inverted and used instead.

            >>> exm = ExchangeManager()
            >>> await exm.load_exchanges()
            >>> await exm.try_proxy('HIVE', 'USD', proxy='BTC')

        :param from_coin: Symbol of the coin being priced.
        :param to_coin: Symbol of the coin the price is expressed in.
        :param proxy: Symbol of the intermediate coin.
        :param rate: Which :class:`PriceData` field to use, e.g. ``'last'``.
        :param adapter: If given, both legs are checked and fetched exclusively via this adapter;
                        otherwise the manager's pair map is used.
        :raises ProxyMissingPair: If either leg of the route is unavailable.
        :return: The proxied rate (``to_coin`` per 1 ``from_coin``).
        """
        if not cls.loaded:
            await cls.load_exchanges()
        from_coin, to_coin, proxy = from_coin.upper(), to_coin.upper(), proxy.upper()
        log.info("Attempting to get '%s' price for %s / %s using proxy coin %s", rate, from_coin, to_coin, proxy)
        inv_to = False
        if adapter:
            log.debug("Custom adapter was specified. Proxying exclusively via: %s", adapter)

        # cls.pair_exists is synchronous while adapter.has_pair is a coroutine - await_if_needed handles both.
        pair_exists = cls.pair_exists if adapter is None else adapter.has_pair
        get_ticker = cls._get_ticker if adapter is None else adapter.get_pair

        if not await await_if_needed(pair_exists, from_coin, proxy):
            raise ProxyMissingPair(f"No pairs available for {from_coin} -> {proxy}")

        if not await await_if_needed(pair_exists, proxy, to_coin):
            log.debug("Did not find pair %s/%s - checking if we can do inverted %s/%s", proxy, to_coin, to_coin, proxy)
            if await await_if_needed(pair_exists, to_coin, proxy):
                log.debug("Found inverted proxy %s/%s", to_coin, proxy)
                inv_to = True
            else:
                raise ProxyMissingPair(f"No pairs available for {proxy} -> {to_coin}")

        # e.g. if from=HIVE, to=USD, proxy=BTC:
        #   HIVE/BTC = 0.00001545    BTC/USD = 6900
        #   0.00001545 * 6900 = 0.106605 USD per HIVE
        ex_from = await get_ticker(from_coin, proxy)
        if inv_to:
            ex_to = await get_ticker(to_coin, proxy)
            log.debug("Original rates from reverse proxy %s/%s - rates: %s", to_coin, proxy, ex_to)
            ex_to = cls._invert_data(ex_to)
            log.debug("Inverted rates - converted pair into %s/%s : %s", proxy, to_coin, ex_to)
        else:
            ex_to = await get_ticker(proxy, to_coin)

        r_from, r_to = Decimal(ex_from[rate]), Decimal(ex_to[rate])
        log.debug(f"{from_coin}/{proxy} price: {r_from:.8f} {proxy} per 1 {from_coin}")
        log.debug(f"{proxy}/{to_coin} price: {r_to:.8f} {to_coin} per 1 {proxy}")
        log.debug("%s * %s = %f", r_from, r_to, r_from * r_to)
        return r_from * r_to

    @staticmethod
    def _invert_data(data: PriceData):
        """
        Return a copy of ``data`` for the opposite pair: every non-empty, non-zero field becomes
        ``1 / value`` and ``from_coin`` / ``to_coin`` are swapped. ``data`` itself is not modified.
        """
        # NOTE(review): each field is inverted in place, so high/low and bid/ask are not swapped with each
        # other, and 'volume' is inverted like a price. Confirm whether that is intended.
        inv_keys = ['last', 'bid', 'ask', 'open', 'close', 'high', 'low', 'volume']
        new_pd = PriceData(**dict(data))
        for k in inv_keys:
            # Skip missing values, and zeroes which would cause a division by zero.
            if empty(new_pd[k], zero=True):
                continue
            setattr(new_pd, k, Decimal('1') / new_pd[k])
        new_pd.from_coin, new_pd.to_coin = new_pd.to_coin, new_pd.from_coin
        return new_pd

    @classmethod
    async def _get_ticker(cls, from_coin: str, to_coin: str, ex_index: int = 0) -> PriceData:
        """
        Fetch ticker data for the pair from the ``ex_index``-th mapped adapter, falling back to the
        inverted opposite pair when the requested direction is not mapped.

        :raises PairNotFound: If neither direction of the pair is mapped.
        """
        if not cls.loaded:
            await cls.load_exchanges()
        from_coin, to_coin = from_coin.upper(), to_coin.upper()
        try:
            cls.pair_exists(from_coin, to_coin, should_raise=True)
        except PairNotFound:
            log.info("Pair %s/%s was not found. Trying inverse query using %s/%s", from_coin, to_coin, to_coin, from_coin)
            cls.pair_exists(to_coin, from_coin, should_raise=True)
            adp = cls.ex_pair_map[f"{to_coin}_{from_coin}"][ex_index]
            return cls._invert_data(await adp.get_pair(to_coin, from_coin))

        adp = cls.ex_pair_map[f"{from_coin}_{to_coin}"][ex_index]
        log.info("Found pair %s/%s - querying exchange: %s", from_coin, to_coin, adp)
        return await adp.get_pair(from_coin, to_coin)

    @classmethod
    async def get_tickers(cls, from_coin: str, to_coin: str, rate='last') -> Dict[str, Decimal]:
        """
        Query every registered adapter for the ``from_coin``/``to_coin`` rate.

        Per adapter the direct pair is tried first, then the inverted opposite pair, then a proxied route
        through one of :attr:`proxy_coins`. Adapters which fail are logged and left out of the result.

        :param from_coin: Symbol of the coin being priced.
        :param to_coin: Symbol of the coin the price is expressed in.
        :param rate: Which :class:`PriceData` field to report, e.g. ``'last'``.
        :return: Mapping of adapter ``.code`` to the requested rate.
        """
        if not cls.loaded:
            await cls.load_exchanges()
        from_coin, to_coin = from_coin.upper(), to_coin.upper()

        data = {}
        for adp in cls.ex_instances.values():
            if await adp.has_pair(from_coin, to_coin):
                try:
                    d = await adp.get_pair(from_coin, to_coin)
                    data[adp.code] = d[rate]
                except Exception as e:
                    log.warning(
                        "Error obtaining %s/%s exchange rate from %s - reason: %s %s",
                        from_coin, to_coin, adp.name, type(e), str(e)
                    )
            elif await adp.has_pair(to_coin, from_coin):
                log.info("Pair %s/%s was not found. Trying inverse query using %s/%s", from_coin, to_coin, to_coin, from_coin)
                try:
                    data[adp.code] = cls._invert_data(await adp.get_pair(to_coin, from_coin))[rate]
                except Exception as e:
                    log.warning(
                        "Error obtaining %s/%s exchange rate from %s - reason: %s %s",
                        from_coin, to_coin, adp.name, type(e), str(e)
                    )
            else:
                try:
                    proxy = await cls._find_proxy(from_coin, to_coin, adapter=adp)
                    # Forward the requested rate - the proxied branch must honour it like the other branches.
                    data[adp.code] = await cls.try_proxy(from_coin, to_coin, proxy, rate=rate, adapter=adp)
                except (ProxyMissingPair, PairNotFound):
                    log.warning(
                        "Error obtaining %s/%s exchange rate from %s - cannot find a proxy pair.",
                        from_coin, to_coin, adp.name
                    )
                except Exception as e:
                    # Same best-effort policy as the direct/inverse branches: one failing exchange
                    # must not abort the whole query.
                    log.warning(
                        "Error obtaining %s/%s exchange rate from %s - reason: %s %s",
                        from_coin, to_coin, adp.name, type(e), str(e)
                    )
        return data

    @classmethod
    async def _find_proxy(cls, from_coin: str, to_coin: str, adapter: ExchangeAdapter = None) -> str:
        """
        Find the first coin in :attr:`proxy_coins` through which ``from_coin`` can be converted to ``to_coin``.

            >>> await cls._find_proxy('HIVE', 'USD')
            'BTC'

        A proxy ``c`` is viable when ``from_coin/c`` exists and either ``c/to_coin`` or ``to_coin/c`` exists.

        :param from_coin: Symbol of the coin being priced.
        :param to_coin: Symbol of the coin the price is expressed in.
        :param adapter: If given, pairs are checked with ``adapter.has_pair`` instead of the manager's pair map.
        :raises ProxyMissingPair: If no proxy coin offers a viable route.
        :return: The symbol of the proxy coin.
        """
        if not cls.loaded:
            await cls.load_exchanges()
        from_coin, to_coin = from_coin.upper(), to_coin.upper()
        for c in cls.proxy_coins:
            if adapter:
                if not await adapter.has_pair(from_coin, c):
                    continue
                if await adapter.has_pair(c, to_coin):
                    return c
                if await adapter.has_pair(to_coin, c):
                    return c
                continue

            if f"{from_coin}_{c}" not in cls.ex_pair_map:
                log.debug("Cannot proxy via %s - pair %s not found", c, f"{from_coin}_{c}")
                continue
            if f"{c}_{to_coin}" in cls.ex_pair_map:
                log.debug("Found forward proxy via %s - pair %s found", c, f"{c}_{to_coin}")
                return c
            if f"{to_coin}_{c}" in cls.ex_pair_map:
                log.debug("Found reverse proxy via %s - pair %s found", c, f"{to_coin}_{c}")
                return c

        raise ProxyMissingPair(f"Could not find a viable proxy route for {from_coin} -> {to_coin}")

    async def get_pair(self, from_coin: str, to_coin: str, rate='last', use_proxy: bool = True):
        """
        Get the exchange rate for ``from_coin``/``to_coin``.

        If the pair is mapped, the first mapped adapter is queried directly. Otherwise, when ``use_proxy``
        is True, the rate is derived through a proxy coin (see :meth:`try_proxy`).

        :param from_coin: Symbol of the coin being priced.
        :param to_coin: Symbol of the coin the price is expressed in.
        :param rate: Which :class:`PriceData` field to return, e.g. ``'last'``.
        :param use_proxy: Whether a proxied route may be used when the pair is not mapped.
        :raises PairNotFound: If the pair is not mapped and no proxy route is available / allowed.
        :return: The requested rate.
        """
        if not self.loaded:
            await self.load_exchanges()
        pair = f"{from_coin.upper()}_{to_coin.upper()}"

        if pair not in self.ex_pair_map:
            if not use_proxy:
                raise PairNotFound(f"Pair {pair} was not found in ex_pair_map, and no proxying requested.")
            try:
                log.debug("Pair %s not found - trying to find a proxy route...", pair)
                proxy = await self._find_proxy(from_coin, to_coin)
                log.debug("Found proxy to %s via %s - trying proxy...", to_coin, proxy)
                rate_proxy = await self.try_proxy(from_coin, to_coin, proxy, rate=rate)
                log.debug("Got proxy rate: %f %s per %s", rate_proxy, to_coin, from_coin)
                return rate_proxy
            except ProxyMissingPair as e:
                # Keep the original cause attached for easier debugging.
                raise PairNotFound(f"Pair {pair} not found, nor a viable proxy route.") from e

        adp = self.ex_pair_map[pair][0]
        log.debug("Pair %s found - getting exchange rate directly", pair)
        data = await adp.get_pair(from_coin, to_coin)
        return data[rate]
# Public API of this module. ProxyMissingPair is exported because it is raised by the public
# ExchangeManager.try_proxy, so callers need to be able to import and catch it.
__all__ = [
    'PairNotFound', 'ProxyMissingPair', 'ExchangeRateLimited', 'ExchangeException', 'ExchangeAdapter',
    'ExchangeDown', 'PriceData', 'ExchangeManager',
]