Wątek przeniesiony 2021-09-16 09:47 z Inne języki programowania przez cerrato.

[GO] Concurrent http request

1

Część, chciałem się was zapytać o małą pomoc, tak z ciekawości chciałem spróbować coś napisać wykorzystując go routine i tak sobie pomyślałem żeby odpytać API od github-a o rożne info dla niektórych repo.

W sumie nie wiem, czy to jest najlepszy przykład zęby wykorzystać go routine, ale wydaję mi się, że mając 4 procesory fizyczne wykorzystując go routine po prostu czas request-ów się zmniejszy, tak wygląda mniej więcej kod:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"runtime"
	"sync"
	"time"
)

type Repository struct {
	Id            int    `json:"id"`
	Name          string `json:"full_name"`
	ForksCount    int    `json:"forks_count"`
	WatchersCount int    `json:"watchers_count"`
	Contributors  []Contributor
	Languages     map[string]int
}

type Contributor struct {
	Id            int    `json:"id"`
	Url           string `json:"url"`
	Contributions int    `json:"contributions"`
}

type ChannelContributors struct {
	repo         string
	contributors []Contributor
}

type ChannelLanguages struct {
	repo      string
	languages map[string]int
}

func (j *Repository) prettyPrint() string {
	s, _ := json.MarshalIndent(j, "", "\t")
	return string(s)
}

func (r *Repository) AddContributor(contributor Contributor) {
	r.Contributors = append(r.Contributors, contributor)
}

func (r *Repository) AddLanguages(languages map[string]int) {
	r.Languages = languages
}

func (r *Repository) getContributors(client http.Client, ch chan<- *ChannelContributors, wg *sync.WaitGroup) {
	defer wg.Done()

	req, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/contributors", "https://api.github.com/repos", r.Name), nil)
	if err != nil {
		ch <- nil
		log.Fatalln(err)
	}

	resp, err := client.Do(req)
	if err != nil {
		ch <- nil
		log.Fatalln(err)
	}

	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("Contributors not found for repo %s | Status code: %d \n", r.Name, resp.StatusCode)
		ch <- nil
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalln(err)
	}

	var contributors []Contributor
	err = json.Unmarshal(body, &contributors)
	if err != nil {
		log.Println("Contributors unmarshal error: ", err)
		ch <- nil
	}

	fmt.Printf("Contributors for repo %s : %d \n", r.Name, len(contributors))

	data := new(ChannelContributors)
	data.repo = r.Name
	for _, c := range contributors {
		data.contributors = append(data.contributors, c)
	}
	ch <- data
}

func (r *Repository) getLanguages(client http.Client, ch chan<- *ChannelLanguages, wg *sync.WaitGroup) {
	defer wg.Done()

	req, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/languages", "https://api.github.com/repos", r.Name), nil)
	if err != nil {
		log.Fatalln(err)
	}

	resp, err := client.Do(req)
	if err != nil {
		log.Fatalln(err)
	}

	defer resp.Body.Close()

	data := new(ChannelLanguages)
	data.repo = r.Name
	data.languages = nil

	if resp.StatusCode != 200 {
		fmt.Printf("Languages not found for repo %s | Status code: %d \n", r.Name, resp.StatusCode)
		data.languages = nil
		ch <- data
		return
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		ch <- data
		log.Fatalln(err)
	}

	languages := make(map[string]int)
	err = json.Unmarshal(body, &languages)
	if err != nil {
		log.Println("Languages unmarshal error: ", err)
		ch <- data
		return
	}

	data.languages = languages
	ch <- data
}

func main() {
	start := time.Now()

	numCPUs := runtime.NumCPU()
	fmt.Printf("Available processors: %d \n", numCPUs)
	runtime.GOMAXPROCS(numCPUs)

	repos := map[string]string{
		"DivanteLtd/vue-storefront": "https://api.github.com/repos/DivanteLtd/vue-storefront",
		"chebyrash/promise":         "https://api.github.com/repos/chebyrash/promise",
		"dunglas/mercure":           "https://api.github.com/repos/dunglas/mercure",
		"flutter/flutter":           "https://api.github.com/repos/flutter/flutter",
		"api-platform/api-platform": "https://api.github.com/repos/api-platform/api-platform",
		"Sylius/Sylius":             "https://api.github.com/repos/Sylius/Sylius",
		"gocolly/colly":             "https://api.github.com/repos/gocolly/colly",
	}
	reposData := make(map[string]*Repository, len(repos))

	timeout := time.Duration(2 * time.Second)
	client := http.Client{
		Timeout: timeout,
	}

	for k, repoUrl := range repos {
		req, err := http.NewRequest("GET", repoUrl, nil)
		if err != nil {
			log.Fatalln(err)
		}

		req.Header.Set("Authorization", "token %TOKEN%")
		resp, err := client.Do(req)
		if err != nil {
			log.Fatalln(err)
		}

		defer resp.Body.Close()

		if resp.StatusCode != 200 {
			log.Fatalf("Cannot get info about %s repo | Status code: %d \n", k, resp.StatusCode)
		}

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Fatalln(err)
		}

		repo := new(Repository)
		err = json.Unmarshal(body, &repo)
		if err != nil {
			log.Fatalln(err)
		}
		reposData[k] = repo
	}

	contributorsChan := make(chan *ChannelContributors, len(reposData))
	languagesChan := make(chan *ChannelLanguages, len(reposData))
	wg := new(sync.WaitGroup)

	for k, _ := range reposData {
		wg.Add(len(reposData) * 2)
		go reposData[k].getContributors(client, contributorsChan, wg)
		go reposData[k].getLanguages(client, languagesChan, wg)
	}

	select {
	case contributorResult := <-contributorsChan:
		if len(contributorResult.contributors) > 0 {
			for _, c := range contributorResult.contributors {
				reposData[contributorResult.repo].AddContributor(c)
			}
		}

	case langResult := <-languagesChan:
		if langResult.languages != nil {
			reposData[langResult.repo].AddLanguages(langResult.languages)
		}
	}

	wg.Wait()
	close(contributorsChan)
	close(languagesChan)

	for k, _ := range reposData {
		fmt.Println(reposData[k].prettyPrint())
	}

	elapsed := time.Since(start)
	log.Printf("Took %s", elapsed)
}

No i mam 2 problemy:
– Kod wykonuję się bez przerwy tak jakby wait group nigdy się nie kończył
– Czasami nigdy nie czyta z jednej z dwóch go routine.

Ogólnie nie rozumiem zbytnio co robię źle nigdy nie pisałem w językach, które pozwalają pracować na thread (no ok js, ale jest single threaded i tam asynchroniczność działa trochę inaczej).

Czy ktoś może mi podpowiedzieć co robię źle? Nie chce rozwiązania.

Dzięki z góry za pomoc.

1
  • po pierwsze - GOMAXPROCS - nie ustawiaj tego. Go sobie poradzi :)

  • WaitGroups - to bym wypchnął poza funkcje getLanguages, getContributors etc - ona nie musi wiedzieć, że jest wywołana w oddzielnej routine. Mogą po prostu zwracać zwykły error

  • w linii 199 masz coś dziwnego.

wg.Add(len(reposData) * 2)

nie powinno być tam

wg.Add(2)

?? To pewnie jest ten moment, przez który Ci się "wiesza".

  • rozbij to na mniejsze funkcje, bo momentami ciężko się połapać co się gdzie dzieje
0

po pierwsze - GOMAXPROCS - nie ustawiaj tego. Go sobie poradzi

No ok ale chcialem zoabczyc roznice gdy go wykona to na jednym core i 2 watkach a gdy wykona na wszystkich dostepnych

w linii 199 masz coś dziwnego.

Wydaje mi sie ze wywolyjac x2 gorotuine w for bedzie ich po prostu 2 razy tyle ile repozytorium do sprawdzenia lub sie myle ?

Prawdopobnie problem byl taki ze select albo powinno byc w srodku for {} lub innej goroutine. Na sam koniec zrobilem inaczej i dziala:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"runtime"
	"sync"
	"time"
)

type Repository struct {
	Id            int    `json:"id"`
	Name          string `json:"full_name"`
	ForksCount    int    `json:"forks_count"`
	WatchersCount int    `json:"watchers_count"`
	Contributors  []Contributor
	Languages     map[string]int
	sync.Mutex
}

type Contributor struct {
	Id            int    `json:"id"`
	Url           string `json:"url"`
	Contributions int    `json:"contributions"`
}

type ChannelContributors struct {
	repo         string
	contributors []Contributor
}

type ChannelLanguages struct {
	repo      string
	languages map[string]int
}

func (j *Repository) prettyPrint() string {
	s, _ := json.MarshalIndent(j, "", "\t")
	return string(s)
}

func (r *Repository) AddContributor(contributor Contributor) {
	r.Lock()
	defer r.Unlock()
	r.Contributors = append(r.Contributors, contributor)
}

func (r *Repository) AddLanguages(languages map[string]int) {
	r.Lock()
	defer r.Unlock()
	r.Languages = languages
}

func getRepoInfo(client http.Client, repos map[string]string, reposData map[string]*Repository) {
	for k, repoUrl := range repos {
		req, err := http.NewRequest("GET", repoUrl, nil)
		if err != nil {
			log.Fatalln(err)
		}

		req.Header.Set("Authorization", "token ")
		resp, err := client.Do(req)
		if err != nil {
			log.Fatalln(err)
		}

		defer resp.Body.Close()

		if resp.StatusCode != 200 {
			log.Fatalf("Cannot get info about %s repo | Status code: %d \n", k, resp.StatusCode)
		}

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Fatalln(err)
		}

		repo := new(Repository)
		err = json.Unmarshal(body, &repo)
		if err != nil {
			log.Fatalln(err)
		}
		reposData[k] = repo
	}
}

func getContributors(client http.Client, repositories []string, wg *sync.WaitGroup) <-chan *ChannelContributors {

	contributorsChan := make(chan *ChannelContributors, len(repositories))

	for _, repo := range repositories {
		wg.Add(1)
		go func(repo string) {
			defer wg.Done()

			req, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/contributors", "https://api.github.com/repos", repo), nil)
			if err != nil {
				contributorsChan <- nil
				log.Fatalln(err)
			}

			req.Header.Set("Authorization", "token ")
			resp, err := client.Do(req)
			if err != nil {
				contributorsChan <- nil
				log.Fatalln(err)
			}

			defer resp.Body.Close()

			if resp.StatusCode != 200 {
				fmt.Printf("Contributors not found for repo %s | Status code: %d \n", repo, resp.StatusCode)
				contributorsChan <- nil
			}

			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				log.Fatalln(err)
			}

			var contributors []Contributor
			err = json.Unmarshal(body, &contributors)
			if err != nil {
				log.Println("Contributors unmarshal error: ", err)
				contributorsChan <- nil
			}

			fmt.Printf("Contributors for repo %s : %d \n", repo, len(contributors))

			data := new(ChannelContributors)
			data.repo = repo
			for _, c := range contributors {
				data.contributors = append(data.contributors, c)
			}
			contributorsChan <- data
		}(repo)
	}

	return contributorsChan
}

func getLanguages(client http.Client, repositories []string, wg *sync.WaitGroup) <-chan *ChannelLanguages {
	languagesChan := make(chan *ChannelLanguages, len(repositories))

	for _, repo := range repositories {
		wg.Add(1)
		go func(repo string) {
			defer wg.Done()

			req, err := http.NewRequest("GET", fmt.Sprintf("%s/%s/languages", "https://api.github.com/repos", repo), nil)
			if err != nil {
				log.Fatalln(err)
			}

			req.Header.Set("Authorization", "token ")
			resp, err := client.Do(req)
			if err != nil {
				log.Fatalln(err)
			}

			defer resp.Body.Close()

			data := new(ChannelLanguages)
			data.repo = repo
			data.languages = nil

			if resp.StatusCode != 200 {
				fmt.Println(req.Header.Get("Authorization"))
				fmt.Printf("Languages not found for repo %s | Status code: %d \n", repo, resp.StatusCode)
				data.languages = nil
				languagesChan <- data
				return
			}

			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				languagesChan <- data
				log.Fatalln(err)
			}

			languages := make(map[string]int)
			err = json.Unmarshal(body, &languages)
			if err != nil {
				log.Println("Languages unmarshal error: ", err)
				languagesChan <- data
				return
			}

			fmt.Printf("Languages for repo %s : %d \n", repo, len(languages))

			data.languages = languages
			languagesChan <- data
		}(repo)
	}

	return languagesChan
}

func fetchFromChannels(reposData map[string]*Repository, contributorsChan <-chan *ChannelContributors, languagesChan <-chan *ChannelLanguages) {
	for {
		select {

		case contributorResult := <-contributorsChan:
			if contributorResult != nil {
				for _, c := range contributorResult.contributors {
					reposData[contributorResult.repo].AddContributor(c)
				}
			}

		case langResult := <-languagesChan:
			if langResult != nil {
				reposData[langResult.repo].AddLanguages(langResult.languages)
			}
		}
	}
}

func main() {
	start := time.Now()

	var wg sync.WaitGroup

	numCPUs := runtime.NumCPU()
	fmt.Printf("Available processors: %d \n", numCPUs)
	runtime.GOMAXPROCS(2)

	repos := map[string]string{
		"DivanteLtd/vue-storefront": "https://api.github.com/repos/DivanteLtd/vue-storefront",
		"chebyrash/promise":         "https://api.github.com/repos/chebyrash/promise",
		"dunglas/mercure":           "https://api.github.com/repos/dunglas/mercure",
		"flutter/flutter":           "https://api.github.com/repos/flutter/flutter",
		"api-platform/api-platform": "https://api.github.com/repos/api-platform/api-platform",
		"Sylius/Sylius":             "https://api.github.com/repos/Sylius/Sylius",
		"gocolly/colly":             "https://api.github.com/repos/gocolly/colly",
	}
	reposData := make(map[string]*Repository, len(repos))

	timeout := time.Duration(2 * time.Second)
	client := http.Client{
		Timeout: timeout,
	}

	getRepoInfo(client, repos, reposData)

	repositories := make([]string, 0, len(repos))
	for k, _ := range repos {
		repositories = append(repositories, k)
	}

	contributorsChan := getContributors(client, repositories, &wg)
	languagesChan := getLanguages(client, repositories, &wg)

	go fetchFromChannels(reposData, contributorsChan, languagesChan)

	wg.Wait()

	for k, _ := range reposData {
		fmt.Printf("Repo %s | CONTRIBUTORS: %d | LANGUAGES: %d \n", k, len(reposData[k].Contributors), len(reposData[k].Languages))
	}

	elapsed := time.Since(start)
	log.Printf("Took %s", elapsed)
}

Pewnie ze mozna napisac to lepiej ale poki co interesuje mnie tylko zrozumienie goroutines, zastanawia mnie tylko fakt ze program nie zmienia dzialania czy uzywaj buffered channel lub gdy uzyje unbuffered. Mozliwe ze majac fetchFromChannels jako goroutine po prostu czy channel jest unbuffered lub mnie ciagle z niego czyta wiec po prostu nic nie blokuje, choc wydaje mi sie ze z buffered jest ciut szybsze bo wtedy chyba odczyt z takiego channel-a nastepuje tylko wtedy gdy go calego zapelnimy jesli dobrze zrozumialem.

Musze jeszce tylko popracowac troche nad error handling gdy ktorys z requestow sie nie wykona poprawnie

0

Generalnie Twoje problemy po obserwacji:

  1. Twoje funkcje robią syf, mydło i powidło, czytaj: za dużo dziwnych zadań, za mało rozsądnego podziału - ja z Twojej jednej zrobiłbym 5. To najważniejszy problem, który powoduje pozostałe:
  2. Dziwnie podajesz WaitGrupę, ja bym się bał zakleszczenia jakiegoś tam głębiej tej WaitGrupy lub co najmniej niewłaściwej synchronizacji całości. Pilnuj się również, jak używasz map przy goroutines, nie chce mi się sprawdzać czy gdzieś tam głębiej je edytujesz
  3. Zdajesz sobie sprawę, ze w związku jak to poukładałeś sprawy w funkcjach getContributors() i getLanguages(), to one mogą się jeszcze mielić jak dojdzie do odpalenia fetchFromChannels()? Śledziłeś to tracem i sprawdzałeś co się dzieje przy kilkukrotnym odpaleniu?

// Edit: właśnie widzę, że @no_solution_found już Ci część spraw zasugerował

0

ja bym zasugerował, żebyś na początku zrobił to w pełni synchronicznie. Łatwiej będzie niektóre rzeczy ogarnąć :)

co do WaitGroup to możesz to zrobić np tak:

go func(){
   wg.Add(1)
   defer wg.Done()
   contributors := getContributors(client, repositories)
   // tutaj dodajesz gdzie chcesz
}

funkcja, którą wykonujesz asynchronicznie nie musi wcale o tym wiedzieć :) na moje oko przekombinowałeś.

No ok ale chcialem zoabczyc roznice gdy go wykona to na jednym core i 2 watkach a gdy wykona na wszystkich dostepnych

tutaj masz głównie operacje na IO i to internet będzie wąskim gardłem - nie CPU, ale oczywiście możesz sprawdzić.

Ah i te pętle można by było pociąć, bo masz trochę duplikacji i dzieje się w nich tyle, że ho ho

0

Defer w pętli to błąd (zauważyłem to w jednej funkcji). Zdeferowane wywołanie nie wykona się wraz z końcem pętli, wywoła się przy wyjściu z funkcji.
Dodatkowo jest kwesta tego na co wskazuje response.Body podczas wywołania zdeferowanej funkcji. Response.Body to interfejs, a implementacja na 99% przyjmuje wskaźnik. Skutek? Wywoła się Close z recieverem ustawionym na ostatnio używaną wartości implementacji response.Body, za każdym razem tą samą, mimo, że w pętli używałeś kilku różnych response.Body .

0

@TurkucPodjadek:

Dziwnie podajesz WaitGrupę, ja bym się bał zakleszczenia jakiegoś tam głębiej tej WaitGrupy lub co najmniej niewłaściwej synchronizacji całości. Pilnuj się również, jak używasz map przy goroutines, nie chce mi się sprawdzać czy gdzieś tam głębiej je edytujesz

Nie wiem wiekszosc artykolow na ten temat i kodu ktory analizuje zeby sie z nich nauczyc wydaje mi sie ze tak wlasnie robia, nvm

Zdajesz sobie sprawę, ze w związku jak to poukładałeś sprawy w funkcjach getContributors() i getLanguages(), to one mogą się jeszcze mielić jak dojdzie do odpalenia fetchFromChannels()? Śledziłeś to tracem i sprawdzałeś co się dzieje przy kilkukrotnym odpaleniu?

No tak ale fetchFromChannels jest 2 goroutine ktora nie robic nic innego oprocz tego ze czyta z channel-ow gdzie pisza 2 inne funkcje wiec chyba o to chodzi ze jedna mieli contributors/languages druga bierze wynik i tak dalej. Byc moze brakuje tam tylko default z delayem 10ms jesli nic nie przychodzi na tych dwoch innych channel-ach i jakiegos channel quit ktory bedzie powiadamial ze obydwie getContributors() i getLanguages() skonczyly to co mialy zrobic ale nie bardzo wiem jak to zrobic.

@no_solution_found

ja bym zasugerował, żebyś na początku zrobił to w pełni synchronicznie. Łatwiej będzie niektóre rzeczy ogarnąć

Nie ma na to zbytnio czasu robie to w pracy w wolnych chwilach nie ma sensu tego robic synchronicznie ani lepiej jest to kod ktory ma dzialac potem idzie do kosza tyle. Lepiej to moge robic na codzien taski w pracy albo jakies moje projekty na ktorych nie mam czasu ani checi :D

tutaj masz głównie operacje na IO i to internet będzie wąskim gardłem - nie CPU, ale oczywiście możesz sprawdzić.

No tak sa to operacje i/o bound nie cpu bound, ale scheduler w go z tego co wyczytalem to polaczenie preemptive z cooperative czyli chyba bo jakims czasie (10ms ?) scheduler da sie wykonac nastepnej goroutine co z przypadku request-ow jest chyba ok ?

co do WaitGroup to możesz to zrobić np tak:

Mozesz rozwinac? W takim przypadku getContributors() nie dodaje nic do waitGroup tak ?

Ah i te pętle można by było pociąć, bo masz trochę duplikacji i dzieje się w nich tyle, że ho ho

Tak racja rozbije to na jakis "generyczny" fetch i unmarshal

@nalik

Defer w pętli to błąd (zauważyłem to w jednej funkcji). Zdeferowane wywołanie nie wykona się wraz z końcem pętli, wywoła się przy wyjściu z funkcji.

Ale defer mam fakt ze w petli ale goroutine ktora jest tam wykonywana nie jest to ok? Tez tak widze w artach i w roznych zrodlach ktore ogladam na gh

Dodatkowo jest kwesta tego na co wskazuje response.Body podczas wywołania zdeferowanej funkcji. Response.Body to interfejs, a implementacja na 99% przyjmuje wskaźnik. Skutek? Wywoła się Close z recieverem ustawionym na ostatnio używaną wartości implementacji response.Body, za każdym razem tą samą, mimo, że w pętli używałeś kilku różnych response.Body

Tzn mozesz wskazac miejsce gdzie jest ten problem ?

0

Zakres defer to funkcja, nie blok pętli. Wykona się dopiero przy wychodzeniu z funkcji.
https://play.golang.org/p/Pz_IVOs_856

Dla porównania, gdyby zakresem x była pętla:
https://play.golang.org/p/3Gy_nKk178y

W sumie u ciebie zakresem zmiennej body jest pętla, wiec chyba jest OK.

Go routine tworzone w pętli jest OK.

0
marcio napisał(a):

No tak sa to operacje i/o bound nie cpu bound, ale scheduler w go z tego co wyczytalem to polaczenie preemptive z cooperative czyli chyba bo jakims czasie (10ms ?) scheduler da sie wykonac nastepnej goroutine co z przypadku request-ow jest chyba ok ?

Tak jak napisałeś. Go routines to tak naprawdę coroutine, ale bez możliwości ręcznego sterowania nimi. Context switch następuję podczas predefiniowanych zdarzeń, jak : użycie go, odśmiecanie pamięci, syscall, synchronizacja (czyli także odczyt z kanału) [dygresja: w podobny sposób działa scheduler w Linuxie, kontekst switch następuję przy syscalach]. To nie są watki systemowe, go routines rozkładają się na pule wątków systemowych.

marcio napisał(a):

co do WaitGroup to możesz to zrobić np tak:

Mozesz rozwinac? W takim przypadku getContributors() nie dodaje nic do waitGroup tak ?

Myślę, że nie zrozumiałeś kolegi. Nie chodzi o to, że u Ciebie wykona się żle, lecz o to, żeby rozdzielić asynchroniczne wykonanie i synchronizację funkcji od samej zawartości tej funkcji. Tak zwane "single responsibility principle". Funkcja powinna opakować jakąś logikę, nazwijmy ją biznesową. Z kolei asynchroniczne wykonanie tej funkcji zajmie się wywołanie tej funkcji i zarządzaniem synchronizacją.

1 użytkowników online, w tym zalogowanych: 0, gości: 1