using Knoks.CryptoExchanges.Entities; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using WebSocket.Interfaces; //using Knoks.CryptoExchanges.Classes; using Knoks.Framework.DataAccess; using System.IO; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Knoks.CryptoExchanges.Data.Interfaces; using Knoks.CryptoExchanges.Data.Dao; using Knoks.CryptoExchanges.Entities.Args; using System.Threading; using System.Text; namespace Knoks.CryptoExchanges { public class CryptoExchangeManager { #region Fields private List Exchanges { get; set; } private UpdateData saveData; private readonly List[] testSet = new List[2]; private readonly int last = Convert.ToInt32(false); public static IConfiguration Configuration { get; set; } private IExchangeDao _exchangeDao; private IKnokDao _knokDao; private IExchangePairRateDao _exchangePairRateDao; private readonly Dictionary>> pairs = new Dictionary>>(); //private bool exit = false; private Task exchangePairsTracerTask; private readonly double mitunteIntervalCheck; private readonly Dictionary lastExchangesResponse = new Dictionary(); CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); ILogger _logger; #endregion #region Public /// /// ctor /// public CryptoExchangeManager() { var builder = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json"); Configuration = builder.Build(); if (double.TryParse(Configuration["MinuteIntervalCheck"], out var val)) { this.mitunteIntervalCheck = val; } else { this.mitunteIntervalCheck = 1; } ILoggerFactory loggerFactory = new LoggerFactory() .AddConsole(Configuration.GetSection("Logging")) .AddDebug(); _logger = loggerFactory.CreateLogger(); Dictionary items = Configuration.GetSection("DbConnections"). GetChildren().ToDictionary(x=>x.Key, x => x.Value); var provider = new ProcExecutor(items, loggerFactory.CreateLogger()); _exchangeDao = new ExchangeDao(provider); _knokDao = new KnokDao(provider); _exchangePairRateDao = new ExchangePairRateDao(provider); } /// /// Starts initial request to all CryptoExchanges /// /// public bool StartExchangesRequest() { this.saveData = (exchangeId, exchange, x) => { var replaceSymbols = Configuration.GetSection($"{exchange}:ReplaceSymbols").GetChildren() .ToDictionary(s=>s.Key, s=>s.Value); this.lastExchangesResponse[exchangeId] = DateTime.Now; foreach (var item in x) { var pair = item.Key; // Replace TUSD => USDT for Bittrex //if (replaceSymbols != null) //{ // var value = pair.Split('-'); // if (value.Count() == 2) // { // var symbol1 = replaceSymbols.FirstOrDefault(rs => rs.Value.Equals(value[0])); // if (!symbol1.Equals(default(KeyValuePair))) // { // pair = $"{symbol1.Key}{value[1]}"; // } // } //} _exchangePairRateDao.UpdateExchangePairRate(exchangeId, new ExchangePairRateArg() { PairSymbol = pair.Replace("-", string.Empty).Replace(":", string.Empty), Open = item.Value.OpenPrice, High = item.Value.HighPrice, Low = item.Value.LowPrice, Close = item.Value.ClosePrice, //Volume = item.Value.Volume, StartDate = item.Value.TimeStamp }); } var sb = new StringBuilder(); sb.AppendLine($"Exchange: \"{exchange}\""); foreach (var item in x) { sb.AppendLine($" Pair: {item.Key}"); sb.AppendLine($" Timestamp: {item.Value.TimeStamp.ToShortDateString()} {item.Value.TimeStamp.ToShortTimeString()}"); sb.AppendLine($" High: {item.Value.HighPrice}"); sb.AppendLine($" Low: {item.Value.LowPrice}"); sb.AppendLine(); } DebugInfo(sb.ToString()); }; InitExchagesClients().Wait(); this.exchangePairsTracerTask = Task.Run(() => ExchangePairsTracer(_cancellationTokenSource.Token), _cancellationTokenSource.Token); LogInfo($"CryptoExchange service is ran."); return true; } /// /// Stops request to all CryptoExchanges /// /// public async Task StopExchangesRequest() { //this.exit = true; LogInfo($"CryptoExchange service is stopping..."); _cancellationTokenSource.Cancel(); await Task.WhenAll(this.exchangePairsTracerTask); await Task.WhenAll(Exchanges.Select(x=>x.Client.CloseConnection())); } #endregion #region Private /// /// Background proccess for the pairs tracing /// /// private async Task ExchangePairsTracer(CancellationToken token) { do //while (!this.exit) { await CheckPairsChange(); // Task.Delay(TimeSpan.FromMinutes(this.mitunteIntervalCheck)).Wait(); } while (!token.WaitHandle.WaitOne(TimeSpan.FromMinutes(this.mitunteIntervalCheck))); // GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.ProcessKnoksIntervalMs))); } /// /// Comparator of the exchange pairs list /// /// private async Task CheckPairsChange() { foreach (var exchange in Exchanges) { var exchangePairs = await GetActiveKnoksDataByExchange(exchange.ExchangeId); var pairsChanged = true; if (this.pairs.Keys.Contains(exchange.ExchangeId)) { var firstNotSecond = this.pairs[exchange.ExchangeId].Except(exchangePairs).ToList(); var secondNotFirst = exchangePairs.Except(this.pairs[exchange.ExchangeId]).ToList(); pairsChanged = firstNotSecond.Any() || secondNotFirst.Any(); } this.pairs[exchange.ExchangeId] = exchangePairs.ToList(); if ((pairsChanged && exchangePairs.Any()) || (this.lastExchangesResponse.Keys.Contains(exchange.ExchangeId) && (DateTime.Now - this.lastExchangesResponse[exchange.ExchangeId]).Minutes > 10)) { UpdateExchangeRequest(exchange); } } await RecalcAggregations(); } /// /// The clients initialization /// /// private async Task InitExchagesClients() { Exchanges = new List(); foreach (var item in await GetExchanges()) { var exchange = new ExchangeClientHandler { ExchangeId = item.ExchangeId, Client = CryptoExchangeFactory.InitialyzeExchange(item.ExchangeId, item.ExchangeName, CryptoExchangeManager.Configuration) }; Exchanges.Add(exchange); if (exchange.Client != null) { await exchange.Client.InitClient(saveData); } } } /// /// Returns all CeyptoExchanges from DB /// /// private async Task> GetExchanges() { return await _exchangeDao.GetExchanges(); } /// /// Get list of active knoks pairs /// /// /// private async Task>> GetActiveKnoksDataByExchange(int exchangeId) { return await _knokDao.GetActiveKnoksDataByExchange(exchangeId); } /// /// Recalculate all aggregations except 1 minute /// /// private async Task RecalcAggregations() { await _knokDao.RecalcAggregations(); } /// /// Reconnect exchanges with new pairs /// /// private void UpdateExchangeRequest(ExchangeClientHandler exchange) { var requestedExchange = Exchanges.FirstOrDefault(x => x.ExchangeId == exchange.ExchangeId); if (requestedExchange != null) { if (exchange.TaskHandler != null) { requestedExchange.Client.Reconnect().Wait(); Task.WaitAll(exchange.TaskHandler); } var pairs = this.pairs[exchange.ExchangeId]; // await GetActiveKnoksDataByExchange(exchange.ExchangeId); // await knoks. SelectMany(x => x.TradingPairs).Distinct().ToList(); if (pairs.Any()) { var sb = new StringBuilder(); sb.AppendLine($"Reconnect for ExchangeId: {exchange.ExchangeId}"); foreach (var i in pairs) { sb.AppendLine($" Pair: {i}"); } LogInfo(sb.ToString()); // Replace USDT => TUSD for Bittrex //if (requestedExchange.Client.ReplaceSymbols != null) //{ // for (var i = 0; i < pairs.Count; i++) // { // if (requestedExchange.Client.ReplaceSymbols.Keys.Contains(pairs[i].Item1)) // { // pairs[i] = new Tuple(requestedExchange.Client.ReplaceSymbols[pairs[i].Item1], pairs[i].Item2); // } // } //} exchange.TaskHandler = requestedExchange.Client.RequestPairsData(pairs); } else { exchange.TaskHandler = null; } } } /// /// Log info /// /// private void LogInfo(string info) { _logger.LogInformation($"{DateTime.UtcNow:G} {info}"); } /// /// Log debug info /// /// /// private void DebugInfo(string info, bool timestamp = false) { if (timestamp) _logger.LogDebug($"{DateTime.UtcNow:G} {info}"); else _logger.LogDebug($"{info}"); } #endregion } }