Knocks/BackEnd/Knoks.CryptoExchanges/Managers/CryptoExchangeManager.cs

302 lines
12 KiB
C#

using Knoks.CryptoExchanges.Entities;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using WebSocket.Interfaces;
//using Knoks.CryptoExchanges.Classes;
using Knoks.Framework.DataAccess;
using System.IO;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Knoks.CryptoExchanges.Data.Interfaces;
using Knoks.CryptoExchanges.Data.Dao;
using Knoks.CryptoExchanges.Entities.Args;
using System.Threading;
using System.Text;
namespace Knoks.CryptoExchanges
{
public class CryptoExchangeManager
{
#region Fields
// Exchange handlers; (re)created and populated by InitExchagesClients.
private List<ExchangeClientHandler> Exchanges { get; set; }
// Callback passed to each client's InitClient; assigned in StartExchangesRequest.
private UpdateData saveData;
// NOTE(review): not referenced anywhere in the visible part of this file — confirm before removing.
private readonly List<KnokExchangeData>[] testSet = new List<KnokExchangeData>[2];
// Always 0 (Convert.ToInt32(false)). NOTE(review): also unreferenced in the visible code.
private readonly int last = Convert.ToInt32(false);
// Configuration built from appsettings.json; static, and reassigned by every constructor call.
public static IConfiguration Configuration { get; set; }
// Data-access objects, all created in the constructor on top of one shared ProcExecutor.
private IExchangeDao _exchangeDao;
private IKnokDao _knokDao;
private IExchangePairRateDao _exchangePairRateDao;
// Per exchange id: the pair list returned by the most recent GetActiveKnoksDataByExchange call.
private readonly Dictionary<int, List<Tuple<string, string>>> pairs = new Dictionary<int, List<Tuple<string, string>>>();
//private bool exit = false;
// Background task running ExchangePairsTracer; started in StartExchangesRequest.
private Task exchangePairsTracerTask;
// Minutes to wait between two CheckPairsChange runs; read from "MinuteIntervalCheck", default 1.
private readonly double mitunteIntervalCheck;
// Per exchange id: local time (DateTime.Now) at which the saveData callback last ran.
private readonly Dictionary<int, DateTime> lastExchangesResponse = new Dictionary<int, DateTime>();
// Cancelled by StopExchangesRequest; its token is handed to the tracer task.
CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
// Logger used by LogInfo / DebugInfo.
ILogger _logger;
#endregion
#region Public
/// <summary>
/// Creates the manager: loads <c>appsettings.json</c> from the current directory,
/// reads the pairs-check interval, sets up console/debug logging and creates the
/// DAOs on top of a single <see cref="ProcExecutor"/>.
/// </summary>
/// <remarks>
/// <c>MinuteIntervalCheck</c> is parsed with the invariant culture (configuration files are
/// machine-readable, so "0.5" must mean one half regardless of the host locale). A missing,
/// unparsable, non-positive or infinite value falls back to 1 minute.
/// </remarks>
public CryptoExchangeManager()
{
    var builder = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("appsettings.json");
    Configuration = builder.Build();

    var parsed = double.TryParse(
        Configuration["MinuteIntervalCheck"],
        System.Globalization.NumberStyles.Float,
        System.Globalization.CultureInfo.InvariantCulture,
        out var minutes);

    // A zero or negative interval would turn the tracer loop into a busy loop or make the wait throw.
    var usable = parsed && minutes > 0 && !double.IsInfinity(minutes);
    this.mitunteIntervalCheck = usable ? minutes : 1;

    ILoggerFactory loggerFactory = new LoggerFactory()
        .AddConsole(Configuration.GetSection("Logging"))
        .AddDebug();

    // The manager logs under its own category; previously it reused ProcExecutor's category.
    _logger = loggerFactory.CreateLogger<CryptoExchangeManager>();

    Dictionary<string, string> connections = Configuration.GetSection("DbConnections")
        .GetChildren()
        .ToDictionary(x => x.Key, x => x.Value);

    var provider = new ProcExecutor(connections, loggerFactory.CreateLogger<ProcExecutor>());
    _exchangeDao = new ExchangeDao(provider);
    _knokDao = new KnokDao(provider);
    _exchangePairRateDao = new ExchangePairRateDao(provider);
}
/// <summary>
/// Installs the rate-saving callback, initializes all exchange clients and starts the
/// background task that periodically re-checks the active pairs.
/// </summary>
/// <returns>Always <c>true</c>; failures surface as exceptions.</returns>
/// <remarks>
/// Blocks the calling thread until <c>InitExchagesClients</c> has completed
/// (the method is synchronous, so the wait is kept).
/// </remarks>
public bool StartExchangesRequest()
{
    this.saveData = (exchangeId, exchange, x) =>
    {
        // Remember when this exchange last answered; CheckPairsChange uses it to detect stale feeds.
        this.lastExchangesResponse[exchangeId] = DateTime.Now;

        // Symbol replacement (e.g. TUSD => USDT for Bittrex) is currently disabled, so the
        // "{exchange}:ReplaceSymbols" section is no longer read on every callback.
        foreach (var item in x)
        {
            // NOTE(review): the result of UpdateExchangePairRate is not observed here —
            // confirm whether it returns a Task that should be awaited.
            _exchangePairRateDao.UpdateExchangePairRate(exchangeId,
                new ExchangePairRateArg()
                {
                    PairSymbol = item.Key.Replace("-", string.Empty).Replace(":", string.Empty),
                    Open = item.Value.OpenPrice,
                    High = item.Value.HighPrice,
                    Low = item.Value.LowPrice,
                    Close = item.Value.ClosePrice,
                    //Volume = item.Value.Volume,
                    StartDate = item.Value.TimeStamp
                });
        }

        // Build the (potentially large) dump only when somebody will actually see it.
        if (_logger.IsEnabled(LogLevel.Debug))
        {
            var sb = new StringBuilder();
            sb.AppendLine($"Exchange: \"{exchange}\"");
            foreach (var item in x)
            {
                sb.AppendLine($" Pair: {item.Key}");
                sb.AppendLine($" Timestamp: {item.Value.TimeStamp.ToShortDateString()} {item.Value.TimeStamp.ToShortTimeString()}");
                sb.AppendLine($" High: {item.Value.HighPrice}");
                sb.AppendLine($" Low: {item.Value.LowPrice}");
                sb.AppendLine();
            }
            DebugInfo(sb.ToString());
        }
    };

    InitExchagesClients().Wait();

    this.exchangePairsTracerTask = Task.Run(
        () => ExchangePairsTracer(_cancellationTokenSource.Token),
        _cancellationTokenSource.Token);

    LogInfo($"CryptoExchange service is ran.");
    return true;
}
/// <summary>
/// Stops the background pairs tracer and closes the connection of every exchange client.
/// </summary>
/// <returns>A task that completes once the tracer has ended and all connections are closed.</returns>
/// <remarks>
/// Safe to call when <c>StartExchangesRequest</c> was never called. Connections are closed
/// even if the tracer task ended with an error; that error is still propagated to the caller.
/// </remarks>
public async Task StopExchangesRequest()
{
    LogInfo($"CryptoExchange service is stopping...");
    _cancellationTokenSource.Cancel();

    try
    {
        var tracer = this.exchangePairsTracerTask;
        if (tracer != null)
        {
            await tracer;
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when cancellation wins the race against the tracer start-up.
    }
    finally
    {
        if (Exchanges != null)
        {
            // Handlers can exist without a client, so skip those instead of dereferencing null.
            var closing = Exchanges
                .Where(x => x.Client != null)
                .Select(x => x.Client.CloseConnection());
            await Task.WhenAll(closing);
        }
    }
}
#endregion
#region Private
/// <summary>
/// Background loop: runs <c>CheckPairsChange</c>, then waits <c>mitunteIntervalCheck</c>
/// minutes, until <paramref name="token"/> is cancelled.
/// </summary>
/// <param name="token">Cancellation token that ends the loop.</param>
/// <returns>A task that completes when the loop has been cancelled.</returns>
/// <remarks>
/// The check always runs at least once, even if the token is already cancelled.
/// A failing check is logged and retried after the normal interval instead of
/// silently ending the background task.
/// </remarks>
private async Task ExchangePairsTracer(CancellationToken token)
{
    var interval = TimeSpan.FromMinutes(this.mitunteIntervalCheck);

    while (true)
    {
        try
        {
            await CheckPairsChange();
        }
        catch (Exception ex)
        {
            _logger.LogError($"{DateTime.UtcNow:G} Exchange pairs check failed: {ex}");
        }

        try
        {
            // Asynchronous wait: does not pin a thread-pool thread for the whole interval.
            await Task.Delay(interval, token);
        }
        catch (OperationCanceledException)
        {
            return;
        }
    }
}
/// <summary>
/// For every exchange, compares the currently active pairs with the previously stored
/// list and re-requests data when the set changed (and is not empty) or when the exchange
/// has not answered for more than 10 minutes. Finally triggers aggregation recalculation.
/// </summary>
/// <returns>A task that completes after all exchanges were checked and aggregations recalculated.</returns>
private async Task CheckPairsChange()
{
    // An exchange that stayed silent longer than this is reconnected.
    const double staleResponseMinutes = 10;

    foreach (var exchange in Exchanges)
    {
        // Materialize once: the DAO result is used several times below.
        var fetched = await GetActiveKnoksDataByExchange(exchange.ExchangeId);
        var current = (fetched ?? Enumerable.Empty<Tuple<string, string>>()).ToList();

        // First sighting of an exchange counts as "changed".
        var pairsChanged = true;
        if (this.pairs.TryGetValue(exchange.ExchangeId, out var previous))
        {
            pairsChanged = previous.Except(current).Any() || current.Except(previous).Any();
        }
        this.pairs[exchange.ExchangeId] = current;

        // TotalMinutes, not Minutes: Minutes is only the 0-59 component, so a gap of
        // e.g. 1h05m would otherwise read as 5 minutes and never be seen as stale.
        var responseIsStale =
            this.lastExchangesResponse.TryGetValue(exchange.ExchangeId, out var lastResponse)
            && (DateTime.Now - lastResponse).TotalMinutes > staleResponseMinutes;

        if ((pairsChanged && current.Any()) || responseIsStale)
        {
            UpdateExchangeRequest(exchange);
        }
    }

    await RecalcAggregations();
}
/// <summary>
/// Rebuilds <c>Exchanges</c>: creates a client for every exchange returned by
/// <c>GetExchanges</c> and initializes it with the <c>saveData</c> callback.
/// </summary>
/// <returns>A task that completes when every created client has been initialized.</returns>
/// <remarks>
/// Exchanges for which the factory returns no client are logged and skipped, so the
/// rest of the manager never has to deal with a handler whose <c>Client</c> is null.
/// Clients are initialized one after another, in the order the exchanges are returned.
/// </remarks>
private async Task InitExchagesClients()
{
    Exchanges = new List<ExchangeClientHandler>();

    foreach (var item in await GetExchanges())
    {
        var handler = new ExchangeClientHandler
        {
            ExchangeId = item.ExchangeId,
            Client = CryptoExchangeFactory.InitialyzeExchange(item.ExchangeId, item.ExchangeName, CryptoExchangeManager.Configuration)
        };

        if (handler.Client == null)
        {
            _logger.LogWarning($"{DateTime.UtcNow:G} No client available for exchange \"{item.ExchangeName}\" (id {item.ExchangeId}); skipped.");
            continue;
        }

        // Register before initializing so a client whose InitClient fails can still be closed on stop.
        Exchanges.Add(handler);
        await handler.Client.InitClient(saveData);
    }
}
/// <summary>
/// Fetches the exchanges through the exchange DAO.
/// </summary>
/// <returns>Whatever <c>_exchangeDao.GetExchanges()</c> yields.</returns>
private async Task<IEnumerable<Exchange>> GetExchanges()
{
    var exchanges = await _exchangeDao.GetExchanges();
    return exchanges;
}
/// <summary>
/// Fetches the pairs of the active knoks for one exchange through the knok DAO.
/// </summary>
/// <param name="exchangeId">Identifier of the exchange to query.</param>
/// <returns>The pair tuples returned by <c>_knokDao.GetActiveKnoksDataByExchange</c>.</returns>
private async Task<IEnumerable<Tuple<string, string>>> GetActiveKnoksDataByExchange(int exchangeId)
{
    var activePairs = await _knokDao.GetActiveKnoksDataByExchange(exchangeId);
    return activePairs;
}
/// <summary>
/// Asks the knok DAO to recalculate the aggregations
/// (per the original note: all of them except the 1-minute one).
/// </summary>
/// <returns>A task that completes when the DAO call has finished.</returns>
private async Task RecalcAggregations() => await _knokDao.RecalcAggregations();
/// <summary>
/// Re-subscribes an exchange client to the currently stored pair list: reconnects the
/// client if a request is already running, then requests data for the stored pairs
/// (or clears the task handle when there are none).
/// </summary>
/// <param name="exchange">Handler of the exchange whose request must be refreshed.</param>
/// <remarks>
/// Synchronous by signature, so the reconnect and the wait for the previous request block
/// the calling thread. A previous request that ended faulted or cancelled is logged and
/// does not prevent the new subscription.
/// </remarks>
private void UpdateExchangeRequest(ExchangeClientHandler exchange)
{
    var requestedExchange = Exchanges.FirstOrDefault(x => x.ExchangeId == exchange.ExchangeId);
    if (requestedExchange == null)
    {
        return;
    }

    if (requestedExchange.Client == null)
    {
        // Nothing to reconnect or subscribe with; avoid a NullReferenceException below.
        _logger.LogWarning($"{DateTime.UtcNow:G} ExchangeId {exchange.ExchangeId} has no client; request not updated.");
        return;
    }

    if (exchange.TaskHandler != null)
    {
        requestedExchange.Client.Reconnect().Wait();
        try
        {
            Task.WaitAll(exchange.TaskHandler);
        }
        catch (AggregateException ex)
        {
            // The old request is being replaced anyway; its failure must not block the new one.
            _logger.LogWarning($"{DateTime.UtcNow:G} Previous request for ExchangeId {exchange.ExchangeId} ended with an error: {ex}");
        }
    }

    if (this.pairs.TryGetValue(exchange.ExchangeId, out var activePairs) && activePairs.Any())
    {
        var sb = new StringBuilder();
        sb.AppendLine($"Reconnect for ExchangeId: {exchange.ExchangeId}");
        foreach (var pair in activePairs)
        {
            sb.AppendLine($" Pair: {pair}");
        }
        LogInfo(sb.ToString());

        // Symbol replacement (USDT => TUSD for Bittrex) is currently disabled.
        exchange.TaskHandler = requestedExchange.Client.RequestPairsData(activePairs);
    }
    else
    {
        exchange.TaskHandler = null;
    }
}
/// <summary>
/// Writes an information-level log entry prefixed with the current UTC time.
/// </summary>
/// <param name="info">Text to log.</param>
private void LogInfo(string info)
{
    var stamped = $"{DateTime.UtcNow:G} {info}";
    _logger.LogInformation(stamped);
}
/// <summary>
/// Writes a debug-level log entry, optionally prefixed with the current UTC time.
/// </summary>
/// <param name="info">Text to log.</param>
/// <param name="timestamp">When <c>true</c>, the entry starts with the UTC timestamp.</param>
private void DebugInfo(string info, bool timestamp = false)
{
    var message = timestamp
        ? $"{DateTime.UtcNow:G} {info}"
        : $"{info}";
    _logger.LogDebug(message);
}
#endregion
}
}