Saturday, May 21, 2016

Partitioning and Bucketing in Hive

Partitioning data is often used to distribute load horizontally; it improves performance and helps organize data logically. For example, suppose we are dealing with a large employee table and often run queries with WHERE clauses that restrict the results to a particular country or department. For a faster query response, the Hive table can be declared PARTITIONED BY (country STRING, dept STRING). Partitioning changes how Hive structures the data storage: Hive creates subdirectories that reflect the partitioning structure, such as .../employees/country=ABC/dept=XYZ. If a query is limited to employees from country ABC, Hive scans only the contents of the country=ABC directory. This can dramatically improve query performance, but only if the partitioning scheme reflects common filtering. Partitioning is very useful in Hive; however, a design that creates too many partitions may optimize some queries while being detrimental to other important queries. Another drawback of having too many partitions is the large number of Hadoop files and directories that are created unnecessarily, which adds overhead on the NameNode, since it must keep all metadata for the file system in memory.
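
As a rough illustration of the partitioning scheme described above, the following HiveQL sketch creates a partitioned table and runs a query that filters on the partition columns. The table name and the non-partition columns (employee_id, name, salary) are assumptions made for the example.

```sql
-- Hypothetical employee table; only the partition columns come from the text above.
CREATE TABLE employees (
  employee_id INT,
  name        STRING,
  salary      DOUBLE
)
PARTITIONED BY (country STRING, dept STRING);

-- Both predicates are on partition columns, so Hive can prune the scan
-- down to the single directory .../employees/country=ABC/dept=XYZ.
SELECT name, salary
FROM employees
WHERE country = 'ABC'
  AND dept = 'XYZ';

-- Lists the partitions that currently exist for the table.
SHOW PARTITIONS employees;
```

Note that the partition columns are not repeated in the regular column list; Hive derives their values from the directory names.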
Bucketing is another technique for decomposing data sets into more manageable parts. For example, suppose a table that uses the date as the top-level partition and employee_id as the second-level partition ends up with too many small partitions. If we instead bucket the employee table and use employee_id as the bucketing column, the value of this column is hashed into a user-defined number of buckets. Records with the same employee_id are always stored in the same bucket. Assuming the number of distinct employee_id values is much greater than the number of buckets, each bucket holds many employee_id values. When creating the table, you specify CLUSTERED BY (employee_id) INTO XX BUCKETS, where XX is the number of buckets. Bucketing has several advantages. The number of buckets is fixed, so it does not fluctuate with the data. If two tables are bucketed by employee_id, Hive can create a logically correct sample of them. Bucketing also helps Hive perform efficient map-side joins.
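
A minimal sketch of a bucketed table and a bucket-based sample follows. The table name, the extra column, and the choice of 32 buckets are assumptions for illustration only.

```sql
-- Hypothetical bucketed table; 32 is an arbitrary bucket count.
CREATE TABLE employees_bucketed (
  employee_id INT,
  name        STRING
)
CLUSTERED BY (employee_id) INTO 32 BUCKETS;

-- Hive versions before 2.0 need this so that INSERTs honor the bucket
-- definition; later versions always enforce bucketing.
SET hive.enforce.bucketing = true;

-- Sample roughly 1/32 of the data by reading a single bucket.
SELECT *
FROM employees_bucketed TABLESAMPLE (BUCKET 1 OUT OF 32 ON employee_id);
```

Because the sample is taken on the bucketing column, Hive can read one bucket file instead of scanning the whole table.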

The following question and answer give a clearer idea of how partitioning and bucketing work together.

Question: Suppose we specify 32 buckets in the CLUSTERED BY clause and the CREATE TABLE statement also contains a partitioning clause. How are partitions and buckets managed together? Is the number of partitions limited to 32, or are 32 buckets created for each partition? Is every bucket an HDFS file?

Answer: A Hive table can have both partitioning and bucketing. The partition clause determines the partitions, and 32 buckets are created within each partition. Yes, each bucket is stored as an HDFS file.
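
To make the answer concrete, here is a sketch of a table that combines both clauses. The table and column names are hypothetical, and the file names in the comments show only the naming Hive typically uses; they may differ between versions and execution engines.

```sql
-- Hypothetical table that is both partitioned and bucketed.
CREATE TABLE employees_pb (
  employee_id INT,
  name        STRING
)
PARTITIONED BY (country STRING)
CLUSTERED BY (employee_id) INTO 32 BUCKETS;

-- After data is inserted, each partition directory holds its own 32 bucket files,
-- typically laid out like this:
--   .../employees_pb/country=ABC/000000_0 ... 000031_0
--   .../employees_pb/country=DEF/000000_0 ... 000031_0
```

The number of partitions is not limited by the bucket count; it grows with the number of distinct country values.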

Hive Join strategies

An SQL JOIN clause combines rows from two or more tables based on a common field between them. Hive Query Language (HQL or HiveQL) offers joins along the same lines, and they are a key factor in the optimization and performance of Hive queries. Choosing the right join based on the data and the business need is a key principle for improving Hive query performance.
Let’s try to understand how a join works in Hive execution. In general, a join operation is compiled to a MapReduce task that involves a map stage and a reduce stage. A mapper reads from the join tables and emits the join key and join value pair into an intermediate file. Hadoop sorts and merges these pairs in what’s called the shuffle stage. The reducer takes the sorted results as input and does the actual join work. The shuffle stage is very expensive; hence avoiding this stage improves task performance. Put simply, a join is a clause that combines the records of two tables (or data sets).
Shuffle Join / Common Join:
When: 
  • It’s the default choice and it always works
How:
  • Reads from part of one of the tables
  • Buckets and sorts on Join key
  • Sends one bucket to each reducer
  • Join is done on the Reduce side
Pointers:
  • Works all the time
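
A sketch of a query that would run as a common (shuffle) join is shown below. The employees and departments tables and their columns are hypothetical; turning off automatic map-join conversion is only there to make the common-join plan visible.

```sql
-- Disable automatic conversion to a map join so the common join plan is used.
SET hive.auto.convert.join = false;

-- Hypothetical tables: rows from both sides are shuffled by dept_id
-- and joined on the reduce side.
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d
  ON e.dept_id = d.dept_id;
```

Prefixing the query with EXPLAIN shows the plan Hive chose, including the reduce-side join operator.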
Map Join:
When:
  • One table is small enough to fit in memory
  • To save the shuffle & reduce stages
  • To do the join work only in the map stage
  • It’s suitable for small tables to optimize the task
How:
  • Reads the small table into an in-memory hash table
  • Streams through part of the big file
  • Joins each record against the hash table
  • It is similar to a common join, but all of the work is performed by the mapper alone
Pointers:
  • Very fast, but it’s limited
  • Using the Distributed Cache solves the scaling limitations
  • Most of the improvement comes from removing the JDBM component
  • No need to use a persistent hashtable for map join
  • Can be requested with the MAPJOIN query hint
  • FULL OUTER JOIN is not possible; a LEFT or RIGHT OUTER JOIN works only when the table whose rows must all be kept is the large, streamed table
  • It has no reduce task and it can handle only one key at a time
  • set hive.auto.convert.join; if true, Hive automatically converts eligible joins to map joins at run time, without a hint
  • set hive.auto.convert.join.noconditionaltask; if true, Hive converts the join directly to a map join, with no conditional task, when the small tables fit within the size limit below
  • set hive.auto.convert.join.noconditionaltask.size; the size threshold, in bytes, for the small tables that must fit in memory
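
The settings and the hint listed above can be sketched as follows. The table names and the 10 MB threshold are assumptions chosen for illustration; a suitable threshold depends on the memory available to the map tasks.

```sql
-- Let Hive convert joins to map joins automatically.
SET hive.auto.convert.join = true;
SET hive.auto.convert.join.noconditionaltask = true;
-- Illustrative threshold in bytes (about 10 MB).
SET hive.auto.convert.join.noconditionaltask.size = 10000000;

-- Hypothetical tables: departments is assumed small enough to be loaded
-- into the in-memory hash table, while employees is streamed.
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d
  ON e.dept_id = d.dept_id;

-- Hint form. In Hive 0.11 and later the hint is ignored by default
-- unless hive.ignore.mapjoin.hint is set to false.
SELECT /*+ MAPJOIN(d) */ e.name, d.dept_name
FROM employees e
JOIN departments d
  ON e.dept_id = d.dept_id;
```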
Left Semi Join:
When:
  • To get IN/EXISTS subquery semantics
How:
  • It’s a generic join and is effective for inner joins
  • Once a match is found for a left-side row, Hive stops scanning the remaining right-side records for it
Pointers:
  • The right-hand side table should be referenced only in the ON clause, not in the WHERE or SELECT clause
  • Right semi-joins are not supported in Hive
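
A short sketch of a left semi join, using hypothetical employees and departments tables, could look like this:

```sql
-- Returns employees whose dept_id exists in departments.
-- Columns of d may appear only in the ON clause.
SELECT e.employee_id, e.name
FROM employees e
LEFT SEMI JOIN departments d
  ON (e.dept_id = d.dept_id);

-- The query above expresses the same intent as this IN subquery:
--   SELECT employee_id, name FROM employees
--   WHERE dept_id IN (SELECT dept_id FROM departments);
```

Each employee row appears at most once in the result, even if several department rows match.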
Bucket Map Join:
When:
  • Total table/partition size is too big for a plain map join
  • Tables are not sorted the same way
  • Tables are bucketed the same way
  • Joining on the bucketing columns, which may be multiple columns
How:
  • Works together with map join, and all join tables are bucketed
  • The bucket count of one table must be a multiple of the bucket count of the other
  • Bucket columns == Join columns
  • Only matching buckets of all small tables are replicated onto each mapper
Pointers:
  • set hive.optimize.bucketmapjoin; if true, Bucket Map Join is activated
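
The conditions above can be sketched with two hypothetical tables bucketed on the join column. The table names, columns, and bucket counts (32 and 8) are assumptions for the example.

```sql
-- Both tables are bucketed on the join column customer_id.
CREATE TABLE orders_b (
  order_id    INT,
  customer_id INT,
  amount      DOUBLE
)
CLUSTERED BY (customer_id) INTO 32 BUCKETS;

CREATE TABLE customers_b (
  customer_id INT,
  name        STRING
)
CLUSTERED BY (customer_id) INTO 8 BUCKETS;  -- 32 is a multiple of 8

SET hive.optimize.bucketmapjoin = true;

-- Each mapper loads only the customers_b bucket(s) that match the
-- orders_b bucket it is processing. The hint may need
-- hive.ignore.mapjoin.hint=false on Hive 0.11 and later.
SELECT /*+ MAPJOIN(c) */ o.order_id, c.name
FROM orders_b o
JOIN customers_b c
  ON o.customer_id = c.customer_id;
```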
Sort Merge Bucket (SMB) Join:
When:
  • Sorted the same
  • Bucketed the same
  • Joining on the sort/bucket on the same/equal columns
How:
  • Reads a bucket from each table
  • Process the row with the lowest value
Pointers:
  • Very efficient if applicable
  • Both Map and Reduce tasks are used
  • set hive.input.format; if it’s org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat, the SMB join is activated
  • set hive.auto.convert.sortmerge.join=true
  • set hive.optimize.bucketmapjoin = true
  • set hive.optimize.bucketmapjoin.sortedmerge = true
  • set hive.auto.convert.sortmerge.join.noconditionaltask=true
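
Putting the conditions and settings above together, a sketch of an SMB join might look like the following. The table names, columns, and bucket count are hypothetical; the point is that both tables are bucketed and sorted on the join column with the same number of buckets.

```sql
CREATE TABLE orders_smb (
  order_id    INT,
  customer_id INT,
  amount      DOUBLE
)
CLUSTERED BY (customer_id) SORTED BY (customer_id ASC) INTO 32 BUCKETS;

CREATE TABLE customers_smb (
  customer_id INT,
  name        STRING
)
CLUSTERED BY (customer_id) SORTED BY (customer_id ASC) INTO 32 BUCKETS;

SET hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
SET hive.auto.convert.sortmerge.join = true;
SET hive.optimize.bucketmapjoin = true;
SET hive.optimize.bucketmapjoin.sortedmerge = true;

-- Matching buckets are merged in sorted order, much like a merge sort.
SELECT o.order_id, c.name
FROM orders_smb o
JOIN customers_smb c
  ON o.customer_id = c.customer_id;
```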
Sort Merge Bucket Map(SMB Map) Join:
When:
  • Sorted the same
  • Bucketed the same
  • Joining on the sort/bucket on the same/equal columns
  • No limit on file/partition/table size
How:
  • A partitioned table might slow the join down, because each single key in the map needs a small chunk
  • Works together with bucket map join
  • Bucket columns == Join columns == sort columns
  • Small tables are read on demand
  • Does NOT hold entire small tables in memory
  • Can perform outer join
Pointers:
  • set hive.auto.convert.sortmerge.join=true
  • set hive.optimize.bucketmapjoin = true
  • set hive.optimize.bucketmapjoin.sortedmerge = true
  • set hive.auto.convert.sortmerge.join.noconditionaltask=true
  • set hive.auto.convert.sortmerge.join.bigtable.selection.policy=org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
Skew Join:
When:
  • Two very large tables need to be joined
  • The join is bottlenecked on the reducer that receives the skewed key
How:
  • Skewed keys are handled separately, so that even if a small number of skewed keys make up a significant percentage of the data,
    they do not become bottlenecks
Pointers:
  • Because of the partial results, the results also have to be read and written twice
  • The user needs to be aware of the skew in the data and manually do the above process
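
The manual process mentioned above can be sketched as two queries: one for the non-skewed keys and one for the known skewed key. The tables big_a and big_b, their columns, and the skewed key value 1 are assumptions for illustration.

```sql
-- Query 1: every key except the skewed one, run as an ordinary join.
SELECT a.id, a.val, b.info
FROM big_a a
JOIN big_b b
  ON a.id = b.id
WHERE a.id <> 1;

-- Query 2: only the skewed key. The matching rows of big_b are assumed
-- to be few, so this part can often be executed as a map join.
SELECT a.id, a.val, b.info
FROM big_a a
JOIN big_b b
  ON a.id = b.id
WHERE a.id = 1
  AND b.id = 1;

-- Alternatively, Hive can attempt this split at run time:
--   SET hive.optimize.skewjoin = true;
--   SET hive.skewjoin.key = 100000;  -- rows per key above which a key is treated as skewed
```

The results of the two queries then have to be combined, which is why the data ends up being read and written twice.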
Cartesian Product Join:
When:
  • To generate every combination of records from the joined tables
How:
  • Not optimized in MapReduce
  • Computes the full Cartesian product before the WHERE clause is applied
Pointers:
  • set hive.mapred.mode=strict; helps prevent accidentally submitting a Cartesian product query
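
As a small sketch of the strict-mode safeguard, using hypothetical employees and departments tables:

```sql
SET hive.mapred.mode = strict;

-- No join condition: this is a Cartesian product, so strict mode
-- is expected to reject the query instead of running it.
SELECT e.name, d.dept_name
FROM employees e
CROSS JOIN departments d;

-- With a join condition the query is accepted.
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d
  ON e.dept_id = d.dept_id;
```

Setting hive.mapred.mode back to nonstrict allows the first query to run when a full Cartesian product is really intended.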