Apache Hadoop & Pig - Introduction

  • Ebook title: Apache Hadoop & Pig - Introduction
  • File type: PDF
  • Size: 6 MB
  • Number of pages:

EXCERPT:
Introduction to Apache Hadoop & Pig
Milind Bhandarkar (milindb@yahoo-inc.com)
Y!IM: gridsolutions
Wednesday, 22 September 2010

Agenda
  • Apache Hadoop
  • Map-Reduce
  • Distributed File System
  • Writing Scalable Applications
  • Apache Pig
  • Q & A

Hadoop: Behind Every Click At Yahoo!

Hadoop At Yahoo! (Some Statistics)
  • 40,000+ machines in 20+ clusters
  • Largest cluster is 4,000 machines
  • 6 Petabytes of data (compressed, unreplicated)
  • 1000+ users
  • 200,000+ jobs/day

[Chart: Hadoop Servers and Hadoop Storage (PB) by year, 2006-2010, moving from Research through Science Impact to Daily Production. Today: 38K servers, 170 PB storage, 1M+ monthly jobs.]

Sample Applications
  • Data analysis is the inner loop at Yahoo!
  • Data ⇒ Information ⇒ Value
  • Log processing: Analytics, reporting, buzz
  • Search index
  • Content Optimization, Spam filters
  • Computational Advertising

BEHIND EVERY CLICK

Who Uses Hadoop?

Why Clusters?

Big Datasets
(Data-Rich Computing theme proposal. J. Campbell, et al, 2007)

Cost Per Gigabyte
(http://www.mkomo.com/cost-per-gigabyte)

Storage Trends
(Graph by Adam Leventhal, ACM Queue, Dec 2009)

Motivating Examples

Yahoo!
Search Assist
  • Insight: Related concepts appear close together in text corpus
  • Input: Web pages
  • 1 Billion Pages, 10K bytes each
  • 10 TB of input data
  • Output: List(word, List(related words))

Search Assist
    // Input: List(URL, Text)
    foreach URL in Input :
        Tokens = Tokenize(Text(URL));
        foreach word in Tokens :
            Insert (word, Next(word, Tokens)) in Pairs;
            Insert (word, Previous(word, Tokens)) in Pairs;
    // Result: Pairs = List (word, RelatedWord)
    Group Pairs by word;
    // Result: List (word, List(RelatedWords))
    foreach word in Pairs :
        Count RelatedWords in GroupedPairs;
    // Result: List (word, List(RelatedWords, count))
    foreach word in CountedPairs :
        Sort Pairs(word, *) descending by count;
        choose Top 5 Pairs;
    // Result: List (word, Top5(RelatedWords))

You Might Also Know
  • Insight: You might also know Joe Smith if a lot of folks you know, know Joe Smith
  • if you don't know Joe Smith already
  • Numbers:
  • 300 MM users
  • Average connections per user is 100

You Might Also Know
    // Input: List(UserName, List(Connections))
    foreach u in UserList :                     // 300 MM
        foreach x in Connections(u) :           // 100
            foreach y in Connections(x) :       // 100
                if (y not in Connections(u)) :
                    Count(u, y)++;              // 3 Trillion Iterations
    Sort (u,y) in descending order of Count(u,y);
    Choose Top 3 y;
    Store (u, {y0, y1, y2}) for serving;

Performance
  • 101 Random accesses for each user
  • Assume 1 ms per random access
  • 100 ms per user
  • 300 MM users
  • 300 days on a single machine

MapReduce Paradigm

Map & Reduce
  • Primitives in 1970s Lisp (& other functional languages)
  • Google Paper 2004
  • http://labs.google.com/papers/mapreduce.html

Map
    Output_List = Map (Input_List)
    Square (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) =
    (1, 4, 9, 16, 25, 36, 49, 64, 81, 100)

Reduce
    Output_Element = Reduce (Input_List)
    Sum (1, 4, 9, 16, 25, 36, 49, 64, 81, 100) = 385

Parallelism
  • Map is inherently parallel
  • Each list element processed independently
  • Reduce is inherently sequential
  • Unless processing multiple lists
  • Grouping to produce multiple lists

Search Assist Map
    // Input: http://hadoop.apache.org
    Pairs = Tokenize_And_Pair ( Text ( Input ) )
    Output = {
        (apache, hadoop) (hadoop, mapreduce) (hadoop, streaming)
        (hadoop, pig) (apache, pig) (hadoop, DFS)
        (streaming, commandline) (hadoop, java) (DFS, namenode)
        (datanode, block) (replication, default)...
    }

Search Assist Reduce
    // Input: GroupedList (word, GroupedList(words))
    CountedPairs = CountOccurrences (word, RelatedWords)
    Output = {
        (hadoop, apache, 7) (hadoop, DFS, 3)
        (hadoop, streaming, 4) (hadoop, mapreduce, 9) ...
    }

Issues with Large Data
  • Map Parallelism: Splitting input data
  • Shipping input data
  • Reduce Parallelism:
  • Grouping related data
  • Dealing with failures
  • Load imbalance

Apache Hadoop
  • January 2006: Subproject of Lucene
  • January 2008: Top-level Apache project
  • Stable Version: 0.20 S (Strong Authentication)
  • Latest Version: 0.21
  • Major contributors: Yahoo!, Facebook, Powerset, Cloudera

Apache Hadoop
  • Reliable, Performant Distributed file system
  • MapReduce Programming framework
  • Related Projects: HBase, Hive, Pig, Howl, Oozie, Zookeeper, Chukwa, Mahout, Cascading, Scribe, Cassandra, Hypertable, Voldemort, Azkaban, Sqoop, Flume, Avro ...
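
The Search Assist map, group and reduce steps from the slides above can be traced on a single machine. What follows is a minimal Python sketch, not the deck's implementation: the function names (tokenize, map_pairs, shuffle, reduce_top, search_assist), the whitespace tokenizer and the two example.org pages are all assumptions made for illustration.

```python
from collections import Counter, defaultdict


def tokenize(text):
    """Split text on whitespace (an assumed stand-in for the deck's Tokenize)."""
    return text.lower().split()


def map_pairs(text):
    """Map step: emit (word, neighbour) for the next and the previous word."""
    tokens = tokenize(text)
    for i, word in enumerate(tokens):
        if i + 1 < len(tokens):
            yield word, tokens[i + 1]
        if i > 0:
            yield word, tokens[i - 1]


def shuffle(pairs):
    """Group step: collect every related word under its key word."""
    grouped = defaultdict(list)
    for word, related in pairs:
        grouped[word].append(related)
    return grouped


def reduce_top(related_words, k=5):
    """Reduce step: count related words and keep the k most frequent."""
    return Counter(related_words).most_common(k)


def search_assist(pages, k=5):
    """Run map, group and reduce over a dict of {url: text}."""
    pairs = (pair for text in pages.values() for pair in map_pairs(text))
    return {word: reduce_top(related, k) for word, related in shuffle(pairs).items()}


# Hypothetical input: two tiny "pages" keyed by made-up URLs.
pages = {
    "http://example.org/a": "apache hadoop pig",
    "http://example.org/b": "apache hadoop streaming",
}
result = search_assist(pages)
print(result["hadoop"])
```

Each call to map_pairs depends only on its own page, which is the property that lets Hadoop run the map step in parallel; shuffle is the part the framework provides between map and reduce.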
Problem: Bandwidth to Data
  • Scan 100TB Datasets on 1000 node cluster
  • Remote storage @ 10MB/s = 165 mins
  • Local storage @ 50-200MB/s = 33-8 mins
  • Moving computation is more efficient than moving data
  • Need visibility into data placement

Problem: Scaling Reliably
  • Failure is not an option, it's a rule!
  • 1000 nodes, MTBF < 1 day
  • 4000 disks, 8000 cores, 25 switches, 1000 NICs, 2000 DIMMS (16TB RAM)
  • Need fault tolerant store with reasonable availability guarantees
  • Handle hardware faults transparently

Hadoop Goals
  • Scalable: Petabytes (10^15 Bytes) of data on thousands of nodes
  • Economical: Commodity components only
  • Reliable
  • Engineering reliability into every application is expensive

Hadoop MapReduce

Think MR
  • Record = (Key, Value)
  • Key : Comparable, Serializable
  • Value: Serializable
  • Input, Map, Shuffle, Reduce, Output

Seems Familiar?
    cat /var/log/auth.log* | \
      grep "session opened" | cut -d' ' -f10 | \
      sort | \
      uniq -c > \
      ~/userlist

Map
  • Input: (Key1, Value1)
  • Output: List(Key2, Value2)
  • Projections, Filtering, Transformation

Shuffle
  • Input: List(Key2, Value2)
  • Output
  • Sort(Partition(List(Key2, List(Value2))))
  • Provided by Hadoop

Reduce
  • Input: List(Key2, List(Value2))
  • Output: List(Key3, Value3)
  • Aggregation

Example: Unigrams
  • Input: Huge text corpus
  • Wikipedia Articles (40GB uncompressed)
  • Output: List of words sorted in descending order of frequency

Unigrams
    $ cat ~/wikipedia.txt | \
      sed -e 's/ /\n/g' | grep . | \
      sort | \
      uniq -c > \
      ~/frequencies.txt
    # the slide's commented-out "cat" map and reduce stages are left out here
    $ cat ~/frequencies.txt | \
      sort -n -k1,1 -r > \
      ~/unigrams.txt

MR for Unigrams
    mapper (filename, file-contents):
        for each word in file-contents:
            emit (word, 1)

    reducer (word, values):
        sum = 0
        for each value in values:
            sum = sum + value
        emit (word, sum)

MR for Unigrams
    mapper (word, frequency):
        emit (frequency, word)

    reducer (frequency, words):
        for each word in words:
            emit (word, frequency)

Hadoop Streaming
  • Hadoop is written in Java
  • Java MapReduce code is "native"
  • What about Non-Java Programmers?
  • Perl, Python, Shell, R
  • grep, sed, awk, uniq as Mappers/Reducers
  • Text Input and Output

Hadoop Streaming
  • Thin Java wrapper for Map & Reduce Tasks
  • Forks actual Mapper & Reducer
  • IPC via stdin, stdout, stderr
  • Key.toString() \t Value.toString() \n
  • Slower than Java programs
  • Allows for quick prototyping / debugging

Hadoop Streaming
    $ bin/hadoop jar hadoop-streaming.jar \
        -input in-files -output out-dir \
        -mapper mapper.sh -reducer reducer.sh

    # mapper.sh
    sed -e 's/ /\n/g' | grep .

    # reducer.sh
    uniq -c | awk '{print $2 "\t" $1}'

Running Hadoop Jobs

Running a Job
    [milindb@gateway ~]$ hadoop jar \
        $HADOOP_HOME/hadoop-examples.jar wordcount \
        /data/newsarchive/20080923 /tmp/newsout
    input.FileInputFormat: Total input paths to process : 4
    mapred.JobClient: Running job: job_200904270516_5709
    mapred.JobClient: map 0% reduce 0%
    mapred.JobClient: map 3% reduce 0%
    mapred.JobClient: map 7% reduce 0%
    ....
    mapred.JobClient: map 100% reduce 21%
    mapred.JobClient: map 100% reduce 31%
    mapred.JobClient: map 100% reduce 33%
    mapred.JobClient: map 100% reduce 66%
    mapred.JobClient: map 100% reduce 100%
    mapred.JobClient: Job complete: job_200904270516_5709

Running a Job
    mapred.JobClient: Counters: 18
    mapred.JobClient:   Job Counters
    mapred.JobClient:     Launched reduce tasks=1
    mapred.JobClient:     Rack-local map tasks=10
    mapred.JobClient:     Launched map tasks=25
    mapred.JobClient:     Data-local map tasks=1
    mapred.JobClient:   FileSystemCounters
    mapred.JobClient:     FILE_BYTES_READ=491145085
    mapred.JobClient:     HDFS_BYTES_READ=3068106537
    mapred.JobClient:     FILE_BYTES_WRITTEN=724733409
    mapred.JobClient:     HDFS_BYTES_WRITTEN=377464307

Running a Job
    mapred.JobClient:   Map-Reduce Framework
    mapred.JobClient:     Combine output records=73828180
    mapred.JobClient:     Map input records=36079096
    mapred.JobClient:     Reduce shuffle bytes=233587524
    mapred.JobClient:     Spilled Records=78177976
    mapred.JobClient:     Map output bytes=4278663275
    mapred.JobClient:     Combine input records=371084796
    mapred.JobClient:     Map output records=313041519
    mapred.JobClient:     Reduce input records=15784903

JobTracker WebUI
[Screenshots: JobTracker Status, Jobs Status, Job Details, Job Counters, Job Progress, All Tasks, Task Details, Task Counters, Task Logs]

Hadoop Distributed File System

HDFS
  • Data is organized into files and directories
  • Files are divided into uniform sized blocks (default 64MB) and distributed across cluster nodes
  • HDFS exposes block placement so that computation can be migrated to data

HDFS
  • Blocks are
    replicated (default 3) to handle hardware failure
  • Replication for performance and fault tolerance (Rack-Aware placement)
  • HDFS keeps checksums of data for corruption detection and recovery

HDFS
  • Master-Worker Architecture
  • Single NameNode
  • Many (Thousands) DataNodes

HDFS Master (NameNode)
  • Manages filesystem namespace
  • File metadata (i.e. "inode")
  • Mapping inode to list of blocks + locations
  • Authorization & Authentication
  • Checkpoint & journal namespace changes

Namenode
  • Mapping of datanode to list of blocks
  • Monitor datanode health
  • Replicate missing blocks
  • Keeps ALL namespace in memory
  • 60M objects (File/Block) in 16GB

Datanodes
  • Handle block storage on multiple volumes & block integrity
  • Clients access the blocks directly from data nodes
  • Periodically send heartbeats and block reports to Namenode
  • Blocks are stored as files of the underlying OS

HDFS Architecture

Replication
  • A file's replication factor can be changed dynamically (default 3)
  • Block placement is rack aware
  • Block under-replication & over-replication is detected by Namenode
  • Balancer application rebalances blocks to balance datanode utilization

Accessing HDFS
    hadoop fs [-fs ] [-conf ] [-D ]
        [-ls ] [-lsr ] [-du ] [-dus ] [-mv ] [-cp ] [-rm ] [-rmr ]
        [-put ... ] [-copyFromLocal ... ] [-moveFromLocal ... ]
        [-get [-ignoreCrc] [-crc] [-getmerge [addnl]]
        [-cat ] [-copyToLocal [-ignoreCrc] [-crc] ] [-moveToLocal ]
        [-mkdir ] [-report] [-setrep [-R] [-w] ] [-touchz ]
        [-test -[ezd] ] [-stat [format] ] [-tail [-f] ] [-text ]
        [-chmod [-R] PATH...]
        [-chown [-R] [OWNER][:[GROUP]] PATH...]
        [-chgrp [-R] GROUP PATH...]
        [-count[-q] ] [-help [cmd]]

HDFS Java API
    // Get default file system instance
    fs = FileSystem.get(new Configuration());
    // Or Get file system instance from URI
    fs = FileSystem.get(URI.create(uri), new Configuration());
    // Create, open, list, ...
    OutputStream out = fs.create(path, ...);
    InputStream in = fs.open(path, ...);
    boolean isDone = fs.delete(path, recursive);
    FileStatus[] fstat = fs.listStatus(path);

libHDFS
    #include "hdfs.h"

    hdfsFS fs = hdfsConnectNewInstance("default", 0);

    hdfsFile writeFile = hdfsOpenFile(fs, "/tmp/test.txt", O_WRONLY|O_CREAT, 0, 0, 0);
    tSize num_written = hdfsWrite(fs, writeFile, (void*)buffer, sizeof(buffer));
    hdfsCloseFile(fs, writeFile);

    hdfsFile readFile = hdfsOpenFile(fs, "/tmp/test.txt", O_RDONLY, 0, 0, 0);
    tSize num_read = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer));
    hdfsCloseFile(fs, readFile);

    hdfsDisconnect(fs);

Hadoop Scalability

Scalability of Parallel Programs
  • If one node processes k MB/s, then N nodes should process (k*N) MB/s
  • If some fixed amount of data is processed in T minutes on one node, then N nodes should process the same data in (T/N) minutes
  • Linear Scalability

Latency
  Goal: Minimize program execution time

Throughput
  Goal: Maximize Data processed per unit time

Amdahl's Law
  • If computation $C$ is split into $N$ different parts, $C_1, \dots, C_N$
  • If partial computation $C_i$ can be speeded up by a factor of $S_i$
  • Then overall computation speedup is limited by $\sum_i C_i / \sum_i (C_i / S_i)$

Amdahl's Law
  • Suppose Job has 5 phases: $P_0$ is 10 seconds, $P_1$, $P_2$, $P_3$ are 200 seconds each, and $P_4$ is 10 seconds
  • Sequential runtime = 620 seconds
  • $P_1$, $P_2$, $P_3$ parallelized on 100 machines with speedup of 80 (Each executes in 2.5 seconds)
  • After parallelization, runtime = 27.5 seconds
  • Effective Speedup: 22.5

Efficiency
  • Suppose, speedup on N processors is S, then Efficiency = S/N
  • In previous example, Efficiency = 0.225
  • Goal: Maximize efficiency, while meeting required SLA

Amdahl's Law
Map Reduce Data Flow
MapReduce Dataflow
MapReduce
Job Submission
Initialization
Scheduling
Execution
Map Task
Reduce Task

Hadoop Performance Tuning

Example
  • "Bob" wants to count records in event logs (several hundred GB)
  • Used Identity Mapper (/bin/cat) & Single counting reducer (/bin/wc -l)
  • What is he doing wrong?
  • This happened, really!

MapReduce Performance
  • Minimize Overheads
  • Task Scheduling & Startup
  • Network Connection Setup
  • Reduce intermediate data size
  • Map Outputs + Reduce Inputs
  • Opportunity to load balance

Number of Maps
  • Number of Input Splits, computed by InputFormat
  • Default FileInputFormat:
  • Number of HDFS Blocks
  • Good locality
  • Does not cross file boundary

Changing Number of Maps
  • mapred.map.tasks
  • mapred.min.split.size
  • Concatenate small files into big files
  • Change dfs.block.size
  • Implement InputFormat
  • CombineFileInputFormat

Number of Reduces
  • mapred.reduce.tasks
  • Shuffle Overheads (M*R)
  • 1-2 GB of data per reduce task
  • Consider time needed for re-execution
  • Memory consumed per group/bag

Shuffle
  • Often the most expensive component
  • M * R Transfers over the network
  • Sort map outputs (intermediate data)
  • Merge reduce inputs

Typical Hadoop Network Architecture
[Figure: network diagram with 8 x 1Gbps and 40 x 1Gbps links]

Improving Shuffle
  • Avoid shuffling/sorting if possible
  • Minimize redundant transfers
  • Compress intermediate data

Avoid Shuffle
  • Set mapred.reduce.tasks to zero
  • Known as map-only computations
  • Filters, Projections, Transformations
  • Number of output files = number of input splits = number of input blocks
  • May overwhelm HDFS

Minimize Redundant Transfers
  • Combiners
  • Intermediate data compression

Combiners
  • When Maps produce many repeated keys
  • Combiner: Local aggregation after Map & before Reduce
  • Side-effect free
  • Same interface as Reducers, and often the same class

Compression
  • Often yields huge performance gains
  • Set mapred.output.compress=true to compress job output
  • Set mapred.compress.map.output=true to compress map outputs
  • Codecs: Java zlib (default), LZO, bzip2, native gzip

Load Imbalance
  • Inherent in application
  • Imbalance in input splits
  • Imbalance in computations
  • Imbalance in partitions
  • Heterogeneous hardware
  • Degradation over time

Speculative Execution
  • Runs multiple instances of slow tasks
  • Instance that finishes first, succeeds
  • mapred.map.tasks.speculative.execution=true
  • mapred.reduce.tasks.speculative.execution=true
  • Can dramatically shorten long tails on jobs

Best Practices - 1
  • Use higher-level languages (e.g.
    Pig)
  • Coalesce small files into larger ones & use bigger HDFS block size
  • Tune buffer sizes for tasks to avoid spill to disk, consume one slot per task
  • Use combiner if local aggregation reduces size

Best Practices - 2
  • Use compression everywhere
  • CPU-efficient for intermediate data
  • Disk-efficient for output data
  • Use Distributed Cache to distribute small side-files (Less than DFS block size)
  • Minimize number of NameNode/JobTracker RPCs from tasks

Apache Pig

What is Pig?
  • System for processing large semi-structured data sets using Hadoop MapReduce platform
  • Pig Latin: High-level procedural language
  • Pig Engine: Parser, Optimizer and distributed query execution

Pig vs SQL
  • Pig is procedural (How)
  • Nested relational data model
  • Schema is optional
  • Scan-centric analytic workloads
  • Limited query optimization

  • SQL is declarative
  • Flat relational data model
  • Schema is required
  • OLTP + OLAP workloads
  • Significant opportunity for query optimization

Pig vs Hadoop
  • Increase programmer productivity
  • Decrease duplication of effort
  • Insulates against Hadoop complexity
  • Version Upgrades
  • JobConf configuration tuning
  • Job Chains

Example
  • Input: User profiles, Page visits
  • Find the top 5 most visited pages by users aged 18-25

In Native Hadoop

In Pig
    Users = load 'users' as (name, age);
    Filtered = filter Users by age >= 18 and age <= 25;
    Pages = load 'pages' as (user, url);
    Joined = join Filtered by name, Pages by user;
    Grouped = group Joined by url;
    Summed = foreach Grouped generate group, COUNT(Joined) as clicks;
    Sorted = order Summed by clicks desc;
    Top5 = limit Sorted 5;
    store Top5 into 'top5sites';

Natural Fit

Comparison
Flexibility & Control
  • Easy to plug-in user code
  • Metadata is not mandatory
  • Pig does not impose a data model on you
  • Fine grained control
  • Complex data types

Pig Data Types
  • Tuple: Ordered set of fields
  • Field can be simple or complex type
  • Nested relational model
  • Bag: Collection of tuples
  • Can contain duplicates
  • Map: Set of (key, value) pairs

Simple data types
  • int : 42
  • long : 42L
  • float : 3.1415f
  • double : 2.7182818
  • chararray : UTF-8 String
  • bytearray : blob

Expressions
    A = LOAD 'data.txt' AS (f1:int, f2:{t:(n1:int, n2:int)}, f3:map[]);

    A = {
        ( 1,                     -- A.f1 or A.$0
          { (2, 3), (4, 6) },    -- A.f2 or A.$1
          [ 'yahoo'#'mail' ]     -- A.f3 or A.$2
        )
    }

Counting Word Frequencies
  • Input: Large text document
  • Process:
  • Load the file
  • For each line, generate word tokens
  • Group by word
  • Count words in each group

Load
    myinput = load '/user/milindb/text.txt' USING TextLoader() as (myword:chararray);

    (program program)
    (pig pig)
    (program pig)
    (hadoop pig)
    (latin latin)
    (pig latin)

Tokenize
    words = FOREACH myinput GENERATE FLATTEN(TOKENIZE(*));

    (program) (program) (pig) (pig) (program) (pig)
    (hadoop) (pig) (latin) (latin) (pig) (latin)

Group
    grouped = GROUP words BY $0;

    (pig, {(pig), (pig), (pig), (pig), (pig)})
    (latin, {(latin), (latin), (latin)})
    (hadoop, {(hadoop)})
    (program, {(program), (program), (program)})

Count
    counts = FOREACH grouped GENERATE group, COUNT(words);

    (pig, 5L)
    (latin, 3L)
    (hadoop, 1L)
    (program, 3L)

Store
    store counts into '/user/milindb/output' using PigStorage();

    pig 5
    latin 3
    hadoop 1
    program 3

Example: Log Processing
    -- use a custom loader
    Logs = load 'apachelogfile' using CommonLogLoader()
        as (addr, logname, user, time, method,
            uri, p, bytes);
    -- apply your own function
    Cleaned = foreach Logs generate addr, canonicalize(uri) as url;
    Grouped = group Cleaned by url;
    -- run the result through a binary
    Analyzed = stream Grouped through 'urlanalyzer.py';
    store Analyzed into 'analyzedurls';

Schema on the fly
    -- declare your types
    Grades = load 'studentgrades' as (name: chararray, age: int, gpa: double);
    Good = filter Grades by age > 18 and gpa > 3.0;
    -- ordering will be by type
    Sorted = order Good by gpa;
    store Sorted into 'smartgrownups';

Nested Data
    Logs = load 'weblogs' as (url, userid);
    Grouped = group Logs by url;
    -- Code inside {} will be applied to each
    -- value in turn.
    DistinctCount = foreach Grouped {
        Userid = Logs.userid;
        DistinctUsers = distinct Userid;
        generate group, COUNT(DistinctUsers);
    }
    store DistinctCount into 'distinctcount';

Pig Architecture

Pig Frontend
Logical Plan
  • Directed Acyclic Graph
  • Logical Operator as Node
  • Data flow as edges
  • Logical Operators
  • One per Pig statement
  • Type checking with Schema

Pig Statements
    Load     Read data from the file system
    Store    Write data to the file system
    Dump     Write data to stdout

Pig Statements
    Foreach..Generate    Apply expression to each record and generate one or more records
    Filter               Apply predicate to each record and remove records where false
    Stream..through      Stream records through user-provided binary

Pig Statements
    Group/CoGroup    Collect records with the same key from one or more inputs
    Join             Join two or more inputs based on a key
    Order..by        Sort records based on a key

Physical Plan
  • Pig supports two back-ends
  • Local
  • Hadoop MapReduce
  • 1:1 correspondence with most logical operators
  • Except Distinct, Group, Cogroup, Join etc

MapReduce Plan
  • Detect Map-Reduce boundaries
  • Group, Cogroup, Order, Distinct
  • Coalesce operators into Map and Reduce stages
  • Job.jar is created and submitted to Hadoop JobControl

Lazy Execution
  • Nothing really executes until you request output
  • Store, Dump, Explain, Describe, Illustrate
  • Advantages
  • In-memory pipelining
  • Filter re-ordering across multiple commands

Parallelism
  • Split-wise parallelism on Map-side operators
  • By default, 1 reducer
  • PARALLEL keyword
  • group, cogroup, cross, join, distinct, order

Running Pig
    $ pig
    grunt > A = load 'students' as (name, age, gpa);
    grunt > B = filter A by gpa > '3.5';
    grunt > store B into 'good_students';
    grunt > dump A;
    (jessica thompson, 73, 1.63)
    (victor zipper, 23, 2.43)
    (rachel hernandez, 40, 3.60)
    grunt > describe A;
    A: (name, age, gpa )

Running Pig
  • Batch mode
  • $ pig myscript.pig
  • Local mode
  • $ pig -x local
  • Java mode (embed pig statements in java)
  • Keep pig.jar in the class path

Pig for SQL Programmers

SQL to Pig
    SQL: ...FROM MyTable...
    Pig: A = LOAD 'MyTable' USING PigStorage('\t') AS (col1:int, col2:int, col3:int);

    SQL: SELECT col1 + col2, col3 ...
    Pig: B = FOREACH A GENERATE col1 + col2, col3;

    SQL: ...WHERE col2 > 2
    Pig: C = FILTER B by col2 > 2;

SQL to Pig
    SQL: SELECT col1, col2, sum(col3) FROM X GROUP BY col1, col2
    Pig: D = GROUP A BY (col1, col2);
         E = FOREACH D GENERATE FLATTEN(group), SUM(A.col3);

    SQL: ...HAVING sum(col3) > 5
    Pig: F = FILTER E BY $2 > 5;

    SQL: ...ORDER BY col1
    Pig: G = ORDER F BY $0;

SQL to Pig
    SQL: SELECT DISTINCT col1 from X
    Pig: I = FOREACH A GENERATE col1;
         J = DISTINCT I;

    SQL: SELECT col1, count (DISTINCT col2) FROM X GROUP BY col1
    Pig: K = GROUP A BY col1;
         L = FOREACH K {
             M = DISTINCT A.col2;
             GENERATE FLATTEN(group), COUNT(M);
         }

SQL to Pig
    SQL: SELECT A.col1, B.col3 FROM A JOIN B USING (col1)
    Pig: N = JOIN A by col1 INNER, B by col1 INNER;
         O = FOREACH N GENERATE A::col1, B::col3;
         -- Or
         N = COGROUP A by col1 INNER, B by col1 INNER;
         O = FOREACH N GENERATE flatten(A), flatten(B);
         P = FOREACH O GENERATE A::col1, B::col3;

Questions?
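
As a recap of the SQL to Pig slides, the GROUP, FOREACH, FILTER and ORDER chain (relations D through G) can be traced step by step in plain Python. This is only an illustrative sketch: the sample rows are made up, and Python lists and dicts stand in for Pig relations and bags.

```python
from collections import defaultdict

# Hypothetical rows standing in for relation A with fields (col1, col2, col3).
rows = [(1, 1, 4), (1, 1, 3), (2, 1, 2), (1, 2, 9)]

# D = GROUP A BY (col1, col2);
# Each key maps to the "bag" of col3 values that share it.
groups = defaultdict(list)
for col1, col2, col3 in rows:
    groups[(col1, col2)].append(col3)

# E = FOREACH D GENERATE FLATTEN(group), SUM(A.col3);
summed = [(c1, c2, sum(values)) for (c1, c2), values in groups.items()]

# F = FILTER E BY $2 > 5;   (the SQL HAVING clause)
filtered = [row for row in summed if row[2] > 5]

# G = ORDER F BY $0;        (the SQL ORDER BY col1)
ordered = sorted(filtered, key=lambda row: row[0])

print(ordered)
```

The grouping step is the one that forces a shuffle in the MapReduce plan; the FOREACH, FILTER and ORDER steps here operate on the already grouped data.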