Programmer avec les pools de threads en C#

Deuxième partie : Chaînage et fabrique de tâches

Le pool de threads est un mécanisme facile d'utilisation permettant de tirer profit de la parallélisation. L'amélioration des performances ou de l'expérience utilisateur ne sont que deux exemples classiques.

C'est une série de quatre tutoriels dans laquelle vous allez apprendre à programmer un pool de threads en C#.

Dans cette deuxième partie, il s'agira de découvrir comment relier l'exécution d'une tâche à une autre via le chaînage de tâches ainsi que de présenter les fabriques de tâches.

36 commentaires Donner une note à l'article (5)

Article lu   fois.

L'auteur

Profil ProSite personnel

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Premier chapitre

Bienvenue dans le deuxième article de cette série consacrée à l'utilisation du pool de threads.

Dans le premier article, nous avons vu les avantages qu'il pouvait y avoir à utiliser le pool de threads plutôt que la création de thread, et nous avons vu comment les utiliser.

Dans le présent article, nous allons approfondir l'utilisation du pool de threads, via l'utilisation de la classe Task, notamment :

  • comment gérer plusieurs tâches en même temps ;
  • comment annuler un ensemble de tâches ;
  • comment composer les tâches ;
  • comment utiliser les fabriques de tâches.

II. Gestion de tâches

Afin d'approfondir l'utilisation du pool de threads, nous allons suivre tout au long de cet article, un fil rouge qui sera un algorithme renvoyant vrai ou faux si le nombre passé en paramètre est un nombre premier.

Pour rappel, un nombre premier est défini comme un entier naturel strictement supérieur à 1 et divisible uniquement par 1 et par lui-même.

2 est le seul nombre premier pair. 3 est premier, de même que 5 et 7, mais 9 ne l'est pas, car il est divisible par 3.

L'algorithme implémenté sera volontairement naïf, sans aucune optimisation. L'objectif étant justement d'avoir un algorithme « coûteux » dans le but d'apprécier pleinement l'utilisation du pool de threads.

II-A. Première implémentation, sans parallélisme

La première implémentation de l'algorithme est sans parallélisme. Cette implémentation sera notre mètre-étalon et nous ferons de notre mieux pour proposer des algorithmes plus efficaces tout au long de ce tutoriel.

Première implémentation
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
private static Results IsPrime(long number)
{
    return new Results()
    {
        IsPrime = IsPrimeInRange(number, 2, number)
    };
}

private static bool IsPrimeInRange(long number, long start, long end)
{
    for (long i = Math.Max(start, 2); i < end; i = ++i)
    {              
        if (number % i == 0L)
        {
            return false;
        }
    }

    return true;
}

Un petit mot rapide sur cette implémentation. Afin de préparer l'implémentation avec parallélisme, j'ai fait le choix d'implémenter un algorithme qui renvoie vrai s'il existe un diviseur dans un intervalle donné pour un nombre, et faux sinon.

Ainsi, vérifier qu'un nombre N est premier revient à rechercher l'ensemble de ses diviseurs potentiels dans l'intervalle [2, N-1].

II-B. Deuxième implémentation, avec parallélisme

Comme nous avons pu le voir, l'algorithme choisi est très simple : on teste le nombre N dont on veut vérifier le caractère premier en le divisant successivement par tous les diviseurs possibles, c'est-à-dire les nombres dans l'intervalle I = [2, N - 1]. Si le reste d'une des divisions est nul, alors il existe au moins un diviseur autre que 1 et N et donc le nombre n'est pas premier.

L'idée, pour procéder à la parallélisation, est de découper l'intervalle I en plusieurs intervalles Ii. On aura donc I = I1 U I2 U … U Itt sera le nombre de tâches souhaitées (c.-à-d., le degré de parallélisme). Bien entendu, les intervalles Ii auront à peu près la même longueur.

L'implémentation de cet algorithme est la suivante :

Lancement des tâches
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
private static Results IsPrimeParallized(long number, int numberOfThreads)
{
    Task<bool>[] tasks = new Task<bool>[numberOfThreads];
    for (int i = 0; i < numberOfThreads; ++i)
    {
        int j = i;
        checked
        {
            tasks[i] = Task<bool>.Run(() => { return Program.IsPrimeInRange(number, number * j / numberOfThreads, number * (j + 1) / numberOfThreads); });
        }
    }

    Task.WaitAll(tasks);

    return new Results()
    {
        IsPrime = tasks.All(x => x.Result == true),
        Status = tasks.Select(x => x.Status).ToArray(),
        Tasks = tasks
    };
}

Si on prend un nombre premier et qu'on le teste, on peut aisément vérifier le gain lié à l'utilisation de plusieurs tâches. Par exemple, sur mon ordinateur disposant de quatre cœurs et avec quatre tâches, l'algorithme est près de quatre fois plus rapide.

Il n'y a rien de particulier quant au lancement des tâches (cela correspond à ce que nous avons vu dans le tutoriel précédent). La seule précaution que nous avons à prendre c'est que pour pouvoir dire si un nombre est premier ou non, il faut attendre le résultat de chacune des sous-tâches. C'est le rôle de la méthode Task.WaitAll, qui permet d'attendre que toutes les tâches passées en paramètres soient terminées.

Si on regarde la signature de Task.WaitAll, on remarque que la méthode prend en entrée un params Task[]. Cela signifie que l'on peut au choix :

  • lui passer un tableau contenant l'ensemble des tâches à attendre ;
  • lui passer chaque tâche une à une, puisque c'est une méthode qui accepte un nombre variable d'arguments.

Maintenant, est-ce que cette implémentation est vraiment plus performante ? Vérifions donc avec un autre nombre, qui cette fois ne sera pas premier.

Image non disponible
Comparaison de l'algorithme parallélisé avec l'algorithme naïf

Le résultat est sans appel : notre nouvelle implémentation peut se révéler catastrophique de ce point de vue ! Le nombre 10601859461 qui n'est pas premier est très vite détecté comme non premier par l'algorithme classique, mais met près de 46 s avec l'algorithme parallélisé.

Que se passe-t-il exactement ? Revenons au Task.WaitAll. On attend que toutes les tâches aient terminé leur exécution avant de pouvoir continuer. Ainsi, si jamais le nombre n'est pas premier et qu'une des tâches trouve un diviseur, cette tâche peut s'arrêter. Mais les autres ne s'arrêtent pas pour autant et donc continuent leur exécution. Et c'est exactement ce qui se passe ici.

Aussi, afin d'améliorer notre algorithme, il serait intéressant de pouvoir l'arrêter dès lors qu'une tâche a trouvé un diviseur et que l'on sait que le nombre n'est pas premier.

II-C. Troisième implémentation, avec parallélisme et arrêt au plus tôt

Nous allons donc écrire une nouvelle variante de l'algorithme, où cette fois-ci, au lieu d'attendre que toutes les tâches aient terminé leur exécution, nous allons attendre qu'une tâche ait terminé la sienne, en examiner le résultat et arrêter l'algorithme le cas échéant.

Pour cela, au lieu d'utiliser la méthode Task.WaitAll, nous allons utiliser la méthode Task.WhenAny. Cette méthode, comme sa consœur, prend des instances de Task en paramètres. Dès qu'une des tâches a terminé son exécution, la méthode renvoie cette tâche.

L'idée pour notre nouvel algorithme sera donc :

  • de disposer d'une liste de tâches ;
  • se mettre en attente qu'une tâche termine son exécution, en passant cette liste de tâches à Task.WhenAny ;
  • si la tâche renvoyée par Task.WhenAny a trouvé un diviseur, alors on arrête l'algorithme et on renvoie false (puisque le nombre n'est pas premier). Sinon, on retire la tâche de la liste des tâches et on se remet en attente de la fin de l'exécution d'une autre tâche ;
  • si la liste est vide, c'est que toutes les tâches ont terminé leur exécution sans trouver de diviseur. Le nombre sera donc un nombre premier.

Place à l'implémentation.

Implémentation avec interruption au plus tôt
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
private static Results IsPrimeParallizedStopASAP(long number, int numberOfThreads)
{
    Task<bool>[] tasks = new Task<bool>[numberOfThreads];
    List<Task<bool>> remainingTasks = new List<Task<bool>>(4);
    bool isPrime = true;

    for (int i = 0; i < numberOfThreads; ++i)
    {
        int j = i;
        tasks[i] = Task<bool>.Run(() => { return Program.IsPrimeInRange(number, number * j / numberOfThreads, number * (j + 1) / numberOfThreads); });                
    }

    remainingTasks.AddRange(tasks);
    do
    {
        Task<Task<bool>> finishedTask = Task.WhenAny(remainingTasks);
        remainingTasks.Remove(finishedTask.Result);     
                
        if (finishedTask.Result.Result == false)
        {
            isPrime = false;
            break;
        }          
    } while (remainingTasks.Count > 0);

    return new Results()
    {
        IsPrime = isPrime,
        Status = tasks.Select(x => x.Status).ToArray(),
        Tasks = tasks
    };
}
Image non disponible
Comparaison de l'algorithme avec arrêt au plus tôt et l'algorithme naïf

Si on teste, on voit déjà de bien meilleures performances, même si le nombre n'est pas premier.

Néanmoins, il reste encore un souci avec cette implémentation. Avez-vous deviné lequel ?

II-D. Quatrième implémentation : parallélisme avec annulation

II-D-1. Le jeton d'annulation

Les plus perspicaces auront remarqué que l'algorithme de la section précédente souffre d'un énorme défaut : dès qu'un nombre est reconnu comme non premier, ce résultat est renvoyé à l'appelant. Mais ce n'est pas pour autant que les tâches sont arrêtées ! Les autres tâches continuent leur exécution, consommant inutilement des ressources.

Pour y remédier, nous allons introduire le concept de « jeton d'annulation » (CancellationToken).

Comme son nom l'indique, cela va permettre d'annuler des tâches. Pour cela :

  • on instancie un objet de type CancellationTokenSource, qui va permettre de gérer l'annulation. D'une part, elle fournit la propriété IsCancelRequested, qui vaudra true dès lors qu'une annulation aura été requise et, d'autre part, une méthode Cancel() permettant de demander cette annulation ;
  • à la charge ensuite à chaque tâche de vérifier l'état du jeton, pour éventuellement arrêter son exécution si une annulation a été demandée.

Une tâche qui arrête son exécution, car elle est interrompue, doit en principe lever une exception spécifique : CancellationException.

L'algorithme suivant montre une implémentation utilisant ce jeton d'annulation.

Algorithme avec support de l'annulation
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
private static bool IsPrimeInRange(long number, long start, long end, CancellationToken cancellationToken)
{
  for (long i = Math.Max(start, 2); i < end; i = ++i)
  {             
    cancellationToken.ThrowIfCancellationRequested();                

    if (number % i == 0L)
    {
      return false;
    }                
  }
  return true;
}

Comme vous pouvez le constater, il suffit d'appeler régulièrement la méthode cancellationToken.ThrowIfCancellationRequested(). Cette méthode va lancer une exception CancellationException si une demande d'annulation est en cours.

Maintenant que notre programme gère une demande d'annulation, il reste à demander effectivement cette annulation, lorsque cela est nécessaire :

Demande d'annulation
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
private static Results IsPrimeParallizedWithCancellationToken(long number, int numberOfThreads)
{
    Task<bool>[] tasks = new Task<bool>[numberOfThreads];
    try
    {
                
        CancellationTokenSource cancellationToken = new CancellationTokenSource();

        for (int i = 0; i < numberOfThreads; ++i)
        {
            int j = i;                    
            tasks[i] = Task<bool>.Run(
                () => 
                {
                    if (IsPrimeInRange(number, number * j / numberOfThreads, number * (j + 1) / numberOfThreads, cancellationToken.Token))
                    {
                        return true;
                    }
                    else
                    {
                        cancellationToken.Cancel();
                        return false;
                    }
                }
                ,
                cancellationToken.Token);                    
        }

        Task.WaitAll(tasks, cancellationToken.Token);
                                
        return new Results()
        {
            IsPrime = tasks.All(x => x.Result == true),
            Status = tasks.Select(x => x.Status).ToArray(),
            Tasks = tasks
        };
    }
    catch(OperationCanceledException ex)
    {
        return new Results()
        {
            IsPrime = false,
            Status = tasks.Select(x => x.Status).ToArray(),
            Tasks = tasks
        };
    }
}

Il est possible de mettre fin normalement à la tâche (sans lever d'exception donc). Mais cela n'est pas forcément sans conséquence. En effet, une tâche qui se termine normalement à un statut RanCompleted, tandis qu'une tâche annulée a le statut Canceled. Procéder à l'annulation d'une tâche en la terminant normalement lui conférera le statut RanCompleted au lieu de Canceled. Outre le statut, cela peut avoir une incidence en cas de chaînage de tâches (cf. Section III).

On peut remarquer que l'appel à la méthode Task.WaitAll prend également le jeton d'annulation. Tout comme les tâches, l'attente peut être annulée. Ainsi, dès l'annulation, la méthode Task.WaitAll rend la main et l'algorithme peut continuer son exécution (et ceci, même si les tâches en cours d'annulation n'ont pas encore pu être annulées).

II-D-2. Aller plus loin avec le jeton d'annulation

Dans la section précédente, j'ai introduit le concept de jeton d'annulation, sans trop m'attarder sur le détail de sa mise en œuvre. Ainsi, derrière la notion de jeton d'annulation, il y a en réalité deux concepts :

  • le premier, celui de CancellationToken, qui correspond au jeton à proprement parler. C'est lui qui porte l'information concernant l'arrêt ou non des tâches. Un jeton d'annulation est en « lecture seule ». Ainsi, il n'est pas possible de demander directement l'annulation via ce jeton ;
  • le second, celui de CancellationTokenSource, qui correspond à la manipulation même du jeton. C'est cette classe qui fournit la méthode Cancel() permettant de demander une annulation. C'est également à partir d'une instance de CancellationTokenSource que l'on récupère le jeton d'annulation via la propriété Token.

Parmi les fonctionnalités non explorées dans ce tutoriel, mais qu'il est intéressant de connaître, il est possible de citer :

  • la possibilité de demander l'annulation d'une tâche après un certain laps de temps, via les surcharges de la méthode CancelAfter ;
  • la possibilité de créer des jetons d'annulation liés, c'est-à-dire que l'annulation d'un jeton va entraîner l'annulation de jetons d'annulation « parents ». Cela peut être pratique par exemple si dans un processus décomposable en de nombreuses tâches, l'annulation d'une des tâches doit conduire à l'annulation du processus global. Cela peut se faire via la méthode statique CancellationTokenSource.CreateLinkedTokenSource.

II-E. Récapitulatif

Un petit récapitulatif sous forme d'image de l'exécution des différents algorithmes (une sortie similaire peut être reproduite en exécutant le projet ExempleComplet).

Image non disponible
Récapitulatif

La première colonne correspond au nombre dont on teste le caractère premier.

La seconde correspond au résultat de l'algorithme naïf.

La troisième avec l'introduction du parallélisme.

La quatrième avec l'arrêt au plus tôt.

La cinquième avec la prise en charge d'un jeton d'annulation.

III. Chaînage de tâches

Maintenant, imaginons un instant que nous souhaitions découpler l'algorithme de la demande d'annulation. Après tout, ce n'est pas le rôle de l'algorithme de le faire.

Pour parvenir à cela, il faudrait, lorsqu'une tâche aura terminé son exécution, vérifier si elle a renvoyé true ou false, et le cas échéant, appeler la méthode Cancel() du jeton.

C'est réalisable très facilement en utilisant le chaînage des tâches, via la méthode Task.ContinueWith.

Appelée sur une tâche, ContinueWith permet de préciser une tâche à exécuter une fois que la tâche en cours aura fini son exécution. La tâche qui sera appelée accepte un paramètre, qui est la tâche précédente. Elle peut ainsi accéder au résultat de son exécution.

C'est donc ce que nous allons faire ici. Nous allons vérifier si le résultat est faux, et dans ce cas, appeler la méthode Cancel() du jeton d'annulation.

Exemple avec chaînage de tâches
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
tasks[i] = Task<bool>.Run(() => { return IsPrimeInRange(number, number * j / numberOfThreads, number * (j + 1) / numberOfThreads, cancellationToken.Token); })
  .ContinueWith((antecedant) =>
    {
      if (!antecedant.Result)
      {
        cancellationToken.Cancel();
      }
      return antecedant.Result;
    }, TaskContinuationOptions.NotOnCanceled);

On peut remarquer que la méthode ContinueWith est appelée avec l'option TaskContinuationOptions.. En effet, par défaut, la tâche spécifiée dans ContinueWith est appelée que la tâche précédente ait terminé son exécution normalement, qu'elle ait été annulée ou qu'elle ait échoué (exception non gérée). Ici, en précisant l'option TaskContinuationOptions.NotOnCanceled, on indique qu'il ne faut pas continuer l'exécution si la tâche précédente a été annulée. Il existe de nombreuses possibilités, et je vous invite à vous référer à TaskContinuationOptions sur la MSDN pour la liste exhaustive des possibilités.

IV. Fabrique de tâches

La fabrique de tâches est tombée légèrement en désuétude avec l'arrivée du framework .Net4.5, dans la mesure où la plupart des opérations que fournissait la fabrique de tâches sont accessibles directement depuis la classe Task.

En effet, avec la version 4.0 du framework .Net, le seul moyen de créer un Task était via une fabrique, et plus particulièrement la méthode StartNew.

Avec la version 4.5 du framework .Net, il est maintenant possible d'instancier directement un Task via la méthode statique Task.Run.

Pour autant, la fabrique de tâches n'est pas totalement inutile, et voici les deux cas principaux où vous aurez à l'utiliser :

  • vous utilisez le framework .Net en version 4.0 ;
  • vous voulez définir des options de configuration par défaut pour vos tâches.

Le premier point se passant de commentaire, regardons directement le second.

Nous l'avons précédemment vu, nous pouvons modifier la manière dont une tâche va s'exécuter. Par exemple, en lui précisant un jeton d'annulation.

Il est possible de faire cela pour chaque instance de Task créée. Mais si toutes vos instances de Task partagent la même configuration, il est alors possible de créer tout d'abord une fabrique de tâches avec la configuration choisie, et ensuite d'instancier chacune de vos tâches via la fabrique de tâches. Ainsi, automatiquement, chacune de vos Task héritera de la configuration de la fabrique, rendant inutile le besoin de le préciser à chaque fois.

Exemple d'utilisation d'une fabrique de tâches
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
private static Results IsPrimeParallizedWithTaskFactory(long number, int numberOfThreads)
{
    Task<bool>[] tasks = new Task<bool>[numberOfThreads];
    try
    {
        CancellationTokenSource cancellationToken = new CancellationTokenSource();
        TaskFactory factory = new TaskFactory(cancellationToken.Token, TaskCreationOptions.None, TaskContinuationOptions.None, null);

        for (int i = 0; i < numberOfThreads; ++i)
        {
            int j = i;
            tasks[i] = factory
                .StartNew<bool>(() => { return IsPrimeInRange(number, number * j / numberOfThreads, number * (j + 1) / numberOfThreads); })
                .ContinueWith((antecedant) =>
                {
                    if (!antecedant.Result)
                    {
                        cancellationToken.Cancel();
                    }
                    return antecedant.Result;
                }, TaskContinuationOptions.NotOnCanceled);                
        }

        Task.WaitAll(tasks, cancellationToken.Token);

        return new Results()
        {
            IsPrime = tasks.All(x => x.Result == true),
            Status = tasks.Select(x => x.Status).ToArray(),
            Tasks = tasks
        };
    }
    catch (OperationCanceledException ex)
    {
        return new Results()
        {
            IsPrime = false,
            Status = tasks.Select(x => x.Status).ToArray(),
            Tasks = tasks
        };
    }
}

Malheureusement, il existe quelques limitations à ce mécanisme. Ainsi, il n'est pas possible de définir le comportement par défaut lors du chaînage de tâches avec une option comme TaskContinuationOptions. comme nous l'avions fait durant la section précédente.

Parmi les différentes options de TaskContinuationOptions qui peuvent être intéressantes à préciser au sein d'une fabrique, il y a la notion de « tâche enfant » via l'option TaskContinuationOptions.AttachToParent.

Rapidement, cette notion permet de lier la tâche créée avec la tâche « parent », c'est-à-dire la tâche en cours d'exécution lorsque la tâche enfant a été créée, et change le comportement de la tâche « parente » en fonction du comportement de ses tâches enfants. Le tableau ci-dessous ( extrait de la MSDN ) résume bien la situation :

Catégorie

Tâches enfants non attachées

Tâches enfants attachées

La tâche « parente » attend que ses tâches enfants aient terminé leur exécution.

non

oui

La tâche « parente » propage les exceptions levées par ses tâches enfants.

non

oui

Le statut de la tâche « parente » dépend du statut de ses enfants.

non

oui

V. Conclusion

À travers ce tutoriel, nous avons approfondi encore un peu plus l'utilisation du pool de threads via les notions de chaînage de tâches et de fabrique de tâches.

Le principal intérêt de ces notions est de permettre la création de liens entre les différentes tâches.

Dans le prochain tutoriel, nous verrons comme Microsoft a poussé l'intégration de l'utilisation du pool de threads au sein même du langage C#, via les mots-clés async et await apparu avec la version 4.5 du framework .Net.

Je tiens également à remercier Hinault Romaric pour sa relecture technique et f-leb pour sa relecture orthographique.

VI. Référence

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

En complément sur Developpez.com

  

Les sources présentées sur cette page sont libres de droits et vous pouvez les utiliser à votre convenance. Par contre, la page de présentation constitue une œuvre intellectuelle protégée par les droits d'auteur. Copyright © 2016 François DORIN. Aucune reproduction, même partielle, ne peut être faite de ce site et de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.