Knocks/BackEnd/Knoks.SignalTracking/SignalsTracker.cs

475 lines
19 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Knoks.Core.Data.Interfaces;
using Knoks.Core.Entities;
using Knoks.CryptoExchanges.Data.Interfaces;
using Knoks.CryptoExchanges.Entities;
using Knoks.CryptoExchanges.Entities.Args;
using Microsoft.Extensions.Logging;
using IExchangeDao = Knoks.Core.Data.Interfaces.IExchangeDao;
using IKnokDao = Knoks.Core.Data.Interfaces.IKnokDao;
namespace Knoks.SignalsTracking
{
#region aux classes
class TrackingKnoks : LiveRateItemRequest
{
//public string PairSymbol;
//public int ExchangeId;
public List<Knok> Knoks;
//public DateTime LastRequestDate;
public TrackingKnoks()
{
Knoks = new List<Knok>();
}
}
#endregion
public class SignalsTracker : IDisposable
{
private const int ResolvedKnokStatusId = 5;
private const int OnGoingStatusId = 4;
private readonly ILogger<SignalsTracker> _logger;
private readonly SignalsTrackerSettings _trackerSettings;
public bool IsInitialized { get; private set; }
private readonly string _trackerName = nameof(SignalsTracker);
CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private Task _worker;
private Task _refreshDataWorker;
private readonly IKnokDao _knokDao;
private readonly IUserDao _userDao;
private readonly IExchangeDao _exchangeDao;
private readonly IExchangePairRateDao _pairRateDao;
private long _lastLoadedSignalId;
//ExachangeId + PairSymbol => LastRequestedDate
private ConcurrentDictionary<string, TrackingKnoks> _trackedKnoks = new ConcurrentDictionary<string, TrackingKnoks>();
private ImmutableDictionary<int, TickerInfo> _tikersDict;
public SignalsTracker(SignalsTrackerSettings trackerSettings, ILogger<SignalsTracker> logger, IKnokDao knokDao, IExchangeDao exchangeDao,
IExchangePairRateDao pairRateDao)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_trackerSettings = trackerSettings ?? throw new ArgumentNullException(nameof(trackerSettings));
_knokDao = knokDao ?? throw new ArgumentNullException(nameof(knokDao));
_exchangeDao = exchangeDao ?? throw new ArgumentNullException(nameof(exchangeDao));
_pairRateDao = pairRateDao ?? throw new ArgumentNullException(nameof(pairRateDao));
}
public async Task Init()
{
if (IsInitialized)
throw new InvalidOperationException("Already initialized");
LogInfo($"Initializing service {_trackerName}..");
//load tickers dictionary
var tickers = await _exchangeDao.GetTickers();
_tikersDict = tickers.ToImmutableDictionary(k => k.TickerId, v => v);
await EnqueueActiveKnoks();
IsInitialized = true;
}
private async Task EnqueueActiveKnoks()
{
var activeKnoks = await _knokDao.GetKnoks(true, _lastLoadedSignalId);
if (activeKnoks.Where(k=>k.KnokId != null).Any())
{
LogInfo($"Found {activeKnoks.Count} active knoks");
StringBuilder sb = new StringBuilder();
foreach (var activeKnok in activeKnoks.Where(k => k.KnokId != null))
{
EnqueueActiveKnok(activeKnok);
sb.AppendLine($"KnokId = {activeKnok.KnokId} enqueued");
}
DebugInfo($"{sb.ToString()}");
}
}
private void EnqueueActiveKnok(Knok activeKnok)
{
if (!activeKnok.TickerId.HasValue)
throw new InvalidDataException($"Invalid knok wiht Id = {activeKnok.KnokId}: TickerId is not set");
string key = GetKnokKey(activeKnok.ExchangeId, _tikersDict[activeKnok.TickerId.Value].TickerSymbol);
if (!_trackedKnoks.ContainsKey(key))
{
_trackedKnoks.TryAdd(key, new TrackingKnoks()
{
ExchangeId = activeKnok.ExchangeId,
PairSymbol = _tikersDict[activeKnok.TickerId.Value].TickerSymbol,
//LaterThanDate = DateTime.UtcNow
LaterThanDate = DateTime.UtcNow.AddYears(-1)//for debug
});
}
_trackedKnoks[key].Knoks.Add(activeKnok);
if (activeKnok.KnokId.Value > _lastLoadedSignalId)
_lastLoadedSignalId = activeKnok.KnokId.Value;
}
private string GetKnokKey(int exchangeId, string pairSymbol)
{
return $"{exchangeId}{pairSymbol}";
}
private void LogInfo(string info)
{
_logger.LogInformation($"{DateTime.UtcNow:G} {info}");
}
private void DebugInfo(string info, bool timestamp = false)
{
if (timestamp)
_logger.LogDebug($"{DateTime.UtcNow:G} {info}");
else
_logger.LogDebug($"{info}");
}
public void Start()
{
if (!IsInitialized)
throw new InvalidOperationException($"Service is not initialized");
if (_worker != null)
throw new InvalidOperationException($"Service is already started ({_worker.Status})");
LogInfo($"Starting {_trackerName}..");
_worker = Task.Run(() => ProcessKnoks_DoWork(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
_refreshDataWorker = Task.Run(() => RefreshData_DoWork(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
LogInfo($"{_trackerName} started.");
}
public void Stop()
{
if (_worker == null)
throw new InvalidOperationException("Workers are not started");
LogInfo($"Signalling {_trackerName} to stop..");
_cancellationTokenSource.Cancel();
LogInfo($"{_trackerName} signalled to stop.");
}
private async Task ProcessKnoks_DoWork(CancellationToken ct)
{
if (ct.IsCancellationRequested)
{
LogInfo("Task was cancelled before it got started.");
ct.ThrowIfCancellationRequested();
}
DateTime cycleBeginTime;
do
{
cycleBeginTime = DateTime.UtcNow;
List<LiveRateItemRequest> pairs = _trackedKnoks.Values.Cast<LiveRateItemRequest>().ToList();
if (!pairs.Any())
continue;
var liveRates = await _pairRateDao.GetLiveRates(pairs);
foreach (var trakingKnok in _trackedKnoks)
{
Dictionary<long, Dictionary<string, object>> knokChanges = new Dictionary<long, Dictionary<string, object>>();
var userChanges = new List<long>();
var liveRate = liveRates.FirstOrDefault(l => l.ExchangeId == trakingKnok.Value.ExchangeId && l.PairSymbol == trakingKnok.Value.PairSymbol);
if (liveRate != null)
trakingKnok.Value.LaterThanDate = liveRate.Timestamp.AddMinutes(-1);
foreach (var knok in trakingKnok.Value.Knoks)
{
if (knok == null || !knok.KnokId.HasValue)
{
continue;
}
Dictionary<string, object> changes;
if (liveRate != null)
{
changes = CheckKnok(knok, liveRate);
}
else
{
changes = new Dictionary<string, object>();
CheckForTimeout(knok, changes, null, _trackerSettings.KnokFeedExpirationHours);
}
if (changes.Any())
{
knokChanges.Add(knok.KnokId.Value, changes);
}
}
await FlushKnokChanges(trakingKnok.Value, knokChanges);
}
} while (!ct.WaitHandle.WaitOne(GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.ProcessKnoksIntervalMs)));
LogInfo("Task_DoWork exited.");
}
private async Task FlushKnokChanges(TrackingKnoks trakingKnok, Dictionary<long, Dictionary<string, object>> knokChangesDict)
{
foreach (var knokChanges in knokChangesDict)
{
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Knok with Id = {knokChanges.Key} has changes:");
foreach (var change in knokChanges.Value)
sb.AppendLine($" {change.Key}: {change.Value}");
DebugInfo(sb.ToString());
//TODO: change to bulk processing
knokChanges.Value.Add(nameof(Knok.KnokId), knokChanges.Key);
Knok updatedKnok = await _knokDao.UpdateKnok(knokChanges.Value, _trackerSettings.KnokerWinRate, _trackerSettings.UserRefundRate);
trakingKnok.Knoks.RemoveAll(k => k.KnokId == knokChanges.Key);
if (updatedKnok.KnokStatusId != ResolvedKnokStatusId)
{
trakingKnok.Knoks.Add(updatedKnok);
}
}
}
private Dictionary<string, object> CheckKnok(Knok knok, LiveRateItem liveRate)
{
Dictionary<string, object> result = new Dictionary<string, object>();
try
{
//1. Check for exit of duration interval. Check for feeds availability
CheckForTimeout(knok, result, Convert.ToDecimal(liveRate.BidHigh), _trackerSettings.KnokFeedExpirationHours);
if (knok.KnokStatusId != ResolvedKnokStatusId)
{
//2. Check High rate
if (liveRate.BidHigh > (double)knok.HighRate)
{
knok.HighRate = (decimal)liveRate.BidHigh;
result.Add(nameof(knok.HighRate), knok.HighRate);
}
//3. Check Low rate
if (liveRate.BidLow < (double)knok.LowRate || knok.LowRate == 0)
{
knok.LowRate = (decimal)liveRate.BidLow;
result.Add(nameof(knok.LowRate), knok.LowRate);
}
//4. Check StopLoss (IsFailure)
if (!knok.StopLossTouched && liveRate.BidLow <= (double)knok.StopLoss)
{
knok.StopLossTouched = true;
result.Add(nameof(knok.StopLossTouched), knok.StopLossTouched);
result.Add(nameof(knok.CloseRate), liveRate.BidLow);
knok.KnokStatusId = ResolvedKnokStatusId;
if (!result.ContainsKey(nameof(knok.KnokStatusId)))
{
result.Add(nameof(knok.KnokStatusId), knok.KnokStatusId);
}
else
{
result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
}
//For failed knok Stop Loss activated: SL / entry from mid*100
var midEntry = ((knok.EntryPriceTo + knok.EntryPriceFrom) / 2m);
var distanceFromTarget = Math.Round(knok.StopLoss / midEntry * 100);
result[nameof(knok.DistanceFromTarget)] = distanceFromTarget;
}
//5. Check entry and exit intervals
else
{
if (!knok.EntryPriceTouched && liveRate.BidHigh >= (double)knok.EntryPriceFrom)
{
knok.EntryPriceTouched = true;
result.Add(nameof(knok.EntryPriceTouched), knok.EntryPriceTouched);
}
if (!knok.ExitPriceTouched && liveRate.BidHigh > (double)knok.ExitPriceFrom)
{
knok.ExitPriceTouched = true;
result.Add(nameof(knok.ExitPriceTouched), knok.ExitPriceTouched);
}
//IsSuccess
if (knok.EntryPriceTouched && knok.ExitPriceTouched)
{
knok.KnokStatusId = ResolvedKnokStatusId;
if (!result.ContainsKey(nameof(knok.KnokStatusId)))
{
result.Add(nameof(knok.KnokStatusId), knok.KnokStatusId);
}
else
{
result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
}
result.Add(nameof(knok.CloseRate), liveRate.BidHigh);
//For successful knok: Candle high/ exit from mid*100
var midExit = ((knok.ExitPriceTo + knok.ExitPriceFrom) / 2m);
var distanceFromTarget = Math.Round((decimal)liveRate.BidHigh / midExit * 100);
result[nameof(knok.DistanceFromTarget)] = distanceFromTarget;
}
//var distanceFromTarget = Math.Round((decimal)liveRate.BidHigh / ((knok.ExitPriceTo + knok.ExitPriceFrom) / 2) * 100);
//if (!knok.DistanceFromTarget.HasValue || knok.DistanceFromTarget > distanceFromTarget)
//{
// result[nameof(knok.DistanceFromTarget)] = distanceFromTarget;
//}
//var midExit = ((knok.ExitPriceTo + knok.ExitPriceFrom) / 2m);
//if (midExit != 0)
//{
// var distanceFromTarget = Math.Round((decimal)liveRate.BidHigh / midExit * 100);
// if (!knok.DistanceFromTarget.HasValue || knok.DistanceFromTarget > distanceFromTarget)
// {
// result[nameof(knok.DistanceFromTarget)] = distanceFromTarget;
// }
//}
//else
//{
// LogInfo($"Error when calculating DistanceFromTarget. KnokId: {knok.KnokId} ExitPriceFrom: {knok.ExitPriceFrom}, ExitPriceTo: {knok.ExitPriceTo}");
//}
}
}
return result;
}
catch (Exception ex) {
LogInfo($"An error when checking knok (CheckKnok) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
return new Dictionary<string, object>();
}
}
private void CheckForTimeout(Knok knok, Dictionary<string, object> result, decimal? closeRate, int knokFeedExpirationHours)
{
try {
if (knok.CreateDate.HasValue && DateTime.UtcNow > knok.CreateDate.Value.AddDays(knok.Duration))
{
knok.KnokStatusId = ResolvedKnokStatusId;
if (!result.ContainsKey(nameof(knok.KnokStatusId)))
{
result.Add(nameof(knok.KnokStatusId), knok.KnokStatusId);
}
else
{
result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
}
//For failed knok time out: Close rate/ exit from mid*100
if (closeRate.HasValue)
{
var midExit = ((knok.ExitPriceTo + knok.ExitPriceFrom) / 2m);
var distanceFromTarget = Math.Round(closeRate.Value / midExit * 100);
result[nameof(knok.DistanceFromTarget)] = distanceFromTarget;
}
}
else if (knok.CreateDate.HasValue && DateTime.UtcNow > knok.CreateDate.Value.AddHours(knokFeedExpirationHours) && knok.KnokStatusId != OnGoingStatusId)
{
knok.KnokStatusId = OnGoingStatusId;
if (!result.ContainsKey(nameof(knok.KnokStatusId)))
{
result.Add(nameof(knok.KnokStatusId), knok.KnokStatusId);
}
else
{
result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
}
}
}
catch (Exception ex)
{
LogInfo($"An error when checking for knok timeout (CheckForTimeout) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
}
}
private async Task RefreshData_DoWork(CancellationToken ct)
{
if (ct.IsCancellationRequested)
{
LogInfo("Task was cancelled before it got started.");
ct.ThrowIfCancellationRequested();
}
DateTime cycleBeginTime;
do
{
cycleBeginTime = DateTime.UtcNow;
await EnqueueActiveKnoks();
} while (!ct.WaitHandle.WaitOne(GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.RefreshDataIntervalMs)));
LogInfo("RefreshData_DoWork exited.");
}
private int GetRemainRefreshDataIntervalMs(DateTime cycleBeginTime, int intervalMs)
{
var cycleMS = (int)(DateTime.UtcNow - cycleBeginTime).TotalMilliseconds;
if (cycleMS < intervalMs)
return intervalMs - cycleMS;
return 0;
}
public async Task WaitForCompletion()
{
LogInfo("Waiting for service to stop..");
try
{
await Task.WhenAll(_worker, _refreshDataWorker);
LogInfo($"{_trackerName} stopped. Really.");
}
catch (AggregateException e)
{
LogInfo("\nStopping failed: AggregateException thrown with the following inner exceptions:");
// Display information about each exception.
foreach (var v in e.InnerExceptions)
{
if (v is TaskCanceledException)
LogInfo(" TaskCanceledException: Task " + ((TaskCanceledException)v).Task.Id);
else
LogInfo(" Exception: " + v.GetType().Name);
}
}
}
public void Dispose()
{
_cancellationTokenSource?.Dispose();
_worker?.Dispose();
}
}
}