ConcurrentDictionary, Parallel.Foreach - nie dodaje obiektów do kolekcji

0

hej, nie uzywalem nigdy ConcurrentDictionary, próbuje pobrac ze Storage pliki, jednak jest ich całkiem sporo, więc chciałem pobierać jednocześnie wiele
ustukałem taki kod

        public static async Task<ConcurrentDictionary<string, Stream>> GetBlobsParallel(
            BlobContainerClient container,
            IEnumerable<string> filesPaths,
            int division=500)
        {
            var distinctFilesPath = filesPaths.Distinct();
            var result = new ConcurrentDictionary<string, Stream>();
            var divided = Divide(distinctFilesPath, division);
            Parallel.ForEach(divided, async (files) =>
            {
                foreach (var file in files)
                {
                    try
                    {
                        var blob = await (GetBlob(container, file));
                        result.TryAdd(file, blob);
                    }
                    catch
                    {
                        result.TryAdd(file, FileNotFoundStream());
                    }
                    
                }
            });
            
            return result;
        }

        static MemoryStream FileNotFoundStream() => new MemoryStream(Encoding.UTF8.GetBytes("FILE NOT FOUNND"));

Jednak problem jest w tym, że od razu przechodzi do return result, no chyba że sobie wolno debuguje, to pare razy trafi do try, ale i tak result jest zawsze pusty (0 elementów)
Co tu jest nie tak ?

sama metoda Getblob działa jak coś ;)

a metoda Divide działa na mniejsze IEnumerable

   static IEnumerable<IEnumerable<T>> Divide<T>(IEnumerable<T> data, int division)
        {
            var result = new List<IEnumerable<T>>();
            int skip = 0;
            IEnumerable<T> divedData = data.Skip(skip).Take(division);
            while (divedData.Any())
            {
                result.Add(divedData);
                skip += division;
                divedData = data.Skip(skip).Take(division);
            }
            return result;
        }

btw program nadal "działa" ale zawiesza sie całe visual :D

0

kurcze wyglada, ze niby dobrze robie.
zrobilem sobie jakis przykladowy kodzik podobny do tego

var list = new List<string>();
var division = 500;
for (var i = 0; i < 10000; i++)
    list.Add(i.ToString());


var x = GetData(list, division);
foreach(var item in x)
{
    Console.WriteLine(item.ToString());
}


IEnumerable<IEnumerable<T>> Divide<T>(IEnumerable<T>data, int division)
{
    var result = new List<IEnumerable<T>>();
    int skip = 0;
    IEnumerable<T> divedData = data.Skip(skip).Take(division);
    while(divedData.Any())
    {
        result.Add(divedData);
        skip += division;
        divedData = data.Skip(skip).Take(division);
    }
    return result;
}

ConcurrentDictionary<string, Guid> GetData(IEnumerable<string> list, int division)
{
    var result = new ConcurrentDictionary<string, Guid>();
    var divided = Divide(list, division);
    Parallel.ForEach(divided, div =>
    {
        foreach (var d in div)
        {
            var guid = Guid.NewGuid();
            result.TryAdd(d.ToString(), guid);
        }
    });
    return result;
}

i normalnie smiga

ale w kodzie, ktory umiescilem w pierwszym poscie nie leci zaden wyjatek :/

5
  1. Metoda Divide: nie wiem jakiej wersji .NET używasz, ale od wersji 6 jest metoda Linq pod nazwą Chunk, która robi właśnie to czego oczekujesz.
  2. Metoda Parallel.ForEach wykonuje zadania równolegle, ale sama w sobie działa synchronicznie.
  3. Przez to, że do body metody Parallel.ForEach wrzucasz async niejawnie tworzysz sobie taski i na nich operuje ta metoda. Nigdzie ich nie awaitujesz, więc metoda równolegle sobie tworzy x tasków, ale nigdzie nie czekasz na ich zakończenie.
  4. W kodzie z twojego drugiego posta nie wrzucasz async jako akcję do wykonania, przez co wszystko działa jak należy.
0

ej noi faktycznie, dzieki :D ale jak w takim razie awaitować to w ładniejszy sposób w parallel.foreach

var blob = GetBlob(container, file).GetAwaiter().GetResult();
2

Możesz użyć Parallel.ForEachAsync dostepnego chyba od .NET 6 :P

3

Tak jak @some_ONE, albo w ogóle zaniechać pomysłu dzielenia filesPaths na równe części, tylko wrzucić wszystkie elementy do ConcurrentBag, który będzie pełnił rolę takiego "worka", z którego x tasków będzie sobie wyciągać elementy po jednym, a następnie przetwarzać. Ograniczysz problem, kiedy na przykład 10 taksów wykona się w 1 sekundę i będą czekać na jeden, który się będzie wykonywał 20 sekund (bo przypadkowo trafi mu się 5 długotrwałych elementów).

1 użytkowników online, w tym zalogowanych: 0, gości: 1