一直觉得Spark的算子调用超级厉害,弄不懂怎么样让传入map的函数作用在一个个值之上的,比如Spark的算子调用模式:
rdd.map(word=>(word,1)).reduceByKey((a,b)=>a+b).foreach(println)
两年来,我一直搞不明白,为何xxx.map(word=>(word,1))
就可以把xxx里面的内容全部都变成map返回。可能是因为做惯了面向对象java,对这种面相函数的编程有点晕。现在想想,真的是太简单的,不就是:word=>(word,1) 这个匿名函数处理xxx对象中的属性嘛!
这里用Scala模仿SparkContext 的map,filter 算子调用写一个例子,让我们体会下Scala面向函数编程的快感:
import scala.collection.mutable.ListBuffer
/**
* 定义SparkContext
*/
class SparkContext {
//传入一个k可变的List,创建RDD
def creatRDD(list:ListBuffer[Int]):RDD={
return new RDD(list)
}
}
/**
* 定义RDD
* @param list
*/
class RDD(list:ListBuffer[Int]){
/**
* 定义map算子
* @param f map算子传入的处理函数
* @return
*/
def map(f:Int=>Int):RDD={
var li = ListBuffer[Int]();
for(i<-list){
val j =f(i)
li+=j;
}
return new RDD(li);
}
/**
* 定义过来filter算子
* @param f
* @return
*/
def filter(f:Int=>Boolean):RDD={
var li = ListBuffer[Int]();
for(i<-list){
val j =f(i)
if(j){
li+=i;
}
}
return new RDD(li);
}
/**
* 遍历方法
*/
def show():Unit={
for(l<-list){
println(l)
}
}
}
object SparkContext{
def apply(): SparkContext = new SparkContext()
def main(args: Array[String]): Unit = {
var sc = SparkContext()
val list = ListBuffer(1,2,3,4,5)
val rdd = sc.creatRDD(list)
println("--------------初始值------------")
rdd.show();
println("--------------每个元素+1------------")
val rdd2 = rdd.map((x:Int)=>x+1);
rdd2.show()
println("---------------每个元素+9-----------")
val rdd3 = rdd.map((x:Int)=>x+9);
rdd3.show()
println("----------------取出大于等于4的元素----------")
val rdd4 = rdd.filter((x:Int)=>x>=4)
rdd4.show()
}
}
当然,可能用的参数不太正确,我用的是可变的List,不过也成功实现了map和filter的调用模式。
运行结果如下
--------------初始值------------
1
2
3
4
5
--------------每个元素+1------------
2
3
4
5
6
---------------每个元素+9-----------
10
11
12
13
14
----------------取出大于等于4的元素----------
4
5
Process finished with exit code 0