10/31/16

Hadoop Notes

Technologies in Hadoop Ecosystem

  • Hadoop => framework for distributed processing of large datasets across clusters of computers; it can scale up to thousands of machines, each offering local computation and storage.
  • HBase => a scalable, distributed database that supports structured data storage for large tables.
  • Hive => a data warehouse infrastructure that provides data summarization and ad hoc querying.
  • Mahout => a scalable machine learning and data mining library.
  • Pig => a data flow language and execution framework for parallel computation.
  • Spark => a fast, general-purpose engine for processing Hadoop data.
  • Storm => a scalable and reliable engine for processing data as it flows into the system.
  • Zookeeper => high performance coordination service for distributed applications.

Processes in Hadoop

The running Hadoop daemons can be listed with the command "jps":
  • Namenode
  • Secondary Namenode
  • Datanode
  • Job Tracker
  • Task Tracker
Note that in Hadoop 2.0+, the JobTracker and TaskTracker have been replaced by YARN's ResourceManager and NodeManager.

Hadoop Setup

  • One machine in the cluster is designated as the NameNode (which holds the file system metadata).
  • Another dedicated machine is designated as the ResourceManager.
  • The other machines act as DataNodes (where data is stored and processed).

Namenode

  • holds the file system metadata: the directory tree, file names, permissions, and which blocks make up each file.
  • keeps track of which DataNodes hold each block, so clients can find where a file's data lives in HDFS.

Datanode

  • stores the actual HDFS data blocks that need to be processed.
  • mapper and reducer tasks run on the DataNodes, close to the data.

Job Tracker

  • Normally, the JobTracker runs on the same machine as the NameNode.
  • takes the jobs from the client.
  • finds out the location of the data from the NameNode.
  • assigns tasks to TaskTracker nodes, preferably those that hold (or are close to) the data.

Task Tracker

  • Normally, the TaskTracker runs on a DataNode, where the processing takes place.
  • periodically sends heartbeat messages to the JobTracker (every few seconds on a typical cluster) to show that it is alive.
  • heartbeat messages also report the number of free task slots on that TaskTracker.

Secondary Namenode

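  • despite its name, it is not a hot standby: it periodically merges the NameNode's EditLog into the FSImage (checkpointing) so the edit log does not grow without bound. If the NameNode fails, the latest checkpoint can help recover the metadata, but the Secondary NameNode cannot take over automatically.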

InputSplit

  • chunk of data which is processed by a single mapper.
  • generally equal to the HDFS block size (128 MB by default in Hadoop 2.x, 64 MB in Hadoop 1.x).
  • is the logical division of the data, while an HDFS block is the physical division.
  • a Hadoop job splits the input files into multiple such chunks, each of which is processed by one mapper.

Types of Inputformat

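  • TextInputFormat (default) => the byte offset of the line as key, the line itself as value.
  • KeyValueTextInputFormat => everything up to the first tab is the key, the rest of the line is the value.
  • SequenceFileInputFormat => reads Hadoop sequence files (binary key/value pairs).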
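A plain-Java sketch of what the first two formats hand to the mapper, assuming '\n' line endings and ignoring split boundaries. It has no Hadoop dependency; the class InputFormatDemo and its methods are hypothetical names made up for illustration. TextInputFormat's real keys are byte offsets into the file, which is why the sketch measures each line in UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java helper (not Hadoop code) mimicking the records that
// TextInputFormat and KeyValueTextInputFormat produce for '\n'-terminated text.
class InputFormatDemo {

    // TextInputFormat-style: key = byte offset of the start of the line, value = the line.
    static List<Map.Entry<Long, String>> textRecords(String text) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;
        int start = 0;
        while (start < text.length()) {
            int end = text.indexOf('\n', start);
            if (end < 0) {
                end = text.length(); // last line has no trailing newline
            }
            String line = text.substring(start, end);
            records.add(new SimpleEntry<>(offset, line));
            // advance by the line's UTF-8 length plus one byte for the '\n'
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
            start = end + 1;
        }
        return records;
    }

    // KeyValueTextInputFormat-style: text before the first tab is the key, the rest
    // of the line is the value; a line without a tab becomes the key with an empty value.
    static Map.Entry<String, String> keyValueRecord(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return new SimpleEntry<>(line, "");
        }
        return new SimpleEntry<>(line.substring(0, tab), line.substring(tab + 1));
    }

    public static void main(String[] args) {
        String text = "hello world\nfoo\tbar\n";
        for (Map.Entry<Long, String> record : textRecords(text)) {
            System.out.println(record.getKey() + " -> " + record.getValue());
        }
        Map.Entry<String, String> kv = keyValueRecord("foo\tbar\tbaz");
        System.out.println(kv.getKey() + " | " + kv.getValue());
    }
}
```

For the sample text, the keys are 0 and 12: the second line starts after the 11 bytes of "hello world" plus its newline.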

HDFS

  • Hadoop Distributed File System
  • designed to store and process very large amounts of data, ranging up to terabytes or petabytes.
  • data is stored redundantly: each block is replicated on multiple machines.

Hadoop Overall flow

  • Clients submit jobs to the Hadoop Job tracker.
  • The JobTracker finds out the location of the data from the Namenode.
  • The JobTracker locates TaskTracker nodes which have available slots for processing or are close to the data that needs processing.
  • The JobTracker submits the task to the chosen TaskTracker nodes.
  • If a TaskTracker stops sending heartbeat signals to the JobTracker, its work is rescheduled on a different TaskTracker.
  • If a task fails, the JobTracker may resubmit the task elsewhere; if a TaskTracker/node fails too often, it may be blacklisted so that no future tasks are assigned to it.
  • When the task is completed by a tasktracker, the JobTracker updates the status of the job.

Hadoop Task flow

Mapper => Combiner (optional) => Partitioner (custom one optional) => Reducer (optional)

Mapper

  • Mapper tasks operate on data read from HDFS.
  • The total number of mappers is determined by the number of input splits, i.e. by the input file size and the block size (128 MB by default in Hadoop 2.x).
  • Roughly, the number of mappers is the file size divided by the block size, rounded up (e.g. a 1 GB file with 128 MB blocks gives 8 mappers).
  • The Hadoop documentation suggests about 10 to 100 map tasks per node as the right level of parallelism.
  • Mappers write their intermediate output to local disk (not HDFS), from where it is fetched by the reducers.
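A rough sketch of the mapper count for one non-empty, splittable file. The helper SplitCounter is hypothetical; the split-size rule max(minSize, min(maxSize, blockSize)) and the 10% "slop" that lets the last split be slightly larger follow my understanding of FileInputFormat, so treat the exact numbers as an approximation.

```java
// Hypothetical helper estimating how many input splits (and therefore map
// tasks) a single non-empty, splittable file produces.
class SplitCounter {

    // The last split may be up to 10% larger than the split size.
    private static final double SPLIT_SLOP = 1.1;

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long bytesRemaining = fileLength;
        // carve off full-size splits while more than 1.1 splits' worth remains
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        // whatever is left (at most 1.1 * splitSize) becomes the final split
        if (bytesRemaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long size = splitSize(128 * mb, 1, Long.MAX_VALUE); // 128 MB blocks
        System.out.println(countSplits(1024 * mb, size)); // 8
        System.out.println(countSplits(130 * mb, size));  // 1
    }
}
```

With 128 MB splits, a 1 GB file gives 8 mappers, but a 130 MB file gives 1 mapper rather than 2, because the 2 MB remainder falls within the slop.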

Combiner

  • combiners are used to reduce the amount of data sent over the network between the mappers and reducers, which also reduces the load on the reducers.
  • acts as a local "mini reducer" that runs where the mapper runs, on the output of a single map task.
  • its output is passed on to the reducer instead of the raw mapper output.
  • It is not guaranteed that a combiner will be executed (Hadoop may run it zero, one, or several times), so the job must produce the same result without it.
  • the combiner's input and output key/value types must both match the mapper's output types.
  • example: in WordCount, the reducer (which sums counts) can also be used as the combiner, since addition is associative and commutative.
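A plain-Java simulation of the WordCount example with a combiner (no Hadoop dependency; CombinerDemo is a hypothetical name). Each string stands for one map task's input split; the combiner sums counts on that task's output only, and the reducer then sums the partial counts.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (not Hadoop code) of WordCount with a combiner.
class CombinerDemo {

    // Map phase for one input split: emit (word, 1) for every word.
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : split.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Sum values by key; the same logic serves as combiner and reducer
    // because addition is associative and commutative.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] splits = {"the cat the hat", "the dog"}; // one split per map task
        List<Map.Entry<String, Integer>> sentToReducer = new ArrayList<>();
        int recordsWithoutCombiner = 0;
        for (String split : splits) {
            List<Map.Entry<String, Integer>> mapOutput = map(split);
            recordsWithoutCombiner += mapOutput.size();
            // the combiner only sees this one map task's output
            sentToReducer.addAll(sumByKey(mapOutput).entrySet());
        }
        System.out.println(recordsWithoutCombiner);  // 6
        System.out.println(sentToReducer.size());    // 5
        System.out.println(sumByKey(sentToReducer)); // {cat=1, dog=1, hat=1, the=3}
    }
}
```

The combiner cuts the records sent to the reducer from 6 to 5 here, and the final counts are the same with or without it, which is exactly the property a combiner needs since Hadoop may not run it.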

Partitioner

  • When data is sent from the mappers to the reducers, each mapper needs to determine which reducer a particular key goes to.
  • the partitioner makes sure that all records with the same key go to the same reducer (by default, by hashing the key).
  • We could write a custom Partitioner if we wanted to send particular keys to specific reducers only.
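A sketch of the default partitioning rule: the formula below is the one Hadoop's HashPartitioner uses, reproduced in plain Java without the Hadoop dependency. customPartition is a hypothetical custom rule that pins keys with a made-up "vip:" prefix to reducer 0. A real job hashes the Writable key (e.g. Text), whose hashCode differs from String's, so the concrete partition numbers here will not match a real job.

```java
// Plain-Java sketch of hash partitioning (no Hadoop dependency).
class PartitionDemo {

    // Default rule, as in Hadoop's HashPartitioner: clear the sign bit so the
    // hash is non-negative, then take it modulo the number of reducers.
    static int hashPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Hypothetical custom rule: keys with the made-up prefix "vip:" always go to
    // reducer 0; all other keys are hash-partitioned over reducers 1..n-1.
    static int customPartition(String key, int numReduceTasks) {
        if (numReduceTasks == 1 || key.startsWith("vip:")) {
            return 0;
        }
        return 1 + hashPartition(key, numReduceTasks - 1);
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String key : new String[] {"apple", "banana", "apple", "vip:alice"}) {
            System.out.println(key + " -> default " + hashPartition(key, reducers)
                    + ", custom " + customPartition(key, reducers));
        }
    }
}
```

Masking with Integer.MAX_VALUE matters because hashCode() can be negative, and a negative remainder would not be a valid reducer index.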

Reducer

A reducer has three phases.
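  • Shuffle: the relevant partition of each mapper's output is fetched over the network.
  • Sort: the fetched outputs are merged and grouped by key; shuffle and sort happen simultaneously, as outputs are fetched.
  • Reduce: the reduce function is called once per key, with all the values for that key.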
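A plain-Java simulation of the three phases on two map tasks' outputs. ReduceSideDemo is a hypothetical name; Hadoop performs the sort as an on-disk merge of the fetched map outputs, and a TreeMap simply stands in for that here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java simulation (not Hadoop code) of the reduce-side phases.
class ReduceSideDemo {

    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // Shuffle + sort: gather the (key, value) pairs from every map task and
    // group them by key, in sorted key order.
    static SortedMap<String, List<Integer>> shuffleAndSort(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        return grouped;
    }

    // Reduce: one call per key, in key order, with all of that key's values (here: a sum).
    static Map<String, Integer> reduce(SortedMap<String, List<Integer>> grouped) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            result.put(entry.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> mapOutputs = Arrays.asList(
                Arrays.asList(kv("b", 1), kv("a", 2)), // output of map task 1
                Arrays.asList(kv("a", 3)));            // output of map task 2
        SortedMap<String, List<Integer>> grouped = shuffleAndSort(mapOutputs);
        System.out.println(grouped);         // {a=[2, 3], b=[1]}
        System.out.println(reduce(grouped)); // {a=5, b=1}
    }
}
```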

No of Reducers

  • number of reducers = (0.95 or 1.75) * N * (mapreduce.tasktracker.reduce.tasks.maximum), where N is the number of nodes.
  • With 0.95, all the reducers can launch immediately and start fetching map outputs as the maps finish.
  • With 1.75, the number of reducers increases: the faster nodes finish their first round of reduces and launch a second wave, which gives better load balancing (at the cost of more framework overhead).
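The rule of thumb above as a small helper, for a hypothetical cluster of 10 nodes with 2 reduce slots each. ReducerCount is a made-up name, the factors are passed in hundredths to keep the arithmetic exact, and rounding down is my own choice (the rule itself does not say how to round).

```java
// Hypothetical helper for the rule of thumb:
// reducers = factor * nodes * maximum reduce slots per node, factor 0.95 or 1.75.
class ReducerCount {

    // factorHundredths is the factor times 100 (95 or 175), so this is exact
    // integer math; integer division rounds the result down.
    static long reducers(int factorHundredths, int nodes, int reduceSlotsPerNode) {
        return (long) factorHundredths * nodes * reduceSlotsPerNode / 100;
    }

    public static void main(String[] args) {
        int nodes = 10;             // assumed cluster size
        int reduceSlotsPerNode = 2; // assumed value of the max reduce tasks setting
        System.out.println(reducers(95, nodes, reduceSlotsPerNode));  // 19
        System.out.println(reducers(175, nodes, reduceSlotsPerNode)); // 35
    }
}
```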

Hadoop Configuration Params

  • input files location in HDFS
  • output directory location in HDFS (it must not already exist, otherwise an error is thrown)
  • Input format
  • Output format
  • Mapper class, containing the map function.
  • Reducer class, containing the reduce function.
  • JAR file, containing the Mapper, Reducer, and any other classes that may be needed.

Hadoop Configuration files

  • core-site.xml => common properties for the whole installation (e.g. the default file system URI).
  • hdfs-site.xml => HDFS properties, such as the replication factor.
  • mapred-site.xml => MapReduce properties.
  • yarn-site.xml => YARN-specific properties.
  • hadoop-env.sh => environment variables for the Hadoop scripts (e.g. JAVA_HOME).
  • In Hadoop 2.x these files live in etc/hadoop/ (conf/ in Hadoop 1.x).

Speculative Execution

  • If a task is running slower than expected on a node, Hadoop may launch a duplicate attempt of the same task on another node.
  • whichever attempt finishes first is used, and the other attempts are killed.
  • This is speculative execution.
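An analogy in plain Java, not Hadoop's scheduler: two attempts of the same task run on separate threads standing in for nodes, and ExecutorService.invokeAny returns the first successful result and cancels the rest, much like the slower speculative attempt being killed. SpeculativeDemo and the delays are made up. In Hadoop 2, speculative execution is switched on or off per phase with mapreduce.map.speculative and mapreduce.reduce.speculative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only (not Hadoop code): run several attempts of the same task and
// keep whichever finishes first; the remaining attempts are cancelled.
class SpeculativeDemo {

    // One attempt of a task; delayMillis simulates how slow the node is.
    static Callable<String> attempt(String node, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return "result from " + node;
        };
    }

    // invokeAny returns the result of the first attempt that completes
    // successfully and cancels the attempts that are still running.
    static String runSpeculatively(List<Callable<String>> attempts) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(attempts.size());
        try {
            return pool.invokeAny(attempts);
        } finally {
            pool.shutdownNow(); // interrupt anything still running
        }
    }

    public static void main(String[] args) throws Exception {
        String winner = runSpeculatively(Arrays.asList(
                attempt("slow-node", 2000),
                attempt("fast-node", 100)));
        System.out.println(winner);
    }
}
```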

YARN

  • YARN was introduced in hadoop version 2.0
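  • YARN's ResourceManager, NodeManager, and per-application ApplicationMaster replace the JobTracker and TaskTracker: the ResourceManager manages cluster resources, each ApplicationMaster schedules and monitors its own job's tasks, and a NodeManager on each node runs the tasks in containers.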

Distributed Cache

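  • distributes read-only files (normally a properties or other lookup file) to all nodes, so every task can read the same common data locally.
  • the old DistributedCache class is deprecated in Hadoop 2.x; the same functionality is available through Job.addCacheFile() in the driver and context.getCacheFiles() in the tasks: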
    // In the driver (assumes the driver extends Configured and implements Tool,
    // which provides getConf(); needs java.net.URI,
    // org.apache.hadoop.conf.Configuration and org.apache.hadoop.mapreduce.Job)
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "CountJob");
    job.setMapperClass(CountMapper.class);
    // ...
    // Note the # sign after each file location: the name after the # becomes the
    // name of a symlink in the task's working directory, which the Mapper/Reducer
    // uses to open the file.
    job.addCacheFile(new URI("/user/name/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/name/cache/other_file.json#other"));

    // In the Mapper, override setup() (needs java.io.File and java.io.IOException)
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
            File someFile = new File("./some");
            File otherFile = new File("./other");
            // do something with them, e.g. read them or parse them as JSON
        }
    }

Counters

  • counters in Hadoop can be used for debugging Hadoop code.
  • Each mapper/reducer runs with its own copy of the configuration, so tasks cannot share global variables or counters of their own.
  • counters are the feature provided by the Hadoop framework to fill this gap: each task updates its counters during job execution, and the framework aggregates them into job-wide totals.
  • can be used to count how many records were processed successfully/unsuccessfully, etc.
    // Create an enum to be used as the counter class.
    public static enum RECORD_COUNTER {
        SESSION_ID_COUNTER,
        USER_ID_COUNTER
    }

    // In the mapper or reducer code:
    context.getCounter(RECORD_COUNTER.USER_ID_COUNTER).increment(1);

    // In the driver: print the value once the job has finished (needs
    // org.apache.hadoop.conf.Configuration and org.apache.hadoop.mapreduce.Job/Counters/Counter).
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "CountJob");
    // ... set the mapper, reducer, input and output paths ...
    boolean result = job.waitForCompletion(true);

    // Job finished: get the counters aggregated across all tasks.
    Counters counters = job.getCounters();
    // Look up a counter by its enum constant and print it.
    Counter userIdCounter = counters.findCounter(RECORD_COUNTER.USER_ID_COUNTER);
    System.out.println(userIdCounter.getDisplayName() + ":" + userIdCounter.getValue());
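Counters only work inside the framework, but the aggregation idea behind them can be modelled in plain Java: each task keeps its own local counts, and the job totals are the per-task values summed. CounterAggregationDemo, RecordCounter, and task() are hypothetical names for this sketch, which is not the Hadoop Counters API.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Hadoop Counters API): every task keeps local
// counts, and the job-wide totals are the per-task values summed up.
class CounterAggregationDemo {

    enum RecordCounter { SESSION_ID_COUNTER, USER_ID_COUNTER }

    // Counter values reported by one task.
    static Map<RecordCounter, Long> task(long sessions, long users) {
        Map<RecordCounter, Long> counts = new EnumMap<>(RecordCounter.class);
        counts.put(RecordCounter.SESSION_ID_COUNTER, sessions);
        counts.put(RecordCounter.USER_ID_COUNTER, users);
        return counts;
    }

    // Sum the per-task values into job-wide totals.
    static Map<RecordCounter, Long> aggregate(List<Map<RecordCounter, Long>> perTask) {
        Map<RecordCounter, Long> totals = new EnumMap<>(RecordCounter.class);
        for (Map<RecordCounter, Long> taskCounts : perTask) {
            taskCounts.forEach((counter, value) -> totals.merge(counter, value, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<RecordCounter, Long> totals = aggregate(Arrays.asList(task(3, 5), task(1, 2)));
        System.out.println(totals); // {SESSION_ID_COUNTER=4, USER_ID_COUNTER=7}
    }
}
```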

Sqoop

  • used to transfer data from RDBMS to HDFS and vice versa.

Misc

  • Hadoop retries a failed task a few times (by default, up to 4 attempts) before failing the job and reporting the error.
  • to add a node to a Hadoop cluster, add the host's entry to the hosts include file referenced in the configuration and run "hadoop dfsadmin -refreshNodes".
  • WebDAV (an HTTP extension) can be used to expose HDFS so that its files can be browsed like a local file system.
  • By default, each HDFS block is replicated 3 times with a rack-aware policy: one replica is on the writer's rack, and the other two are on two different nodes of a single remote rack.
  • MapReduce is a programming model normally used for distributed processing on a cluster of computers.
  • Map step: the master node divides the problem into smaller sub-problems and distributes them to worker nodes (a worker may divide its share again, giving a tree structure).
  • The workers solve their sub-problems and pass the answers back up to the upper levels.
  • Reduce step: the master node collects the answers to the sub-problems and combines them to produce the answer to the original problem.
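  • FSImage: a snapshot of the file system metadata, containing file names, permissions, and the blocks that make up each file (block-to-DataNode locations are not stored; the NameNode rebuilds them from DataNode block reports).
  • EditLog: logs every change made to the file system metadata since the last FSImage checkpoint.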
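  • difference between Hive and HBase: Hive is a data warehouse layer that runs SQL-like (HiveQL) batch queries over data in HDFS, while HBase is a distributed database for low-latency random reads and writes on large tables.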
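A simplified sketch of the default 3-replica placement from the replication note above: first replica on the writer's node, second on a node in a different rack, third on another node in that same remote rack. ReplicaPlacementDemo and Node are hypothetical; real HDFS picks nodes randomly, also weighs load and free space, and falls back when racks are missing, whereas this sketch deterministically takes the first eligible node and throws if none exists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, deterministic sketch of HDFS's default 3-replica placement.
class ReplicaPlacementDemo {

    static final class Node {
        final String name;
        final String rack;

        Node(String name, String rack) {
            this.name = name;
            this.rack = rack;
        }

        @Override
        public String toString() {
            return name + "@" + rack;
        }
    }

    static List<Node> placeReplicas(Node writer, List<Node> cluster) {
        List<Node> replicas = new ArrayList<>();
        replicas.add(writer); // 1st replica: the writer's own node

        Node second = null;
        for (Node n : cluster) {
            if (!n.rack.equals(writer.rack)) { // 2nd replica: first node on another rack
                second = n;
                break;
            }
        }
        if (second == null) {
            throw new IllegalStateException("need at least two racks");
        }
        replicas.add(second);

        for (Node n : cluster) {
            if (n != second && n.rack.equals(second.rack)) { // 3rd: another node on the 2nd's rack
                replicas.add(n);
                return replicas;
            }
        }
        throw new IllegalStateException("the remote rack needs at least two nodes");
    }

    public static void main(String[] args) {
        Node a1 = new Node("a1", "rackA");
        Node a2 = new Node("a2", "rackA");
        Node b1 = new Node("b1", "rackB");
        Node b2 = new Node("b2", "rackB");
        System.out.println(placeReplicas(a1, Arrays.asList(a1, a2, b1, b2))); // [a1@rackA, b1@rackB, b2@rackB]
    }
}
```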

Commands

  • "hadoop job -list" => list the jobs that have not yet completed ("hadoop job -list all" lists all jobs)
  • "hadoop job -kill <job-id>" => kill a job
  • "hadoop balancer" => balances the data in the data nodes.
