Serwer TCP przetwarzający równolegle ramki a pula wątków

0

Witam Forumowiczów.
Zbudowałem w C# serwer, w którym przetwarzam równolegle ramki TCP. Potrzebuję tego rozwiązania ponieważ przetworzenie jednej ramki zajmuje bardzo dużo czasu (nawet 20 sekund). W tym czasie od jednego klienta może przyjść kilka innych ramek, które muszą zostać "obsłużone". Zaimplementowałem w tym celu mechanizm do odpalania wątku dla każdej nadchodzącej ramki, który działa bez zarzutu, jednak w przypadku dużej ilości danych może spowodować znaczne obniżenie wydajności maszyny, na której aktualnie stoi. Przyjrzałem się puli wątków, którą zaimplementowałem, jednak pula powoduje skolejkowanie żądań. Kod przedstawiam poniżej. Ma ktoś jakiś pomysł, jak zmusić pulę do równoległego przetwarzania?

Nasłuchiwanie klientów:

    private void ListenForClients()
    {
        try
        {
            this.tcpListener.Start();
        }
        catch (Exception ex)
        {
            MessageBox.Show(ex.Message.ToString());
            return;
        }

        while (true) // server will keep on listening foerever
        {
            // this one blocks until a client connects
            TcpClient client = this.tcpListener.AcceptTcpClient();                
            // create a thread to handle comunication with the new client
            clientCount += 1;
            Thread clientThread = new Thread(new ParameterizedThreadStart(HandleClient));
            clientThread.IsBackground = true;
            clientThread.Start(client);                
        }
    }

private void HandleClient(object client)
{
int myNumber = clientCount;
TcpClient tcpClient = (TcpClient)client;

        System.Net.EndPoint ep = tcpClient.Client.RemoteEndPoint;
        System.Net.IPEndPoint ip = (System.Net.IPEndPoint)ep;

        NetworkStream clientStream = tcpClient.GetStream();
        DateTime lastTransmition = DateTime.Now;

        CommunicationManager communicationManager = new CommunicationManager(this, myNumber);
        byte[] messageBuffer = new byte[256];

        List<byte[]> incommingFrames = new List<byte[]>();
        List<byte[]> framesToSend = new List<byte[]>();
        List<string> textMessages = new List<string>();


        ProcessThreadInfo info = new ProcessThreadInfo(incommingFrames, framesToSend, textMessages, client, myNumber, cancelThreads);
        ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessClientRequest), info);
        while (true)
        {
            Thread.Sleep(10);
            if (cancelThreads == true)
            {
                if (ServerMessage != null) ServerMessage(client, "Wyłączanie serwera - kończenie wątku obsługi klienta", true);
                if (clientStream != null)
                {
                    while (framesToSend.Count > 0)
                    {
                        lock (framesToSend)
                        {
                            clientStream.Write(framesToSend[0], 0, framesToSend[0].Length);
                            framesToSend.RemoveAt(0);
                        }
                    }
                }
                break;
            }

            try
            {
                if (clientStream.DataAvailable == true)
                {
                    clientStream.Read(messageBuffer, 0, messageBuffer.Length);
                    byte[] fromBuffer = new byte[messageBuffer[2] + 3];
                    CopyArray(messageBuffer, fromBuffer);
                    lock (incommingFrames)
                    {
                        incommingFrames.Add(fromBuffer);                            
                    }

                    lastTransmition = DateTime.Now;
                    if (ServerMessage != null)
                        ServerMessage(client, String.Concat("KLIENT ", myNumber, ": ", BitConverter.ToString(fromBuffer)), logFrames);
                }
            }
            catch (Exception ex)
            {
                if (ServerMessage != null) ServerMessage(client, ex.Message.ToString(), true);
                if (ServerMessage != null) ServerMessage(client, "Błąd odczytu danych - kończenie wątku obsługi klienta", true);
                break;
            }

            try
            {
                lock (framesToSend)
                {
                    while (framesToSend.Count > 0)
                    {               
                        clientStream.Write(framesToSend[0], 0, framesToSend[0].Length);
                        if (ServerMessage != null) ServerMessage(client, String.Concat("SERWER -> ", ((System.Net.IPEndPoint)tcpClient.Client.RemoteEndPoint).ToString(), ": ", BitConverter.ToString(framesToSend[0])), logFrames);
                        framesToSend.RemoveAt(0);
                    }
                }
            }
            catch (Exception ex)
            {
                if (ServerMessage != null) ServerMessage(client, ex.Message.ToString(), true);
                if (ServerMessage != null) ServerMessage(client, "Błąd zapisu danych - kończenie wątku obsługi klienta", true);
                break;
            }

            if (lastTransmition < DateTime.Now.AddSeconds(-terminalTimeout))
            {
                if (ServerMessage != null) ServerMessage(client, String.Concat("Klient ", myNumber, " rozłączony - kończenie wątku obsługi klienta"), true);
                break;
            }
        }
        tcpClient.Close();
    }         
0

Usuń lock'i i zamień globalne framesToSend na lokalne dla każdego wątku

0

Ja bym to zrobił tylko na dwóch wątkach. Jeden wątek zajmowałby siętylo odbieraniem danych i wrzucanie ich do jakiejś kolejki zadań, a drugi tylko obróbką danych które będzie sobie po kolei pobierał z kolejki zadań.

0

cos,
napisałem w poście że przetwarzanie ramki trwa dużo czasu i w tym czasie nadchodzą inne które muszą zostać obsłużone, dlatego wprowadziłem wielowątkowość w przetwarzaniu

1 użytkowników online, w tym zalogowanych: 0, gości: 1