Apache Pig Notes



What is pig?

Implemented by Yahoo.
Pig Hadoop echo system s/w from apache foundation used for analysing the data.
Pig uses pig latin language.

Data flow language.
handle structured, semi-structured and un-structured
Replacement of mapreduce(not 100%)
Pig internally uses MapReduce.

Components/architecture of pig:

parse-->compile-->optimize-->plan

Pig Data Model

Atom
Tuple
Map
Bag
Relation

Pig Execution Modes(connection to the pig)
------------------------------------------
1) Local mode ---> data will be used from Local file system. commands will run locally.

[cloudera@quickstart ~]$ pig -x local

2) MapReduce Mode(HDFS mode) ---> data should be part of HDFS. commands will run in MapReduce(Hadoop)

[cloudera@quickstart ~]$ pig -x mapreduce   (or)    [cloudera@quickstart ~]$ pig


Execution Mechanisms( how many ways we can execute pig scipts)
-------------------------------------------------------------
1) Interactive mode (in grunt shell)
2) Batch mode (in unix/linux prompt)


Interactive mode (in grunt shell)
-------------
grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',');
grubt> dump;

Batch mode (in unix/linux prompt)
-------------------------

1) local
--------------
[cloudera@quickstart ~]$ cat pig_local.pig
customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
dump customers;

[cloudera@quickstart ~]$ pig -x local pig_local.pig


2) MapReducemode(HDFS Mode)
------------
[cloudera@quickstart ~]$ cat pig_global.pig
customers= LOAD '/user/cloudera/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
dump customers;

[cloudera@quickstart ~]$ pig -x mapreduce pig_global.pig

[cloudera@quickstart ~]$ hdfs dfsadmin -safemode leave;           ----> optional

Note:  script file extention is  .pig


-----------------------------
Pig Diagnostic operators
Grouping and Joining
-----------------------------

Pig Diagnostic operators
-------------------------
dump
describe
explain
illustrate

dump--> display results on the screen.
----------
grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);

grunt> dump customers;

describe-->view the schema of the relation.
---------
grunt> describe customers;
output:   customers: {id: int,name: chararray,age: int,address: chararray,salary: int}

explanation---> used to display logical, physical and MapReuce execution plans.
-------
grunt> explain customers;

illustrate --> step by step execution of statements.
-------
grunt> illustrate customers;

Grouping and Joining
----------------------
GROUP Operator
group --> group the data in one or more relations based on a key.

Grouping by single filed
-------------------------
grunt> students_details= LOAD '/home/cloudera/training/pigdata/students.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> student_groupdata= GROUP students_details by age;
grunt> dump student_groupdata;

output:
(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battacharya,22,9848022338,Kolkata)})
(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})
(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334,trivendram)})

Grouping by multiple fileds
-----------------------------
grunt> students_details= LOAD '/home/cloudera/training/pigdata/students.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> student_multiplegroup= GROUP students_details by (age,city);
grunt> dump student_multiplegroup;

((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})
((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
((22,Delhi),{(3,Rajesh,Khanna,22,9848022339,Delhi)})
((22,Kolkata),{(2,siddarth,Battacharya,22,9848022338,Kolkata)})
((23,Chennai),{(6,Archana,Mishra,23,9848022335,Chennai)})
((23,Bhuwaneshwar),{(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})
((24,Chennai),{(8,Bharathi,Nambiayar,24,9848022333,Chennai)})
((24,trivendram),{(7,Komal,Nayak,24,9848022334,trivendram)})

Joins
------------------
Joins are used for get the data from one or more relations.
Operator:  JOIN

Types:
---
self join
inner join
outer join:  left, right, full

Self join
--------------------------
grunt> customers1= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> customers2= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> customers3= JOIN customers1 BY id,customers2 BY id;

grunt> dump customers;

output:
(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000)
(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)
(3,kaushik,23,Kota,2000,3,kaushik,23,Kota,2000)
(4,Chaitali,25,Mumbai,6500,4,Chaitali,25,Mumbai,6500)
(5,Hardik,27,Bhopal,8500,5,Hardik,27,Bhopal,8500)
(6,Komal,22,MP,4500,6,Komal,22,MP,4500)
(7,Muffy,24,Indore,10000,7,Muffy,24,Indore,10000)

inner join
-------------------------
grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);

grunt> orders= LOAD '/home/cloudera/training/pigdata/orders.txt' USING PigStorage(',') as (oid:int,date:chararray,customer_id:int,amount:int);

grunt> customers_orders= JOIN customers by id,orders by customer_id;

output:
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

left outer join
------------------
returns all the rows from left table, even if there are no matches in the right table.

grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> orders= LOAD '/home/cloudera/training/pigdata/orders.txt' USING PigStorage(',') as (oid:int,date:chararray,customer_id:int,amount:int);
grunt> customers_orders_leftouter= JOIN customers by id LEFT OUTER,orders by customer_id;

output:
(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
(5,Hardik,27,Bhopal,8500,,,,)
(6,Komal,22,MP,4500,,,,)
(7,Muffy,24,Indore,10000,,,,)

right outer join
--------------------
retuns all the rows from right table, even if there are no matches in left table.

grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> orders= LOAD '/home/cloudera/training/pigdata/orders.txt' USING PigStorage(',') as (oid:int,date:chararray,customer_id:int,amount:int);
grunt> customers_orders_rightouter= JOIN customers by id RIGHT,orders by customer_id;

output
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

Full outer join
------------------
returns rows when there is a match in one of the relation.
grunt> customers= LOAD '/home/cloudera/training/pigdata/customers.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,address:chararray,salary:int);
grunt> orders= LOAD '/home/cloudera/training/pigdata/orders.txt' USING PigStorage(',') as (oid:int,date:chararray,customer_id:int,amount:int);
grunt> customers_orders_fullouter= JOIN customers by id FULL OUTER,orders by customer_id;

output:
(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
(5,Hardik,27,Bhopal,8500,,,,)
(6,Komal,22,MP,4500,,,,)
(7,Muffy,24,Indore,10000,,,,)

-------------------
Combining
Splitting
Filtering
Sorting
-----------------------

Combining
-------
Union: merging/combining data from more relations.

student_data1.txt
student_data2.txt

grunt> student1= LOAD '/home/cloudera/training/pigdata/Student_data1.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,phone:chararray,city:chararray);
grunt> student2= LOAD '/home/cloudera/training/pigdata/Student_data2.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,phone:chararray,city:chararray);
grunt> student= UNION student1,student2;


Splitting
-----------
SPLIT: splitting data in to multiple relations.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);

Problem statement: Split the relation into two, one listing the students age less than 23, and the other listing the students having the age between 23 and 25.

grunt> SPLIT student_details into student_details1 if age<23 age="" if="" student_details2="">23 and age<25 p="">grunt> dump student_details1;
grunt> dump student_details2;

Filtering
-------------------
FILTER: select the required tuples from a relation based on a condition.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> filter_data= FILTER student_details BY city=='Chennai';
grunt> dump filter_data;


Sorting
---------------
ORDER BY : used to display the contents of a relation in a sorted order based one a field.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> order_by_age= ORDER student_details BY age DESC;
grunt> dump order_by_age;

Removing duplicates
-----------------------
DISTINCT : used for remove redundant or duplicate tuples from a ralation.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details_dup.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> distinct_data= DISTINCT student_details;
grunt> dump distinct_data;

FOREACH: Generate spesified data transformations based he column data.
---------------------------
Req:
----
get the id, age, and city values of each student from the relation student_details and store it into another relation named foreach_data using the foreach operator.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);

grunt> foreach_data= FOREACH student_details GENERATE id,age,city;

grunt> dump foreach_data;


Limit Operator
---------------------
Used for limiting the rows.

grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_details.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray);
grunt> student_limited_data= LIMIT student_details 4;
grunt> dump student_limited_data;


-----------------------
Pig Built-in functions
-------------------------
Eval functions
String functins

Eval functions
---------------------------
AVG()
MAX()
MIN()
COUNT()
SUM()


Pre-requisites
-----------
grunt> student_details= LOAD '/home/cloudera/training/pigdata/student_gpa.txt' USING PigStorage(',') as (id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray,gpa:int)
grunt> student_group_all= GROUP student_details ALL;
grunt> dump student_group_all;

AVG()
grunt> student_gpa_avg= foreach student_group_all Generate(student_details.firstname,student_details.gpa),AVG(student_details.gpa);
MAX()
grunt> student_gpa_max= foreach student_group_all Generate(student_details.firstname,student_details.gpa),MAX(student_details.gpa);
MIN()
grunt> student_gpa_min= foreach student_group_all Generate(student_details.firstname,student_details.gpa),MIN(student_details.gpa);
COUNT()
grunt> student_gpa_count= foreach student_group_all Generate(student_details.firstname,student_details.gpa),COUNT(student_details.gpa);
SUM()
grunt> student_gpa_sum= foreach student_group_all Generate(student_details.firstname,student_details.gpa),SUM(student_details.gpa);


DIFF() & SUBTRACT()
-----------------------

grunt> emp_sales= LOAD '/home/cloudera/training/pigdata/emp_sales.txt' USING PigStorage(',') as (sno:int, name:chararray, age:int, salary:int, dept:chararray);
grunt> emp_bonus= LOAD '/home/cloudera/training/pigdata/emp_bonus.txt' USING PigStorage(',') as (sno:int, name:chararray, age:int, salary:int,dept:chararray);
grunt> cogroup_data = COGROUP emp_sales by sno, emp_bonus by sno;

grunt> dump cogroup_data;

(1,{(1,Robin,22,25000,sales)},{(1,Robin,22,25000,sales)})
(2,{(2,BOB,23,30000,sales)},{(2,Jaya,23,20000,admin)})
(3,{(3,Maya,23,25000,sales)},{(3,Maya,23,25000,sales)})
(4,{(4,Sara,25,40000,sales)},{(4,Alia,25,50000,admin)})
(5,{(5,David,23,45000,sales)},{(5,David,23,45000,sales)})
(6,{(6,Maggy,22,35000,sales)},{(6,Omar,30,30000,admin)})

grunt> diff_data = FOREACH cogroup_data GENERATE DIFF(emp_sales,emp_bonus);

output
-------
({})
({(2,BOB,23,30000,sales),(2,Jaya,23,20000,admin)})
({})
({(4,Sara,25,40000,sales),(4,Alia,25,50000,admin)})
({})
({(6,Maggy,22,35000,sales),(6,Omar,30,30000,admin)})

grunt> sub_data = FOREACH cogroup_data GENERATE SUBTRACT(emp_sales,emp_bonus);

output:
-----
({})
({(2,BOB,23,30000,sales)})
({})
({(4,Sara,25,40000,sales)})
({})
({(6,Maggy,22,35000,sales)})


String functions
--------------------
ENDSWITH
STARTSWITH
SUBSTRING
EqualsIgnoreCase
UPPER
LOWER
REPLACE
TRIM, RTRIM, LTRIM

grunt> emp_data = LOAD '/home/cloudera/training/pigdata/emp.txt' USING PigStorage(',') as (id:int, name:chararray, age:int, city:chararray);

grunt> emp_endswith= FOREACH emp_data GENERATE (id,name),ENDSWITH(name,'n');
output:
((1,Robin),true)
((2,BOB),false)
((3,Maya),false)
((4,Sara),false)
((5,David),false)
((6,Maggy),false)
((7,Robert),false)
((8,Syam),false)
((9,Mary),false)
((10,Saran),true)
((11,Stacy),false)
((12,Kelly),false)

grunt> emp_startwith= FOREACH emp_data GENERATE (id,name),STARTSWITH(name,'M');
grunt> dump emp_startwith;


grunt> substring_data= FOREACH emp_data GENERATE (id,name),SUBSTRING(name,0,2);
grunt> DUMP substring_data;
output:
-----
((1,Robin),Ro)
((2,BOB),BO)
((3,Maya),Ma)
((4,Sara),Sa)
((5,David),Da)
((6,Maggy),Ma)
((7,Robert),Ro)
((8,Syam),Sy)
((9,Mary),Ma)
((10,Saran),Sa)
((11,Stacy),St)
((12,Kelly),Ke)

grunt> equals_data= FOREACH emp_data GENERATE (id,name), EqualsIgnoreCase(name, 'Robin');
grunt> dump equals_data;
output
--------
((1,Robin),true)
((2,BOB),false)
((3,Maya),false)
((4,Sara),false)
((5,David),false)
((6,Maggy),false)
((7,Robert),false)
((8,Syam),false)
((9,Mary),false)
((10,Saran),false)
((11,Stacy),false)
((12,Kelly),false)

grunt> upper_data= FOREACH emp_data GENERATE (id,name), UPPER(name);
grunt> dump upper_data;
output:
---------
((1,Robin),ROBIN)
((2,BOB),BOB)
((3,Maya),MAYA)
((4,Sara),SARA)
((5,David),DAVID)
((6,Maggy),MAGGY)
((7,Robert),ROBERT)
((8,Syam),SYAM)
((9,Mary),MARY)
((10,Saran),SARAN)
((11,Stacy),STACY)
((12,Kelly),KELLY)

grunt> lower_data= FOREACH emp_data GENERATE (id,name), LOWER(name);
grunt> dump lower_data;

output
--------
((1,Robin),robin)
((2,BOB),bob)
((3,Maya),maya)
((4,Sara),sara)
((5,David),david)
((6,Maggy),maggy)
((7,Robert),robert)
((8,Syam),syam)
((9,Mary),mary)
((10,Saran),saran)
((11,Stacy),stacy)
((12,Kelly),kelly)

grunt> replace_data= FOREACH emp_data GENERATE (id,name), REPLACE(city,'London','Canada');
grunt> dump replace_data;
output
-------
((1,Robin),newyork)
((2,BOB),Kolkata)
((3,Maya),Tokyo)
((4,Sara),Canada )
((5,David),Bhuwaneshwar )
((6,Maggy),Chennai)
((7,Robert),newyork )
((8,Syam),Kolkata)
((9,Mary),Tokyo)
((10,Saran),Canada )
((11,Stacy),Bhuwaneshwar )
((12,Kelly),Chennai)

grunt> trim_data= FOREACH emp_data GENERATE (id,name), TRIM(name)
grunt> rtrim_data= FOREACH emp_data GENERATE (id,name), RTRIM(name);
grunt> ltrim_data= FOREACH emp_data GENERATE (id,name), LTRIM(name);


-----------
Pig UDF
-------------
Step1: Create new Java project in Eclipse
Step2: Implement UDF in Java/Python.

Ex:
package com;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class SampleEval extends EvalFunc {

public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
try {
String str = (String) input.get(0);
return str.toUpperCase();
} catch (Exception e) {
throw new IOException("Caught exception processing input row ", e);
}
}
}



Step3: Create JAR file
Step4: Copy Jar file in local path
Step5: Register jar file
Step6: Define UDF
Step7: Now, you can use UDF.

Register jar file to pig - step5
---------------------------
grunt> REGISTER /home/cloudera/training/pigudf.jar;

Define UDF   -step6
----------
grunt> DEFINE myfun com.SampleEval();
myfun---> user defined function name
com---> package name used in eclipse
SampleEval--->class name created under com package

grunt> emp_data= LOAD '/home/cloudera/training/pigdata/emp.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,city:chararray);


How to use UDF  -step7
------------------
grunt> empdata_upper= FOREACH emp_data GENERATE myfun(name);

(ROBIN)
(BOB)
(MAYA)
(SARA)
(DAVID)
(MAGGY)
(ROBERT)
(SYAM)
(MARY)
(SARAN)
(STACY)
(KELLY)

Modes of Execution
-------------------
1) Interactive ( give commands one after other manually)
2) Batch mode

pigscript.pig
------------
REGISTER /home/cloudera/training/pigudf.jar;
DEFINE myfun com.SampleEval();
emp_data= LOAD '/home/cloudera/training/pigdata/emp.txt' USING PigStorage(',') as (id:int,name:chararray,age:int,city:chararray);
Upper_case = FOREACH emp_data GENERATE myfun(name);
dump Upper_case;

Executing .pig script file in linux command prompt
-----------------------------------------
[cloudera@quickstart training]$ pig -x local pigudfscript.pig

Followers