代码拉取完成,页面将自动刷新
工作中基于 Spark 开发的代码模板。
// Minimal setup: build a SparkConf through the project's SparkConfBuilder
// (application name + Kryo serializer setter), then hand it to SparkSessionBuilder.
val sparkConf = SparkConfBuilder()
  .setAppName(name)
  .setKryoSerializer()
  .get()

// The session is created from the configuration assembled above.
val sparkSession = SparkSessionBuilder()
  .setConf(sparkConf)
  .get()
// Configuration for a streaming job, assembled with the project's SparkConfBuilder.
// NOTE(review): the Spark property each setter writes is inferred from its name —
// confirm against SparkConfBuilder.
val sparkConf =
  SparkConfBuilder()
    .setAppName(name)
    .setKryoSerializer()
    // presumably spark.sql.shuffle.partitions
    .setSqlShufflePartition(1000)
    // presumably spark.streaming.stopGracefullyOnShutdown
    .setStreamingStopGracefullyOnShutdown(true)
    // presumably spark.streaming.backpressure.enabled
    .setStreamingBackpressure(true)
    // presumably spark.streaming.kafka.maxRatePerPartition
    .setStreamingKafkaMaxRatePerPartition(10000)
    .get()
// Kafka consumer parameters handed to the streaming builder.
// The empty strings are template placeholders that must be filled in before use.
val kafkaParam: Map[String, Object] = Map[String, Object](
  "bootstrap.servers"  -> "",
  "key.deserializer"   -> "",
  "value.deserializer" -> "",
  "group.id"           -> "",
  // Boxed explicitly because the map's value type is Object.
  "enable.auto.commit" -> java.lang.Boolean.FALSE
)
// Streaming example: subscribe to the "test" topic with a 5-second batch duration
// and print the value of every received record.
SparkStreamingBuilder()
  .setSparkConf(sparkConf)
  .setKafkaParam(kafkaParam)
  .setTopics(Array("test"))
  .setDuration(Seconds(5))
  .execute { stream =>
    // Keep only the value of each record.
    val values = stream.map(record => record.value())
    values.foreachRDD { batch =>
      // Skip batches that contain no data.
      if (!batch.isEmpty()) {
        batch.foreachPartition { records =>
          records.foreach(line => println(line))
        }
      }
    }
  }
/**
 * Resolves the path of the `conf` directory that accompanies this application.
 *
 * The starting point is the code-source location of this class. When that location
 * is a jar, the jar's directory is used instead. `conf` is then resolved as a sibling
 * of that directory (i.e. under its parent).
 *
 * @return the parent directory path with "/conf" appended
 */
def confPath: String = {
  val rawPath = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
  // URLDecoder performs form decoding, which turns a literal '+' into a space.
  // Escape '+' first so that paths such as ".../c++/app.jar" are not corrupted.
  val decoded = URLDecoder.decode(rawPath.replace("+", "%2B"), "utf-8")
  // For a jar, drop the file name and keep its directory (including the trailing '/').
  val base =
    if (decoded.endsWith(".jar")) decoded.substring(0, decoded.lastIndexOf("/") + 1)
    else decoded
  // NOTE(review): getParent returns null when `base` has no parent (e.g. the
  // filesystem root), which would yield "null/conf" — confirm this cannot happen
  // in the supported deployment layouts.
  val parent = new File(base).getParent
  parent + "/conf"
}
/**
 * Parses a configuration file located in the directory returned by `confPath`.
 *
 * @param name file name relative to the conf directory
 * @return the result of `ConfigFactory.parseFile` for that file
 */
def load(name: String): Config = {
  val location = s"$confPath/$name"
  ConfigFactory.parseFile(new File(location))
}
/**
 * Registers a named connection pool from JDBC connection details, unless a pool
 * with the same name is already initialized.
 *
 * @param name     pool name; converted to a `Symbol` for the registry
 * @param url      JDBC URL
 * @param username database user
 * @param password database password
 * @param settings pool settings passed through to `ConnectionPool.add`
 */
def init(
  name: String, url: String,
  username: String, password: String, settings: ConnectionPoolSettings
): Unit = {
  val poolName = Symbol(name)
  val alreadyRegistered = ConnectionPool.isInitialized(poolName)
  if (!alreadyRegistered) {
    ConnectionPool.add(poolName, url, username, password, settings)
    info(s"successful initialization the database connection pool: $name")
  }
}
/**
 * Registers a named connection pool backed by an existing `DataSource`, unless a
 * pool with the same name is already initialized.
 *
 * @param name       pool name; converted to a `Symbol` for the registry
 * @param dataSource data source wrapped in a `DataSourceConnectionPool`
 */
def init(name: String, dataSource: DataSource): Unit = {
  val poolName = Symbol(name)
  val alreadyRegistered = ConnectionPool.isInitialized(poolName)
  if (!alreadyRegistered) {
    val pool = new DataSourceConnectionPool(dataSource)
    ConnectionPool.add(poolName, pool)
    info(s"successful initialization the database connection pool: $name")
  }
}
/**
 * Opens a plain JDBC connection without going through a connection pool.
 *
 * @param driverClassName fully-qualified JDBC driver class, loaded before connecting
 * @param url             JDBC URL
 * @param username        database user
 * @param password        database password
 * @return the connection obtained from `DriverManager`
 */
def getConnection(
  driverClassName: String, url: String,
  username: String, password: String
): Connection = {
  // Load the driver class first; this throws if it is not on the classpath.
  Class.forName(driverClassName)
  val connection = DriverManager.getConnection(url, username, password)
  connection
}
/**
 * Runs `execute` against a `DB` built on the supplied JDBC connection.
 *
 * @param conn    connection to wrap
 * @param execute function applied to the `DB` handle
 * @tparam A result type of `execute`
 * @return whatever the delegated `using(db)(execute)` call returns
 */
def using[A](conn: Connection)(execute: DB => A): A = {
  val database = DB(conn)
  // Auto-close is switched off on the handle before it is passed to the
  // resource-taking `using` overload.
  // NOTE(review): presumably that overload closes the handle afterwards — confirm.
  database.autoClose(false)
  using(database)(execute)
}
/**
 * Runs `execute` against a `DB` built on a connection borrowed from the named pool.
 *
 * @param name    pool name; converted to a `Symbol` for the registry lookup
 * @param execute function applied to the `DB` handle
 * @tparam A result type of `execute`
 * @return whatever the delegated `using(db)(execute)` call returns
 */
def using[A](name: String)(execute: DB => A): A = {
  val pool = ConnectionPool(Symbol(name))
  val database = DB(pool.borrow())
  // Auto-close is switched off on the handle before it is passed to the
  // resource-taking `using` overload.
  // NOTE(review): presumably that overload closes the handle afterwards — confirm.
  database.autoClose(false)
  using(database)(execute)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。