0 Linux – współbieżność, IPC
Tomasz Zaworski edited this page 2020-11-15 02:28:24 +01:00
  1. Wprowadzenie
  2. Przykłady i zadania
  3. Łącza nienazwane i nazwane
  4. Kolejki komunikatów
  5. Pamięć współdzielona
  6. Zadania domowe

Wprowadzenie

Dzisiejsze zajęcia poświęcone zostaną komunikacji międzyprocesowej (ang. inter-process communication - IPC). Jak wiemy procesy działające w systemie posiadają odrębną pamięć. Tak więc pojedynczy proces (nazwijmy go X) nie ma bezpośredniego dostępu do pamięci innego procesu Y.

Załóżmy, że chcemy napisać program wykonujący np. 16 niezależnych od siebie zadań, by następnie złączyć ich wyniki w jedną całość. Program ten wykonywany będzie na komputerze o 16 rdzeniach. Możemy napisać program, który wykonuje kolejno 16 niezależnych obliczeń na jednym rdzeniu. Jednak uruchomienie 16 oddzielnych procesów, a następnie zebranie danych w jednym głównym procesie może być dużo szybsze. Do przesłania wyników do głównego procesu przyda nam się znajomość IPC.

Do komunikacji międzyprocesowej (IPC) zalicza się:

  • łącza nienazwane (tzw. potoki) i łącza nazwane (tzw. kolejki FIFO),
  • kolejki komunikatów,
  • pamięć współdzielona.

Przykłady i zadania

Wszystkie programy kompiluj i uruchamiaj na swojej maszynie wirtualnej na Proxmoxie.

Łącza nienazwane i nazwane

Podstawowe informacje

Łącza komunikacyjne w systemie LINUX są specjalnymi plikami służącymi do komunikacji pomiędzy procesami. Rozróżniamy ich dwa typy:

  • łącza nienazwane (nazywane potokami) - nie posiadają one nazwy, przez co dostęp do nich może mieć proces który utworzył potok lub proces potomny, do którego zostały przekazane deskryptory umożliwiające komunikację za pomocą tego łącza.
  • łącza nazwane (nazywane kolejkami FIFO) - posiadają dowiązanie w systemie plików i otwarcie łącza jest możliwe nawet przez niespokrewnione procesy.

Do obsługi łączy nienazwanych wykorzystuje się polecenie pipe. Natomiast do tworzenia łącz nazwanych wykorzystuje się polecenie mkfifo.

Przykład 1

Program pipe.c służy do otwarcia potoku, a następnie stworzenia procesu potomnego i przesłania danych do niego za pomocą potoku.

#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

int
main(int argc, char *argv[])
{
    int pipefd[2];
    pid_t cpid;
    char buf;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <string>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    if (pipe(pipefd) == -1) {   /* (A) */
        perror("pipe");
        exit(EXIT_FAILURE);
    }

    cpid = fork();              /* (B) */
    if (cpid == -1) {
        perror("fork");
        exit(EXIT_FAILURE);
    }

    if (cpid == 0) {    /* Child reads from pipe */
        close(pipefd[1]);       /* (C) */

        while (read(pipefd[0], &buf, 1) > 0)
            write(STDOUT_FILENO, &buf, 1);

        write(STDOUT_FILENO, "\n", 1);
        close(pipefd[0]);
        exit(EXIT_SUCCESS);

    } else {            /* Parent writes argv[1] to pipe */
        close(pipefd[0]);          /* (D) */
        write(pipefd[1], argv[1], strlen(argv[1]));
        close(pipefd[1]);
        wait(NULL);
        exit(EXIT_SUCCESS);
    }
}
  • W wierszu 20 (A): polecenie pipe(pipefd) tworzy łącze nienazwane(potok) i wpisuje dwa deskryptory kolejno do odczytu i zapisu w tablicy int pipefd[2].
  • Wiersz 25 (B): następuje utworzenie procesu potomnego. Proces potomny (na podstawie wartości zmiennej cpid) wykona kod programu z wierszy 32-39, a proces macierzysty wykona kod programu z wierszy 42 - 46.
  • Wiersze 32-39 (C): Proces potomny będzie nasłuchiwał na dane w potoku. Dlatego zamyka niepotrzebny deskryptor pipefd[1], gdyż służy on jedynie do zapisu do potoku. Następnie wykonuje znaną nam już z wcześniejszych zajęć konstrukcję odczytującą dane z pliku od deskryptorze pipefd[0].
  • Wiersze 42-46 (D): Zadaniem procesu macierzystego jest wysłanie danych do potoku. Dlatego zamyka on niepotrzebny deskryptor pipefd[0] służący jedynie do odczytu danych z potoku. Następnie wykonuje znane nam już polecenie write przekazując jako dane argument argv[1] przekazany podczas uruchomienia programu (./pipe)
  • Każdy z procesów po odczycie lub zapisie danych z/do potoku zamyka używane deskryptory.
Zadanie 1

Skompiluj program pipe.c za pomocą polecenia:

gcc pipe.c -o pipe

i uruchom go.

Przykład 2

Program fifo_writer.c służy do zapisu danych do kolejki FIFO.

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

int main()
{
    int fd;
    char * myfifo = "/tmp/myfifo";

    /* create the FIFO (named pipe) */
    mkfifo(myfifo, 0666);

    /* write "Hi" to the FIFO */
    fd = open(myfifo, O_WRONLY);

    int i;
    for(i=0; i<5; i++){
       write(fd, "Hi", sizeof("Hi"));
       sleep(1);
    }
    close(fd);

    /* remove the FIFO */
    unlink(myfifo);

    return 0;
}

Program fifo_reader.c służy do odczytu danych z kolejki FIFO:

#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>

#define MAX_BUF 1024

int main()
{
    int fd;
    char * myfifo = "/tmp/myfifo";
    char buf[MAX_BUF];

    /* open, read, and display the message from the FIFO */
    fd = open(myfifo, O_RDONLY);
    memset(buf, 0, MAX_BUF);
    while (read(fd, buf, MAX_BUF) != 0) {
       printf("Received: %s\n", buf);
       memset(buf, 0, MAX_BUF);
    }
    close(fd);

    return 0;
}
Zadanie 2

Skompiluj oba programy i uruchom najpierw program fifo_writer, a następnie fifo_reader. Zauważ, że program fifo_writer zatrzymuje działanie do czasu uruchomienia programu fifo_reader. Wynika to z faktu, że wszelkie operacje wejścia/wyjścia są blokowane do czasu, aż plik specjalny zostanie otwarty zarówno do odczytu jak i zapisu. Ponadto operacje write, read są blokowane w przypadku, gdy łącze jest odpowiednio pełne, puste.

Kolejki komunikatów

Podstawowe informacje

Kolejki komunikatów służą do wymiany komunikatów pomiędzy procesami (tzw. IPC). Każda kolejka posiada określoną pojemność (maksymalną liczbę przechowywanych komunikatów) oraz nazwę po której można kolejkę zidentyfikować. W naszych przykładach wykorzystamy implementację kolejek za pomocą interfejsu POSIX. Ponadto do/z kolejki komunikatów może zapisywać/odczytywać wiele procesów jednocześnie. oraz istnieje możliwość ustawienia priorytetu wiadomości. Komunikaty o wyższym priorytecie są odczytywane z kolejki komunikatów przed innymi wiadomościami.

Podstawowymi operacjami wykorzystywanymi podczas obsługi kolejki komunikatów są:

  • mq_open - pozwala utworzyć kolejkę komunikatów.
  • mq_receive - służy do odczytania komunikatu z kolejki
  • mq_getattr - pozwala odczytać aktualny stan kolejki (np. czy są jeszcze jakieś nieodczytane komunikaty.
  • mq_close - zamknięcie kolejki.
  • mq_unlink - usunięcie kolejki.
  • mq_notify - pozwala na otrzymywanie asynchronicznych powiadomień kiedy zostanie dodana wiadomość do kolejki (my nie będziemy wykorzystywać tej funkcji).
Przykład 3

Przykładowy program mq_write.c. Pozwala on na utworzenie kolejki komunikatów oraz wpisanie do niej 10 wiadomości o różnym priorytecie.

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/wait.h>

#define MESSAGE_COUNT 10
#define TEXT_SIZE 50

char text_message[TEXT_SIZE];

struct message {                       /* (A) */
  char mtext[TEXT_SIZE];
};

struct mq_attr message_attr = {        /* (B) */
  .mq_maxmsg = MESSAGE_COUNT,
  .mq_msgsize = sizeof(struct message)
};

const char *queue_name = "/queue";


int main(int argc, char *argv[]){
  mqd_t mqid = mq_open(queue_name,  O_CREAT | O_RDWR, 0666, &message_attr);   /* (C) */
  if (mqid == (mqd_t) -1) {
    perror("mq_open"); exit(1);
  }

  int i;
  for (i=0; i<MESSAGE_COUNT; i++) {    /* (D) */
    sprintf(text_message, "Message id = %d with priority %d", i, i);
    int result = mq_send(mqid, text_message, strlen(text_message) + 1, i);
    if (result == -1) {
      perror("mq_send");
    }
    sleep(1);
  }

  mq_close(mqid);                      /* (E) */

  return 0;
}
  • Wiersze 14 - 16 (A): definicja struktury wiadomości (w naszym przypadku każda wiadomość będzie zawierać jedynie tekst o maksymalnej długości 50 znaków.
  • Wiersze 18 - 21 (B): zmienna message_attr będzie zawierać informację potrzebne podczas tworzenia kolejki (np. maksymalną liczbę komunikatów oraz długość komunikatu).
  • Wiersz 28 (C): utworzenie kolejki do zapisu i odczytu.
  • Wiersze 34-25 (D): przygotowanie i wysłanie wiadomości do kolejki o identyfikatorze mqid. W każdej iteracji pętli wiadomość ma kolejny numer oraz wyższy priorytet.
  • Wiersz 42 (E): zamknięcie kolejki.
Przykład 4

Przykładowy program mq_read.c. Pozwala on na odebraniu z kolejki kolejki wszystkich aktualnie zapisanych komunikatów.

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/wait.h>

#define MESSAGE_COUNT 10
#define TEXT_SIZE 50

struct message {
  char mtext[TEXT_SIZE];
};


int main(int argc, char *argv[]) {
  const char *queue_name = "/queue";

  struct mq_attr message_attr = {
    .mq_maxmsg = MESSAGE_COUNT, 
    .mq_msgsize = sizeof(struct message)
  };

  mqd_t mqid = mq_open(queue_name,  O_RDONLY, 0666, &message_attr);     /* (A) */

  if (mqid == (mqd_t) -1) {
    perror("mq_open"); exit(1);
  }

  do {
    unsigned int priority;
    struct message msg;
    int len = mq_receive(mqid, (char *) &msg, sizeof(msg), &priority);   /* (B) */
    if (len == -1) {
      perror("mq_receive");
      break;
    }
    printf("Otrzymalem wiadomosc (priorytet = %d): '%s'\n", priority, msg.mtext);
    int r = mq_getattr(mqid, &message_attr);                            /* (C) */
    if (r == -1) {
      perror("mq_getattr");
      break;
    }
  } while (message_attr.mq_curmsgs);

  mq_close(mqid);                                                       /* (D) */
  mq_unlink(queue_name);                                                /* (E) */
  return 0;
}

  • Wiersz 22 (A): otwarcie kolejki komunikatów tylko do odczytu (jeżeli kolejki nie było jest ona tworzona).
  • Wiersz 31 (B): odebranie pojedynczego komunikatu z kolejki. Komunikat jest umieszczony w zmiennej msg.
  • Wiersz 37 (C): odczytanie aktualnego stanu kolejki (w tym liczby komunikatów w kolejce).
  • Wiersz 44 (D): zamknięcie kolejki.
  • Wiersz 45 (E): usunięcie kolejki komunikatów.
Zadanie 3

Skompiluj oba przykłady dotyczące kolejek komunikatów. Oto przykładowy plik Makefile (pamiętaj o tabulacji!):

all: mq_read mq_write

mq_read: mq_read.c
	gcc $< -o $@ -lrt

mq_write: mq_write.c
	gcc $< -o $@ -lrt

Następnie uruchom je w następujący sposób:

./mq_write &

# poczekaj chwile

./mq_read

# poczekaj chwile
./mq_read

Dlaczego program mq_read nie odczytał wszystkich wiadomości (nawet po powtórnym uruchomieniu)?

Zadanie 4

Zmodyfikuj przykład tak by program mq_read.c nie usuwał kolejki komunikatów. Następnie wykonaj następujące polecenia:

./mq_write &

# poczekaj chwile
./mq_read
# poczekaj chwile
./mq_read
# poczekaj chwile
./mq_read
# poczekaj chwile
./mq_read

Teraz powtórne uruchomienie programu pozwala na odczytanie kolejnych wiadomości. Zwróć również uwagę w jakiej kolejności są odczytywane wiadomości z kolejki komunikatów.

Pamięć współdzielona

Podstawowe informacje

Pamięć współdzielona to zadeklarowany obszar pamięci w systemie operacyjnym, z którego może jednocześnie korzystać więcej niż jeden program. Pamięć współdzielona jest jednym z mechanizmów komunikacji międzyprocesami (IPC).

Jest to najszybszy sposób komunikacji międzyprocesowej. Wynika to z faktu, że wszystkie procesy mają dostęp do tej samej pamięci RAM, a więc nie jest potrzebne kopiowanie danych pomiędzy procesami. Ponadto operacje zapisu i odczytu takich danych odbywają się w trybie użytkownika, a więc nie są w tym celu wykorzystywane funkcje systemowe. Funkcje systemowe są wywoływane jedynie podczas zarządzania pamięcią współdzieloną, a nie podczas jej wykorzystywania.

W przykładach wykorzystamy implementację pamięci dzielonej za pomocą interfejsu POSIX.

Podstawowymi operacjami do obsługi pamięci dzielonej są następujące instrukcje:

  • shm_open - utworzenie pamięci dzielonej.
  • ftruncate - zmiana długości utworzonej pamięci dzielonej.
  • mmap - zamapowanie obszaru pamięci do pamięci dzielonej.
  • munmap -usunięcie mapowania pamięci do pamięci dzielonej.
  • shm_unlink - usunięcie pamięci dzielonej utworzonej poleceniem shm_open.
Przykład 5

Przykład programu shm_write.c tworzącego pamięć współdzieloną. Program czeka 10 sekund po utworzeniu pamięci dzielonej (eng. shared memory) i zapisuje tam "jakis tekst".

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <errno.h>

#define TEXT_SIZE 512


struct shared_data {                                      /* (A) */
  int data;
  char some_text[TEXT_SIZE];
};


int main() {
  struct shared_data *ptr;
  const char *shm_name = "s123456";
  int shm_fd;
  shm_fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);    /* (B) */

  ftruncate(shm_fd, sizeof(struct shared_data));          /* (C) */

  ptr = mmap(0, sizeof(struct shared_data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);  /* (D) */
  if (ptr == MAP_FAILED) {
    printf("mmap - failed.\n");
    return -1;
  }

  ptr->data = 0;                                          /* (E) */
  sleep(10);

  sprintf(ptr->some_text,"Jakis tekst");
  ptr->data = 1;

  sleep(10);

  if (munmap(ptr, sizeof(struct shared_data)) == -1) {    /* (F) */
    printf("unmap failed: %s\n", strerror(errno));
    exit(1);
  }

  if (close(shm_fd)) {
    printf("close failed: %s", strerror(errno));
    exit(1);
  }

  return 0;

}

  • W wierszu 14 - 17 (A) tworzona jest struktura. Przykład wykorzystania struktury dla zmiennej ptr (wpisania danych) znajduje się w liniach 34, 37-38.
  • W wierszu 24 (B) następuje utworzenie pamięci dzielonej (składnia jest podobna do składni poleceń z ćwiczeń wcześniejszych - tworzenia plików). Domyślnie po utworzeniu pamięć współdzielona ma długość 0 bajtów.
  • W wierszu 26 (C) zawiera polecenie zwiększenia zaalokowanej pamięci dzielonej do długości struktury: struct shared_data.
  • W wierszu 28 (D) następuje zamapowanie pamięci dzielonej. Jeżeli polecenie zakończy się sukcesem to od tego momentu zmieniając wartość pamięci wskazywanej przez ptr (np. ptr->data = 1) w rzeczywistości zapisujemy wartość 1 w pamięci dzielonej.
  • Wiersze 34 - 40 (E) wpisywanie danych do pamięci dzielonej (z zachowaniem elementów czasowych).
  • Wiersz 42 (F) - usunięcie mapowania pamięci do pamięci dzielonej.
Przykład 6

Przykład programu shm_read.c odczytującego dane z pamięci dzielonej utworzonej za pomocą przykładu 5.

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <errno.h>

#define TEXT_SIZE 512


struct shared_data {                                   /* (A) */
  int data;
  char some_text[TEXT_SIZE];
};

int main(void)
{
  struct shared_data *ptr;
  const char *shm_name = "s123456";
  int shm_fd;
  shm_fd = shm_open(shm_name, O_RDONLY, 0666);         /* (B) */

  ptr = mmap(0, sizeof(struct shared_data), PROT_READ , MAP_SHARED, shm_fd, 0);  /* (C) */
  if (ptr == MAP_FAILED) {
    printf("mmap - failed.\n");
    return -1;
  }

//wait for data
  while(ptr->data==0){                                 /* (D) */
    sleep(1);
  };
  printf("Text: %s\n", ptr->some_text);

  if (munmap(ptr, sizeof(struct shared_data)) == -1) { /* (E) */
    printf("unmap failed: %s\n", strerror(errno));
    exit(1);
  }

  if (close(shm_fd)) {                                 /* (F) */
    printf("close failed: %s", strerror(errno));
    exit(1);
  }

  /* remove the shared memory segment from the file system */
  if (shm_unlink(shm_name) == -1) {                    /* (G) */
    printf("cons: Error removing %s: %s\n", shm_name, strerror(errno));
    exit(1);
  }

  return 0;
}

  • W wierszu 14 - 17 (A) tworzona jest struktura. Przykład wykorzystania struktury dla zmiennej ptr (wpisania danych) znajduje się w liniach 34, 37-38.
  • W wierszu 24 (B) następuje uzyskanie deskryptora do pamięci dzielonej (składnia jest podobna do składni poleceń z ćwiczeń wcześniejszych - tworzenia plików).
  • W wierszu 26 (C) następuje zamapowanie pamięci dzielonej. Jeżeli polecenie zakończy się sukcesem to od tego momentu zmieniając wartość pamięci wskazywanej przez ptr (np. ptr->data = 1) w rzeczywistości zapisujemy wartość 1 w pamięci dzielonej.
  • Wiersze 33 - 36 (D) czekanie na dane w pamięci dzielonej (program czeka na wartość różną od 0 w zmiennej ptr->data). Następnie wyświetlany jest tekst zapisany w zmiennej ptr->some_text.
  • Wiersz 38 (E) - usunięcie mapowania pamięci do pamięci dzielonej.
  • Wiersz 43 (F) - usunięcie deskryptora shm_fd.
  • Wiersz 49 (G) - usunięcie pamięci dzielonej utworzonej poleceniem shm_open.
Zadanie 5

Skompiluj, a następnie uruchom przykłady 5 i 6 w następujący sposób:

./shm_write &
./shm_read

Po zakończeniu działania kodu uruchom jeszcze raz program shm_read:

./shm_read

Co się stało? Wytłumacz wynik drugiego wykonania programu shm_read.

Zadanie 6

Zakomentuj w kodzie programu shm_read linię kodu z poleceniem shm_unlink. Skompiluj kod i wykonaj ponownie polecenia z zadania 5. Wytłumacz zmianę zachowania programu.

Zadania domowe

Zadanie domowe 1

Na podstawie przykładów dotyczących pamięci dzielonej stwórz dwa programy:

  • shm_text_send.c - program powinien umożliwiać wpisywanie tekstu z klawiatury i zapisywania go w pamięci dzielonej. Dodatkowo powinno istnieć zabezpieczenie przed nadpisaniem danych do czasu aż program shm_text_receive.c nie odbierze wiadomości. Wysłanie wiadomości "quit" powinno zakończyć działanie programu.
  • shm_text_receive.c - program powinien oczekiwać na dane wysyłane przez pamięć współdzieloną z programu shm_text_send.c. Po otrzymaniu danych powinien wyświetlić otrzymany tekst na ekranie. Jeżeli program otrzymał tekst "quit" powinien usunąć pamięć współdzieloną i zakończyć swoje działanie.
Zadanie domowe 2

Utwórz program, który tworzy kolejkę komunikatów, a następnie rozdziela się na 3 procesy:

  • proces zapisujący wiadomości: "1" , "2" , "3", "4", "5" do kolejki komunikatów z priorytetem równym 1.
  • proces zapisujący wiadomości: "11" , "12" , "13", "14", "15" do kolejki komunikatów z priorytetem równym 2.
  • proces konsument wyświetlający co sekundę jeden komunikat z kolejki.
Zadanie domowe 3

Napisz programy przekazujące 10 mln wiadomości o długości 128 bajtów za pomocą:

  • pamięci dzielonej,
  • kolejki komunikatów,

i porównaj czas przekazywania danych. Kody programów wraz z wykonywanymi czasami są rozwiązaniem zadania.