Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages

reference: https://stackoverflow.com/questions/46001583/why-does-spark-submit-fail-to-find-kafka-data-source-unless-packages-is-used
Verified working:
spark-submit.cmd --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 f:\structuredstreaming-test.jar
Spark Structured Streaming supports Apache Kafka as a streaming source and sink through the external kafka-0-10-sql module. That module is not available to Spark applications submitted with spark-submit by default: it is external, and to make it available you have to declare it as a dependency. Unless you use kafka-0-10-sql module-specific…
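A minimal Structured Streaming sketch, assuming spark-sql-kafka-0-10 is on the classpath (via --packages as above, or as an sbt dependency). The topic name and bootstrap servers below are placeholders, not from the original post.

import org.apache.spark.sql.SparkSession

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kafka-source").getOrCreate()
    import spark.implicits._

    // format("kafka") only resolves when the external kafka-0-10-sql module is available
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-topic")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    val query = lines.writeStream.format("console").start()
    query.awaitTermination()
  }
}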

spark shuffle improvements: 1. hash shuffle 2. consolidation 3. sort-based shuffle

hash shuffle: number of temporary files = M * R (M = number of map-side partitions, R = number of reduce-side partitions)
consolidation: number of temporary files = C * R (C = number of CPU cores, R = number of reduce-side partitions)
sort-based shuffle: number of temporary files = 2 * M (M = number of map-side partitions)
http://blog.csdn.net/johnny_lee/article/details/22619585
sort-based shuffle: http://blog.csdn.net/duan_zhihua/article/details/71190682
The core of sort-based shuffle is that it uses ExternalSorter to write each ShuffleMapTask's output, sorted, into a single file (FileSegmentGroup). To distinguish which content belongs to which reducer task in the next stage, it also produces an index file (Index) that tells the parallel tasks of the downstream stage which portion is theirs. To emphasize: the biggest benefit of sort-based shuffle is that it reduces the number of temporary output files; each map task produces only two files: a single data file made up of the different FileSegments, plus the index file. One important question: does sort-based shuffle actually sort the records? Based on testing, the output is generally not sorted (for example, a word count test on Spark 2.0 produces unsorted results; see the sketch below). On the mapper side, the sort-and-spill process works as follows: records are not sorted while they sit in the AppendOnlyMap; sorting happens only when they are spilled to disk.
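A small sketch to reproduce the observation above: a word count on Spark 2.x whose reduceByKey step goes through the sort-based shuffle, yet the collected results are not globally sorted by key. The input path is a placeholder.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountShuffleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("wc-shuffle"))

    val counts = sc.textFile("f:/scala.txt")  // placeholder input file
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)  // triggers a shuffle (sort-based by default since Spark 1.2)

    // Keys come back grouped per partition, not globally sorted; use sortByKey if ordering matters.
    counts.collect().foreach(println)
    sc.stop()
  }
}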

build spark from source and set up a debug environment

reference:
https://www.zhihu.com/question/24869894 (the first answer on Zhihu)
http://spark.apache.org/docs/latest/building-spark.html (a plain full build is enough)
http://www.jianshu.com/p/491d019eb9b6 (debug environment setup)
Environment: java 1.8, scala 2.11.0, maven 3.3.9, IDEA 2016, spark 2.0.2
1. Complete the following configuration: Java environment variables; Scala environment variables; the jar repository path in the Maven settings file; install the Scala language plugin in IDEA; point IDEA at the Maven settings file and jar repository path. Spark git: https://github.com/apache/spark.git
2. Build the Spark source. In the $SPARK_HOME directory, set the Maven memory options (or configure them in $MAVEN_HOME/bin/mvn):
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
Then package the Spark source:
./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
3. Debug configuration. 1) Load all jars: IDEA => File => Project Structure => Libraries => Add Java => $SPARK_HOME/assembly/target/scala-2.11/jars => all project…
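A tiny local job, assumed useful for verifying the debug setup: run it in IDEA with the locally built Spark jars on the classpath and set breakpoints either in the job itself or inside the Spark source (for example in the scheduler code) to step through. The class name is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object DebugProbe {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("debug-probe"))
    // A trivial job that exercises task scheduling and a narrow transformation.
    val sum = sc.parallelize(1 to 100, 4).map(_ * 2).reduce(_ + _)
    println(s"sum = $sum")  // set a breakpoint here, or inside Spark's DAGScheduler
    sc.stop()
  }
}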

package a spark program into an assembly jar with sbt

1) Create a new file "assembly.sbt" in the "project/" directory and add the following line:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
2) Create a new file "assembly.sbt" under the project root and add the following content:
import AssemblyKeys._
assemblySettings
jarName in assembly := "dataProcess20171124.jar"
test in assembly := {}
mainClass in assembly := Some("cn.adatafun.dataprocess.accumulate.BannerAcc")
assemblyOption in packageDependency ~= { _.copy(appendContentHash =…
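For newer sbt-assembly releases (0.14.x, where the plugin is an AutoPlugin), the AssemblyKeys import and assemblySettings are no longer needed; a hedged sketch of the equivalent setup follows. The plugin version and the merge strategy shown are assumptions, not from the original post; the jar name and main class mirror the snippet above.

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")  // assumed version

// build.sbt
assemblyJarName in assembly := "dataProcess20171124.jar"
test in assembly := {}
mainClass in assembly := Some("cn.adatafun.dataprocess.accumulate.BannerAcc")
// a common merge strategy for Spark fat jars: drop duplicate META-INF entries
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}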

GSON with Scala – Unable to invoke no-args constructor for scala.collection.immutable.Map

Here is a sample Scala object in which I want to deserialize a JSON string into a Map of String -> String. I use GSON 2.2.2 and Scala 2.10.
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
object MyTest {
  def main(args: Array[String]) {
    val gson = new Gson
    val jsonString = "{\"test1\": \"value-test1\",\"test2\":\"value-test2\"}"
    val mapType = new TypeToken[Map[String,…
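One common workaround, sketched below: GSON cannot construct scala.collection.immutable.Map (no no-args constructor), but it can construct a java.util.Map, which can then be converted to a Scala Map. The object name is illustrative; the JSON string mirrors the snippet above.

import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import scala.collection.JavaConverters._

object GsonMapWorkaround {
  def main(args: Array[String]): Unit = {
    val gson = new Gson
    val jsonString = "{\"test1\": \"value-test1\",\"test2\":\"value-test2\"}"
    // Deserialize into a java.util.Map (which GSON can instantiate), then convert to a Scala Map.
    val javaMapType = new TypeToken[java.util.Map[String, String]]() {}.getType
    val javaMap: java.util.Map[String, String] = gson.fromJson(jsonString, javaMapType)
    val scalaMap: Map[String, String] = javaMap.asScala.toMap
    println(scalaMap)
  }
}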

spark flatMap behaves differently for Dataset/DataFrame and RDD

Java edition:
JavaPairRDD<Tuple2<String, String>, Integer> pairRDD = rowRDD.flatMapToPair(new PairFlatMapFunction<Row, Tuple2<String, String>, Integer>() {
  public Iterator<Tuple2<Tuple2<String, String>, Integer>> call(Row row) throws Exception {
    ArrayList<Tuple2<Tuple2<String, String>, Integer>> iterList = new ArrayList<Tuple2<Tuple2<String, String>, Integer>>();
    String userId = String.valueOf(row.getAs(0));
    String codeStr = String.valueOf(row.getAs(1));
    String[] codeList = codeStr.split(",");
    for (String code : codeList) {
      Tuple2<String, String> tpl2 = new Tuple2<String, String>(userId, code);…
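For contrast, a hedged Scala sketch of the Dataset side: in the Scala Dataset API, flatMap takes a function returning a collection and needs an implicit Encoder (spark.implicits._), whereas the Java RDD API above expects an Iterator from a PairFlatMapFunction. The column layout (userId, comma-separated codes) mirrors the snippet above; the sample data is made up.

import org.apache.spark.sql.SparkSession

object FlatMapDatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("flatmap-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("u1", "a,b,c"), ("u2", "b,c")).toDF("userId", "codes")

    // Each input row expands into one (userId, code) pair per code.
    val pairs = df.flatMap { row =>
      val userId = row.getString(0)
      row.getString(1).split(",").map(code => (userId, code)).toSeq
    }
    pairs.show()
    spark.stop()
  }
}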

Unable to find encoder for type stored in a Dataset (spark)

Original source: http://mangocool.com/1477619031890.html
Error:(36, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
      .map(parseRating)
Error:(36, 11) not enough arguments for method map: (implicit evidence$7: org.apache.spark.sql.Encoder[com.dtxy.xbdp.test.ALSTest.Rating])org.apache.spark.sql.Dataset[com.dtxy.xbdp.test.ALSTest.Rating]. Unspecified value parameter evidence$7….
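A minimal sketch of the usual fix: define the case class at the top level (not inside a method) and bring the implicit Encoders into scope with spark.implicits._ before calling .map on the Dataset. The Rating fields, parseRating helper, and input path below are illustrative, not taken from the original post.

import org.apache.spark.sql.SparkSession

case class Rating(userId: Int, movieId: Int, rating: Float)

object EncoderFixSketch {
  def parseRating(line: String): Rating = {
    val fields = line.split("::")
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("encoder-fix").getOrCreate()
    import spark.implicits._  // provides Encoder[Rating] because Rating is a Product type

    val ratings = spark.read.textFile("data/ratings.dat").map(parseRating)  // placeholder path
    ratings.show()
    spark.stop()
  }
}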

sparksql read data from elasticsearch and then process it

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
/**
 * Created by yanggf on 2017/11/9.
 */
object Elastic {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("elastic")
      .set("es.nodes", "127.0.0.1")
      .set("es.port", "9200")
      .set("es.net.http.auth.user", "elastic")
      .set("es.net.http.auth.pass", "changeme")
      .set("es.mapping.date.rich", "false")
    val sc = new SparkContext(conf)
    val query = """{"query":{"match_all":{}}}"""
    val esRdd = sc.esRDD("user/flight", query)
    import org.apache.spark.sql.SparkSession
    val sparkSession =…
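A hedged alternative sketch: instead of esRDD plus a manual conversion, elasticsearch-hadoop also exposes a Spark SQL data source, so the index can be loaded straight into a DataFrame. The index name "user/flight" and the connection settings mirror the snippet above; adjust them for your cluster.

import org.apache.spark.sql.SparkSession

object ElasticSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("elastic-sql")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "changeme")
      .getOrCreate()

    // "org.elasticsearch.spark.sql" is the data source packaged in elasticsearch-hadoop.
    val df = spark.read.format("org.elasticsearch.spark.sql").load("user/flight")
    df.printSchema()
    df.show(10)
    spark.stop()
  }
}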

spark elasticsearch scala idea sbt integration problem solved

build.sbt contents:
name := "search"
version := "1.0"
scalaVersion := "2.11.1"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.1.0"
)
Download the latest elasticsearch-hadoop package from the Elasticsearch site, pick the jar matching your versions (elasticsearch-spark-20_2.11-5.6.4.jar) and import it into the project; so far 5.6.4 appears to be compatible with Elasticsearch 5.5.0.
Spark code to read from ES:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
/**
 * Created by yanggf on 2017/11/9.
 */
object Elastic {
  def main(args: Array[String]): Unit = {
    // def file = "f:/scala.txt"
    // val conf = new SparkConf().setMaster("local").setAppName("elastic")…
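As an alternative to importing the downloaded jar by hand, elasticsearch-spark is also published to Maven Central, so (assuming the coordinates below match the 5.6.4 release used above) sbt can resolve it directly from build.sbt:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.1.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.1.0",
  "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.6.4"  // assumed coordinates
)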

Spark Unable to find JDBC Driver, No suitable driver

java.sql.SQLException: No suitable driver found for jdbc
In your options map, put the "driver" key with the value "com.mysql.jdbc.Driver":
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456");
options.put("dbtable", "video");
options.put("driver", "com.mysql.jdbc.Driver"); // here
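A hedged Scala sketch of the same fix using the DataFrame reader API: setting the "driver" option explicitly so Spark can locate the MySQL driver class. The URL, table, and credentials mirror the Java snippet above; the MySQL connector jar still has to be on the driver and executor classpaths (for example via --jars or --packages).

import org.apache.spark.sql.SparkSession

object JdbcDriverSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jdbc-driver").getOrCreate()

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456")
      .option("dbtable", "video")
      .option("driver", "com.mysql.jdbc.Driver")  // the key fix
      .load()

    df.show()
    spark.stop()
  }
}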