Połączenie TreeMaps z takimi samym kluczami z roznych watków, List<CompletableFuture> + stream

0

Mam listę List<File>:

"Hanna Kowalska nie lubi plackow.txt"
"Anna Kowalska też nie lubi plackow.txt"
"Mateusz Kowalski lubi placki.txt"
"placki je tylko Katarzyna Kowalska.txt"
"a robi je Anna Kowalska.txt"
"bla bla Katarzyna Kowalska bla.txt"
itd.

ktorą dzielę na części (List<List<File>>).

W pierwszej części znajdują się:

"Hanna Kowalska nie lubi plackow.txt"
"Anna Kowalska też nie lubi plackow.txt"
"Mateusz Kowalski lubi placki.txt"
itd.

W drugiej są:

"placki je tylko Katarzyna Kowalska.txt"
"a robi je Anna Kowalska.txt"
"bla bla Katarzyna Kowalska bla.txt"
itd.

To tylko przykład, ale wszystkich plików jest ok 100 tys.

Dla każdej części odpalam pojedynczy wątek. Każdy wątek buduje TreeMap<String, HashMap<String, Integer>>.

Pierwszy wątek buduje:
Hanna Kowalska, (Hanna Kowalska nie lubi plackow.txt, 13) gdzie 13 to liczba dopasowanych znaków z imieniem i nazwiskiem.
Anna Kowalska, (Anna Kowalska też nie lubi plackow.txt, 12) gdzie 12 to liczba dopasowanych znaków z imieniem i nazwiskiem.
itd.

W drugim wątku powstaje:
Anna Kowalska, (a robi je Anna Kowalska.txt, 12) gdzie 12 to liczba dopasowanych znaków z imieniem i nazwiskiem.
itd.

Chce polaczyć wszystkie mapy TreeMap<String, HashMap<String, Integer>> do jednej, ale one moga zawierac te same klucze - wiec niektore wartosci muszą zostac pobrane i dołączone. Tutaj akurat są tylko dwie mapy ale w rzeczywistości ma być ich ok 1 tys.

Próbuje używać List<CompletableFuture> i Stream w java 8, ale nie rozumiem jak to połączyć i jak to robić dalej.

Oczywiście nie musi to być robione za pomocą stream, ale innej możliwości nie znalazłem albo nie znam.

private void runThreads(){

List<File> filesLists = Arrays.asList(filesAndFoldersArray);

List<List<File>> filesPartsList = divideArrayIntoChunks(filesLists , 50);

TreeMap<String, HashMap<String, Integer>> namesAndFileNamesMatchingCharactersMap = new TreeMap<String, Map<String, Integer>>();

List<CompletableFuture<TreeMap<String, Map<String, Integer>>>> builderPartsMapFutures = filesArrays.stream()
    		        .map(filesList -> CompletableFuture.supplyAsync(() -> buildNamesAndFileNamesWithMatchingCharactersMapForEachChunk(filesList...), executor) // jak dalej?
}

private TreeMap<String, HashMap<String, Integer>> buildNamesAndFileNamesWithMatchingCharactersMapForEachChunk(...){

...// algorytm poszukiwania imion i nazwisk w nazwach plików, nie może zostać zoptymalizowany dla pojedynczego wątka

}
2

Na szybko wymodziłem coś takiego, wygląda, że działa. Możesz potraktować to jako szkielet rozwiązania.

public class Main {

    public static void main(String[] args) {
        List<Map<String, Integer>> input = Arrays.asList(
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5),
                Map.of("ala", 2, "kasia", 1, "adam", 5) // 7 razy to samo
        );

        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); // 12 na moim Maczq <3

        Map<String, Integer> result = forkJoinPool.invoke(new MergeTask(input, 0, input.size(), forkJoinPool));

        System.out.println(result);
    }

    static class MergeTask extends RecursiveTask<Map<String, Integer>> {

        private final ArrayList<Map<String, Integer>> mapsToMerge;
        private final int begin;
        private final int end;

        public MergeTask(List<Map<String, Integer>> input, int begin, int end) {
            this.mapsToMerge = new ArrayList<>(input);
            this.begin = begin;
            this.end = end;
        }

        @Override
        protected Map<String, Integer> compute() {
            System.out.printf("Thread [%s], compute begin = %d, end = %d\n", Thread.currentThread().getName(), begin, end);

            int elements = Math.max(0, end - begin);
            if (elements <= 0) {
                return Map.of();
            } else if (elements == 1) {
                return mapsToMerge.get(begin);
            } else if (elements == 2) {
                return simpleMerge(mapsToMerge.get(begin), mapsToMerge.get(end - 1));
            } else {
                return computeRecursively();
            }
        }

        private Map<String, Integer> computeRecursively() {
            int mid = (end + begin) / 2;
            var left = new MergeTask(mapsToMerge, begin, mid);
            var right = new MergeTask(mapsToMerge, mid, end);
            left.fork();
            var rightResult = right.compute();
            var leftResult = left.join();
            return simpleMerge(leftResult, rightResult);
        }

        private Map<String, Integer> simpleMerge(Map<String, Integer> left, Map<String, Integer> right) {
            Map<String, Integer> result = new HashMap<>();
            left.forEach((k, v) -> result.merge(k, v, Integer::sum));
            right.forEach((k, v) -> result.merge(k, v, Integer::sum));
            return result;
        }
    }
}

Output:

Thread [ForkJoinPool-1-worker-19], compute begin = 0, end = 7
Thread [ForkJoinPool-1-worker-19], compute begin = 3, end = 7
Thread [ForkJoinPool-1-worker-5], compute begin = 0, end = 3
Thread [ForkJoinPool-1-worker-23], compute begin = 3, end = 5
Thread [ForkJoinPool-1-worker-19], compute begin = 5, end = 7
Thread [ForkJoinPool-1-worker-9], compute begin = 0, end = 1
Thread [ForkJoinPool-1-worker-5], compute begin = 1, end = 3
{adam=35, kasia=7, ala=14}

EDIT: dorzucam jeszcze czasy wykonania programu dla różnej wielkości listy i liczby wątków. Core'ów jest 12. Sposób pomiaru lamerski - przez System.currentTimeMillis(). Co ciekawe - przy 10k elementach optymalnie jest zrobić parallelism 4. Dobrym pomysłem może być spawnowanie większej liczby subtasków niż 2.

Czasy (12 wątków)

1k elementów - 20 ms
10k elementów - ~150 ms
100k elementów - ~6400 ms (pewnie GC)

Czasy (10k elementów)

1 wątek - 170 ms
4 wątki - 120 ms
12 wątków - ~150 ms (!!!)
24 wątki - ~200 ms (!!!)

1

Ostatecznie zrobilem to tak:

	public void mainMethod(){

                ...
		
		// get max available threads
		int maxNumberOfThreads = Runtime.getRuntime().availableProcessors();

		List<List<File>> filesLists = divideArrayIntoChunks(Arrays.asList(filesArray), maxNumberOfThreads);

		int numberOfTasks = filesLists.size();

		List<CompletableFuture<TreeMap<String, HashMap<String, Integer>>>> builderPartsMapFutures = Lists
				.newArrayList();

		for (int i = 0; i < numberOfTasks; i++) {

			List<File> filesList = filesLists.get(i);

			builderPartsMapFutures.add(getCompetableFutureResult(filesList, fileNameAndCharactersAmountMatchersMap,
					namesElementsMap, namesAndFileNamesMatchingCharactersMap, i));
		}

		@SuppressWarnings("rawtypes")
		CompletableFuture[] futureResultArray = builderPartsMapFutures
				.toArray(new CompletableFuture[builderPartsMapFutures.size()]);

		CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureResultArray);

		CompletableFuture<List<TreeMap<String, HashMap<String, Integer>>>> finalResults = combinedFuture.thenApply(
				voidd -> builderPartsMapFutures.stream().map(future -> future.join()).collect(Collectors.toList()));

		combinedFuture.get();
		
		finalResults.thenAccept(result -> System.out.println(result));
		
	}
		
	public static CompletableFuture<TreeMap<String, HashMap<String, Integer>>> getCompetableFutureResult(
			List<File> filesList, HashMap<String, Integer> fileNameAndCharactersAmountMatchersMap,
			TreeMap<String, ArrayList<String>> namesElementsMap,
			TreeMap<String, Map<String, Integer>> namesAndFileNamesMatchingCharactersMap, int i) {

		return CompletableFuture
				.supplyAsync(() -> buildNamesAndFileNamesWithMatchingCharactersMapForEachChunk(filesList,
						fileNameAndCharactersAmountMatchersMap, namesElementsMap,
						namesAndFileNamesMatchingCharactersMap, i));
	}

Z tymże wyniki są fatalne dla pojedynczego wątku trwało to 10 minut.
A dla 7 wątków trwa już 15 minut i bałem się, że procesor się spali.
Wczesniej dla wszystkich watkow weszlo po 20 minutach.
Kompletnie tego nie rozumiem.

Jesli chodzi o czas to metoda buildNamesAndFileNamesWithMatchingCharactersMapForEachChunk(), ktora porownuje Stringi z imionami i nazwiskami z nazwami plików (i jak jest blad w zapisie to tez znajdzie) to wlasnie zajmuje kosmiczna ilosc czasu. Dla ok 20 tys. imion i nazwisk, przelatuje kazdą nazwe pliku, a plikow jest ok 1 tys.

Nie rozumiem skad to wynika, przeciez to sa tylko operacje na stringach.

1

A możesz wyjaśnić na pseudokodzie, jak to działa? Jakie są Twoje założenia/przewidywania co do poziomu zrownoleglenia tego kodu? Pokaz w jaki sposób mergujesz TreeMapy, powinieneś robić to liniowo. Przydałoby się również odpalić to na własnej puli wątków.

0

Dobra udalo mi sie uzyskac rezultaty w 10 sekund, po prostu algorytm przeszukiwania stringow sie zapetlil, ale go zmienilem.
Z kolei mam problem z konwersja List<TreeMap... do TreeMap, nie znam streamow i nie znam sie na streamach.
Ogarnalem jakis watek, na stacku, ale to nie zwraca tego co chce.

Mam:

CompletableFuture<List<TreeMap<String, HashMap<String, Integer>>>> finalResults = combinedFuture.thenApply(
                voidd -> builderPartsMapFutures.stream().map(future -> future.join()).collect(Collectors.toList()));

I chcę to List<TreeMap<String, HashMap<String, Integer>>> przekonwertować do pojedynczej TreeMapy:

finalResults.thenAccept(result -> result);
// gdzie result zwraca List<TreeMap<String, HashMap<String, Integer>>>

Rozpisuje dalej wg. tego

TreeMap<String, HashMap<String, Integer>> finalTreeMap = finalResults.thenAccept(result -> result.stream()
      .collect(Collectors
      .toMap(k -> k, v -> v)));

Ale w rzeczywistosci zwraca mi to CompletableFuture<Void>.
Jak z tego result: finalResults.thenAccept(result -> result); wyciagnac i zmergowac do pojedynczej TreeMapy.

1

@zgrzyt: sugeruję po prostu zapoznać się z dokumentacją. Przecież w CompletableFuture thenAccept zwraca voida.

0

Dobra to bylo proste jednak. Wystarczylo do finalResults dodac get(). Sprawa zamknieta.

1 użytkowników online, w tym zalogowanych: 0, gości: 1