Jak zmienic ten kod na szybszy?

0

Witam,
Mam tutaj proste wczytywanie pliku:

with open("scan/scan.txt") as file:
        for line in file:
                check(line)

Problem jest taki ze plik ma ok 50000 linijek.
Jak można usprawnić ten kod zeby działał szybciej? Multiprocessing? Nigdy nie robiłem programu który wykorzystuje multi threading-processing.

0

Ile teraz trwa wykonanie tego kodu a ile uważasz że powinno? Co robi funkcja check? Czy blokuje ją wejście/wyjście czy obliczenia? Czy zastosowany algorytm jest na pewno optymalny? Bez znajomości problemu nic tak naprawdę nie można powiedzieć.

1

Nie wiem, co robi Twój check, ale być może wykorzystanie map w miejscu iteracji po wierszach pliku pozwoli Ci przyspieszyć wykonanie.

Poniżej prosty przykład z rezultatami pomiarów które walnąłem na szybko - map wykonał się znacznie szybciej niż iteracja czy list comprehension:

>>> from time import time
>>>
>>> collection = ["A" * 1000 for _ in range(50000)]
>>>
>>> def my_procedure(line):
...   long_line = line * 10
...   half_of_long_line = long_line[:len(long_line)//2]
...   return len(half_of_long_line) > 10000
...
>>> def as_map(procedure):
...   return map(procedure, collection)
...
>>> def as_comprehension(procedure):
...   return [procedure(line) for line in collection]
...
>>> def as_loop(procedure):
...   result = []
...   for line in collection:
...     result.append(procedure(line))
...   return result
...
>>> def timeit(fun):
...   start = time()
...   for _ in range(10):
...     result = fun(my_procedure)
...   end = time()
...   avg_time = (end - start)/10.0
...   print("Average time for {0}, 10 runs: {1:10.6f}s".format(fun.__name__, avg_time))
...
>>> timeit(as_map)
Average time for as_map, 10 runs:   0.000098s
>>> timeit(as_loop)
Average time for as_loop, 10 runs:   0.048471s
>>> timeit(as_comprehension)
Average time for as_comprehension, 10 runs:   0.046080s
3

Swoją drogą, jeśli chcesz wykorzystać multiprocessing lub multithreading to dorzucę jeszcze trzy grosze w temacie:

  • multithreading w Pythonie jest często gęsto mało efektywny, a wszystko z powodu wuja GIL-a, który ratuje programistę przed skutkami zarządzania pamięcią przez CPython, które nie jest thread-safe
  • możesz wykorzystać zamiast tego moduł multiprocessing i pulę procesów multiprocessing.Pool. Pool ma metodę map, która pozwala dość wygodnie przetwarzać listy itp. z wykorzystaniem puli procesów, pozwalając ominąć GIL a jednocześnie nie babrać się z zarządzaniem procesami. Wada jest taka, że moduł multiprocessing - przynajmniej z mojego osobistego doświadczenia - jest mocno upośledzony, jeśli chodzi o obsługę błędów. Co się przekopałem przez wątki na SO, które traktują o omijaniu jego ograniczeń jeśli chodzi o obsługę sytuacji, gdy np. proces potomny wywali się z powodu wyjątku czy SIGINT, to moje.
  • alternatywą, moim zdaniem lepszą od multiprocessing, jest moduł concurrent, który ma własną pulę procesów: concurrent.futures.ProcessPoolExecutor, która generalnie lepiej sobie radzi z przypadkami, gdy proces-dziecko umrze. Jest jeszcze kwestia tego co się stanie, gdy zabity zostanie proces-rodzic, ale z tym akurat ciężko cokolwiek zrobić (chyba, że zrobisz jakiś awaryjny mechanizm ubijający pulę w procesie-rodzicu, na przykład gdy proces dostanie sygnał SIGINT / SIGABRT poprzez moduł signal, ale na SIGKILL już nic nie poradzisz).
  • możesz poczytać sobie o modułach numba i dask - pierwszy pozwala na kompilację JIT wybranych fragmentów kodu pisanego w Pythonie i tym samym przyspieszenie wykonania, drugi na równoległe czy nawet rozproszone przetwarzanie - mogą Cię zainteresować. Tu masz artykuł o tym, jak autor uzyskał 60-krotne przyspieszenie łącząc oba moduły ;)
1
  1. Wczytaj cały plik na raz za pomocą read a potem potnij na linie
  2. Pokaż co robi check
  3. Użyj PyPy
1
superdurszlak napisał(a):

Nie wiem, co robi Twój check, ale być może wykorzystanie map w miejscu iteracji po wierszach pliku pozwoli Ci przyspieszyć wykonanie.

Poniżej prosty przykład z rezultatami pomiarów które walnąłem na szybko - map wykonał się znacznie szybciej niż iteracja czy list comprehension:
[...]

W Python 3, as_map zwraca generator. W Python 2, zgaduję, nie ma tak sensacyjnej różnicy.

1

Spróbuj użyć pandas, jest najszybsze w Pythonie.

0
Mózg napisał(a):

W Python 3, as_map zwraca generator. W Python 2, zgaduję, nie ma tak sensacyjnej różnicy.

Racja, moje niedopatrzenie. Niemniej po zmianie:

>>> def as_map(procedure):
...   return list(map(procedure, collection))

czas wykonania i tak jest ~10% krótszy, niż dla iteracji.

Edit jeszcze tak z ciekawości zastąpiłem map przez filter. Okazał się jeszcze minimalnie (może 2%) szybszy.

0

Witam,
Dziękuję za wszystkie odpowiedzi :D
Funkcja check łączy się z serwerem i sprawdza dane odesłane przez serwer. Czasami zajmuje to z 10-20 sekund także dość długo jeżeli robię to linia po linii. Dlatego chciałem użyć multithreadingu.

0

A właściciel serwera się nie zdenerwuje jak będziesz mu napieprzał tysiącami żądań? ;]

Spróbuj użyć klienta asynchronicznego klienta HTTP. Google: "python async http client"

0
Adrian Rudy Dacka napisał(a):

Witam,
Dziękuję za wszystkie odpowiedzi :D
Funkcja check łączy się z serwerem i sprawdza dane odesłane przez serwer. Czasami zajmuje to z 10-20 sekund także dość długo jeżeli robię to linia po linii. Dlatego chciałem użyć multithreadingu.

Jeśli wysyła dane na serwer, multithreading niewiele da, o ile serwer też nie jest wielowątkowy - napisałeś go sam, czy korzystasz z czegoś gotowego? Serwer dysponuje pulą wątków, kolejkuje zadania dla jakiegoś wątku przetwarzającego dane, czy spawnuje nowy wątek per połączenie? Załóżmy, że jest - problemem może się okazać brać mocy przerobowych, jeśli przetwarzania jest dużo. Jeśli jest go mało, to całość przetwarzania będzie spowalniać komunikacja. Zresztą, nawet jeśli większość czasu to jakieś obliczenia na serwerze i serwer wykorzystuje prawie całą moc, to nadal masz narzut wynikający z przesyłania komunikatów do serwera 50000 razy - lepiej byłoby wysłać plik w całości i przenieść sprawdzanie pliku linia po linii do kodu serwera.

0

W sumie to niby ustaliliśmy już z @lion137, że co benchmark to inny wynik, ale spróbuj pomierzyć czasy wykonania poszczególnych fragmentów funkcji check() i wrzuć jej kod źródłowy - i przy okazji zmierz lub sprawdź w logach (jeśli masz dostęp), ile trwa przetworzenie pojedynczego żądania na serwerze. Łatwiej będzie ustalić, co najbardziej spowalnia, ale jeśli w każdym kroku pętli

  • łączysz się z serwerem,
  • wysyłasz komunikat,
  • odbierasz odpowiedź,
  • kończysz połączenie

To pewnie masz narzut jak skurczybyk na komunikację ;)

0

Chodziło mi bardziej o to nie żeby nawalać 50000 linijek do serwera, tylko np po 10. Tylko mi się udało na raz, tylko że miałem dosłownie 10 konsol odpalonych z tym samym programem a to mija się z celem. No i oczywiście wszystkie wysyłały pierwsze 10 linijek a nie kolejne 10.
@superdurszlak funkcji check niestety nie moge wrzucić, ale zajmuje ona tyle ile zajmuje serwerowi odpowiedzenie na zapytanie. W przypadku mojego serwera jest to 10-20sekund.

1
Adrian Rudy Dacka napisał(a):

Chodziło mi bardziej o to nie żeby nawalać 50000 linijek do serwera, tylko np po 10.
Tylko mi się udało na raz, tylko że miałem dosłownie 10 konsol odpalonych z tym samym programem a to mija się z celem. No i oczywiście wszystkie wysyłały pierwsze 10 linijek a nie kolejne 10.

A co to właściwie miało na celu?

Chcesz wysłać ten plik na serwer w kilku kolejnych paczkach, czy interesuje Cię tylko jakIś fragment pliku? Co dokładnie ma robić ten check, a właściwie to serwer?

0
[Adrian Rudy Dacka napisał(a)](p

@superdurszlak funkcji check niestety nie moge wrzucić, ale zajmuje ona tyle ile zajmuje serwerowi odpowiedzenie na zapytanie. W przypadku mojego serwera jest to 10-20sekund.

No to problemem jest komunikacja lub wydajność serwera, spróbuj wysłać wszystko w jednym lub kilku batchach i/lub wyciągnąć łączenie z serwerem poza pętlę i podtrzymywać jedno do całości komunikacji, jeśli pod spodem już tak nie jest i/lub wsadzić wszystko w pulę procesów żeby sprawdzić czy nie wyczerpałeś możliwości sieci lub serwera. Najlepiej po 1 zmianie naraz i mierz zmiany w czasie wykonania.

Btw. 10s to czas odpowiedzi na 50 000 zapytań czy jednego? (sic!)

0

Ja wysyłam zapytania json zaware w pliku (jedno na linijke) a serwer zwraca jakiś inny json.I w tym problem bo na odpowiedź czekam między 10-20sekund w zależność od prędkości mojego internetu. Dlatego chciałem wczytując linie, wysyłać na przykład po 10 zapytań jednocześnie(Czyli 10 linijek),czekać np 1 minutę a potem wysłać następne 10.
EDIT:
Dodam że funkcja potem nic nie zwraca tylko zapisuje odpowiedz do pliku.

0
Adrian Rudy Dacka napisał(a):

Dlatego chciałem wczytując linie, wysyłać na przykład po 10 zapytań jednocześnie(Czyli 10 linijek),czekać np 1 minutę a potem wysłać następne 10

A mierzyłeś czas odpowiedzi, gdy jedno żądanie zawiera więcej linii? Może wtedy wyniesie nie 1min, a 21s?

Jeśli winne jest połączenie internetowe, to multiprocessing nic nie da, skoro już jeden proces może je zapchać.

I dobrze by było, gdybyś jednak ujawnił więcej szczegółów, zrobił pomiary dla wielu scenariuszy i dał wyniki, bo tak to wróżymy ze szklanej kuli :p

0

Funkcja check:

def check(line):
   url = "XXX"
   data = {"jsonrpc":"2.0","method":line,"params":[],"id":1}
   headers = {'Content-type': 'application/json'}
   rsp = requests.post(url, json=data, headers=headers,timeout=10)
   if "X" in rsp:
     with open("results.txt",'a') as file:
       file.write("\n"+rsp)

I nie chodzi mi o to żeby wysyłać 10 linijek w jednym zapytaniu tylko 10 w osobnych czyli wczytuje plik linia po lini i wywołuje dla każdej check(line). Jeżeli działa akurat 10 wątków to czekam aż któryś się zakończy i wtedy uruchamiam następny z następną linia.
Przykład:

with open("scan/scan.txt") as file:
        for line in file:
                Jezeli 10 wątków istnieje to czekaj na zakonczenie jakiegoś, jeżeli jest mniej niż 10 to zacznij następny check(line)
                
1

Panie Adrianie, Pana problem polega właśnie na tym, że nie chce Pan wysyłać 10 linijek w jednym zapytaniu.

Dodatkowo, if "X" in rsp: się zapewne nigdy nie wykona - zwraca on bowiem listę obiektów typu bytes, gdzie każdy obiekt to 128 znaków (lub mniej, gdy już się dotrze do końca). Tak więc, jeśli być może pracujesz na Pythonie 2, to sprawdzenie nie miałoby problemów z typem, natomiast i tak musiałbyś mieć w odpowiedzi dokładnie jedną linijkę z treścią "X", co by zupełnie przeczyło potrzebie zapisania czegoś takiego to pliku (bo nie jesteś w stanie tego skojarzyć z argumentem przesyłanym do funkcji check).

0

Panie Adrianie, Pana problem polega właśnie na tym, że nie chce Pan wysyłać 10 linijek w jednym zapytaniu
Akurat nie mogę wysłać 10 linijek gdyż nie każda idzie do jednego serwera. Jest to tylko fragment funkcji check. Niektóre linijki zawierające konkretną wartość idą na inny serwer a inne na inny,funkcja check przed wysłaniem je dzieli i wysyła na serwer.
Dodatkowo, if "X" in rsp: się zapewne nigdy nie wykona
Tutaj akurat mój błąd. Powinno być rsp.text a nie rsp.

0

Panie Adrianie, Pana problem polega właśnie na tym, że nie chce Pan wysyłać 10 linijek w jednym zapytaniu

Akurat nie mogę wysłać 10 linijek gdyż nie każda idzie do jednego serwera. Jest to tylko fragment funkcji check. Niektóre linijki zawierające konkretną wartość idą na inny serwer a inne na inny,funkcja check przed wysłaniem je dzieli i wysyła na serwer.

Dodatkowo, if "X" in rsp: się zapewne nigdy nie wykona

Tutaj akurat mój błąd. Powinno być rsp.text a nie rsp.

4
Adrian Rudy Dacka napisał(a):

Akurat nie mogę wysłać 10 linijek gdyż nie każda idzie do jednego serwera. Jest to tylko fragment funkcji check. Niektóre linijki zawierające konkretną wartość idą na inny serwer a inne na inny,funkcja check przed wysłaniem je dzieli i wysyła na serwer.

Rozwiązanie jest dość proste:

Skoro linijki w zależności od zawartości wysyłane są na różne serwery, i i tak musisz to określić na podstawie zawartości linijki - pogrupuj je według tej zawartości na paczki tak, by każda paczka była wysyłana na jeden, konkretny serwer.

Ktoś tu linkował moduł pandas - tak się składa, że pandas.DataFrame dostarcza Ci praktycznie wszystkich narzędzi niezbędnych do wykonania tego grupowania. Możesz nawet pogrupować swoje dane wg. serwera docelowego, po czym odpalić osobne procesy, z których każdy będzie gadał z osobnym serwerem z puli, jeśli koniecznie chcesz. Wczytywać dane z obiektów dict czy plików json też potrafi.

Ale skoro Twój program działa tak szybko, jak tylko pozwala na to Twoja sieć, to musisz podjąć kroki, które sieć odciążą - dopiero wtedy będziesz mógł kombinować nad dalszym przyspieszaniem. Optymalizację zaczynaj od identyfikacji i eliminacji wąskich gardeł - Twoim najwyraźniej jest narzut komunikacji po sieci.

Staraj się wyeliminować wszystko, co nie jest absolutnie niezbędne do działania programu:

  • wielokrotne nawiązywanie i zamykanie połączeń z jednym serwerem - nawet głupi 3-way handshake TCP (nie wskazałeś, czy łączysz się po gołym TCP, UDP - w UDP wręcz wcale się nie łączysz, bo to protokół bezpołączeniowy, HTTP, websockets, MQTT czy jeszcze czymś innym) będzie powodował jakiś narzut. Jeśli nawiążesz jedno połączenie z każdym serwerem i wykorzystasz je 1000 razy, zmniejszysz czas nawiązywania połączeń 1000 razy - być może zyskasz tylko 1s, być może aż 4s - sprawdź.
  • nadmierne rozdrabnianie przesyłanych danych - ok, może i daje nieco płynniejszą komunikację, szczególnie, jeśli coś ma się dziać real-time. Ale w niektórych przypadkach będzie powodować nadmierny narzut, szczególnie, gdy będziesz miał bardzo dużo, bardzo krótkich komunikatów. To zależy również od tego, jak protokół, który wykorzystujesz do komunikacji, traktuje danych - jako ciągły strumień czy niezależne datagramy/komunikaty. Spróbuj wysłać więcej danych w jednym komunikacie i sprawdź co się stanie. Może się okazać, że to jest właśnie to i będziesz musiał zrezygnować z wysyłania wierszy pojedynczo, jeśli będziesz chciał uzyskać jakiekolwiek zauważalne przyspieszenie.
  • synchroniczne oczekiwanie na odpowiedź z serwera - tak, to też może spowolnić, i możesz faktycznie uzyskać szybsze wykonanie, ale głównie dzięki asynchronicznemu wykonaniu. Spróbuj skorzystać z modułu concurrent.futures i możliwości pobierania wyników z obiektów Future. Zyskasz na tym o tyle, że podczas gdy jeden proces musi wykonać w korku pętli następujące kroki:
    • wysyłanie komunikatu (upload zatkany)
    • oczekiwanie na odpowiedź (wolny przebieg)
    • odebranie odpowiedzi (download zatkany)

jakiś inny proces będzie mógł wysyłać swój komunikat podczas, gdy ten pierwszy czeka lub odbiera odpowiedź. Ale nie masz gwarancji, że 10 procesów da Ci automatycznie 10 razy szybsze wykonanie - musisz przeprowadzić testy. Ponadto będziesz mógł odebrać odpowiedź na dowolny komunikat, którego przetwarzanie już się zakończyło, niezależnie od Twojej pętli, i konsumować już gotowe wyniki. Upłynni Ci to wykonanie nawet, jeśli przyspieszenie będzie niewielkie.

Sprawdź wszystkie możliwości, zamiast powtarzać, że nie chcesz czegoś robić. Chcesz przyspieszyć program - skup się na likwidacji wąskich gardeł. Nie chcesz pokazać kodu i jesteś oszczędny w opisie tego, co się dzieje pod spodem - spoko, ale jeśli nie będziemy nic wiedzieć, a Ty nie spróbujesz wypróbować naszych podpowiedzi, to nie będziemy w stanie Ci pomóc, a tylko zrazisz do siebie ludzi i następnym razem przeczytasz radź sobie sam, przecież nie chciałeś pomocy.

1 użytkowników online, w tym zalogowanych: 0, gości: 1