Deadlock przy async/await. Symulowanie komunikacji sieciowej

1

Moje pytanie generalnie odnosi się do tematyki async/await, deadlocków i przetwarzania wielowątkowego, jednak żeby ktoś mi mógł pomóc muszę chyba zrobić małe wprowadzenie teoretyczne:

Pracuję obecnie nad programem, który służyłby (w uproszczeniu) do wymiany informacji w sieci rozproszonej. To znaczy program byłby uruchomiony na kilkudziesięciu urządzeniach i za jego pośrednictwem urządzenia przesyłałyby sobie nawzajem wiadomości.

Szukałem sposobu żeby móc automatycznie testować zachowanie takiej sieci (wymiana wiadomości wraz z symulacją opóźnień i zagubionych pakietów). Żeby łatwo móc symulować dużo urządzeń, postanowiłem utworzyć kilkanaście instancji głównej klasy programu (dajmy na to, że nazywa się Node) oraz utworzyłem interfejs ITransmissionMedium co pozwala mi zmockować warstwę sieciową:

public interface ITransmissionMedium
{
    void SendMessage(MessageNetworkLayer message);
    void HandleIncomingMessage(MessageNetworkLayer message);
}

Każdy Node ma obiekt implementujący ITransmissionMedium, który pozwala mu się komunikować z innymi instancjami klasy Node. Docelowo funkcja SendMessage wysyłałaby wiadomość w postaci pakietu TCP. Natomiast w symulacji za ITransmissionMedium podstawiam klasę, która dorzuca wiadomość do globalnej tablicy, wraz z informacją kiedy ta wiadomość miałaby być dostarczona (żeby zasymulować opóźnienie).

Symulacja w uproszczeniu wygląda tak:

for (int t=0; t<MAX_TIME; t++)
{
	// w globalnej tablicy znajdź wiadomości, które powinny być dostarczone w czasie t. Dla każdej z nich wywołaj funkcję HandleIncomingMessage w ITransmissionMedium docelowej instancji klasy Node 
	_messageBroker.ProcessMessagesForTime(t);
}

Czyli podsumowując: jeśli w czasie t Node1 wywoła SendMessage adresując wiadomość do Node2, to wiadomość zostanie dodana do tablicy z indeksem np. t+10 . Kiedy wyżej zaprezentowana pętla dojdzie do momentu t+10 to w Node2 zostanie wywołana funkcja HandleIncomingMessage która tę wiadomość odbierze.

Docelowo jednak chcialem, żeby funkcja ITransmissionMedium.SendMessage była asynchroniczna i zwracała od razu wiadomość zwrotną (ACK), zawierającą potwierdzenie, że druga strona odebrała wiadomość. AckStatus to wiadomość potwierdzająca odebranie innej wiadomości, zawierająca informację czy druga strona przetworzyła ją bez błędów.

// tak chciałbym używać funkcji SendMessage
AckStatus wiadomosc_zwrotna_od_innego_urzadzenia = await SendMessage(message);

// obecna implementacja ITransmissionMedium:

public async Task<AckStatus> SendMessage(MessageNetworkLayer message)
{
    // [...]

    // Wysłanie wiadomości
	
	// Dodanie wiadomości do kolejki oczekującej na ACK
    TaskCompletionSource waitingForAckTask = new TaskCompletionSource();
    connectionState.MessagesWaitingForAck.Add(connectionState.MessageCounter, waitingForAckTask);

	// Zwrócenie wartości gdy przyjdzie ACK
    return await waitingForAckTask.Task;
}

public async Task HandleIncomingMessage(MessageNetworkLayer message)
{
	if (message.Type == MessageTypeNetLayer.DeliveryAck)
	{
		// przychodząca wiadomość to ACK
		
		if (connectionState.MessagesWaitingForAck.TryGetValue(message.MessageCounter,
					out var taskCompletionSource))
		{
			// odblokuj `waitingForAckTask` z funkcji `SendMessage`
			taskCompletionSource.TrySetResult(new AckStatus { ... });
		}

	}
}

Problem polega na tym, że jeśli teraz uruchomię SendMessage, to wykonywanie kodu zatrzymuje się na await waitingForAckTask.Task;, wygląda na to, że powstał deadlock. Przychodzi wam do głowy jak taki problem można rozwiązać? Generalnie wygląda na to, że każdy Node powinien działać w swoim wątku, tylko jak te wątki zsynchronizaować z pętlą symulatora? Jak sprawdzić czy wszystkie wątki oczekują na coś, żeby przejść w symulacji do momentu t+1?

0

na oko to TrySetResult musi być w innym wątku niż await na nim, więc potrzebujesz minimum dwa wątki
Nie widzę gdzie jest wywoływane HandleIncomingMessage, nie widzę w nim też żadnego await a metoda działa synchronicznie do pierwszego await, jak chcesz przepuścić inne wątki to możesz użyć await Task.Yield()

0

Nie widzę gdzie jest wywoływane HandleIncomingMessage

HandleIncomingMessage jest wywoływane w pętli symulacyjnej (drugi blok kodu w moim poście). W pętli jest funkcjaProcessMessagesForTime która szuka w tablicy MessagesWaitingForAck wiadomości, które powinny być dostarczone w czasie t. Jeśli jest taka wiadomość, uruchamiana jest funkcja HandleIncomingMessage dla odpowiedniego node'a, która jako argument przyjmuje tę wiadomość.

nie widzę w nim też żadnego await a metoda działa synchronicznie do pierwszego await, jak chcesz przepuścić inne wątki to możesz użyć await Task.Yield()

tego nie do końca rozumiem. W HandleIncomingMessage na nic obecnie nie czekam, po co miałbym używać await? HandleIncomingMessage jedyne co ma zrobić to odebrać wiadomość ACK i "popchnąć" wykonywanie funkcji SendMessage powiązanej z wiadomością, która dotyczy ACK.

0

Po pierwsze, TaskCompletionSource wykonuje kontynuacje synchronicznie, więc jak zrobisz TrySetResult, to kontynuacja po awaicie SendMessage leci na wątku HandleIncomingMessage. Użyj TaskCreationOptions.RunContinuationsAsynchronously

A po drugie, za mało kodu. Pokaż jak tworzysz wątki, jak to zbierasz do kupy, bo bez tego można sobie gdybać.

0

No właśnie na chwilę obecną nie tworzę żadnych wątków (no chyba że nieświadomie za pośrednictwem async/await, jeszcze nie rozumiem jak to działa do końca).

Może jeszcze żeby lepiej zobrazować co chcę osiągnąć, dodam, że w poprzednim (trochę podobnym) projekcie stosowałem do tego callbacki:

 private void SendMessage(Guid destination, Action < AckStatus > callback = null) {
   // [...]

   // wysłanie wiadomości:
   udpClient.Send(bytes, bytes.Length, endPoint);

   // callback wrzucany jest na listę śledzącą wychodzące wiadomości:
   TrackedMessage tm = new TrackedMessage(bytes, endPoint, callback);
   foundSource.TrackedOutgoingMessages.Add(retransmissionId, tm);
 }

Czyli do funkcji SendMessage dorzucałem funkcję (callback), która w założeniu miała się wykonać w momencie odebrania potwierdzenia ACK. Funkcja ta była wrzucana w listę TrackedOutgoingMessages. Za każdym razem jak przychodziła jakaś wiadomość ACK (z odpowiednim identyfikatorem wiadomości) z internetu, to lista TrackedOutgoingMessages była przeszukiwana pod kątem tego identyfikatora i do pasującej wiadomości wywoływany był callback.

SendMessage(destination, (status) => {
  // w tym momencie wiadomo, że wiadomość jest dostarczona i znamy `status` przetworzenia tej wiadomości
});

Problem z tym rozwiązaniem oczywiście taki, że callbacki się zagnieżdżały. Przy bardziej skomplikowanej wymianie kod wyglądał dość słabo, dlatego zacząłem się zastanawiać czy nie przerzucić się na async/await właśnie.

0

No to o jakim deadlocku mówisz, skoro nie tworzysz żadnych wątków? Z Twojego kodu nie da się wywnioskować nic konkretnego, wrzućże całość, a nie takie skrawki. A najlepiej zrób minimalny przykład odtwarzający ten deadlock.

Twój kod z Deadlock przy async/await. Symulowanie komunikacji sieciowej działa zupełnie inaczej. On wysyła i nie czeka na przetworzenie wiadomości, a kod z pierwszego postu w funkcji SendMessage czeka.

0

Jak najbardziej ma deadlock, skoro nie korzysta z żadnych innych wątków, no to ma jeden wątek w którym awejtuje sam siebie :D.

@iteredi
Async/await nie zrównolegla kodu, muszi uruchomic każdy Node w osobnym wątku.

0
neves napisał(a):

Async/await nie zrównolegla kodu, muszi uruchomic każdy Node w osobnym wątku.

co masz na myśli pisząc że async/await nie zrównolegla kodu? Zrównolegla po pierwszym asynchronicznym await - kod jest kontynuowany na losowym wolnym wątku z puli

for (var i = 0; i < 10; i++)
{
    Foo();
}

Thread.Sleep(Timeout.Infinite);

async Task Foo()
{
    while (true)
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
        await Task.Delay(1000);
    }
}

wyjście:

1
1
1
1
1
1
1
1
1
1
6
12
13
6
4
6
11
12
13
6

chyba że masz SynchronizationContext do którego wracasz

1

W tym przykładzie zrównoleglenie występuje dzięki Task.Delay(), await nie powoduje że mamy +1 jeden kolejny tor wykonywania w aplikacji, tylko kontynuuje tor zapoczątkowany przez Task.Delay.
Oryginalny problem można sprowadzić do tego kodu, pomimo użycia async await, ciągle mamy tylko jeden tor wykonywania w programie co prowadzi do deadlocka:


TaskCompletionSource waitingForAckTask = new TaskCompletionSource();

for (var i = 0; i < 10; i++)
{
    await Foo();
}

waitingForAckTask.TrySetResult();

Thread.Sleep(Timeout.Infinite);

async Task Foo()
{
    while (true)
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
        await waitingForAckTask.Task;
    }
}

1 użytkowników online, w tym zalogowanych: 0, gości: 1