MapReduce — Exploration Approfondie

MapReduce — Paradigme de traitement parallèle

Phases Map/Shuffle/Reduce · Combiner · Partitioner · Simulateur interactif · Travaux Pratiques · Quiz

Concept Phases Simulateur TP Quiz

1 Concept et Historique

Qu'est-ce que MapReduce ?

MapReduce est un modèle de programmation pour traiter de très grands ensembles de données en parallèle sur un cluster. Il décompose tout traitement en deux opérations fondamentales :

MAP
Transforme chaque enregistrement en paires (clé, valeur)
REDUCE
Agrège les valeurs ayant la même clé

Idée clé : Plutôt que de déplacer les données vers le code, on déplace le code vers les données. Chaque nœud traite ses données locales → data locality.

Analogie simple — Compter des mots dans 1000 livres
Sans MapReduce : 1 personne lit 1000 livres et compte → très lent
Avec MapReduce : 100 personnes (Mappers) lisent chacune 10 livres et notent les mots → 10 personnes (Reducers) comptent par lettre → résultat en parallèle ✓
Historique
2004
Google publie le paper "MapReduce: Simplified Data Processing on Large Clusters"
2006
Yahoo! implémente MapReduce en Java → naissance d'Hadoop
2008
Hadoop 0.18 — Premier sort par un nœud externe à Yahoo
2012
Hadoop 2.0 + YARN : MapReduce devient une app parmi d'autres sur YARN
2014
Apache Spark émerge comme alternative 10-100× plus rapide pour workloads itératifs
2024
MapReduce reste fondamental pour batch processing massif et l'apprentissage de Hadoop

2 Les Phases de MapReduce en Détail

Flux complet d'un job MapReduce
 HDFS Input
 ┌──────────────────────────────────────────────────────────────────────────────┐
 │  InputFormat → InputSplit → RecordReader → (key, value) pairs               │
 └──────────────────────────────────────────────────────────────────────────────┘
                                    │
                          ┌─────────▼──────────┐
                          │   MAP  (Mapper.java) │  × N tâches parallèles
                          │  map(key, value)     │
                          │  context.write(k, v) │
                          └─────────┬────────────┘
                                    │ (intermediate key, value) pairs
                          ┌─────────▼────────────────────────────┐
                          │  Buffer circulaire (100 MB défaut)    │
                          │  → Spill sur disque si 80% plein      │
                          │  → Tri + Combiner (local pre-reduce)  │
                          └─────────┬────────────────────────────┘
                                    │
                          ┌─────────▼──────────────┐
                          │   SHUFFLE & SORT         │
                          │  Partitioner → réseau    │
                          │  Merge sort côté Reducer │
                          └─────────┬────────────────┘
                                    │ (key, [v1, v2, v3, ...])
                          ┌─────────▼────────────────┐
                          │  REDUCE (Reducer.java)    │  × M tâches parallèles
                          │  reduce(key, Iterable<V>) │
                          │  context.write(k, result) │
                          └─────────┬────────────────┘
                                    │
 HDFS Output
 ┌──────────────────────────────────────────────────────────────────────────────┐
 │  OutputFormat → part-r-00000, part-r-00001, ... (un fichier par Reducer)    │
 └──────────────────────────────────────────────────────────────────────────────┘
Phase MAP — Transformation

Le Mapper reçoit des paires (clé, valeur) en entrée et produit des paires (clé intermédiaire, valeur) en sortie. Chaque Mapper traite un seul InputSplit (≈ 1 bloc HDFS).

Cycle de vie d'un Mapper
setup() Initialisation (connexion DB, chargement config)
map() Traitement ligne par ligne — émission (k,v)
cleanup() Libération ressources
InputFormats courants
InputFormatCléValeurUsage
TextInputFormatOffsetLigne texteFichiers texte (défaut)
KeyValueTextInputFormat1er champResteCSV/TSV
SequenceFileInputFormatCustomCustomDonnées binaires
NLineInputFormatOffsetN lignesContrôle split size
Exemple — WordCount Mapper
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final Text word = new Text();
  private final IntWritable one = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context ctx)
      throws IOException, InterruptedException {

    // value = une ligne du fichier texte
    String[] words = value.toString().toLowerCase()
      .split("\\s+");

    for (String w : words) {
      if (!w.isEmpty()) {
        word.set(w);
        ctx.write(word, one); // ("hello", 1)
      }
    }
  }
}
Buffer circulaire (mapreduce.task.io.sort.mb)
Les sorties du Mapper sont d'abord écrites dans un buffer circulaire en RAM (100 MB défaut). Quand il atteint 80%, un spill sur disque est déclenché. À la fin du Mapper, tous les spills sont fusionnés et triés.
Shuffle & Sort — La phase cachée

Invisible pour le développeur, mais la plus coûteuse. Les données intermédiaires sont triées, partitionnées et transférées sur le réseau depuis les Mappers vers les Reducers.

Côté Mapper
1 Partitionneur : chaque paire (k,v) → affectée à 1 Reducer selon hash(k) % numReducers
2 Tri local dans le buffer (spill trié par clé)
3 Combiner optionnel (pre-reduce local → réduit le trafic réseau)
4 Fusion (merge) de tous les spills en 1 fichier trié par clé
Côté Reducer
5 Fetch (copie HTTP) des données depuis chaque Mapper en parallèle
6 Merge sort des données reçues (tri externe sur disque si nécessaire)
7 Regroupement : toutes les valeurs d'une même clé → une invocation du Reducer
// Partitionneur par défaut (HashPartitioner)

partition = Math.abs(key.hashCode() % numReducers)

// Exemple avec 3 Reducers :
"hadoop" hash=231 → 231 % 3 = 0 → Reducer 0
"spark" hash=114 → 114 % 3 = 0 → Reducer 0
"hive" hash=310 → 310 % 3 = 1 → Reducer 1
"yarn" hash=422 → 422 % 3 = 2 → Reducer 2
Data Skew — Le problème numéro 1
Si une clé est très fréquente (ex: "the" dans WordCount), 1 Reducer reçoit des millions de valeurs pendant que les autres sont libres → goulot d'étranglement. Solutions : Combiner, clé composite, Partitioner custom.
Configs clés Shuffle
mapreduce.task.io.sort.mb = 100
mapreduce.map.sort.spill.percent = 0.8
mapreduce.reduce.shuffle.parallelcopies = 5
Phase REDUCE — Agrégation

Le Reducer reçoit (clé, Iterable<valeurs>) — toutes les valeurs ayant la même clé — et produit le résultat final écrit dans HDFS.

Exemple — WordCount Reducer
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
      Context ctx) throws IOException, InterruptedException {

    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get(); // additionne tous les 1
    }
    result.set(sum);
    ctx.write(key, result); // ("hello", 42)
  }
}
Nombre de Reducers — Quel impact ?
ReducersCas d'usageComportement
0Map-Only jobPas de shuffle, sortie directe en HDFS. Très rapide !
1Petit volumeSortie triée globalement. 1 seul fichier part-r-00000
NProductionParallélisme max. N fichiers en sortie.
Règle empirique — Nombre de Reducers
numReducers ≈ 0.95 × (nodes × mapred.tasktracker.reduce.tasks.maximum)
Ou plus simplement : visez que chaque Reducer traite entre 1 et 5 minutes et produit un fichier de 128 MB à 1 GB.
OutputFormats courants
OutputFormatFormat sortie
TextOutputFormatclé TAB valeur (défaut)
SequenceFileOutputFormatBinaire compact (réutilisable)
NullOutputFormatAucune sortie (compteurs uniquement)
MultipleOutputsPlusieurs fichiers de sortie
Combiner — Mini-Reducer local

Le Combiner s'exécute sur chaque Mapper avant le transfert réseau. Il réduit considérablement la quantité de données à shuffler.

Sans Combiner : Mapper envoie → ("hello",1) ("hello",1) ("hello",1) = 3 paires
Avec Combiner : Mapper envoie → ("hello",3) = 1 seule paire ✓
Attention : Utilisable seulement si la fonction est commutative et associative (somme ✓, max ✓, moyenne ✗).
Partitioner custom

Par défaut, les clés sont distribuées par hash. Si les données sont déséquilibrées, un Partitioner custom évite le data skew.

public class ZonePartitioner extends Partitioner<Text, IntWritable> {
  public int getPartition(Text key, IntWritable val, int numReducers) {
    char first = key.toString().charAt(0);
    if (first < 'h') return 0 % numReducers;
    if (first < 'p') return 1 % numReducers;
    return 2 % numReducers;
  }
}
Paramètres de performance clés
ParamètreDéfautImpactConseil
mapreduce.task.io.sort.mb100 MBTaille buffer MapperAugmenter si RAM disponible (256-512)
mapreduce.map.sort.spill.percent0.80Seuil de spill0.9 pour moins de spills
mapreduce.reduce.shuffle.parallelcopies5Copies réseau parallèles10-20 pour gros clusters
mapreduce.map.memory.mb1024 MBRAM par MapperAdapter à la taille des données
mapreduce.reduce.memory.mb1024 MBRAM par Reducer2048+ pour shuffles lourds

3 Simulateur MapReduce Interactif

Simulez WordCount sur votre texte

4 Travaux Pratiques

TP 1 — WordCount complet avec Combiner
Débutant

Implémenter le job MapReduce classique WordCount complet avec Driver, Mapper, Reducer et Combiner. Filtrer les mots courts (< 3 chars).

TP 2 — Température maximale par ville
Intermédiaire

Fichier CSV : "Paris,18.5\nLyon,22.1\nParis,21.3\nLyon,19.8". Calculer la température maximale par ville avec un Reducer.

TP 3 — Filtrage logs Apache (Map-Only)
Intermédiaire

Filtrer les lignes de logs Apache contenant "ERROR" ou "WARN" depuis un fichier HDFS. Utiliser 0 Reducer pour maximiser la performance.

TP 4 — Index inversé (mot → documents)
Avancé

Construire un index inversé : pour chaque mot, lister les fichiers HDFS qui le contiennent. Utilisé par les moteurs de recherche.

5 Quiz — MapReduce

Quiz MapReduce — 10 questions
Score : 0 / 0
Question 1 / 10