0 Linux – współbieżność, semafory
Tomasz Zaworski edited this page 2020-11-15 02:28:24 +01:00
  1. Wprowadzenie
  2. Przykłady i zadania
  3. Zadania domowe
  4. Przygotowanie do kolejnych zajęć

Wprowadzenie

Dzisiejsze zajęcia zostaną poświęcone problemowi semaforów i wątków POSIX. W odróżnieniu od przykładów w BACI, poniższe będą dotyczyły rzeczywistych programów (tzn. wykonywanych bezpośrednio na procesorze, a nie w interpreterze).

Trzeba zwrócić uwagę na istotną różnicę: w wielu przypadkach problem synchronizacji może występować dopiero po wielu milionach wykonań!

Ze względu na trudność analizy samego kodu, wykorzystamy narzędzie Valgrind, zawierające moduł analizy kodu wielowątkowego (Helgrind).

W przypadku bardziej zaawansowanych projektów trzeba wziąć pod uwagę także szczegóły konkretnej implementacji. Np. do sterowania wykonaniem wątku można stosować sygnały czasu rzeczywistego (choćby w celu przerwania oczekiwania na zwolnienie zasobu), jednak niektóre sygnały czasu rzeczywistego (RT) są zarezerwowane przez bibliotekę glibc (szczegóły w man 7 signal). Informacje te nie będa jednak potrzebne w trakcie dzisiejszych zajęć.

W przykładach wykorzystamy stosunkowo nową implementację semaforów (w Linuksie od wersji 2.6.19 z 29 listopada 2006), w starszych systemach dostępna była implementacia SysV (opisana między innymi tutaj).

Przy okazji omawiania semaforów zapoznamy się także z wątkami. W przypadku systemu Linux wątki są traktowane podobnie do procesów ze wspólną pamięcią. Na niskim poziomie są nawet tworzone za pomocą tej samej funkcji systemowej clone() (zob. man 2 clone) i można je kontrolowac np. za pomocą programu htop (po dodaniu kolumny NLWP będzie widać liczbę wątków danego procesu).

Uwaga: wszystkie przykłady z dzisiejszych ćwiczeń należy kompilować poleceniem

gcc -Wall -ansi -pthread nazwa_pliku.c -o nazwa_programu

Opcja -pthread dołącza potrzebne biblioteki.

$ ldd ./nazwa_programu
	linux-vdso.so.1 =>  (0x00007ffda69f1000)
	libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007fcd90c92000)
	libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fcd908cb000)
	/lib64/ld-linux-x86-64.so.2 (0x000055d8be219000)

Uwaga 2: Jeżeli kod przykładu ma wyświetlone numery wierszy, należy najpierw je wyłączyć (link w prawym górnym rogu okienka z kodem), a dopiero potem kopiować.

Przykłady i zadania

Przykład 1

Przykład programu z dwoma wątkami (i niezabezpieczoną zmienną):

#include <stdio.h>
#include <pthread.h>                             /* A */

int var = 0;

/* ta funkcja będzie wywołana po starcie wątku */
void* child_fn ( void* arg ) {                   /* B */
   var++;
   return NULL;
}

int main ( void ) {
   pthread_t child;                              /* C */
   pthread_create(&child, NULL, child_fn, NULL); /* D */
   var++; /* ta sama zmienna jest wykorzystywana przez wątek potomny */
   pthread_join(child, NULL);                    /* E */
   printf("\n%d\n", var);
   return 0;
}
  • W wierszu drugim (A) dołączany jest plik nagłówkowy pthread.h zawierający (czasami pośrednio) nagłówki funkcji potrzebnych przy programowaniu wielowątkowym.
  • Wiersz 7 (B) definiuje funkcję wywoływaną po uruchomieniu wątku (taka funkcja main dla wątku), każdy wątek ma swoje własne zmienne lokalne, zatem funkcja to może zostać wykorzystana wielokrotnie w różnych wątkach.
  • Wiersz 13 (C) zawiera definicję zmiennej służącej do przechowywania ID wątku (64 bitowa liczba całkowita bez znaku).
  • Wiersz 14 (D) to wywołanie funkcji tworzącej nowy wątek (pthread_create). Pierwszy argument to zmienna, w której zapisany zostanie identyfikator wątku, drugi może zawierać atrybuty wątku (szczegóły w man pthread_attr_init), trzeci to wskaźnik do funkcji, która ma zostać wykonana w nowym wątku, a ostatni to wskaźnik na argumenty do tej funkcji. W przypadku, gdy funkcja pthread_create zwróci wartość różną od zera, wartość identyfikatora wątku jest niezdefiniowana (możliwe błędy opisane w man 3 pthread_create).
  • Wiersz 16 (E) zawiera wywołanie funkcji pthread_join, której pierwszym argumentem jest identyfikator wątku, a drugi może opcjonalnie (jeżeli nie jest NULL) zawierać wartość zwracaną przez pthread_exit lub stałą PTHREAD_CANCELED w przypadku wywołania na wątku funkcji pthread_cancel. Należy pamiętać, że funkcja pthread_join nie kończy wątku (w przypadku, gdy nadal działa, czeka na zakończenie). Nie jest też możliwe wywołanie funkcji pthread_join tak, by czekała na dowolny zakończony wątek tak, jak np. funkcja wait (w przypadku wątków taka funkcja nie miałaby wiele sensu).

Uwaga: prawdopodobnie nie uda się uzyskać wartości innej niż 2 w rozsądnej liczbie prób (a może nawet nigdy, gdyż wątki działają tak krótko, że będą wykonane na jednym rdzeniu procesora sekwencyjnie), aby uzyskać inną wartość należałoby istotnie zwiększyć liczbę wykonań! Na tym polega problem ze współbieżnością, problemy są trudne do zaobserwowania i czasami pojawiają się niespodziewanie po zmianie sprzętu bądź wersji bibliotek (tak np. było z funkcją fork() ze względu na różne podejścia do jej implementacji).

Przykład 2

Poprzedni przykład jest bardzo krótki, ale jak znaleźć problematyczną sytuację w bardziej skomplikowanym programie? Z pomocą przychodzi narzędzie valgrind, a dokładnie helgrind będący modułem valgrinda. Wywołanie valgrinda jest bardzo proste (zakładamy, że a.out to skompilowany powyższy przykład):

$ valgrind --tool=helgrind ./a.out
==25551== Helgrind, a thread error detector
==25551== Copyright (C) 2007-2015, and GNU GPL'd, by OpenWorks LLP et al.
==25551== Using Valgrind-3.12.0 and LibVEX; rerun with -h for copyright info
==25551== Command: ./a.out
==25551== 
==25551== ---Thread-Announcement------------------------------------------
==25551== 
==25551== Thread #1 is the program's root thread
==25551== 
==25551== ---Thread-Announcement------------------------------------------
==25551== 
==25551== Thread #2 was created
==25551==    at 0x5166D4E: clone (clone.S:73)
==25551==    by 0x4E46141: create_thread (createthread.c:100)
==25551==    by 0x4E47EAB: pthread_create@@GLIBC_2.2.5 (pthread_create.c:813)
==25551==    by 0x4C34A87: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==25551==    by 0x108821: main (in /tmp/a.out)
==25551== 
==25551== ----------------------------------------------------------------
==25551== 
==25551== Possible data race during read of size 4 at 0x309014 by thread #1
==25551== Locks held: none
==25551==    at 0x108822: main (in /tmp/a.out)
==25551== 
==25551== This conflicts with a previous write of size 4 by thread #2
==25551== Locks held: none
==25551==    at 0x1087E1: child_fn (in /tmp/a.out)
==25551==    by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==25551==    by 0x4E476D9: start_thread (pthread_create.c:456)
==25551==  Address 0x309014 is 0 bytes inside data symbol "var"
==25551== 
==25551== ----------------------------------------------------------------
==25551== 
==25551== Possible data race during write of size 4 at 0x309014 by thread #1
==25551== Locks held: none
==25551==    at 0x10882B: main (in /tmp/a.out)
==25551== 
==25551== This conflicts with a previous write of size 4 by thread #2
==25551== Locks held: none
==25551==    at 0x1087E1: child_fn (in /tmp/a.out)
==25551==    by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==25551==    by 0x4E476D9: start_thread (pthread_create.c:456)
==25551==  Address 0x309014 is 0 bytes inside data symbol "var"
==25551== 

2
==25551== 
==25551== For counts of detected and suppressed errors, rerun with: -v
==25551=# Use --history-level=approx ornone to gain increased speed, at
==25551== the cost of reduced accuracy of conflicting-access information
==25551== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

Zrozumienie wyjścia polecenia może być nieco trudniejsze, szczególnie gdy używamy programów i bibliotek z usuniętymi nazwami symboli (stripped, zobacz man strip, albo opis opcji -g kompilatora). Widzimy, że wykryte zostały dwa wątki (Thread #1 i Thread #2), które (linie 22 i 35) mogą spowodować kolizję na zapisie do zmiennej var o adresie 0x309014. W wierszach 46-47 znajduje się to, co wypisał program (nie jest poprzedzane PIDem procesu).

Przykład 3

Zanim przejdziemy do zadań, postaramy się „poprawić” przykład tak, by nie zawierał błędu (sytuacja taka nie musi być błędem, jeżeli koszt synchronizacji jest wysoki, to często opłaca się dopuścić błąd, bądź obsłużyć go w inny sposób).

Aby dokonać "naprawy" wykorzystamy mutex.

#include <stdio.h>
#include <pthread.h>

pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
int var = 0;

void* child_fn ( void* arg ) {
   pthread_mutex_lock(&mut);
   var++;
   pthread_mutex_unlock(&mut);
   return NULL;
}

int main ( void ) {
   pthread_t child;
   pthread_create(&child, NULL, child_fn, NULL);
   pthread_mutex_lock(&mut);
   var++;
   pthread_mutex_unlock(&mut);
   pthread_join(child, NULL);
   printf("\n%d\n", var);
   return 0;
}

W ten sposób w danym momencie co najwyżej jeden wątek będzie miał dostęp do zmiennej (należy pamiętać, że nie chroni to w żaden sposób przed modyfikacją zmiennej w innym miejscu programu, odpowiedzialność spoczywa na programiście). Wykorzystane zostały funkcje pthread_mutex_lock i pthread_mutex_unlock. Pierwsza z nich usiłuje uzyskać blokadę i czeka, jeżeli w danym momencie nie jest to możliwe, a druga zwalnia blokadę (na zmiennej mut) (możliwe też są inne zachowania, szczegóły we wspomnianej dokumentacji).

Zadanie 1

Skompiluj powyższy przykład i sprawdź, czy po modyfikacji kodu Valgrind nie zgłasza problemów.

Zadanie 2

Poniższy kod został zmodyfikoway w stosunku do oryginału tak, by zwiększyć prawdopodobieństwo kolizji (10000 inkrementacji zmiennej var).

#include <stdio.h>
#include <pthread.h>
#define ITERACJE 10000
int var = 0;

/* ta funkcja będzie wywołana po starcie wątk */
void* child_fn ( void* arg ) {
   int i;
   for(i=0;i<ITERACJE;i++) var++;
   return NULL;
}

int main ( void ) {
   int i;
   pthread_t child;
   pthread_create(&child, NULL, child_fn, NULL);
   for(i=0;i<ITERACJE;i++) var++;
   pthread_join(child, NULL);
   printf("\n%d =? %d\n", var, 2 * ITERACJE);
   return 0;
}

Sprawdź dla jakiej liczby iteracji (makro ITERACJE) większość wyników jest prawidłowa.

Zadanie 3

Za pomocą programu htop sprawdź jakie procesy wielowątkowe działają aktualnie w systemie, spróbuj też sprawdzić jak to wygląda w przypadku przykładu z poprzedniego zadania (konieczne może okazać się znaczne wydłużenie czasu działania procesu, np. przez dodanie funkcji wait lub zwiększenie liczby iteracji).

Przykład 4

Oczywiście czasami potrzeba więcej niż semafora binarnego w jednym procesie. W takim przypadku możemy wykorzystać rozwiązanie oparte o semafory. Dla uproszczenia opisu zajmiemy się wyłącznie semaforami nienazwanymi (nazwane wymagają dodatkowo unikalnej nazwy) i wyłącznie przypadkiem wątków jednego procesu (w przypadku wielu procesów potrzebny jest np. wspólny obszar pamięci).

Poniższy przykład pochodzi ze sekcji 6 posix-semaphores (dla zainteresowanych znajduje się tam także wersja z nazwanymi semaforami). Kod został zmodyfikowany tak, by był zgodny z ANSI C i działał w miarę poprawnie.

/*
 *
 *   posix-unnamed-semaphore-example.c: Program to demonstrate POSIX 
 *                                      unnamed semaphores in C under 
 *                                      Linux (Producer - Consumer problem)
 */

#include <sys/types.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <unistd.h>

/* Buffer data structures */
#define MAX_BUFFERS 10
char buf [MAX_BUFFERS] [100];
int buffer_index;
int buffer_print_index;

sem_t mutex_sem, buffer_count_sem, spool_signal_sem;

void *producer (void *arg);
void *spooler (void *arg);

int main (int argc, char **argv)
{
    pthread_t tid_producer [10], tid_spooler;
    int i, r;

    /* initialization */
    buffer_index # buffer_print_index 0;

    /*  mutual exclusion semaphore, mutex_sem with an initial value 1. */
    if (sem_init (&mutex_sem, 0, 1) == -1) {
        perror ("sem_init"); exit (1);
    }
    
    /* counting semaphore, indicating the number of available buffers. Initial value = MAX_BUFFERS */
    if (sem_init (&buffer_count_sem, 0, MAX_BUFFERS) == -1) {
        perror ("sem_init"); exit (1);
    }

    /* counting semaphore, indicating the number of strings to be printed. Initial value = 0 */
    if (sem_init (&spool_signal_sem, 0, 0) == -1) {
        perror ("sem_init"); exit (1);
    }

    /* Create spooler */
    if ((r = pthread_create (&tid_spooler, NULL, spooler, NULL)) != 0) {
        fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1);
    }

    /* Create 10 producer threads */
    int thread_no [10];
    for (i = 0; i < 10; i++) {
        thread_no [i] = i;
        if ((r = pthread_create (&tid_producer [i], NULL, producer, (void *) &thread_no [i])) != 0) {
            fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1);
        }
    }
    /* Wait for producers to terminate */
    for (i = 0; i < 10; i++)
        if ((r # pthread_join (tid_producer [i], NULL))= -1) {
            fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1);
        }
    
    /* No more strings to print? Wait for spool_signal_sem to become 0 */
    int semval;
    while (1) {
        if (sem_getvalue (&spool_signal_sem, &semval)== -1)
            perror ("sem_getvalue");
        if (!semval) break;
        sleep (1);
    }
    /* terminate spooler */
    if ((r = pthread_cancel (tid_spooler)) != 0) {
        fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1);
    }
    
    /* Destroy semaphores */

    if (sem_destroy (&mutex_sem) == -1) {
        perror ("sem_destroy"); exit (1);
    }

    if (sem_destroy (&mutex_sem) == -1) {
        perror ("sem_destroy"); exit (1);
    }

    if (sem_destroy (&mutex_sem) == -1) {
        perror ("sem_destroy"); exit (1);
    }
    exit (0);
}

/* producer: produce strings for printing
   There might be multiple producer threads */
void *producer (void *arg)
{
    /* Create 10 strings and terminate */
    int i;
    int my_id = *((int *) arg);
    int count = 0;

    for (i = 0; i < 10; i++) {
        /* get a buffer: P (buffer_count_sem); */
        if (sem_wait (&buffer_count_sem) == -1) {
	    perror ("sem_wait: buffer_count_sem"); exit (1);
        }
    
        /* There might be multiple producers. We must ensure that 
            only one producer uses buffer_index at a time.
           P (mutex_sem); */
        if (sem_wait (&mutex_sem) == -1) {
	    perror ("sem_wait: mutex_sem"); exit (1);
        }

	    /* Critical section */
            int j = buffer_index;
            buffer_index++;
            if (buffer_index == MAX_BUFFERS)
                buffer_index = 0;

        /* Release mutex sem: V (mutex_sem) */
        if (sem_post (&mutex_sem) == -1) {
	    perror ("sem_post: mutex_sem"); exit (1);
        }
    
	/* Produce a string */
        sprintf (buf [j], "Thread %d: %d\n", my_id, ++count);
	/* Tell spooler that there is a string to print: V (spool_signal_sem); */
        if (sem_post (&spool_signal_sem) == -1) {
	    perror ("sem_post: spool_signal_sem"); exit (1);
        }
    
        /* Take a nap */
        sleep (1);
    }
    return NULL;
}

/* There is only one spooler thread */
void *spooler (void *arg)
{

    while (1) {  /* forever */
        /* Is there a string to print? P (spool_signal_sem); */
        if (sem_wait (&spool_signal_sem) == -1) {
	    perror ("sem_wait: spool_signal_sem"); exit (1);
        }
    
        printf ("%s", buf [buffer_print_index]);

        /* Since there is only one thread (spooler) using the 
           buffer_print_index, mutex semaphore is not necessary */
        buffer_print_index++;
        if (buffer_print_index == MAX_BUFFERS)
           buffer_print_index = 0;

        /* Contents of one buffer has been printed.
           One more buffer is available for use by producers.
           Release buffer: V (buffer_count_sem);  */
        if (sem_post (&buffer_count_sem) == -1) {
	    perror ("sem_post: buffer_count_sem"); exit (1);
        }
    }
    return NULL;
}
Zadanie 4

Przeanalizuj powyższy przykład i opisz krótko jego działanie. Przanalizuj go następnie za pomocą Valgrinda i spróbuj zidentyfikować ew. problemy w kodzie.

Podsumowanie

Na dzisiejszych zajęciach omówione zostały podstawy wątków w systemie Linux, wraz z mechanizmami blokowania (ograniczyliśmy się do metod zgodnych z POSIX). Przy okazji omawiania wątków przedstawione zostało narzędzie do analizy programów wielowątkowych (i nie tylko).

Warto pamiętać, że znajomość narzędzi blokowania zasobów jest bardzo istotna, szczególnie jeżeli zadania magą być wykonywane jednocześnie, jednak jeszcze lepszym rozwiązaniem jest takie projektowanie algorytmów, by nie wymagały blokowania, bądź były odporne na błędy związane z jednoczesnym dostępem. Jest to szczególnie istotne w przypadku implementacji rozproszonych (np. chmury obliczeniowe), w których rygorystyczne podejście do synchronizacji może mieć fatalny wpływ na wydajność systemu. Informacje na temat algorytmów lock-free można znaleźć m.in. tutaj. W przypadku języka C istnieją specjalne wersje niektórych operacji, które gwarantują nieprzerwane wykonanie (dopiero w wersji C11, z 12 sierpnia 2011 roku). Należy zauważyć, że czasami wystarczy tylko niewielka zmiana podejścia do problemu, by blokady nie były konieczne. Należy też mieć świadomość, że czasami, w przypadku wielu wątków pracujących jednocześnie, można zablokować proces nawet wtedy, gdy blokady nie są stosowane! Sytuację taką ilustruje poniższy kawałek kodu (jeżeli wykonywany w więcej niż jednym wątku jednocześnie).

int x;
x=10;
while(x==0) x--;

Poza procesami i wątkami systemu operacyjnego istnieją także inne rozwiązania, których wydajność zależy od umiejętności stworzenia algorytmu równoległego nie wymagającego częstych synchronizacji (np. programowanie procesorów kart graficznych, w których program może być wykonywany w tysiącach jednoczesnych wątków, jeżeli wszystkie wątki będą wykonywały ten sam kod - w przeciwnym razie warianty zostaną wykonane sekwencyjnie).

Zadania domowe

Zadanie domowe 1

Sprawdź jakie są czasy obliczenia wartości następującego wyrażenia (1 mld podniesień do kwardratu):

for(i=0;i<1000000000;i++) sum=(sum+i*i) % 1024;

dla różnej liczby wątków (liczba wątków powinna zostać podana jako parametr programu). Program powinien działać poprawnie co najmniej w zakresie od 1 do 16 wątków. Oczywiście wynik programu (wynik wyrażenia) powinien być ten sam, niezależnie od liczby wykorzystanych wątków! Do pomiaru czasu można wykorzystać zarówno polecenie time, jak i funkcję gettimeofday() wywołaną na początku procesu i na końcu, po zakończeniu wszystkich wątków. Do odpowiedzi załącz kod programu oraz zmierzone czasy.

Zadanie domowe 2

Wykorzystując informacje z ćwiczeń zaimplementuj w C rozwiązanie problemu ucztujących filozofów.

Przygotowanie do kolejnych zajęć

W ramach przygotowania do kolejnych zajęć proszę przerobić ze strony Linux Journey materiały z sekcji Devices oraz Kernel.