475 lines
19 KiB
C#
475 lines
19 KiB
C#
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Collections.Immutable;
|
||
using System.IO;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using Knoks.Core.Data.Interfaces;
|
||
using Knoks.Core.Entities;
|
||
using Knoks.CryptoExchanges.Data.Interfaces;
|
||
using Knoks.CryptoExchanges.Entities;
|
||
using Knoks.CryptoExchanges.Entities.Args;
|
||
using Microsoft.Extensions.Logging;
|
||
using IExchangeDao = Knoks.Core.Data.Interfaces.IExchangeDao;
|
||
using IKnokDao = Knoks.Core.Data.Interfaces.IKnokDao;
|
||
|
||
namespace Knoks.SignalsTracking
|
||
{
|
||
#region aux classes
|
||
|
||
/// <summary>
/// A <c>LiveRateItemRequest</c> extended with the list of knoks that are
/// tracked for that request.
/// </summary>
class TrackingKnoks : LiveRateItemRequest
{
    /// <summary>Knoks attached to this request; initialised to an empty list.</summary>
    public List<Knok> Knoks;

    /// <summary>Creates an instance with an empty <see cref="Knoks"/> list.</summary>
    public TrackingKnoks() => Knoks = new List<Knok>();
}
|
||
#endregion
|
||
|
||
public class SignalsTracker : IDisposable
|
||
{
|
||
private const int ResolvedKnokStatusId = 5;
|
||
private const int OnGoingStatusId = 4;
|
||
|
||
private readonly ILogger<SignalsTracker> _logger;
|
||
private readonly SignalsTrackerSettings _trackerSettings;
|
||
|
||
public bool IsInitialized { get; private set; }
|
||
|
||
private readonly string _trackerName = nameof(SignalsTracker);
|
||
|
||
CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
|
||
|
||
private Task _worker;
|
||
private Task _refreshDataWorker;
|
||
|
||
private readonly IKnokDao _knokDao;
|
||
private readonly IUserDao _userDao;
|
||
private readonly IExchangeDao _exchangeDao;
|
||
private readonly IExchangePairRateDao _pairRateDao;
|
||
|
||
private long _lastLoadedSignalId;
|
||
|
||
//Key: ExchangeId + PairSymbol (built by GetKnokKey) => knoks tracked for that exchange/pair
|
||
private ConcurrentDictionary<string, TrackingKnoks> _trackedKnoks = new ConcurrentDictionary<string, TrackingKnoks>();
|
||
private ImmutableDictionary<int, TickerInfo> _tikersDict;
|
||
|
||
/// <summary>
/// Creates the tracker and stores its dependencies.
/// </summary>
/// <exception cref="ArgumentNullException">
/// Any argument is null. Arguments are validated in the order
/// logger, trackerSettings, knokDao, exchangeDao, pairRateDao.
/// </exception>
public SignalsTracker(SignalsTrackerSettings trackerSettings, ILogger<SignalsTracker> logger, IKnokDao knokDao, IExchangeDao exchangeDao,
    IExchangePairRateDao pairRateDao)
{
    if (logger == null)
    {
        throw new ArgumentNullException(nameof(logger));
    }
    if (trackerSettings == null)
    {
        throw new ArgumentNullException(nameof(trackerSettings));
    }
    if (knokDao == null)
    {
        throw new ArgumentNullException(nameof(knokDao));
    }
    if (exchangeDao == null)
    {
        throw new ArgumentNullException(nameof(exchangeDao));
    }
    if (pairRateDao == null)
    {
        throw new ArgumentNullException(nameof(pairRateDao));
    }

    _logger = logger;
    _trackerSettings = trackerSettings;
    _knokDao = knokDao;
    _exchangeDao = exchangeDao;
    _pairRateDao = pairRateDao;
}
|
||
|
||
/// <summary>
/// Loads the ticker dictionary, enqueues the currently active knoks and
/// marks the tracker as initialized.
/// </summary>
/// <exception cref="InvalidOperationException">The tracker is already initialized.</exception>
public async Task Init()
{
    if (IsInitialized)
    {
        throw new InvalidOperationException("Already initialized");
    }

    LogInfo($"Initializing service {_trackerName}..");

    // Ticker dictionary keyed by TickerId; used when enqueueing knoks.
    _tikersDict = (await _exchangeDao.GetTickers())
        .ToImmutableDictionary(ticker => ticker.TickerId, ticker => ticker);

    await EnqueueActiveKnoks();

    IsInitialized = true;
}
|
||
|
||
/// <summary>
/// Fetches knoks from the knok DAO (passing the last loaded id) and enqueues
/// every returned knok that has a KnokId. Does nothing when no such knok exists.
/// </summary>
private async Task EnqueueActiveKnoks()
{
    var fetched = await _knokDao.GetKnoks(true, _lastLoadedSignalId);

    if (!fetched.Any(k => k.KnokId != null))
    {
        return;
    }

    // The logged count is the size of the fetched collection, including entries without an id.
    LogInfo($"Found {fetched.Count} active knoks");

    var report = new StringBuilder();
    foreach (var candidate in fetched)
    {
        if (candidate.KnokId == null)
        {
            continue;
        }

        EnqueueActiveKnok(candidate);
        report.AppendLine($"KnokId = {candidate.KnokId} enqueued");
    }

    DebugInfo(report.ToString());
}
|
||
|
||
/// <summary>
/// Adds a knok to the tracking bucket of its exchange/pair, creating the
/// bucket on first use, and advances <c>_lastLoadedSignalId</c> when the
/// knok's id is greater than the current value.
/// </summary>
/// <param name="activeKnok">Knok to track; its KnokId is expected to be set by the caller.</param>
/// <exception cref="InvalidDataException">
/// The knok has no TickerId, or its TickerId is not present in the ticker dictionary.
/// </exception>
private void EnqueueActiveKnok(Knok activeKnok)
{
    if (!activeKnok.TickerId.HasValue)
    {
        throw new InvalidDataException($"Invalid knok with Id = {activeKnok.KnokId}: TickerId is not set");
    }

    // Single lookup with an explicit error instead of a context-free KeyNotFoundException.
    if (!_tikersDict.TryGetValue(activeKnok.TickerId.Value, out TickerInfo ticker))
    {
        throw new InvalidDataException($"Invalid knok with Id = {activeKnok.KnokId}: unknown TickerId {activeKnok.TickerId.Value}");
    }

    string pairSymbol = ticker.TickerSymbol;
    string key = GetKnokKey(activeKnok.ExchangeId, pairSymbol);

    // GetOrAdd replaces the non-atomic ContainsKey / TryAdd / indexer sequence.
    TrackingKnoks bucket = _trackedKnoks.GetOrAdd(key, _ => new TrackingKnoks
    {
        ExchangeId = activeKnok.ExchangeId,
        PairSymbol = pairSymbol,
        // NOTE(review): the original marks this one-year look-back as "for debug" and has
        // "LaterThanDate = DateTime.UtcNow" commented out; confirm which value is intended.
        LaterThanDate = DateTime.UtcNow.AddYears(-1)
    });

    // NOTE(review): Knoks is a plain List<Knok>; this method runs on the refresh worker while
    // the processing worker enumerates the same list. Consider synchronizing access.
    bucket.Knoks.Add(activeKnok);

    if (activeKnok.KnokId.Value > _lastLoadedSignalId)
    {
        _lastLoadedSignalId = activeKnok.KnokId.Value;
    }
}
|
||
|
||
/// <summary>
/// Builds the dictionary key that identifies an exchange/pair combination.
/// </summary>
/// <param name="exchangeId">Exchange identifier.</param>
/// <param name="pairSymbol">Pair symbol on that exchange.</param>
/// <returns>A key of the form <c>{exchangeId}:{pairSymbol}</c>.</returns>
private string GetKnokKey(int exchangeId, string pairSymbol)
{
    // The separator keeps keys unambiguous when a symbol starts with a digit:
    // without it, (1, "1INCH") and (11, "INCH") would both map to "11INCH".
    return $"{exchangeId}:{pairSymbol}";
}
|
||
|
||
/// <summary>
/// Writes an information-level log entry prefixed with the current UTC time ("G" format).
/// </summary>
/// <param name="info">Text to log.</param>
private void LogInfo(string info)
{
    string stamped = $"{DateTime.UtcNow:G} {info}";
    _logger.LogInformation(stamped);
}
|
||
|
||
/// <summary>
/// Writes a debug-level log entry, optionally prefixed with the current UTC time ("G" format).
/// </summary>
/// <param name="info">Text to log.</param>
/// <param name="timestamp">When true, the entry is prefixed with the UTC timestamp.</param>
private void DebugInfo(string info, bool timestamp = false)
{
    string message = timestamp
        ? $"{DateTime.UtcNow:G} {info}"
        : $"{info}";

    _logger.LogDebug(message);
}
|
||
|
||
/// <summary>
/// Launches the knok-processing worker and the data-refresh worker on the thread pool.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The tracker is not initialized, or the processing worker was already created.
/// </exception>
public void Start()
{
    if (!IsInitialized)
    {
        throw new InvalidOperationException("Service is not initialized");
    }

    if (_worker != null)
    {
        throw new InvalidOperationException($"Service is already started ({_worker.Status})");
    }

    LogInfo($"Starting {_trackerName}..");

    // Both workers observe the same cancellation token.
    CancellationToken token = _cancellationTokenSource.Token;
    _worker = Task.Run(() => ProcessKnoks_DoWork(token), token);
    _refreshDataWorker = Task.Run(() => RefreshData_DoWork(token), token);

    LogInfo($"{_trackerName} started.");
}
|
||
|
||
/// <summary>
/// Signals the workers to stop by cancelling the shared token source.
/// Does not wait for the workers to finish.
/// </summary>
/// <exception cref="InvalidOperationException">The processing worker was never created.</exception>
public void Stop()
{
    bool neverStarted = _worker == null;
    if (neverStarted)
    {
        throw new InvalidOperationException("Workers are not started");
    }

    LogInfo($"Signalling {_trackerName} to stop..");
    _cancellationTokenSource.Cancel();
    LogInfo($"{_trackerName} signalled to stop.");
}
|
||
|
||
/// <summary>
/// Worker loop: runs one processing cycle, then waits for the remainder of
/// <c>ProcessKnoksIntervalMs</c>, until the token is cancelled.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
/// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
private async Task ProcessKnoks_DoWork(CancellationToken ct)
{
    if (ct.IsCancellationRequested)
    {
        LogInfo("Task was cancelled before it got started.");
        ct.ThrowIfCancellationRequested();
    }

    while (true)
    {
        DateTime cycleBeginTime = DateTime.UtcNow;

        await ProcessKnoksCycle();

        int delayMs = GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.ProcessKnoksIntervalMs);
        try
        {
            // Awaited delay instead of blocking a thread-pool thread on ct.WaitHandle.
            await Task.Delay(delayMs, ct);
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the wait ends the loop normally, as the original wait did.
            break;
        }
    }

    LogInfo("Task_DoWork exited.");
}

/// <summary>
/// One processing cycle: requests live rates for all tracked pairs, evaluates
/// every tracked knok and flushes the resulting changes per pair.
/// </summary>
private async Task ProcessKnoksCycle()
{
    List<LiveRateItemRequest> pairs = _trackedKnoks.Values.Cast<LiveRateItemRequest>().ToList();

    if (pairs.Count == 0)
    {
        return;
    }

    var liveRates = await _pairRateDao.GetLiveRates(pairs);

    foreach (var trackedEntry in _trackedKnoks)
    {
        TrackingKnoks tracked = trackedEntry.Value;
        var knokChanges = new Dictionary<long, Dictionary<string, object>>();

        var liveRate = liveRates.FirstOrDefault(l => l.ExchangeId == tracked.ExchangeId && l.PairSymbol == tracked.PairSymbol);

        if (liveRate != null)
        {
            // Next request for this pair starts one minute before the latest received rate.
            tracked.LaterThanDate = liveRate.Timestamp.AddMinutes(-1);
        }

        // NOTE(review): Knoks is a plain List<Knok> that the refresh worker appends to from
        // another task; consider synchronizing access to avoid enumeration failures.
        foreach (var knok in tracked.Knoks)
        {
            if (knok == null || !knok.KnokId.HasValue)
            {
                continue;
            }

            Dictionary<string, object> changes;
            if (liveRate != null)
            {
                changes = CheckKnok(knok, liveRate);
            }
            else
            {
                // No rate for this pair in this cycle: only the time-based checks can run.
                changes = new Dictionary<string, object>();
                CheckForTimeout(knok, changes, null, _trackerSettings.KnokFeedExpirationHours);
            }

            if (changes.Any())
            {
                knokChanges.Add(knok.KnokId.Value, changes);
            }
        }

        await FlushKnokChanges(tracked, knokChanges);
    }
}
|
||
|
||
/// <summary>
/// Persists the collected changes one knok at a time through the knok DAO and
/// refreshes the in-memory list: the old instance is removed and the updated
/// knok is re-added unless its status is the resolved status.
/// </summary>
/// <param name="trakingKnok">Tracking bucket whose Knoks list is updated.</param>
/// <param name="knokChangesDict">Changed fields per knok id.</param>
private async Task FlushKnokChanges(TrackingKnoks trakingKnok, Dictionary<long, Dictionary<string, object>> knokChangesDict)
{
    foreach (KeyValuePair<long, Dictionary<string, object>> entry in knokChangesDict)
    {
        long knokId = entry.Key;
        Dictionary<string, object> fields = entry.Value;

        var report = new StringBuilder();
        report.AppendLine($"Knok with Id = {knokId} has changes:");
        foreach (var field in fields)
        {
            report.AppendLine($" {field.Key}: {field.Value}");
        }
        DebugInfo(report.ToString());

        //TODO: change to bulk processing
        // The id travels to the DAO inside the same dictionary as the changed fields.
        fields.Add(nameof(Knok.KnokId), knokId);

        Knok updatedKnok = await _knokDao.UpdateKnok(fields, _trackerSettings.KnokerWinRate, _trackerSettings.UserRefundRate);

        trakingKnok.Knoks.RemoveAll(k => k.KnokId == knokId);

        bool stillTracked = updatedKnok.KnokStatusId != ResolvedKnokStatusId;
        if (stillTracked)
        {
            trakingKnok.Knoks.Add(updatedKnok);
        }
    }
}
|
||
|
||
/// <summary>
/// Evaluates a knok against a live rate and returns the fields that changed.
/// The knok instance is updated in place with the same values.
/// </summary>
/// <param name="knok">Knok to evaluate.</param>
/// <param name="liveRate">Live rate for the knok's exchange/pair.</param>
/// <returns>
/// Changed field names mapped to their new values; an empty dictionary when
/// nothing changed or when an exception was caught and logged.
/// </returns>
private Dictionary<string, object> CheckKnok(Knok knok, LiveRateItem liveRate)
{
    Dictionary<string, object> result = new Dictionary<string, object>();

    try
    {
        //1. Check for exit of duration interval. Check for feeds availability
        CheckForTimeout(knok, result, Convert.ToDecimal(liveRate.BidHigh), _trackerSettings.KnokFeedExpirationHours);

        if (knok.KnokStatusId == ResolvedKnokStatusId)
        {
            return result;
        }

        //2. Check High rate
        if (liveRate.BidHigh > (double)knok.HighRate)
        {
            knok.HighRate = (decimal)liveRate.BidHigh;
            result[nameof(knok.HighRate)] = knok.HighRate;
        }

        //3. Check Low rate
        if (liveRate.BidLow < (double)knok.LowRate || knok.LowRate == 0)
        {
            knok.LowRate = (decimal)liveRate.BidLow;
            result[nameof(knok.LowRate)] = knok.LowRate;
        }

        //4. Check StopLoss (IsFailure)
        if (!knok.StopLossTouched && liveRate.BidLow <= (double)knok.StopLoss)
        {
            knok.StopLossTouched = true;
            result[nameof(knok.StopLossTouched)] = knok.StopLossTouched;
            result[nameof(knok.CloseRate)] = liveRate.BidLow;

            knok.KnokStatusId = ResolvedKnokStatusId;
            result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;

            //For failed knok – Stop Loss activated: SL / entry from mid*100
            var midEntry = ((knok.EntryPriceTo + knok.EntryPriceFrom) / 2m);
            if (midEntry != 0)
            {
                result[nameof(knok.DistanceFromTarget)] = Math.Round(knok.StopLoss / midEntry * 100);
            }
            else
            {
                // Dividing by a zero mid would throw after the knok was already mutated and the
                // catch below would discard the resolution; skip only the distance instead.
                LogInfo($"Error when calculating DistanceFromTarget. KnokId: {knok.KnokId} EntryPriceFrom: {knok.EntryPriceFrom}, EntryPriceTo: {knok.EntryPriceTo}");
            }

            return result;
        }

        //5. Check entry and exit intervals
        if (!knok.EntryPriceTouched && liveRate.BidHigh >= (double)knok.EntryPriceFrom)
        {
            knok.EntryPriceTouched = true;
            result[nameof(knok.EntryPriceTouched)] = knok.EntryPriceTouched;
        }

        if (!knok.ExitPriceTouched && liveRate.BidHigh > (double)knok.ExitPriceFrom)
        {
            knok.ExitPriceTouched = true;
            result[nameof(knok.ExitPriceTouched)] = knok.ExitPriceTouched;
        }

        //IsSuccess
        if (knok.EntryPriceTouched && knok.ExitPriceTouched)
        {
            knok.KnokStatusId = ResolvedKnokStatusId;
            result[nameof(knok.KnokStatusId)] = knok.KnokStatusId;
            result[nameof(knok.CloseRate)] = liveRate.BidHigh;

            //For successful knok: Candle high/ exit from mid*100
            var midExit = ((knok.ExitPriceTo + knok.ExitPriceFrom) / 2m);
            if (midExit != 0)
            {
                result[nameof(knok.DistanceFromTarget)] = Math.Round((decimal)liveRate.BidHigh / midExit * 100);
            }
            else
            {
                // Same guard as the stop-loss branch: keep the resolution, skip the distance.
                LogInfo($"Error when calculating DistanceFromTarget. KnokId: {knok.KnokId} ExitPriceFrom: {knok.ExitPriceFrom}, ExitPriceTo: {knok.ExitPriceTo}");
            }
        }

        return result;
    }
    catch (Exception ex)
    {
        LogInfo($"An error when checking knok (CheckKnok) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
        return new Dictionary<string, object>();
    }
}
|
||
|
||
/// <summary>
/// Applies the time-based status transitions to a knok and records them in <paramref name="result"/>.
/// Once CreateDate plus Duration days has passed the knok gets the resolved status; otherwise,
/// once CreateDate plus <paramref name="knokFeedExpirationHours"/> hours has passed and the knok is
/// not yet in the on-going status, it gets the on-going status. Exceptions are caught and logged.
/// </summary>
/// <param name="knok">Knok to check; updated in place.</param>
/// <param name="result">Receives the changed fields.</param>
/// <param name="closeRate">Rate used for DistanceFromTarget on duration timeout; skipped when null.</param>
/// <param name="knokFeedExpirationHours">Hours after creation that trigger the on-going status.</param>
private void CheckForTimeout(Knok knok, Dictionary<string, object> result, decimal? closeRate, int knokFeedExpirationHours)
{
    const string statusKey = nameof(knok.KnokStatusId);

    try
    {
        bool durationElapsed = knok.CreateDate.HasValue
            && DateTime.UtcNow > knok.CreateDate.Value.AddDays(knok.Duration);

        if (durationElapsed)
        {
            knok.KnokStatusId = ResolvedKnokStatusId;
            result[statusKey] = knok.KnokStatusId;

            //For failed knok – time out: Close rate/ exit from mid*100
            if (closeRate.HasValue)
            {
                var exitMid = (knok.ExitPriceTo + knok.ExitPriceFrom) / 2m;
                result[nameof(knok.DistanceFromTarget)] = Math.Round(closeRate.Value / exitMid * 100);
            }

            return;
        }

        bool feedWindowElapsed = knok.CreateDate.HasValue
            && DateTime.UtcNow > knok.CreateDate.Value.AddHours(knokFeedExpirationHours)
            && knok.KnokStatusId != OnGoingStatusId;

        if (feedWindowElapsed)
        {
            knok.KnokStatusId = OnGoingStatusId;
            result[statusKey] = knok.KnokStatusId;
        }
    }
    catch (Exception ex)
    {
        LogInfo($"An error when checking for knok timeout (CheckForTimeout) KnokId: {knok.KnokId}, error: \"{ex.Message}\"");
    }
}
|
||
|
||
/// <summary>
/// Worker loop: enqueues newly active knoks, then waits for the remainder of
/// <c>RefreshDataIntervalMs</c>, until the token is cancelled.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
/// <exception cref="OperationCanceledException">The token was already cancelled on entry.</exception>
private async Task RefreshData_DoWork(CancellationToken ct)
{
    if (ct.IsCancellationRequested)
    {
        LogInfo("Task was cancelled before it got started.");
        ct.ThrowIfCancellationRequested();
    }

    while (true)
    {
        DateTime cycleBeginTime = DateTime.UtcNow;

        await EnqueueActiveKnoks();

        int delayMs = GetRemainRefreshDataIntervalMs(cycleBeginTime, _trackerSettings.RefreshDataIntervalMs);
        try
        {
            // Awaited delay instead of blocking a thread-pool thread on ct.WaitHandle.
            await Task.Delay(delayMs, ct);
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the wait ends the loop normally, as the original wait did.
            break;
        }
    }

    LogInfo("RefreshData_DoWork exited.");
}
|
||
|
||
/// <summary>
/// Computes how many milliseconds of the interval are left after the time
/// already spent in the current cycle.
/// </summary>
/// <param name="cycleBeginTime">UTC time at which the cycle started.</param>
/// <param name="intervalMs">Desired cycle length in milliseconds.</param>
/// <returns>
/// A value in the range [0, intervalMs]; 0 when the cycle already took at
/// least the whole interval or when <paramref name="intervalMs"/> is not positive.
/// </returns>
private int GetRemainRefreshDataIntervalMs(DateTime cycleBeginTime, int intervalMs)
{
    if (intervalMs <= 0)
    {
        return 0;
    }

    // Compare as double: casting a very large elapsed time to int first could overflow
    // and produce a negative wait.
    double elapsedMs = (DateTime.UtcNow - cycleBeginTime).TotalMilliseconds;

    if (elapsedMs <= 0)
    {
        // The clock moved backwards during the cycle; never wait longer than one interval.
        return intervalMs;
    }

    if (elapsedMs >= intervalMs)
    {
        return 0;
    }

    return intervalMs - (int)elapsedMs;
}
|
||
|
||
/// <summary>
/// Waits for both workers to finish. When a worker failed or was cancelled,
/// the failures are logged and the exception is rethrown to the caller.
/// </summary>
public async Task WaitForCompletion()
{
    LogInfo("Waiting for service to stop..");

    // Keep the combined task: awaiting it rethrows only the first failure, while its
    // Exception property holds every worker failure.
    Task allWorkers = Task.WhenAll(_worker, _refreshDataWorker);

    try
    {
        await allWorkers;
        LogInfo($"{_trackerName} stopped. Really.");
    }
    catch (Exception)
    {
        AggregateException failures = allWorkers.Exception;

        if (failures == null)
        {
            // No fault recorded: the combined task ended in the cancelled state.
            LogInfo("\nStopping failed: workers were cancelled.");
        }
        else
        {
            LogInfo("\nStopping failed: AggregateException thrown with the following inner exceptions:");

            // Display information about each exception.
            foreach (var inner in failures.InnerExceptions)
            {
                if (inner is TaskCanceledException cancelled)
                {
                    // Task may be null on a TaskCanceledException.
                    LogInfo(" TaskCanceledException: Task " + cancelled.Task?.Id);
                }
                else
                {
                    LogInfo(" Exception: " + inner.GetType().Name);
                }
            }
        }

        // Rethrown so callers still observe the failure, as they did before.
        throw;
    }
}
|
||
|
||
/// <summary>
/// Signals the workers to stop and releases the cancellation source and any
/// worker task that has already completed. Does not throw when the workers
/// are still running or when called more than once.
/// </summary>
public void Dispose()
{
    try
    {
        // Ask running workers to stop before their token source goes away.
        _cancellationTokenSource?.Cancel();
    }
    catch (ObjectDisposedException)
    {
        // Dispose was already called; nothing left to signal.
    }

    DisposeIfCompleted(_worker);
    DisposeIfCompleted(_refreshDataWorker);

    _cancellationTokenSource?.Dispose();
}

/// <summary>
/// Disposes a task only when it has completed; Task.Dispose throws
/// InvalidOperationException for a task that is still running.
/// </summary>
private static void DisposeIfCompleted(Task task)
{
    if (task != null && task.IsCompleted)
    {
        task.Dispose();
    }
}
|
||
}
|
||
}
|