In this and the following posts I would like to take the opportunity to go into detail about the
MapReduce process as provided by
Hadoop, but more importantly, how it applies to
HBase.
MapReduce
MapReduce as a process was designed to solve the problem of processing in excess of terabytes of data in a scalable way. There should be a way to design such a system so that its performance increases linearly with the number of physical machines added. That is what MapReduce strives to do. It follows a divide-and-conquer approach by splitting the data located on a distributed file system so that the available servers (or rather CPUs, or, more modern, "cores") can access these chunks and process them as fast as they can. The problem with this approach is that you have to consolidate the data at the end. Again, MapReduce has this built right into it.
The above simplified image of the MapReduce process shows you how the data is processed. The first thing that happens is the
split, which is responsible for dividing the input data into reasonably sized chunks that are then processed by one server at a time. This splitting has to be done in a somewhat smart way to make best use of the available servers and the infrastructure in general. In this example the data may be a very large log file that is divided into pieces of equal size on line boundaries. This is fine for, say, Apache log files. Input data may also be binary, though, in which case you may have to write your own
getSplits()
method - but more on that below.
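To give a feel for what that can look like in code, here is a minimal sketch against the older org.apache.hadoop.mapred API. The class name is made up; it simply declares every input file as unsplittable, which is one crude way to take control when the default line-based splitting does not fit your data. A custom getSplits() (plus a matching RecordReader) would be the fuller solution.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical input format: hand each file to exactly one map task instead of
// cutting it at arbitrary byte offsets.
public class WholeFileInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;
  }
}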
Classes
The above image also shows you the classes that are involved in the Hadoop implementation of MapReduce. Let's look at them and also at the specific implementations that HBase provides on top of those.
InputFormat
The first class to deal with is the
InputFormat
class. It is responsible for two things: first, it does the actual splitting of the input data, and second, it returns a
RecordReader
instance that defines the classes of the
key and
value objects and that provides a
next()
method that is used to iterate over each input record.
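For reference, this is roughly what that contract looks like in the older org.apache.hadoop.mapred API, abridged here to the methods discussed:

// Abridged from the older org.apache.hadoop.mapred API, just to illustrate the contract.
public interface InputFormat<K, V> {
  // 1) split the input into chunks, one per map task
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  // 2) return the reader that iterates over the records of one split
  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException;
}

// The RecordReader defines the key/value classes and feeds the map() calls,
// one pair per next() call (getPos(), getProgress() and close() omitted here).
public interface RecordReader<K, V> {
  K createKey();
  V createValue();
  boolean next(K key, V value) throws IOException;  // false once the split is exhausted
}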
As far as HBase is concerned there is a special implementation called
TableInputFormatBase
as well as its subclass
TableInputFormat
. The former implements the majority of the functionality but remains abstract. The subclass is a light-weight, concrete version of TableInputFormatBase and is used by many of the supplied sample and real MapReduce classes.
But most importantly, these classes implement the full turn-key solution to scan an HBase table. You provide the name of the table to scan and the columns you want to process during the Map phase. The input format splits the table into proper pieces for you and hands them over to the subsequent classes. There are quite a few tweaks, which we will address below and in the following installments of this series.
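Here is a quick sketch of what that turn-key setup can look like with the older org.apache.hadoop.hbase.mapred classes this post talks about; the table name, column list, and driver class are made up, and the helper names differ slightly in newer HBase releases.

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.IdentityTableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ScanTableDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(ScanTableDriver.class);
    job.setJobName("scan-mytable");
    // Table name plus a space-delimited list of "family:qualifier" columns to scan;
    // the TableInputFormat(Base) computes the splits over the table for you.
    TableMapReduceUtil.initTableMapJob(
        "mytable",                    // assumed table name
        "contents:",                  // assumed column (family) list
        IdentityTableMap.class,       // supplied pass-through mapper, see below
        ImmutableBytesWritable.class, // map output key class
        RowResult.class,              // map output value class
        job);
    job.setNumReduceTasks(0);                      // plain scan, no Reduce stage needed
    job.setOutputFormat(NullOutputFormat.class);   // discard the output in this sketch
    JobClient.runJob(job);
  }
}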
For now let's look at the other classes involved.
Mapper
The
Mapper
class(es) are responsible for the next stage of the MapReduce process, the one that gives it the first half of its name. In this step, each record read using the
RecordReader
is processed using the
map()
method. As is also somewhat visible in the first figure above, the Mapper reads a specific type of key/value pair but may emit another. This is handy for converting the raw data into something more useful for further processing.
Again, looking at HBase's extensions to this, you will find a
TableMap
class that is specific to iterating over an HBase table. One specific implementation is the
IdentityTableMap
which is also a good example of how to add your own functionality to the supplied classes. The
TableMap
class itself does not implement anything but only adds the signatures that specify what the actual key/value pair classes are. The
IdentityTableMap
simply passes the records on to the next stage of processing.
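To make this a little more tangible, here is a hypothetical TableMap that does slightly more than the identity version: it pulls a single, made-up column out of every row and emits a converted key/value pair for a subsequent count. It is written against the older org.apache.hadoop.hbase.mapred API (RowResult, Cell), so the exact types look different in newer releases.

import java.io.IOException;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical mapper: reads one (row key, RowResult) pair per table row and emits
// the value of a single column keyed by its content, e.g. to count occurrences later.
public class ColumnValueCountMap extends MapReduceBase
    implements TableMap<Text, IntWritable> {

  private static final byte[] COLUMN = Bytes.toBytes("contents:value"); // assumed column
  private static final IntWritable ONE = new IntWritable(1);

  public void map(ImmutableBytesWritable row, RowResult columns,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    Cell cell = columns.get(COLUMN);
    if (cell != null) {
      // emit a different key/value type than we read - the "conversion" step
      output.collect(new Text(cell.getValue()), ONE);
    }
  }
}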
Reducer
The
Reduce
stage and class layout are very similar to those of the Mapper explained above. This time we get the output of a Mapper class and process it after the data has been
shuffled and
sorted. In the implicit shuffle between the Mapper and Reducer stages, the intermediate data is copied from the various Map servers to the Reduce servers, and the sort combines the shuffled (copied) data so that the Reducer sees it as a nicely sorted set in which each unique key (and that is something I will get back to later) is associated with all of the values it was found with.
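A matching reducer for the mapper sketch above could look like this; again, just a hedged example against the older org.apache.hadoop.mapred API, not a prescribed implementation.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical reducer matching the mapper sketch above: by the time reduce() is
// called, the shuffle and sort have grouped every value emitted for a given key,
// so we simply aggregate them.
public class ColumnValueCountReduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();   // all values for this unique key arrive together
    }
    output.collect(key, new IntWritable(sum));
  }
}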
OutputFormat
The final stage is the OutputFormat class, and its job is to persist the data in various locations. There are specific implementations that allow output to files, or to HBase tables in the case of the
TableOutputFormat
. It uses a
RecordWriter
to write the data into the specific HBase output table.
It is important to note the cardinality as well. While there are many Mappers handing records to many Reducers, there is only one OutputFormat, which subsequently takes each output record from its Reducer. It is the final class handling the key/value pairs, writing them to their final destination, be it a file or a table.
The name of the output table is specified when the job is created. Otherwise the TableOutputFormat does not add much more complexity. One rather significant thing it does is set the table's auto flush to "false" and handle the buffer flushing implicitly. This helps a lot in speeding up the import of large data sets.
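Configuring the output side could look like the following sketch, again with the older org.apache.hadoop.hbase.mapred classes and a made-up table and driver name; the helper names differ in newer releases.

import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Sketch of the output side of a job: initTableReduceJob wires in the TableOutputFormat
// for the named table and the supplied IdentityTableReduce, which forwards
// (ImmutableBytesWritable, BatchUpdate) pairs unchanged so the output format can persist them.
public class WriteToTableDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(WriteToTableDriver.class);
    job.setJobName("write-to-table");
    // ... input format and mapper configuration omitted (see the earlier sketches) ...
    TableMapReduceUtil.initTableReduceJob(
        "target_table",              // assumed output table name
        IdentityTableReduce.class,   // supplied pass-through reducer
        job);
    // The TableOutputFormat's RecordWriter opens an HTable with auto flush disabled,
    // so writes are buffered and flushed in batches.
    JobClient.runJob(job);
  }
}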
To Map or Reduce or Not Map or Reduce
We have now reached a crucial point: deciding how to process tables stored in HBase. From the above it seems that we simply use a TableInputFormat to feed data through a TableMap and a TableReduce and eventually persist it with a TableOutputFormat. But this may not be what you want when you deal with HBase. The question is whether there is a better way to handle the process, given certain architectural features HBase provides. Depending on your data source and target, there are a few different scenarios we could think of.
If you want to import a large set of data into an HBase table, you can read the data using a Mapper, aggregate it on a per-key basis using a Reducer, and finally write it into the HBase table. This involves the whole MapReduce stack, including the shuffle and sort using intermediate files. But what if you know that the data, for example, has a unique key? Why go through the extra step of copying and sorting when there is always exactly one key/value pair per key? At this point you may ask yourself, wouldn't it be better if I could skip that whole Reduce stage? The answer is
yes, you can! And you should, as you can harvest the pure computational power of all CPUs to crunch the data and write it at top I/O speed to its final target.
As you can see from the matrix, there are quite a few scenarios where you can decide whether you want Map only or both Map and Reduce. But when it comes to handling HBase tables as sources and targets, there are a few exceptions to this rule, and I have highlighted them accordingly.
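Skipping the Reduce stage is a single setting on the job configuration; this reuses the JobConf variable from the earlier sketches.

// Map-only job: no shuffle, no sort, no Reducer; the map output goes
// straight to the configured OutputFormat.
job.setNumReduceTasks(0);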
Same or different Tables
This is an important distinction. The bottom line is, when you read a table in the Map stage, you should consider not writing back to that very same table in the same process. On the one hand, it could hinder the proper distribution of regions across the servers (open scanners block region splits), and on the other hand, you may or may not see the new data as you scan. But when you read from one table and write to another, you can do that in a single stage. So for two different tables you can write your table updates directly in the
TableMap.map()
while with the same table you must write the same code in the
TableReduce.reduce()
- or in its
TableOutputFormat
(or, even better, simply use that class as-is and you are done). The reason is that the Map stage completely reads the table and then passes the data on in intermediate files to the Reduce stage. In turn this means that the Reducer reads from the distributed file system (DFS) and writes into the now-idle HBase table. And all is well.
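As an illustration of the two-table case, here is a hypothetical map-only mapper that scans one table and writes every row it sees straight into a second table from within map(). All class, table, and column names are made up, and the client calls shown (HBaseConfiguration, BatchUpdate, HTable.commit()) are from the older API this post is based on, so newer releases spell them differently.

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical cross-table copy: read from the scanned source table, write into
// "other_table" directly; nothing is emitted to the MapReduce framework itself.
public class CrossTableCopyMap extends MapReduceBase
    implements TableMap<ImmutableBytesWritable, NullWritable> {

  private HTable target;

  @Override
  public void configure(JobConf job) {
    try {
      target = new HTable(new HBaseConfiguration(job), "other_table");
    } catch (IOException e) {
      throw new RuntimeException("cannot open target table", e);
    }
  }

  public void map(ImmutableBytesWritable row, RowResult columns,
      OutputCollector<ImmutableBytesWritable, NullWritable> output, Reporter reporter)
      throws IOException {
    BatchUpdate update = new BatchUpdate(row.get());
    for (Map.Entry<byte[], Cell> column : columns.entrySet()) {
      update.put(column.getKey(), column.getValue().getValue());
    }
    target.commit(update);   // write to the *other* table, not the one being scanned
  }

  @Override
  public void close() throws IOException {
    target.flushCommits();   // make sure any buffered writes reach the table
  }
}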
All of the above are simply recommendations based on what is currently available with HBase. There is of course no reason not to scan and modify a table in the same process. But to avoid certain non-deterministic issues, I personally would not recommend this, especially if you are new to HBase. Maybe this could also be taken into consideration when designing your HBase tables. Maybe you separate distinct data into two tables or separate column families so you can scan one table while changing the other.
Key Distribution
Another specific requirement for an effective import is to have random keys as they are read. While this will be difficult if you scan a table in the Map phase, as the keys are sorted, you may be able to make use of it when reading from a raw data file. Instead of leaving the key as the offset into the file, as created by the
TextInputFormat
for example, you could simply replace the rather useless offset with a random key. This will help ensure that the data is spread more evenly across all servers. Especially the
HRegionServers
will be very thankful, as they each host a set of regions, and random keys spread the load evenly across those regions.
Of course, this depends on how the data is written to the raw files and how the real row keys are computed, but it is still a very valuable thing to keep in mind.
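As a small, hypothetical example of what such a mapper could look like when reading a raw text file (the class name and key scheme are made up; a UUID is just one of many ways to randomize):

import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical import mapper: the TextInputFormat hands us (file offset, line) pairs;
// instead of keeping the fairly useless offset we emit a random key so the resulting
// rows spread evenly across all regions (and thus all HRegionServers).
public class RandomKeyMap extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  public void map(LongWritable offset, Text line,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    String randomKey = UUID.randomUUID().toString();  // one of many ways to randomize
    output.collect(new Text(randomKey), line);
  }
}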
In the next post I will show you how to import data from a raw data file into an HBase table and how to eventually process the data in that table. We will address questions like how many mappers and/or reducers are needed and how import and processing performance can be improved.
Until then, have a good day!