Szybkie obliczenie różnicy w dniach pomiędzy dwoma datami w timestamp

0

Posiadam wielkie tablice 50 000 x 50 000 (numpy) zawierające timestamp
Potrzebuję wykonać na wielu tablicach szybką operację obliczenia różnicy pomiędzy dwoma datami

Data jest zapisana w formie jednej liczby timestamp.
Różnica pomiędzy jednym dniem jest w momencie, gdy dzień kalendarzowy się różni

przykład:
26 stycznia 2022 23:00 a 27 stycznia 2022 02:00 jest różnica jednego dnia, gdyż jest to inny dzień

Czy można to napisać bez własnego parsera dat?

Takich operacji muszę wykonać kilkaset, zależy mi na oddzielnej funkcji / lambdzie do takiego celu, która po podaniu dwóch liczb zwróci różnicę w dniach.
Wykorzystywać będę w warunkach do numpy where

2

Może coś takiego?

from datetime import datetime
 a = lambda x,y: (datetime.fromtimestamp(x) - datetime.fromtimestamp(y)).days
0

@ledi12:
Dlaczego nie funkcja?

def day_diff(x: int, y: int) -> int:
    return abs((datetime.fromtimestamp(x) - datetime.fromtimestamp(y)).days)

Da się to jakoś przyspieszyć?

1
Marcin Marcin napisał(a):

@ledi12:
Dlaczego nie funkcja?

def day_diff(x: int, y: int) -> int:
    return abs((datetime.fromtimestamp(x) - datetime.fromtimestamp(y)).days)

Da się to jakoś przyspieszyć?

Oneliner jest wg mnie czytelniejszy, ale jak kto woli :D

Idealnie wg mnie nadało by się tutaj asyncio celem zoptymalizowania czasu.

1

(datetime.fromtimestamp(x) - datetime.fromtimestamp(y)).day

To daje wynik różny o jeden po zmnianie kolejności argumentów.

Spróbuj x//86400 - y//86400 - podzielenie całkowite przez liczbę sekund w dobie daje numer dnia od początku pewnej epoki. Przetestuj na brzegowych przypadkach, może byc wrażliwe na strefę czasową, sekundy przestępne.

1

Faktycznie, nie przetestowałem przypadku mniejszego niż 24h. To powinno załątwić sprawę.

def day_diff(x: float, y: float) -> int:
    date1 = datetime.fromtimestamp(x)
    date2 = datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days
0

Czy da się zrobić coś w stylu programowania dynamicznego aby jeżeli dużo razy wywołujemy z tą samą wartością to aby odczytywało z tablicy?

2

@Marcin Marcin python ma takie cudo jak @cache / @lru_cache nad funkcją które robi dokładnie to.
https://docs.python.org/3/library/functools.html

0

@Shalom: czy można takie cos policzyć przy Vectorize z numpy?

def day_diff(x: int) -> int:
    y = change_layer_value_timestamp
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days

vector_func = np.vectorize(day_diff)
day_difference_array = vector_func(newest_change_array)
1

@Marcin Marcin: trudno coś konkretnego poradzic ni widząc co ty dokładnie chcesz tam zrobić. Np.:

from time import time

import datetime
import numpy as np
import random
from functools import lru_cache

@lru_cache(maxsize=None)
def day_diff(x: float, y: float) -> int:
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days


def main():
    unique_dates = 1000
    n = 1000000
    dates = [(datetime.datetime.now() - datetime.timedelta(days=i)).timestamp() for i in range(unique_dates)]
    array = np.array([random.choice(dates) for _ in range(n)])
    reference = datetime.datetime.now().timestamp()
    day_diff_vectorized = np.vectorize(lambda x: day_diff(x, reference))
    s = time()
    diffs = day_diff_vectorized(array)
    print(time() - s)


main()

Liczy różnicę między dzisiaj a jednym z 1000 innych losowych timestampów, robi to milion razy i u mnie wykonuje sie <0.5 sekundy

Ile to jest dla ciebie za wolno? Jaki performance chcesz uzyskać? Bo jednak są jakieś fizyczne ograniczenia ;)

0

@Shalom: brakuje mi ramu. mam tablice numpt array 50 000 x 50 000 i nie jestem w stanie zastosować tej funkcji w klasie, która wykonuje tablice tymczasową na podstawie której wykonywane są dalsze obliczenia

Czy masz pomysł jak podejść do tego problemu?

1

tu chyba wystepuje problem XY. Dlaczego ta tablica jest rozmiaru 50000 x 50000 a nie 50000 x 2?
Moze podaj przykladowe dane wejsciowe (np pierwszych 5 wartosci).

2

@Marcin Marcin nadal nie wyjaśniłeś co konkretnie chcesz tam robić i cieżko cokolwiek ci poradzić a coraz bardziej wygląda na problem XY bo teraz twierdzisz ze problem masz z pamięcią a nie z czasem wykonania. Zdecyduj się. Napisz konkretnie:

  1. Co jest dane (daj może przykład?)
  2. Co chcesz osiągnąć
  3. Z czym masz właściwie problem

50k x 50k = 2.5 miliarda floatów, czyli choćby skłały srały to nie będzie mniej niż 10GB pamieci, a w większości jezyków zarządzanych będzie i ze 2 razy tyle.

0

@Shalom: postaram się opisać najprościej jak umiem:

Posiadam dane zapisane w formacie .tif każdy obrazek zawiera typ INT i jest skompresowany za pomocą algorytmu LZW zajmując około 50-150mb przy rozmiarze 50 000 x 50 000 px (około)

Dane obsługuję za pomocą pythona wersja 3.8.5 oraz biblioteki GDAL

Do wykonania operacji potrzebne są trzy rzeczy:
-warstwa z pikselami które zawierają wartość timestamp i odczytywane są z pliku
-jedna data od której będziemy porównywać
-warstwa z wynikiem oznaczającym liczbę dni będącą różnicą pomiędzy datą porównywaną oraz każdym pikselem z timestamp

Więc zaczynam od wczytania obrazka z wartościami:

newest_change_dataset = gdal.Open(str(self.__newest_change_layer_path), gdal.GA_Update) # zrobienie datasetu
newest_change_array = np.array(newest_change_dataset.GetRasterBand(1).ReadAsArray()) # wygenerowanie numpy array (nie jest skompresowany)
newest_change_no_data_value: int = int(newest_change_dataset.GetRasterBand(1).GetNoDataValue()) # wyciągnięcie wartości nodata

Następnie odczytuję jaką posiada wartość nodata czyli coś co mnie nie interesuje

Istotną wartością jest change_layer_value_timestamp która na 99% będzie zawsze większa od każdej z wartości w newest_change_array
Oznacza ona jedną z wartości, którą porównujemy

Następnie chciałbym stworzyć funkcje, która dla change_layer_value_timestamp oraz tablicy newest_change_array utworzy tablice day_difference_array wartościami różnicy w pełnych dniach tam gdzie jest faktycznie różnica dnia w kalendarzu mogąc pominąć wartości nodata w tablicy newest_change_array

Przykład funkcji: (jest ona poza klasą w której dzieje się ta logika)

@lru_cache(maxsize=100)
def day_diff(x: float, y: float) -> int:
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days

Wywołanie funkcji zwektoryzowanej:

day_diff_vectorized = np.vectorize(lambda x: day_diff(x, change_layer_value_timestamp))
day_difference_array = day_diff_vectorized(newest_change_array)

Istotna informacja jest to część klasy (wywoływanie jako metoda prywatna) oraz do dyspozycji jest ~40gb ram i trudno będzie to zwiększyć. Program ogólnie działa ale powinien liczyć się dla obrazka około 4min a liczy 36min po dodaniu tej warstwy z różnicą dni. Nie mam pomysłu na optymalizację pod kątem czasu. Dla niektórych danych dostaję:

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)

Chyba za dużo ramu zużywam :/

1

Tak jak pisałem wyżej, masz 2.5 mld elementów w tablicy czyli minimum (!) 10GB na same dane po zdekompresowaniu i zrobieniu numpy array i to już przy założeniu że to sie jakoś sprytnie cachuje, bo same dane to 10GB + jeśli gdzieś są do nich referencje to kolejne 10GB. Zobacz ile pamięci proces zużywa po samym tylko wczytaniu danych, zanim zaczniesz jeszcze w ogóle liczyc te swoje różnice...
A ty teraz chcesz wygenerować kolejne 2.5 mld elementów z tymi twoimi diffami, czyli znów minimum (!) kolejne 10GB a możliwe że znów dwa razy tyle.

W kontekście pamięci może jesteś w stanie streamować te 2.5 mld elementów - do policzenia różnic wcale nie musisz wszystkiego na raz mieć w pamieci, równie dobrze mógłbyś łykać sobie to jakimiś chunkami, obcinając wymagania pamięciowe o połowę (potrzeba ci tylko tablicy na wynik)?

W kontekście czasu wykonania obawiam się ze cudów nie zdziałasz, chyba że przepiszesz to na jakieś C ;) Jeśli chcesz to robić gołym pythonem to od biedy możesz odpalić multiprocessing i wtedy czas podzielisz przez tyle ile masz dostępnych corów.

from multiprocessing import freeze_support
from time import time

import datetime
import functools
import multiprocessing
import numpy as np
import random
from functools import lru_cache


def multi(worker, data_list, processes):
    pool = multiprocessing.Pool(processes=processes)
    result = pool.map(worker, data_list)
    pool.close()
    return result


@lru_cache(maxsize=None)
def day_diff(x: float, y: float) -> int:
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days


def main():
    unique_dates = 1000
    n = 100000000
    cores = 8
    dates = [(datetime.datetime.now() - datetime.timedelta(days=i)).timestamp() for i in range(unique_dates)]
    array = np.array([random.choice(dates) for _ in range(n)])
    reference = datetime.datetime.now().timestamp()
    day_diff_vectorized = np.vectorize(functools.partial(day_diff, y=reference))
    s = time()
    diffs = multi(day_diff_vectorized, np.array_split(array, cores, axis=0), processes=cores)
    print(time() - s)


if __name__ == '__main__':
    freeze_support()  # windows quirk
    main()

Przyspieszenie jest czysto liniowe, więc np. jakbyś miał 16 rdzeni to zamiast 4.5 minuty miałbyś poniżej pół minuty.

0

@Shalom: Jak dodać rzutowanie aby nie trwało 1h do funkcji:

diffs = multi(day_diff_vectorized, np.array_split(array, cores, axis=0), processes=cores)

Próbuję podejrzeć debuggerem ale trwa już ~1h i nie może dojść do tego etapu a normalnie program wykonuje się w 5min
Tablica jest potrzebna w formacie numpy.array do obliczeń z późniejszą tablicą i powinna zawiera wartości int oraz mieć rozmar 50 000 x 50 000
Spróbowałem

diffs = np.array(diffs)

oraz

diffs = np.asarray(diffs, dtype=np.int)

Przykład wynikowej tablicy diff:

[array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       ...,
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544]]), array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       ...,
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544]]), array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       ...,
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544]]), array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       ...,
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544]]), array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       ...,
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544]]), array([[18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],
       [18544, 18544, 18544, ..., 18544, 18544, 18544],

0

No debuggerem to lepiej byłoby jednak zmniejszyć rozmiar danych xD
To multi zwróci ci listę rozwiązań per worker i musisz to sobie poskładać. Jeśli wejściem była tablica 1000 elementowa i podzieliłeś na 10 corów to dostaniesz jako wynik 10 tablic po 100 elementów na wyjściu (chyba w zupełnie losowej kolejności)

0

@Shalom: Jak to złożyć w kolejności takiej jak weszło?

W zupełnie losowej kolejności? Miejsce położenia danego elementu w tablicy jest bardzo istotne i nie mogę ich zmienić. Czy jest możliwe złożenie tych elementów z powrotem do numpy array z zachowaniem kolejności?

Wynikiem jest lista zawierająca 22 (bo są 22 wątki) numpy array

0

Trudne pytania :P Można by to przerobic tak że worker dostaje jakis slice tablicy + "index" i przelicza i zwraca cały slice i potem możesz to poskładać na bazie indeksu. Przykład:

from multiprocessing import freeze_support
from time import time

import datetime
import functools
import multiprocessing
import numpy as np
import random
from functools import lru_cache


def multi(worker, data_list, processes):
    pool = multiprocessing.Pool(processes=processes)
    result = pool.map(worker, data_list)
    pool.close()
    return result


def slice_worker(data):
    index, array_slice, function = data
    return index, function(array_slice)


@lru_cache(maxsize=None)
def day_diff(x: float, y: float) -> int:
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days


def main():
    unique_dates = 1000
    n = 100
    cores = 4
    dates = [(datetime.datetime.now() - datetime.timedelta(days=i)).timestamp() for i in range(unique_dates)]
    array = np.array([random.choice(dates) for _ in range(n)])
    reference = datetime.datetime.now().timestamp()
    day_diff_vectorized = np.vectorize(functools.partial(day_diff, y=reference))
    s = time()
    diffs = multi(slice_worker, [(i, s, day_diff_vectorized) for i, s in enumerate(np.array_split(array, cores, axis=0))], processes=cores)
    print(diffs)
    res = np.concatenate([values for _, values in sorted(diffs, key=lambda x: x[0])], axis=0)
    print(res)
    print(time() - s)


if __name__ == '__main__':
    freeze_support()  # windows quirk
    main()
0

@Shalom: Poradziłem sobie ze składaniem jednak wywołuję program w klasie. To co masz w main

day_diff_vectorized = np.vectorize(functools.partial(day_diff, y=reference))
diffs = multi(slice_worker, [(i, s, day_diff_vectorized) for i, s in enumerate(np.array_split(array, cores, axis=0))], processes=cores)
res = np.concatenate([values for _, values in sorted(diffs, key=lambda x: x[0])], axis=0)

funkcje multi, slice_worker, day_diff są poza klasą. Czy po wykonaniu powinienem jakoś zwalaniach cache / usuwać obiekty

Po uruchomieniu programu po obliczeniu poprawnie jednego zestawu danych program nie przechodzi do kolejnego tylko się zawiesza i wymaga restartu. Po restarcie oblicza poprawnie.

Czy miałbyś pomysł co usuwać / zwalniać po wykonaniu się programu?

1

Dość istotna kwestia tutaj jest taka, że multiprocessing odpala nowy proces pythona w którym ładuje ten plik. Więc trzeba bardzo uważać jakie instrukcje masz w tym pliku, szczególnie jeśli nie są objęte if __name__ == '__main__':
Procesy potomne same się powinny pozabijać po pool.close() i jedyne istniejące obiekty to te zwrócone z funkcji więc nic specjalnie nie trzeba robić.

Co to znaczy zawiesza się? :) Nie ma czegoś takiego. Proces coś robi, pytanie tylko co? Dodaj może jakieś logi, albo odpal z debugerem na małym zestawie danych i zobacz co się tam dzieje.

0

@Shalom: Po wykonaniu funkcji multi i zwróceniu result zasoby nie zostały zwalniane. Przez to w pamięci zostawały procesy.

def multi(worker, data_list, processes):
pool = multiprocessing.Pool(processes=processes)
result = pool.map(worker, data_list)
pool.close()
return result

Po pool.close() powinno jeszcze być pool.join() ale nawet jak dodałem to program nie zwalniał zasobów. Utworzone podprocesy po wykonaniu się funkcji multi zostawały co skutkowało brakiem zasobów dla kolejnych iteracji (wykonania wielokrotnie funkcji na różnych danych).
Prawidłowo należy użyć context managera aby mieć zagwarantowane że nie będzie trzeba wzywać "killera" aby oczyścił miasto

def multi(worker, data_list, processes):
with concurrent.futures.ProcessPoolExecutor(max_workers=processes) as executor:
    return executor.map(worker, data_list)

Gwarantuje to zwolnienie wszystkich zasobów po wykonaniu się procesu.

Dobry poradnik:

0

To chyba jakieś linuxowe niedomaganie, bo windows nie robi żadnych problemów ze zwolnieniem zasobów po pool.close()
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Warning multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually

close()
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

Ale z tego co widzę to close może być nieblokujące i dlatego wymaga join żeby poczekać aż procesy faktycznie umrą.

0

@Shalom: Wykonywałeś na Windowsie taki kod 100 000 razy dla danych ~20gb?

0

Zamiast cachować diffa powinniście cacheować wrappera na funkcję fromtimestamp. Odejmowanie jest tanią operacją, a konwersja string to int lub odwrotnie tania już nie jest. Dodatkowo lru cache będzie osobno cachował diff(a, b) od diff(b, a) chociaż wynik jest ten sam.

0

@twoj_stary_pijany: Zrobiłem taką funkcje z lru_cache

@lru_cache(maxsize=None)
def day_diff(x: float, y: float, no_data: int) -> int:
    if x == no_data:
        return no_data
    date1 = datetime.datetime.fromtimestamp(x)
    date2 = datetime.datetime.fromtimestamp(y)
    diff = (date1 - date2) if x > y else (date2 - date1)
    return diff.days
0

Zamiast lru_cache(maxsize=None) lepiej jest używać cache — mniejszy narzut, bo nie ma tej całej procedury zapętlania LRU. IMO powinni w ogóle wywalić opcję ustawiania maxsize na None przez to, no ale cóż, kompatybilność wsteczna — cache doszło w Pythonie 3.9, zaś lru_cache w 3.2.

1 użytkowników online, w tym zalogowanych: 0, gości: 1