Synchronizacja niezależnych wywołań grupy

0

Hej,

Mam skrypt, który opiera się na wykonaniu krok po kroku, takiej samej operacji w wielu grupach i lokalizacjach. Ogólnie, działa to ok, ale zastanawiam się nad zrobieniem tego asynchronicznie, synchronizując po wykonaniu danej operacji na całej grupie.

Mianowicie, działa to tak:
Zadanie 1;
Zadanie 2;
Zadanie 3;
...
Każda z grup złożonych z elementów przechodzi przez wszystkie zadania w następujący sposób:
Element 1 > Zadanie 1
Element 2 > Zadanie 1
Element 3 > Zadanie 1
...
Element 1 > Zadanie 2
itd.

Z tym że te elementy są od siebie niezależne, więc mogłyby być wywoływane równocześnie, natomiast przed zadaniem 2 musi dla danego elementu zostać wykonane zadanie 1.
Każde z zadań jest wykonywane przez subprocess:

def run_command(bash_string): 
    subprocess.run(bash_string, shell=True, universal_newlines=True, executable="/bin/bash") 

Czy ktoś ma może sprawdzony sposób na coś w rodzaju thread-pool, gdzie uruchamiamy wszystkie elementy dla danego zadania asynchronicznie, a kontynuujemy dopiero po zakończeniu zadania dla wszystkich elementów?

Po krótkim google niewiele znalazłem o synchronizacji w pythonie, możliwe że po prostu miałem z tym za mało doczynienia i nie potrafię tego odpowiednio nazwać aby znaleźć coś co by mnie tego nauczyło.

Myślałem o wystartowaniu nowych wątków dla każdego wywołania run_command, semaforze kontrolującym ile razy został podniesiony i opuszczony, natomiast nie do końca jestem pewien czy w ogóle myślę w dobrą stronę, stąd pojawia się moja prośba o poradę :)

[Tak, wiem że niektórzy zapewne stwierdzą że nie powinienem pisać skryptów w pythonie tylko w bashu, natomiast takie podejście jest tutaj potrzebne przez składanie i weryfikacje typów napisowych.]

0

Może nie jest odpowiedź na Twoje pytanie, ale.. szukałem czegoś prostego w Pythonie i okazało się, że Apache AirFlow ma wszystkie funkcjonalności, których potrzebuję + wiele więcej :)

Podpinam się pod obserwację i może dowiem się jak można zrobić inaczej.

0

Zmieniłem podejście, i znalazłem satysfakcjonujące rozwiązanie, bo skoro w sumie Zadania dla oddzielnych elementów też nie są od siebie zależne, to mogłem 'odwrócić macierz'
Czyli zrobić:
Element 1 > Zadanie 1 > Zadanie 2 ...
Element 2 > Zadanie 1 > ...

A następnie uruchomić wszystkie kolejne elementy niezależnie, czyli wystartować subprocess w osobnych wątkach aby nie freezować głównego wątku wykonawczego.

Więc, przerobiłem funkcję:

def run_command(bash_string): 
    subprocess.run("x-terminal-emulator -e \"" + bash_string + " ; sleep 600 \"", shell=True, universal_newlines=True, executable="/bin/bash")

Z tym że teraz bash_string, jest składany na zasadzie Zadanie 1 && Zadanie 2 && Zadanie 3 ... etc.

Odpowiada za to prosta klasa:

class Command_Queue:
    repo_action = dict()

    @classmethod
    def evaulate_all_async(cls):
        thread_list = []
        for repo, action in cls.repo_action.items():
            os.chdir(Settings.COMBINED_PATH + os.path.sep + repo)
            print(repo, "started with:", action)
            thread_list.append(threading.Thread(target=run_command, args=(action, )))
        for thread_elem in thread_list:
            thread_elem.start()
        time.sleep(3)

    @classmethod
    def add(cls, repo, command):
        if cls.repo_action.get(repo, None):
            cls.repo_action[repo] = cls.repo_action[repo] + " && " + command
        else:
            cls.repo_action[repo] = command

To defakto uruchamia mi w oddzielnych terminalach wszystkie polecenia pokolei, wykonując je jednocześnie. Natomiast gdybym potrzebował przechwycić coś z outputu tych komend, wtedy to całe podejście nie sprawdziłoby się. Jedyne co dodaje jako ostatnie to zczytanie exit code z ostatniego zakończonego procesu i wypisuje na ekran i mam 10 minut (sleep) na sprawdzenie tego.

1 użytkowników online, w tym zalogowanych: 0, gości: 1