/* Mamy 3 procesy N - wątkowe uruchomione w klastrze komputerów – na różnych węzłach. Procesy są zorganizowane w listę: Proces1.Watek1->Proces2.Watek1->Proces3.Watek1-> Proces1.Watek2->Proces2.Watek2->Proces3.Watek2->..-> Proces1.WatekN->Proces2.WatekN->Proces3.WatekN->Proces1.Watek1 Między procesami i wątkami „wędruje” liczba całkowita, która za każdym przesłaniem jest zwiększana o 1. Jak liczba zwiększona 10000 razy, wówczas wszystkie procesy i wątki się kończą. */ #include #include #include #include #include #include #include #include #include #include #include #include #include char wlasneIP[] = "127.0.0.1"; struct sockaddr_in adresDlaSocketuOczekujacegoNaPolaczenia, adresDlaSocketuDoWysylania, adresDlaSocketuKlienta; char errorBuffer[300]; int socketOczekujacyNaPolaczenia, socketDoWysylania, socketKlienta; int wielkoscPakietu; int iloscProcesowToken; int portDoNasluchiwania; char* targetIP; int portDoKtoregoMaWysylac; int iloscProcesowNaStarcie; int startowyNumerProcesu; void handle_error(const char* msg) { fprintf(stderr, "%s\n", msg); perror(NULL); close(socketOczekujacyNaPolaczenia); close(socketDoWysylania); close(socketKlienta); exit(EXIT_FAILURE); } void utworzSocketNasluchujacyNaPorcie(int port) { socketOczekujacyNaPolaczenia = socket(AF_INET, SOCK_STREAM, 0); if(socketOczekujacyNaPolaczenia == -1) { handle_error("Nie udalo sie utworzyc socketu nasluchujacego."); } int optval = 1; setsockopt(socketOczekujacyNaPolaczenia, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval); bzero((char *) &adresDlaSocketuOczekujacegoNaPolaczenia, sizeof(adresDlaSocketuOczekujacegoNaPolaczenia)); adresDlaSocketuOczekujacegoNaPolaczenia.sin_family = AF_INET; adresDlaSocketuOczekujacegoNaPolaczenia.sin_addr.s_addr = INADDR_ANY; adresDlaSocketuOczekujacegoNaPolaczenia.sin_port = htons(port); if(bind(socketOczekujacyNaPolaczenia, (struct sockaddr *) &adresDlaSocketuOczekujacegoNaPolaczenia, sizeof(adresDlaSocketuOczekujacegoNaPolaczenia)) == -1) { sprintf(errorBuffer, "Nie udalo sie powiazac socketu nasluchujacego z INADDR_ANY na porcie %d.", port); handle_error(errorBuffer); } if(listen(socketOczekujacyNaPolaczenia, 2) == 1) { handle_error("Nie udalo sie rozpoczac nasluchiwania na sockecie."); } } void utworzPolaczenieDoWysylaniaTokenu(char* ip, int port) { socketDoWysylania = socket(AF_INET, SOCK_STREAM, 0); if(socketDoWysylania == -1) { handle_error("Nie udalo sie utworzyc socketu wysylajacego."); } bzero((char *) &adresDlaSocketuDoWysylania, sizeof(adresDlaSocketuDoWysylania)); adresDlaSocketuDoWysylania.sin_family = AF_INET; adresDlaSocketuDoWysylania.sin_addr.s_addr = inet_addr(ip); adresDlaSocketuDoWysylania.sin_port = htons(port); while(1) { if(connect(socketDoWysylania, (struct sockaddr *) &adresDlaSocketuDoWysylania, sizeof(adresDlaSocketuDoWysylania)) == -1) { if(errno != ECONNREFUSED) { sprintf(errorBuffer, "Nie udalo sie polaczyc z socketu wysylajacego do IP %s do portu %d.", ip, port); handle_error(errorBuffer); } else { fprintf(stderr, "Proba polaczenia z socketu wysylajacego do IP %s do portu %d nie powiodla sie, polaczenie odrzucone. Czekam chwile.\n", ip, port); } // 0.1 sec miedzy probami kolejnych polaczen usleep(100000); } else { break; } } fprintf(stderr, "Polaczono socket wysylajacy do IP %s na port %d z listenera %d.\n", ip, port, portDoNasluchiwania); } void czekajNaPolaczenie() { wielkoscPakietu = sizeof(struct sockaddr_in); socketKlienta = accept(socketOczekujacyNaPolaczenia,(struct sockaddr *) &adresDlaSocketuKlienta, (socklen_t*) &wielkoscPakietu); if(socketKlienta == -1) { handle_error("Akceptowanie polaczenia nie powiodlo sie."); } fprintf(stderr, "Klient z IP %s i portu %d\n", inet_ntoa(adresDlaSocketuKlienta.sin_addr), ntohs(adresDlaSocketuKlienta.sin_port)); } void wyslijToken() { int wyslaneBajty = 0; int intDoNeta = htonl(iloscProcesowToken); if((wyslaneBajty = send(socketDoWysylania, (const char*) &intDoNeta, 4, 0)) == -1 || wyslaneBajty != 4) { handle_error("Wysylanie inta sie nie powiodlo."); } fprintf(stderr, "Wyslano token %d\n", iloscProcesowToken); } void odbierzToken() { int odebraneBajty = 0; if((odebraneBajty = recv(socketKlienta, (void*) &iloscProcesowToken, 4, 0)) == -1 || odebraneBajty != 4) { handle_error("Odbieranie inta sie nie powiodlo."); } iloscProcesowToken = ntohl(iloscProcesowToken); fprintf(stderr, "Odebrano token %d\n", iloscProcesowToken); } int main(int argc, char *argv[]) { if(argc != 6) { handle_error("Program wymaga 5 parametrow."); } portDoNasluchiwania = atoi(argv[1]); targetIP = argv[2]; portDoKtoregoMaWysylac = atoi(argv[3]); iloscProcesowNaStarcie = atoi(argv[4]); startowyNumerProcesu = atoi(argv[5]); iloscProcesowToken = iloscProcesowNaStarcie; int startowyPortDoNasluchiwania = portDoNasluchiwania; // utworz socket oczekujacy na polaczenia utworzSocketNasluchujacyNaPorcie(portDoNasluchiwania); if(startowyNumerProcesu == 0) // pierwszy proces { // polacz sie z celem utworzPolaczenieDoWysylaniaTokenu(targetIP, portDoKtoregoMaWysylac); // czekaj na polaczenie czekajNaPolaczenie(); // wyslij do celu token (ilosc procesow) wyslijToken(); } else // inny niz pierwszy proces { // czekaj na polaczenie czekajNaPolaczenie(); // polacz sie z celem utworzPolaczenieDoWysylaniaTokenu(targetIP, portDoKtoregoMaWysylac); } while(1) { usleep(100000); // odbierz liczbe odbierzToken(); if(iloscProcesowToken >= iloscProcesowNaStarcie * 2) { // osiagnieto limit procesow, wyslij nastepnemu na liscie informacje o tym i zakoncz program usleep(100000); wyslijToken(); break; } if(rand() % 2 == 0) { // dodaj 1 proces do listy przed aktualnym procesem int wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem = startowyPortDoNasluchiwania + iloscProcesowToken; pid_t czyToDziecko = fork(); if(czyToDziecko == 0) // dziecko { portDoKtoregoMaWysylac = wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem; // polacz sie z rodzicem utworzPolaczenieDoWysylaniaTokenu(wlasneIP, portDoKtoregoMaWysylac); usleep(100000); } else // rodzic { portDoNasluchiwania = wolnyPortDoKomunikacjiMiedzyDzieckiemRodzicem; // utworz socket oczekujacy na polaczenie od dziecka utworzSocketNasluchujacyNaPorcie(portDoNasluchiwania); usleep(100000); // czekaj na polaczenie od dziecka czekajNaPolaczenie(); usleep(100000); // wprowadz nowa ilosc procesow na liscie iloscProcesowToken++; // wyslij token (liczbe procesow) wyslijToken(); usleep(100000); } } else { // wyslij token dalej wyslijToken(); usleep(100000); } } // zamykanie wszystkich socketow danego procesu close(socketKlienta); close(socketOczekujacyNaPolaczenia); close(socketDoWysylania); // komunikat na koniec fprintf(stderr, "Zamknieto sockety. ID startowe: %d, suma procesow: %d, nasluchiwal na porcie: %d, wysylal do: %d\n", startowyNumerProcesu, iloscProcesowToken, portDoNasluchiwania, portDoKtoregoMaWysylac); return 1; }