using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Knoks.Core.Data.Interfaces;
using Knoks.Core.Entities;
using Knoks.CryptoExchanges.Data.Interfaces;
using Knoks.CryptoExchanges.Entities;
using Knoks.CryptoExchanges.Entities.Args;
using Microsoft.Extensions.Logging;
using IExchangeDao = Knoks.Core.Data.Interfaces.IExchangeDao;
using IKnokDao = Knoks.Core.Data.Interfaces.IKnokDao;

namespace Knoks.SignalsTracking
{
    #region aux classes

    /// <summary>
    /// A live-rate request (the exchange, pair symbol and look-back date come from
    /// <see cref="LiveRateItemRequest"/>) together with the knoks tracked against that pair.
    /// </summary>
    class TrackingKnoks : LiveRateItemRequest
    {
        /// <summary>
        /// Guards <see cref="Knoks"/>: the refresh worker appends to the list while the
        /// processing worker reads and rewrites it.
        /// </summary>
        public readonly object SyncRoot = new object();

        public List<Knok> Knoks;

        public TrackingKnoks()
        {
            Knoks = new List<Knok>();
        }
    }

    #endregion

    /// <summary>
    /// Background service that loads active knoks, polls live rates for the pairs they refer to
    /// and persists every status/rate change it detects through <see cref="IKnokDao"/>.
    /// </summary>
    /// <remarks>
    /// Lifecycle: <see cref="Init"/>, <see cref="Start"/>, <see cref="Stop"/>,
    /// <see cref="WaitForCompletion"/>, <see cref="Dispose"/>.
    /// </remarks>
    public class SignalsTracker : IDisposable
    {
        private const int ResolvedKnokStatusId = 5;
        private const int OnGoingStatusId = 4;

        private readonly ILogger _logger;
        private readonly SignalsTrackerSettings _trackerSettings;

        public bool IsInitialized { get; private set; }

        private readonly string _trackerName = nameof(SignalsTracker);
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private Task _worker;
        private Task _refreshDataWorker;
        private bool _disposed;

        private readonly IKnokDao _knokDao;
        private readonly IExchangeDao _exchangeDao;
        private readonly IExchangePairRateDao _pairRateDao;

        // Highest KnokId enqueued so far; passed to GetKnoks on every refresh.
        private long _lastLoadedSignalId;

        // Keyed by GetKnokKey(exchangeId, pairSymbol).
        private readonly ConcurrentDictionary<string, TrackingKnoks> _trackedKnoks =
            new ConcurrentDictionary<string, TrackingKnoks>();

        // TickerId => TickerSymbol; the symbol is the only ticker property this class reads.
        private ImmutableDictionary<int, string> _tikersDict;

        /// <summary>Creates the tracker.</summary>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public SignalsTracker(
            SignalsTrackerSettings trackerSettings,
            ILogger logger,
            IKnokDao knokDao,
            IExchangeDao exchangeDao,
            IExchangePairRateDao pairRateDao)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _trackerSettings = trackerSettings ?? throw new ArgumentNullException(nameof(trackerSettings));
            _knokDao = knokDao ?? throw new ArgumentNullException(nameof(knokDao));
            _exchangeDao = exchangeDao ?? throw new ArgumentNullException(nameof(exchangeDao));
            _pairRateDao = pairRateDao ?? throw new ArgumentNullException(nameof(pairRateDao));
        }

        /// <summary>Loads the ticker dictionary and enqueues the currently active knoks.</summary>
        /// <exception cref="InvalidOperationException">The service is already initialized.</exception>
        public async Task Init()
        {
            if (IsInitialized)
            {
                throw new InvalidOperationException("Already initialized");
            }

            LogInfo($"Initializing service {_trackerName}..");

            var tickers = await _exchangeDao.GetTickers();
            _tikersDict = tickers.ToImmutableDictionary(t => t.TickerId, t => t.TickerSymbol);

            await EnqueueActiveKnoks();
            IsInitialized = true;
        }

        /// <summary>Fetches knoks via GetKnoks(true, _lastLoadedSignalId) and enqueues those that have a KnokId.</summary>
        private async Task EnqueueActiveKnoks()
        {
            var activeKnoks = await _knokDao.GetKnoks(true, _lastLoadedSignalId);
            var identifiedKnoks = activeKnoks.Where(k => k.KnokId != null).ToList();
            if (identifiedKnoks.Count == 0)
            {
                return;
            }

            LogInfo($"Found {activeKnoks.Count} active knoks");

            var enqueuedLog = new StringBuilder();
            foreach (var activeKnok in identifiedKnoks)
            {
                EnqueueActiveKnok(activeKnok);
                enqueuedLog.AppendLine($"KnokId = {activeKnok.KnokId} enqueued");
            }

            DebugInfo(enqueuedLog.ToString());
        }

        /// <summary>Adds one knok to the tracking entry of its exchange/pair, creating the entry on first use.</summary>
        /// <exception cref="InvalidDataException">The knok has no TickerId, or the TickerId is not in the ticker dictionary.</exception>
        private void EnqueueActiveKnok(Knok activeKnok)
        {
            if (!activeKnok.TickerId.HasValue)
            {
                throw new InvalidDataException($"Invalid knok wiht Id = {activeKnok.KnokId}: TickerId is not set");
            }

            if (!_tikersDict.TryGetValue(activeKnok.TickerId.Value, out var pairSymbol))
            {
                throw new InvalidDataException(
                    $"Invalid knok with Id = {activeKnok.KnokId}: unknown TickerId = {activeKnok.TickerId.Value}");
            }

            string key = GetKnokKey(activeKnok.ExchangeId, pairSymbol);
            TrackingKnoks tracking = _trackedKnoks.GetOrAdd(key, _ => new TrackingKnoks
            {
                ExchangeId = activeKnok.ExchangeId,
                PairSymbol = pairSymbol,
                // NOTE(review): the one-year look-back was marked "for debug" in the original;
                // the commented-out alternative there was DateTime.UtcNow. Confirm before release.
                LaterThanDate = DateTime.UtcNow.AddYears(-1)
            });

            lock (tracking.SyncRoot)
            {
                tracking.Knoks.Add(activeKnok);
            }

            if (activeKnok.KnokId.Value > _lastLoadedSignalId)
            {
                _lastLoadedSignalId = activeKnok.KnokId.Value;
            }
        }

        private string GetKnokKey(int exchangeId, string pairSymbol)
        {
            return $"{exchangeId}{pairSymbol}";
        }

        /// <summary>Logs at Information level, prefixed with the current UTC time.</summary>
        private void LogInfo(string info)
        {
            _logger.LogInformation($"{DateTime.UtcNow:G} {info}");
        }

        /// <summary>Logs at Debug level, optionally prefixed with the current UTC time.</summary>
        private void DebugInfo(string info, bool timestamp = false)
        {
            if (timestamp)
            {
                _logger.LogDebug($"{DateTime.UtcNow:G} {info}");
            }
            else
            {
                _logger.LogDebug($"{info}");
            }
        }

        /// <summary>Starts the processing worker and the data-refresh worker.</summary>
        /// <exception cref="InvalidOperationException">The service is not initialized or was already started.</exception>
        public void Start()
        {
            if (!IsInitialized)
            {
                throw new InvalidOperationException($"Service is not initialized");
            }

            if (_worker != null)
            {
                throw new InvalidOperationException($"Service is already started ({_worker.Status})");
            }

            LogInfo($"Starting {_trackerName}..");

            CancellationToken token = _cancellationTokenSource.Token;
            _worker = Task.Run(() => ProcessKnoks_DoWork(token), token);
            _refreshDataWorker = Task.Run(() => RefreshData_DoWork(token), token);

            LogInfo($"{_trackerName} started.");
        }

        /// <summary>Signals both workers to stop; use <see cref="WaitForCompletion"/> to wait for them.</summary>
        /// <exception cref="InvalidOperationException">The workers were never started.</exception>
        public void Stop()
        {
            if (_worker == null)
            {
                throw new InvalidOperationException("Workers are not started");
            }

            LogInfo($"Signalling {_trackerName} to stop..");
            _cancellationTokenSource.Cancel();
            LogInfo($"{_trackerName} signalled to stop.");
        }

        /// <summary>Worker loop: runs one processing cycle per ProcessKnoksIntervalMs until cancelled.</summary>
        private async Task ProcessKnoks_DoWork(CancellationToken ct)
        {
            if (ct.IsCancellationRequested)
            {
                LogInfo("Task was cancelled before it got started.");
                ct.ThrowIfCancellationRequested();
            }

            // NOTE(review): an exception escaping a cycle ends this worker; it is only
            // reported when WaitForCompletion is awaited.
            DateTime cycleBeginTime;
            do
            {
                cycleBeginTime = DateTime.UtcNow;
                await ProcessTrackedKnoksOnce();
            }
            while (!await DelayUnlessCancelled(
                GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.ProcessKnoksIntervalMs), ct));

            LogInfo("Task_DoWork exited.");
        }

        /// <summary>One cycle: fetch live rates for all tracked pairs, evaluate every knok, persist the changes.</summary>
        private async Task ProcessTrackedKnoksOnce()
        {
            List<LiveRateItemRequest> pairs = _trackedKnoks.Values.Cast<LiveRateItemRequest>().ToList();
            if (pairs.Count == 0)
            {
                return;
            }

            var liveRates = await _pairRateDao.GetLiveRates(pairs);

            foreach (var entry in _trackedKnoks)
            {
                TrackingKnoks tracking = entry.Value;
                var knokChanges = new Dictionary<long, Dictionary<string, object>>();

                var liveRate = liveRates.FirstOrDefault(
                    l => l.ExchangeId == tracking.ExchangeId && l.PairSymbol == tracking.PairSymbol);
                if (liveRate != null)
                {
                    tracking.LaterThanDate = liveRate.Timestamp.AddMinutes(-1);
                }

                // Enumerate a snapshot: the refresh worker may append to the list concurrently.
                Knok[] knoksSnapshot;
                lock (tracking.SyncRoot)
                {
                    knoksSnapshot = tracking.Knoks.ToArray();
                }

                foreach (var knok in knoksSnapshot)
                {
                    if (knok == null || !knok.KnokId.HasValue)
                    {
                        continue;
                    }

                    Dictionary<string, object> changes;
                    if (liveRate != null)
                    {
                        changes = CheckKnok(knok, liveRate);
                    }
                    else
                    {
                        // No rate for this pair in this cycle: only the time-based rules apply.
                        changes = new Dictionary<string, object>();
                        CheckForTimeout(knok, changes, null, _trackerSettings.KnokFeedExpirationHours);
                    }

                    if (changes.Count > 0)
                    {
                        knokChanges[knok.KnokId.Value] = changes;
                    }
                }

                await FlushKnokChanges(tracking, knokChanges);
            }
        }

        /// <summary>
        /// Persists each change set via UpdateKnok and replaces the tracked instance with the
        /// returned one; knoks that come back resolved are dropped from tracking.
        /// </summary>
        private async Task FlushKnokChanges(
            TrackingKnoks trakingKnok,
            Dictionary<long, Dictionary<string, object>> knokChangesDict)
        {
            foreach (var knokChanges in knokChangesDict)
            {
                var changeLog = new StringBuilder();
                changeLog.AppendLine($"Knok with Id = {knokChanges.Key} has changes:");
                foreach (var change in knokChanges.Value)
                {
                    changeLog.AppendLine($" {change.Key}: {change.Value}");
                }

                DebugInfo(changeLog.ToString());

                //TODO: change to bulk processing
                knokChanges.Value.Add(nameof(Knok.KnokId), knokChanges.Key);
                Knok updatedKnok = await _knokDao.UpdateKnok(
                    knokChanges.Value, _trackerSettings.KnokerWinRate, _trackerSettings.UserRefundRate);

                lock (trakingKnok.SyncRoot)
                {
                    trakingKnok.Knoks.RemoveAll(k => k.KnokId == knokChanges.Key);
                    if (updatedKnok.KnokStatusId != ResolvedKnokStatusId)
                    {
                        trakingKnok.Knoks.Add(updatedKnok);
                    }
                }
            }
        }

        /// <summary>
        /// Applies the tracking rules to one knok for the given live rate, mutating the knok and
        /// returning the changed properties keyed by property name.
        /// </summary>
        /// <returns>The changes to persist; empty when nothing changed or when an error was logged.</returns>
        private Dictionary<string, object> CheckKnok(Knok knok, LiveRateItem liveRate)
        {
            var result = new Dictionary<string, object>();
            try
            {
                //1. Check for exit of duration interval. Check for feeds availability
                CheckForTimeout(knok, result, Convert.ToDecimal(liveRate.BidHigh), _trackerSettings.KnokFeedExpirationHours);

                if (knok.KnokStatusId != ResolvedKnokStatusId)
                {
                    //2. Check High rate
                    if (liveRate.BidHigh > (double)knok.HighRate)
                    {
                        knok.HighRate = (decimal)liveRate.BidHigh;
                        result[nameof(knok.HighRate)] = knok.HighRate;
                    }

                    //3. Check Low rate
                    if (liveRate.BidLow < (double)knok.LowRate || knok.LowRate == 0)
                    {
                        knok.LowRate = (decimal)liveRate.BidLow;
                        result[nameof(knok.LowRate)] = knok.LowRate;
                    }

                    //4. Check StopLoss (IsFailure)
                    if (!knok.StopLossTouched && liveRate.BidLow <= (double)knok.StopLoss)
                    {
                        knok.StopLossTouched = true;
                        result[nameof(knok.StopLossTouched)] = knok.StopLossTouched;
                        result[nameof(knok.CloseRate)] = liveRate.BidLow;

                        knok.KnokStatusId = ResolvedKnokStatusId;
                        result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;

                        //For failed knok - Stop Loss activated: SL / entry from mid*100
                        var midEntry = (knok.EntryPriceTo + knok.EntryPriceFrom) / 2m;
                        AddDistanceFromTarget(knok, result, knok.StopLoss, midEntry);
                    }
                    //5. Check entry and exit intervals
                    else
                    {
                        if (!knok.EntryPriceTouched && liveRate.BidHigh >= (double)knok.EntryPriceFrom)
                        {
                            knok.EntryPriceTouched = true;
                            result[nameof(knok.EntryPriceTouched)] = knok.EntryPriceTouched;
                        }

                        if (!knok.ExitPriceTouched && liveRate.BidHigh > (double)knok.ExitPriceFrom)
                        {
                            knok.ExitPriceTouched = true;
                            result[nameof(knok.ExitPriceTouched)] = knok.ExitPriceTouched;
                        }

                        //IsSuccess
                        if (knok.EntryPriceTouched && knok.ExitPriceTouched)
                        {
                            knok.KnokStatusId = ResolvedKnokStatusId;
                            result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
                            result[nameof(knok.CloseRate)] = liveRate.BidHigh;

                            //For successful knok: Candle high/ exit from mid*100
                            var midExit = (knok.ExitPriceTo + knok.ExitPriceFrom) / 2m;
                            AddDistanceFromTarget(knok, result, (decimal)liveRate.BidHigh, midExit);
                        }
                    }
                }

                return result;
            }
            catch (Exception ex)
            {
                LogInfo($"An error when checking knok (CheckKnok) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
                return new Dictionary<string, object>();
            }
        }

        /// <summary>
        /// Stores Math.Round(rate / midPrice * 100) as DistanceFromTarget. A zero mid price is
        /// logged and skipped: dividing would throw after the knok was already changed in memory,
        /// and the caller's catch block would then discard those changes.
        /// </summary>
        private void AddDistanceFromTarget(Knok knok, Dictionary<string, object> result, decimal rate, decimal midPrice)
        {
            if (midPrice == 0)
            {
                LogInfo($"Error when calculating DistanceFromTarget. KnokId: {knok.KnokId}: mid price is zero");
                return;
            }

            result[nameof(knok.DistanceFromTarget)] = Math.Round(rate / midPrice * 100);
        }

        /// <summary>
        /// Time-based rules: once CreateDate + Duration (days) has passed the knok becomes resolved;
        /// otherwise, once CreateDate + <paramref name="knokFeedExpirationHours"/> has passed, it is
        /// switched to the on-going status. Changes are written to <paramref name="result"/>.
        /// </summary>
        /// <param name="closeRate">Rate used for DistanceFromTarget on timeout; <c>null</c> skips that value.</param>
        private void CheckForTimeout(Knok knok, Dictionary<string, object> result, decimal? closeRate, int knokFeedExpirationHours)
        {
            try
            {
                if (!knok.CreateDate.HasValue)
                {
                    return;
                }

                DateTime now = DateTime.UtcNow;
                DateTime createdAt = knok.CreateDate.Value;

                if (now > createdAt.AddDays(knok.Duration))
                {
                    knok.KnokStatusId = ResolvedKnokStatusId;
                    result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;

                    //For failed knok - time out: Close rate/ exit from mid*100
                    if (closeRate.HasValue)
                    {
                        var midExit = (knok.ExitPriceTo + knok.ExitPriceFrom) / 2m;
                        AddDistanceFromTarget(knok, result, closeRate.Value, midExit);
                    }
                }
                else if (now > createdAt.AddHours(knokFeedExpirationHours) && knok.KnokStatusId != OnGoingStatusId)
                {
                    knok.KnokStatusId = OnGoingStatusId;
                    result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
                }
            }
            catch (Exception ex)
            {
                LogInfo($"An error when checking for knok timeout (CheckForTimeout) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
            }
        }

        /// <summary>Worker loop: enqueues newly active knoks every RefreshDataIntervalMs until cancelled.</summary>
        private async Task RefreshData_DoWork(CancellationToken ct)
        {
            if (ct.IsCancellationRequested)
            {
                LogInfo("Task was cancelled before it got started.");
                ct.ThrowIfCancellationRequested();
            }

            DateTime cycleBeginTime;
            do
            {
                cycleBeginTime = DateTime.UtcNow;
                await EnqueueActiveKnoks();
            }
            while (!await DelayUnlessCancelled(
                GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.RefreshDataIntervalMs), ct));

            LogInfo("RefreshData_DoWork exited.");
        }

        /// <summary>
        /// Waits up to <paramref name="delayMs"/> milliseconds without blocking a thread.
        /// </summary>
        /// <returns><c>true</c> when cancellation was requested, <c>false</c> when the delay elapsed.</returns>
        private static async Task<bool> DelayUnlessCancelled(int delayMs, CancellationToken ct)
        {
            if (ct.IsCancellationRequested)
            {
                return true;
            }

            if (delayMs <= 0)
            {
                return false;
            }

            try
            {
                await Task.Delay(delayMs, ct);
            }
            catch (OperationCanceledException)
            {
                return true;
            }

            return ct.IsCancellationRequested;
        }

        /// <summary>Returns the part of <paramref name="intervalMs"/> not yet consumed by the current cycle, or 0.</summary>
        private int GetRemainRefreshDataIntervalMs(DateTime cycleBeginTime, int intervalMs)
        {
            var cycleMs = (int)(DateTime.UtcNow - cycleBeginTime).TotalMilliseconds;
            return cycleMs < intervalMs ? intervalMs - cycleMs : 0;
        }

        /// <summary>
        /// Waits for both workers to finish and logs how each one ended. Worker failures and
        /// cancellations are logged here rather than rethrown.
        /// </summary>
        public async Task WaitForCompletion()
        {
            LogInfo("Waiting for service to stop..");

            Task allWorkers = Task.WhenAll(_worker, _refreshDataWorker);
            try
            {
                await allWorkers;
                LogInfo($"{_trackerName} stopped. Really.");
            }
            catch (Exception)
            {
                // Awaiting rethrows only the first failure, so inspect every worker task
                // to report all of them.
                LogInfo("\nStopping failed with the following exceptions:");
                foreach (var worker in new[] { _worker, _refreshDataWorker })
                {
                    if (worker.IsCanceled)
                    {
                        LogInfo(" TaskCanceledException: Task " + worker.Id);
                    }
                    else if (worker.IsFaulted)
                    {
                        foreach (var inner in worker.Exception.Flatten().InnerExceptions)
                        {
                            LogInfo(" Exception: " + inner.GetType().Name);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Signals the workers to stop and releases the cancellation source. Safe to call more than
        /// once and while the workers are still running.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            if (!_cancellationTokenSource.IsCancellationRequested)
            {
                _cancellationTokenSource.Cancel();
            }

            // Task.Dispose throws unless the task has completed, so running workers are skipped.
            DisposeIfCompleted(_worker);
            DisposeIfCompleted(_refreshDataWorker);

            _cancellationTokenSource.Dispose();
        }

        private static void DisposeIfCompleted(Task task)
        {
            if (task != null && task.IsCompleted)
            {
                task.Dispose();
            }
        }
    }
}