Aby zobrazować w czym problem wrzucam kod realizujący symulację połączenia.
Program próbuje pobrać informacje "one", "two", "three"..... i ma do dyspozycji proxy "1", "2", "3"
Wprowadziłem symulację błędu na serwerze proxy "1".
I teraz chodzi o to, żeby przy błędzie, zmiana serwera na następny wykonana została jednokrotnie.
Tak się nie dzieje, ponieważ każdy z uruchomionych wątków pobierania (maks 2) zmienia serwer proxy i tak naprawdę pomijany jest serwer "2".
Proszę kolegów o pomoc.
class Program
{
private static Timer timer; // timer to initiate periodic data refresh and start date download on background threads
private static Queue<string> refreshQueue; // tickers waiting to be refreshed
private static bool inTimer;
private static long concurrentDownloads; // number of concurrent data downloads running (WinINet max it in 2!)
private static ConcurrentQueue<string> proxyQueue;
private static string proxy;
static void Main(string[] args)
{
timer = new Timer(timer_tick, null, Timeout.Infinite, Timeout.Infinite);
refreshQueue = new Queue<string>();
// Inserting the elements into the Queue
refreshQueue.Enqueue("one");
refreshQueue.Enqueue("two");
refreshQueue.Enqueue("three");
refreshQueue.Enqueue("four");
refreshQueue.Enqueue("five");
refreshQueue.Enqueue("six");
proxyQueue = new ConcurrentQueue<string>();
proxyQueue.Enqueue("1");
proxyQueue.Enqueue("2");
proxyQueue.Enqueue("3");
// start timer
timer.Change(1000, 100);
Console.ReadLine();
}
/// <summary>
/// Timer event handler
/// Start background threads to refresh enqueued tickers. It checks if there is anything in the refreshQueue and less then 3 requests are currently executing, then it starts up a threadpool thread to execute the next refresh.
/// </summary>
/// <param name="sender"></param>
private static void timer_tick(object sender)
{
if (inTimer)
return;
inTimer = true;
try
{
// if there are tickers enqueued and there are less then 3 parallel downloads already running
if (refreshQueue.Count > 0 && Interlocked.Read(ref concurrentDownloads) < 2)
{
// increment no of downloads
Interlocked.Increment(ref concurrentDownloads);
// dequeue the ticker and get the tickerdata for it
string ticker = refreshQueue.Dequeue();
// make an idle threadpool thread execute the download in the background
ThreadPool.QueueUserWorkItem(RefreshTicker, ticker);
}
}
catch (Exception ex)
{
Console.WriteLine("Failed to process all tickers: " + ex);
}
finally
{
inTimer = false;
}
}
/// <summary>
/// Refresh data of a single ticker
/// </summary>
/// <param name="ticker"></param>
private static void RefreshTicker(object ticker)
{
try
{
int response = GetWebData(0, ticker);
// if valid response
if (response != 0)
{
Console.WriteLine("Obróbka otrzymanych danych: " + ticker);
}
}
catch (Exception ex)
{
Console.WriteLine("Failed RefreshTicker: " + ticker + "\n" + ex);
}
finally
{
// decrement no of downloads
Interlocked.Decrement(ref concurrentDownloads);
}
}//koniec RefreshTicker
private static int GetWebData(int retry, object ticker)
{
string threadid = $"thred {Thread.CurrentThread.ManagedThreadId}: ";
try
{
Console.WriteLine(threadid +" Refresh ticker: " + ticker);
//zmiana serwera na następny
if (retry >= 1)
{
Console.WriteLine(threadid +" "+ticker + " Usuwam zły proxy" + "\n");
proxyQueue.TryDequeue(out proxy);
retry = 0;
}
else
{
proxyQueue.TryPeek(out proxy);
Console.WriteLine(threadid +" "+ticker + " Proxy "+proxy+"\n");
}
if (proxy == "1")
{
Thread.Sleep(3000);
throw new Exception();
}
else
return 1;
}
catch (Exception ex)
{
Console.WriteLine(threadid +" Failed to process " + ticker);
return GetWebData(retry + 1, ticker);
}
return 0;
}//koniec GetWebData
}//koniec klasy Program