1. Réveil sur condition
L'opération de réveil des futex FUTEX_WAKE_OP est certainement l'une des plus complexes à comprendre. Elle interprète les paramètres de l'appel système futex() comme suit :
int futex(int *uaddr1, FUTEX_WAKE_OP, int nb_wake1, int nb_wake2, int *uaddr2, int operation)
Cela permet de réveiller au maximum les nb_wake1 threads plus prioritaires en attente sur le futex uaddr1. De plus, ne seront réveillés au maximum les nb_wake2 threads plus prioritaires en attente sur le futex uaddr2 que si le résultat de l'opération spécifiée dans le dernier paramètre sur la valeur contenue à l'adresse uaddr2 est vrai. Le retour de l'appel système est le nombre de threads réveillés qui étaient en attente sur le futex uaddr1 cumulé au nombre de threads réveillés qui étaient en attente sur le futex uaddr2. L'opération est décrite à l'aide de la macro FUTEX_OP() définie dans <linux/futex.h> :
[...]
#define FUTEX_OP_SET 0 /* *(int *)UADDR2 = OPARG; */
#define FUTEX_OP_ADD 1 /* *(int *)UADDR2 += OPARG; */
#define FUTEX_OP_OR 2 /* *(int *)UADDR2 |= OPARG; */
#define FUTEX_OP_ANDN 3 /* *(int *)UADDR2 &= ~OPARG; */
#define FUTEX_OP_XOR 4 /* *(int *)UADDR2 ^= OPARG; */
#define FUTEX_OP_OPARG_SHIFT 8 /* Use (1 << OPARG) instead of OPARG. */
#define FUTEX_OP_CMP_EQ 0 /* if (oldval == CMPARG) wake */
#define FUTEX_OP_CMP_NE 1 /* if (oldval != CMPARG) wake */
#define FUTEX_OP_CMP_LT 2 /* if (oldval < CMPARG) wake */
#define FUTEX_OP_CMP_LE 3 /* if (oldval <= CMPARG) wake */
#define FUTEX_OP_CMP_GT 4 /* if (oldval > CMPARG) wake */
#define FUTEX_OP_CMP_GE 5 /* if (oldval >= CMPARG) wake */
/* FUTEX_WAKE_OP will perform atomically
int oldval = *(int *)UADDR2;
*(int *)UADDR2 = oldval OP OPARG;
if (oldval CMP CMPARG)
wake UADDR2; */
#define FUTEX_OP(op, oparg, cmp, cmparg) \
(((op & 0xf) << 28) | ((cmp & 0xf) << 24) \
| ((oparg & 0xfff) << 12) | (cmparg & 0xfff))
[...]
Le fichier d'en-tête détaille l'opération en commentaire (en rouge), mais il est incomplet, car il ne mentionne pas le réveil systématique effectué sur le futex uaddr1. Les actions sont correctement décrites dans un article de Ulrich Drepper [1] :
int oldval = *(int *)uaddr1;
*(int *)uaddr2 = oldval OP OPARG;
wake uaddr1
if (oldval CMP CMPARG)
wake uaddr2;
La première sous-opération (i.e. OP) est atomique de sorte que le contenu de uaddr2 est toujours cohérent lors des lectures tant côté kernel, qu'utilisateur. À titre d'exemple, considérons le morceau de code source suivant :
[...]
#define futex_wakeup_op(addr1, nb_wake1, addr2, nb_wake2, op) \
syscall(SYS_futex, addr1, FUTEX_WAKE_OP, nb_wake1, nb_wake2, addr2, op)
[...]
int futex1;
int futex2;
[...]
rc = futex_wakeup_op(&futex1, 1, &futex2, INT_MAX, FUTEX_OP(FUTEX_OP_ADD, -1, FUTEX_OP_CMP_EQ, 0))
[...]
Le dernier paramètre passé à l'appel système demande de décrémenter la variable futex2 de manière atomique (résultat de l'opération FUTEX_OP_ADD avec le paramètre -1) puis de tester si la valeur avant l'opération est égale à 0 (résultat de l'opération FUTEX_OP_CMP_EQ avec le paramètre 0). En sortie de cet appel système, le thread le plus prioritaire en attente sur futex1 sera réveillé (deuxième paramètre), tous les threads (INT_MAX en quatrième paramètre) en attente sur futex2 ne seront réveillés qu'à la condition où futex2 était égal à 0 avant l'appel (i.e. avant l'opération de décrémentation demandée) et, enfin, le code retour stocké dans rc est le nombre de threads effectivement réveillés sur les deux futex.
2. Changement implicite de futex
L'opération FUTEX_CMP_REQUEUE interprète les paramètres de l'appel système futex() comme suit :
int futex(int *uaddr1, FUTEX_CMP_REQUEUE, int nb_wake, int nb_move, int *uaddr2, int val1)
Si le futex uaddr1 contient la valeur val1 alors nb_wake threads en attente sur uaddr1 sont réveillés et s'il y a des threads supplémentaires en attente alors nb_move de ces derniers sont retirés de la file d'attente de uaddr1 pour être mis dans celle du futex uaddr2. Que ce soient les threads réveillés ou déplacés, il s'agit toujours des plus prioritaires. La valeur de retour de l'appel système est le nombre de threads réveillés plus le nombre de threads déplacés. De plus, rien n'indiquera au réveil des threads déplacés qu'ils ont changé de file d'attente de futex : il s'agit donc bien d'un déplacement implicite de la file d'attente d'un futex vers une autre.
Le programme suivant démontre le fonctionnement de ce service. Le thread principal (thread#0) crée quatre threads secondaires temps réel (thread#1 à thread#4) de priorités décroissantes. Le thread#0 s'endort une seconde pour laisser le temps aux autres de se mettre en attente sur futex1. Au réveil, le thread#0 exécute l'appel système futex() avec l'opération FUTEX_CMP_REQUEUE pour ne réveiller qu'un thread (i.e. paramètre nb_wake égal à 1) et de faire migrer deux threads vers futex2 (i.e. nb_move égal à 2). Cela a pour conséquence de :
-Réveiller le thread secondaire le plus prioritaire (thread#1);
-De faire migrer le thread#2 et thread#3 sur futex2;
-De laisser le thread#4 (le moins prioritaire) en attente sur futex1.
Le compte-rendu de l'appel système est donc égal à 3 (i.e. 1 thread réveillé plus 2 threads migrés). Ensuite le thread#0 réveille le thread#2 et le thread#3 puis le thread#4 en appelant futex() avec l'opération FUTEX_WAKE respectivement sur futex2 et futex1 avec le nombre de threads à réveiller positionné à INT_MAX. Les comptes-rendus de ces deux derniers appels systèmes sont respectivement 2 et 1 pour dénoter le nombre de threads réveillés à chaque fois.
[...]
int nb_thread = 5;
#define futex_wait(addr, val) \
syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)
#define futex_wakeup(addr, nb) \
syscall(SYS_futex, addr, FUTEX_WAKE, nb)
int futex1;
int futex2;
void *thd_main(void *p)
{
int *idx = (int *)p;
struct sched_param param;
param.sched_priority = nb_thread - *idx;
pthread_setschedparam(pthread_self(), SCHED_RR, ¶m);
printf("Thread#%d attend sur le futex1 (prio = %d)\n", *idx, param.sched_priority);
futex_wait(&futex1, 0);
printf("Thread#%d reveille\n", *idx);
return NULL;
}
#define futex_cmp_requeue(addr1, nb_wake, nb_move, addr2, val1) \
syscall(SYS_futex, addr1, FUTEX_CMP_REQUEUE, nb_wake, nb_move, addr2, val1)
int main(int ac, char *av[])
{
pthread_t *tid;
int i;
int *idx;
int rc;
printf("Thread#%d demarre\n", 0);
idx = (int *)malloc(nb_thread * sizeof(int));
tid = (pthread_t *)malloc(nb_thread * sizeof(pthread_t));
tid[0] = pthread_self();
for (i = 1; i < nb_thread; i ++)
{
idx[i] = i;
pthread_create(&(tid[i]), NULL, thd_main, (void *)&(idx[i]));
}
// Attente pour que les threads soient sur le futex1
mysleep(1);
// Réveil de thread#1
// Transfert de thread#2 et thread#3 sur futex2
// Thread#4 reste en attente sur futex1
rc = futex_cmp_requeue(&futex1, 1, 2, &futex2, 0);
printf("Thread#%d : rc = %d\n", 0, rc);
// Attente de la terminaison du thread#1
pthread_join(tid[1], NULL);
// Réveil des thread#2 et thread#3 en attente sur le futex2
rc = futex_wakeup(&futex2, INT_MAX);
printf("Thread#%d : rc = %d\n", 0, rc);
// Attente de la terminaison du thread#2 et thread#3
pthread_join(tid[2], NULL);
pthread_join(tid[3], NULL);
// Réveil du thread#4 en attente sur le futex1
rc = futex_wakeup(&futex1, INT_MAX);
printf("Thread#%d : rc = %d\n", 0, rc);
// Attente de la terminaison du thread#4
pthread_join(tid[4], NULL);
free(idx);
free(tid);
return 0;
}
La compilation et l'exécution donnent ceci (on notera le besoin des droits du super utilisateur pour créer des threads temps réel sur certains systèmes) :
$ gcc requeue.c -o requeue -lpthread
$ sudo ./requeue
Thread#0 demarre
Thread#1 attend sur le futex1 (prio = 4)
Thread#2 attend sur le futex1 (prio = 3)
Thread#3 attend sur le futex1 (prio = 2)
Thread#4 attend sur le futex1 (prio = 1)
Thread#0 : rc = 3
Thread#1 reveille
Thread#0 : rc = 2
Thread#2 reveille
Thread#3 reveille
Thread#0 : rc = 1
Thread#4 reveille
L'intérêt de cet appel système réside dans le fait que des threads en attente sur un futex peuvent migrer vers un autre sans être réveillés. C'est un moyen d'éviter le phénomène de « thundering herd » (cf. [2]) où plusieurs threads se réveilleraient en même temps pour se remettre aussitôt en attente sur un autre futex provoquant ainsi de la consommation CPU par la multiplication des appels systèmes et des rechargements de cache que cela induit (surtout en environnement SMP). La motivation première de ce service est d'optimiser les services POSIX relatifs aux variables conditionnelles dans la GLIBC/NPTL et notamment pthread_cond_broadcast(). En effet, une variable conditionnelle est associée à un mutex utilisateur et utilise un mutex interne pour protéger l'accès à ses données privées de telle sorte que lorsqu'un thread se met en attente sur une variable conditionnelle, il doit d'abord verrouiller le mutex utilisateur (via pthread_mutex_lock()) avant d'appeler le service pthread_cond_wait(). Les traitements de ce dernier peuvent être schématisés par le pseudo code suivant :
LOCK(mutex interne)
UNLOCK(mutex utilisateur)
[...]
do
[...]
futex_val = futex
UNLOCK(mutex interne)
futex_wait(futex, futex_val)
LOCK(mutex interne)
[...]
while (...)
[...]
UNLOCK(mutex interne)
LOCK(mutex utilisateur)
Le thread appelant est mis en attente sur un futex (ligne en rouge) pour être réveillé lors de l'appel à la fonction pthread_cond_signal() ou pthread_cond_broadcast() par un autre thread. Une fois réveillé le thread verrouille de nouveau puis déverrouille le mutex interne avant de terminer avec le verrouillage du mutex utilisateur à la fin de pthread_cond_wait(). Comme la fonction pthread_cond_broadcast() réveille systématiquement tous les threads en attente sur une variable conditionnelle, si elle procédait par appel à futex() avec l'opération FUTEX_WAKE, tous les threads en attente se réveilleraient en même temps pour effectuer les verrouillages tels un bruyant troupeau (traduction de l'expression anglo-saxonne « thundering herd »). Cela induirait de la consommation de temps CPU, car un seul d'entre eux obtiendrait le verrou alors que les autres se remettraient en attente sur le futex du mutex interne puis sur celui du mutex utilisateur. D'où l'intérêt de l'opération FUTEX_CMP_REQUEUE dans la fonction pthread_cond_broadcast() afin de ne réveiller qu'un thread en attente et de faire migrer les autres sur la file d'attente du futex associé au mutex utilisateur sans les réveiller. Voici le pseudo code pour ce service :
LOCK(mutex interne)
[...]
if (threads à réveiller)
[...]
UNLOCK(mutex interne)
futex_cmp_requeue(futex, 1, INT_MAX, mutex utilisateur, futex_val)
[...]
Les threads ainsi migrés seront réveillés lorsque le mutex utilisateur sera relâché par le thread ayant été réveillé par pthread_cond_broadcast() et non pas directement après l'appel à cette dernière fonction.
est la version simplifiée de FUTEX_CMP_REQUEUE : elle réveille nb_move threads et transfère nb_wake threads de la file d'attente du futex à l'adresse uaddr1 vers celle du futex à l'adresse uaddr2. Le prototype est le suivant :
int futex(int *uaddr1, FUTEX_REQUEUE, int nb_wake, int nb_move, int *uaddr2)
D'après Ulrich Drepper [1], l'opération FUTEX_CMP_REQUEUE rend obsolète FUTEX_REQUEUE qui présenterait un problème de cas de croisement pour l'implémentation des variables conditionnelles POSIX vues plus haut. Cependant, l'article [3] semble contredire cela en proposant un nouvel algorithme pour les variables conditionnelles basées sur cette dernière opération et qui serait plus efficace que l'implémentation actuelle à l'aide de FUTEX_CMP_REQUEUE. L'opération FUTEX_REQUEUE resterait donc d'actualité.
Pour supporter l'héritage de priorité, les opérations FUTEX_WAIT_REQUEUE_PI et FUTEX_CMP_REQUEUE_PI sont disponibles. Ces deux opérations agissent conjointement et sont respectivement utilisées à la place des opérations FUTEX_WAIT et FUTEX_CMP_REQUEUE.
Le prototype de l'appel système pour l'opération FUTEX_CMP_REQUEUE_PI est similaire à celui de l'opération FUTEX_CMP_REQUEUE :
int futex(int *uaddr1, FUTEX_CMP_REQUEUE_PI, int nb_wake, int nb_move, int *uaddr2, int val1)
Le prototype de l'appel système pour l'opération FUTEX_WAIT_REQUEUE_PI a une forme inédite :
int futex(int *uaddr1, FUTEX_WAIT_REQUEUE_PI, int val, const struct timespec *timeout, int *uaddr2)
L'appelant se met en attente sur le futex à l'adresse uaddr1 si la valeur de ce dernier est égale au paramètre val. Si ce n'est pas le cas, le code d'erreur EAGAIN (synonyme de EWOULDBLOCK) est retourné. Au retour de cette fonction, l'appelant a la main sur le futex à l'adresse uaddr2.
Les futex uaddr1 et uaddr2 doivent être différents sous peine du retour d'une erreur avec le code EINVAL. Le paramètre timeout s'il n'est pas NULL, définit le temps d'attente. Il peut être exprimé en heure absolue si le bit FUTEX_CLOCK_REALTIME a été positionné au niveau de la commande (e.g. FUTEX_WAIT_REQUEUE_PI | FUTEX_CLOCK_REALTIME) sinon c'est un intervalle de temps relatif au moment de l'appel système.
3. Autres opérations
Pour compléter l'étude des futex, il convient de citer une dernière opération disponible sur les futex : FUTEX_FD. Cette opération est destinée à associer un descripteur de fichier à un futex de sorte que l'on puisse bénéficier d'opérations telles que select() ou poll(). Le prototype est :
int futex(int *uaddr1, FUTEX_FD, int val)
Cependant, d'après Ulrich Drepper [1], cette opération est inutilisable, car elle présente un certain nombre d'inconvénients et le noyau Linux ne la supporte plus dans ses dernières moutures. Toute tentative d'utilisation se traduira par un retour erreur avec le code ENOSYS.
Conclusion
Cette série d'articles a levé une grande partie du voile sur la notion de futex. Il permet de combler les lacunes voire de corriger les erreurs du manuel en ligne disponible sur le sujet.
La fonction première des futex est de constituer un outil de base pour implémenter des services de synchronisation inter-threads (au sein d'un ou plusieurs processus) en évitant au maximum la multiplication des appels au système qui sont coûteux en terme de performances. Le principe consiste à effectuer l'opération en espace utilisateur et ne recourir au noyau qu'en cas de contention pour mettre en sommeil le thread appelant.
L'utilisation des futex s'est surtout démocratisée chez les développeurs de librairies en recherche permanente de performances et d'adaptation de l'environnement Linux aux contraintes temps réel et aux standards comme POSIX. La GLIBC/NPTL en est l'un des principaux bénéficiaires.
Cette étude fut prétexte à effectuer un tour d'horizon sur des notions importantes dans le domaine de la synchronisation inter-threads et du déterminisme des systèmes d'exploitation : les opérations atomiques, les sections critiques, l'ordonnancement des threads, l'inversion de priorité, l'héritage de priorité, la réduction du temps de latence... Fort de ces connaissances, le lecteur programmeur pourra utiliser les fonctions de la GLIBC/NPTL avec plus de recul voire écrire des services de synchronisation adaptés à ses propres besoins. Cet article devrait aussi constituer un bon support de cours et travaux pratiques pour l'enseignement des méthodes de synchronisation.
Cependant, il ne faut pas oublier que les opérations en espace utilisateur nécessitent l'usage d'opérations atomiques qui ne sont souvent disponibles qu'en assembleur. Se posent alors des problèmes de portabilité du programme qui peuvent être circonscrits dans le cas de l'utilisation de compilateurs comme GCC qui proposent ces opérations sous forme d'extensions du langage. D'ailleurs les exemples de cet article ont été compilés et exécutés sans modifications sur architecture Intel (32 et 64 bits) et PowerPC (32 bits) grâce aux facilités de GCC.
Références
[1] U. Drepper, « Futex are tricky » : http://people.redhat.com/drepper/futex.pdf
[2] Thundering herd problem : http://en.wikipedia.org/wiki/Thundering_herd_problem
[3] Mutexes and Condition Variables using Futexes : http://locklessinc.com/articles/mutex_cv_futex/