[toc]
HDFS
MapReduce
YARN
Filesystem namespace
Data replication
![hdfsdatanodes](/Users/andy/Library/Application Support/typora-user-images/hdfsdatanodes.png)
mkdir software # software packages used in the course
mkdir app # installation directory for all course software
mkdir data # data
mkdir lib # jars for the jobs we develop
mkdir shell # scripts
mkdir maven_resp # Maven dependencies used in the course
sudo -i # switch from the hadoop user to root
su hadoop # switch from root back to hadoop
Deployment modes
Mode | Machines | Processes |
---|---|---|
Local | 1 | 1 |
Pseudo-distributed | 1 | multiple |
Cluster | multiple | multiple |
Prerequisites for installing Hadoop
ssh
ssh-keygen -t rsa
cd .ssh
cat id_rsa.pub >> authorized_keys
chmod 600 authorized_keys
wget https://download.oracle.com/otn/java/jdk/8u241-b07/1f5b5a70bf22433b84d0e960903adac8/jdk-8u241-linux-x64.tar.gz
vim .bash_profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64/jre
export PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.15.1
export PATH=$HADOOP_HOME/bin:$PATH
export HIVE_HOME=/home/hadoop/app/hive-1.1.0-cdh5.15.1
export PATH=$HIVE_HOME/bin:$PATH
vi hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64/jre
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop000:8020</value>
</property>
<!-- Trash: in production the trash must be enabled, and the retention interval should be as long as practical, e.g. 7 days (10080 minutes) -->
<property>
<name>fs.trash.interval</name>
<value>10080</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/tmp/dfs/data</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/tmp</value>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
</property>
<property>
<name>dfs.data.transfer.protection</name>
<value>authentication</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop000</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
# edit the slaves file
hadoop000
# The first time you start the cluster you must format the filesystem; do not run this again afterwards
hdfs namenode -format
# Start the cluster
cd sbin
./start-dfs.sh
# Verification, method 1
jps
# Verification, method 2
# http://hadoop000:50070
## If jps looks OK but the web UI is not reachable, it is most likely a firewall issue
## Check the firewall
firewall-cmd --state
systemctl stop firewalld
# Start YARN
./sbin/start-yarn.sh
# Start the YARN history server
./mr-jobhistory-daemon.sh start historyserver
hadoop fs -ls /
hadoop fs -put
hadoop fs -copyFromLocal
hadoop fs -moveFromLocal
hadoop fs -cat
hadoop fs -text
hadoop fs -get
hadoop fs -mkdir
hadoop fs -mv
hadoop fs -getmerge
hadoop fs -rm
hadoop fs -rmdir
hadoop fs -rm -r
hadoop fsck / # filesystem health check
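The same operations are also available through the HDFS Java API. A minimal sketch (the fs.defaultFS URI, the user name, and the paths below are assumptions for illustration):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsApiDemo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // connect as the "hadoop" user (assumed) to the NameNode configured earlier
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration, "hadoop");

        fileSystem.mkdirs(new Path("/demo"));                                     // hadoop fs -mkdir
        fileSystem.copyFromLocalFile(new Path("access.log"), new Path("/demo"));  // hadoop fs -put
        for (FileStatus status : fileSystem.listStatus(new Path("/demo"))) {      // hadoop fs -ls
            System.out.println(status.getPath() + "\t" + status.getLen());
        }
        // hadoop fs -cat
        IOUtils.copyBytes(fileSystem.open(new Path("/demo/access.log")), System.out, 1024, false);
        fileSystem.close();
    }
}
```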
Replica placement, scheme 1 (the current HDFS default):
1 - a node in the local rack
2 - a node in a different rack
3 - another node in the same rack as replica 2
Replica placement, scheme 2:
1 - a node in the local rack
2 - another node in the local rack
3 - a node in a different rack
curl http://hadoop000:50070/jmx
http://hadoop000:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo
http://hadoop000:50070/jmx?qry=Hadoop:service=NameNode,name=JvmMetrics
http://hadoop000:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem
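These endpoints can also be polled from code for monitoring. A minimal sketch (host, port, and bean name follow the URLs above; everything else is illustrative, and the JSON is printed raw rather than parsed):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class NameNodeJmxPoller {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://hadoop000:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setConnectTimeout(3000);
        connection.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // raw JSON; pick out fields such as CapacityUsed or MissingBlocks as needed
                System.out.println(line);
            }
        }
    }
}
```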
![IMG_0187](/Users/andy/Library/Application Support/typora-user-images/IMG_0187.PNG)
Yet Another Resource Negotiator
A general-purpose resource management system
Provides unified resource management and scheduling for the applications running on top of it
master: resource management => ResourceManager (RM)
job scheduling/monitoring => per-application ApplicationMaster (AM)
slave: NodeManager (NM) ![yarn_architecture](/Users/andy/Library/Application Support/typora-user-images/yarn_architecture.gif)
Client, ResourceManager, NodeManager, ApplicationMaster
master/slave: RM/NM
Client: submits jobs to the RM, kills jobs, etc.
ApplicationMaster
NodeManager: multiple instances (one per node)
ResourceManager: only one instance serves the cluster at any given time; responsible for resource management
Container: the abstraction in which a task runs
# Package the project into a jar in the local project directory
# Windows/Mac/Linux ==> Maven
mvn clean package -DskipTests
# Upload the jar to the server
scp target/hadoop-train-v2-1.0.jar hadoop000:~/lib/
# Upload the data to the server
scp …
# Run the job: hadoop jar xxx.jar <fully-qualified class name> <args ...>
hadoop jar hadoop-train-v2-1.0.jar com.imooc.bigdata.hadoop.mr.access.AccessYarnApp /access/input/access.log /access/out2/
# Check the job on the YARN web UI (port 8088)
# Check the output directory
var executorMemory = 1024 // default value, 1024 MB
val MEMORY_OVERHEAD_FACTOR = 0.10 // overhead ratio, default 0.1
val MEMORY_OVERHEAD_MIN = 384
val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
// if spark.yarn.executor.memoryOverhead is set, use it; otherwise fall back to the computed default
val executorMem = args.executorMemory + executorMemoryOverhead
// the executor memory finally requested is the sum of the two parts
// but a normalization (rounding-up) factor is applied on top of that
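// Worked example (illustrative numbers, not from the course): with --executor-memory 2048,
// overhead = max((0.10 * 2048).toInt, 384) = 384 MB, so the request is 2048 + 384 = 2432 MB;
// with yarn.scheduler.minimum-allocation-mb = 1024 the container is then rounded up to 3072 MB.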
hadoop000: 192.168.101.31  NN DN
hadoop001: 192.168.101.3x  DN NM
hadoop002: 192.168.101.3y  DN NM
# Set the hostname on each machine (hadoop000/hadoop001…)
vi /etc/hostname
# Map IP addresses to hostnames
vi /etc/hosts
192.168.101.31 hadoop000
192.168.101.3x hadoop001
192.168.101.3y hadoop002
192.168.101.31 localhost
ssh-keygen -t rsa
ssh-copy-id -i /home/hadoop/.ssh/id_rsa.pub hadoop000
ssh-copy-id -i /home/hadoop/.ssh/id_rsa.pub hadoop001
ssh-copy-id -i /home/hadoop/.ssh/id_rsa.pub hadoop002
# Extract
tar -zvxf jdk-8u91-linux-x64.tar.gz -C ~/app/
# Add environment variables
# .bash_profile
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
export PATH=$JAVA_HOME/bin:$PATH
# step 3-1: copy the JDK to the other nodes (run from hadoop000)
scp -r ~/app/jdk1.8.0_91/ hadoop@hadoop001:~/app/
scp -r ~/app/jdk1.8.0_91/ hadoop@hadoop002:~/app/
# step 3-2: copy the environment variable file
scp -r ~/.bash_profile hadoop@hadoop001:~/
scp -r ~/.bash_profile hadoop@hadoop002:~/
## master
# Extract
tar -zxvf ~/software/hadoop-2.6.0-cdh5.15.1.tar.gz -C ~/app/
# Hadoop configuration files
vi ~/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop/hadoop-env.sh
# step 1 => set JAVA_HOME (can actually be skipped)
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_91
# step 2 => configure core-site.xml
vi core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop000:8020</value>
</property>
# step 3 => hdfs-site.xml
# no change needed (dfs.replication defaults to 3)
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
# step 4 => yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop000</value>
</property>
# step 5 => mapred-site.xml
# copy the template first
cp ~/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop/mapred-site.xml.template ~/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop/mapred-site.xml
# then add the configuration
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
# step 6 => configure the slave nodes
vi ~/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop/slaves
hadoop000 # master/slave
hadoop001
hadoop002
# step 7: distribute to the other nodes
# hadoop-2.6.0-cdh5.15.1
scp -r ~/app/hadoop-2.6.0-cdh5.15.1 hadoop@hadoop001:~/
scp -r ~/app/hadoop-2.6.0-cdh5.15.1 hadoop@hadoop002:~/
# Add environment variables
# .bash_profile
export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.15.1
export PATH=$HADOOP_HOME/bin:$PATH
# Copy the environment variable file
scp -r ~/.bash_profile hadoop@hadoop001:~/
scp -r ~/.bash_profile hadoop@hadoop002:~/
# source .bash_profile on each machine
# step 8
# Format the NameNode: run it on hadoop000 only, and only once (remember this)
hadoop namenode -format
cd ~/app/hadoop-2.6.0-cdh5.15.1/sbin/
# Option 1: start the daemons one by one
./hadoop-daemon.sh start namenode
# Option 2
./start-dfs.sh
# Verify
jps
# If a process is missing, check the corresponding log file
# example
cat /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-secondarynamenode-hadoop000.log
# When stopping, stop the DataNode first; when starting, do it in the reverse order
cd ~/app/hadoop-2.6.0-cdh5.15.1/sbin/
./hadoop-daemon.sh stop datanode
# On hadoop000
# Start HDFS
./app/hadoop-2.6.0-cdh5.15.1/sbin/start-dfs.sh
# Start YARN (web UI default port 8088)
./app/hadoop-2.6.0-cdh5.15.1/sbin/start-yarn.sh
# ./sbin/mr-jobhistory-daemon.sh start historyserver
Hive:SQL on Hadoop
Interview question: describe how to implement a join using MapReduce.
Reduce-side join (ReduceJoin)
Both datasets are loaded by mappers, go through the shuffle phase, and the actual join is performed on the reduce side.
When one dataset is small (tens of MB, or small enough to fit in memory), is it really necessary to shuffle everything?
Shuffle is one of the most time-consuming and performance-costly parts of the whole big data pipeline.
Wherever the shuffle can be avoided, avoid it.
MapJoin ![IMG_0195](/Users/andy/Library/Application Support/typora-user-images/IMG_0195.PNG)
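A minimal map-side join sketch (not the course's actual code; the cache file name dim.txt, the comma-separated field layout, and the join key position are assumptions). The small table is shipped to every mapper via job.addCacheFile(...) and loaded into a HashMap in setup(), so the join happens entirely in map() and the job can run with job.setNumReduceTasks(0), i.e. no shuffle at all:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Map<String, String> smallTable = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // the small file was added via job.addCacheFile(new URI("/small/dim.txt"))
        // and is readable by its base name in the task's working directory
        try (BufferedReader reader = new BufferedReader(new FileReader("dim.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                smallTable.put(fields[0], fields[1]); // join key -> dimension value
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        String matched = smallTable.get(fields[0]); // look up the big-table record's join key
        if (matched != null) {
            context.write(new Text(value.toString() + "," + matched), NullWritable.get());
        }
    }
}
```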
Number of reducers ==> number of output files
Delete them?
Merge them?
Map-only jobs (no reduce needed). Scenarios: MapJoin, ETL, Sqoop
Generate your own test data (with the right characteristics)!!!!!
# Archive small files with Hadoop Archive (HAR)
hadoop archive -archiveName name -p <parent> [-r <replication factor>] <src>* <dest>
hadoop archive -archiveName pksmall.har -p /small /pksmallhar
# list the files inside the archive
hadoop fs -ls har:///pksmallhar/pksmall.har
HDFS data: deletion, plus scheduled backups
Mac: rz/sz --> use scp instead
Shortcomings of distcp
Using jps in a script for status checks: what jps shows does not reliably tell you whether a process is alive or not.
Normally you use ps -ef | grep xxx to check whether a process exists; that is a real status check.
But consider Spark Thrift Server + Hive: it starts a driver process (say PID 110) listening on port 10000 by default. Because of a memory leak or some other bug, the process can still show up in ps while port 10000 has gone down, so the service can no longer serve requests.
Conclusion: status checks for any program must be done through its port.
On CDH, running jps as root may show many entries as "Process information unavailable".
ps -ef | grep xxx still shows the processes correctly.
To get the correct output from jps, switch to the corresponding user,
e.g. su - hdfs (the switch may fail and require fixing that user's entry in /etc/passwd).
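A minimal sketch of the port-based check described above (host and port are just examples; port 10000 matches the Thrift Server case): try to open a TCP connection, and treat a connect failure as the service being down even if the process is still visible in ps.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "hadoop000";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 10000;
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000); // 3-second timeout
            System.out.println(host + ":" + port + " is up");
        } catch (IOException e) {
            System.out.println(host + ":" + port + " is DOWN");
            System.exit(1); // non-zero exit code so a calling script can alert
        }
    }
}
```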
Benefits
Shuffle is the main technical bottleneck in all distributed computing frameworks
Caveats
Input ===> xxxxx ===> output
Parameters:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class CompressionApp {
/**
 * 1. read the input through an input stream
 * 2. write the compressed output through an output stream
 * 3. copy between them using IOUtils.copyBytes
 */
public static void main(String[] args) throws Exception {
String filename = "access/input/trackinfo_20130721.data";
String method = "org.apache.hadoop.io.compress.BZip2Codec";
long start = System.currentTimeMillis();
compress(filename, method);
System.out.println((System.currentTimeMillis() - start)/1000);
System.out.println("metnod2");
method = "org.apache.hadoop.io.compress.GzipCodec";
start = System.currentTimeMillis();
compress(filename, method);
System.out.println((System.currentTimeMillis() - start)/1000);
}
private static void compress(String filename, String method) throws IOException, ClassNotFoundException {
// open the input file to be compressed
FileInputStream fis = new FileInputStream(new File(filename));
Class<?> codecClass = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
// compress: wrap the raw output stream with the codec's compression stream
FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
CompressionOutputStream cos = codec.createOutputStream(fos);
IOUtils.copyBytes(fis, cos, 1024*1024*5);
cos.close();
fos.close();
fis.close();
}
}
// decompression counterpart (shown as a fragment; it belongs in the same class)
private static void decompression(String filename) throws IOException {
    // infer the codec from the file extension (returns null for an unknown extension)
    CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
    CompressionCodec codec = factory.getCodec(new Path(filename));
    CompressionInputStream fis = codec.createInputStream(new FileInputStream(new File(filename)));
    FileOutputStream fos = new FileOutputStream(new File(filename + ".decode"));
    // copy while decompressing
    IOUtils.copyBytes(fis, fos, 1024 * 1024 * 5);
    fos.close();
    fis.close();
}
// mapreduce.output.fileoutputformat.compress.codec
// mapreduce.output.fileoutputformat.compress
configuration.setBoolean("mapreduce.output.fileoutputformat.compress", true);
configuration.setClass("mapreduce.output.fileoutputformat.compress.codec", BZip2Codec.class, CompressionCodec.class);
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec
</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
hadoop-daemon.sh start namenode
--> bin/hdfs
--> org.apache.hadoop.hdfs.server.namenode.NameNode (hadoop-client)
The NameNode controls two critical tables:
* 1) filename->blocksequence (namespace)
the mapping from files to blocks
* 2) block->machinelist ("inodes")
the mapping from blocks to DataNodes
// entry point: the main method
public static void main(String argv[]) throws Exception {
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
}
// parse the command-line options
// hadoop namenode -format
// hadoop-daemon.sh start namenode
createNameNode {
StartupOption startOpt = parseArguments(argv);
switch(startOpt) {
default:
new NameNode(conf); { // mainly initialization
initialize(conf); {
// 1
startHttpServer(conf);{
infoHost = bindAddress.getHostName(); // 0.0.0.0:50070
initWebHdfs(conf);
httpServer = new NameNodeHttpServer
setupServlets(httpServer, conf);
httpServer.start();
}
// 2
loadNamesystem(conf); // load the filesystem metadata
{
loadFromDisk(conf) {
FSImage fsImage;
FSNamesystem namesystem;
namesystem.loadFSImage(startOpt);
new TranslatorPB...
}
}
// 3
createRpcServer(conf); // RPC: remote procedure call
// 4 services on other machines are invoked over the network
startCommonServices(conf);
}
}
}
}
namenode.join();
// parse the arguments
parseArguments()
loadNamesystem(conf);
NameNodeRpcServer{
// client --> NN
ClientNamenodeProtocolServerSideTranslatorPB
// DN --> NN
DatanodeProtocolServerSideTranslatorPB
// NN
NamenodeProtocolServerSideTranslatorPB
// DataNode registration and heartbeats
serviceRpcServer
// HDFS client operations: ls, mkdir, rmr, ...
clientRpcServer
}
// the various protocols are instantiated in the constructor
LogServer ==> Flume ==> HDFS ==> offline (batch) processing ==> Kafka ==> stream processing
LogServer ==> Flume ==> Kafka ==> HDFS ==> offline (batch) processing ==> stream processing
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
upstream bigdata.com {
server hadoop000:16666;
server hadoop000:18888;
}
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
server {
listen 6789;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
proxy_pass http://bigdata.com;
proxy_redirect default;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}
# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}
# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# location / {
# root html;
# index index.html index.htm;
# }
#}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
}
bigdata.ruozedata.com needs to be mapped in your hosts file
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class FileUtils {
public static void deleteOutput(Configuration configuration, String output) throws IOException {
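// delete the output path recursively if it already exists, so re-running the job does not fail with "output directory already exists"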
FileSystem fileSystem = FileSystem.get(configuration);
Path outputPath = new Path(output);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
}
}
Homework: generate test data
[11/06/2021:20:51:07 +0800] 182.90.24.6 - 111 - POST http://www.ruozedata.com 200 38 821 HIT
Second-to-last field: make 10% of the values dirty data (string type)
https://help.aliyun.com/document_detail/27142.htm?spm=a2c4g.11186623.2.4.68b4f625hzxEUK#task-187634
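A possible generator sketch for this homework (the value ranges, the dirty marker "abc", and the output file name are assumptions; the field order follows the sample line above):

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

public class AccessLogGenerator {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws IOException {
        int lines = args.length > 0 ? Integer.parseInt(args[0]) : 10000;
        try (PrintWriter out = new PrintWriter("access.log")) {
            for (int i = 0; i < lines; i++) {
                out.println(oneLine());
            }
        }
    }

    private static String oneLine() {
        String time = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss Z"));
        String ip = (RANDOM.nextInt(223) + 1) + "." + RANDOM.nextInt(256) + "."
                + RANDOM.nextInt(256) + "." + RANDOM.nextInt(256);
        String method = RANDOM.nextBoolean() ? "GET" : "POST";
        int status = RANDOM.nextBoolean() ? 200 : 404;
        // second-to-last field: ~10% dirty data (a string where a number is expected)
        String secondToLast = RANDOM.nextInt(100) < 10 ? "abc" : String.valueOf(RANDOM.nextInt(1000));
        return String.format("[%s] %s - %d - %s http://www.ruozedata.com %d %d %s %s",
                time, ip, RANDOM.nextInt(1000), method, status, RANDOM.nextInt(100),
                secondToLast, RANDOM.nextBoolean() ? "HIT" : "MISS");
    }
}
```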