Hadoop大数据技术基础

ObjectKaz Lv4

大数据介绍

  1. 定义:大数据是从各种类型的海量数据中快速获得有价值信息的技术。
  2. 特点:
    • 大量化 Volume :数据体量大
    • 多样化 Vaiety:数据类型多
    • 快速化 Velocity:处理速度快
    • 价值 Value:价值密度低

Hadoop介绍

Google的三驾马车和Hadoop的开源实现

Google三驾马车Hadoop的实现
GFSHDFS
MapReduceMapReduce
BigTableHBase

特点

  • 高可靠性
  • 高拓展性
  • 高效性
  • 高容错性

生态圈

名称功能
HDFS分布式文件系统
YARN资源调度框架
MapReduce分布式并行编程模型
HBase分布式列式数据库
Hive大数据数据仓库
Pig查询半结构化数据集的分析平台
Flume一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统
Sqoop传统数据库与Hadoop数据存储和处理平台间进行数据传递的工具
ZooKeeper提供分布式协调一致性服务
AmbariHadoop快速部署工具
Mahout提供一些可拓展的机器学习领域经典算法的实现

HDFS

介绍

  1. 优点:低成本运行、处理超大文件、流式访问、高容错性
  2. 缺点:不适合低延迟访问、不适合小文件读写、不适合多用户写入、不适合文件任意修改

HDFS 架构

HDFSNameNodeSecondaryNameNodeDataNode 三块组成。

graph TB
	subgraph 服务端
		SenondaryNameNode
		NameNode
		DataNode1
		DataNode2
		DataNode3
		
	end
		Client--处理数据块-->DataNode1
		Client(客户端)--请求文件-->NameNode

NameNode

  1. 功能:

    1. 管理和维护 HDFS 的命名空间:两类文件
    2. 管理 DataNode 上的数据块
    3. 接受客户端的请求
  2. 两类文件:

    1. fsimage文件: 命名空间镜像文件,存储文件系统元信息。
      • 包含所有的目录和文件信息
      • 对于目录来说,包含的信息主要有修改时间、访问控制权限等
      • 对于文件来说,包含的信息有修改时间、访问时间、访问控制、块大小和组成一个文件块等
    2. edits文件: 记录操作日志 editlog
      • edits 保存了很多对文件进行操作的指令。
    3. 关系:
      • edits记录的是文件的操作记录,是过程
      • fsimage 保存的是文件的操作结果,是结果

DataNode

  1. 数据块

    1. 数据块是磁盘进行数据读、写的最小单位,
    2. 文件系统数据块的大小通常为磁盘数据块的整数倍。Hadoop 2.x 默认为 128MB
    3. 目的:减少寻址开销,减少磁盘一次读取时间。
    4. HDFS将每个文件划分为大小相等的数据块进行存储。为了容错,文件的所有数据块都被冗余复制,
  2. 功能

    1. 存储数据块
    2. I/O:根据 NameNode 的指令,和客户端就行 I/O 操作
    3. 心跳通信:定期向 NameNode 发送心跳信息,保持和 NameNode 的通信。

SecondaryNameNode

定期合并 fsimageedits,避免 edits 过大。

graph LR
STOP(暂停)-->DOWNLOAD(下载)
DOWNLOAD-->MERGE(合并)
-->UPLOAD(上传)
-->REPLACE(替换)
-->RESTORE(恢复)
  1. 暂停:SecondaryNameNodeNameNode 发送请求,要求其暂停使用 edits 文件。之后新的更新操作写入 edits.new文件。
  2. 下载:SecondaryNameNodeNameNode 下载 fsimageedits 文件。
  3. 合并:SecondaryNameNode 根据 edits 中的信息,对 fsimage 进行操作。最终形成一个新的 fsimage.ckpt
  4. 上传:SecondaryNameNode 将合并后的 fsimage.ckpt 上传到 NameNode。‘
  5. 替换: NameNode用得到的新 fsimage.ckpt 替换旧的 fsimage,用 edits.new 替换 edits
  6. 恢复:NameNode 恢复使用 edits 记录操作日志。

工作机制

机架感知

  1. 概述:根据网络的拓扑结构,对 HDFS 的操作进行优化。
  2. 默认情况:不开启机架感知,即认为全部在一个机架上。
  3. 自定义:通过外部脚本文件来判断。输入IP地址,输出机架信息。

副本冗余策略

  1. 副本存储

    1. 副本1:在上传文件的数据节点
    2. 副本2:在与第一个节点不同的机架上
    3. 副本3:和副本2一个机架,但不同的节点
    4. 更多副本:均匀分布在剩余的机架下
    5. 其余副本:随机选择节点存放
  2. 优点

    1. 减小了机架间的数据传输,提高写操作效率
    2. 不影响数据的可靠性和可用性,但改进了写的性能
    3. 减小因机架间的数据传输,导致的网络传输之间的总带宽

文件读取

sequenceDiagram
    Client->>DistributedFileSystem: 打开文件
    DistributedFileSystem->> NameNode: 请求数据块信息
    Client->>FSDataInputStream: 打开流
    FSDataInputStream->>DataNode: 读取数据
    Client->>FSDataInputStream: 关闭流
  1. 客户端通过 DistributedFileSystem 打开文件
  2. DistributedFileSystemNameNode 请求数据块信息
  3. ClientFSDataInputStream读取数据
  4. FSDataInputStreamDataNode 读取数据
  5. Client关闭FSDataInputStream

文件写入

sequenceDiagram
    Client->>DistributedFileSystem: 创建元请求
    DistributedFileSystem->> NameNode: 创建文件元数据
    Client->>FSDataOutputStream: 写入数据
    FSDataOutputStream->>DataNode: 写入数据包
    DataNode->>FSDataOutputStream: 接受数据包
    Client->>FSDataInputStream: 关闭
    FSDataInputStream->>NameNode: 通知写入完成
  1. 客户端通过 DistributedFileSystem 打开文件。

  2. DistributedFileSystem 请求 NameNode 创建元数据。

    此操作通过 RPC 调用完成。RPC 是指远程过程调用,它类似函数调用,区别是函数在另一台服务器上。

  3. ClientFSDataOutputStream写入数据,先写入缓冲区,再一个个切分成数据包。

  4. FSDataOutputStreamDataNode 发送数据,节点由 NameNode 分配。数据包由这些 DataNode 组成的管道进行传输。

  5. 管道上的 DataNode 反向返回确认信息,最终由第一个数据节点返回给 FSDataOutputStream

  6. Client关闭FSDataOutputStream

  7. FSDataInputStream 通知 NameNode 文件写入完成。

错误处理

DataNode 出错

由 HDFS 自动处理

  1. 判定:若NameNode 近期未接受到 DataNode 的心跳信息,就认为这个 DataNode 挂了。
  2. 处理方法:检测需要复制的数据块,将其复制到其他节点。

NameNode 出错

  1. 因素: fsimage 或者 edits 损坏。
  2. 后果:整个 HDFS 直接挂掉

解决方案

  1. 同步存储到其他文件系统
  2. HDFS HA: 高可用性
    • 共享存储系统:如ZooKeeper
    • 使用主从 NameNode存储数据
    • 各个节点之间使用 JournalNode 同步数据

Shell 操作

操作命令参数示例
创建文件夹hdfs dfs -mkdir [-p] <paths>-p:如果父目录不存在,则先创建父目录创建 hadoop 目录:hdfs dfs -mkdir /hadoop
列出文件夹hdfs dfs -ls [-dhR] <paths>-d:返回父目录
-R:同时显示子目录
显示 hadoop 目录:hdfs dfs -ls /hadoop
创建文件hdfs dfs -touchz <paths>创建 hadoop.txt 文件:hdfs dfs -touchz /hadoop.txt
上传文件hdfs dfs -put <local_src> <hdfs_dst>
hdfs dfs -copyFromLocal <local_src> <hdfs_dst>
/root/app.txt 上传到 HDFS的根目录:
hdfs dfs -put /root/app.txt /app.txt
移动文件到HDFShdfs dfs -moveFromLocal <local_src> <hdfs_dst>/root/app.txt 移动到 HDFS的根目录:
hdfs dfs -moveFromLocal /root/app.txt /app.txt
下载文件hdfs dfs -get <hdfs_src> <local_dst>
hdfs dfs -copyToLocal <hdfs_src> <local_dst>
/app.txt 下载到 /root/app.txt:
hdfs dfs -copyToLocal /app.txt /root/app.txt
查看文件hdfs dfs -cat <hdfs_path>查看 /app.txt:
hdfs dfs -cat /app.txt
追加文件hdfs dfs -appendToFile <hdfs_src> <local_dst>追加 /root/app.txt/app.txt:
hdfs dfs -appendToFile /root/app.txt /app.txt
删除文件或目录hdfs dfs -rm [-fr] <hdfs_path>-f 文件不存在时,不显示错误信息
-r递归删除文件夹
删除文件 /app.txt
hdfs -dfs -rm /app.txt
删除空文件夹 /hadoop
hdfs -dfs -rm /hadoop
删除非空文件夹 /hadoophdfs -dfs -rm -r /hadoop

YARN

概述

  1. 一个资源管理系统,为上层应用提供统一的资源管理和调度
  2. 主要功能:
    • 安排任务
    • 任务的进度监控

架构

YARNContainerResourceManagerNodeManagerApplicationMaster 组成。

graph TB
	subgraph Node1
		NodeManager1
		subgraph Container1
			ApplicationMaster
		end
	end
	subgraph Node2
		NodeManager2
		subgraph Container2
			Application1
		end
	end
	subgraph Node3
		NodeManager3
		subgraph Container3
			Application2
		end
	end
		Client-->ResourceManger
		ResourceManger-->NodeManager1
		ResourceManger-->NodeManager2
		ResourceManger-->NodeManager3
  1. Container:是 YARN 中资源CPU、内存等的抽象,它封装了某个节点的多维度资源。
  2. ResourceManager:负责整个系统的资源分配和管理。
    • Scheduler:为应用程序分配封装在 Container 的容器
    • ApplicationManager:管理整个系统中的应用程序
  3. NodeManager:每个节点上的资源管理器
    • 定时向ResourceManager报告资源使用情况
    • 接受和处理来自ApplicationManager的启动/停止请求
  4. ApplicationMaster
    • ResourceManager协商以获取 Container
    • 负责应用的监控,跟踪执行状态、重启失败任务
    • NodeManager 协同完成工作的监控

应用运行和监控机制

sequenceDiagram
    Client->>ResourceManager: 提交应用
    ResourceManager->>Container1(内含ApplicationMaster): 初始化
    Container1(内含ApplicationMaster)->>Container1(内含ApplicationMaster): 启动容器
	alt 计算资源不够
            Container1(内含ApplicationMaster)->>ResourceManager: 申请资源
            Container1(内含ApplicationMaster)->>Container2(内含Application): 启动容器
    end
	loop 进度监控
            Container2(内含Application)->>Container1(内含ApplicationMaster): 汇报进度
            Container1(内含ApplicationMaster)->>Container1(内含ApplicationMaster): 形成视图
    end    
    Client->>Container1(内含ApplicationMaster): 获取进度

运行机制

  1. 提交应用:ClientResourceManager 提交应用
  2. 初始化容器:ResourceManager 初始化一个容器Container
  3. 启动容器:节点在 NodeManager 的协助下启动容器Container
    • 首次启动时 Container内包含 ApplicationMaster
    • 如果 ApplicationMaster计算资源不足,则向 ResourceManager申请资源。
    • 申请资源后,ApplicationMaster启动一个新的容器Container,内含 Application
  4. 执行应用

监控机制

  1. 汇报状态:ApplicationApplicationMaster报告自己的进度
  2. 汇聚视图:ApplicationMaster汇聚作业视图
  3. 获取状态:客户端获取作业的状态

MapReduce

介绍

  1. 核心思想 分而治之,汇总结果 ,将一个大任务拆分成很多的小任务,每个小任务进行各自的处理,最后将小任务的结果进行汇总。
  2. 特点:易于编程、良好的拓展性、高容错性、离线处理海量数据
  3. 缺点:不擅长实时计算、不擅长流式计算、不擅长有向图计算

编程模型

graph LR
Input--InputFormat-->K11V11(k1,v1)
Input--InputFormat-->K12V12(k1,v1)
Input--InputFormat-->K13V13(k1,v1)
Input--InputFormat-->K14V14(k1,v1)
K11V11--Map-->K21V21(k2,v2)
K12V12--Map-->K22V22(k2,v2)
K13V13--Map-->K23V23(k2,v2)
K14V14--Map-->K24V24(k2,v2)
K21V21--Shuffle-->K32V32(k2,v2的数组)
K22V22--Shuffle-->K31V31(k2,v2的数组)
K23V23--Shuffle-->K31V31
K24V24--Shuffle-->K33V33(k2,v2的数组)
K31V31--Reduce-->K41V41(k3,v3)
K32V32--Reduce-->K42V42(k3,v3)
K33V33--Reduce-->K43V43(k3,v3)
K41V41--OutputFormat-->Output
K42V42--OutputFormat-->Output
K43V43--OutputFormat-->Output

执行流程

  1. 输入Input一个大文件,拆分Split成很多片段。
  2. 每个片段由单独一个节点处理,称为 Map 阶段。
  3. 将各个主机计算得到的结果进行汇总得到最终结果,称为 Reduce 阶段。

特点

  1. Job = Map + Reduce
  2. Map 的输出是 Reduce 的输入
  3. 输入和输出都是 <K,V>的形式
  4. 输入和输出的类型必须是 Hadoop 的类型
  5. 处理的数据都是 HDFS 上的数据

编程步骤

graph LR
create(创建项目)-->maven(配置 Maven)
maven-->mapper(编写Mapper)
mapper--> reducer(编写 Reducer)
reducer--> App(编写主类)

创建项目

使用 idea 创建一个项目。

配置 Maven

  1. 依赖项:
groupIdartifactIdversion
org.apache.hadoophadoop-common和安装的 hadoop 版本一致
org.apache.hadoophadoop-hdfs和安装的 hadoop 版本一致
org.apache.hadoophadoop-client和安装的 hadoop 版本一致
org.apache.hadoophadoop-common和安装的 hadoop 版本一致
  1. 构建插件需要设置主类

示例(hadoop-2.9.2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.kaz</groupId>
<artifactId>mapreduce</artifactId>
<version>1.0-SNAPSHOT</version>

<name>mapreduce</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<!--设置插件的主类-->
<mainClass>cn.kaz.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

编写 Mapper

  1. 自定义的Mapper 类需要继承 org.apache.hadoop.mapreduce.Mapper,后面需要接上括号。括号中分别为k1,v1,k2,v2

    如果没有指定 InputFormat,或者指定的 InputFormatTextInputFormat,则会将输入按行进行分割。

    k1LongWritable,是该行在整个文件中的字节偏移量

    v1Text,是该行的内容

  2. 自定义的 Mapper 需要覆写 map 函数,参数为 k1 keyv1 valueContext context,抛出 IOExceptionInterruptedException

  3. 写入键值对 context.write(k2,v2)

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(" ");
for (String str: values) {
context.write(new Text(str), new IntWritable(1));
}
}
}

编写 Reducer

  1. 自定义的Reducer 类需要继承 org.apache.hadoop.mapreduce.Reducer,后面需要接上括号。括号中分别为k2,v2,k3,v3

  2. 自定义的 Reducer 需要覆写 reduce 函数,参数为 k2 keyIterable<v2> valuesContext context,抛出 IOExceptionInterruptedException

  3. 写入键值对 context.write(k3,v3)

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable v: values) {
num += v.get();
}
context.write(key,new IntWritable(num));
}
}

编写主类

graph LR
create(创建 job 实例)-->app(设置主类)
app-->mapper(设置Mapper和输出键值对)
mapper--> reducer(设置Reducer和输出键值对)
reducer--> io(设置 I/O 路径)
--> run(执行任务)

!> 设置的类需要以 类名.class 结尾,而不是类名本身。详情请参考 Java反射相关知识。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class App
{
public static void main( String[] args ) throws Exception
{
// 创建实例
Job job = Job.getInstance(new Configuration());

// 设置主类
job.setJarByClass(App.class);

// 设置 Mapper 类和 Mapper 输出键值对
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 设置 Reducer 类和 Reducer 输出键值对
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置 I/O 路径
Path input = new Path(args[0]);
Path output = new Path(args[1]);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);

// 执行任务
job.waitForCompletion(true);
}
}

数据处理和转换

Java内置类型转换

源类型目标类型转换方法 srcdst
booleanStringdst = Boolean.toString(src)
Stringbooleandst = Boolean.parseBoolean(src)
intStringdst = Integer.toString(src)
Stringintdst = Integer.parseInt(src)
longStringdst = Long.toString(src)
Stringlongdst = Long.parseLong(src)
floatStringdst = Float.toString(src)
Stringfloatdst = Float.parseFloat(src)
doubleStringdst = Double.toString(src)
Stringdoubledst = Double.parseDouble(src)
intlong,double隐式类型转换dst = src
long,doubleint强制类型转换dst = (int)src
doublelong强制类型转换dst = (long)src
booleanint,long三目语句dst=src ? 1 : 0

Hadoop类型和Java类型转换

Java类型Hadoop类型Hadoop类型hv转换成Java类型jvJava类型jv转换成Hadoop类型hv
booleanBooleanWritablejv=hv.get()hv=new BooleanWritable(jv)
intIntWritablejv=hv.get()hv=new IntWritable(jv)
longLongWritablejv=hv.get()hv=new LongWritable(jv)
floatFloatWritablejv=hv.get()hv=new FloatWritable(jv)
doubleDoubleWritablejv=hv.get()hv=new DoubleWritable(jv)
StringTextjv=hv.toString()hv=new Text(jv)
NullWritablehv=NullWritable.get()

csv表格的处理

csv 使用 , 作为数据分隔符。通过字符串分割获取每一列的内容。

1
2
String data = v1.toString(); // v1 是单行的内容
String[] words = data.split(','); // words 即为得到的由列构成的数组

自定义类型

  1. 序列化接口用于将内存中的Java对象转换成可存储文件或者可以传送到其他设备的流。
  2. Hadoop 类型相对于Java内置类型,添加了序列化接口 :org.apache.hadoop.io.Writable。对于自定义类型,如果需要支持 MapReduce 的操作,则需要实现 Writable 接口。
  3. Writable 接口有两个方法:
    + void write(DataOutput out) throws IOException:用于将数据写入流
    + void readFields(DataInput in) throws IOException:用于从流中读取数据
两个方法中,读写顺序必须严格一致。
  1. 常见类型的读写
Java类型输出out输入in
booleanout.writeBoolean(obj)输出in.readBoolean(obj)
intout.writeInt(obj)输出in.readInt(obj)
longout.writeLong(obj)输出in.readLong(obj)
doubleout.writeDouble(obj)输出in.readDouble(obj)
Stringout.writeUTF(obj)输出in.readUTF(obj)

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Emp implements Writable {

private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;

public void write(DataOutput output) throws IOException {
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}

public void readFields(DataInput input) throws IOException {
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}

public void setEmpno(int empno) {
this.empno = empno;
}

public void setEname(String ename) {
this.ename = ename;
}

public void setJob(String job) {
this.job = job;
}

public void setMgr(int mgr) {
this.mgr = mgr;
}

public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}

public void setSal(int sal) {
this.sal = sal;
}

public void setComm(int comm) {
this.comm = comm;
}

public void setDeptno(int deptno) {
this.deptno = deptno;
}

public int getEmpno() {
return empno;
}

public String getEname() {
return ename;
}

public String getJob() {
return job;
}

public int getMgr() {
return mgr;
}

public String getHiredate() {
return hiredate;
}

public int getSal() {
return sal;
}

public int getComm() {
return comm;
}

public int getDeptno() {
return deptno;
}
}

运行和监控机制

MapReduce的运行机制和 YARN 大体相同,不过 MapReduceApplication Master称为 MRAppMaster

sequenceDiagram
	Client->>Client: 请求执行 Job
	Client->>ResourceManager: 请求Job ID
	Client->>Client: 计算输入分片
	Client->>HDFS: 保存相关资源
    Client->>ResourceManager: 提交应用
    ResourceManager->>Container1(内含MRAppMaster): 初始化
    Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 启动容器
    HDFS->>Container1(内含MRAppMaster): 获取分片资源
    Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 创建任务对象    
	alt 计算资源不够
            Container1(内含MRAppMaster)->>ResourceManager: 申请资源
            Container1(内含MRAppMaster)->>Container2(内含Application): 启动容器
            HDFS->>Container2(内含Application): 获取资源并本地化
    end
	loop 进度监控
            Container2(内含Application)->>Container1(内含MRAppMaster): 汇报进度
            Container1(内含MRAppMaster)->>Container1(内含MRAppMaster): 形成视图
    end    
    Client->>Container1(内含MRAppMaster): 获取进度

运行机制

  1. 请求JobClient 请求执行 Job

  2. 准备Job

    • JobResourceManager请求Job ID
    • 计算输入分片。
    • 将运行作业所需的Jar 文件、配置文件、分片等资源保存在HDFS中以 jobID 命名的目录下。
  3. 提交应用:JobResourceManager提交应用,并传入相应资源。

  4. 初始化容器:ResourceManager 初始化一个容器Container

  5. 启动容器:

    • 节点在 NodeManager 的协助下启动容器,包含 MRAppMaster
    • MRAppMaster对应用进行初始化。
  6. 创建和分发 MapReduce任务

    • MRAppMaster 获取从 HDFS 中输入的分片,对每个分片创建一个 Map 任务对象和多个 Reduce 任务对象。此时分配各个 Task ID
    • 如果 MRAppMaster计算资源不足,则向 ResourceManager申请资源。
    • 申请资源后,MRAppMaster启动一个新的容器Container,内含 Application
    • Application获取Jar 文件、配置文件、分片等资源,保存到本地。
  7. 运行任务

监控机制

  1. 汇报状态:ApplicationMRAppMaster报告自己的进度
  2. 汇聚视图:MRAppMaster汇聚作业视图
  3. 获取状态:客户端获取作业的状态
编号单独考查混合考查HBase重点Hive重点Flume重点
37303190285640%0HBase Shell操作Hive 数据模型
Hive表操作
Hive数据查询
Hive常用函数
Flume配置文件编写
37303200123445%0HBase Shell操作Hive安装
Hive表操作
Hive数据查询
Hive常用函数
Flume配置文件编写

HBase

简介

  1. 特性:伸缩性强、自动分区、线性拓展和新节点自动处理、容错能力强、检索性能强、支持普通硬件
  2. 应用场景:对象存储、时序数据存储、推荐画像、时空数据存储

基本概念

  1. Table:由行和列构成,列又分成若干个列族。

  2. Row:用行键 rowKey 唯一标识数据

  3. 列族Column Family

    • 需要在表的定义阶段给出。
    • 有一个或多个列成员。列成员可以动态加入
  4. 列限定符Qualifier:又称为列 Column。列族里面的数据通过列限定符来定位。无需事先定义,也不需要在不同行之间保持一致。

  5. 单元格 Cell:由行键、列族、列限定符、时间戳唯一决定。不分类型。

  6. 时间戳 TimeStamp:通过时间戳区分版本。

基本操作

表操作

操作命令注意
创建表create "表名","列族2","列族2",...
添加列族alter "表名","列族名"
删除列族alter "表名",{NAME => "列族名称",METHOD => "delete"}
查看表的描述describe "表名"
禁用表disable "表名"
删除表drop "表名"需要先禁用表,再执行删除操作

数据操作

操作命令注意
插入/更新数据create "表名","行键","列族:列名","值"
删除数据delete "表名","行键","列族:列名"
查看某条数据get "表名","行键"
查看全部数据scan "表名"

Hive

一个基于 Hadoop 的数据仓库工具。

环境搭建

嵌入式安装

  1. 安装

    • 解压目录

    • 设置环境变量

      HIVE_HOME=Hive安装目录

      PATH=$PATH:$HIVE_HOME/bin

  2. 配置

    • 修改配置文件,位置:$HIVE_HOME/conf/hive-site.xml
配置项描述
javax.jdo.option.ConnectionURLjdbc:derby;databaseName=metastore_db;create=true数据库连接地址
javax.jdo.option.ConnectionDriverNameorg.apache.derby.jdbc.EmbeddedDriver数据库连接驱动
hive.metastore.localtrue是否使用本地存储
hive.metastore.warehouse.dirfile://home/hadoop/hive/warehouse本地存储目录
  1. 初始化
    • 初始化 Derby 数据库

      schematool -dbType derby -initSchema

本地模式安装

需要先装好 HADOOPMySQL

  1. 安装

    • 解压目录

    • 设置环境变量

      HIVE_HOME=Hive安装目录

      PATH=$PATH:$HIVE_HOME/bin

  2. 配置

    • 修改配置文件,位置:$HIVE_HOME/conf/hive-site.xml
配置项描述
javax.jdo.option.ConnectionURLjdbc:mysql://localhost:3306/hive?useSSL=false数据库连接地址
javax.jdo.option.ConnectionDriverNamecom.mysql.jdbc.Driver数据库连接驱动
javax.jdo.option.ConnectionUserNamehive数据库用户名
javax.jdo.option.ConnectionPassword123456数据库密码
  1. 初始化
    • 复制 mysql 驱动到 $HIVE_HOME/lib

    • 初始化 MySQL 数据库

      schematool -dbType mysql -initSchema

数据模型

    • 内部表:
      • 概念:与数据库中的表在概念上是类似
      • 存储:每一个内部表在 Hive 中有相应的存储目录。删除时元数据和数据都会删除。
    • 外部表:
      • 概念:指向一个已经在 HDFS 中的数据,可以创建分区。
      • 存储:元数据的组织和内部表相同,但不会移动数据到数据仓库目录中,只是与外部数据建立一个链接。删除时,只删除元数据和链接。
    • 分区表:
      • 概念:通过对某或某些列的数据进行分区,使得查询时不必扫描全表,以提高查询速度。
      • 存储:每个分区在单独一个文件夹下,每个文件夹存放于表目录下。
      • 分区在表中以字段的形式存在,但仅表示分区,不表示数据。
  1. 桶:是指将表或分区中指定列的值为 key 进行 hashhash到指定的桶中

    • 优点:提高查询处理效率、提高取样效率
  2. 视图:是一个虚表。视图并不在数据库中以存储的数据值集形式存在。行和列数据来自由定义视图的查询所引用的表,并且在引用视图时动态生成。

数据库操作

  1. 创建数据库
1
2
3
4
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
[COMMENT database_comment]
[LOCATION hdfs_path]
[WITH DBPROPERTIES (property_name=property_value,…)];
语句说明
DATABASE|SCHEMA用于限定创建数据库或数据库模式
IF NOT EXISTS目标对象不存在时才执行创建操作(可选)
COMMENT起注释说明作用
LOCATION指定数据库位于HDFS上的存储路径。若未指定,将使用hive.metastore.warehouse.dir 定义值作为其上层路径位置
WITH DBPROPERTIES为数据库提供描述信息,如创建database的用户或时间

创建数据库示例

1
2
create database hadoop_hive 
location '/home/ubuntu/Desktop/hadoop_hive'; // 指定存储路径

  1. 选择数据库
1
use database_name;

使用默认数据库:

1
USE DEFAULT;

表操作

  1. 创建表
1
2
3
4
5
6
7
8
9
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name // 指定表名
[(col_name data_type [COMMENT col_comment], ...)] // 指定表中的列
[LIKE table_name] // 复制其他表的定义
[COMMENT table_comment] // 表备注
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] //添加分区列
[CLUSTERED BY (col_name, col_name, ...) INTO num_buckets BUCKETS] // 分桶
[ROW FORMAT DELIMITED [FIFLDS TERMINATED BY 'char'] [COLLECTION ITEMS TERMINATED BY char]] // 指定表的存储格式
[LOCATION hdfs_path] //指定存储位置
[TBLPROPERTIES (property_name=property_value,…)] // 自定义属性
语句说明
EXTERNAL创建外部表,若未指定,则默认创建的是内部表
IF NOT EXISTS若表不存在才创建,若未指定,当目标表存在时,创建操作抛出异常
COMMENT添加注释说明,注释内容位于单引号内
LIKE复制其他表的定义
PARTITIONED BY添加分区列
CLUSTERED BY根据列之间的相关性指定列聚类在相同桶中(BUCKETS),可以对表内容按某一列进行升序(ASC)或降序(DESC)排序(SORTED BY关键字)
ROW FORMAT指定 hive 表行对象(ROW Object)数据与 HDFS 数据之间进行传输的转换方式(HDFS files -> Deserializer ->Row object以及Row object ->Serializer ->HDFS files),以及数据文件内容与表行记录各列的对应。
在创建表时可以指定数据列分隔符(FIFLDS TERMINATED BY 子句),
LOCATION指定表数据在 HDFS 上的存储位置。若未指定,db_name数据库将会储存在${hive.metastore.warehouse.dir}定义位置的db_name目录下
TBLPROPERTIES为所创建的表设置属性(如创建时间和创建者,默认为当前用户和当前系统时间)

例子 创建普通表

INFOTYPECOMMENT
SnoINTstudent sno
nameSTRINGstudent name
ageINTstudent age
sexSTRINGstudent sex
scoreSTRUCT <Chinese:FLOAT,Math:FLOAT,English:FLOAT>student score
1
2
3
4
5
6
CREATE TABLE IF NOT EXISTS test2.student(
sno INT COMMENT 'student sno',
name STRING COMMENT 'student name',
age INT COMMENT 'student age',
sex STRING COMMENT 'student sex',
score STRUCT<Chinese:FLOAT,Math:FLOAT,English:FLOAT> COMMENT 'student score');

例子 复制表结构

1
CREATE TABLE IF NOT EXISTS items_info2 LIKE items_info;

例子 创建分区表

使用 shopping 数据库创建一张商品信息分区表 items_info2 ,按商品品牌 p_brand 和商品分类 p_category 进行分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE IF NOT EXISTS shopping.items_info2(
name STRING COMMENT 'item name',
price FLOAT COMMENT 'item price',
category STRING COMMENT 'item category',
brand STRING COMMENT 'item brand',
type STRING COMMENT 'item type',
stock INT COMMENT 'item stock',
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT> COMMENT 'item sales address')
COMMENT 'goods information table' // 指定表备注
PARTITIONED BY (p_category STRING,p_brand STRING) // 指定分区列
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t' // 列之间的分隔
COLLECTION ITEMS TERMINATED BY ',' //结构体之间的分隔
TBLPROPERTIES ('creator'='Xiaoming','date'='2019-01-01'); //自定义表属性

例子 指定存储格式

建立指定存储格式的表示例:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE IF NOT EXISTS shopping.items_info(
id INT COMMENT 'item id',
name STRING COMMENT 'item name',
price FLOAT COMMENT 'item price',
category STRING COMMENT 'item category',
brand STRING COMMENT 'item brand',
stock INT COMMENT 'item stock',
address STRUCT<city:STRING, country:STRING, zip:INT> COMMENT 'item sales address')
COMMENT 'goods information table'
row format delimited fields terminated by ',' //字段以‘,’分隔
collection items terminated by '-' //集合以‘-’分隔
TBLPROPERTIES ('creator'='Xiaoming','date'='2019-01-01');

例子 创建分桶表

1
2
3
4
5
6
7
create table bucket_user (
id int,
name string
)
clustered by(id) into 4 buckets // 按照 id 进行分桶,一共分成4个桶
row format delimited fields terminated by '\t'
stored as textfile;
  1. 导入数据:用于把数据搬运到 Hive 表上位于 HDFS 上的目录位置。

    • 导入HDFS的目录:执行移动操作
    • 导入本地的目录:执行复制操作
    • 导入本地的文件:执行复制操作
    • 若创建表时指定了分区列,使用 LOAD 命令加载数据时也要为所有分区列指定特定值。

!> 此操作不会对数据进行格式化和检查。创建的表需要指定存储格式。

1
2
LOAD DATA [LOCAL] INPATH 'filepath' 
[OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=vall,partcol2=val2 …)];
语句说明
LOCAL加载本地文件系统中的数据
OVERWRITE覆盖原有数据,而不是追加
PARTITION (partcol1=vall,partcol2=val2 …)指定数据所在分区
  1. 导出数据

单文件写入

1
2
3
INSERT OVERWRITE [LOCAL] DIRECTORY directory
[ROW FORMAT row_format] [STORED AS file_format]
SELECT select_statement FROM from_statements;
语句说明
LOCAL数据写入到本地文件系统

写入数据示例

1
2
INSERT OVERWRITE LOCAL DIRECTORY '/home/test4'
SELECT * FROM student limit 2;

视图操作

  1. 创建视图
1
2
3
4
5
CREATE VIEW [IF NOT EXISTS] view_name
[(column_name [COMMENT column_comment],…)] // 列信息
[COMMENT view_comment] // 视图备注
[TBLPROPERTIES (property_name = property_value,…)] // 视图自定义属性
AS SELECT …; // select 子句

例子

基于表test创建一个test_view视图:

1
2
CREATE VIEW test_view(id,name_length) 
AS SELECT id,length(name) FROM test;

数据查询

1
2
3
4
5
6
7
SELECT select_expr column_name, select_expr  column_name, ... | * // 指定要查询的列
FROM table_reference //指定表名
[[LEFT | RIGHT [[OUTER] | [SEMI]]]JOIN table_reference2 ON table_reference.column_name=table_reference2.column_name] // 表连接
[WHERE where_condition] // 指定查询条件
[GROUP BY col_list [HAVING condition]] // 数据分组
[SORT BY| ORDER BY column_name [DESC | ASC],...] // 数据排序
[LIMIT number] // 限制查询数目
  1. 查询所有列
1
select * from db_name;
  1. where 子句:将不满足条件的行过滤,在SQL语句中执行顺序优先于group by。

    • 多个语句间可以使用 AND OR 进行连接。

示例

1
select name from student where age>25;

  1. like:用于在 WHERE子句中搜索列中的指定模式。%代表任意多个字符。

示例

查询出工作职责涉及hive的数据

1
select * from db1.table1 where responsibility like '%hive%';

  1. group by:表示按照某些字段的值进行分组,有相同的值放到一起,需要注意的是select后面的非聚合列必须出现在group by中。

示例

1
select city,avg(salary) from st group by city;

  1. join:用于多表查询,并将查询结果进行连接。Hive只支持等值连接,即ON子句中只能使用等号连接。
类型解释示例
内连接把符合两边连接条件的数据查询出来。select a.name,b.score from a join b on a.id=b.cid;
左外连接左表全部查询出来,右表不符合连接条件的显示为空select a.name,b.score from a left outer join b on a.id=b.cid;
右外连接右表全部查询出来,左表不符合连接条件的显示为空select a.name,b.score from a right outer join b on a.id=b.cid;
全外连接左右表符合连接条件和不符合连接条件的都查出来,不符合的显示空select a.name,b.score from a full outer join b on a.id=b.cid;
左半开连接查询出满足连接条件的左边表记录,需要注意的是select和where语句中都不能使用右表的字段select a.name from a LEFT SEMI JOIN b on a.id=b.cid;
  1. order by排序

    order by后面可以有多列进行排序,默认按字典排序(desc:降序,asc(默认):升序。order by为全局排序。

  2. limit 限制输出条数

在Hive查询中限制查询输出条数

插入数据

将查询结果写入表

1
2
INSERT OVERWRITE|INTO TABLE table_name
SELECT 子句;
语句说明
OVERWRITE直接覆盖原来的数据
INTO追加到原来的数据

将查询结果写入文件

1
2
3
INSERT OVERWRITE [LOCAL] DIRECTORY directory
[ROW FORMAT row_format]
SELECT 子句;
语句说明
LOCAL查询结果写入本地文件系统,不指定则写入 HDFS
ROW FORMAT参考查询语句

示例

1
2
INSERT OVERWRITE LOCAL DIRECTORY '/home/test4'
SELECT * FROM student limit 2;

常用函数的使用

聚合函数

聚合函数通常用于获取一组数据的数字特征。

函数说明
COUNT计算某一列的个数
SUM计算某一列的合计值,该列必须为数值类型
AVG计算某一列的平均值,该列必须为数值类型
MAX计算某一列的最大值
MIN计算某一列的最小值

例子 使用聚合函数计算男生平均成绩

1
SELECT AVG(score) average FROM students WHERE gender = 'M';

例子 使用聚合函数查询表的行数

1
SELECT COUNT(*) TOTAL FROM students;

例子 使用分组聚合查询各班的人数

1
SELECT COUNT(*) TOTAL FROM students GROUP BY cls;

条件函数

1
2
3
4
5
6
CASE a 
WHEN b THEN c
WHEN d THEN e
...
ELSE f
END

上述语句可以理解为如果 a 等于 b,那么返回 c;如果 a 等于 d,那么返回 e;否则返回 f。


示例

1
Select case job when 0 then 'president' when 1 then 'student' else 'person' end from student;

根据 job 输出职位。

  • 0: president
  • 1: student
  • 其他: person

Flume

常用Source

  1. Avro Source:Avro端口监听并接收来自外部的 avro 客户流的事件。
属性名默认值说明
type-必须为avro
channel-channel的名称
bind-IP地址或者主机名
port-绑定的端口
  1. Exec Source:通过设定一个 Unix(linux) 命令监控文件。
属性名默认值说明
type-必须为exec
channel-channel的名称
command-执行的命令,常使用 tail -F file
  1. Spooling Directory Source:监控某个目录下新增的文件,并读取文件中的数据
属性名默认值说明
type-必须为spooldir
channel-channel的名称
spoolDir-监控目录

常用 Sink

  1. HDFS Sink:将事件写入 HDFS ,目前支持创建文本文件和序列文件,并支持压缩。可以根据时间长短或数据大小或事件数量定期滚动文件。
属性名默认值说明
type-必须为hdfs
hdfs.path-保存文件的目录
channel-channel的名称
hdfs.rollInterval30s回滚间隔,为零则不根据时间回滚
hdfs.rollSize1024byte回滚大小,为零则不根据大小回滚
hdfs.rollCount10回滚条数,为零则不根据条数回滚
hdfs.fileTypeSequenceFile保存到hdfs的文件格式,支持SequenceFile, DataStream 或者CompressedStream,当使用CompressedStream时需指定压缩形式
  1. Logger Sink:INFO 级别记录事件。通常用于测试/调试目的。
属性名默认值说明
type-必须为logger
channel-channel的名称

常见 Channel

  1. Memory Channel:将 agent 缓存于内存中,适用于高吞吐量并且当 agent 挂掉以后允许数据丢失的业务上。
属性名默认值说明
type-必须为memory
capacity100存储channel的最大event数
transitionCapacity100从 Source接收的channel,或发送到 Sink 的最大event数
keep-alive3添加或删除 Event 的超时时间
byteCapacityBufferPercentge20定义缓存百分比
byteCapacity-Channel 中所有 Event 总和允许的最大内存字节数
  1. File Channel:将 agent存储于磁盘上,当 agent挂掉以后数据不会丢失。
属性名默认值说明
type-必须为file
dataDirs/.flume/file-channel/data数据存储目录,可配置多个(可提高性能),用逗号分隔
checkpointDir/.flume/file-channel/checkpoint存储 checkpoint 文件的目录

编写步骤

graph LR
DEFINE(命名)-->CHANNEL(配置Channel)
CHANNEL-->SOURCE(配置Source)
SOURCE-->SINK(配置Sink)
-->BIND(绑定)

示例 采集/opt/flume/data所有数据到 HDFS 的 /flume 目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#配置命名
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#配置source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/opt/flume/data

#配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/
a1.sinks.k1.hdfs.filePrefix = flume
a1.sinks.k1.hdfs.rollInterval = 4
a1.sinks.k1.hdfs.fileType = DataStream

#绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

ZooKeeper

核心架构

graph LR
Leader
Follower1(Follower)-->Leader
Follower2(Follower)-->Leader
  1. Leader

    • 协调请求:所有事务请求,并将事务请求转换成一个提议
    • 分发提议:分发提议到所有 Follower,等待半数以上 Follower 的正确反馈
    • 分发 Commit:若半数以上 Follower 的正确反馈提议,则再次发送 Commit 请求,要求 Follower 对提议进行提交。
  2. Follower

    • 响应提议
    • 提交提议
  3. 服务器一般为奇数个。2n+12n+1 个节点可以承受nn 个节点故障。

请求处理

  1. 写请求:可以被任意服务器接收,但全部转发给 Leader
  2. 读请求:可以被任意服务器接收
  3. zxid:每次读写返回 zxid 编号,保证返回的数据不会比客户端传过来的 zxid 新。
  4. 模糊快照和日志:定期将内存的数据保存到磁盘中,形成模糊快照。

工作模式

  1. 崩溃恢复:若 Leader 挂掉了,则可以通过选举机制,选举产生新的 Leader 服务器。选举的过程称为 崩溃恢复模式。
  2. 消息广播:若过半服务器完成状态同步,则进入消息广播 模式。若新服务器加入已进行消息广播的集群中,则自动进入数据恢复模式。

数据模型

  1. 树形结构

  2. 每个节点称为 ZNode

  3. 一般在 1MB 以内

  4. 每个节点包含版本号、时间戳等信息。

  5. 不能创建一个已存在的节点。

  6. 临时节点:

    • 一旦创建,不可改变
    • 创建临时节点的会话一旦消失,临时节点也会消失
    • 临时节点不允许拥有子节点
  7. 顺序节点

    • 创建节点时,可以在路径结尾加上一个递增的计数。
  8. 节点的监听

    • 节点的状态改变时,会触发 Watch 对应的操作,且只触发一次。

选举机制

  1. Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作。
  2. 在集群正常工作之前,每个服务器启动时发起一次选举。先给自己投票,然后和其他服务器交换数据,将投票改成id大的。
  3. 在集群正常工作时,获得票最多的节点成为 Leader
  4. 选出 Leader 之后,之前的服务器状态由 Looking 改变为 Following ,以后的服务器都是 Follower

示例

假设有7台服务器。

  1. 服务器1启动,发起一次选举。此时服务器1投自己一票。此时服务器1票数1票。(不足4票)
  2. 服务器2启动,发起一次选举。此时服务器2投自己一票。此时服务器1发现服务器2的id比自己大,更改选票投给服务器2。此时服务器1票数0票,服务器2票数1票。(不足4票)
  3. 服务器3启动,发起一次选举。此时服务器3投自己一票。此时服务器1,2发现服务器3的id最大,都更改选票投给服务器3。此时服务器1,2票数0票,服务器3票数3票。(不足4票)
  4. 服务器4启动,发起一次选举。此时服务器4投自己一票。此时服务器1,2,3发现服务器4的id最大,都更改选票投给服务器4。此时服务器1,2,3票数0票,服务器4票数4票。此时服务器4成为 Leader
  5. 后面启动的服务器,发现已经有 Leader了,就直接成为 Follower

应用

介绍

  1. Master选举
  2. 分布式锁
  3. 数据发布和订阅
  4. 分布式协调通知
  5. 心跳检测
  6. 命名服务
  7. 分布式队列
  8. 组服务

Master选举

  1. 原理:
    • ZooKeeper无法创建一个已存在的节点
    • 创建临时节点的会话一旦消失,临时节点也会消失
  2. 机制:
    • 多台机器同时申请创建同一个临时节点
    • 创建成功的机器即为 Master
    • 创建不成功的机器则监听此节点
    • Master出现故障,临时节点消失,则各个机器重新同时申请创建同一个临时节点
graph TB
APPLY(多台机器同时创建相同节点)--创建成功-->MASTER(即为Master节点)
APPLY--创建不成功-->WATCH(监听Master节点)
MASTER--挂了-->APPLY

分布式锁

  1. 原理:

    • ZooKeeper可以创建顺序节点
    • 创建临时节点的会话一旦消失,临时节点也会消失
  2. 加锁机制

    1. 一台机器申请一个临时顺序节点
    2. 获取节点所在目录的最小节点
    3. 若最小节点就是申请节点,则获得
    4. 若最小节点不是申请节点,则监听前一个节点
    5. 若监听的节点发生变化(解锁或者机器坏了),则跳转到 2 步骤
graph TB
APPLY[申请一个临时顺序节点]-->GETNODE

GETNODE[获取最小节点]-->MINNODE

MINNODE{最小节点=申请的节点}--是-->LOCK[获得锁]
MINNODE--否-->WAIT[监听前一个节点]
WAIT--节点变化-->GETNODE
  1. 解锁机制:删除加锁机器所申请的节点即可
  • 标题: Hadoop大数据技术基础
  • 作者: ObjectKaz
  • 创建于: 2021-01-24 07:05:00
  • 更新于: 2021-04-26 00:12:00
  • 链接: https://www.objectkaz.cn/d477d713584d.html
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。