This action will force synchronization from hellowzk/light-spark, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
[toc]
本人从 2014 年开始使用 Apache Spark,基于以往项目经验,为了降低项目开发复杂度,减少开发成本,开发了 light-spark 框架。 本框架屏蔽了底层技术细节,让开发人员面向 SQL 编程,更专注于业务逻辑,减少开发过程中 Debug 工作,让 Apache Spark 更方便使用。
项目业务大部分功能可以用 SQL 实现,符合以下情况,建议使用本框架:
本项目已发布到 maven 中央仓库,暂时支持 Spark 2.2.x、Spark 2.3.x、Spark 2.4.x
<dependency>
<groupId>com.github.hellowzk</groupId>
<artifactId>light-spark-core_2.2</artifactId>
<version>1.0.4-release</version>
</dependency>
<dependency>
<groupId>com.github.hellowzk</groupId>
<artifactId>light-spark-core_2.3</artifactId>
<version>1.0.4-release</version>
</dependency>
<dependency>
<groupId>com.github.hellowzk</groupId>
<artifactId>light-spark-core_2.4</artifactId>
<version>1.0.4-release</version>
</dependency>
参考 pom 文件
参考 run
开发过程中,可以通过配置,自动把指定步骤的结果持久化存储,支持 hdfs csv、hive 表
本框架支持在配置中自定义及使用变量,支持变量嵌套使用
灵活的定义位置
自定义变量可以在 yaml 配置中,也可以定义在 class 类中,定义的变量名称必须唯一。
定义在 class 中,例子 MyConstants
变量使用
import com.hellowzk.light.spark.constants.AppConstants
AppConstants.variables("mysql_url")
// 类型 variables: Map[String, String]
所有可用变量存放在 AppConstants.variables 中
支持通过日期表达式定义日期,表达式格式为
DATE([yyyy-MM-dd, 20200722, yyyyMMdd][+3d][-1m][+1y]...)
...
constansMap:
today: "${DATE([yyyy-MM-dd])}" # 当前日期
today2: "${DATE([yyyy-MM, 2020-08-05, yyyy-MM-dd])}" # 2020-08-05 所在月份,2020-08
date1: "${DATE([yyyy-MM-dd, 2020-08-05][-1d])}" # 2020-08-05 减一天,2020-08-04
date2: "${DATE([yyyy-MM-dd, ${date1}][-1d])}" # 2020-08-04 减 1 天,2020-08-03
date3: "${DATE([yyyy-MM-dd, ${date2}][-1d])}" # 2020-08-03 减 1 天,2020-08-02
date4: "${DATE([yyyy-MM, ${date3}, yyyy-MM-dd][-1m])}" # 2020-08-02 减 1 月月份,2020-07
date5: "${DATE([yyyy-MM-dd, ${date4}, yyyy-MM][-1d])}" # 2020-07 月初减 1 天, 2020-06-30
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义当前 yaml 文件流程名称 | |
constansMap | 否,Map 类型 | 自定义常量,可以在当前 yaml 中通过 ${variable} 使用;在自定义代码中通过 AppConstants.variables("variable" 使用) | |
persistType | 否 | 使用 debug 组件,配置中间数据持久化的地址 hive/hdfs;开启条件,启动脚本中传入 --debug 参数启用 debug 组件 | |
persistDir | 否 | 持久化到 hdfs 的位置,需要启用 debug 组件 | |
persistHiveDb | 否 | 持久化到 hive 的库,需要启用 debug 组件 | |
udaf | 否 | 自定义 UDAF,Map类型, key 为 udaf 命名,value 为值 | |
inputs | 是 | 定义的输入数据 | |
processes | 是 | 处理逻辑 | |
outputs | 否 | 输出 | |
envs | 否 | 环境参数,暂时只支持 Spark 相关参数 |
# 流程名称
name: "xxx"
# 自定义常量,可以在sql/文件路径中使用 ${variable},常量名称不可重复
constansMap:
hive_out_db: xxx
mysql_url: xxx
mysql_username: xxx
mysql_passwd: xxx
# 启用 debug 工具,SQL处理时,如果 store=true , 结果会缓存到 $baseDir/{{EVENT_DATE}}/$tableName 目录,启动脚本必须传入参数 --debug, 否则该配置不生效
persistType: hive # hive hdfs
persistDir: /tmp/spark/app/full-batch
persistHiveDb: "${outputDB}"
# 输入定义,支持 3 种数据源
inputs:
- ...
# 逻辑处理,支持SQL 与 自定义处理, 有 3 种
processes:
- ...
# 结果输出, 支持 3 种
outputs:
- ...
# 环境设置
envs:
# 暂时只支持 spark 相关参数,如下
spark:
- ...
支持类型
type 值 | 说明 |
---|---|
classpathFile | 加载 classpath 文件,通过配置字段、分隔符生成表 |
customClasspath | 加载 hdfs 文件,自定义加载逻辑,生成表 |
hdfsfile | 加载 hdfs 文件,通过配置字段、分隔符生成表 |
customHdfs | 加载 hdfs 文件,自定义加载逻辑,生成表 |
hive | 加载 hive 表 |
jdbc | 加载 jdbc 表 |
clickhouseBalanced | 加载 clickhouse 表,使用 BalancedClickhouseDataSource 负载均衡接口,单机版或通过 SLB 做负载使用 jdbc 就好 |
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 classpathFile | |
columns | 是 | 数据生成的 columns, 用 "," 隔开 | |
path | 是 | 文件在 classpath 的路径 | |
fs | 是 | 列分割符 | |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
inputs:
- name: person
type: classpathFile
columns: name, gen, age
path: person.txt
fs: ","
...
自定义加载 classpath 的数据,生成表
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 customClasspath | |
path | 是 | 文件在 classpath 的路径 | |
clazz | 是 | 自定义的加载逻辑类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseInput 实现 process 方法 | |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
inputs:
- name: custompersonClass
type: customClasspath
path: person2.txt
clazz: com.hellowzk.light.spark.stages.Process1
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 hdfsfile | |
columns | 是 | 数据生成的 columns, 用 "," 隔开 | |
path | 是 | 文件在 hdfs 的路径 | |
fs | 是 | 列分割符 | |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
inputs:
- name: person_hdfs
type: hdfsfile
columns: name,gen,age
path: /tmp/zhaokui/testdata/test/20200706/
fs: ","
...
自定义加载 customHdfs 的数据,生成表
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 customHdfs | |
path | 是 | 文件在 hdfs 的路径 | |
clazz | 是 | 自定义加载逻辑类 | |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
inputs:
- name: customPersonHdfs
type: customHdfs
path: /tmp/zhaokui/testdata/test/20200706/
clazz: com.hellowzk.light.spark.stages.Process1
...
自定义加载 customHdfs 的数据,生成表
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 jdbc | |
driver | 是 | jdbc driver | |
url | 是 | jdbc url 地址 | |
user | 是 | 用户名 | |
password | 是 | 密码 | |
dbtable | 是 | 要加载的表,为 map 结构 | |
jdbcOpts | 否 | 请参看 SparkSQL官方文档 |
...
inputs:
- name: mysql1
type: jdbc
driver: "com.mysql.jdbc.Driver"
url: "${jdbcurl}" # 在 constansMap 中定义的该值
user: root
password: 123456
dbtable:
b_osp_app_event_detail: b_osp_app_event_detail
b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
使用 BalancedClickhouseDataSource 加载 clickhouse 数据,生成 spark 表,如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc 方式
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 clickhouseBalance | |
url | 是 | clickhouse url 地址 | |
user | 是 | 用户名 | |
password | 是 | 密码 | |
dbtable | 是 | 要加载的表,为 map 结构 | |
jdbcOpts | 否 | 请参看 SparkSQL官方文档 |
...
inputs:
- name: clickhouse1
type: clickhouseBalance
url: "${jdbcurl}" # 在 constansMap 中定义的该值
user: root
password: 123456
dbtable:
preson_all: all_data
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
type | 是 | 固定值 hive | |
database | 是 | hive 库名 | |
dbtable | 是 | 要加载的表,为 map 结构 |
...
inputs:
- name: hive1
type: hive
database: "${inputDB}"
dbtable:
dm_road_trackset_di: dm_road_trackset_di
dm_trackset_di: dm_trackset_di
...
支持类型
type 值 | 说明 |
---|---|
sql | 使用 SQL 处理数据 |
custom | 硬编码处理数据 |
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
sql | 是 | SQL 语句 | |
dimKey | 否 | 用于计算多维度数据 | |
allPlaceholder | all | 否 | 多维度数据 select 占位符 |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
processes:
- name: table1
sql: "select name,gen,age, '${DATE([yyyy-MM-dd, 2020-08-05][-3d])}' from person"
cache: true # 出发action操作,缓存执行SQL后的表
store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table
show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)
partations: 3 # reparation 个数,不reparation则删除节点
# 多维度数据处理
- name: table1_1
dimKey: "gen, age" # 两个维度
allPlaceholder: "ALL"
sql: |- # |- :yaml 语法,支持字符串换行,适合格式化写 SQL
select name, ${DIM_FIELDS}, count(1) as count from person group by name ${DIM_GROUP}
cache: true # 出发action操作,缓存执行SQL后的表
store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table
show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)
partations: 3 # reparation 个数,多维度数据处理,建议使用该功能,否则 task 数量会根据维度个数爆炸膨胀,严重降低后续计算过程
...
自定义处理类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseTransform 类,实现 doProcess 方法,如 Process2
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 定义该步生成的表的名称,可以在下面处理中引用 | |
clazz | 是 | 自定义处理类 | |
partations | 0 | 否 | 进行重新分区的数量,不配置不重分区 |
cache | false | 否 | 是否缓存,如果下面的操作多次使用该表,建议设置为true |
show | 0 | 否 | 开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数 |
store | false | 否 | 开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数 |
...
processes:
- name: customProcessShow
clazz: "com.hellowzk.light.spark.stages.Process2"
cache: true
store: true
show: 20
partations: 1
...
支持类型
type 值 | 说明 |
---|---|
jdbc | 保存数据到 jdbc |
clickhouseBalance | 使用 BalancedClickhouseDataSource 负载均衡接口,保存数据到 clickhouse,如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc |
hive | 保存数据到 hive |
hdfsfile | 保存数据到 hdfs 目录 |
kafkaField | 指定分隔符,数据发送到 kafka |
kafkaJson | JSON 格式,数据发送到 kafka |
通过此配置把数据保存到支持 jdbc 的持久化工具
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 jdbc | |
driver | 是 | 数据库 driver | |
url | 是 | 连接数据库的 url | |
user | 是 | 用户名 | |
password | 是 | 密码 | |
tables | 是 | 要输出的表,为 map 类型 | |
mode | append | 否 | 保存模式,默认为 append |
opts | 否 | jdbc 操作的其他参数 | |
preSQL | 是 | 执行保存操作前,要对目标数据库执行的 SQL语句 |
...
outputs:
- name: mysql1
type: jdbc
driver: "com.mysql.jdbc.Driver"
url: "${jdbcurl}"
user: root
password: 123456
opts:
isolationLevel: NONE # 关闭事务
numPartitions: 1 # 设置并发
batchsize: 50000
preSQL:
- delete from b_osp_app_event_detail
- delete from b_osp_app_area_analyze_total
tables:
table3: b_osp_app_event_detail
b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 clickhouseBalance | |
url | 是 | 连接数据库的 url | |
user | 是 | 用户名 | |
password | 是 | 密码 | |
tables | 是 | 要输出的表,为 map 类型 | |
mode | append | 否 | 保存模式,默认为 append |
opts | 否 | jdbc 操作的其他参数 | |
preSQL | 是 | 执行保存操作前,要对目标数据库执行的 SQL语句,为 clickhouse 分布式表操作语句 |
...
outputs:
- name: clickhouse1
type: clickhouseBalance
url: "${jdbcurl}"
user: root
password: 123456
opts:
isolationLevel: NONE # 关闭事务
numPartitions: 1 # 设置并发
batchsize: 50000
# clickhouse 分布式表的操作语句
preSQL:
- alter table b_osp_app_event_detail on cluster test_cluster delete where dt = '${event_date}'
tables:
table3: b_osp_app_event_detail
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 hive | |
database | 是 | 要保存到的 hive 库 | |
tables | 是 | 要输出的表,为 map 类型 | |
mode | overwrite | 否 | 保存模式,默认为 overwrite |
preSQL | 是 | 执行保存操作前,要对目标数据库执行的 SQL语句 |
...
outputs:
- name: mysql1
type: hive
database: test
tables:
table3: b_osp_app_event_detail
b_osp_app_area_analyze_total: b_osp_app_area_analyze_total
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 hdfsfile | |
srcName | 是 | 要保存的数据,必须使用上面定义的表名 | |
format | 是 | 要输出的文件类型,暂时支持 csv, txt, lzo, json, parquet 这些类型 | |
path | 是 | 数据要保存的目录 | |
fs | \u0001 | 否 | 保存成文件时的列分隔符,只针对 txt lzo 类型输出生效 |
...
outputs:
- name: out1
type: hdfsfile
srcName: table1
format: csv # 保存为 csv
path: /user/osp/data/out1/${EVENT_DATE}
- name: out2
type: hdfsfile
srcName: table2
format: txt # 保存为 txt,字段用逗号分隔
fs: ","
path: /user/osp/data/out2/${EVENT_DATE}
- name: out3
type: hdfsfile
srcName: table3
format: lzo # 保存为 lzo,字段用逗号分隔
fs: ","
path: /user/osp/data/out3/${EVENT_DATE}
- name: out4
type: hdfsfile
srcName: table4
format: json # 保存为 json
path: /user/osp/data/out4/${EVENT_DATE}
- name: out5
type: hdfsfile
srcName: table5
format: parquet # 保存为 parquet
fs: ","
path: /user/osp/data/out5/${EVENT_DATE}
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 kafkaField | |
srcName | 是 | 要保存的 table | |
brokers | 是 | 目标 kafka brokers | |
topic | 是 | 目标 kafka topic | |
fs | 是 | 字段分隔符 |
...
outputs:
- name: kafkaFieldTest
type: kafkaField
srcName: table1
brokers: ${outputKafka}
topic: test
fs: ","
...
配置项 | 默认值 | 是否必填 | 说明 |
---|---|---|---|
name | 是 | 节点名称,用于打印日志,定位问题 | |
type | 是 | 固定值 kafkaJson | |
srcName | 是 | 要保存的 table | |
brokers | 是 | 目标 kafka brokers | |
topic | 是 | 目标 kafka topic |
...
outputs:
- name: kafkaJsonTest
type: kafkaJson
srcName: table1
brokers: ${outputKafka}
topic: testJson
...
请参考 LICENSE 文件.
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。