Tuesday 27 January 2015

ETL in Hadoop (Big data)

Introduction
Preparing data for end users and analysis has always been a challenge, especially on a big data platform.
The life cycle to get data into its pure form goes through multiple stages such as extract, transform, load, data cleansing, partitioning, bucketing and aggregation.

This paper explains an efficient and meaningful way of transforming data from its raw to its pure form.

Methodology
The ETL methodology is well accepted across the industry for data preparation, but in the big data space we tend to follow ELT, i.e. Extract, Load and then Transform. In big data, we land all the data on HDFS and use multiple tools such as MapReduce, Cascading, Scalding, Pig, Hive, etc. to run transformations on the data.

HDFS replicates data for reliability and performance, and processes should take this into account while creating multiple data layers.

Data layers
1. Staging data layer
2. Incoming data layer
3. Play data layer
4. Pure data layer
5. Snapshot data layer

Staging data layer
This is considered transit data and needs to be rotated on a regular basis.
This layer holds data in its raw format: data is loaded into this space just after Extract. Data here is used as a backup for any reload/re-run or for validating audit trails.

This layer is usually not on HDFS.
Data is recommended to be rotated every 15 days or as per the use case.
Since it is not on HDFS, it has limited space allocated to it.

Incoming data layer
This is an exact replica of the data we had in Staging and is persisted for a longer duration in compressed form.
Data is compressed along with the rotation of the Staging data, i.e. if the 2015-01-01 data is moved out of Staging on 2015-01-15, then on the same day we compress it.

Data here is saved on HDFS with a replication factor of 3.
This layer can hold data for up to 2-3 years or as per the use case.
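
As an illustration of the rotation-plus-compression step above, here is a minimal Java sketch, assuming a Gzip-compressed incoming layer; the class name, paths and file names are hypothetical and not part of the original design.

import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

// Hypothetical example: move one rotated Staging file into the compressed Incoming layer on HDFS.
public class StagingToIncoming {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       // picks up core-site.xml / hdfs-site.xml
        FileSystem local = FileSystem.getLocal(conf);   // Staging layer lives on local disk
        FileSystem hdfs = FileSystem.get(conf);         // Incoming layer lives on HDFS (replication 3)

        // Assumed layout: Staging keeps one file per day, Incoming mirrors it as <file>.gz
        Path stagingFile  = new Path("/staging/2015-01-01/transactions.dat");
        Path incomingFile = new Path("/data/incoming/2015-01-01/transactions.dat.gz");

        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

        InputStream in = local.open(stagingFile);
        OutputStream out = codec.createOutputStream(hdfs.create(incomingFile));
        try {
            IOUtils.copyBytes(in, out, conf);           // stream and compress in one pass
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
        // Once the copy is verified, the Staging copy can be deleted as part of the 15-day rotation.
    }
}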

Play data layer
This space is used as a temporary location for saving any intermediate data produced while transforming data from the raw to the pure state. This space is regularly cleaned after each job run.

Pure data layer
This space holds data in its pure form, i.e. data here is the final output of all the transformations that were planned, and this data is available to all the downstream processes.
It resides on HDFS with a replication factor of 3 or more, and data here may or may not be compressed.

Snapshot data layer
This space sits on top of the Pure dataset. Data here can be multiple presentations of the data in the Pure layer, each presentation tailored to a particular use case or downstream application.

For example:
The data set in the Pure layer is aggregated by Date and Customer Id.

Downstream application (A) works faster when data is aggregated by Date, and downstream application (B) works faster when data is aggregated by Customer Id. We save the same Pure data in two different partition layouts for the two different downstream applications.

Conclusion
In the big data space, it is very important to have huge data sets logically separated into different partitions for efficient space management and reliable functioning of processes.

Each data layer represents data in a particular state of transformation and helps segregate pure data from work data. The different layers also enable an efficient audit trail mechanism and reconciliation of data in case of an outage.


Tuesday 11 December 2012

Hadoop Online Tutorial

1. Business use case:

2. Hands-on lab:
   http://ihadoop.blogspot.in/2012/12/hadoop-tutorial-hands-on-guide.html


Hadoop tutorial, hands-on guide

Hadoop Developer Tutorial
Software required
1. VMWare Player 4.0.4
2. Linux version – Ubuntu 12.04
3. Putty
4. WinSCP
5. User name: *******
6. Password: *******

The tutorial is written with respect to a VM that can run on any Windows machine using VMware Player. All the required software is available on the VM.
The VM can be downloaded from:
Otherwise, the tutorial can be followed if you have access to a Unix/Linux OS.
  
Lab 1. Preparation for lab
(Not required if not working on VM provided)
1. Unzip the mirror image at any location on the Windows machine
2. Open VMWare Player and
     File -> Open Virtual Machine and select
     -> mirror image folder -> virtual machine file
     \ubuntu-server-12.04-amd64\ubuntu-server-12.04-amd64 file
3. Press Ctrl+G and make a note of the IP address; the same can be used to
     log in via Putty and WinSCP
4. Open Putty -> log in via the IP address ->
     username: ******, password: ********
5. Now we can minimize the VM and use Putty from here on

Lab 2. Setting Hadoop
1. Untar the Hadoop tar file
   a. Go to lab/software
   b. Untar the Hadoop tar file into the software folder
   c. tar -xvf ../../downloads/hadoop-1.0.3.tar.gz

2. Set up environment variables
  a. Open .bash_profile, i.e. vi .bash_profile
  b. Enter the following:
      1. export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64
      2. export HADOOP_INSTALL=/home/notroot/lab/software/hadoop-1.0.3
      3. export HADOOP_HOME=/home/notroot/lab/software/hadoop-1.0.3
      4. export PATH=$PATH:$HADOOP_INSTALL/bin
          Save and exit, i.e. type :wq and press Enter

 c.  Check the installations
java -version
                           
hadoop version

3. Configuring Hadoop/HDFS/MapReduce

cd $HADOOP_HOME/conf
  reference Link: http://hadoop.apache.org/docs/stable/cluster_setup.html

Modify    core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>        
<value>hdfs://localhost:8020</value> 
<final>true</final>
</property>
</configuration>


Modify    hdfs-site.xml
<?xml version="1.0"?>
<!-- hdfs-site.xml -->
<configuration>
<property>
<name>dfs.name.dir</name>
<value>/home/notroot/lab/hdfs/namenodep,/home/notroot/lab/hdfs/namenodes</value>
<final>true</final>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/notroot/lab/hdfs/datan1,/home/notroot/lab/hdfs/datan2</value>
<final>true</final>
</property>
<property>
<name>fs.checkpoint.dir</name>
<value>/home/notroot/lab/hdfs/checkp</value>
<final>true</final>
</property>
</configuration>


Modify    mapred-site.xml
<?xml version="1.0"?>
<!-- mapred-site.xml -->
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
<final>true</final>
</property>
<property>
<name>mapred.local.dir</name>
<value>/home/notroot/lab/mapred/local1,/home/notroot/lab/mapred/local2</value>
<final>true</final>
</property>
<property>
<name>mapred.system.dir</name>
<value>/home/notroot/lab/mapred/system</value>
<final>true</final>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>3</value>
<final>true</final>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>3</value>
<final>true</final>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx400m</value>
<!-- Not marked as final so jobs can include JVM debugging options -->
</property>
</configuration>


Create directories under lab/hdfs
1.      mkdir namenodep
2.      mkdir namenodes
3.      mkdir datan1
4.      mkdir datan2
5.      mkdir checkp

Change permission on folders
1.      chmod 755 datan1
2.      chmod 755 datan2

Create directories under lab/mapred
1.      mkdir local1
2.      mkdir local2
3.      mkdir system


Format the namenode (only once)
Cmd: hadoop namenode -format

Starting HDFS services
1)      cd $HADOOP_HOME/conf
2)      edit hadoop-env.sh and set JAVA_HOME
a.      export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64
3)      start HDFS services
a.      cd $HADOOP_HOME/bin
b.      exec: ./start-dfs.sh
4)      start MapReduce services
a.      cd $HADOOP_HOME/bin
b.      exec: ./start-mapred.sh

Run jps and check which processes are running.



HDFS services: DataNode, NameNode and SecondaryNameNode
MapReduce services: TaskTracker and JobTracker

Lab 3: HDFS Lab:
1) Create input and output directories under HDFS for the input and output files
            - hadoop fs -mkdir input
            - hadoop fs -mkdir output

2) Check the directories
            - hadoop fs -ls

3) Copy files from the local system to HDFS and check if copied
            - hadoop fs -copyFromLocal  /home/notroot/lab/data/txns  input/
            - Check the files: hadoop fs -ls input/

4) Copy from HDFS to the local system
            - hadoop fs -copyToLocal  input/txns  /home/notroot/lab/data/txndatatemp

Go to datan1 and datan2 and check how the file is split and multiple blocks are created.
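
The same operations can also be done from Java through the Hadoop FileSystem API. Below is a minimal sketch, assuming the lab paths above; the class name is only for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative Java equivalent of the hadoop fs commands used in this lab
public class HdfsLab {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // reads core-site.xml (fs.default.name)
        FileSystem fs = FileSystem.get(conf);

        fs.mkdirs(new Path("input"));               // hadoop fs -mkdir input
        fs.mkdirs(new Path("output"));              // hadoop fs -mkdir output

        // hadoop fs -copyFromLocal /home/notroot/lab/data/txns input/
        fs.copyFromLocalFile(new Path("/home/notroot/lab/data/txns"), new Path("input"));

        // hadoop fs -copyToLocal input/txns /home/notroot/lab/data/txndatatemp
        fs.copyToLocalFile(new Path("input/txns"), new Path("/home/notroot/lab/data/txndatatemp"));

        // hadoop fs -ls input/
        for (FileStatus status : fs.listStatus(new Path("input"))) {
            System.out.println(status.getPath());
        }
    }
}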

Lab 4: MapReduce – Word Count
1. First we will focus on writing the Java program using Eclipse
2. Eclipse lab (most of you know this)
    a. Untar the Hadoop tar file somewhere on the Windows machine (say: c:\softwares\)
    b. Create a new Java project (MRLab) with package lab.samples
    c. Add the Hadoop jar files to the project created
        i. Jars under c:\softwares\hadoop-1.0.3
       ii. Jars under c:\softwares\hadoop-1.0.3\lib
    d. Time to write the Map and Reduce functions
    e. We will write three classes and package them together in a jar file
      i. Map class
     ii. Reduce class
    iii. Driver class (Hadoop will call the main function of this class)

Link for sample code: (an illustrative sketch of the three classes is included after the steps below)

    f. Compile the code and create the jar file
       i. Right click on the project folder -> Export -> JAR file
    g. Transfer the jar file from the local machine to the virtual machine; use the WinSCP tool for this
    h. Copy the jar file to /home/notroot/lab/programs (on the virtual machine)
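
Since the original sample-code link is not reproduced here, the sketch below shows what the three classes could look like against the Hadoop 1.0.3 org.apache.hadoop.mapreduce API; treat it as an illustration under those assumptions, not the exact code referenced above.

package lab.samples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map class: emits (word, 1) for every word in the input line
    public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce class: sums the counts emitted for each word
    public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver class: Hadoop calls this main function with <input path> <output path>
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(WordReducer.class);
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}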

At this point, we have the MapReduce code (jar file) on the virtual machine, and all processes are also running on the virtual machine (HDFS, JobTracker, TaskTracker, ...).

Run MapReduce as:
hadoop   jar <jar file name>.jar   DriverClass    <input path>    <output path>
hadoop   jar <jar file name>.jar   lab.samples.WordCount    input/words     output/wcount

The output can be checked with: hadoop fs -cat output/wcount/part-r-00000

Lab 6: Hive Configuration

Install MySQL on the virtual machine
1.      sudo apt-get install mysql-server
2.      sudo apt-get install mysql-client-core-5.5

a.      Untar the Hive tar file
-  Go to lab/software
-  Untar the Hive files into the software folder
   tar -xvf ../../downloads/hive-0.9.0-bin.tar.gz
-  Browse through the directories and check which subdirectory contains what files

b.      Set up .bash_profile
-  Open the .bash_profile file under the home directory and enter the following settings:
   export HIVE_INSTALL=/home/notroot/lab/software/hive-0.9.0-bin
   export PATH=$PATH:$HIVE_INSTALL/bin
-  Save and exit .bash_profile
-  Run the following command:
   . .bash_profile
-  Verify whether the variables are defined by typing export at the command prompt

c.      Check Hive tables
-  Run hive and verify that it enters the Hive shell
   hive
-  Check databases and tables
   show databases;
   show tables;

Lab 7 : Hive Programming

Create a database
create database retail;

Use database
use retail;

Create a table for storing transactional records
create table txnrecords(txnno INT, txndate STRING, custno INT,
amount DOUBLE, category STRING, product STRING, city STRING,
state STRING, spendBy STRING)
row format delimited
fields terminated by ','
stored as textfile;

Load the data into the table
LOAD DATA LOCAL INPATH '/home/notroot/lab/data/txnns'
OVERWRITE INTO TABLE txnrecords;

Describe the metadata or schema of the table
describe txnrecords;

Count the number of records
select count(*) from txnrecords;

Count total spending by product category
select category, sum(amount) from txnrecords group by category;

Top 10 customers by total spend
select custno, sum(amount) as total from txnrecords group by custno order by total desc limit 10;









Saturday 10 November 2012

Hadoop tutorial - MapReduce by example


Hadoop User/Business case study
We know Hadoop has HDFS (a file system) and a MapReduce framework, but this blog will try to explain how these two individual components come together and solve business problems.

A common problem faced by organizations across domains is processing BIG DATA.
What is BIG DATA?
BIG DATA is a relative term, i.e. for some start-up company (say CompUS), 2 GB of data is BIG DATA because their machine spec reads something like Dual Core 2.70 GHz, 8 MB cache.
CompUS (the company in reference) has to buy a bigger machine to process 2 GB of data every day.

Imagine CompUS grows and needs to process 4 GB/day after 2 months, 10 GB/day after 6 months and 1 TB/day after 1 year. To solve this, CompUS buys a new, more powerful machine every time its data increases. That means the company tried to solve the problem with vertical scaling.
Vertical scaling is not good because every time the company buys a new machine or new hardware, it adds infrastructure and maintenance cost to the company.

Hadoop solves the problem by horizontal scaling. How?
Hadoop runs processing on parallel machines, and every time the data increases, companies can add one more machine to the already existing fleet of machines. Hadoop processing doesn't depend on the spec of the individual machines, i.e. the newly added machine doesn't have to be a powerful, expensive machine; it can be more of a commodity machine.

If CompUS uses Hadoop over vertical scaling, it gains the following:
            1)   Low infrastructure cost.
            2)  A system that can be easily scaled in the future without affecting the existing infrastructure.

Let's dig into user/business cases with Hadoop in perspective.
Financial domain:
Financial firms dealing with stock/equity trading generate huge amounts of data every day.

Problem set:
The data in hand contains information on various traders, brokers, orders, etc., and the company wants to generate metrics for each trader.

Infrastructure in hand:
11 machines running in parallel under Hadoop: 1 NameNode (the Hadoop Master Node, the controlling node) and the remaining 10 machines acting as slaves, taking commands from the Master Node.


Solution, the Hadoop way:
1)      Import data into HDFS (the Hadoop file system)
a.      HDFS will split the data across all machines in small blocks, i.e. 10 GB of data will get divided among 10 machines and each machine gets 1 GB of data.
b.      HDFS also replicates each block of data across multiple machines to handle machine failure at runtime.
c.      At this point, the data is simply divided into smaller blocks; that means information for "trader A" can be on machine 1/2/3... and the same holds for all other traders.
2)      Write MapReduce functions for the analysis
3)      The Map function will create key/value pairs, somewhat like this:
a.      If the original data is:
        i.   "trader A", "buy price", "sell price"
        ii.  "trader B", "buy price", "sell price"
        iii. "trader C", "buy price", "sell price"
b.      the key/value pairs generated are:
        i.   Key: "trader A" – Value: "buy price", "sell price"
        ii.  Key: "trader B" – Value: "buy price", "sell price"
        iii. Key: "trader C" – Value: "buy price", "sell price"
c.      The Map function will run in parallel on each of the 10 machines containing data
d.      Each machine running the Map function will generate an intermediate result of key/value pairs
e.      Note: at this point, each machine has only created key/value pairs from the 1 GB of data allocated to it earlier, and each machine contains multiple distinct keys.
4)      Hadoop magic before running the Reduce function
a.      Hadoop (the Master Node) will wait until all machines complete running the Map function
b.      Hadoop will now sort and shuffle the data (a very important point)
        i.   Sort the data on each individual machine with respect to the key
        ii.  Shuffle the data across multiple machines with respect to the key
        iii. Data for "trader A" from machines 1/2/3... will be consolidated on, say, machine 1
        iv.  Data for "trader B" from machines 1/2/3... will be consolidated on machine 2, and so on
c.      At this point, all the data required to analyze "trader A" is available on one machine, and the same holds for the other traders
5)      After sorting/shuffling the data across nodes, the Master Node will ask all slave machines to run the Reduce function in parallel.
6)      The Reduce function is the main logic that will generate the final metric for "trader A".
7)      Say we want to analyze the total profit made by each trader.
8)      Time to run the Reduce function
a.      The analysis will be easy and fast because each machine only gets the data required for one trader.
b.      The Reduce function gets a list of key/value pairs.
c.      The key is the same within an individual function call; each machine runs one call at a time, with multiple calls running in parallel across the slave machines.
d.      Iterate over the list (Reduce function; see the Java sketch after the output below):
        total profit = 0;
        for each (key/value pair) {
            total profit = total profit + (sell price - buy price)
        }
9)      Once all Reduce functions are completed on all slave machines, the output of each reducer is stored in the HDFS file system.
10)     The consolidated output after MapReduce will read somewhat like:

        Trader        Total profit
        Trader A      10,000
        Trader B      -9,000
        Trader C      12,000
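
For reference, the reduce logic from step 8.d could be sketched in Java roughly as follows; the value format (a comma-separated "buyPrice,sellPrice" string emitted by the Map function) and the class name are assumptions for illustration only.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Key: trader name (e.g. "trader A"); value: "buyPrice,sellPrice" emitted by the Map function
public class TraderProfitReducer extends Reducer<Text, Text, Text, DoubleWritable> {

    @Override
    public void reduce(Text trader, Iterable<Text> trades, Context context)
            throws IOException, InterruptedException {
        double totalProfit = 0.0;
        for (Text trade : trades) {
            String[] parts = trade.toString().split(",");
            double buyPrice = Double.parseDouble(parts[0]);
            double sellPrice = Double.parseDouble(parts[1]);
            totalProfit += (sellPrice - buyPrice);   // negative values are losses, as for Trader B
        }
        context.write(trader, new DoubleWritable(totalProfit));
    }
}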