Approche détaillée des futex (partie 3/4)

Magazine
Marque
GNU/Linux Magazine
Numéro
176
Mois de parution
novembre 2014
Spécialité(s)


Résumé
Après avoir vu le fonctionnement des futex dans le noyau de Linux, ce troisième opus de la série revient sur l'interface utilisateur de l'appel système, qui propose de nombreuses options pour résoudre les problèmes régulièrement rencontrés dans les systèmes multi-thread.

Body

1. Héritage de priorité

Aux paragraphes 1 et 2 du précédent article de la série, il a été vu que le réveil des threads se faisait en respectant la priorité, afin de s'assurer que les threads temps réel soient réveillés dans l'ordre décroissant des priorités. Cependant, il faut aussi envisager les situations où des threads se mettent en attente sur un futex parce qu'un autre thread moins prioritaire a la main dessus.

1.1 Inversion de priorité

Considérons le cas d'un mutex basé sur un futex où un thread#1 de priorité donnée a pris la main sur le futex et qu'un thread#3 plus prioritaire se retrouve en attente sur ce même futex. Si un thread#2 de priorité intermédiaire entre les deux précédents vient à se réveiller, il y a de fortes chances que l'ordonnanceur de Linux préempte le thread#1 pour donner le CPU au thread#2. Cela revient à dire que le thread#3, normalement plus prioritaire que les deux autres, se retrouve privé de CPU à cause du thread#1 qui détient le futex et du thread#2 qui a pris la main car il n'est pas en attente sur le futex. Ce phénomène s'appelle l'inversion de priorité [1]. Il est rédhibitoire dans un environnement temps réel, car il nuit au déterminisme du système. Le listing suivant est un peu long, mais il illustre bien ce cas de figure.

[...]
#define futex_wait(addr, val) \
 syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)
#define futex_wakeup(addr, nb) \
 syscall(SYS_futex, addr, FUTEX_WAKE, nb)
int futex_var;
unsigned int nb_loops;
int policy = SCHED_FIFO;
pthread_barrier_t barrier;
static void nop(void)
{
 // Sur Intel
 asm("rep; nop;");
 //asm("nop;");
}
[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 1;
pthread_setschedparam(pthread_self(), policy, &param);

pthread_barrier_wait(&barrier);

for (i = 0; i < nb_loops; i ++)

{
 nop();
 }

futex_var = 1;

futex_wakeup(&futex_var, 1);

printf("Thread_1 termine\n");
 return NULL;
}
void *thread_2(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 50;
pthread_setschedparam(pthread_self(), policy, &param);

pthread_barrier_wait(&barrier);

for (i = 0; i < nb_loops; i ++)

{
 nop();
 }

printf("Thread_2 termine\n");
return NULL;
}
void *thread_3(void *p)
{
struct timeval     t0, t1, t;
unsigned int       latence;
struct sched_param param;
cpu_set_t          cpu;

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 97;
pthread_setschedparam(pthread_self(), policy, &param);

pthread_barrier_wait(&barrier);

gettimeofday(&t0, NULL);

futex_wait(&futex_var, 0);

gettimeofday(&t1, NULL);
 timersub(&t1, &t0, &t);

latence = t.tv_sec * 1000 + t.tv_usec / 1000000;

printf("Latence : %u ms\n", latence);
printf("Thread_3 termine\n");
return NULL;
}
int main(int ac, char *av[])
{
pthread_t          tid1, tid2, tid3;
struct sched_param param;
cpu_set_t          cpu;

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 99;
 sched_setscheduler(0, policy, &param);
 // Calibration
 nb_loops = calibration(atoi(av[1]));
 printf("Nombre d'iterations pour %u ms : %u\n", atoi(av[1]), nb_loops);
pthread_barrier_init(&barrier, NULL, 3);
pthread_create(&tid1, NULL, thread_1, NULL);
pthread_create(&tid2, NULL, thread_2, NULL);
pthread_create(&tid3, NULL, thread_3, NULL);
pthread_join(tid1, NULL);

pthread_join(tid2, NULL);

pthread_join(tid3, NULL);

return 0;
}

Le programme reçoit en paramètre un nombre de millisecondes qu'il convertit en nombre d'itérations pour occuper le CPU pendant cette durée (variable globale nb_loops mise à jour par une boucle de calibration). Le thread#1 de priorité 1, qui a la main sur le futex, effectue une attente active en effectuant toutes ces itérations. Le thread#3 se met en attente jusqu'au passage du futex à la valeur 1. Le thread#2 se met aussi en attente active pendant le nombre d'itérations demandées. Comme le thread#1 a une priorité égale à 1, qui est inférieure à la priorité du thread#2 (dont la priorité est égale à 50) et que le thread#3, plus prioritaire que les deux premiers, est en attente sur le futex, le thread#2 prend très rapidement la main pour exécuter sa boucle active. Quand il se termine, l'ordonnanceur de Linux donne la main au thread#1 pour qu'il termine sa boucle active avant de passer le futex à la valeur 1 pour réveiller le thread#3. Ce dernier affiche son temps de latence (durée d'attente sur le futex), qui est logiquement le résultat de la durée des boucles actives des thread#1 et thread#2.

Dans l'exemple d'exécution qui suit, le programme recevant 10000 millisecondes en paramètre, fait attendre le thread#3 environ 19000 millisecondes (cumul des durées des boucles actives de 10000 millisecondes exécutées dans les threads#2 et thread#1). Sur certains systèmes, cet exemple peut nécessiter les droits du super-utilisateur (e.g. commande sudo) pour fonctionner, car il utilise des threads temps réel :

$ gcc futex_pi.c -o futex_pi -lpthread
$ sudo ./futex_pi 10000
Nombre d'iterations pour 10000 ms : 505283000
Thread_2 termine

Latence : 19000 ms
Thread_3 termine
Thread_1 termine

Nous sommes donc bien confrontés à un phénomène d'inversion de priorité : le thread#3 a dû non seulement attendre la fin du thread#1, mais aussi la fin du thread#2, alors qu'il est plus prioritaire que ce dernier (priorité 97 contre 50) !

1.2 Solution

Il aurait fallu que le thread#1 ait une priorité au moins supérieure à celle du thread#2 afin d'éviter le phénomène d'inversion de priorité. Le listing suivant est une version du code précédent avec le thread#1 plus prioritaire que le thread#2 (la seule ligne qui diffère est en rouge).

[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 51;
pthread_setschedparam(pthread_self(), policy, &param);

[...]

La priorité 51 pour le thread#1 fait qu'il garde la main par rapport au thread#2 de priorité 50. Le premier exécute donc sa boucle active de 10000 millisecondes avant de passer le futex à 1 et de rendre le CPU au thread#3, qui s'exécute avant le thread#2 car il est plus prioritaire. Le temps de latence affiché n'est donc plus que d'environ 9000 millisecondes (temps d'exécution de la boucle active dans le thread#1) :

$ gcc futex_pi_1.c -o futex_pi_1 -lpthread
$ sudo ./futex_pi_1 10000
Nombre d'iterations pour 10000 ms : 505267000
Latence : 9000 ms

Thread_3 termine
Thread_1 termine
Thread_2 termine

Nous touchons du doigt une solution couramment adoptée pour réduire le temps de latence des threads : l'héritage de priorité [2]. Cela consiste à augmenter provisoirement la priorité du thread qui détient le futex au même niveau que celle du thread de priorité la plus importante en attente sur ce même futex. Ce changement dure le temps où le thread détient le futex. Dès qu'il le libère, il retrouve sa priorité d'origine. Ainsi, dans notre exemple, le thread#1 hériterait de la priorité du thread#3, qui étant plus importante que celle du thread#2, empêcherait ce dernier de préempter le thread#1.

Comme décrit dans les documents accompagnant ses sources (Documentation/pi-futex.txt et Documentation/futex-requeue-pi.txt), le noyau Linux offre l'héritage de priorité à travers les opérations suffixées par « PI » (i.e. Priority Inheritance) dans le fichier d'en-tête <linux/futex.h> :

[...]
#define FUTEX_LOCK_PI    6
#define FUTEX_UNLOCK_PI  7

#define FUTEX_TRYLOCK_PI 8

[...]

Le principe est similaire aux listes robustes pour ce qui est de la gestion du contenu de la variable du futex (cf. figure 4 du précédent article de la série).

En espace utilisateur, le verrouillage du futex se fait généralement à l'aide d'une opération atomique telle que __sync_val_compare_and_swap() vueen section 4.1du premier article de la série.Deux cas de figure peuvent se présenter :

- Cas sans appel au système : le futex a la valeur 0, qui signifie qu'il est libre. L'opération le positionne à la valeur de l'identifiant de thread courant pour signifier qu'il est verrouillé. Dans ce cas, aucun appel n'est fait au noyau sauf peut-être la demande de l'identifiant du thread courant. Le programmeur judicieux mémorisera cette valeur dans une variable de thread (i.e. TLS) ou en pile, afin de ne pas réitérer l'appel.

- Cas avec appel système : le futex est différent de 0, il contient (dans son poids faible tout au moins !) l'identifiant du thread qui l'a verrouillé. Le thread courant se met en attente en invoquant l'appel système futex() avec l'opération FUTEX_LOCK_PI. Dans le noyau, un mutex temps réel (rt-mutex) est associé au thread qui a verrouillé le futex, le bit 31 (FUTEX_WAITERS) du futex est positionné à 1 pour indiquer qu'il y a au moins un thread en attente et le thread appelant se met en attente sur lert-mutex tout juste créé. Le mutex temps réel est un objet qui gère l'héritage de priorité entre les threads qui sont en attente dessus.

Le déverrouillage du futex (qui ne peut se faire que par le thread qui l'a verrouillé !) est aussi réalisé à l'aide de l'opération atomique afin de le mettre à 0. Il peut aussi se présenter deux cas de figure :

- Le thread ne contient que l'identifiant de thread courant. Cela indique qu'il n'y a pas de thread en attente et donc, le futex est repassé à la valeur 0 pour indiquer qu'il est libre.

- Le thread contient l'identifiant de thread courant et le bit FUTEX_WAITERS est positionné pour signifier qu'au moins un thread est en attente sur le rt-mutex associé dans le noyau. Le thread courant invoque l'appel système futex() avec l'opération FUTEX_UNLOCK_PI pour déclencher le réveil du thread en attente le plus prioritaire (dans l'ordre FIFO en cas d'égalité de priorité) et positionner le futex à la valeur de l'identifiant du thread ainsi réveillé.

Le code suivant est une réécriture du programme décrit dans le premier listing, avec l'utilisation des opérations suffixées par PI. La fonction main() étant complètement identique, elle n'est pas reproduite ici.

[...]
#define futex_lock_pi(addr) \
 syscall(SYS_futex, addr, FUTEX_LOCK_PI, 0, 0)
#define futex_unlock_pi(addr) \
 syscall(SYS_futex, addr, FUTEX_UNLOCK_PI, 0, 0)
#define gettid() syscall(__NR_gettid)
int futex_var;
unsigned int nb_loops;
int policy = SCHED_FIFO;
pthread_barrier_t barrier;
static void nop(void)

{

//asm("rep; nop;");

asm("nop;");

}

[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 old;
int                 mytid = gettid();

CPU_ZERO(&cpu);

CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 1;
 pthread_setschedparam(pthread_self(), policy, &param);

futex_var = mytid;

pthread_barrier_wait(&barrier);
 for (i = 0; i < nb_loops; i ++)
 {
 nop();
 }
 old = __sync_val_compare_and_swap(&futex_var, mytid, 0);
 if (mytid == old)
 {
   // Rien à faire car il n'y a pas de thread en attente sur le futex
   printf("Pas de thread en attente sur le futex\n");
 }
 else
 {
   // Il y a un thread en attente sur le futex (futex_var = FUTEX_WAITERS | mytid)
   futex_unlock_pi(&futex_var);
 }
 printf("0x%x : Thread_1 termine\n", mytid);
 return NULL;
}
void *thread_2(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 mytid = gettid();

CPU_ZERO(&cpu);

CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 50;
 pthread_setschedparam(pthread_self(), policy, &param);

pthread_barrier_wait(&barrier);

for (i = 0; i < nb_loops; i ++)

{
  nop();
 }

printf("0x%x : Thread_2 termine\n", mytid);
 return NULL;
}
void *thread_3(void *p)
{
struct timeval     t0, t1, t;
unsigned int       latence;
struct sched_param param;
cpu_set_t          cpu;
int                old;
int                mytid = gettid();

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 97;
 pthread_setschedparam(pthread_self(), policy, &param);

pthread_barrier_wait(&barrier);

gettimeofday(&t0, NULL);
 old = __sync_val_compare_and_swap(&futex_var, 0, mytid);
printf("0x%x : La valeur du futex avant LOCK est : 0x%x\n", mytid, futex_var);
 switch(old)
 {
   case 0 : // Le futex était libre
   {
     // Désormais le thread courant détient le futex
   }
   break;
   default : // Le futex n'est pas libre
   {
     futex_lock_pi(&futex_var);
   }
 }
printf("0x%x : La valeur du futex après LOCK est : 0x%x\n", mytid, futex_var);
futex_unlock_pi(&futex_var);
printf("0x%x : La valeur du futex après UNLOCK est : 0x%x\n", mytid, futex_var);

gettimeofday(&t1, NULL);

timersub(&t1, &t0, &t);
latence = t.tv_sec * 1000 + t.tv_usec / 1000000;
printf("0x%x : Latence : %u ms\n", mytid, latence);
printf("0x%x : Thread_3 termine\n", mytid);

return NULL;
}
[...]

Malgré la valeur 1 pour la priorité du thread#1, l'exécution donne le même résultat que le programme du listing 2 où la priorité avait été modifiée avec la valeur 51 pour ne pas préempter le thread#1 par le thread#2. En d'autres termes, le phénomène d'inversion de priorité a été évité, car la latence du thread#3 est aux alentours de 9000 millisecondes au lieu de 19000.

$ gcc futex_pi_2.c -o futex_pi_2 -lpthread
$ sudo ./futex_pi_2 10000
0x7b8 : Nombre d'iterations pour 10000 ms : 505298000
0x7bd : La valeur du futex avant LOCK est : 0x7bb
0x7bd : La valeur du futex après LOCK est : 0x800007bd
0x7bd : La valeur du futex après UNLOCK est : 0x0
0x7bd : Latence : 9000 ms
0x7bd : Thread_3 termine
0x7bc : Thread_2 termine
0x7bb : Thread_1 termine

On pourra remarquer que l'héritage de priorité concerne non seulement les threads temps réel, mais aussi les threads temps partagé. Le code suivant est une modification du listing précédent avec le thread#1 défini avec des paramètres d'ordonnancement temps partagé.

[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 old;
int                 mytid = gettid();

CPU_ZERO(&cpu);
CPU_SET(0, &cpu);

pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);
param.sched_priority = 0;
pthread_setschedparam(pthread_self(), SCHED_OTHER, &param);

[...]

L'exécution donne exactement le même résultat que précédemment, car le thread#1 temps partagé est promu avec la priorité temps réel du thread#3 dont la latence reste aux alentours de 9000 millisecondes.

$ gcc futex_pi_3.c -o futex_pi_3 -lpthread
$ sudo ./futex_pi_3 10000
0x817 : Nombre d'iterations pour 10000 ms : 505303000
0x81c : La valeur du futex avant LOCK est : 0x81a
0x81c : La valeur du futex après LOCK est : 0x8000081c
0x81c : La valeur du futex après UNLOCK est : 0x0
0x81c : Latence : 9000 ms
0x81c : Thread_3 termine
0x81a : Thread_1 termine
0x81b : Thread_2 termine

Dans la GLIBC/NPTL, c'est l'attribut PTHREAD_PRIO_INHERIT du service pthread_mutexattr_setprotocol() qui active l'héritage de priorité sur un mutex.

2. Multiplexage

L'opération FUTEX_WAIT_BITSET permet de constituer des files d'attente de threads sur un futex. L'identificateur de file est un ensemble de bits passé en sixième paramètre de l'appel système futex() : 

int futex(int *uaddr1, FUTEX_WAIT_BITSET, int nb_wake1, const struct timespec *timeout, NULL, int bitset)

Toute combinaison de 32 bits est autorisée, sauf la valeur 0, sous peine de recevoir le compte-rendu d'erreur EINVAL. Au sein du noyau Linux, ce paramètre est placé dans le champ bitset de la structure futex_q présentée en figure 1 du précédent article de la série. En d'autres termes, lorsqu'un thread se met en attente sur un futex, la structure futex_q qui lui est attachée dans le kernel est rangée dans la table de hachage futex_queues, comme pour tout autre appel au service futex() avec un code FUTEX_WAIT, mais le champ bitset va permettre de discriminer les threads à réveiller dans l'ordre du plus prioritaire au moins prioritaire lors de l'invocation de l'appel système avec le code FUTEX_WAKE_BITSET. Le réveil d'un thread a lieu si au moins un des bits à 1 de son ensemble de bits correspond à un des bits à 1 de l'ensemble passé à l'opération FUTEX_WAKE_BITSET (i.e. c'est le résultat d'une opération logique AND et non pas un test d'égalité). On déduit qu'il peut y avoir au maximum 32 files d'attente par futex, car c'est le nombre maximum de sous-ensembles disjoints sur 32 bits (c'est-à-dire tous les sous-ensembles avec des bits à 1 non communs aux autres, donc tous les sous-ensembles disjoints avec un seul bit à 1).
En réalité, la notion de file d'attente est inhérente à toute opération sur les futex, vu que le champ bitset est dans la structure futex_q commune à tous les futex. Cependant, ce champ est positionné par défaut à 0xFFFFFFFF pour toutes les opérations autres que FUTEX_WAIT_BITSET, de sorte que les threads en attente soient réveillés par toute opération de réveil. D'où la constante suivante, définie dans le fichier d'en-tête <linux/futex.h> :

/*
 * bitset with all bits set for the FUTEX_xxx_BITSET OPs to request a
 * match of any bit.
 */
#define FUTEX_BITSET_MATCH_ANY 0xffffffff

FUTEX_BITSET_MATCH_ANY, utilisée avec l'opération FUTEX_WAIT_BITSET, permet au thread appelant d'être réveillé non seulement par tout appel à futex() avec l'opération FUTEX_WAKE_BITSET, mais aussi avec n'importe quelle autre opération de réveil telle que FUTEX_WAKE par exemple. Utilisé avec l'opération FUTEX_WAKE_BITSET, il permet de réveiller tous les threads en attente avec un ensemble de bits quelconque. FUTEX_BITSET_MATCH_ANY est utilisée par défaut pour le champ bitset dans le noyau Linux grâce à la structure suivante d'initialisation d'une structure futex_q nouvellement allouée (cf. fichier .../kernel/futex.c) :

static const struct futex_q futex_q_init = {
 /* list gets initialized in queue_me()*/
 .key = FUTEX_KEY_INIT,
 .bitset = FUTEX_BITSET_MATCH_ANY

};

Le listing suivant donne un exemple d'application à travers l'implémentation de la notion de rwlock [3]. C'est un mutex particulier, qui autorise tout thread à accéder à une ressource partagée en lecture de manière concurrente, mais n'autorise qu'un thread à la fois pour l'accès à cette même ressource en écriture. Des explications sont données dans le code sous forme de commentaires :

[...]
#define futex_wait_bitset(addr, val, bitset) \
 syscall(SYS_futex, addr, FUTEX_WAIT_BITSET, val, 0, 0, bitset)
#define futex_wakeup_bitset(addr, nb, bitset) \
 syscall(SYS_futex, addr, FUTEX_WAKE_BITSET, nb, 0, 0, bitset)

struct
{

int          lock;
int          wait;
unsigned int readers;
 unsigned int waiting_readers;
 unsigned int writer;
 unsigned int waiting_writers;
} rwlock;
int nb_thread;

[...]
void RWLOCK_READ(void)
{

LOCK(&(rwlock.lock));
 while(1)
 {
  // S'il n'y a pas d’écrivain en cours et pas d’écrivain en attente

   // ==> Le lecteur prend le rwlock
   if (0 == rwlock.writer && 0 == rwlock.waiting_writers)
   {
     rwlock.wait = 1;
     rwlock.readers ++;
     UNLOCK(&(rwlock.lock));
     break;
   }
   else // Il y a un écrivain en cours ou en attente
   {
     // Un lecteur supplémentaire en attente
     rwlock.waiting_readers ++;
     UNLOCK(&(rwlock.lock));
     // Mise en attente de libération par l’écrivain
     futex_wait_bitset(&(rwlock.wait), 1, 0x01);
     LOCK(&(rwlock.lock));
     // Un lecteur en attente de moins
     rwlock.waiting_readers --;
   }
 }
}
void RWLOCK_WRITE(void)

{
LOCK(&(rwlock.lock));

while(1)
 {
   // S'il n'y a pas d’écrivainni de lecteurs en cours

    // ==> L’écrivain prend le rwlock
   if (0 == rwlock.writer && 0 == rwlock.readers)
   {
     rwlock.wait = 1;
     rwlock.writer = 1;
     UNLOCK(&(rwlock.lock));
     break;
   }
   else // Il y a un écrivain ou des lecteurs en cours
   {
     // Un écrivainsupplémentaire en attente
     rwlock.waiting_writers ++;
     UNLOCK(&(rwlock.lock));
     // Mise en attente de libération par les lecteurs ou l’écrivain
     futex_wait_bitset(&(rwlock.wait), 1, 0x02);
     LOCK(&(rwlock.lock));
     // Un écrivain de moins en attente
     rwlock.waiting_writers --;
   }
 }
}
void RWUNLOCK(void)

 LOCK(&(rwlock.lock));
// Si c'est l’écrivain qui déverrouille
if (rwlock.writer)
 {
   // Il n'y a plus d’écrivain en cours
   assert(1 == rwlock.writer);
   rwlock.writer = 0;
 }
 else // C'est un lecteur qui déverrouille
{
   // Décrémentation du nombre de lecteurs
   assert(rwlock.readers > 0);
   rwlock.readers --;
   // S'il reste des lecteurs
   if (0 != rwlock.readers)
   {
     // Les lecteurs gardent le rwlock
     UNLOCK(&(rwlock.lock));
     return;
   }
 }
 // Il n'y ni écrivain, ni lecteur en cours

  // ==> On réveille les éventuelsécrivains ou lecteurs

  // en attente en donnant priorité aux écrivains
rwlock.wait = 0;
 // S'il y a au moins un écrivain en attente, on le réveille
if (rwlock.waiting_writers)
 {
   UNLOCK(&(rwlock.lock));
   futex_wakeup_bitset(&(rwlock.wait), 1, 0x02);
 }
 else if (rwlock.waiting_readers)
 {
   // On réveille les lecteurs en attente
   UNLOCK(&(rwlock.lock));
   futex_wakeup_bitset(&(rwlock.wait), INT_MAX, 0x01);

}

else

{

UNLOCK(&(rwlock.lock));

}

}
static void *thd_main(void *p)
{
int *idx = (int *)p;

printf("Thread %d demarre...\n", *idx);

while (1)
 {
   RWLOCK_READ();
   if (*idx >= nb_thread)
   {
     RWUNLOCK();
     break;
   }
   RWUNLOCK();
 }
 printf("Thread %d se termine...\n", *idx);
 return NULL;
}
int main(int ac, char *av[])
{
int       nb;
pthread_t tid[255];
int       param[255];
int       i;
int       nb_thread_old;
 for (i = 0; i < 255; i ++)
 {
   param[i] = i;
 }
 while(1)
 {
   printf("Nombre de threads : ");
   if (EOF == fscanf(stdin, "%d", &nb))
   {
     break;
   }
   if (nb > 255)
   {
     fprintf(stderr, "La valeur max est 255\n");
     continue;
   }
   RWLOCK_WRITE();
   nb_thread_old = nb_thread;
   nb_thread = nb;
   RWUNLOCK();
   if (nb < nb_thread_old)
   {
     // Attente de la fin des threads en trop
     for (i = nb; i < nb_thread_old; i ++)
     {
       pthread_join(tid[i], NULL);
     }
   }
   else if (nb > nb_thread_old)
   {
     // Création des threads supplémentaires
     for (i = nb_thread_old; i < nb; i ++)
     {
       pthread_create(&(tid[i]), NULL, thd_main, &(param[i]));
     }
   }

}
return 0;
}

Le programme consiste en un thread principal (fonction main()) qui reçoit de l'opérateur le nombre de threads secondaires (fonction thd_main()) qui doivent s'exécuter. Chaque thread secondaire reçoit en paramètre un index unique, afin qu'à tout moment, seuls les threads d'indice zéro au nombre de threads secondaires moins un tournent. Si la valeur saisie est supérieure au nombre de threads secondaires courants, les threads additionnels sont créés. Dans le cas contraire, les threads superflus sont arrêtés. Voici un exemple de compilation et d'exécution du programme, où on crée cinq threads secondaires, puis on demande à n'en avoir que deux puis zéro :

$ gcc rwlock.c -o rwlock -lpthread
$ ./rwlock
Nombre de threads : 5
Thread 0 demarre...
Thread 1 demarre...
Nombre de threads : Thread 2 demarre...
Thread 4 demarre...
Thread 3 demarre...
2
Thread 3 se termine...
Thread 4 se termine...
Thread 2 se termine...
Nombre de threads : 0
Thread 0 se termine...
Thread 1 se termine...
Nombre de threads :

<CTRL D>
$

Le nombre courant de threads secondaires est stocké dans la variable globale nb_thread par le thread principal qui y accède donc en écriture. Les threads secondaires effectuent une boucle pour accéder à cette variable en lecture, afin de s'arrêter dès que leur indice est supérieur ou égal à nb_thread. Sur certaines architectures, la mise à jour ou la consultation d'un entier n'est pas une opération atomique. Il faut par conséquent protéger la ressource partagée nb_thread de sorte à s'assurer que les threads secondaires lisent toujours une valeur cohérente avant et après la mise à jour faite par le thread principal. D'où l'utilité d'un rwlock représenté par la variable globale rwlock : structure composée d'un mutex (les fonctions LOCK() et UNLOCK() associées ne sont pas reproduites ici, car ce sont les mêmes que celles du listing 5 du premier article de la série) nommé lock, qui protège l'accès aux autres champs qui sont un futex nommé wait et des compteurs de nombre de lecteurs et écrivains en attente sur le rwlock, ou ayant verrouillé le rwlock. Trois opérations sont définies pour le rwlock :

- RWLOCK_READ() pour verrouiller le rwlock en lecture ;

- RWLOCK_WRITE() pour verrouiller le rwlock en écriture ;

- RWUNLOCK() pour déverrouiller le rwlock.

Deux files d'attente sont définies sur le futex wait : l'ensemble de bits 0x01 est la file d'attente pour les threads en lecture, tandis que la valeur 0x02 est pour les threads en écriture.

Le verrouillage en lecture ne peut se faire qu'à la condition où un écrivain n'a pas verrouillé le rwlock (i.e. champ writer égal à 0) et s'il n'y a pas d'écrivains en attente sur le rwlock (i.e. champ waiting_writers égal à 0). Cette dernière condition prévient les phénomènes de famine [4] qui pourraient survenir au niveau des écrivains si le nombre de lecteurs est important : les écrivains pourraient ne pas avoir accès à la ressource assez rapidement. La priorité d'accès est donc donnée aux écrivains, car il semble logique de modifier le plus rapidement possible la ressource partagée de sorte à donner aux lecteurs une valeur la plus à jour possible au plus vite. Si les conditions ne sont pas vérifiées, l'appelant se met en attente en invoquant l'appel système futex() avec le code opération FUTEX_WAIT_BITSET et l'ensemble de bits 0x01 sur le futex wait.

Le verrouillage en écriture ne peut se faire que s'il n'y a pas un écrivain (i.e. champ writer égal à 0) et des lecteurs (i.e. champ readers égal à 0) qui ont déjà verrouillé le rwlock. Ici, on ne se préoccupe pas du nombre de lecteurs en attente (champ waiting_readers), car les écrivains sont prioritaires pour verrouiller le rwlock. Si les conditions ne sont pas vérifiées, l'appelant se met en attente en invoquant l'appel système futex() avec le code opération FUTEX_WAIT_BITSET et l'ensemble de bits 0x02 sur le futex wait.

Le déverrouillage est une fonction commune aux lecteurs et écrivains, car le champ writer permet de différencier l'appel par un écrivain (valeur égale à 1) ou un lecteur (valeur égale à 0). Pour un écrivain, le déverrouillage consiste à remettre le champ writer à 0. Pour un lecteur, le champ readers est décrémenté et le déverrouillage n'est effectif qu'à partir du moment où ce champ atteint la valeur 0 (pour signifier qu'il n'y a plus de lecteurs). Le déverrouillage se conclut par le réveil d'un seul écrivain s'il y en a au moins un qui est en attente (i.e. valeur du champ waiting_writers supérieur à 0), ou de tous les lecteurs s'il y en a en attente (i.e. valeur du champ waiting_readers supérieur à 0). Dans le premier cas, futex() est appelé avec l'opération FUTEX_WAKE_BITSET, le nombre de threads à 1 et l'ensemble de bits 0x02 (pour ne sélectionner que les écrivains en attente sur le futex wait dans la table de hachage du noyau). Dans le second cas, le nombre de threads est positionné à INT_MAX (pour réveiller tous les lecteurs en attente) et l'ensemble de bits 0x01 (pour ne sélectionner que les lecteurs en attente sur le futex wait dans la table de hachage du noyau).

Si cette notion de file d'attente permet de faciliter l'implémentation de fonctions de synchronisation évoluées comme les rwlock, [5] souligne toutefois la limite des files d'attente multiples sur un même futex du point de vue de la performance. En effet, la conséquence est l'allongement des listes chaînées correspondantes dans la table de hachage futex_queues du kernel (cf. figure 1 du précédent article). Le parcours de liste est plus long pour ne sélectionner que les threads correspondant au bitset passé à l'opération FUTEX_WAKE_BITSET. Il est généralement préférable d'utiliser plusieurs futex avec une file d'attente (fonctionnement par défaut) qu'un seul futex avec plusieurs files d'attente. C'est sans doute la raison pour laquelle la GLIBC/NPTL n'utilise pas ces opérations pour les services relatifs aux rwlocks (i.e. pthread_rwlock_...()).

Par la suite, il sera vu que ces opérations ont tout de même un intérêt primordial au niveau du paramètre de temporisation, qui offre une certaine flexibilité par rapport aux autres opérations vues jusqu'ici.

3. Le paramètre de temporisation

3.1 Temporisation relative

Quand il n'est pas NULL, le quatrième paramètre de l'appel système futex() spécifie la valeur d'une temporisation pour les opérations comme FUTEX_WAIT ou FUTEX_LOCK_PI qui impliquent une notion d'attente. Voici un exemple de prototype :

int futex(int *uaddr1, FUTEX_WAIT, int nb_wake1, const struct timespec *timeout)

Le type de ce paramètre est l'adresse d'une structure timespec, qui décrit la durée maximum en secondes et nanosecondes pendant laquelle l'appelant sera bloqué sur le futex :

struct timespec {
 time_t  tv_sec;     /* seconds */
 long    tv_nsec;    /* nanoseconds */

}

Dans le noyau, une temporisation de haute résolution est associée au thread appelant lorsqu'il est mis en attente sur le futex. À l'échéance, l'appel système retourne le code d'erreur -1 avec la variable errno positionnée avec la valeur ETIMEDOUT. Le domaine d'application de cette fonctionnalité concerne généralement la mise en attente sur un mutex avec un délai maximum. Le programme du listing 6 propose une autre application par la réécriture simple d'un service tel que l'appel système nanosleep(), qui suspend le thread appelant pendant une durée donnée.

[...]
#define futex_wait_to(addr, val, to) \
 syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, val, to)
void mysleep(struct timespec *timeout)
{
int rc;
int futex_var = 0;
 rc = futex_wait_to(&futex_var, 0, timeout);
 printf("rc=%d, errno='%m' (%d)\n", rc, errno);
}
int main(int ac, char *av[])
{
struct timespec timeout;
 timeout.tv_sec = atoi(av[1]);
 timeout.tv_nsec = 0;
 mysleep(&timeout);
} // main

La macro futex_wait(), utilisée jusque-là dans les exemples, est renommée en futex_wait_to() avec un paramètre supplémentaire spécifiant le temps maximum de l'attente. La fonction mysleep() suspend le thread appelant pendant la durée spécifiée en paramètre. Pour cela, l'opération FUTEX_WAIT_PRIVATE est appelée avec la variable locale futex_var à 0 et le délai d'attente maximum passé en paramètre. Comme aucune opération FUTEX_WAKE_PRIVATE n'est effectuée, la temporisation arrive à son terme :

$ gcc futex_timeout.c -o futex_timeout
$ ./futex_timeout 2
rc=-1, errno='Connection timed out' (110)

On en déduit que par défaut, les temporisations sont de type CLOCK_MONOTONIC : durée relative par rapport au moment du lancement de l'opération.

Pour implémenter les services POSIX tels que pthread_mutex_timedlock() ou pthread_cond_timedwait(), ce n'est pas très pratique, car la norme impose d'exprimer la durée d'attente en heure absolue, c'est-à-dire l'heure que l'attente ne doit pas dépasser. Si les primitives de la GLIBC/NPTL qui offre ces services standards utilisaient les temporisations par défaut des opérations sur les futex, on aurait un nombre d'appels système rédhibitoire pour les performances. En effet, l'utilisateur du service devrait appeler un service tel que gettimeofday() pour avoir l'heure courante, afin de lui ajouter l'intervalle de temps à attendre avant de passer cette heure absolue à la librairie standard. Cette dernière devrait quant à elle effectuer l'opération inverse, en appelant de nouveau gettimeofday() pour soustraire la valeur retournée au paramètre de temporisation pour obtenir la durée d'attente relative par rapport à l'heure courante avant de la soumettre à l'appel système futex(). Ce n'est pas efficace !

Dans ce contexte, l'opération FUTEX_WAIT_BITSET est une aubaine pour les services POSIX de la GLIBC/NPTL, car elle donne en plus la possibilité de temporisations de type CLOCK_REALTIME, qui sont exprimées en heure absolue.

3.2 Temporisation absolue

Le fichier d'en-tête <linux/futex.h> définit la macro FUTEX_CLOCK_REALTIME afin de positionner le bit 9 à 1 dans la commande via un « ou » logique (e.g. FUTEX_WAIT_BITSET | FUTEX_CLOCK_REALTIME). C'est donc un moyen de réduire le nombre des appels système, car la librairie C standard de GNU n'a pas besoin de faire une conversion en intervalle de temps relatif. Considérons par exemple le listing suivant, qui crée un thread qui se met en attente 10 secondes en appelant pthread_mutex_timedlock().

[...]
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *thd_main(void *p)
{
struct timespec timeout;
int             rc;

struct timeval  tv;
 gettimeofday(&tv, NULL);

 tv.tv_sec += 10;

 TIMEVAL_TO_TIMESPEC(&tv, &timeout);

 printf("Echeance absolue : { %ld, %ld }\n", timeout.tv_sec, timeout.tv_nsec);

 rc = pthread_mutex_timedlock(&mutex, &timeout);
 printf("rc = %d\n", rc); 
 return NULL;
}
int main(int ac, char *av[])
{
int       rc;

pthread_t tid;

pthread_mutex_lock(&mutex);

pthread_create(&tid, NULL, thd_main, NULL);

pthread_join(tid, NULL);

return 0;
} // main

Le thread affiche l'heure absolue de l'échéance de l'attente décrite dans une structure timespec avant de la passer au service POSIX. L'outil strace montre que c'est bien l'opération FUTEX_WAIT_BITSET_PRIVATE | FUTEX_CLOCK_REALTIME qui est utilisée en interne par la librairie GNU. De plus, le paramètre de temporisation passé par l'utilisateur (en vert) est directement utilisé par le service en paramètre de l'appel système futex() (en jaune) sans aucun calcul ou conversion supplémentaire :

$ gcc timed_wait.c -o timed_wait -lpthread
$ strace -f ./timed_wait
[...]
[pid  5212] write(1, "Echeance absolue : { 1373120661,"..., 45Echeance absolue : { 1373120661, 419074000 }
) = 45
[pid  5212] futex(0x6010a0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 2, {1373120661, 419074000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
[pid  5212] write(1, "rc = 110\n", 9rc = 110

On notera que le dernier paramètre passé à l'appel système futex() est 0xFFFFFFFF, c'est-à-dire FUTEX_BITSET_MATCH_ANY vu précédemment, afin de réveiller le thread appelant avec toute opération de réveil (telle que FUTEX_WAKE).

Lorsque la temporisation est exprimée en heure absolue (CLOCK_REALTIME), il est utile de préciser qu'elle est impactée par les éventuels changements d'heure du système. En d'autres termes, l'échéance de la temporisation peut être avancée ou retardée en fonction de l'évolution de l'heure. Ce n'est en revanche pas le cas pour les intervalles de temps relatifs (CLOCK_MONOTONIC).

Toute tentative d'utilisation du bit FUTEX_CLOCK_REALTIME avec une opération autre que FUTEX_WAIT_BITSET et FUTEX_WAIT_REQUEUE_PI, se conclut par un retour erreur ENOSYS pour indiquer que ce n'est pas supporté.

3.3 Temporisation et héritage de priorité

Concernant l'héritage de priorité, Linux supporte le réajustement de la priorité d'un thread ayant verrouillé un futex lorsqu'un thread de plus haute priorité, en attente temporisée sur ce même futex, voit sa temporisation échoir. Contrairement aux autres opérations, la temporisation passée en paramètre pour FUTEX_LOCK_PI est exprimée en heure absolue. Le listing suivant donne un exemple d'utilisation. 

[...]
#define futex_lock_pi_to(addr, to) \
 syscall(SYS_futex, addr, FUTEX_LOCK_PI, 0, to)

#define futex_unlock_pi(addr) \

syscall(SYS_futex, addr, FUTEX_UNLOCK_PI, 0, 0)
#define futex_wait_to(addr, to) \
 syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, 0, to)
#define gettid() syscall(__NR_gettid)
int futex_var;
void *thd_main(void *p)
{
struct sched_param param;
int                old;
int                mytid = gettid();
struct timespec    timeout;
int                rc;
struct timeval     t;
unsigned int       seconds = *((unsigned int *)p);
 gettimeofday(&t, NULL);
 fprintf(stderr, "0x%x : Date courante %lu s / %lu us\n", mytid, t.tv_sec, t.tv_usec);
 old = __sync_val_compare_and_swap(&futex_var, 0, mytid);
 fprintf(stderr, "0x%x : La valeur du futex@%p avant LOCK est : 0x%x\n", mytid, &futex_var, old);
switch(old)
 {
   case 0 : // Le futex était libre
   {
     // Désormais le thread courant détient le futex
     assert(0);
   }
   break;
   default : // Le futex n'est pas libre
   {
     timeout.tv_sec = (t.tv_sec += seconds);
     timeout.tv_nsec = t.tv_usec * 1000;
     fprintf(stderr, "0x%x : timeout a la date %lu s / %lu us\n", mytid, t.tv_sec, t.tv_usec);
     rc = futex_lock_pi_to(&futex_var, &timeout);

   }

}
 gettimeofday(&t, NULL);
 fprintf(stderr, "0x%x : La valeur du futex@%p apres LOCK est : 0x%x\n"
                 "       Le code de retour de l'appel futex() est : %d (errno = %d)\n"
                 "       Date courante %lu s / %lu us\n"
         ,
     mytid, &futex_var, futex_var, rc, errno, t.tv_sec, t.tv_usec);
 fprintf(stderr, "0x%x : Thread termine\n", mytid);
 return NULL;
}
int main(int ac, char *av[])
{
pthread_t          tid;
int                mytid = gettid();
unsigned int       duree;
 duree = (unsigned int)atoi(av[1]);
 // Le thread courant verrouille le futex
 futex_var = gettid();
 pthread_create(&tid, NULL, thd_main, (void *)&duree);

 pthread_join(tid, NULL);

return 0;

}

Le programme consiste en un thread principal qui verrouille un futex avec héritage de priorité : il y stocke son numéro de tâche vu du noyau Linux (appel système gettid()), avant de créer un thread qui va se mettre en attente sur le futex verrouillé pendant le nombre de secondes passé en paramètre. L'attente s'effectue avec l'opération FUTEX_LOCK_PI et la valeur absolue de la date d'échéance de la temporisation, c'est-à-dire la date courante plus le nombre de secondes passé en paramètre. La compilation et l'exécution pour une attente de 5 secondes donne :

$ gcc futex_timeout_pi.c -o futex_timeout_pi -lpthread
$ ./futex_timeout_pi 5
0xf95 : Date courante 1373367174 s / 456851 us
0xf95 : La valeur du futex@0x601084 avant LOCK est : 0xf94
0xf95 : timeout a la date 1373367179 s / 456851 us
0xf95 : La valeur du futex@0x601084 apres LOCK est : 0x80000f94
       Le code de retour de l'appel futex() est : -1 (errno = 110)
       Date courante 1373367179 s / 456871 us

0xf95 : Thread termine

Sur échéance de temporisation, l'appel système futex() retourne -1 avec errno positionné à ETIMEDOUT.

On notera, de plus, que le mécanisme de protection contre l'inversion de priorité s'applique toujours : tout thread en attente sur un futex avec héritage de priorité, et dont la temporisation tombe à échéance, provoque le réajustement de la priorité du thread qui détient le futex avec la valeur de la priorité du thread de plus haute priorité encore en attente sur le futex. S'il n'y a plus de threads en attente, alors le thread qui détient le futex retrouve sa priorité d'origine.

Conclusion

Cet avant-dernier article a abordé des notions essentielles telles que l'héritage de priorité que les futex permettent de mettre en œuvre afin de minimiser les latences et le multiplexage, qui trouve surtout son utilité pour les temporisations exprimées en heure absolue pour la compatibilité POSIXdans la GLIBC/NPTL.

Références

[1] L'inversion de priorité : http://en.wikipedia.org/wiki/Priority_inversion

[2] L'héritage de priorité : http://en.wikipedia.org/wiki/Priority_inheritance

[3] La notion de rwlock : http://en.wikipedia.org/wiki/Readers-writer_lock

[4] Notion de famine :https://fr.wikipedia.org/wiki/Famine_(informatique)

[5] « Futex Cheat Sheet » : http://locklessinc.com/articles/futex_cheat_sheet/




Article rédigé par

Par le(s) même(s) auteur(s)

Exécution concurrente avec les coroutines

Magazine
Marque
GNU/Linux Magazine
Numéro
251
Mois de parution
septembre 2021
Spécialité(s)
Résumé

Concept datant des premières heures de l'informatique, puis laissé en désuétude au profit des threads, les coroutines suscitent un engouement depuis quelques années, notamment dans le domaine de l'embarqué et de l'Internet of Things (IoT). Certains langages les supportent nativement. Le langage C ne les propose pas, mais la librairie C recèle des services qui permettent de les mettre en œuvre.

Les derniers articles Premiums

Les derniers articles Premium

PostgreSQL au centre de votre SI avec PostgREST

Magazine
Marque
Contenu Premium
Spécialité(s)
Résumé

Dans un système d’information, il devient de plus en plus important d’avoir la possibilité d’échanger des données entre applications. Ce passage au stade de l’interopérabilité est généralement confié à des services web autorisant la mise en œuvre d’un couplage faible entre composants. C’est justement ce que permet de faire PostgREST pour les bases de données PostgreSQL.

La place de l’Intelligence Artificielle dans les entreprises

Magazine
Marque
Contenu Premium
Spécialité(s)
Résumé

L’intelligence artificielle est en train de redéfinir le paysage professionnel. De l’automatisation des tâches répétitives à la cybersécurité, en passant par l’analyse des données, l’IA s’immisce dans tous les aspects de l’entreprise moderne. Toutefois, cette révolution technologique soulève des questions éthiques et sociétales, notamment sur l’avenir des emplois. Cet article se penche sur l’évolution de l’IA, ses applications variées, et les enjeux qu’elle engendre dans le monde du travail.

Petit guide d’outils open source pour le télétravail

Magazine
Marque
Contenu Premium
Spécialité(s)
Résumé

Ah le Covid ! Si en cette période de nombreux cas resurgissent, ce n’est rien comparé aux vagues que nous avons connues en 2020 et 2021. Ce fléau a contraint une large partie de la population à faire ce que tout le monde connaît sous le nom de télétravail. Nous avons dû changer nos habitudes et avons dû apprendre à utiliser de nombreux outils collaboratifs, de visioconférence, etc., dont tout le monde n’était pas habitué. Dans cet article, nous passons en revue quelques outils open source utiles pour le travail à la maison. En effet, pour les adeptes du costume en haut et du pyjama en bas, la communauté open source s’est démenée pour proposer des alternatives aux outils propriétaires et payants.

Sécurisez vos applications web : comment Symfony vous protège des menaces courantes

Magazine
Marque
Contenu Premium
Spécialité(s)
Résumé

Les frameworks tels que Symfony ont bouleversé le développement web en apportant une structure solide et des outils performants. Malgré ces qualités, nous pouvons découvrir d’innombrables vulnérabilités. Cet article met le doigt sur les failles de sécurité les plus fréquentes qui affectent même les environnements les plus robustes. De l’injection de requêtes à distance à l’exécution de scripts malveillants, découvrez comment ces failles peuvent mettre en péril vos applications et, surtout, comment vous en prémunir.

Les listes de lecture

9 article(s) - ajoutée le 01/07/2020
Vous désirez apprendre le langage Python, mais ne savez pas trop par où commencer ? Cette liste de lecture vous permettra de faire vos premiers pas en découvrant l'écosystème de Python et en écrivant de petits scripts.
11 article(s) - ajoutée le 01/07/2020
La base de tout programme effectuant une tâche un tant soit peu complexe est un algorithme, une méthode permettant de manipuler des données pour obtenir un résultat attendu. Dans cette liste, vous pourrez découvrir quelques spécimens d'algorithmes.
10 article(s) - ajoutée le 01/07/2020
À quoi bon se targuer de posséder des pétaoctets de données si l'on est incapable d'analyser ces dernières ? Cette liste vous aidera à "faire parler" vos données.
Voir les 66 listes de lecture

Abonnez-vous maintenant

et profitez de tous les contenus en illimité

Je découvre les offres

Déjà abonné ? Connectez-vous