Przesyłanie plików klient-serwer wielowątkowo.

0

Cześć piszę program do przesyłania plików pomiędzy klientem a serwerem. Aplikacja kliencka ma obserwować zmiany w folderze klienta i gdy takowe wystąpią wysyła te pliki na serwer, który tworzy kopię tych plików. Wszystko działało sprawnie dopóki nie dodałem wielowątkowości. Rozumiem że jest problem z synchronizacją jednak za bardzo nie wiem jak sobie z tym poradzić.

Kod Clienta.

import java.net.*;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Client {

    File file=null;
    public static void main(String[] args) throws IOException
    {
        InetAddress ip = InetAddress.getByName("localhost");
        List<String> user_files = new ArrayList<String>();
        String directory = "C:\\Users\\andrze\\Desktop\\uzytkownik1";
        Socket socket = new Socket(ip, 1342);
        BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
        DataOutputStream dos = new DataOutputStream(bos);

        while (true)
        {
            String f=File_Handler.check_changes_in_folder(directory,user_files);

            if(!f.equals("new_files_not_found"))
            {
                Thread watek = new Send_File(socket,dos,f,directory);
                watek.start();
            }

        }

    }
}

class File_Handler
{

    static String check_changes_in_folder(String user_directory,List<String>user_files )
    {
        File[] in = new File(user_directory).listFiles();

        for(int i=0;i< in.length;i++)
        {
            String curr_f = in[i].getName();
            if(!user_files.contains(curr_f))
            {
                user_files.add(curr_f);
                return curr_f;
            }
        }
        return "new_files_not_found";
    }


}


class Send_File extends Thread {
    final Socket s;
    final DataOutputStream dos;
    final String directory;
    final String f;


    public Send_File(Socket s, DataOutputStream o,String f,String directory)
    {
        this.s = s;
        this.dos = o;
        this.directory= directory;
        this.f=f;
    }

    public void run()
    {
        System.out.println("NOWY WATEK "+ Thread.currentThread().getName());
        try {
            BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream());
            File file = new File(directory + "\\" + f);
            long length = file.length();
            dos.writeLong(length);

            String name = file.getName();
            System.out.println(name);
            dos.writeUTF(name);

            FileInputStream fis = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fis);
            int theByte = 0;
            while ((theByte = bis.read()) != -1) bos.write(theByte);
            bis.close();
        }catch(Exception e)
        {
            getStackTrace();
        }

    }




}

Kod Serwera

import java.net.*;
import java.io.*;

public class Serwer
{

    public static void main (String[] args ) throws IOException {

        int bytesRead;
        int current = 0;

        ServerSocket serverSocket = null;
        serverSocket = new ServerSocket(1342);

        while(true) {
            String dirPath ="C:\\Users\\andrze\\Desktop\\Serwer1" ;
            Socket socket = serverSocket.accept();

            Thread watek = new File_Download(socket,dirPath);
            watek.start();



        }
    }
}


class File_Download extends Thread
{
    final Socket socket;
    final String dirPath;

    public File_Download(Socket s,String Directory)
    {
        this.socket=s;
        this.dirPath=Directory;

    }

    public void run()
    {


            try {
                BufferedInputStream bis = new BufferedInputStream(socket.getInputStream());
                DataInputStream dis = new DataInputStream(bis);

                long fileLength = dis.readLong();
                String fileName = dis.readUTF();
                System.out.println(fileName);

                File file = new File(dirPath + "/" + fileName);

                FileOutputStream fos = new FileOutputStream(file);
                BufferedOutputStream bos = new BufferedOutputStream(fos);

                for (int j = 0; j < fileLength; j++) bos.write(bis.read());

                bos.close();
            }catch(Exception e)
            {
                getStackTrace();
            }

    }





}
0

Kilka wątków klienckich pisze do tego samo socketa, w efekcie bajty kilku plików się po prostu mieszają między sobą.

0

Czyli dla każdego wysłanego pliku tworzyć nowe połączenie?

0

W czym miałaby tutaj pomóc wielowątkowość?

Wielowątkowość rozwiązuje problemy w sytuacjach, gdy bottleneckiem jest CPU (np. raytracing) - u Ciebie najpewniej występuje problem z wydajnością łącza internetowego, który zostanie jedynie zamplifikowany przez wątki.

0

Gdy klient wysyła kilka plików na raz.Wtedy uruchamiam ileś wątków z puli które wysyłają je równolegle.

0

Nie ma sensu robić tego równolegle - niepotrzebnie obciążasz łącze; wysyłaj je szeregowo (tak działa np. rsync).

0
Sanders688 napisał(a):

Wszystko działało sprawnie dopóki nie dodałem wielowątkowości.

A przed tym działało skoro nawet nie zamykasz połączenia? Gdzie masz wywoływaną metodę close() na gnieździe w aplikacji klienckiej? wywołujesz close() ale na strumieniu. Implikacja na pewno przebiega w drugą stronę:

Closing this socket will also close the socket's InputStream and OutputStream.

If this socket has an associated channel then the channel is closed as well.

Z dokumentacji metody close() klasy Socket.
Nie jestem pewny czy zamykając strumień zamyka się również połączenie gniazda.

Edit:
Zgadzam się z @Patryk27 jeśli chcesz to możesz zrobić obsługę plików w innym wątku niż główny ale wystarczy Tobie jeden dodatkowy wątek chociażby Executors.newSingleThreadExecutor(). Wszystkie taski związane z odczytywaniem plików wrzucaj do tego. W końcu na raz i tak będziesz odczytywał tylko jeden plik. Utworzenie więcej niż jednego wątku do tej operacji nie tylko nie przyśpieszy jej ale może ją nawet niepotrzebnie spowolnić i skomplikować.

0

Właśnie chciałem żeby w przypadku gdy jeden plik waży 100GB drugi 1MB to żeby ten drugi nie czekał na przesłanie pierwszego , generalnie cały program ma działac jak dropbox.

0

No ok. ale to i tak najwyżej dwa wątki dodatkowe na to przeznaczysz. Zrobisz albo dwa razy Executors.newSingleThreadExecutor() albo Executors.newFixedThreadPool​(2). Pierwszy sposób lepszy moim zdaniem. Zwłaszcza gdy ładnie nazwiesz te executory by je odróżnić od siebie.

0

Dodałem

ExecutorService service = Executors.newFixedThreadPool(10);
 service.execute(new Send_File(socket,dos,f,directory));

Jednak ciągle jest problem przy synchronizacji z serwerem.

0
ReallyGrid napisał(a):

No ok. ale to i tak najwyżej dwa wątki dodatkowe na to przeznaczysz. Zrobisz albo dwa razy Executors.newSingleThreadExecutor() albo Executors.newFixedThreadPool​(2). Pierwszy sposób lepszy moim zdaniem. Zwłaszcza gdy ładnie nazwiesz te executory by je odróżnić od siebie.

Mógłbyś mniej więcej pokazać jak to ma wyglądać ponieważ nie za bardzo rozumiem.
Serwer ma działać mniej wiecej w ten sposób że dostaje 10 plików i równolegle je kopiuje.

0

Jak wyżej już wspomniano, przesyłanie kilku plików na raz w oddzielnych wątkach spowolni Ci transfer. Jak już mówisz o tym rozmiarze, to może po prostu posortować je po rozmiarze i wysyłać od najmniejszego? Pojedyńczo.

0

Generalnie tak jest w założeniach projektu. Gdy Klient wyśle np. 10 plików serwer ma je równolegle kopiować.

0

napiszę tylko najważniejsze różnice.

while (true) {
    String f=File_Handler.check_changes_in_folder(directory,user_files);
    
    if(!f.equals("new_files_not_found")) {
        Socket socket = new Socket(ip, 1342);
        BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
        DataOutputStream dos = new DataOutputStream(bos);
        // powyższe 3 linijki możesz przenieść do metody run() w klasie Send_File i pozbędziesz się 2 argumentów konstruktora
        
        Runeable task = new Send_File(socket, dos, f, directory);
        executors.execute(task);
    }
}

a klasa Send_File wystarczy żeby implementowała interfejs Runeable. Dziedziczenie po Thread to wytaczanie działa na muchę.

class Send_File implements Runeable {
    // tutaj może być bez zmian

    public void run() {
        System.out.println("NOWY WATEK");
        try {
            // tutaj też bez zmian
        }
        catch(Exception e) {
            getStackTrace();
        }
        finally {
            s.close(); // dodaj to.
        }
    }
}

nie jest to najładniejszy kod, ale nie korzystałem z IDE, klepałem z pamięci.

0

A co z serwerem w takim przypadku? Naprawdę zależy mi na tym aby kopiował on przychodzące pliki równolegle.

0
Sanders688 napisał(a):

Wszystko działało sprawnie dopóki nie dodałem wielowątkowości.

Skoro problem wyskoczył przy dodaniu współbieżności do kodu klienckiego (serwer miałeś bez zmian), to... powinien działać. A co, nie działa?
Na serwer nie patrzyłem.

0

Nie dodałem wielowątkowośc po obu stronach bo tak defacto ma to finalnie działać.

0

W pierwszej odpowiedzi w tym wątku napisałem, że nie zadziała jeśli wiele wątków będzie pisało do tego samego socketa... Każdy z twoich wątków powinien nawiązać osobne połączenie, jeśli nie chcesz się bawić w multiplexing co de facto oznacza własny protokół.

0
damianem napisał(a):

W pierwszej odpowiedzi w tym wątku napisałem, że nie zadziała jeśli wiele wątków będzie pisało do tego samego socketa... Każdy z twoich wątków powinien nawiązać osobne połączenie, jeśli nie chcesz się bawić w multiplexing co de facto oznacza własny protokół.

Przecież obecnie tworzę dla każdego wysyłanego pliku nowy socket.

0
Sanders688 napisał(a):

Nie dodałem wielowątkowośc po obu stronach bo tak defacto ma to finalnie działać.

Tak widzę, nie spojrzałem wcześniej. Ale nie odpowiedziałeś na pytanie. Działa teraz czy nie?
Ogólnie to masz tak jak to sie robi w takich syt.
Szablon kodu serwera jest taki:

ServerSocket socket = new ServerSocket(port);
while(true) {
    final Socket connection = new socket.accept();
    Runable task = new Runnable() {
        public void run() {
            handleRequest(connection);
        }
    };
    new Thread(task).start();
}

Masz inne nazwy ale działasz wg tego schematu więc powinno być ok. Serwer każde połączenie obsługuje w oddzielnym wątku.

0

@ReallyGrid: Mógłbyś jeszcze wytłumaczyć czym jest handleRequest?

0
 ServerSocket socket = new ServerSocket(1342);

            while(true){

                Socket connection = socket.accept();
                String dirPath ="C:\\Users\\andrze\\Desktop\\Serwer1";
                BufferedInputStream bis = new BufferedInputStream(connection.getInputStream());
                DataInputStream dis = new DataInputStream(bis);

                Runnable task = new Runnable() {
                    public void run() {
                        File_Download f= new File_Download(connection,dirPath);
                    }
                };
                new Thread(task).start();
            }


        }

Tak to powinno wyglądać?

0

Po pierwsze to wywal te linijki z pętli while bo z nimi nic nie robisz. Nigdzie dalej nie przesyłasz obiektu dis co więcej te same linijki masz w metodzie run() klasy File_Download. Mówię o tych liniach:

BufferedInputStream bis = new BufferedInputStream(connection.getInputStream());
DataInputStream dis = new DataInputStream(bis);

a nie działa Ci program dlatego że spójrz co Ty robisz…
Masz tak:

while(true) {
    Socket connection = socket.accept();
    String dirPath ="C:\\Users\\andrze\\Desktop\\Serwer1";                          // to możesz przenieść przed while
//  BufferedInputStream bis = new BufferedInputStream(connection.getInputStream()); // wywal to
//  DataInputStream dis = new DataInputStream(bis);                                 // to też wywal skoro nie używasz tych linii

    Runnable task = new Runnable() {               // tworzysz obiekt Runnable
        public void run() {                        // metoda run() TEGO obiektu
            File_Download f= new File_Download(connection,dirPath);
        }
    };
    new Thread(task).start();               // tutaj uruchomi się POWYŻSZA metoda run(), a nie ta z klasy File_Download
}

Rozumiesz już?

Wywal całe to Runnable task = new Runnable() { i zrób tylko

new Thread(new File_Download(connection,dirPath)).start();

Teraz powinno zadziałać

0

Wcześniej się udało. Tylko teraz myślę czy to rozwiązanie jest poprawne bo nie wydaje mi się żeby otwieranie za każdym razem socketu było poprawne. tzn. to raczej wyklucza wielowątkowość po stronie serwera . Do tego ma zostać jeszcze dołożony kontroler wątków który będzie koordynował ich pracą tj. gdy jeden klient przesyła dużo plików , pojawia się drugi to ten drugi ma mieć wrażenie natychmiastowej obsługi.

0

Oczywiście że jest to normalne działanie, a nawet konieczne. Jeśli wykorzystasz jeden socket do przesyłania wielu plików po stronie klienta (zwłaszcza współbieżnie!!!), to może się zdarzyć tak, że pierwszy wątek obsługujący wysyłanie pierwszego pliku z tekstem dajmy na to "ALA" zostanie wywłaszczony po przesłaniu 2 znaków.
Rozpocznie pracę drugi wątek (na tym samym gnieździe), który zacznie wysyłać drugi plik z napisem "KOTEK". Znów po przesłaniu 2 znaków zostanie wstrzymany i rozpocznie pracę trzeci wątek, itd.
Jeśli działoby się to na tym samym sockecie to:
po pierwsze: nie potrzebny byłby serwer współbieżny tylko iteracyjny bo serwer otrzymałby tylko jedno połączenie (jedno conection).
po drugie: serwer otrzymałby zlepek liter "ALKO..." w jednym połączeniu i nijak nie umiałby rozdzielić tego strumienia. Litery ze wszystkich plików by się wymieszały.

Zostaw jak jest i spróbuj zrozumieć co się dzieje w kodzie.

1

Można to zrobić w inny sposób - mieć zawsze tylko jedno połączenie między klientem a serwerem a i tak mieć wrażenie przesyłu wielu plików jednocześnie. Klienta implementujemy wtedy jako Runnable, które posiada pewien stan, zbiór aktualnie przesyłanych plików wraz z ich postępem. Algorytm klienta można wtedy opisać:

[1] poczekaj aż będzie przynajmniej jeden transfer do wykonania (polecam java.util.concurrent.locks.Condition)
[2] wybierz spośród transferów podzbiór N na podstawie wybranej strategii
[3] upewnij się, że mamy połączenie do serwera (połącz, jeśli trzeba)
[4] dla każdego z transferów z N odczytaj pewien kawałek pliku, zbuduj wiadomość i wyślij do serwera
[5] jeśli któreś z transferów są kompletne, usuń je
[6] wróć do [1]

Mamy więc tutaj do czynienia z multiplexingiem na jednym połączeniu - żeby to działało trzeba opracować protokół tak, żeby serwer był w stanie wiedzieć, do którego pliku należy dana wiadomość. Najprostsza implementacja, która przychodzi mi do głowy to wiadomość gdzie pierwsze 4 bajty to ID pliku, następne 4 bajty to długość fragmentu pliku a reszta to fragment pliku o zakodowanej długości.

Samo dodawanie pliku do wysłania jest wtedy nieblokujące, tzn. wywołujemy sender.addFile(file), co dodaje nowy transfer do wewnętrznej kolejki sendera i metoda zwraca. Metoda ta może też przyjmować callback, który ma zostać wykonany po zakończeniu transferu (lub na wypadek błędu).

Największą siłą takiego rozwiązania jest punkt [2] - tutaj można zastosować różne strategie, np. wybierać pliki na podstawie ich priorytetu, rozmiaru, postępu. Tak samo jak systemowy scheduler przydziela czas procesora wątkom, tak tutaj można też bawić się w różne algorytmy przydziału łącza (generalnie https://en.wikipedia.org/wiki/Scheduling_(computing)).

Oczywiście diabeł tkwi w szczegółach, na przykład jak ustalić dobre, uniwersalne ID pliku? Albo co robić gdy połączenie zostanie stracone? Co jeśli odczyt z pliku się nie powiedzie? Jak zasygnalizować serwerowi koniec konkretnego transferu? Jak utrzymać stan serwera z klientem w synchronizacji między rozłączeniami? Wszystko to da się rozwiązać (np. rozszerzając protokół), kosztem narastającego complexity. No ale nikt nie powiedział, że temat jest trywialny :)

0

Pojawił się kolejny problem, generalnie wszystko śmiga bardzo dobrze do momentu gdy Klient chce wysłać jakiś bardzo mały plik. Wtedy te kilka bajtów wysyłane przez klienta gdzieś giną.
https://pastebin.com/nAkyp7sh - CLIENT
https://pastebin.com/7GXHLk8y - SERWER

0

W kliencie, po:

            while ((theByte = bis.read()) != -1)
            {
                bos.write(theByte);
            }

Dodaj bos.flush().

Polecam też poczytać o zero-copy: https://developer.ibm.com/articles/j-zerocopy/

1 użytkowników online, w tym zalogowanych: 0, gości: 1