network-log-data-collection-sys

Introduction

A website traffic log data analysis system.

Software Architecture

Architecture overview:

  • Data collection: Flume

  • Data preprocessing: MapReduce programs

    • Data cleansing (ETL)
  • Data warehouse

    • Hive

    • Data statistics and analysis

  • Export of the results to MySQL with Sqoop

  • Data visualization

    • A web application built with Spring Boot
    • Charts rendered with the ECharts API
  • Azkaban

    • Schedules and runs the whole workflow

Project workflow:

I. Website log data collection (code and implementation steps)

a) Generating website log data and splitting it by time
Requirement: generate log data and roll it into timestamped files in a specified directory.
Implementation: a shell script defines the log storage path, renames the current access.log to a timestamped file, and sends nginx a USR1 signal so it reopens a new log file; a crontab entry runs the script every minute.
Code:

Shell:

#!/bin/sh
logs_path="/home/hadoop/nginx/logs/"
pid_path="/home/hadoop/nginx/logs/nginx.pid"
filepath=${logs_path}"access.log"
echo $filepath
mv ${logs_path}access.log ${logs_path}access_$(date -d '-1 day' '+%Y-%m-%d-%H-%M').log
kill -USR1 $(cat ${pid_path})

crontab entry (runs the script every minute):

*/1 * * * * sh /home/hadoop/bigdatasoftware/project1/nginx_log.sh
b) Collecting the log data into HDFS with Flume
Implementation: a Flume configuration file sets the HDFS target path and related options. Start the Flume agent, open a few pages of the site in a browser, and check whether the log directory appears in HDFS; if it does, collection succeeded.
Code:

Flume configuration file (job/TaildirSource-hdfs.conf):

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Monitor new content appended to several files under a directory
agent1.sources.source1.type = TAILDIR
# Persist each file's consumed offset as JSON so it is not re-read from the beginning
agent1.sources.source1.positionFile = /home/hadoop/taildir_position.json
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /home/hadoop/nginx/logs/access_*.log

agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Configure the sink as HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://hadoop-001:8020/weblog/flume-collection/%Y-%m-%d/%H-%M_%{hostname}
# File name prefix
agent1.sinks.sink1.hdfs.filePrefix = access_log
# Number of events written to HDFS per batch
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# Roll the file every 1 MB
agent1.sinks.sink1.hdfs.rollSize = 1048576
# Roll the file every 1,000,000 events
agent1.sinks.sink1.hdfs.rollCount = 1000000
# Roll the file every 30 seconds (rollInterval is in seconds)
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a memory channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Start the Flume agent:

bin/flume-ng agent --conf conf/ --name agent1 --conf-file job/TaildirSource-hdfs.conf -Dflume.root.logger=INFO,console

II. Data preprocessing (code and implementation steps)

a) Moving files from the collection directory to the preprocessing directory
Requirement: move the collected files into the preprocessing working directory.
Implementation: the script defines the directory where Flume stores the collected log files and the working directory of the preprocessing program, then moves the previous day's data from the former to the latter.
Code:

#!/bin/bash
#
# ===========================================================================
# Program name:
# Description:      move files to the preprocessing working directory
# Input parameter:  run date
# Target path:      /data/weblog/preprocess/input
# Data source:      path where the Flume-collected data is stored: /weblog/flume-collection
# Code review:
# Modified by:
# Modified on:
# Reason:
# Change list:
# ===========================================================================

# Directory where the Flume-collected log files are stored
log_flume_dir=/weblog/flume-collection

# Working directory of the preprocessing program
log_pre_input=/data/weblog/preprocess/input

# Date information
day_01="2013-09-18"
day_01=`date -d'-1 day' +%Y-%m-%d`
syear=`date --date=$day_01 +%Y`
smonth=`date --date=$day_01 +%m`
sday=`date --date=$day_01 +%d`

# List the log directory and check whether there are files to move
files=`hadoop fs -ls $log_flume_dir | grep $day_01 | wc -l`
if [ $files -gt 0 ]; then
    hadoop fs -mv ${log_flume_dir}/${day_01} ${log_pre_input}
    echo "success moved ${log_flume_dir}/${day_01} to ${log_pre_input} ....."
fi

b) Data cleansing
Requirement: filter the valid records out of the collected data.
Implementation: a MapReduce data-cleansing program.
Code:

Entity class describing the source log data:

package com.gec.mr.pre.mrbean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Layer that mirrors the external data source; the structure should stay
 * consistent with the source data (an "as-is"/ODS staging table).
 * @author
 */
public class WebLogBean implements Writable {

private boolean valid = true;// whether the record is valid
private String remote_addr;// client IP address
private String remote_user;// client user name; "-" when absent
private String time_local;// access time and time zone
private String request;// requested URL and HTTP protocol
private String status;// request status; 200 means success
private String body_bytes_sent;// size of the response body sent to the client
private String http_referer;// page the visitor came from
private String http_user_agent;// client browser information


public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
    this.valid = valid;
    this.remote_addr = remote_addr;
    this.remote_user = remote_user;
    this.time_local = time_local;
    this.request = request;
    this.status = status;
    this.body_bytes_sent = body_bytes_sent;
    this.http_referer = http_referer;
    this.http_user_agent = http_user_agent;
}

public String getRemote_addr() {
    return remote_addr;
}

public void setRemote_addr(String remote_addr) {
    this.remote_addr = remote_addr;
}

public String getRemote_user() {
    return remote_user;
}

public void setRemote_user(String remote_user) {
    this.remote_user = remote_user;
}

public String getTime_local() {
    return this.time_local;
}

public void setTime_local(String time_local) {
    this.time_local = time_local;
}

public String getRequest() {
    return request;
}

public void setRequest(String request) {
    this.request = request;
}

public String getStatus() {
    return status;
}

public void setStatus(String status) {
    this.status = status;
}

public String getBody_bytes_sent() {
    return body_bytes_sent;
}

public void setBody_bytes_sent(String body_bytes_sent) {
    this.body_bytes_sent = body_bytes_sent;
}

public String getHttp_referer() {
    return http_referer;
}

public void setHttp_referer(String http_referer) {
    this.http_referer = http_referer;
}

public String getHttp_user_agent() {
    return http_user_agent;
}

public void setHttp_user_agent(String http_user_agent) {
    this.http_user_agent = http_user_agent;
}

public boolean isValid() {
    return valid;
}

public void setValid(boolean valid) {
    this.valid = valid;
}

/**
 * \001 is Hive's default field delimiter; it cannot be typed by a user, so it is
 * safe to use in the output record.
 * @return
 */
@Override
public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.valid);
    sb.append("\001").append(this.getRemote_addr());
    sb.append("\001").append(this.getRemote_user());
    sb.append("\001").append(this.getTime_local());
    sb.append("\001").append(this.getRequest());
    sb.append("\001").append(this.getStatus());
    sb.append("\001").append(this.getBody_bytes_sent());
    sb.append("\001").append(this.getHttp_referer());
    sb.append("\001").append(this.getHttp_user_agent());
    return sb.toString();
}

@Override
public void readFields(DataInput in) throws IOException {
    this.valid = in.readBoolean();
    this.remote_addr = in.readUTF();
    this.remote_user = in.readUTF();
    this.time_local = in.readUTF();
    this.request = in.readUTF();
    this.status = in.readUTF();
    this.body_bytes_sent = in.readUTF();
    this.http_referer = in.readUTF();
    this.http_user_agent = in.readUTF();

}

@Override
public void write(DataOutput out) throws IOException {
    out.writeBoolean(this.valid);
    out.writeUTF(null==remote_addr?"":remote_addr);
    out.writeUTF(null==remote_user?"":remote_user);
    out.writeUTF(null==time_local?"":time_local);
    out.writeUTF(null==request?"":request);
    out.writeUTF(null==status?"":status);
    out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
    out.writeUTF(null==http_referer?"":http_referer);
    out.writeUTF(null==http_user_agent?"":http_user_agent);

}

}

Utility class for data cleansing (WebLogParser):

package com.gec.mr.pre.utils;

import com.gec.mr.pre.mrbean.WebLogBean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class WebLogParser {

public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

public static WebLogBean parser(String line) {
    WebLogBean webLogBean = new WebLogBean();
    // Split the line on spaces; fields that themselves contain spaces are re-joined below.
    // Sample line: 222.66.59.174 - - [18/Sep/2013:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0"
    String[] arr = line.split(" ");
    if (arr.length > 11) {
        webLogBean.setRemote_addr(arr[0]);
        webLogBean.setRemote_user(arr[1]);
        // Convert the timestamp into a conventional format:
        //  [18/Sep/2013:06:52:32 +0000]
        //   18/Sep/2013:06:52:32  ->  2013-09-18 06:52:32
        String time_local = formatDate(arr[3].substring(1));
        if(null==time_local || "".equals(time_local)) {
            time_local="-invalid_time-";
        }

        webLogBean.setTime_local(time_local);
        webLogBean.setRequest(arr[6]);
        webLogBean.setStatus(arr[8]);
        webLogBean.setBody_bytes_sent(arr[9]);
        webLogBean.setHttp_referer(arr[10]);

        // If the user-agent was split into several tokens, re-join it.
        // An array longer than 12 means the last field contained spaces, so everything past index 11 is appended to it.
        //  "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)"
        if (arr.length > 12) {
            StringBuilder sb = new StringBuilder();
            for(int i=11;i<arr.length;i++){
                sb.append(arr[i]);
            }
            webLogBean.setHttp_user_agent(sb.toString());
        } else {
            webLogBean.setHttp_user_agent(arr[11]);
        }
        // A status code of 400 or above means the request failed; treat such records as invalid
        if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// >= 400: HTTP error
            webLogBean.setValid(false);
        }

        // If the timestamp could not be parsed, the record is also invalid
        if("-invalid_time-".equals(webLogBean.getTime_local())){
            webLogBean.setValid(false);
        }
    } else {
        //58.215.204.118 - - [18/Sep/2013:06:52:33 +0000] "-" 400 0 "-" "-"
        // 11 or fewer tokens means the record is incomplete, so it is dropped
        webLogBean=null;
    }

    return webLogBean;
}

public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
    if (!pages.contains(bean.getRequest())) {
        bean.setValid(false);
    }
}
// Formats the nginx timestamp into yyyy-MM-dd HH:mm:ss
public static String formatDate(String time_local) {
    try {
        return df2.format(df1.parse(time_local));
    } catch (ParseException e) {
        return null;
    }

}

}
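For a quick check of the parser outside Hadoop, the sample line quoted in the comments above can be fed through WebLogParser directly. A minimal sketch (the class WebLogParserSmokeTest below is not part of the repository; it only assumes the packages used above):

```java
import com.gec.mr.pre.mrbean.WebLogBean;
import com.gec.mr.pre.utils.WebLogParser;

// Hypothetical helper, not part of the original sources: parses one sample nginx
// access-log line and prints the \001-delimited record produced by WebLogBean.toString().
public class WebLogParserSmokeTest {
    public static void main(String[] args) {
        String line = "222.66.59.174 - - [18/Sep/2013:06:53:30 +0000] "
                + "\"GET /images/my.jpg HTTP/1.1\" 200 19939 "
                + "\"http://www.angularjs.cn/A00n\" "
                + "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0\"";
        WebLogBean bean = WebLogParser.parser(line);
        if (bean != null) {
            // Expected: valid=true, time_local=2013-09-18 06:53:30, request=/images/my.jpg
            System.out.println(bean.isValid() + " | " + bean.getTime_local() + " | " + bean.getRequest());
            System.out.println(bean);   // the full \001-delimited record
        }
    }
}
```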

Mapper class:

package com.gec.mr.pre.mapper;

import com.gec.mr.pre.mrbean.WebLogBean;
import com.gec.mr.pre.utils.WebLogParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Stores the set of site URLs that count as real content pages
    Set<String> pages = new HashSet<String>();
    Text k = new Text();
    NullWritable v = NullWritable.get();

    /**
     * Map-task initialization: load the list of useful page URLs (from an external
     * configuration file) into the map task's memory. It is used to filter the log
     * data, dropping requests for static resources such as js, css and image files.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Define the set of content pages
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Read one line of input
    String line = value.toString();
    WebLogBean webLogBean = WebLogParser.parser(line);
    if (webLogBean != null) {
        // Filter out static resources such as js, images and css
        WebLogParser.filtStaticResource(webLogBean, pages);
        if (!webLogBean.isValid()) return;
        k.set(webLogBean.toString());
        context.write(k, v);
    }
}

}

Driver:

package com.gec.mr.pre.driver;

import com.gec.mr.pre.mapper.WeblogPreProcessMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class WeblogEtlPreProcessDriver {

static {
    try {
        // Set the HADOOP_HOME directory (needed to run locally on Windows)
        System.setProperty("hadoop.home.dir", "D:/winutils-master/winutils-master/hadoop-3.0.0/");
        // Load the native library
        System.load("D:/winutils-master/winutils-master/hadoop-3.0.0/bin/hadoop.dll");
    } catch (UnsatisfiedLinkError e) {
        System.err.println("Native code library failed to load.\n" + e);
        System.exit(1);
    }
}


public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    FileInputFormat.addInputPath(job,new Path("file:///d:\\src\\input"));
    job.setInputFormatClass(TextInputFormat.class);
    FileOutputFormat.setOutputPath(job,new Path("file:///d:\\src\\weblogPreOut2"));
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setJarByClass(WeblogEtlPreProcessDriver.class);

    job.setMapperClass(WeblogPreProcessMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0);
    boolean res = job.waitForCompletion(true);
}

}
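The driver above uses hard-coded local Windows paths for debugging in the IDE, while the commands in section III pass input and output paths on the command line. A minimal sketch of a cluster-side driver that reads the paths from its arguments (a hypothetical variant; the actual driver used for the packaged jar may differ):

```java
package com.gec.mr.pre.driver;

import com.gec.mr.pre.mapper.WeblogPreProcessMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical cluster entry point (not in the listing above): takes <input> <output> as arguments.
public class WeblogEtlPreProcessClusterDriver {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: WeblogEtlPreProcessClusterDriver <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WeblogEtlPreProcessClusterDriver.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);            // map-only ETL job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```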
c) Generating the click-stream (pageview) model
Requirement: store the click-stream data in HDFS.
Implementation: the ClickStreamPageView job turns the cleansed records into per-session pageview records.
Code:

Core Reducer class:

package com.gec.mr.pre.reducer;

import com.gec.mr.pre.mrbean.WebLogBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

Text v = new Text();

/**
 * In the reduce phase, the key is the client IP and the values are that user's
 * log records.
 * @param key
 * @param values
 * @param context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException
{
    ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
    // First collect one user's records so they can be sorted by time
    try {
        // Iterate over the values: they all belong to the same user (IP)
        for (WebLogBean bean : values) {
            // beans.add(bean);
            // The iterated bean cannot be added to the list directly: Hadoop reuses the
            // same object on each iteration, so its properties are copied into a new
            // object to avoid every element ending up with the last record's values.
            WebLogBean webLogBean = new WebLogBean();
            try {
                BeanUtils.copyProperties(webLogBean, bean);
            } catch(Exception e) {
                e.printStackTrace();
            }
            //beans.add(bean);
            beans.add(webLogBean);
        }
        // Sort the beans by time, then compute the gap between each record and the
        // previous one: a gap of less than 30 minutes means the same session, a larger
        // gap starts a new session. Putting the beans into a list lets us sort them
        // with a custom comparator.
        Collections.sort(beans, new Comparator<WebLogBean>() {
            @Override
            public int compare(WebLogBean o1, WebLogBean o2) {
                try {
                    Date d1 = toDate(o1.getTime_local());
                    Date d2 = toDate(o2.getTime_local());
                    if (d1 == null || d2 == null)
                        return 0;
                    return d1.compareTo(d2);
                } catch (Exception e) {
                    e.printStackTrace();
                    return 0;
                }
            }

        });

        /**
         * The following logic splits the ordered beans into individual visits and
         * numbers the pages of each visit with a step counter.
         * Core idea: compare the timestamps of adjacent records; if the gap is less
         * than 30 minutes they belong to the same session, otherwise to different
         * sessions.
         */

        int step = 1;
        // Use a UUID as the session id
        String session = UUID.randomUUID().toString();
        // After sorting, the records in the list are in chronological order
        for (int i = 0; i < beans.size(); i++) {
            WebLogBean bean = beans.get(i);
            // If there is only one record, output it directly
            if (1 == beans.size()) {

                // Use a default page stay time of 60 s
                v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
                        + bean.getStatus());
                context.write(NullWritable.get(), v);
                session = UUID.randomUUID().toString();
                break;
            }

            // With more than one record, skip the first one; it is output while processing the second
            if (i == 0) {
                continue;
            }
            // Time difference between this record and the previous one
            long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
            // Gap < 30 minutes: output the previous pageview as part of the current visit
            if (timeDiff < 30 * 60 * 1000) {

                v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                        + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                step++;
            } else {
                // Gap > 30 minutes: output the previous pageview and reset step to start a new visit
                v.set(session+"\001"+key.toString()+"\001"+beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                        + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                // After outputting the previous record, reset the step counter
                step = 1;
                session = UUID.randomUUID().toString();
            }

            // If this is the last record, output it directly as well
            if (i == beans.size() - 1) {
                // Use a default page stay time of 60 s
                v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                context.write(NullWritable.get(), v);
            }
        }

    } catch (ParseException e) {
        e.printStackTrace();

    }
}

private String toStr(Date date) {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
    return df.format(date);
}

private Date toDate(String timeStr) throws ParseException {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
    return df.parse(timeStr);
}

private long timeDiff(String time1, String time2) throws ParseException {
    Date d1 = toDate(time1);
    Date d2 = toDate(time2);
    return d1.getTime() - d2.getTime();

}

private long timeDiff(Date time1, Date time2) throws ParseException {
    // Date.getTime() returns milliseconds since the epoch
    return time1.getTime() - time2.getTime();

}

}
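Only the reducer of the ClickStreamPageView job is listed above; its mapper and driver are not shown. For reference, a hedged sketch of what the mapper could look like: it parses the \001-delimited ETL output and keys each record by client IP so the reducer receives one user's records together (class name and details are assumptions, not the repository's actual code):

```java
package com.gec.mr.pre.mapper;   // hypothetical class, not part of the listing above

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.gec.mr.pre.mrbean.WebLogBean;

// Reads the \001-delimited records produced by the ETL step and keys them by client IP.
public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

    private final Text k = new Text();
    private final WebLogBean v = new WebLogBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\001");
        // Field order follows WebLogBean.toString(): valid, remote_addr, remote_user,
        // time_local, request, status, body_bytes_sent, http_referer, http_user_agent
        if (fields.length < 9 || !"true".equals(fields[0])) {
            return; // skip incomplete records or records flagged invalid by the ETL step
        }
        v.set(true, fields[1], fields[2], fields[3], fields[4],
                fields[5], fields[6], fields[7], fields[8]);
        k.set(v.getRemote_addr());
        context.write(k, v);
    }
}
```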
d) Generating the visit model from the pageview data
Requirement: generate the click-stream visit data.
Implementation: the ClickStreamVisit job builds the visit model from the pageview records.
Code:

Core Reducer class:

package com.gec.mr.pre.reducer;

import com.gec.mr.pre.mrbean.PageViewsBean;
import com.gec.mr.pre.mrbean.VisitBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.shaded.org.apache.commons.beanutils.BeanUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

public class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

@Override
protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {

    // Copy the pvBeans so they can be sorted by step
    ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
    for (PageViewsBean pvBean : pvBeans) {
        PageViewsBean bean = new PageViewsBean();
        try {
            BeanUtils.copyProperties(bean, pvBean);
            pvBeansList.add(bean);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Sort the pageviews by step so we know which page was visited first and which last.
     */
    Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
        @Override
        public int compare(PageViewsBean o1, PageViewsBean o2) {
            return o1.getStep() > o2.getStep() ? 1 : -1;
        }
    });

    // Take the first and last pageview of this visit and fill a VisitBean
    VisitBean visitBean = new VisitBean();
    // First record of the visit
    visitBean.setInPage(pvBeansList.get(0).getRequest());
    visitBean.setInTime(pvBeansList.get(0).getTimestr());
    // Last record of the visit
    visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
    visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
    // Number of pages viewed during the visit
    visitBean.setPageVisits(pvBeansList.size());
    // Visitor's IP address
    visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
    // Referer of this visit
    visitBean.setReferal(pvBeansList.get(0).getReferal());
    visitBean.setSession(session.toString());
    context.write(NullWritable.get(), visitBean);
}

}
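PageViewsBean and VisitBean are referenced above but not listed in this README. For orientation, a minimal sketch of what VisitBean might look like, reconstructed from the setters called in the reducer and from the ods_click_stream_visit table defined in section III (field names come from those call sites; serialization methods are omitted in this sketch):

```java
package com.gec.mr.pre.mrbean;

/**
 * Hypothetical sketch of VisitBean (the real source is not included in this README).
 * One visit (session): entry/exit page and time, number of pages, client IP and referer.
 * The field order mirrors the ods_click_stream_visit Hive table from section III.
 */
public class VisitBean {

    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    public void setSession(String session) { this.session = session; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public void setInTime(String inTime) { this.inTime = inTime; }
    public void setOutTime(String outTime) { this.outTime = outTime; }
    public void setInPage(String inPage) { this.inPage = inPage; }
    public void setOutPage(String outPage) { this.outPage = outPage; }
    public void setReferal(String referal) { this.referal = referal; }
    public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }

    /** \001-delimited output line, matching "fields terminated by '\001'" in the Hive DDL. */
    @Override
    public String toString() {
        return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime
                + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
    }
}
```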
III. Loading the data into Hive (code and implementation steps)

a) Loading the cleansed data into Hive
Requirement: load the data into Hive.
Implementation: first create the Hive tables, then use LOAD DATA statements to load the files from HDFS into Hive.

Code:

drop table if exists ods_weblog_origin;
create table ods_weblog_origin(
  valid string,
  remote_addr string,
  remote_user string,
  time_local string,
  request string,
  status string,
  body_bytes_sent string,
  http_referer string,
  http_user_agent string)
partitioned by (datestr string)
row format delimited fields terminated by '\001';

pageview table:

drop table if exists ods_click_pageviews;
create table ods_click_pageviews(
  session string,
  remote_addr string,
  remote_user string,
  time_local string,
  request string,
  visit_step string,
  page_staylong string,
  http_referer string,
  http_user_agent string,
  body_bytes_sent string,
  status string)
partitioned by (datestr string)
row format delimited fields terminated by '\001';

visit table:

drop table if exists ods_click_stream_visit;
create table ods_click_stream_visit(
  session string,
  remote_addr string,
  inTime string,
  outTime string,
  inPage string,
  outPage string,
  referal string,
  pageVisits int)
partitioned by (datestr string)
row format delimited fields terminated by '\001';

Run the three MapReduce jobs on the cluster:

hadoop jar etlmr-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.WeblogEtlPreProcessDriver /data/weblog/preprocess/input/2021-12-09 /data/weblog/preprocess/weblogPreOut/2021-12-09

hadoop jar clickstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamDriver /data/weblog/preprocess/weblogPreOut/2021-12-10 /data/weblog/preprocess/pageViewOut/2021-12-15

hadoop jar visitstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamVisitDriver /data/weblog/preprocess/pageViewOut/2021-12-15 /data/weblog/preprocess/clickStreamVisit/2021-12-15

Load the ETL output into the origin table:

load data inpath '/data/weblog/preprocess/weblogPreOut/2021-12-10/' overwrite into table ods_weblog_origin partition(datestr='2021-12-10');

b) Loading the pageview data into Hive
Requirement: load the pageview data into Hive.
Implementation: use a LOAD DATA statement to load the files from HDFS into Hive.
Code:

load data inpath '/data/weblog/preprocess/pageViewOut/2021-12-15' overwrite into table ods_click_pageviews partition(datestr='2021-12-15');

c) Loading the visit data into Hive
Requirement: load the click-stream visit data into Hive.
Implementation: use a LOAD DATA statement to load the files from HDFS into Hive.
Code:

load data inpath '/data/weblog/preprocess/clickStreamVisit/2021-12-15' overwrite into table ods_click_stream_visit partition(datestr='2021-12-15');

IV. Exporting the data (code and implementation steps)

a) Exporting the data to MySQL with Sqoop
Requirement: export the Hive results to the relational database MySQL.
Implementation: create the MySQL database and tables, connect to MySQL with Sqoop, and export three result tables: daily pvs, hourly pvs for a given day, and daily pvs per page.
Code:

bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyday --table dw_pvs_everyday --input-fields-terminated-by '\001'

bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_everyhour_oneday/datestr=2021-12-15 --table dw_pvs_everyhour_oneday --input-fields-terminated-by '\001'

bin/sqoop export --connect jdbc:mysql://192.168.94.121:3306/weblogs --username root --password 123456 --m 1 --export-dir /user/hive/warehouse/dw_pvs_request_everyday --table dw_pvs_request_page --input-fields-terminated-by '\001'
V. Custom web charts (code and implementation steps)

a) Daily pvs (line chart)
Requirement: show each day's pvs value as a line chart.
Implementation: import ECharts into the project and write an HTML/Thymeleaf page whose data comes from MySQL through the Spring Boot application.
Code:

HTML:

<title>Daily pvs</title>
<script src="/js/echarts.js"></script>
<script type="text/javascript" th:inline="javascript">
// Read the Thymeleaf variables in JavaScript
var data1=[[${dayJson}]];
var data2=[[${pvsListJson}]];

// Initialize the ECharts instance on the prepared DOM node
var myChart = echarts.init(document.getElementById('main'));


var option = {
    title: {
        text: 'Daily pvs line chart (x axis: days of the month)'
    },
    tooltip: {},
    legend: {
        data: ['page pvs']
    },
    xAxis: {
        type: 'category',
        // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        data: JSON.parse(data1)
    },
    yAxis: {
        type: 'value'
    },
    series: [
        {
            // data: [120, 200, 150, 80, 70, 110, 130],
            data: JSON.parse(data2),
            type: 'line',
            symbol: 'triangle',
            symbolSize: 20,
            lineStyle: {
                color: '#5470C6',
                width: 4,
                type: 'dashed'
            },
            itemStyle: {
                borderWidth: 3,
                borderColor: '#EE6666',
                color: 'yellow'
            }
        }
    ]
};

myChart.setOption(option);
</script>

b) Hourly pvs for a given day (line chart)
Requirement: show each hour's pvs value for a given day as a line chart.
Implementation: same approach as above: an ECharts page backed by the Spring Boot application and MySQL.
Code:

HTML:

<title>Hourly pvs</title>
<script src="/js/echarts.js"></script>
<script type="text/javascript" th:inline="javascript">
// Read the Thymeleaf variables in JavaScript
var data1=[[${hourJson}]];
var data2=[[${pvsListJson}]];

// Initialize the ECharts instance on the prepared DOM node
var myChart = echarts.init(document.getElementById('main'));


var option = {
    title: {
        text: 'Hourly pvs line chart (x axis: hour of the day)'
    },
    tooltip: {},
    legend: {
        data: ['page pvs']
    },
    xAxis: {
        type: 'category',
        // data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        data: JSON.parse(data1)
    },
    yAxis: {
        type: 'value'
    },
    series: [
        {
            // data: [150, 230, 224, 218, 135, 147, 260],
            data: JSON.parse(data2),
            type: 'line'
        }
    ]
};

myChart.setOption(option);
</script>
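The chart pages above read their data from Thymeleaf variables such as dayJson, hourJson and pvsListJson, which the Spring Boot application fills from the MySQL tables exported in section IV. The controller itself is not listed in this README; a minimal hedged sketch of what it could look like (class name, request mapping, template name and column names are assumptions):

```java
package com.gec.weblog.web;   // hypothetical package; the actual web module is not listed here

import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;

// Hypothetical controller: queries the exported MySQL tables and exposes the JSON
// strings that the Thymeleaf/ECharts pages above read (dayJson, pvsListJson, ...).
@Controller
public class WeblogChartController {

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper mapper = new ObjectMapper();

    public WeblogChartController(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Daily pvs line chart: fills the dayJson / pvsListJson variables used above. */
    @GetMapping("/pvs/everyday")
    public String pvsEveryday(Model model) throws Exception {
        // Column names (datestr, pvs) are assumptions about the dw_pvs_everyday table
        List<String> days = jdbcTemplate.queryForList(
                "select datestr from dw_pvs_everyday order by datestr", String.class);
        List<Long> pvs = jdbcTemplate.queryForList(
                "select pvs from dw_pvs_everyday order by datestr", Long.class);
        model.addAttribute("dayJson", mapper.writeValueAsString(days));
        model.addAttribute("pvsListJson", mapper.writeValueAsString(pvs));
        return "pvs_everyday";   // Thymeleaf template that contains the ECharts snippet
    }
}
```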

c) Pvs per page (bar chart)
Requirement: show each page's pvs value as a bar chart.
Implementation: same approach as above: an ECharts page backed by the Spring Boot application and MySQL.
Code:

HTML:

<title>Click-stream data chart</title>
<script src="/js/echarts.js"></script>
<script type="text/javascript" th:inline="javascript">
// Read the Thymeleaf variables in JavaScript
var data1=[[${pageJson}]];
var data2=[[${pvsJson}]];


// Initialize the ECharts instance on the prepared DOM node
var myChart = echarts.init(document.getElementById('main'));
// Chart options and data
var option = {
    title: {
        text: 'Data visualization'
    },
    tooltip: {},
    legend: {
        data: ['page pvs']
    },
    xAxis: {
        data: JSON.parse(data1)
    },
    yAxis: {},
    series: [
        {
            name: 'page',
            type: 'bar',
            data: JSON.parse(data2)
        }
    ]
};

/* var option = {
     series: [
         {
             type: 'pie',
             data: [
                 {
                     value: 335,
                     name: 'Direct traffic'
                 },
                 {
                     value: 234,
                     name: 'Ad network'
                 },
                 {
                     value: 1548,
                     name: 'Search engine'
                 }
             ],
             radius: '50%'
         }
     ]
 };*/
// Render the chart with the options and data defined above
myChart.setOption(option);
</script>

VI. Running the project with Azkaban (code and implementation steps)

a) Writing the job files
Requirement: better workflow scheduling.
Implementation: set up Azkaban, write the job files on Linux, package them, copy the archive to Windows and upload it to Azkaban.
Code:

1.job:
type=command
command=sh 1.sh

2.job:
type=command
command=hadoop jar etlmr-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.WeblogEtlPreProcessDriver /data/weblog/preprocess/input/2021-12-12 /data/weblog/preprocess/weblogPreOut/2021-12-12
dependencies=1

3.job:
type=command
command=hadoop jar clickstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamDriver /data/weblog/preprocess/weblogPreOut/2021-12-12 /data/weblog/preprocess/pageViewOut/2021-12-12
dependencies=2

4.job:
type=command
command=hadoop jar visitstreamdata-1.0-SNAPSHOT.jar com.gec.mr.pre.driver.ClickStreamVisitDriver /data/weblog/preprocess/pageViewOut/2021-12-12 /data/weblog/preprocess/clickStreamVisit/2021-12-12
dependencies=3

5.job:
type=command
command=sh 5.sh
dependencies=4

6.job:
type=command
command=/home/hadoop/bigdatasoftware/hive/bin/hive -f "6.sql"
dependencies=5

7.job:
type=command
command=sh 7.sh
dependencies=6

b) Running the Azkaban project
Requirement: a workflow scheduler (Azkaban) is needed to organize such a multi-step execution plan; once configured, the whole pipeline starts with a single click.
Implementation: after configuring the job files, move the sample data into the log collection directory and start the flow. A full run takes close to two hours; when it finishes, new rows appear in the MySQL database, confirming a successful run.
License: MulanPSL-2.0
