Hadoop Distributed File System

In this post we will discuss the Hadoop Distributed File System (HDFS).
Hadoop is divided into two parts:
1) HDFS – Storing files
2) MapReduce – Processing files
HDFS: It is a special file system for storing large data on a cluster of commodity hardware with a streaming access pattern. A streaming access pattern means you write a file once and read it any number of times, but you cannot change the content of the file once it is stored in HDFS.
HDFS is suitable for batch processing: data access is optimized for high throughput rather than low latency. It supports very large data sets and manages file storage across multiple disks, each of which is attached to a different machine in the cluster.

Difference – Unix file system and HDFS
In a Unix file system the default block size is typically 4KB. Suppose your file is 6KB; it then needs 2 blocks of 4KB each, so 8KB is allocated although only 6KB is needed, and the extra 2KB is wasted. In HDFS the default block size is 64MB (128MB in Hadoop 2.x). Suppose your file is 200MB; HDFS then needs 4 blocks (3 of 64MB + 1 of 8MB). The remaining 56MB of the last block is not occupied. Instead, that space stays free for other data.
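The arithmetic above can be checked with a small sketch. The function names are made up for this illustration and are not part of any Hadoop API; the sketch only reproduces the 6KB and 200MB examples.

```python
def unix_allocated_kb(file_kb, block_kb=4):
    """Space a fixed-block file system allocates: whole blocks only."""
    blocks = -(-file_kb // block_kb)  # ceiling division
    return blocks * block_kb


def hdfs_block_sizes_mb(file_mb, block_mb=64):
    """Sizes of the HDFS blocks for a file; the last block may be smaller."""
    full_blocks, remainder = divmod(file_mb, block_mb)
    sizes = [block_mb] * full_blocks
    if remainder:
        sizes.append(remainder)  # only the leftover data is stored
    return sizes


print(unix_allocated_kb(6))           # 8  (2KB wasted)
print(hdfs_block_sizes_mb(200))       # [64, 64, 64, 8]
print(hdfs_block_sizes_mb(200, 128))  # [128, 72]
```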

Why is the block size 64MB (128MB)?
The NameNode maintains metadata for every block in HDFS. If the block size were small, the metadata itself would take up a relatively large amount of space. To reduce this overhead, the block size is kept large. A large block size also reduces network traffic, because Hadoop can fetch 64MB of data at a time for processing.
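To get a feel for the overhead, here is a rough sketch that counts how many block entries the NameNode would have to track for 1 TB of data at different block sizes. It only counts blocks; the per-block metadata size is not modelled, and the helper name is hypothetical.

```python
KB, MB, TB = 1024, 1024 ** 2, 1024 ** 4


def block_count(data_bytes, block_bytes):
    """Number of blocks needed to hold data_bytes (ceiling division)."""
    return -(-data_bytes // block_bytes)


for label, size in [("4 KB", 4 * KB), ("64 MB", 64 * MB), ("128 MB", 128 * MB)]:
    print(label, block_count(1 * TB, size))
# 4 KB 268435456
# 64 MB 16384
# 128 MB 8192
```

With 4 KB blocks the NameNode would have to track hundreds of millions of entries for a single terabyte, which is why a much larger block size is used.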
Services in Hadoop
1. Namenode
2. Datanode
3. NodeManager
4. ResourceManager
5. SecondaryNamenode

The Namenode holds the metadata for the data stored on the datanodes. It is basically like a table of contents (index table), and it maintains the directory structure. Every client request goes to the namenode first. The datanodes hold the actual data.
In production there can be multiple Namenodes. Each one stores the metadata and block mapping for its files and maintains its part of the directory structure. The set of directories managed by a namenode is called a namespace volume, and the blocks of the files belonging to a namespace are called a block pool. If one namenode fails, the namespace volumes managed by the other namenodes are still accessible, so the entire cluster does not go down.




Storing files in HDFS:


Large files, gigabytes to petabytes in size, are broken into data blocks. Every block has the same size (except possibly the last one), and HDFS always works in terms of blocks. High fault tolerance is achieved by replicating blocks; the default replication factor is 3. Equal block sizes keep the processing time per block predictable.
There is a trade-off with respect to block size:
  • Increase block size – Reduce parallelism
  • Small block size   – Increase overhead to maintain metadata

The time taken to read a block from disk is broken into 2 parts:
  • Use the metadata in the name node to look up the block locations
  • Read the block from the respective location

Detail working of HDFS:

1. The client has a file of size, say, 200MB and wants to put it in HDFS.
2. The client sends a request to the NameNode asking where the blocks of the file can be stored.
3. The NameNode, in response, provides the list of available block locations and records the metadata of the file (e.g. block numbers 1, 3, 5, 7).
4. The client sends the file to the available locations, and HDFS internally splits the file according to the block size.
5. This 200MB file is divided into a.txt (64MB), b.txt (64MB), c.txt (64MB) and d.txt (8MB). The default replication factor in HDFS is 3. The client puts a.txt on DataNode 1 (block 1), and this block is replicated to blocks 2 and 4.
6. Block 4 sends an acknowledgement (ack) to block 2, and block 2 sends an ack to block 1.
7. Block 1 sends an ack to the client. This ack confirms the replication of the block 1 data.
8. At regular intervals every DataNode sends a block report and a heartbeat (every 3 seconds by default) to the NameNode.
9. The block report contains the metadata of the blocks, and the heartbeat signals that the node is alive.
10. If any DataNode in the cluster fails, the NameNode uses another node that holds a replica of the data and picks a free node to receive a new copy, so that the replication factor is maintained.
11. After that, the JobTracker comes into the picture.
12. The JobTracker needs data for processing, so it contacts the NameNode for block information. The JobTracker then sends the code to the particular DataNode for processing.
13. As in any master-slave setup, the JobTracker does not contact the DataNode directly; it contacts the TaskTracker.
14. Map: The process in which the JobTracker sends work to the TaskTracker.
15. The TaskTracker then communicates with the DataNode and processes the data available on it.
16. The reducer runs on any one of the DataNodes and combines all the output files.
17. Along with the heartbeat, information about the output file is stored in the metadata.
18. The client finds the output location by reading the metadata and fetches the output file directly.
19. If a DataNode fails, the JobTracker assigns the task to another DataNode where the replicated data is available.
How does the JobTracker track whether a TaskTracker is alive? The TaskTracker sends a heartbeat to the JobTracker every 3 seconds.
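The heartbeat and re-replication idea from steps 8 to 10 can be sketched as a toy model. This is not how the NameNode is implemented; the class, the method names and the 30-second timeout (`DEAD_AFTER`) are assumptions made up for the illustration. Only the 3-second heartbeat and the replication factor of 3 come from the text above.

```python
HEARTBEAT_INTERVAL = 3                # seconds, as stated in the post
DEAD_AFTER = 10 * HEARTBEAT_INTERVAL  # hypothetical timeout for this sketch
REPLICATION = 3                       # default replication factor


class ToyNameNode:
    def __init__(self, datanodes):
        self.last_heartbeat = {name: 0.0 for name in datanodes}
        self.block_locations = {}  # block id -> set of DataNode names

    def add_block(self, block_id, holders):
        self.block_locations[block_id] = set(holders)

    def heartbeat(self, name, now):
        self.last_heartbeat[name] = now

    def live_nodes(self, now):
        return {n for n, t in self.last_heartbeat.items() if now - t <= DEAD_AFTER}

    def repair(self, now):
        """Drop dead holders and copy blocks to free live nodes."""
        live = self.live_nodes(now)
        for holders in self.block_locations.values():
            holders &= live          # forget replicas on dead nodes
            if not holders:
                continue             # no surviving replica to copy from
            spare = sorted(live - holders)
            while len(holders) < REPLICATION and spare:
                holders.add(spare.pop(0))


nn = ToyNameNode(["dn1", "dn2", "dn3", "dn4", "dn5"])
nn.add_block("blk_1", ["dn1", "dn2", "dn4"])
for name in ["dn2", "dn3", "dn4", "dn5"]:
    nn.heartbeat(name, now=60.0)     # dn1 has been silent since t=0
nn.repair(now=60.0)
print(sorted(nn.block_locations["blk_1"]))  # ['dn2', 'dn3', 'dn4']
```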





What is Big Data

BIG DATA is “the next frontier for innovation”.
What is BIG-DATA?
Data that is beyond the storing and processing capacity of conventional database management systems is called “Big Data”. A huge amount of data, measured in petabytes, is generated daily, and the rate of data generation is increasing rapidly.

Characterization of BIG-DATA by “4V’s”
Volume: The size of the data (MB, TB, PB, EB, ZB, YB…). It is very common for enterprises to have terabytes or petabytes of storage.
Velocity: The speed at which data is generated and moves through the network for processing.
Variety: Structured, semi-structured, and unstructured data.
Veracity: The uncertainty of the data.

Sources of BIG-DATA
The data comes from various sources: transactions, social media, sensors, digital images, CCTV cameras, online shopping, airline black boxes, videos, audio, search engines and click-streams, for domains including healthcare, retail, energy, and utilities. About 90% of all the data available in the world was generated in the last decade. Ex. New York Stock Exchange – 1 TB/day, Facebook – 1 PB/day, Internet Archive – 20 TB/month, Large Hadron Collider near Geneva – 15 PB/year.
Where BIG-DATA is used
1. Understanding and Targeting Customers.
2. Understanding and Optimizing Business Processes.
3. Improving Healthcare and Public Health.
4. Improving Science and Research.
5. Optimizing Machine and Device Performance.
6. Financial Trading, and many other fields.
Different types of Data
1. Structured Data :
Data that can be stored in a database in row and column format, i.e. in a relational database. It is very simple to manage. Structured data is only 5-10% of all data.
2. Semi-structured Data :
Semi-structured data doesn’t reside in an RDBMS but has some organizational properties that make it easier to analyze. Ex. log files, CSV, XML.
3. Unstructured Data :
All remaining data is considered unstructured data. It includes video, images, email, photos, audio, web pages and much more, and it doesn’t fit neatly into a database. Unstructured data makes up about 80% of all data, and it is growing exponentially, faster than the other types of data. This data is either machine-generated or human-generated.
Machine-generated data: Satellite images, scientific data, photographs, videos, radar or sensor data, and more.
Human-generated unstructured data: Mobile data, website data, social media data, text data, and more.
Challenges with BIG-DATA

1) Capturing & Storing the data. (Collection and Storage)
2) Understanding and analysis of the data. (Data Analysis)
3) Synchronization across the Data Sources. (Data Transfer)
4) Getting and displaying meaningful Information out of that data. (Visualization)
Limitations of RDBMS
1) An RDBMS cannot handle huge data volumes well; the database management system has to be scaled up vertically.
2) The majority of the data comes in a semi-structured or unstructured format. An RDBMS can handle only structured data.
3) Big Data is generated at very high velocity. An RDBMS struggles with high velocity because it is designed for steady data retention rather than rapid growth. Even if an RDBMS is used to handle and store “big data,” it will turn out to be very expensive.
Tools for BIG-DATA

NoSQL: MongoDB, CouchDB, Cassandra, Redis, BigTable, HBase, Zookeeper
MapReduce: Hadoop, Hive, Pig, Cascading, Cascalog, Caffeine, S4, MapR, Flume, Kafka, Oozie
Storage: S3, Hadoop Distributed File System.
Servers: EC2, Google App Engine, Elastic Beanstalk, Heroku
Processing: R, Yahoo! Pipes, Mechanical Turk, ElasticSearch, Datameer, BigSheets, Tinkerpop
Applications of BIG-DATA
Online advertising
Stock exchange analysis
Social networking analysis
Spam filtering
Telecommunication network monitoring and much more.
Case Study
Current Storage = 300 PB
Process/Day = 600TB
User/Month = 1 billion
Like/Day = 2.7 billion
Photo uploaded/Day = 300 million

Current Storage = 5 EB
Process/Day = 30 PB
NSA touches 1.6% of internet traffic/day
(web searches, websites visited, phone calls, credit/debit card transactions, health, and finance info)

Current Storage = 15 EB
Process/Day = 100 PB
Searches/Second = 2.3 million
Unique Search/Month > 1 billion

Users: 37 million in 2009
Users: 450 million in 2016
Big Data System Requirement
To understand how the tools used to process large amounts of data work, you have to understand how a distributed computing framework works.
Storage: Store the massive amount of data
Process: Process the data in a timely manner
Scale: Scale easily as data grows
Traditional data technologies are not able to handle and process such huge amounts of data.

Approaches to solve Big Data problems
We can solve big data problems in two ways: by scaling up or by scaling out.

Challenges with scaling up:
Less reliable
Challenges with scaling out:
Co-ordinate between machines
Handling failure of machines
Hadoop is the solution to handle BigData



YARN in Hadoop

YARN – “Yet Another Resource Negotiator”.
In this article, we will discuss YARN. With the introduction of YARN, Hadoop becomes more powerful. YARN was introduced in Hadoop 2.x. It provides advantages over previous versions of Hadoop, including better scalability, cluster utilization, and user agility. YARN provides full backward compatibility with existing MapReduce tasks and applications. The YARN project was started by the Apache community to give Hadoop the ability to run non-MapReduce programs on the Hadoop framework.
The figure illustrates how YARN fits into the new Hadoop ecosystem.
The Resource Manager runs on a single master node, and a Node Manager runs on every other node.

Services in Hadoop (2.x)

After installing Hadoop 2.x, format the namenode, start all services by running start-all.sh on the terminal, and then run the jps command. If all the services shown in fig. 2 are running, the installation of Hadoop is successful.

In Hadoop 1.x, the JobTracker was responsible for both resource management and job scheduling/monitoring. The fundamental idea of YARN is to split these two major responsibilities of the JobTracker into separate daemons: a global ResourceManager and a per-application ApplicationMaster.

The NodeManager is the per-machine slave, which is responsible for launching the application’s containers, monitoring their resource usage (CPU, memory, disk, network), and reporting the same to the ResourceManager. The ResourceManager divides the resources among all the applications in the system. The ResourceManager has a pluggable scheduler component, which is responsible for allocating resources to the various running applications. The scheduler performs its scheduling function based on the resource requirements of an application by using the abstract notion of a resource container, which incorporates resource dimensions such as memory, CPU, disk, and network. The per-application ApplicationMaster negotiates resources from the ResourceManager and works with the NodeManager(s) to execute and monitor the component tasks.

Execution steps of YARN:

  • A container executes a specific application
  • A single NodeManager can have multiple containers
  • After a container has been assigned on a NodeManager, the Resource Manager starts the Application Master process within that container
  • The Application Master performs the computation required for the task
  • In MapReduce, the work the Application Master runs consists of mapper and reducer processes
  • If the Application Master requires extra resources, the Application Master running on a Node Manager asks the Resource Manager for them; additional resources are granted in the form of containers
  • The Resource Manager scans the cluster and finds a free Node Manager
  • The requesting NodeManager has no information about the other free NodeManagers
  • The Application Master on the original node starts the application's tasks in the containers on the newly assigned nodes.
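The step in which the Resource Manager scans the cluster for a free Node Manager can be sketched as follows. This is a toy model, not YARN code: the class, its method and the node names are hypothetical, and only memory is tracked.

```python
class ToyResourceManager:
    """Tracks free memory per NodeManager and hands out containers."""

    def __init__(self, free_mb_by_node):
        self.free_mb = dict(free_mb_by_node)

    def allocate(self, memory_mb):
        """Return the name of a node that can host the container, or None."""
        for node in sorted(self.free_mb):
            if self.free_mb[node] >= memory_mb:
                self.free_mb[node] -= memory_mb  # reserve the container
                return node
        return None  # no NodeManager has enough free memory


rm = ToyResourceManager({"nm1": 1024, "nm2": 4096})
print(rm.allocate(1024))  # nm1
print(rm.allocate(2048))  # nm2
print(rm.allocate(4096))  # None
```

Only the Resource Manager holds the cluster-wide view here, which mirrors the point that an individual NodeManager does not know about the other free NodeManagers.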

Various scheduler options

  1. FIFO Scheduler: Basically a simple “first come, first served” scheduler in which the JobTracker pulls jobs from a work queue, oldest job first.
  2. Capacity Scheduler: The Capacity Scheduler is another pluggable scheduler for YARN that allows multiple groups to securely share a large Hadoop cluster.
    • Capacity is distributed among different queues
    • Each queue is allocated a share of the cluster resources
    • A job can be submitted to a specific queue
    • Within a queue, FIFO scheduling is used
    • Default scheduler – Capacity Scheduler
  3. Fair Scheduler: Fair scheduling is a method of assigning resources to applications such that all applications get, on average, an equal share of resources over time.
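The difference between FIFO and fair scheduling can be illustrated with a short sketch. The fair part uses max-min fairness, which is one common way to formalise an "equal share"; it is an illustration only, not the Fair Scheduler's code, and the function names and job data are invented.

```python
def fifo_order(jobs):
    """Oldest job first: sort job names by submission time."""
    return [name for name, submitted in sorted(jobs.items(), key=lambda j: j[1])]


def fair_shares(capacity, demands):
    """Max-min fair split of capacity among applications."""
    allocation = {}
    pending = dict(demands)
    remaining = capacity
    while pending:
        share = remaining / len(pending)
        small = [app for app, need in pending.items() if need <= share]
        if not small:
            # Nobody fits in an equal share: split what is left evenly.
            for app in pending:
                allocation[app] = share
            break
        for app in small:
            # Small demands are met in full; the rest is redistributed.
            allocation[app] = pending.pop(app)
            remaining -= allocation[app]
    return allocation


print(fifo_order({"job_b": 20, "job_a": 10, "job_c": 30}))
# ['job_a', 'job_b', 'job_c']
print(fair_shares(12, {"a": 2, "b": 8, "c": 10}))
# {'a': 2, 'b': 5.0, 'c': 5.0}
```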

Configure scheduling policy:

$ vi yarn-site.xml
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

We can define different queues for development and production

$ vi etc/hadoop/capacity-scheduler.xml
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>dev,prod</value>
  <description>The queues at this level (root is the root queue).</description>
</property>



We can submit the job to the specific queue

$ hadoop jar sample.jar wordCount.Main -D mapreduce.job.queue.name=prod input output

You can check the queue at localhost:8088/cluster


A container is a collection of physical resources such as RAM, CPU cores, and disks on a single node. There can be multiple containers on a single node (or a single large one). Every node in the system is considered to be composed of multiple containers of a minimum size of memory (e.g., 512 MB or 1 GB) and CPU. The ApplicationMaster can request any container so as to occupy a multiple of the minimum size.
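As a rough sketch of the "multiple of the minimum size" idea, the function below rounds a memory request up to the next multiple of the minimum container size. The function name and the default of 1024 MB are assumptions for this example; the actual rounding rules depend on the scheduler and its configuration.

```python
def normalize_request_mb(requested_mb, minimum_mb=1024):
    """Round a memory request up to a multiple of the minimum container size."""
    units = max(1, -(-requested_mb // minimum_mb))  # ceiling division, at least 1
    return units * minimum_mb


print(normalize_request_mb(1500))      # 2048
print(normalize_request_mb(1024))      # 1024
print(normalize_request_mb(300, 512))  # 512
```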


The NodeManager is YARN’s per-node “worker” agent, taking care of the individual compute nodes in a Hadoop cluster. Its duties include keeping up-to-date with the ResourceManager, overseeing application containers’ life-cycle management, monitoring resource usage (memory, CPU) of individual containers, tracking node health, log management, and auxiliary services that may be exploited by different YARN applications. On start-up, the NodeManager registers with the ResourceManager; it then sends heartbeats with its status and waits for instructions. Its primary goal is to manage application containers assigned to it by the ResourceManager.


The ApplicationMaster (AM) is the process that coordinates an application’s execution in the cluster. Each application has its own unique AM, which is tasked with negotiating resources (containers) from the ResourceManager and working with the NodeManager to execute and monitor the tasks. In the YARN design, MapReduce is just one application framework; this design permits building and deploying distributed applications using other frameworks. Once the AM is started (as a container), it will periodically send heartbeats to the ResourceManager to affirm its health and to update the record of its resource demands.


The ResourceManager (RM) is primarily a pure scheduler. It is strictly limited to arbitrating requests for available resources in the system made by the competing applications. It optimizes for cluster utilization (i.e., keeps all resources in use all the time) against various constraints such as capacity guarantees, fairness, and service level agreements (SLAs). To allow for different policy constraints, the RM has a pluggable scheduler that enables different algorithms, such as those focusing on capacity and fair scheduling, to be used as necessary.

That’s all for this article. I hope you are now clear about the YARN architecture and working model.