Technologies in Hadoop Ecosystem
- Hadoop => a framework for distributed processing of large datasets across clusters of computers. It can scale up to thousands of machines, each offering local computation and data storage.
- HBase => a scalable, distributed database that supports structured data storage for large tables.
- Hive => a data warehouse infrastructure that provides data summarization and ad hoc querying.
- Mahout => A scalable machine learning and data mining library.
- Pig => data flow language and execution framework for parallel computation.
- Spark => a fast, general-purpose engine for processing data stored in Hadoop.
- Storm => a scalable and reliable engine for processing streaming data as it flows into the system.
- Zookeeper => high performance coordination service for distributed applications.
Processes in Hadoop
can be listed by running the command "jps"
- Namenode
- Secondary Namenode
- Datanode
- Job Tracker
- Task Tracker
Note that in Hadoop 2.0+, the JobTracker and TaskTracker have been replaced by YARN's ResourceManager and NodeManager.
Hadoop Setup
- One machine in the cluster is designated as the namenode (which holds the metadata of the filesystem).
- Another dedicated machine is designated as the Resource Manager.
- The other machines act as datanodes (where data is stored and processed).
Namenode
- holds the metadata of HDFS: the files, their directory structure, permissions, and the data blocks that make up each file.
- stores the file-to-block mapping, i.e. which datanodes hold the blocks of each file.
Datanode
- data which needs to be processed is stored on the datanode.
- mapper and reducer tasks run on the datanode.
Job Tracker
- Normally, jobtracker runs on the same machine as the namenode.
- takes the jobs from the client.
- finds out the location of the data from the namenode.
- submits the tasks to the appropriate nodes (ideally those already holding the data).
Task Tracker
- Normally tasktracker runs on the data node, where processing takes place.
- reports its status to the job tracker by sending periodic heartbeat messages (every few seconds by default).
- heartbeat messages also contain the number of free task slots available on that tasktracker.
Secondary Namenode
- performs periodic checkpoints by merging the namenode's FSImage with the EditLog; despite the name it is not a hot standby, but its checkpoint can be used to help recover the metadata if the namenode goes down.
InputSplit
- chunk of data which is processed by a single mapper.
- generally equal to the HDFS block size: 128 MB by default in Hadoop 2.x (many clusters configure larger blocks, e.g. 256 MB).
- is the logical division of the input data, while the HDFS block is the physical division of the data.
- a hadoop job splits the input file into multiple chunks, normally of block size, each of which is processed by a single mapper. The split size can be tuned, as sketched below.
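A minimal sketch of tuning the split size in a driver, assuming the new mapreduce API and a `job` object that has already been created (the 64/128 MB values here are arbitrary examples):

```java
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// In the driver: by default each mapper processes roughly one HDFS block.
// Raising the minimum or lowering the maximum split size changes how many
// mappers the job gets.
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // 128 MB
```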
Types of Inputformat
- TextInputFormat (default) => byte offset of the line as key, the line contents as value.
- KeyValueTextInputFormat => text up to the first tab is the key, the rest of the line is the value.
- SequenceFileInputFormat => reads Hadoop's binary sequence files of key/value pairs.
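A small sketch of selecting the input format in the driver (assuming a `job` object already exists); if nothing is set, TextInputFormat is used:

```java
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Default behaviour: (line offset, line contents) pairs.
job.setInputFormatClass(TextInputFormat.class);

// Alternative: split each line at the first tab into (key, value).
job.setInputFormatClass(KeyValueTextInputFormat.class);
```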
HDFS
- Hadoop Distributed File System
- designed to hold and process large amounts of data, ranging up to terabytes or petabytes.
- data is stored redundantly: each block is replicated on multiple machines.
Hadoop Overall flow
- Clients submit jobs to the Hadoop Job tracker.
- The JobTracker finds out the location of the data from the Namenode.
- The JobTracker locates TaskTracker nodes which have available slots for processing or are close to the data that needs processing.
- The JobTracker submits the task to the chosen TaskTracker nodes.
- If a TaskTracker does not send heartbeat signals to the JobTracker often enough, it is presumed dead and its work is scheduled on a different TaskTracker.
- If a task fails, the JobTracker may resubmit it elsewhere; if a TaskTracker/node fails often, it may be blacklisted, and no future tasks will be assigned to it.
- When the task is completed by a tasktracker, the JobTracker updates the status of the job.
Hadoop Task flow
Mapper => Combiner (optional) => Partitioner (optional) => Reducer (optional)
Mapper
- Mapper tasks operate on data read from HDFS.
- The total number of mappers is determined by the input file size and the block size (128 MB by default in Hadoop 2.x).
- The number of mappers is roughly the input file size divided by the block size.
- Roughly, each machine could execute between 10 and 100 mapper tasks.
- Mappers write their intermediate output to local disk; this output is eventually shuffled to the reducers. A minimal mapper is sketched below.
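A minimal word-count mapper as a sketch of the Mapper API (the WordCountMapper name is just an example; the Hadoop classes and packages are standard 2.x):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With the default TextInputFormat, the key is the byte offset of the line
// and the value is the line itself; we emit (word, 1) for every token.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}
```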
Combiner
- combiners are used to reduce the amount of data flowing over the network between the mappers and the reducers.
- a combiner operates on the output of a mapper, on the same node where the mapper ran.
- it acts as a local, mini reducer: it receives the data the mapper emits and passes its own output on to the reducer.
- it is written to reduce the load on the reducer and to cut network traffic.
- it is not guaranteed that a combiner will be executed (the framework may run it zero, one, or more times).
- the combiner's output key/value types must match the mapper's output types.
- example: in a wordcount job the combiner can pre-sum the counts emitted by each mapper, as shown below.
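A sketch of wiring in the combiner for word count, assuming a WordCountReducer like the one sketched in the Reducer section below; the reducer can double as the combiner because summing partial counts is associative and commutative:

```java
// In the driver: the word-count reducer also works as a combiner, since its
// input and output types (Text, IntWritable) match the mapper's output types.
job.setCombinerClass(WordCountReducer.class);
```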
Partitioner
- When data is sent from the mappers to the reducers, the framework needs to determine which reducer a particular key will go to.
- the partitioner makes sure that all values for a given key go to the same reducer (the default HashPartitioner partitions by the key's hash code).
- We can write a custom Partitioner if we want to route particular keys to specific reducers, as in the sketch below.
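A hypothetical custom partitioner as a sketch (the AlphabetPartitioner name and its routing rule are made up for illustration): words starting with a-m go to reducer 0, everything else is hashed across the remaining reducers.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes keys starting with 'a'..'m' to partition 0; all other keys are
// spread over the remaining partitions by hash code.
public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        String word = key.toString();
        if (numPartitions == 1 || word.isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(word.charAt(0));
        if (first >= 'a' && first <= 'm') {
            return 0;
        }
        return 1 + (word.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}
```

It would be registered in the driver with `job.setPartitionerClass(AlphabetPartitioner.class);`.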
Reducer
A reducer has three phases.
- Shuffle: the map output destined for this reducer is fetched from the mappers.
- Sort: the fetched data is merge-sorted by key, so that all values for a key are grouped together.
- Reduce: the reduce function is applied to each key and its list of values.
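A minimal word-count reducer as a sketch of the Reducer API (the WordCountReducer name is just an example; it pairs with the mapper sketched earlier):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// reduce() is called once per key after the shuffle and sort phases,
// with an iterable over all values emitted for that key; here we sum them.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```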
No of Reducers
- rule of thumb: (0.95 to 1.75) * (number of nodes) * (mapreduce.tasktracker.reduce.tasks.maximum)
- With 0.95, all the reducers can launch immediately and start transferring map outputs as the maps finish.
- With 1.75, the faster nodes finish their first round of reduces quickly and pick up a second wave of reduce tasks, which gives better load balancing.
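A small sketch applying this rule of thumb in the driver, for a hypothetical cluster of 10 nodes with 2 reduce slots each:

```java
// Hypothetical cluster: 10 nodes, 2 reduce slots per node
// (mapreduce.tasktracker.reduce.tasks.maximum = 2).
int nodes = 10;
int reduceSlotsPerNode = 2;
int reducers = (int) (0.95 * nodes * reduceSlotsPerNode);  // 19 reducers
job.setNumReduceTasks(reducers);
```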
Hadoop Configuration Params
- input files location in HDFS
- output directory location in HDFS (must not already exist, otherwise an error is thrown)
- Input format
- Output format
- Mapper class, containing the map function.
- Reducer class, containing the reduce function.
- the job JAR, containing the Mapper, Reducer, and any other classes that may be needed.
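A minimal word-count driver as a sketch of how these parameters are wired together (class names follow the earlier sketches; the input and output paths are taken from the command line):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Minimal driver wiring together the configuration parameters listed above.
// args[0] = input path in HDFS, args[1] = output path (must not exist yet).
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCountDriver.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```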
Hadoop Configuration files
- /src/core/core-site.xml => common properties for site.
- /src/hdfs/hdfs-site.xml => HDFS properties, like replication
- /src/mapred/mapred-site.xml => properties related to mapreduce
- /src/yarn/yarn-site.xml => yarn specific properties
- hadoop-env.sh => hadoop environment.
Speculative Execution
- If a task is running slowly on one node, hadoop launches duplicate copies of that task on other nodes.
- whichever copy finishes first is accepted; the other copies are killed.
- This is called speculative execution.
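Speculative execution is enabled by default; a sketch of disabling it for a single job, using the Hadoop 2.x property names:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Turn off speculative launches for both map and reduce tasks of this job.
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
Job job = Job.getInstance(conf, "no-speculation-job");
```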
YARN
- YARN was introduced in hadoop version 2.0
- YARN's ResourceManager, NodeManagers, and per-application ApplicationMaster replace the JobTracker and TaskTrackers.
Distributed Cache
- allows you to share common read-only files (configuration data, small lookup files, etc.) with all nodes running the job's tasks.
- normally a properties or small data file.
- the old DistributedCache class is deprecated in newer versions of hadoop; files are now added through the Job API, as in the example below.
```java
// In the driver
Configuration conf = getConf();
Job job = Job.getInstance(conf, "CountJob");
job.setMapperClass(CountMapper.class);
// ...
// see the # sign after the file location:
// we will be using the name after the # sign as our file name in the Mapper/Reducer
job.addCacheFile(new URI("/user/name/cache/some_file.json#some"));
job.addCacheFile(new URI("/user/name/cache/other_file.json#other"));

// In the setup method of the mapper, which you will need to override.
if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
    File some_file = new File("./some");
    File other_file = new File("./other");
    // do something, like maybe read them or parse them as JSON.
}
super.setup(context);
```
Counters
- counters in hadoop can be used for debugging hadoop code.
- Each mapper/reducer runs in its own JVM with its own configuration, so there is no shared global variable across tasks.
- counters are a feature provided by the hadoop framework that lets us aggregate named numeric values globally across all tasks during job execution.
- can be used to count how many records were processed successfully/unsuccessfully, etc
```java
// create an enum to be used as a counter
public static enum RECORD_COUNTER {
    SESSION_ID_COUNTER,
    USER_ID_COUNTER
};

// in mapper or reducer code
context.getCounter(RECORD_COUNTER.USER_ID_COUNTER).increment(1);

// printing the value at the end when the job is finished.
Configuration conf = new Configuration();
Cluster cluster = new Cluster(conf);
Job job = Job.getInstance(cluster, conf);
boolean result = job.waitForCompletion(true);
// ...
// job finished, get counters.
Counters counters = job.getCounters();
// get the counter by name and print its value.
Counter userIdCounter = counters.findCounter(RECORD_COUNTER.USER_ID_COUNTER);
System.out.println(userIdCounter.getDisplayName() + ":" + userIdCounter.getValue());
```
Sqoop
- used to transfer data from RDBMS to HDFS and vice versa.
Misc
- hadoop retries a failed task a few times (4 attempts by default) before failing the job and reporting the error.
- to add a node to a hadoop cluster, add its hostname to the include/slaves file referenced in the configuration and run "hadoop dfsadmin -refreshNodes".
- WebDAV is an extension that allows HDFS files to be browsed like a local filesystem.
- By default, data in HDFS is replicated with a factor of 3: two copies of each block are on one rack, and one copy is on another rack.
- MapReduce is normally used to do distributed programming on a cluster of computers.
- Initially, the master node divides the problem into smaller subproblems and distributes them to worker nodes, like a tree structure.
- the subtasks are completed, and their results are passed back up the tree.
- The master node then collects the answers to the smaller problems and combines them to form the answer to the original problem.
- FSImage: a snapshot of the filesystem metadata; contains filenames, permissions, and the blocks that make up each file.
- EditLog: a log of every change made to the filesystem metadata since the last FSImage checkpoint.
- difference between Hive and HBase???
Commands
- "hadoop job -list" => list all hadoop jobs
- "hadoop job -kill
" => kill a job - "hadoop balancer" => balances the data in the data nodes.