admin:Breeze
1.采用Spark编写WordCount程序,输入为单个或者多个HDFS文本文件,输出为文本文件中的字数。
创建WordCount.scala文件,并编写代码。对应源代码为:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]) {
//创建SparkConf
val conf: SparkConf = new SparkConf()
//创建SparkContext
val sc: SparkContext = new SparkContext(conf)
//从文件读取数据
val lines: RDD[String] = sc.textFile(args(0))
//按空格切分单词
val words: RDD[String] = lines.flatMap(_.split(" "))
//单词计数,每个单词每出现一次就计数为1
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//聚合,统计每个单词总共出现的次数
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//排序,根据单词出现的次数排序
val fianlResult: RDD[(String, Int)] = result.sortBy(_._2, false)
//将统计结果保存到文件
fianlResult.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
2.采用Spark编写PageRank程序,需要实现:
1)如果一个网页被很多其他网页链接到的话,说明这个网页比较重要,也就是PageRank值会相对较高;
2)如果一个PageRank值很高的网页链接到一个其他的网页,那么被链接到的网页的PageRank值会相应地因此而提高。
创建对应PageRank.scala文件,并编写代码。对于源代码为:
import org.apache.spark.{SparkConf, SparkContext}
object PageRank {
def main(args: Array[String]): Unit ={
val conf = new SparkConf().setAppName("pagerank").setMaster("local")
val sc = new SparkContext(conf)
val links = sc.parallelize(List(
("A", List("B", "C")),
("B", List("A", "C")),
("C", List("A", "B", "C")),
("D", List("C"))
)).persist()
var ranks = links.mapValues(v => 1.0)
for (i <- 0 until 10)
{
val contributions = links.join(ranks).flatMap
{
case (pageId, (tolinks, rank)) =>
tolinks.map(tolink => (tolink, rank / tolinks.size))
}
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")
}
}
发表回复
要发表评论,您必须先登录。