Synchornizacja danych pomiędzy procesami.

0

Cześć próbuję wykonać zadanie w którym przy pomocy pamięci dzielonej posortuje tablicę liczb całkowitych(nie tylko). Program rozwidlam na dwa procesy przy pomocy fork(), dane próbuję synchronizować przy pomocy kolejek komunikatów oraz semafor. Jednak program zachowuje się całkowicie losowo i nie wiem do końca gdzie mam błąd. Czasami program zawiesza się po wyświetleniu wylosowanych liczb a czasami wyswietla je jednak nie posortowane, w ostateczności zadziała prawidłowo. Czy mógłby ktoś zerknąć i sprawdzić czemu tak się dzieje z góry dziękuję.

#include <stdlib.h>  
#include <fcntl.h> 
#include <mqueue.h> 
#include <sys/mman.h>
#include <string.h>
#include <semaphore.h> 

enum operation{
	SORT_ASC,SORT_DESC,MAXMIN_NUMBER

};


struct message {
	char name[30];
	size_t size;
	enum operation dir;
	
};

#define SEM_NAME "/semaphore"
#define QUEUE_NAME "/queue1.txt"


int main(void)
{
	mqd_t  mq; 
	sem_t *sem;
	struct mq_attr attr;
	attr.mq_maxmsg  = 8;  
	attr.mq_flags = 0;
	attr.mq_msgsize = sizeof(struct message) +  sizeof(enum operation); 

	sem_t *semaphore = sem_open(SEM_NAME, O_CREAT, 0644, 0);
	if (semaphore == SEM_FAILED) {
        	perror("Could not create semaphore!");
        	exit(1);
    	}

	pid_t childPid;  
	childPid = fork();
	if(childPid < 0)
	{
		printf("Fork error\n");
		sem_close(sem);
        	sem_unlink(SEM_NAME);
		return EXIT_FAILURE;
	}
		
	
	if (childPid != 0)
	{
		mq=mq_open(QUEUE_NAME,O_RDWR | O_CREAT, 0666,&attr);
		printf("%d \n",mq);

		
		int i=0;
		struct message client_message;
		mq_receive(mq,&client_message,sizeof(client_message),i);
		
		

		int fd = shm_open( client_message.name, O_RDWR, 0777 );
		int *addr = (int*) mmap(0, client_message.size*sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		if(client_message.dir == SORT_ASC)
		{
			for(int i = 0; i < client_message.size; i++)
			{
				for(int j = 0 ; j < client_message.size - 1 - i ; j++)
				{
					if(client_message.dir==SORT_ASC ? (addr[j] > addr[j+1]) : (addr[j] < addr[j+1]))
					{
						int temp = addr[j];
						addr[j] = addr[j+1];
						addr[j+1] = temp;
					}
				}
			}
			
		}else if(client_message.dir == MAXMIN_NUMBER)
		{
			int min = addr[0];
			int max = addr[0];
			for(int i = 1; i < client_message.size; i++)
			{
				if(addr[i] > max)
					max = addr[i];

				if(addr[i] < min)
					min = addr[i];
			}

			addr[0] = max;
			addr[1] = min;
		}
		munmap(addr, client_message.size*sizeof(int));
		close(fd);
		sem_post(semaphore);
		int returnStatus;    
    		waitpid(childPid, &returnStatus, 0);   

    		if (returnStatus == 0)  
    		{
       		   printf("The child process terminated normally.\n");    
    		}

		if (returnStatus == 1)      
		{
		    printf("The child process terminated with an error!.\n");    
		}
			

	}else{
		printf ("Client\n");
		mq=mq_open(QUEUE_NAME,O_RDWR , 0666,&attr);

		sem_t *semaphore = sem_open(SEM_NAME, 0);
		if (semaphore == SEM_FAILED) {
			perror("Could not create semaphore!");
			exit(1);
    		}
		char shared_memory_object_name[] = "/shared_memory_object";
		size_t shared_memory_object_size = 5;



		int fd = shm_open( shared_memory_object_name, O_RDWR | O_CREAT, 0777 );
		ftruncate(fd, shared_memory_object_size*sizeof(int));

		int *addr = (int*) mmap(0, shared_memory_object_size*sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		srand(time(NULL));
		for(int i = 0; i < shared_memory_object_size; i++)
			addr[i] = rand()%100;

		for(int i = 0; i < shared_memory_object_size; i++)
			printf("%d ", addr[i]);
		printf("\n");

		struct message msg;
		strcpy(msg.name, shared_memory_object_name);
		msg.size = shared_memory_object_size;
		msg.dir = SORT_ASC;

		int x;
		
		x=mq_send(mq,&msg,sizeof(msg),1);
		if(x<0)
		{
			printf("Send error\n");
			sem_close(sem);
        		sem_unlink(SEM_NAME);
			return EXIT_FAILURE;
		}
		sem_wait(semaphore);
			
		if(msg.dir == SORT_ASC | msg.dir == SORT_DESC)
		{
			for(int i = 0; i < shared_memory_object_size; i++)
				printf("%d ", addr[i]);
			printf("\n");
		}else
		{
			printf("MAKSIMUM %d , MINIMUM %d\n",addr[0],addr[1]);
		}
		munmap(addr, msg.size*sizeof(int));
		close(fd);
		sem_close(sem);
        	sem_unlink(SEM_NAME);
		
	
	}
	sem_close(semaphore);
	mq_close(mq);
	return EXIT_SUCCESS;
}
0

Nie masz tu przypadkiem race condition przy tworzeniu SHM ?
Prawie nigdzie nie sprawdzasz czy funkcja wykonała się poprawnie.

A tak poza tym - napisz to po ludzku a nie w jednej gigantycznej funkcji main()...

0

Race condition nie powinno być ponieważ mq_recieve blokuję proces aż do pojawienia się komunikatu w kolejce.

0

Masz rację, akurat z tym problemu nie ma - nie zauważyłem wczoraj tego mq_receive (używaj ```c do wklejania kodu )

Natomiast jest race przy tworzeniu kolejki - bo nie sprawdzasz w dziecku czy się udało. Jeśli się nie udało, to nie wyślesz potem komunikatu i rodzic będzie wisiał.

0

Zauważyłem że problem leży gdzie indziej. Mq_receive nie czeka na mq_send i zwraca -1. Wyświetliłem sobie error i pokazuje message to long. Jednak kompletnie nie wiem dlaczego tak się dzieje, nawet jak ustawie attr.mq_msgsize na wartość dużo wiekszą niż sizeof struktury. Mq_send zachowuje się normalnie.

0
attr.mq_msgsize = sizeof(struct message) +  sizeof(enum operation);
// ...
 mq_receive(mq,&client_message,sizeof(client_message),i);

Te rozmiary się zgadzają ? To co podajesz w mq_receive musi byc nie mniejsze niż attr.mq_msgsize

Dlaczego nie wstawisz typu operacji po prostu jako pierwszego pola komunikatu ?

3

Pisząc wszystko w main, ogarnięcie takiego tematu zajmie ci bardzo długo.
Biorąc pod uwagę treść zadania, to chcesz zajmować się programowaniem zawodowo, ergo taki styl kodowania jest nie do przyjęcia i im szybciej go wyplewisz tym lepiej.

Podziel kod na dużo mniejsze funkcje, a zadanie stanie się o niebo łatwiejsze.

0

Zgubiłeś funkcję mq_unlink i cały czas otwierasz kiedyś wcześniej utworzoną mq z innym rozmiarem mq_msgsize.

1 użytkowników online, w tym zalogowanych: 0, gości: 1