Synchronizacja za pomocą ConcurrentMap i BlockingQueue

0

Piszę prosty komunikator. Komunikacja pomiędzy dwoma użytkownikami działa bez zarzutu. Problem pojawia się gdy zaczynam komunikować się z dwoma lub większą ilością klientów naraz. Komunikator składa się z pliku Communicator.java (jest to okno gdzie wybiera się z listy użytkownika, z którym chcemy zacząć rozmowę) oraz pliku testUI.java, który jest uruchamiany wtedy gdy wybierzemy z listy (pliku Communicator.java) użytkownika, z którym chcemy się połączyć. Problem pojawia się gdy otwieram np. dwie rozmowy (wybieram dwóch użytkowników klikając dwa razy na listę z pliku Communicator.java i tym samym otwieram dwa okna testUI - dwie rozmowy), gdyż po pewnym czasie wiadomości, które docierają do użytkownika wyświetlają się raz w jednym drugi raz w drugim oknie (choć teoretycznie powinny się wyświetlać tylko w tym, które jest odpowiedzialne za komunikację z danym użytkownikiem). Problem na 90% jest związany z synchronizacją. Kluczową rolę w kodzie pełnią dwa kontenery:
hashR - wiadomości odebrane oraz hashS - wiadomości do wysłania.

Początkowo problem próbowałem rozwiązać blokami synchronized lecz niestety nie udało się, teraz korzystam z ConcurrentMap oraz BlockingQueue lecz problem nadal występuje.

Communicator.java (plik odczytuje wiadomosci z serwera i wysyła na serwer (wiadomości do wysłania pobiera od poszczególnych okien - testUI.java))

 public static ConcurrentMap<String,LinkedBlockingQueue<String>> hashR=new ConcurrentHashMap<String, LinkedBlockingQueue<String>>();
        public static BlockingQueue<Message> queueS=new LinkedBlockingQueue<Message>();

    // receive the messages from server
                class InWorke implements Runnable{

         String slowo;
         ObjectInputStream ois=null;
         Message message;
         Socket socket=null;
             String loginSen=null;
             Queue<String> queue=new LinkedList<String>();

    InWorke(Socket socket,ObjectInputStream ois) {
        this.socket=socket;
        this.ois=ois;
    }

    public void run() {
        while(true) {

            try {
                message = (Message) ois.readObject();
                            loginSen=message.sender;

                            // i think here is the problem ?
                            if(!hashR.containsKey(loginSen)) {
                                hashR.putIfAbsent(loginSen, new LinkedBlockingQueue<String>());
                                hashR.get(loginSen).add(message.msg);
                            }
                            else
                            {
                                hashR.get(loginSen).add(message.msg);
                            }

            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
            e.printStackTrace();
            }
            Thread.yield(); 
            } 
    }
    }

    //send messages to server
    class OutWorke implements Runnable{

    Socket socket=null;
    String tekst=null;
    Message message; 
    ObjectOutputStream oos;
    int tmp=0; // first message (send login)

    OutWorke(Socket socket, ObjectOutputStream oos) {
    this.socket=socket;
    this.oos=oos;
    }

    public void run() {

        while(true) {

        try {
                /** send first message - login */
                if(tmp==0) {
                    message=new Message("server", "info",login);
            oos.writeObject(message);
            oos.flush();
                    tmp=1;
                    Thread.yield();
                }
               /* */

                    // here is the problem ?
                    try {
                        oos.writeObject(queueS.take());
                    } catch (InterruptedException ex) {
                        Logger.getLogger(Communicator.class.getName()).log(Level.SEVERE, null, ex);
                    }
                        oos.flush();



        } catch (IOException e1) {
            e1.printStackTrace();
    }
    cos=null;
    Thread.yield();
    }}}

testUI.java (klasa oczywiście implementuje interfejs Runnable):

public void run() {
   while(true) {
       // receive messages

       if(Communicator.hashR.containsKey(loginB)) {
        while(!Communicator.hashR.get(loginB).isEmpty()) {
            if(all==null) 
               all="["+loginB+"] "+Communicator.hashR.get(loginB).element(); 
            else
                all=all+"\n"+"["+loginB+"] "+Communicator.hashR.get(loginB).element();
            jTextArea1.setText(all);
            Communicator.hashR.get(loginB).remove();
        } 
       }
       // send messages

       if(cos!=null) {
           message=new Message(loginB, cos, login);
           try {
               Communicator.queueS.put(message);
           } catch (InterruptedException ex) {
               Logger.getLogger(testUI.class.getName()).log(Level.SEVERE, null, ex);
           }
           cos=null;
       }

    Thread.yield();
   }
}

Kod uproszczony i wkleiłem tylko najważniejsze fragmenty. Oczywiście cała aplikacja z GUI (Netbeans).

0

Doszedłem do wniosku, że problem z synchronizacją leży także po stronie serwera, że wiadomości wysłane często nie trafiają do swoich adresatów.

Ale problem jest także po stronie klienta np. użytkownik A ma otwarte dwa okna do komunikacji z użytkownikiem B oraz C.

B przesyła wiadomość do A. -> A wyświetla ją w dobrym oknie, lecz gdy piszę wiadomość z A do C po wysłaniu w tym oknie (A->C) wyświetlają mi się także wiadomości przesyłane pomiędzy A i B co jest błędem.

Mam nadzieję, że trochę rozjaśniłem sytuację. Może ktoś będzie miał pomysł z blokami synchronizowanymi zamiast tych synchronizowanych kolekcji ?

0

W kliencie błąd był nie związany z synchronizacją. Wystarczyło zmienić właściwość zmiennej all(odpowiedzialnej za przechowywanie rozmowy) z static na non-static...;) .

1 użytkowników online, w tym zalogowanych: 0, gości: 1