
spark-template

Code templates for Spark development, distilled from day-to-day work.

Using spark-session

// Build a SparkConf with the application name and Kryo serialization enabled
val sparkConf = SparkConfBuilder().setAppName(name).setKryoSerializer().get()
// Create the SparkSession from that configuration
val sparkSession = SparkSessionBuilder().setConf(sparkConf).get()
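
A minimal usage sketch with the session created above; the input path and column name are hypothetical, not part of the template:

// Hypothetical input path and column, used only to show the session in action
val events = sparkSession.read.json("/data/events.json")
events.groupBy("event_type").count().show()
sparkSession.stop()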

Using spark-streaming

  • Hides the details of obtaining the StreamingContext and managing Kafka offsets
  • Users only need to write the corresponding business logic
// Streaming-oriented SparkConf: graceful shutdown, backpressure,
// and a per-partition rate limit for the Kafka source
val sparkConf = SparkConfBuilder()
      .setAppName(name)
      .setKryoSerializer()
      .setSqlShufflePartition(1000)
      .setStreamingStopGracefullyOnShutdown(true)
      .setStreamingBackpressure(true)
      .setStreamingKafkaMaxRatePerPartition(10000)
      .get()

// Kafka consumer parameters (fill in the values for your cluster)
val kafkaParam: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "",
      "key.deserializer" -> "",
      "value.deserializer" -> "",
      "group.id" -> "",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

// The builder creates the StreamingContext and manages Kafka offsets;
// the function passed to execute only contains the per-batch business logic
SparkStreamingBuilder()
      .setSparkConf(sparkConf)
      .setKafkaParam(kafkaParam)
      .setTopics(Array("test"))
      .setDuration(Seconds(5))
      .execute{ input =>
        input.map(_.value()).foreachRDD{ rdd =>
          if(!rdd.isEmpty()) {
            rdd.foreachPartition{ partition =>
              partition.foreach(println(_))
            }
          }
        }
      }
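
For reference, the sketch below shows the kind of offset-handling boilerplate such a builder typically hides, written against the standard spark-streaming-kafka-0-10 API. It is not code from this repository, and the repository may store offsets differently:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Manual version: create the context, build the direct stream, and commit
// offsets back to Kafka only after each batch has been processed
val ssc = new StreamingContext(sparkConf, Seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParam))

stream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation discards them
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_.value()).foreachPartition(_.foreach(println))
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()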

Utility wrappers

  • Configuration file reading - com.typesafe.config
// Resolve the "conf" directory that sits next to the deployed jar
def confPath: String = {
  val path = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
  var url = URLDecoder.decode(path, "utf-8")
  if (url.endsWith(".jar")) {
    url = url.substring(0, url.lastIndexOf("/") + 1)
  }
  val file = new File(url)
  val parent = file.getParent
  parent + "/conf"
}

// Parse the named configuration file from that directory
def load(name: String): Config = {
  val conf = new File(confPath + "/" + name)
  ConfigFactory.parseFile(conf)
}
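
A minimal usage sketch; the file name and keys are hypothetical, shown only to illustrate the com.typesafe.config API:

// Hypothetical file and keys
val config = load("application.conf")
val brokers = config.getString("kafka.bootstrap.servers")
val batchSeconds = config.getInt("streaming.batch.seconds")
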
  • JDBC connection pool - scalikejdbc
// Register a named connection pool once (no-op if already initialized)
def init(
          name: String, url: String,
          username: String, password: String, settings: ConnectionPoolSettings
        ): Unit = {
  if(!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.add(Symbol(name), url, username, password, settings)
    info(s"successfully initialized the database connection pool: $name")
  }
}

// Register a named pool backed by an existing DataSource
def init(name: String, dataSource: DataSource): Unit = {
  if(!ConnectionPool.isInitialized(Symbol(name))) {
    ConnectionPool.add(Symbol(name), new DataSourceConnectionPool(dataSource))
    info(s"successfully initialized the database connection pool: $name")
  }
}

// Open a raw JDBC connection without going through the pool
def getConnection(
                   driverClassName: String, url: String,
                   username: String, password: String
                 ): Connection = {
  Class.forName(driverClassName)
  DriverManager.getConnection(url, username, password)
}

// Wrap an existing Connection in a scalikejdbc DB and hand it to the caller's block
def using[A](conn: Connection)(execute: DB => A): A = {
  val db = DB(conn)
  db.autoClose(false)
  using(db)(execute)
}

// Borrow a connection from the named pool and hand it to the caller's block
def using[A](name: String)(execute: DB => A): A = {
  val db = DB(ConnectionPool(Symbol(name)).borrow())
  db.autoClose(false)
  using(db)(execute)
}
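
A minimal usage sketch; the pool name, JDBC URL, credentials, and table are hypothetical and only illustrate the scalikejdbc API:

// Hypothetical pool name, URL, and table
init("report", "jdbc:mysql://localhost:3306/report", "user", "password",
  ConnectionPoolSettings(maxSize = 8))

val names: List[String] = using("report") { db =>
  db.readOnly { implicit session =>
    sql"select name from t_user".map(_.string("name")).list().apply()
  }
}
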
  • See the code for more details