Saturday, April 2, 2016

Apache Spark

Scala on Spark cheatsheet


1. Define an object with a main function -- HelloWorld

object HelloWorld {
  def main(args: Array[String]) {
    println("Hello, world!")
  }
}
Execute main function:
scala> HelloWorld.main(null)
Hello, world!

2. Creating RDDs

  • Parallelized Collections:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • External Datasets: 

val distFile = sc.textFile("data.txt")
The above command returns the content of the file:
scala> distFile.collect()
res16: Array[String] = Array(1,2,3, 4,5,6)
SparkContext.wholeTextFiles can return (filename, content).
val distFile = sc.wholeTextFiles("/tmp/tmpdir")

scala> distFile.collect()
res17: Array[(String, String)] =
Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data2.txt,"1,2,3
4,5,6
"))

3. RDD Operations

  • Transformations (which create a new dataset from an existing one). Transformations are lazy: they are not computed until an action requires a result.

3.1 map(f: T => U)

Return a new distributed dataset formed by passing each element of the source through a function func.
Example 1: To calculate the length of each line.

scala> lines.map(s => s.length).collect
res46: Array[Int] = Array(48, 25, 34, 5, 6, 6, 5, 5, 6)  

3.2 filter(f: T => Boolean)

Return a new dataset formed by selecting those elements of the source on which func returns true.
Example 1: Find the lines that start with "APPLE":
scala> lines.filter(_.startsWith("APPLE")).collect
res50: Array[String] = Array(APPLE)
Example 2: Find the lines that contain "test":
scala> lines.filter(_.contains("test")).collect
res54: Array[String] = Array("This is a test data text file for Spark to use. ", "To test Scala and Spark, ")

3.3 flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
Example 1: Generate the Int List and compare "map" and "flatMap".
scala> val intlist = List( 1,2,3,4,5 )
intlist: List[Int] = List(1, 2, 3, 4, 5)

scala> intlist.map(x=>List(x,x*2))
res72: List[List[Int]] = List(List(1, 2), List(2, 4), List(3, 6), List(4, 8), List(5, 10))

scala> intlist.flatMap(x=>(List(x,x*2)))
res73: List[Int] = List(1, 2, 2, 4, 3, 6, 4, 8, 5, 10)
Example 2: Use flatMap on a Map to keep only some entries:
scala> val m = Map(1 -> 2, 2 -> 4, 3 -> 6)
m: scala.collection.immutable.Map[Int,Int] = Map(1 -> 2, 2 -> 4, 3 -> 6)

scala> def h(k:Int, v:Int) = if (v > 2) Some(k->v) else None
h: (k: Int, v: Int)Option[(Int, Int)]

scala> m.flatMap { case (k,v) => h(k,v) }
res76: scala.collection.immutable.Map[Int,Int] = Map(2 -> 4, 3 -> 6)

3.4 mapPartitions(func)    

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
In short:
map converts each element of the source RDD into a single element of the result RDD by applying a function.
mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).
It can improve performance by reducing per-element object creation, since setup work can be done once per partition (see the sketch after the examples below).
Example 1: We have 9 lines in total. Instead of mapping each line individually, we split the 9 lines into 2 partitions, so the function only needs to run twice.
First we create a function that accepts an Iterator as input and returns an Iterator.
This function simply returns the size of each partition.

def myfunc(inputs: Iterator[String]): Iterator[Int] = {
  // Return a one-element iterator holding the number of elements in this partition
  Iterator(inputs.size)
}
Then:
scala> lines.count
res12: Long = 9

scala> lines.mapPartitions(myfunc).collect
res9: Array[Int] = Array(2, 7)
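
As noted above, the performance win comes from doing setup work once per partition. A minimal sketch (the DecimalFormat here is just a hypothetical stand-in for any object that is expensive to create): the formatter is built once per partition and reused for every line in it, instead of once per element.

val formatted = lines.mapPartitions { iter =>
  // Created once per partition, reused for every element in it
  val formatter = new java.text.DecimalFormat("#.##")
  iter.map(line => formatter.format(line.length.toDouble))
}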

3.5 mapPartitionsWithIndex(func)   

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
Example 1: Building on the mapPartitions example above, the function now also receives the partition index as an extra input.
def myfunc2(index: Int, inputs: Iterator[String]): Iterator[(Int, Int)] = {
  // Return (partition index, number of elements in this partition)
  Iterator((index, inputs.size))
}
Then:
scala> lines.mapPartitionsWithIndex(myfunc2).collect
res14: Array[(Int, Int)] = Array((0,2), (1,7))

3.6 sample(withReplacement, fraction, seed)   

Sample a fraction of the data, with or without replacement, using a given random number generator seed.
Note: Compared to takeSample, the second parameter of sample() is the fraction of the total elements to sample; the actual number of sampled elements may not match that fraction exactly.
Example 1: fraction = 0.5 does not guarantee exactly 50% of the elements; the count can vary between runs.
scala> val list = sc.parallelize(1 to 9)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:33

scala> list.sample(true, 0.5) .collect
res63: Array[Int] = Array(7, 9)

scala> list.sample(true, 0.5) .collect
res64: Array[Int] = Array(1, 1, 2, 4, 5, 6)
Example 2: "withReplacement"=true means output may have duplicate elements, else, it will not.
scala> list.sample(true, 1).collect
res70: Array[Int] = Array(4, 4, 4, 4, 8, 8, 9, 9)

scala> list.sample(false, 1).collect
res71: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
Example 3: If "seed" does not change, the result will not change.
scala> list.sample(true, 0.5, 1).collect
res73: Array[Int] = Array(5, 7, 8, 9, 9)

scala> list.sample(true, 0.5, 1).collect
res74: Array[Int] = Array(5, 7, 8, 9, 9)

3.7 union(otherDataset)   

Return a new dataset that contains the union of the elements in the source dataset and the argument.
Note: It is equivalent to the "++" operator.
Example 1: Union two RDDs of Strings.
Unlike UNION in SQL, it does not de-duplicate.
scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
scala> val list2 = sc.parallelize(List("mapr", "cloudera", "hortonworks"))

scala> (list union list2).collect
res4: Array[String] = Array(apple, orange, banana, apple, orange, mapr, cloudera, hortonworks)

scala> (list ++ list2).collect
res5: Array[String] = Array(apple, orange, banana, apple, orange, mapr, cloudera, hortonworks)

3.8 intersection(otherDataset)   

Return a new RDD that contains the intersection of elements in the source dataset and the argument.
Note: intersection de-duplicates the result.
Example 1: Intersect two RDDs of Strings; the result contains only one "apple" even though each RDD has more than one.
scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
scala> val list2 = sc.parallelize(List("apple", "mapr", "apple" ))

list.intersection(list2).collect
res7: Array[String] = Array(apple)

3.9 distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.
Example 1: Return distinct values from one array.
scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
scala> list.distinct.collect
res13: Array[String] = Array(orange, apple, banana)

3.10 groupByKey([numTasks])   

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
Example 1: Group a list of (K, V) pairs by key.
scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )

scala> kv.groupByKey.collect
res14: Array[(String, Iterable[Int])] = Array((orange,ArrayBuffer(2)), (apple,ArrayBuffer(1, 2)), (banana,ArrayBuffer(3)))
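
As the note above says, reduceByKey or combineByKey is usually preferable to groupByKey for aggregations, because partial results are combined within each partition before the shuffle. A minimal combineByKey sketch that sums the values of the kv pairs above (it produces the same result as the reduceByKey example in 3.11):

val sums = kv.combineByKey(
  (v: Int) => v,                  // createCombiner: first value seen for a key
  (acc: Int, v: Int) => acc + v,  // mergeValue: fold another value into the partition-local sum
  (a: Int, b: Int) => a + b       // mergeCombiners: merge partial sums from different partitions
)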

3.11 reduceByKey(func, [numTasks])   

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
Example 1: Using the kv pairs above, calculate the sum of the values for each key.
scala> kv.reduceByKey(_ + _ ).collect
res19: Array[(String, Int)] = Array((orange,2), (apple,3), (banana,3))

3.12 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])    

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
Example 1: Use the kv pairs above to trace how seqOp and combOp are applied (the concatenated strings make the call order visible).
scala> kv.aggregateByKey("")(  ( (a,b) => "DEBUG:" + "a=" + a + " b=" + b ), ((v1, v2) => v1 + " and " + v2) ).collect
res34: Array[(String, String)] = Array((orange,DEBUG:a= b=2), (apple,DEBUG:a= b=1 and DEBUG:a= b=2), (banana,DEBUG:a= b=3))
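
The string-concatenation example above is only meant to make the call order of seqOp and combOp visible. A more typical sketch (reusing the same kv pairs) builds a per-key (sum, count) with a zero value of (0, 0), showing that the aggregated type can differ from the value type, and then derives an average:

val sumCount = kv.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: fold one value into the partition-local (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge (sum, count) pairs across partitions
)
val avgByKey = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }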

3.13 sortByKey([ascending], [numTasks])   

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
Example 1: Sort the kv pairs above by key, ascending and descending.
scala> kv.sortByKey(true).collect
res35: Array[(String, Int)] = Array((apple,2), (apple,1), (banana,3), (orange,2))

scala> kv.sortByKey(false).collect
res36: Array[(String, Int)] = Array((orange,2), (banana,3), (apple,2), (apple,1))

3.14 join(otherDataset, [numTasks])   

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
Example 1: Inner Join
scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )
scala> val kw = sc.parallelize( List(("apple", 999), ("orange", 222) ))

scala> kv.join(kw).collect
res37: Array[(String, (Int, Int))] = Array((orange,(2,222)), (apple,(1,999)), (apple,(2,999)))
Example 2: Left Outer Join
scala> kv.leftOuterJoin(kw).collect
res38: Array[(String, (Int, Option[Int]))] = Array((orange,(2,Some(222))), (apple,(2,Some(999))), (apple,(1,Some(999))), (banana,(3,None)))
Example 3: Right Outer Join
scala> kv.rightOuterJoin(kw).collect
res39: Array[(String, (Option[Int], Int))] = Array((orange,(Some(2),222)), (apple,(Some(1),999)), (apple,(Some(2),999)))

3.15 cogroup(otherDataset, [numTasks])   

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
Example 1: 2 RDDs GroupWith.
scala> kv.cogroup(kw).collect
res40: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((orange,(CompactBuffer(2),CompactBuffer(222))), (apple,(CompactBuffer(2, 1),CompactBuffer(999))), (banana,(CompactBuffer(3),CompactBuffer())))
Example 2: 3 RDDs GroupWith.
scala> val kw2 = sc.parallelize( List(("banana", 123), ("banana", 456) ))

scala> kv.cogroup(kw,kw2).collect
res41: Array[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = Array((orange,(CompactBuffer(2),CompactBuffer(222),CompactBuffer())), (apple,(CompactBuffer(1, 2),CompactBuffer(999),CompactBuffer())), (banana,(CompactBuffer(3),CompactBuffer(),CompactBuffer(123, 456))))

3.16 cartesian(otherDataset)   

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
Example 1: Cartesian product of 2 RDDs.
scala> val a = sc.parallelize(List(1,2,3))
scala> val b = sc.parallelize(List(4,5,6))
scala> a.cartesian(b).collect
res42: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (3,4), (2,5), (2,6), (3,5), (3,6))

3.17 pipe(command, [envVars])   

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
Example 1: Split a list into 2 partitions; the command is executed once per partition.
scala> val list  = sc.parallelize(List("apple", "orange", "banana", "mapr", "cloudera" , "hortonworks") , 2)

scala> list.pipe("tail -1").collect
res34: Array[String] = Array(banana, hortonworks)

scala> list.pipe("tail -2").collect
res35: Array[String] = Array(orange, banana, cloudera, hortonworks)

3.18 coalesce(numPartitions)    

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
Example 1: Reduce above "list" from 2 partitions to 1 partition. Comparing the differences.
scala> list.coalesce(1, false).pipe("tail -1").collect
res37: Array[String] = Array(hortonworks)

scala> list.coalesce(1, false).pipe("tail -2").collect
res38: Array[String] = Array(cloudera, hortonworks)

3.19 repartition(numPartitions)   

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
Example 1: Increase above "list" from 2 partitions to 6 partitions. Comparing the differences.
scala> list.repartition(6).pipe("tail -1").collect
res39: Array[String] = Array(hortonworks, apple, orange, banana, mapr, cloudera)
  • Actions (which return a value to the driver program after running a computation on the dataset)

3.20 reduce(f: (T, T) => T): T

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
Example 1: Calculate the sum of int from 1 to 9.
scala> val a = sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:33

scala> a.reduce(_ + _)
res5: Int = 45

3.21 collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
Example 1: Collect all elements of List and return an Array.
scala> val list = sc.parallelize(List("apple", "orange", "banana"))
list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:33

scala> list.collect
res6: Array[String] = Array(apple, orange, banana)

3.22 count()     

Return the number of elements in the dataset.
Example 1: Return the number of elements in the list above.
scala> list.count
res7: Long = 3

3.23 first()     

Return the first element of the dataset (similar to take(1)).
Example 1: Return the first element of the list above, similar to "LIMIT 1" in SQL.
scala> list.first
res8: String = apple

3.24 take(n)     

Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
Example 1: Return the first 2 elements of the list above, similar to "LIMIT n" in SQL.
scala> list.take(2)
res9: Array[String] = Array(apple, orange)

3.25 takeSample(withReplacement, num, [seed])    

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
Note: It returns an Array instead of an RDD.
Example 1: withReplacement = true means the output may contain duplicate elements; with false it will not.
scala> val list = sc.parallelize(1 to 9)
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:33

scala> list.takeSample(true, 10)
res59: Array[Int] = Array(5, 8, 2, 4, 8, 9, 9, 1, 4, 5)

scala> list.takeSample(false, 10)
res60: Array[Int] = Array(3, 8, 2, 6, 1, 7, 5, 9, 4)
Example 2: If "seed" does not change, the result will not change.
scala>  list.takeSample(true, 5, 1)
res61: Array[Int] = Array(8, 6, 3, 8, 9)

scala>  list.takeSample(true, 5, 1)
res62: Array[Int] = Array(8, 6, 3, 8, 9)

scala>  list.takeSample(true, 5, 2)
res63: Array[Int] = Array(3, 8, 4, 8, 9)

3.26 takeOrdered(n, [ordering])     

Return the first n elements of the RDD using either their natural order or a custom comparator.
Example 1: Similar as "order by limit n" in SQL.
scala> val list = sc.parallelize(List("apple", "orange", "banan", "APPLE", "BABY","cat", "1"  , "3" , "9" ))
list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[64] at parallelize at <console>:33

scala> list.takeOrdered(3)
res67: Array[String] = Array(1, 3, 9)

scala> list.takeOrdered(5)
res68: Array[String] = Array(1, 3, 9, APPLE, BABY)

3.27 saveAsTextFile(path)     

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
Example 1: Save the list above to HDFS.
list.saveAsTextFile("/tmp/tmpout") 

# hadoop fs -ls /tmp/tmpout
Found 3 items
-rwxr-xr-x   3 root root          0 2015-02-28 02:23 /tmp/tmpout/_SUCCESS
-rwxr-xr-x   3 root root         25 2015-02-28 02:23 /tmp/tmpout/part-00000
-rwxr-xr-x   3 root root         15 2015-02-28 02:23 /tmp/tmpout/part-00001
# hadoop fs -cat /tmp/tmpout/part-00000
apple
orange
banan
APPLE
# hadoop fs -cat /tmp/tmpout/part-00001
BABY
cat
1
3
9

3.28 saveAsSequenceFile(path)

(Java and Scala)     Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
Example 1: Save a key-value pair as sequence file on HDFS.
val a = sc.parallelize(Array(("apple",1), ("orange",2), ("banana",3)))
a.saveAsSequenceFile("/tmp/tmpoutseq")

# hadoop fs -ls /tmp/tmpoutseq
Found 3 items
-rwxr-xr-x   3 root root          0 2015-02-28 02:27 /tmp/tmpoutseq/_SUCCESS
-rw-r--r--   3 root root        103 2015-02-28 02:27 /tmp/tmpoutseq/part-00000
-rw-r--r--   3 root root        123 2015-02-28 02:27 /tmp/tmpoutseq/part-00001

3.29 saveAsObjectFile(path)

(Java and Scala)     Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
Example 1: Save and load an object file on HDFS.
scala> val list = sc.parallelize(List("apple", "orange", "banan"))
list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:12

scala> list.saveAsObjectFile("/tmp/tmpobj")

scala> val newlist = sc.objectFile[String]("/tmp/tmpobj")
newlist: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[11] at objectFile at <console>:12

scala> newlist.collect
res4: Array[String] = Array(orange, banan, apple)

3.30 countByKey()   

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
Example 1: Count the List of KV pairs.
scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )
kv: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:12

scala> kv.countByKey
res6: scala.collection.Map[String,Long] = Map(banana -> 1, apple -> 2, orange -> 1)

3.31 foreach(func)     

Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
Example 1: Calculate the sum of 1 to 4.
scala> val accum = sc.accumulator(0)
accum: org.apache.spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

scala> accum.value
res24: Int = 10
Putting it all together, here is an example that calculates the total length of a file.
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
  • Persist and Unpersist RDD

persist() keeps the RDD in memory or on disk for the rest of the application after the first time it is computed.
lineLengths.persist()
lineLengths.unpersist()
Example 1: persist() an object in different levels.
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> kv.persist(StorageLevel.MEMORY_ONLY)
res9: kv.type = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> kv.getStorageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)

scala> kv.unpersist()
res13: kv.type = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> kv.getStorageLevel
res14: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)

scala> kv.persist(StorageLevel.DISK_ONLY)
res15: kv.type = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> kv.getStorageLevel
res16: org.apache.spark.storage.StorageLevel = StorageLevel(true, false, false, false, 1)

4. Passing Functions to Spark

  • Pass a reference to a named function

Example: prepend "Hello " to each element in the RDD.
def sayhello(s: String): String = "Hello " + s
lines.map(sayhello)
Result:
scala> lines.collect
res31: Array[String] = Array(1,2,3, 4,5,6)
scala> lines.map(sayhello).collect
res32: Array[String] = Array(Hello 1,2,3, Hello 4,5,6)
  • Anonymous functions

A simpler way to do the same thing is with an anonymous function:
lines.map(x => "Hello " + x)

5. KeyValue pairs

  • reduceByKey

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Sample data:
# cat data.txt
This is a test data text file for Spark to use.
To test Scala and Spark,
we need to repeat again and again.
apple
orange
banana
APPle
APPLE
ORANGE
Sample result:
scala> counts.collect
res41: Array[(String, Int)] = Array((orange,1), (APPLE,1), (ORANGE,1), (apple,1), ("This is a test data text file for Spark to use. ",1), (APPle,1), ("To test Scala and Spark, ",1), (banana,1), (we need to repeat again and again.,1))
  • sortByKey

scala>  counts.sortByKey().collect
res43: Array[(String, Int)] = Array((APPLE,1), (APPle,1), (ORANGE,1), ("This is a test data text file for Spark to use. ",1), ("To test Scala and Spark, ",1), (apple,1), (banana,1), (orange,1), (we need to repeat again and again.,1))
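
Note that the pairs above use each whole line as the key, so counts has one entry per distinct line. For a conventional word count, split each line into words first; a minimal sketch on the same lines RDD:

val wordCounts = lines
  .flatMap(line => line.split("\\s+"))  // break each line into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)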

6. Shared Variables

  • Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. 
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res18: Array[Int] = Array(1, 2, 3)
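
A slightly fuller sketch (the lookup table and values are made up) of why broadcasts help: a read-only map is broadcast once and referenced inside a transformation, instead of being shipped with every task:

val countryNames = sc.broadcast(Map("US" -> "United States", "IN" -> "India"))
val expanded = sc.parallelize(Seq("US", "IN", "US"))
  .map(code => countryNames.value.getOrElse(code, "unknown"))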
  • Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
scala> val accum = sc.accumulator(0)
accum: org.apache.spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

scala> accum.value
res24: Int = 10

Thursday, March 31, 2016

APACHE KAFKA

 




Define in zookeeper.properties
dataDir=D:/Hadoop/ECOSYSTEM/Kafka/zookeeper

Define in server.properties
log.dirs=D:/Hadoop/ECOSYSTEM/Kafka/logs

Start ZOOKEEPER
start zookeeper-server-start.bat config/zookeeper.properties

Start KAFKA
start kafka-server-start.bat config/server.properties

Create a TOPIC
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List TOPIC
kafka-topics.bat --list --zookeeper localhost:2181

Start a Producer (Send Messages)
kafka-console-producer.bat --broker-list localhost:9092 --topic test
This is a message
This is another message1

Start a Consumer (receives Messages)
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

Note:- If you have each of the above commands running in a different console
then you should now be able to type messages into the producer console and
see them appear in the consumer terminal.

Display Topic Information
kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

Add Partitions to a Topic
kafka-topics.bat --alter --zookeeper localhost:2181 --topic test --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or
ordering of the messages will be affected

Delete Topic
kafka-run-class.bat kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test

Setting up a multi broker cluster
Create config/server2.properties
    broker.id=1
    port=9094
    log.dirs=D:/Hadoop/ECOSYSTEM/Kafka/logs1
start kafka-server-start.bat config/server2.properties

Wednesday, March 30, 2016

Apache SQOOP


What is Sqoop in Hadoop?
Apache Sqoop is an effective Hadoop tool for importing/exporting data between RDBMSs such as MySQL and Oracle and HBase, Hive or HDFS.

How does Apache Sqoop work?

Once Sqoop recognizes the input, it reads the table's metadata and generates a class definition for the input requirements. The dataset being transferred is then split into partitions, and a map-only job is launched for each partition, with each mapper transferring the slice of the dataset assigned to it.

Challenges with data ingestion in Hadoop ?

  • parallel processing
  • data quality
  • machine data on a higher scale of several gigabytes per minute
  • multiple source ingestion
  • real-time ingestion and scalability
  • Structured and Unstructured data

Sqoop 1.0 Design:


Sqoop provides many salient features like:

  • Full/Incremental Load
  • Parallel import/export
  • Import results of SQL query
  • Compression
  • Connectors for all major RDBMS Databases
  • Kerberos Security Integration
  • Load data directly into Hbase/Hive/HDFS file system
  • Support for Accumulo

Import process:



Export process:

Basic Commands and Syntax for Sqoop:
Note: place mysql-connector-java-5.1.18-bin.jar in the lib folder of Sqoop


List available databases/tables :
$ sqoop list-databases --connect jdbc:mysql://localhost/userdb --username root --password 123
$ sqoop list-tables --connect jdbc:mysql://localhost/userdb --username root --password 123

Import Data from MySql into HDFS :
$ sqoop import --connect jdbc:mysql://localhost/userdb --username root --password 123 \
--table emp --m 1 --target-dir /queryresult

Executing command using options-file :
$ sqoop list-tables --options-file SqoopImportOptions.txt

Sample options-file:
##############################
# Start of Options file for sqoop import
##############################
--connect
jdbc:mysql://localhost/userdb
--username
root
--password
123
##############################
# End of Options file for sqoop import
##############################

Check file created in HDFS :
$ hadoop fs -cat  /queryresult/part*

Import all rows, but specific columns of the table :
$ sqoop import --options-file SqoopImportOptions.txt --table  emp  --columns "empno,ename" --as-textfile -m 1 --target-dir /queryresult

Import all columns, filter rows using where clause :
$ sqoop --options-file SqoopImportOptions.txt --table emp  --where "empno > 7900"  --as-textfile  -m 1  --target-dir /user/sqoop-mysql/employeeGtTest

Import with a free form query with where clause :
$ sqoop --options-file SqoopImportOptions.txt --query 'select empno,ename,sal,deptno from emp where EMP_NO < 7900 AND $CONDITIONS' -m 1 --target-dir /user/sqoop-mysql/employeeFrfrmQry1

Controlling Parallelism :
Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument.
When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id.

Note: Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

Split by refer section on controlling parallelism :
$ sqoop --options-file SqoopImportOptions.txt --query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' --split-by EMP_NO --direct --target-dir /user/sqoop-mysql/SplitByExampleImport

Boundary query
Also related to controlling parallelism:
--boundary-query "SELECT MIN(EMP_NO), MAX(EMP_NO) from employees"

Fetch size
This argument tells Sqoop how many entries to read from the database at once.
--fetch-size=5

Compression
Use the --compress argument to enable compression; if you don't specify a compression codec (--compression-codec), the default gzip codec will be used.


The command:
$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
-z \
--split-by EMP_NO \
--direct \
--target-dir /user/airawat/sqoop-mysql/CompressedSample

The output:
$ hadoop fs -ls -R sqoop-mysql/CompressedSample | grep part*

Import all tables
$ sqoop --options-file SqoopImportAllTablesOptions.txt --direct --warehouse-dir sqoop-mysql/EmployeeDatabase

Import formats
With MySQL (via the direct connector), text file is the only supported format; Avro and SequenceFile formatted imports are feasible with other RDBMSs.

Sqoop code-gen
Generate jar and class file for employee table

$ sqoop codegen --connect jdbc:mysql://cdh-dev01/employees \
--username myUID \
--password myPWD \
--table employees \
--outdir /user/airawat/sqoop-mysql/jars

Files created:
$ ls /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/
employees.class  employees.jar  employees.java

Copy files to your home directory:
cp /tmp/sqoop-airawat/compile/879394521045bc924ad9321fe46374bc/* .

Friday, March 25, 2016

Apache FLUME

 

What is Flume?
Apache Flume is a tool/service/data-ingestion mechanism for collecting, aggregating and transporting large amounts of streaming data, such as log files and events, from various sources to a centralized data store.
Advantages of Flume
Here are the advantages of using Flume −
  • Using Apache Flume we can store the data in to any of the centralized stores (HBase, HDFS).
  • When the rate of incoming data exceeds the rate at which data can be written to the destination, Flume acts as a mediator between data producers and the centralized stores and provides a steady flow of data between them.
  • Flume provides the feature of contextual routing.
  • The transactions in Flume are channel-based where two transactions (one sender and one receiver) are maintained for each message. It guarantees reliable message delivery.
  • Flume is reliable, fault tolerant, scalable, manageable, and customizable.


Apache Flume - Architecture




  • Flume Event
    An event is the basic unit of the data transported inside Flume. It contains a payload of byte array that is to be transported from the source to the destination accompanied by optional headers.

    • Flume Agent
      An agent is an independent daemon process (JVM) in Flume. It receives data (events) from clients or other agents and forwards it to its next destination (a sink or another agent). Flume may have more than one agent.
    A Flume agent contains three main components: source, channel, and sink.
    1. Source
      A source is the component of an agent which receives data from the data generators and transfers it to one or more channels in the form of Flume events.
      Example: Avro source, Thrift source, Twitter 1% source, etc.
    2. Channel
      A channel is a transient store which receives the events from the source and buffers them till they are consumed by sinks. It acts as a bridge between the sources and the sinks.
      Example: JDBC channel, File system channel, Memory channel, etc.
    3. Sink
      A sink stores the data into centralized stores like HBase and HDFS. It consumes the data (events) from the channels and delivers it to the destination.
      Example: HDFS sink
      Note: A Flume agent can have multiple sources, sinks and channels.

      Additional Components of Flume Agent

      A few more components play a vital role in transferring events from the data generators to the centralized stores.
    • Interceptors
      Interceptors are used to alter/inspect flume events which are transferred between source and channel.
    • Channel Selectors
      These are used to determine which channel the data should be sent to when there are multiple channels. There are two types of channel selectors −
      Default channel selectors − also known as replicating channel selectors; they replicate every event into each channel.
      Multiplexing channel selectors − these decide which channel to send an event to, based on the address in that event's header.
    • Sink Processors
      These are used to invoke a particular sink from a selected group of sinks, to create failover paths for your sinks, or to load-balance events across multiple sinks from a channel.

    Multi-hop Flow

    • Within Flume, there can be multiple agents and before reaching the final destination, an event may travel through more than one agent. This is known as multi-hop flow. 

     

    Fan-out Flow



    The data flow from one source to multiple channels is known as fan-out flow. It is of two types −
    • Replicating − The data flow where the data will be replicated in all the configured channels.
    • Multiplexing − The data flow where the data will be sent to a selected channel which is mentioned in the header of the event.


    Fan-in Flow

    • The data flow in which the data will be transferred from many sources to one channel is known as fan-in flow

    Flume example using netcat(source) and logger(sink):

    # START example.conf file : A single-node Flume configuration
    # Name the components on this AGENT
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # Configure the SOURCE
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Use a CHANNEL which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Configure the SINK
    a1.sinks.k1.type = logger
    # Bind the SOURCE and SINK to the CHANNEL
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    # END example.conf file

    ########## RUNNING FLUME AGENT ##########
    # flume-ng agent --conf conf --conf-file example.conf --name a1
    ######## RUNNING DATA GENERATOR #########
    # $ telnet localhost 44444
    # Hello World!

    Sunday, March 20, 2016

    Sqoop Vs Flume


      • Apache Sqoop and Apache Flume work with various kinds of data sources.
        Apache Flume works well with streaming data sources that are generated continuously in a Hadoop environment, such as log files from multiple servers, whereas
        Apache Sqoop is designed to work with any relational database that has JDBC connectivity. Sqoop can also import data from NoSQL databases like MongoDB or Cassandra, and it allows direct data transfer to Hive or HDFS. To transfer data to Hive using Apache Sqoop, a table has to be created, with the schema taken from the source database itself.
      • In Apache Flume data loading is event driven whereas in
        Apache Sqoop data load is not driven by events.
      • Apache Flume is a better choice when moving bulk streaming data from sources like JMS or a spooling directory, whereas
        Apache Sqoop is an ideal fit when the data sits in databases like Teradata, Oracle, MySQL Server, Postgres or any other JDBC-compatible database.
        In Apache Flume, data flows to HDFS through multiple channels whereas in
        Apache Sqoop HDFS is the destination for importing data.
      • Apache Flume agents are designed to fetch streaming data like tweets from Twitter or log file from the web server whereas
        Apache Sqoop connectors are designed to work only with structured data sources and fetch data from them.
      • Apache Flume has an agent-based architecture, i.e. the code written in Flume is known as an agent, which is responsible for fetching data, whereas in
        Apache Sqoop the architecture is based on connectors. The connectors in Sqoop know how to connect with the various data sources and fetch data accordingly.
      • Apache Sqoop is mainly used for parallel data transfers, for data imports as it copies data quickly whereas
        Apache Flume is used for collecting and aggregating data because of its distributed, reliable nature and highly available backup routes.

      Thursday, March 17, 2016

      APACHE HIVE

      What is Hive ?

      • Hive is SQL for Hadoop cluster.
      • It is an open source data warehouse system on top of HDFS that adds structure to the data.
      • It provides SQL like interface which is known as "Hive Query Language (HQL)".
      • We write queries in HQL, which are translated into MapReduce jobs and run on the cluster.



      The main components of Hive are:
      • Metastore: It stores all the metadata of Hive, i.e. information about the databases, tables, columns, etc.
      • Driver: It includes compiler, optimizer and executor used to break down the Hive query language statements.
      • Query compiler: It compiles HiveQL into DAG graph of map reduce tasks.
      • Execution engine: It executes the tasks produced by the compiler.
      • Thrift server: It provides an interface to connect to other applications like MySQL, Oracle, Excel, etc. through JDBC/ODBC drivers.
      • Command line interface: It is also called Hive shell. It is used for working with data either interactively or batch data processing.
      • Web Interface: A web-based UI on top of Hive used for interacting with data.
      • SerDe: Serializer/Deserializer; it tells Hive how to read and write a record.

      Data Storage in Hive:
      Hive has different forms of storage options and they include:
      • Metastore: Metastore keeps track of all the metadata of databases, tables, columns, datatypes etc. in Hive. It also keeps track of HDFS mappings. The default Metastore is the Derby database.
      • Tables: There can be 2 types of tables in Hive.
        First, normal tables (managed/internal tables) like any other table in database.
        Second, external tables (unmanaged tables), which are like normal tables except for deletion behavior; they are created from HDFS mappings and act as pointers to data in HDFS.
        The difference between the two types is that when an external table is deleted its data is not deleted and remains in HDFS, whereas in the case of a normal table the data is also deleted when the table is dropped.
      • Partitions: A partition is a slice of a table stored as a separate subdirectory within the table's directory. It improves query performance, especially for SELECT statements with a WHERE clause.
      • Buckets: Buckets are hashed partitions and they speed up joins and sampling of data.
      Hive vs. RDBMS (Relational database)
      Hive and RDBMSs are similar on the surface, but they target different applications and are based on different schema models.
      • An RDBMS is built for OLTP (online transaction processing), i.e. real-time reads and writes in a database. It can also handle a small part of OLAP (online analytical processing).
      • Hive is built for OLAP, i.e. analytical reporting over data. Hive does not support inserting into an existing table or updating table data the way an RDBMS does, which is an important part of the OLTP process.
        All data is either inserted into a new table or overwritten in an existing table.
      • An RDBMS uses schema on write: when data is entered into a table it is checked against the table's schema to ensure it meets the requirements. Loading data into an RDBMS is therefore slower, but reading is very fast.
      • Hive uses schema on read: data is not checked when it is loaded, so loading is fast but reading is slower.

      Hive Query Language (HQL)
      HQL is very similar to traditional SQL. Data is stored in tables, where each table consists of columns.
      1. Data Definition statements (DDL) like create table, alter table, drop table are supported.
        All these DDL statements can be used on Database, tables, partitions, views, functions, Index, etc.
      2. Data Manipulation statements (DML) like load, insert, select and explain are supported.
        Load is used for taking data from HDFS and moving it into Hive.
        Insert is used for moving data from one Hive table to another.
        Select is used for querying data. Explain shows how a query will be executed.

      Hive Commands :

      Data Definition Language (DDL) :
      Example : CREATE, DROP, TRUNCATE, ALTER, SHOW, DESCRIBE Statements.
      Start the Hive shell with the command 'sudo hive'.
      Enter the command 'create database <database_name>;' to create a new database in Hive.
      To list the databases in the Hive warehouse, enter the command 'show databases;'.
      The database is created in the default location of the Hive warehouse; in Cloudera this is /user/hive/warehouse.
      The command to switch to a database is 'use <database_name>;'.
      Copy the input data from the local file system to HDFS using the 'hadoop fs -copyFromLocal' command.

      Data Manipulation Language (DML) : Retrieving Information

      Function | MySQL | Hive
      Retrieving Information (General) | SELECT from_columns FROM table WHERE conditions; | SELECT from_columns FROM table WHERE conditions;
      Retrieving All Values | SELECT * FROM table; | SELECT * FROM table;
      Retrieving Some Values | SELECT * FROM table WHERE rec_name = "value"; | SELECT * FROM table WHERE rec_name = "value";
      Retrieving With Multiple Criteria | SELECT * FROM TABLE WHERE rec1 = "value1" AND rec2 = "value2"; | SELECT * FROM TABLE WHERE rec1 = "value1" AND rec2 = "value2";
      Retrieving Specific Columns | SELECT column_name FROM table; | SELECT column_name FROM table;
      Retrieving Unique Output | SELECT DISTINCT column_name FROM table; | SELECT DISTINCT column_name FROM table;
      Sorting | SELECT col1, col2 FROM table ORDER BY col2; | SELECT col1, col2 FROM table ORDER BY col2;
      Sorting Reverse | SELECT col1, col2 FROM table ORDER BY col2 DESC; | SELECT col1, col2 FROM table ORDER BY col2 DESC;
      Counting Rows | SELECT COUNT(*) FROM table; | SELECT COUNT(*) FROM table;
      Grouping With Counting | SELECT owner, COUNT(*) FROM table GROUP BY owner; | SELECT owner, COUNT(*) FROM table GROUP BY owner;
      Maximum Value | SELECT MAX(col_name) AS label FROM table; | SELECT MAX(col_name) AS label FROM table;
      Selecting from multiple tables (Join same table using alias w/"AS") | SELECT pet.name, comment FROM pet, event WHERE pet.name = event.name; | SELECT pet.name, comment FROM pet JOIN event ON (pet.name = event.name)

       

      Using Metadata :

      Function | MySQL | Hive
      Selecting a database | USE database; | USE database;
      Listing databases | SHOW DATABASES; | SHOW DATABASES;
      Listing tables in a database | SHOW TABLES; | SHOW TABLES;
      Describing the format of a table | DESCRIBE table; | DESCRIBE (FORMATTED|EXTENDED) table;
      Creating a database | CREATE DATABASE db_name; | CREATE DATABASE db_name;
      Dropping a database | DROP DATABASE db_name; | DROP DATABASE db_name (CASCADE);

      Current SQL Compatibility


      Hive Command Line :

      Function | Hive
      Run Query | hive -e 'select a.col from tab1 a'
      Run Query Silent Mode | hive -S -e 'select a.col from tab1 a'
      Set Hive Config Variables | hive -e 'select a.col from tab1 a' -hiveconf hive.root.logger=DEBUG,console
      Use Initialization Script | hive -i initialize.sql
      Run Non-Interactive Script | hive -f script.sql

      The .hiverc file :
      What is .hiverc file?
      It is a file that is executed when you launch the Hive shell, making it an ideal place for any Hive configuration/customization you want applied at shell startup. This could be:
      - Setting column headers to be visible in query results
      - Making the current database name part of the hive prompt
      - Adding any jars or files
      - Registering UDFs

      .hiverc file location
      The file is loaded from the hive conf directory.
      If the file does not exist, you can create it.
      It needs to be deployed to every node from where you might launch the Hive shell.

      Sample .hiverc
      add jar /home/airawat/hadoop-lib/hive-contrib-0.10.0-cdh4.2.0.jar;
      set hive.exec.mode.local.auto=true;
      set hive.cli.print.header=true;
      set hive.cli.print.current.db=true;
      set hive.auto.convert.join=true;
      set hive.mapjoin.smalltable.filesize=30000000;

      Sunday, March 13, 2016

      Apache PIG



      • Apache Pig is a tool used to analyze large amounts of data by representing them as data flows.
      • Using the Pig Latin scripting language, operations like ETL (Extract, Transform and Load), ad hoc data analysis and iterative processing can be easily achieved.
      • Pig is an abstraction over MapReduce. In other words, all Pig scripts internally are converted into Map and Reduce tasks to get the task done.

       

      Dataset : 

      The dataset is a simple text file (movies_data.csv) that lists movie names and details such as
      release year, rating and runtime.

      A sample of the dataset is as follows: 
      1,The Nightmare Before Christmas,1993,3.9,4568 
      2,The Mummy,1932,3.5,4388 
      3,Orphans of the Storm,1921,3.2,9062 
      4,The Object of Beauty,1991,2.8,6150 
      5,Night Tide,1963,2.8,5126 
      6,One Magic Christmas,1985,3.8,5333 
      7,Muriel's Wedding,1994,3.5,6323 
      8,Mother's Boys,1994,3.4,5733 
      9,Nosferatu: Original Version,1929,3.5,5651 
      10,Nick of Time,1995,3.4,5333
       
      Pig can be started in one of the following two modes:
      1. Local Mode  (In local mode, pig can access files on the local file system. )
      2. Cluster Mode (In cluster mode, pig can access files on HDFS.)
      Restart your terminal and execute the pig command as follows:
      To start in Local Mode:
      $ pig -x local
      To start in Cluster Mode:
      $ pig
      This command presents you with a grunt shell. The grunt shell allows you
      to execute PigLatin statements to quickly test out data flows on your 
      data step by step without having to execute complete scripts.
      Pig Latin Program :

      To LOAD the data :
      grunt> movies = LOAD 'movies_data.csv' USING PigStorage(',') as (id,name,year,rating,duration);
      Note: When this statement is executed, no MapReduce task is executed.
      grunt> DUMP movies;
      - It is only after the DUMP statement that a MapReduce job is initiated.
      - The DUMP command is only used to display information onto the standard output.

      List the movies that have a rating greater than 4:
      grunt> movies_greater_than_four = FILTER movies BY (float)rating>4.0;
      grunt> DUMP movies_greater_than_four;

      To STORE the data to a file :
      grunt> STORE movies_greater_than_four INTO '/user/hduser/movies_greater_than_four';

      To include the data type of the columns :
      grunt> movies = LOAD 'movies_data.csv' USING PigStorage(',') as 
      (id:int,name:chararray,year:int,rating:double,duration:int);


      FILTER command :
      grunt> movies_greater_than_four = FILTER movies BY rating>4.0;

      List the movies that were released between 1950 and 1960 :
      grunt> movies_between_50_60 = FILTER movies by year>1950 and year<1960; 

      List the movies that start with the letter A:
      grunt> movies_starting_with_A = FILTER movies by name matches 'A.*';

      List the movies that have a duration greater than 2 hours:
      grunt> movies_duration_2_hrs = FILTER movies by duration > 7200;

      List the movies that have rating between 3 and 4 :
      grunt> movies_rating_3_4 = FILTER movies BY rating>3.0 and rating<4.0;

      DESCRIBE Command :
      The schema of a relation/alias can be viewed using the DESCRIBE command:
      grunt> DESCRIBE movies;
      movies: {id: int,name: chararray,year: int,rating: double,duration: int}

      ILLUSTRATE Command : 
      To view the step-by-step execution of a sequence of statements you can use the ILLUSTRATE command:
      grunt> ILLUSTRATE movies_duration_2_hrs;

      Note: DESCRIBE & ILLUSTRATE are really useful for debugging.

      FOREACH : FOREACH gives a simple way to apply transformations based on columns.
      List the movie names and their durations in minutes:
      grunt> movie_duration = FOREACH movies GENERATE name, (double)(duration/60);
      The above statement generates a new alias that holds the list of movies and their durations in minutes.
      You can check the results using the DUMP command.

      GROUP : The GROUP keyword is used to group fields in a relation.
      List the years and the number of movies released each year.
      grunt> grouped_by_year = group movies by year;
      grunt> count_by_year = FOREACH grouped_by_year GENERATE group, COUNT(movies);

      The total number of movies in the dataset is 49590.
      To check whether our GROUP operation is correct, we can verify the total of the COUNT field.

      grunt> group_all = GROUP count_by_year ALL;
      grunt> sum_all = FOREACH group_all GENERATE SUM(count_by_year.$1);
      grunt> DUMP sum_all;

      From the above three statements, the first statement, GROUP ALL, groups all the tuples to one group. This is very useful when we need to perform aggregation operations on the entire set.

      The next statement performs a FOREACH on the grouped relation group_all and applies the SUM function to the field in position 1 (positions start from 0).
      Here the field in position 1 holds the count of movies for each year.
      The output, (49590), matches the known total of 49590 movies in the dataset,
      so we can conclude that our GROUP operation worked correctly.

      ORDER BY : Let us question the data to illustrate the ORDER BY operation.
      List all the movies in the ascending order of year.
      grunt> asc_movies_by_year = ORDER movies BY year ASC;
      grunt> DUMP asc_movies_by_year;

      List all the movies in the descending order of year :
      grunt> desc_movies_by_year = ORDER movies by year DESC;
      grunt> DUMP desc_movies_by_year;

      DISTINCT : The DISTINCT statement is used to remove duplicated records.
      It works only on entire records, not on individual fields.
      grunt> movies_with_dups = LOAD 'movies_with_duplicates.csv' USING PigStorage(',') as (id:int,name:chararray,year:int,rating:double,duration:int);
      grunt> DUMP movies_with_dups;

      You can see that there are duplicates in this data set.

      List the distinct records present in movies_with_dups:
      grunt> no_dups = DISTINCT movies_with_dups;
      grunt> DUMP no_dups;

      LIMIT : Use the LIMIT keyword to get only a limited number of results from a relation.

      grunt> top_10_movies = LIMIT movies 10; 
      grunt> DUMP top_10_movies;

      SAMPLE : Use the SAMPLE keyword to get a sample set from your data.

      grunt> sample_10_percent = sample movies 0.1;
      grunt> dump sample_10_percent;

      Here, 0.1 = 10%

      We already know that the file has 49590 records.
      We can check the count of records in the sampled relation.

      grunt> sample_group_all = GROUP sample_10_percent ALL;
      grunt> sample_count = FOREACH sample_group_all GENERATE COUNT(sample_10_percent.$0);
      grunt> dump sample_count;
      The output is (4937), which is approximately 10% of 49590.

      Complex Types :
      Pig supports three different complex types to handle data. 
      Tuples : A tuple is just like a row in a table.
      (49539,'The Magic Crystal',2013,3.7,4561)
      The above tuple has five fields. A tuple is enclosed in parentheses.
      Bags : A bag is an unordered collection of tuples.
      { (49382, 'Final Offer'), (49385, 'Delete') }
      The above bag has two tuples. Each tuple has two fields, id and movie name.
      Maps : A map is a set of key-value pairs. The key and value are joined using #.
      ['name'#'The Magic Crystal', 'year'#2013]