网站流量日志数据分析系统
软件架构说明
数据采集:flume
数据预处理:编写mapreduce实现mr程序
数据仓库
hive
数据统计及分析
通过sqoop将数据导入到mysql数据库
数据可视化
azkaban
项目流程:
一、 网站日志数据采集(代码及实现步骤)
a) 网站日志数据生成及按时间拆解日志数据
需求:生成并拆解日志数据到指定文件夹
实现过程描述:首先通过一个shell脚本指定日志文件存储路径,在路径里面生成log文件,最后通过crontab配置指令实现每分钟采集。
代码如下:
Shell:
#!/bin/sh
logs_path="/home/hadoop/nginx/logs/"
pid_path="/home/hadoop/nginx/logs/nginx.pid"
filepath=${logs_path}"access.log"
echo $filepath
mv ${logs_path}access.log ${logs_path}access_$(date -d '-1 day' '+%Y-%m-%d-%H-%M').log
kill -USR1 cat ${pid_path}
crontab配置:
/1 * * * * sh /home/hadoop/bigdatasoftware/project1/nginx_log.sh
b) 通过flume日志数据采集到hdfs
实现过程描述:通过配置文件确定日志在hdfs的存储路径以及相关配置,再运行flume脚本,到浏览器上点击指定网页,再去查看hdfs是否生成日志文件夹。如果有则采集成功。
代码如下:
Hdfs配置文件:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
#监控一个目录下的多个文件新增的内容
agent1.sources.source1.type = TAILDIR
#通过 json 格式存下每个文件消费的偏移量,避免从头消费
agent1.sources.source1.positionFile = /home/hadoop/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /home/hadoop/nginx/logs/access_.log
agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname #配置sink组件为hdfs agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path=hdfs://hadoop-001:8020/weblog/flume-collection/%Y-%m-%d/%H-%M_%{hostname} #指定文件名前缀 agent1.sinks.sink1.hdfs.filePrefix = access_log #指定每批下沉数据的记录条数 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text #指定下沉文件按1MB大小滚动 agent1.sinks.sink1.hdfs.rollSize = 1048576 #指定下沉文件按1000000条数滚动 agent1.sinks.sink1.hdfs.rollCount = 1000000 #指定下沉文件按30分钟滚动 agent1.sinks.sink1.hdfs.rollInterval = 30 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
#使用memory类型channel agent1.channels.channel1.type = memory agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
运行flume脚本:
bin/flume-ng agent --conf conf/ --name agent1 --conf-file job/TaildirSource-hdfs.conf -Dflume.root.logger=INFO,console
二、 数据预处理(代码及实现步骤)
a) 将数据从采集目录移动到预处理目录
需求:移动文件到预处理工作目录
实现过程描述:通过设置flume采集生成的日志文件存放的目录和预处理程序的工作目录,就可以实现两者的移动。
代码如下:
#!/bin/bash
#flume采集生成的日志文件存放的目录 log_flume_dir=/weblog/flume-collection
#预处理程序的工作目录 log_pre_input=/data/weblog/preprocess/input
#获取时间信息
day_01="2013-09-18"
day_01=date -d'-1 day' +%Y-%m-%d
syear=date --date=$day_01 +%Y
smonth=date --date=$day_01 +%m
sday=date --date=$day_01 +%d
#读取日志文件的目录,判断是否有需要上传的文件
files=hadoop fs -ls $log_flume_dir | grep $day_01 | wc -l
if [ $files -gt 0 ]; then
hadoop fs -mv ${log_flume_dir}/${day_01} ${log_pre_input}
echo "success moved ${log_flume_dir}/${day_01} to ${log_pre_input} ....."
fi
b) 数据清洗(5分)
需求:从第一次采集的数据筛选出有效的数据。
实现过程描述:编写一个数据清洗的mr程序
代码如下:
定义一个实体类描述源日志数据:
package com.gec.mr.pre.mrbean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
/**
*/ public class WebLogBean implements Writable {
private boolean valid = true;// 判断数据是否合法
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return this.time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
/**
* \001是hive当中默认的分隔符,不会出现用户手打出来的情况
* @return
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.getRemote_addr());
sb.append("\001").append(this.getRemote_user());
sb.append("\001").append(this.getTime_local());
sb.append("\001").append(this.getRequest());
sb.append("\001").append(this.getStatus());
sb.append("\001").append(this.getBody_bytes_sent());
sb.append("\001").append(this.getHttp_referer());
sb.append("\001").append(this.getHttp_user_agent());
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
} 编写处理数据清洗的工具类: package com.gec.mr.pre;
import com.gec.mr.pre.mrbean.WebLogBean;
import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Set;
public class WebLogParser {
public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
//通过空格来对我们的数据进行切割,然后拼接字符串,将我们同一个字段里面的数据拼接到一起
//222.66.59.174 -- [18/Sep/2013:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0"
String[] arr = line.split(" ");
if (arr.length > 11) {
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
//将我们的字符串转换成中文习惯的字符串
// [18/Sep/2013:06:52:32 +0000]
// 18/Sep/2013:06:52:32------》2013-09-18 06:52:32
String time_local = formatDate(arr[3].substring(1));
if(null==time_local || "".equals(time_local)) {
time_local="-invalid_time-";
}
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//如果useragent元素较多,拼接useragent。
//数组长度大于12,说明我们的最后一个字段切出来比较长,我们把所有多的数据都塞到最后一个字段里面去
// "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)"
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i=11;i<arr.length;i++){
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
//如果请求状态码大于400值,就认为是请求出错了,请求出错的数据直接认为是无效数据
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误
webLogBean.setValid(false);
}
//如果获取时间没拿到,那么也是认为是无效的数据
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
} else {
//58.215.204.118 - - [18/Sep/2013:06:52:33 +0000] "-" 400 0 "-" "-"
//如果切出来的数组长度小于11个,说明数据不全,,直接丢掉
webLogBean=null;
}
return webLogBean;
}
public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//格式化时间方法
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}
} 配置mr类: package com.gec.mr.pre.mapper;
import com.gec.mr.pre.mrbean.WebLogBean; import com.gec.mr.pre.utils.WebLogParser; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException; import java.util.HashSet; import java.util.Set;
public class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> { // 用来存储网站url分类数据 Set pages = new HashSet(); Text k = new Text(); NullWritable v = NullWritable.get(); /** * map阶段的初始化方法 * 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤 * 过滤掉我们日志文件当中的一些静态资源,包括js css img 等请求日志都需要过滤掉 */ @Override protected void setup(Context context) throws IOException, InterruptedException { //定义一个集合 pages.add("/about"); pages.add("/black-ip-list/"); pages.add("/cassandra-clustor/"); pages.add("/finance-rhive-repurchase/"); pages.add("/hadoop-family-roadmap/"); pages.add("/hadoop-hive-intro/"); pages.add("/hadoop-zookeeper-intro/"); pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//得到我们一行数据
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
// 过滤js/图片/css等静态资源
WebLogParser.filtStaticResource(webLogBean, pages);
if (!webLogBean.isValid()) return;
k.set(webLogBean.toString());
context.write(k, v);
}
}
} Driver: package com.gec.mr.pre.driver;
import com.gec.mr.pre.mapper.WeblogPreProcessMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class WeblogEtlPreProcessDriver {
static {
try {
// 设置 HADOOP_HOME 目录
System.setProperty("hadoop.home.dir", "D:/winutils-master/winutils-master/hadoop-3.0.0/");
// 加载库文件
System.load("D:/winutils-master/winutils-master/hadoop-3.0.0/bin/hadoop.dll");
} catch (UnsatisfiedLinkError e) {
System.err.println("Native code library failed to load.\n" + e);
System.exit(1);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
FileInputFormat.addInputPath(job,new Path("file:///d:\\src\\input"));
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job,new Path("file:///d:\\src\\weblogPreOut2"));
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(WeblogEtlPreProcessDriver.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
}
}
c) 数据点击流数据模型生成
需求:将点击流数据存到hdfs。
实现过程描述:通过编写clickstreampageview,实现点击流访问数据
代码如下:
核心类Reducer:
package com.gec.mr.pre.reducer;
import com.gec.mr.pre.mrbean.WebLogBean; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils;
import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*;
public class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
Text v = new Text();
/**
* reduce阶段接收到的key就是我们的IP
* 接收到的value就是我们一行行的数据
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException
{
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
// 先将一个用户的所有访问记录中的时间拿出来排序
try {
//循环遍历V2,这里面装的,都是我们的同一个用的数据
for (WebLogBean bean : values) {
// beans.add(bean);
//为什么list集合当中不能直接添加循环出来的这个bean?
//这里通过属性拷贝,每次new 一个对象,避免了bean的属性值每次覆盖
//这是涉及到java的深浅拷贝问题
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch(Exception e) {
e.printStackTrace();
}
//beans.add(bean);
beans.add(webLogBean);
}
//将bean按时间先后顺序排序,排好序之后,就计算这个集合当中下一个时间和上一个时间的差值 ,如
//如果差值小于三十分钟,那么就代表一次会话,如果差值大于30分钟,那么就代表多次会话
//将我们的weblogBean塞到一个集合当中,我们就可以自定义排序,对集合当中的数据进行排序
Collections.sort(beans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step
* 核心思想:
* 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session
* 否则,就属于不同的session
*
*/
int step = 1;
//定义一个uuid作为我们的session编号
String session = UUID.randomUUID().toString();
///经过排序之后,集合里面的数据都是按照时间来排好序了
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// 如果仅有1条数据,则直接输出
if (1 == beans.size()) {
// 设置默认停留时长为60s
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出
if (i == 0) {
continue;
}
// 求近两次时间差
long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
if (timeDiff < 30 * 60 * 1000) {
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit
v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
+ beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
// 输出完上一条之后,重置step编号
step = 1;
session = UUID.randomUUID().toString();
}
// 如果此次遍历的是最后一条,则将本条直接输出
if (i == beans.size() - 1) {
// 设置默认停留市场为60s
v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
}
private String toStr(Date date) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.format(date);
}
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.parse(timeStr);
}
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private long timeDiff(Date time1, Date time2) throws ParseException {
// date 调用 getTime获取毫秒值
return time1.getTime() - time2.getTime();
}
}
d) 数据点击访问页面数据模型生成
需求:生成点击流访问数据
实现过程描述:通过编写ClickStreamVisit,实现模型生成。
代码如下:
核心类reducer:
package com.gec.mr.pre.reducer;
import com.gec.mr.pre.mrbean.PageViewsBean; import com.gec.mr.pre.mrbean.VisitBean; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils;
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator;
public class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
@Override
protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
// 将pvBeans按照step排序
ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
for (PageViewsBean pvBean : pvBeans) {
PageViewsBean bean = new PageViewsBean();
try {
BeanUtils.copyProperties(bean, pvBean);
pvBeansList.add(bean);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将数据按照我们的步骤进行排序,这样就可以得到哪个页面先访问,哪个页面后访问的
*/
Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
@Override
public int compare(PageViewsBean o1, PageViewsBean o2) {
return o1.getStep() > o2.getStep() ? 1 : -1;
}
});
// 取这次visit的首尾pageview记录,将数据放入VisitBean中
VisitBean visitBean = new VisitBean();
// 取visit的首记录
visitBean.setInPage(pvBeansList.get(0).getRequest());
visitBean.setInTime(pvBeansList.get(0).getTimestr());
// 取visit集合当中末尾的记录即可
visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
// visit访问的页面数
visitBean.setPageVisits(pvBeansList.size());
// 来访者的ip
visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
// 本次visit的referal
visitBean.setReferal(pvBeansList.get(0).getReferal());
visitBean.setSession(session.toString());
context.write(NullWritable.get(), visitBean);
}
}
三、 将数据导入hive(代码及实现步骤)
a) 将清洗后的数据导入Hive
需求:将数据导入hive
实现过程描述:首先生成hive数据表,用load data语句将数据从hdfs导入hive
代码如下:drop table if exists ods_weblog_origin;
create table ods_weblog_origin(
valid string,
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string)
partitioned by (datestr string)
row format delimited
fields terminated by '\001';
pageview:
drop table if exists ods_click_pageviews;
create table ods_click_pageviews(
session string,
remote_addr string,
remote_user string,
time_local string,
request string,
visit_step string,
page_staylong string,
http_referer string,
http_user_agent string,
body_bytes_sent string,
status string)
partitioned by (datestr string)
row format delimited
fields terminated by '\001';
visit:
drop table if exists ods_click_stream_visit;
create table ods_click_stream_visit(
session string,
remote_addr string,
inTime string,
outTime string,
inPage string,
outPage string,
referal string,
pageVisits int)
partitioned by (datestr string)
row format delimited
fields terminated by '\001';
hadoop jar etlmr-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.WeblogEtlPreProcessDriver
/data/weblog/preprocess/input/2021-12-09 /data/weblog/preprocess/weblogPreOut/2021-12-09
hadoop jar clickstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamDriver /data/weblog/preprocess/weblogPreOut/2021-12-10 /data/weblog/preprocess/pageViewOut/2021-12-15
hadoop jar visitstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamVisitDriver /data/weblog/preprocess/pageViewOut/2021-12-15 /data/weblog/preprocess/clickStreamVisit/2021-12-15
load data inpath '/data/weblog/preprocess/weblogPreOut/2021-12-10/' overwrite into table ods_weblog_origin partition(datestr='2021-12-10');
b) 将点击页面流数据导入Hive
需求:将点击页面流数据导入Hive
实现过程描述:用load data语句将数据从hdfs导入hive
代码如下:
load data inpath '/data/weblog/preprocess/pageViewOut/2021-12-15' overwrite into table ods_click_pageviews partition(datestr='2021-12-15');
c) 将点击流访问表数据导入Hive
需求:将点击流访问表数据导入Hive
实现过程描述:用load data语句将数据从hdfs导入hive
代码如下:
load data inpath '/data/weblog/preprocess/clickStreamVisit/2021-12-15' overwrite into table ods_click_stream_visit partition(datestr='2021-12-15');
四、 数据导入(代码及实现步骤)
a) 基于sqoop将数据导出mysql
需求:将hive里面的数据导入到关系型数据库mysql
实现过程描述:创建关系型数据库并建表,用sqoop工具连接mysql,创建三个ods表分别算每天的pvs值,指定日期的每个小时的pvs值,每天的page的pvs值。
代码如下:
bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyday --table dw_pvs_everyday --input-fields-terminated-by '\001'
bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyhour_oneday/datestr=2021-12-15 --table dw_pvs_everyhour_oneday --input-fields-terminated-by '\001'
bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_request_everyday --table dw_pvs_request_page --input-fields-terminated-by '\001'
五、 定制开发web图表(代码及实现步骤)
a) 每天访问量的pvs值(折线图)
需求:用折线图表示出每天访问量的pvs值
实现过程描述:在idea里面通过导入echarts,编写html文件连通mysql即可。
代码如下:
Html:
//在js读取thymeleaf变量值
var data1=[[${dayJson}]];
var data2=[[${pvsListJson}]];
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'));
var option = {
title: {
text: '每天访问量pvs值折线图(x轴为某月的其中几天)'
},
xAxis: {
tooltip: {},
legend: {
data: ['页面pvs值']
},
type: 'category',
// data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
data: JSON.parse(data1)
},
yAxis: {
type: 'value'
},
series: [
{
// data: [120, 200, 150, 80, 70, 110, 130],
data: JSON.parse(data2),
type: 'line',
symbol: 'triangle',
symbolSize: 20,
lineStyle: {
color: '#5470C6',
width: 4,
type: 'dashed'
},
itemStyle: {
borderWidth: 3,
borderColor: '#EE6666',
color: 'yellow'
}
}
]
};
myChart.setOption(option);
//在js读取thymeleaf变量值
var data1=[[${hourJson}]];
var data2=[[${pvsListJson}]];
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'));
var option = {
title: {
text: '每小时页面pvs值折线图(x轴为某整点)'
},
tooltip: {},
legend: {
data: ['页面pvs值']
},
xAxis: {
type: 'category',
// data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
data: JSON.parse(data1)
},
yAxis: {
type: 'value'
},
series: [
{
// data: [150, 230, 224, 218, 135, 147, 260],
data: JSON.parse(data2),
type: 'line'
}
]
};
myChart.setOption(option);
c) 统计页面pvs值(柱状图) 需求:用柱状图统计页面pvs值 实现过程描述:在idea里面通过导入echarts,编写html文件连通mysql即可 代码如下: Html:
<title>点击流数据图表</title> <script src="/js/echarts.js"></script> <script type="text/javascript" th:inline="javascript">//在js读取thymeleaf变量值
var data1=[[${pageJson}]];
var data2=[[${pvsJson}]];
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'));
// 指定图表的配置项和数据
var option = {
title: {
text: '数据可视化'
},
tooltip: {},
legend: {
data: ['页面的pvs']
},
xAxis: {
data: JSON.parse(data1)
},
yAxis: {},
series: [
{
name: '页面',
type: 'bar',
data: JSON.parse(data2)
}
]
};
/* var option = {
series: [
{
type: 'pie',
data: [
{
value: 335,
name: '直接访问'
},
{
value: 234,
name: '联盟广告'
},
{
value: 1548,
name: '搜索引擎'
}
],
radius: '50%'
}
]
};*/
// 使用刚指定的配置项和数据显示图表。
myChart.setOption(option);
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。