Witam Forumowiczów.
Zbudowałem w C# serwer, w którym przetwarzam równolegle ramki TCP. Potrzebuję tego rozwiązania ponieważ przetworzenie jednej ramki zajmuje bardzo dużo czasu (nawet 20 sekund). W tym czasie od jednego klienta może przyjść kilka innych ramek, które muszą zostać "obsłużone". Zaimplementowałem w tym celu mechanizm do odpalania wątku dla każdej nadchodzącej ramki, który działa bez zarzutu, jednak w przypadku dużej ilości danych może spowodować znaczne obniżenie wydajności maszyny, na której aktualnie stoi. Przyjrzałem się puli wątków, którą zaimplementowałem, jednak pula powoduje skolejkowanie żądań. Kod przedstawiam poniżej. Ma ktoś jakiś pomysł, jak zmusić pulę do równoległego przetwarzania?
Nasłuchiwanie klientów:
private void ListenForClients()
{
try
{
this.tcpListener.Start();
}
catch (Exception ex)
{
MessageBox.Show(ex.Message.ToString());
return;
}
while (true) // server will keep on listening foerever
{
// this one blocks until a client connects
TcpClient client = this.tcpListener.AcceptTcpClient();
// create a thread to handle comunication with the new client
clientCount += 1;
Thread clientThread = new Thread(new ParameterizedThreadStart(HandleClient));
clientThread.IsBackground = true;
clientThread.Start(client);
}
}
private void HandleClient(object client)
{
int myNumber = clientCount;
TcpClient tcpClient = (TcpClient)client;
System.Net.EndPoint ep = tcpClient.Client.RemoteEndPoint;
System.Net.IPEndPoint ip = (System.Net.IPEndPoint)ep;
NetworkStream clientStream = tcpClient.GetStream();
DateTime lastTransmition = DateTime.Now;
CommunicationManager communicationManager = new CommunicationManager(this, myNumber);
byte[] messageBuffer = new byte[256];
List<byte[]> incommingFrames = new List<byte[]>();
List<byte[]> framesToSend = new List<byte[]>();
List<string> textMessages = new List<string>();
ProcessThreadInfo info = new ProcessThreadInfo(incommingFrames, framesToSend, textMessages, client, myNumber, cancelThreads);
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessClientRequest), info);
while (true)
{
Thread.Sleep(10);
if (cancelThreads == true)
{
if (ServerMessage != null) ServerMessage(client, "Wyłączanie serwera - kończenie wątku obsługi klienta", true);
if (clientStream != null)
{
while (framesToSend.Count > 0)
{
lock (framesToSend)
{
clientStream.Write(framesToSend[0], 0, framesToSend[0].Length);
framesToSend.RemoveAt(0);
}
}
}
break;
}
try
{
if (clientStream.DataAvailable == true)
{
clientStream.Read(messageBuffer, 0, messageBuffer.Length);
byte[] fromBuffer = new byte[messageBuffer[2] + 3];
CopyArray(messageBuffer, fromBuffer);
lock (incommingFrames)
{
incommingFrames.Add(fromBuffer);
}
lastTransmition = DateTime.Now;
if (ServerMessage != null)
ServerMessage(client, String.Concat("KLIENT ", myNumber, ": ", BitConverter.ToString(fromBuffer)), logFrames);
}
}
catch (Exception ex)
{
if (ServerMessage != null) ServerMessage(client, ex.Message.ToString(), true);
if (ServerMessage != null) ServerMessage(client, "Błąd odczytu danych - kończenie wątku obsługi klienta", true);
break;
}
try
{
lock (framesToSend)
{
while (framesToSend.Count > 0)
{
clientStream.Write(framesToSend[0], 0, framesToSend[0].Length);
if (ServerMessage != null) ServerMessage(client, String.Concat("SERWER -> ", ((System.Net.IPEndPoint)tcpClient.Client.RemoteEndPoint).ToString(), ": ", BitConverter.ToString(framesToSend[0])), logFrames);
framesToSend.RemoveAt(0);
}
}
}
catch (Exception ex)
{
if (ServerMessage != null) ServerMessage(client, ex.Message.ToString(), true);
if (ServerMessage != null) ServerMessage(client, "Błąd zapisu danych - kończenie wątku obsługi klienta", true);
break;
}
if (lastTransmition < DateTime.Now.AddSeconds(-terminalTimeout))
{
if (ServerMessage != null) ServerMessage(client, String.Concat("Klient ", myNumber, " rozłączony - kończenie wątku obsługi klienta"), true);
break;
}
}
tcpClient.Close();
}