Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

Articolul detaliază exemplele care demonstrează extinderea și implementarea clasei CombineFileInputFormat pentru citirea conținutului fișierelor gzip (codecul implicit) în timpul rulării. Veți învăța cum să aplicați CombineFileInputFormat cu tehnologia MapReduce pentru a separa dimensiunea porțiunii de date consumate de interfața Mapper de dimensiunea blocului de fișiere în HDFS.







Sujay Som. consultant de mari dimensiuni și dezvoltator de aplicații, IBM

Biblioteca programului Apache Hadoop vă permite să procesați date într-o varietate de formate, de la fișiere text simple la baze de date. În mediul MapReduce, clasa InputFormat este utilizată pentru a evalua specificația de intrare și pentru a rupe fișierele de intrare în obiecte logice InputSplit, fiecare fiind apoi atribuită unei instanțe Mapper separate.

FileInputFormat. care este clasa de bază pentru toate formatele de fișiere InputFormat. are următoarele subclase directe:

  • TextInputFormat
  • SequenceFileInputFormat
  • NLineInputFormat
  • KeyValueTextInputFormat
  • CombineFileInputFormat

Fiecare din aceste clase InputFormat oferă o implementare generică a metodei getSplits (JobContext). care poate înlocui, de asemenea, metoda isSplitable (JobContext.Path) astfel încât fișierele de intrare să nu fie separate și să fie complet procesate de interfața Mapper. Acesta implementează clasa createRecordReader. moștenit de la clasa org.apache.hadoop.mapreduce.InputFormat. care este folosit pentru a colecta înregistrările de intrare de la obiectul InputSplit pentru procesare de către interfața Mapper.

InfoSphere BigInsights Quick Start Edition

Cu toate acestea, Hadoop funcționează mai eficient cu un număr mic de fișiere mari decât cu multe fișiere mici. (În acest caz, „imagine mică“ înseamnă că dimensiunea sa este mult mai mică decât unitatea Distributed System Dimensiune fișier Hadoop (HDFS)). Problema este rezolvată prin crearea unei clase CombineFileInputFormat. care funcționează efectiv cu fișiere mici, astfel încât clasa FileInputFormat creează o defalcare a fișierelor. CombineFileInputFormat pune o mulțime de fișiere în fiecare element al partiției astfel încât fiecare instanță a Mapper-ului primește mai multe date pentru procesare. Clasa CombineFileInputFormat poate oferi beneficii și atunci când procesează fișiere mari. De fapt, acesta separă cantitatea de date consumate de interfața Mapper de dimensiunea blocului de fișiere în HDFS.

În prezent, CombineFileInputFormat este o clasă abstractă în bibliotecă de clasă Hadoop (org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat) fără o implementare specifică. Pentru a folosi CombineFileInputFormat, trebuie să creați o subclasă specifică a CombineFileInputFormat și să implementați metoda CombineFileInputFormat și să implementați createRecordReader (). care instanțiează clasa delegată CustomFileRecordReader. Extinde RecordReader. folosind propriul designer special. Această abordare impune, de asemenea, ca CustomWritable să fie creat ca o cheie pentru clasa Hadoop Mapper. Pentru fiecare rând, tasta CustomWritable va conține numele fișierului și lungimea offset-ului pentru acea linie.

Citiți exemplul lui Hadoop. care demonstrează modul în care se utilizează CombineFileInputForma pentru a număra aparițiile de cuvinte în fișierele text dintr-un director de intrare dat.

Aici veți afla cum să extindeți și să implementați CombineFileInputFormat. adăugând posibilitatea de a citi conținutul fișierelor gzip (codecul implicit) în timpul rulării în formă neambalată. Vom produce rezultatul cheie-valoare, în cazul în care cheia - este o combinație a numelui fișierului și a liniei de compensare, iar valoarea - este o reprezentare textuală a acestui șir de caractere. Exemplul din acest articol utilizează CustomInputFormat în mediul MapReduce. Vom dezvolta următoarele clase cheie:

CompressedCombineFileInputFormat Extinde CombineFileInputFormat createRecordReader și vinde pentru transferul către cititor înregistrările care realizează logica pentru CompressedCombineFileRecordReader Unite extinde fișierele RecordReader și este delegând clasa CombineFileRecordReader CompressedCombineFileWritable Implementează Interface WritableComparable. stochează numele fișierului și offset, precum și înlocuiește metoda compareTo. pentru a compara primul nume al fișierului, și apoi offset.

Exemplul CompressedCombineFileInputFormat utilizează programul MapReduce care utilizează datele publice meteorologice istorice ale Administrației Naționale Oceanic si Atmospheric (National Oceanic si Atmospheric Administration, NOAA). NOAA colectează statistici privind performanțele extreme pentru stațiile de noi folosind rezumate lunare pentru cele trei date meteorologice - temperatura, precipitații și zăpadă. Așa cum este utilizat aici, un exemplu calculează temperatura maximă a datelor meteorologice (Climatic Data Center National), care sunt disponibile în fișierul comprimat (gzip). Secțiunea Resurse oferă informații suplimentare cu privire la modul de a obține date meteorologice.

Formatul de intrare pentru mai multe fișiere comprimate

Soluția noastră utilizează un CombineFileInputFormat comprimat. care utilizează trei clase specifice:

  • O subclasă a implementării abstracte a CombineFileInputFormat (org.apache.hadoop.mapreduce.lib.input .CombineFileInputFormat)
  • O subclasă specifică a RecordReader (org.apache.hadoop.mapreduce.RecordReader)
  • Clasa specială scrisă. care implementează WritableComparable și generează o cheie de linie de fișier care conține numele fișierului și offsetul liniei.

CompressedCombineFileInputFormat

CompressedCombineFileInputFormat.java - o CombineFileInputFormat subclase. Se pune în aplicare InputFormat.createRecordReader (InputSplit, TaskAttemptContext) pentru construirea de instalații pentru RecordReader subcolecția fișiere de intrare CombineFileSplit. Spre deosebire de FileSplit. clasa CombineFileSplit nu este partajarea de fișiere, precum și împărțirea de intrare fișiere în seturi mai mici. Separarea poate conține blocuri de fișiere diferite, dar toate blocurile din același set trebuie să fie local pentru un anumit post. CombineFileSplit folosit pentru a pune în aplicare RecordReader citind câte un rând pentru fiecare fișier. Bisect fișiere nu sunt necesare, astfel încât isSplitable () metoda se înlocuiește pentru a returna valoarea fals (în caz contrar ar fi setat la true în mod implicit). Un exemplu este prezentat în listare 1.

Listing 1. CompressedCombineFileInputFormat.java

CompressedCombineFileRecordReader

CompressedCombineFileRecordReader.java este o clasă delegată a programului CombineFileRecordReader. care este clasa generică RecordReader, care poate emite diferite instanțe ale RecordReader la fiecare bloc din CombineFileSplit. CombineFileSplit poate combina fragmente de date din mai multe fișiere. Vă permite să utilizați instanțe diferite ale recordReader pentru a procesa fragmente de date din diferite fișiere.

Atunci când sarcina este numit Hadoop, CombineFileRecordReader citește dimensiunile tuturor fișierelor în calea de intrare HDFS, ceea ce este necesar pentru a procesa și de a decide cât de mult doriți să divizare, bazată pe MaxSplitSize valoare. Pentru fiecare set separat (care ar trebui să fie un fișier, ca isSplitabe () returneaza o valoare false și înlocuit) CombineFileRecordReader CompressedCombineFileRecordReader creează o instanță și transmite CombineFileSplit. context și index pentru CompressedCombineFileRecordReader pentru a detecta fișiere pentru procesare.







După instanțiarea CompressedCombine, FileRecordReader determină dacă fișierul de intrare conține codecul de compresie (org.apache.hadoop.io.compress.GzipCodec). În cazul în care conține, fișierul este despachetat în timpul rulării și utilizat pentru procesare. În caz contrar, este considerat un fișier text. Atunci când CompressedCombineFileRecordReader este procesată, acesta creează un CompressedCombineFileWritable ca cheie pentru clasa numită Mapper. Pentru fiecare citire a șirului, CompressedCombineFileWritable conține numele fișierului și lungimea de offset a acestui șir, așa cum se arată în exemplul MapReduce.

Listă 2 arată un exemplu de CompressedCombineFileRecordReader.java.

Listing 2. CompressedCombineFileRecordReader.java

CompressedCombineFileWritable

Clasa CompressedCombineFileWritable.java implementează interfața WritableComparable și extinde org.apache.hadoop.io.Writable, java.lang.Comparable. după cum se arată în Lista 3.

Writable este un obiect serializabil care implementează un protocol de serializare simplu și eficient bazat pe DataInput și DataOutput. Orice tip de cheie sau valoare în mediul MapReduce implementează această interfață. Implementările folosesc de obicei metoda statică de citire (DataInput). care creează o nouă instanță, apelează metoda readFields (DataInput). și returnează o instanță.

Comparabil este o interfață care este un element al Java ™ Collections Framework (JCF). Aplică o comandă completă la obiectele din fiecare clasă care o implementează. O astfel de ordonare se numește ordinea naturală a unei clase. Metoda comparației de clasă Se numește metoda de comparație naturală. Listele (sau arrays) de obiecte care implementează această interfață pot fi sortate automat folosind Collections.sort (și Arrays.sort). Obiectele care implementează această interfață pot fi utilizate ca chei într-o colecție sortată Map sau ca elemente dintr-un set sortat, fără a fi nevoie să specificați un comparator.

În legătură cu astfel de proprietăți, puteți compara CompressedCombineFileWritable utilizând comparatori. Comanda hashCode () este adesea folosită în Hadoop pentru a separa cheile. Este important faptul că implementarea hashCode () returnează același rezultat pentru diferite instanțe JVM. Implementarea implicită a hashCode () în Object nu satisface această proprietate. Prin urmare, metodele hashCode (). equals () și toString () sunt înlocuite pentru a asigura coerența și eficiența.

Listă 3. CompressedCombineFileWritable.java

Exemplu MapReduce

Exemplul din această secțiune arată modul de utilizare a CompressedCombineFileInputFormat cu programul MapReduce. Programul MapReduce folosește date istorice ale NOAA privind vremea cu statisticile terminate ale indicatorilor extreme pentru posturile din SUA, sub forma unor rapoarte lunare despre temperatură, precipitații și zăpadă. Exemplul calculează temperaturile maxime din datele meteorologice care se află într-un fișier comprimat (gzip).

Exemplul discută diferite aspecte ale utilizării CompressedCombineFileInputFormat. Pentru a executa exemplul, a fost utilizat InfoSphere BigInsights Quick Start Edition.

  • Mediul Hadoop este implementat și funcționează.
  • Datele necesare pentru acest exemplu sunt descărcate de pe site-ul Centrului Național de Date Climatice (NCDC) al NOAA.
  • Datele încărcate sunt găzduite în HDFS.
  • Datele statistice primite sunt procesate pe platforma InfoSphere BigInsights Quick Start Edition (într-un mediu non-producție).

Rularea exemplului

Clasele CompressedCombineFileInputFormat sunt împachetate într-un fișier JAR (CompressedCombine-FileInput.jar), care poate fi menționat în alte proiecte. Vom folosi programe MapReduce separat, pentru a arăta diferența de performanță atunci când se utilizează formatul de intrare implicit (org.apache.hadoop.io.Text) și un format de intrare specială (com.ssom.combinefile.lib.input. CompressedCombineFileInputFormat).

Specificații Introducere

Datele CCDC sunt disponibile publicului. Tabelul 1 prezintă formatul de date NCDC în exemplele noastre.

Tabelul 1. Datele CCDC

Fișierele de date sunt organizate de date și stații meteorologice. Există un catalog pentru fiecare an din anul 1901. Fiecare catalog conține un fișier comprimat pentru fiecare stație meteorologică cu date pentru fiecare an. Figura 1 prezintă primele intrări pentru anul 1947.

Figura 1. Exemplu de listă de fișiere

Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

Am ales datele pentru 1901, 1902 și 1947, care sunt conținute în mai mult de 1000 de fișiere comprimate. Dimensiunea totală a fișierului este de aproximativ 56 MB.

Utilizând formatul de intrare implicit

Clasa Mapper este un tip generic cu patru parametri de tip formal care specifică tipurile de taste de intrare, valoarea de intrare, tasta de ieșire și valoarea de ieșire a funcției de hartă, după cum se arată mai jos.

Pentru formatul de intrare implicit:

  • Cheia de introducere este o valoare de offset lungă întreg
  • Valoarea de intrare este un șir de text
  • Cheia de ieșire este anul
  • Valoarea de ieșire este temperatura aerului (valoare întregă)

Metoda hartă () primește o cheie și o valoare, transformând valoarea textului care conține șirul de intrare în șirul Java. Apoi utilizează metoda substring () pentru a prelua valorile. În acest caz, Mapper înregistrează anul ca obiect Text (pentru că îl folosim doar ca o cheie), iar temperatura se prăbușește într-un obiect de tipul Intwritable. Înregistrarea de ieșire este fixă ​​numai dacă temperatura este prezentă, iar codul de calitate indică corectitudinea măsurătorilor de temperatură.

Clasa Reducer este, de asemenea, un tip generic. Are patru parametri de tip formal folosiți pentru a specifica tipurile de intrări și ieșiri. Tipurile de ieșiri ale funcției de reducere sunt Text și Intrusibile. an și temperatura maximă a acestuia, obținută prin iterarea temperaturilor și compararea fiecăruia cu cea a celei mai mari valori găsite. Definiția clasei Reducer este după cum urmează:

Rezultatul va arăta ca cel prezentat mai jos, dacă există intrări pentru mai mulți ani.

Figurile 2 și 3 arată rezultatele execuției programului utilizând formatul de intrare implicit (Text) din consola administrativă InfoSphere BigInsights.

Figura 2. Fila Stare aplicație - execuție implicită

Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

Figura 4 arată rezultatul executării programului MapReduce.

Figura 4. Conținutul fișierului de răspuns - execuție implicită

Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

Utilizând un format de intrare special

Acum folosim din nou clasa Mapper, care este un tip generic, cu CompressedCombineFileWritable ca tastă de intrare, după cum se arată mai jos.

Clasa Reducer rămâne aceeași, fără modificări și are următoarea definiție:

Pentru a determina formatul de intrare al jobului, folosim:

Exemple cheie și valori:

Figurile 5 și 6 arată rezultatul utilizării unui format de intrare special (CompressedCombineFileInputFormat) în consola administrativă InfoSphere BigInsights.

Figura 5. Panoul de stare a aplicației - execuție utilizând un format special

Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

Figura 7 prezintă rezultatul executării programului MapReduce.

Figura 7. Conținutul fișierului de răspuns - execuție utilizând un format special

Procesarea de fișiere mici comprimate în hadoop folosind combinefileinputputformat

concluzie

Dacă este posibil, este recomandabil să evitați să procesați o mulțime de fișiere mici, deoarece MapReduce funcționează cel mai bine dacă poate funcționa cu viteza de transfer a datelor din discuri în cluster. Procesarea multor fișiere mici mărește numărul de operații de căutare necesare pentru a finaliza lucrarea. Dacă din anumite motive, de afaceri sau strategice, aveți un număr mare de fișiere mici și HDFS disponibile, atunci puteți utiliza clasa CombineFileInputFormat. CombineFileInputFormat nu este utilă numai pentru lucrul cu fișiere mici, dar poate și să îmbunătățească performanțele atunci când procesează fișiere mari.

Obțineți produse și tehnologii







Trimiteți-le prietenilor: