PPXu

Redis分布式锁实现去服务单点问题实践

2019-02-17

问题背景

常见的单个应用服务需要完成某个功能模块,由于业务设计上的原因,该服务只允许一个而不能多个服务实例同时运行。但在高可用架构设计上,这存在单点问题,即一旦主机发生故障,如宕机或网络中断等,而导致服务终止,这种场景下我们希望能采用一种优雅的方式保证服务不中断。于是我们可以采用分布式锁来实现一主多备的高可用方案。

一主多备

我们可以实现服务的一主多备模式,有且仅有一个master,至少一个standby,当master节点失败后则由多个standby中选取一个作为主节点继续提供服务。

分布式锁

考虑到需要有多个服务节点,但同时只允许一个服务节点运行,我们可以实现分布式锁来解决问题。

分布式锁的几个要求

  • 最基本要求:互斥性(唯一性),同一时间只能被一个机器节点上的一个线程获得锁。
  • 避免死锁:可重入性。
  • 高可用的锁获取及锁释放。
  • 高性能的锁获取及锁释放。
  • 阻塞性:最好是一把阻塞锁。

分布式锁的常见三种实现方式

分布式锁的常见3种实现方式有基于数据库、缓存及zookeeper的实现,网上一抓有一大把的实现过程可以搜索,此处不详细说明,只简单说明一下各个实现的核心思路及优劣。

基于数据库的实现方式

核心思想在于:利用数据库表的唯一索引,以方法名字段作为唯一键,想要执行方法时,使用方法名向表中插入数据,成功则表示获得锁,执行完成后再删除对应的行数据以释放锁。

优点

思路简单,容易理解。

问题及优化
  1. 高可用及性能问题:基于数据库实现,数据库的可用性及性能直接影响分布式锁的可用性及性能。数据库要避免单点,需要双机部署、数据同步、主备切换等。
  2. 不可重入:因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据。所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁。
  3. 没有锁失效机制:因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,导致锁一直被占用,不但其它节点无法获得锁,当服务恢复后也一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据。
  4. 非阻塞锁:获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。

注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!

基于Zookeeper的实现方式

ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:

(1)创建一个目录作为锁目录;
(2)线程A想获取锁就在锁目录下创建临时顺序节点;
(3)获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次序小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。

优点

Zookeeper是一个分布式协同服务,使用Zookeeper实现分布式锁具有天然优势,最大的优点是API使用简单。

建议直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}

public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}

其它优点:

  1. 解决单点问题:ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。
  2. 解决不可重入问题:客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
  3. 解决锁失效问题:使用ZK可以有效让锁自动失效而释放。在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
  4. 解决非阻塞性问题:Watch机制,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁。
缺点
  1. 性能问题:性能上不如使用缓存实现分布式锁。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
  2. 并发问题:在网络抖动情况下,客户端与ZK集群的session连接断了,那么zk以为客户端宕掉了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)

建议尽可能的使用Zookeeper来实现分布式协同服务,但如果,业务强依赖于一个Redis集群且服务并没有使用Zookeeper的意愿,不妨可以试试使用Redis。

基于缓存的实现方式

各种成熟的缓存产品,包括Redis,memcached以及Tair,分布式锁实现思路基本类似,以Redis举例说明核心思路:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的标识值为一个随机生成的UUID,通过此标识在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。

优点

性能高是最大优点,基于内存实现的缓存系统。

  1. 解决单点问题:Redis既可单点部署,也支持集群部署。
  2. 解决不可重入问题:在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的持有者。
  3. 解决锁失效问题:设定锁key失效时间。
  4. 非阻塞锁:可以循环多次去执行获取操作,直至获得锁。

至于失效时间设多久才好?
Redisson给出了解决方案–Watchdog看门狗:
先获得锁,默认过期时间30秒,如果处理完了,走正常逻辑。 对一个值加锁之后,会在自身维护一个Watchdog后台线程,维护一个内部队列,每过10秒去重新设置一下锁Key的过期时间,这样,一个锁即使对应的进程挂掉,也就维持30秒的时间,如果没有挂,并且30秒不够用了,内部队列会不断的更新这个过期时间为30秒,保证不会出现锁饥饿的问题。


以下为重设Key过期时间的核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void scheduleExpirationRenewal() {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Future<Boolean> future = expireAsync(internalLockLeaseTime, TimeUnit.MILLISECONDS);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}

if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal();
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}

事实上,使用Redisson,还有一个好处,就是Redisson调Redis命令的底层实现,是使用Lua脚本,这样做,是因为假设有一大坨复杂的业务逻辑,可以通过封装在Lua脚本中发送给Redis,保证这段复杂的业务逻辑执行的原子性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Future<Long> tryLockInnerAsync(final long leaseTime, final TimeUnit unit, long threadId) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (mode == 'write') then " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
"return redis.call('pttl', KEYS[1]);",
Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId));
}
缺点
  1. 通过超时时间来控制锁的失效时间并非十分的靠谱。譬如在指定的锁Key更新时间内网络抖动导致无法正常失效时间,被认为锁被主节点主动释放了,而实际上并没有,但备节点此时可能已抢得锁,出现多客户端获得锁的问题。
  2. 最大的问题,还是在于Redis的主从集群的复制问题。客户端1对Redis Master写入锁Key的Value,此时会异步复制给Redis Slave。一旦发生Master宕机,Redis主从切换,Slave变成了新的Master,而此时若有客户端2来尝试加锁,在新的Master上完成了加锁,而客户端1也以为自己成功加了锁,此时也出现了多客户端获得锁的问题。

使用Redis分布式锁去服务单点实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final String REDIS_ADDRESS = "192.168.4.104:6379";

private static final String REDIS_PASSWORD = "redis";

private static final String LOCK_KEY = "redis_lock";

public static void runWithRedisLock(String[] args) {

RedissonClient redissonClient = RedisUtils.getRedissonClientInstance(
REDIS_ADDRESS, REDIS_PASSWORD);

RedissonLock lock = (RedissonLock) redissonClient.getLock(LOCK_KEY);

// 异步方式尝试Redis加锁操作
Future<Boolean> tryLockAsync = lock.tryLockAsync(1, TimeUnit.SECONDS);

try {
// 一定要通过调get方法拿到执行加锁的结果,因为是异步方式加锁,调此方法会阻塞直至拿到执行结果
if (tryLockAsync.get()) {
// 尝试执行业务逻辑,如果发生异常,则释放锁
try {
Application.runApplication(args);
} catch (Exception e) {
ILOG.error("run application exception, ", e);
if( lock.isHeldByCurrentThread() ) {
lock.unlock();
}
}
} else {
ILOG.info("lock[{}] is being held", lock.getName());
Application.stopApplication(args);
}
} catch (InterruptedException e) {
ILOG.error("lock interrupted exception, ", e);
} catch (ExecutionException e) {
ILOG.error("lock executed exception, ", e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.redisson.Config;
import org.redisson.Redisson;
import org.redisson.RedissonClient;
import org.redisson.SingleServerConfig;

public class RedisUtils {

private static String address;

private static String password;

public static RedissonClient getRedissonClientInstance(String redisAddress,
String redisPassword) {
address = redisAddress;
password = redisPassword;
return RedissonClientHolder.instance;
}

private static class RedissonClientHolder {
private static RedissonClient instance = createRedissonClient(address,
password);
}

private static RedissonClient createRedissonClient(String address,
String password) {
try {
Config config = new Config();
SingleServerConfig singleSerververConfig = config.useSingleServer();
singleSerververConfig.setAddress(address).setPassword(password)
.setConnectionMinimumIdleSize(1).setConnectionPoolSize(64);
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}

降级处理的实践

如上所述,如果Redis(或Redis集群)服务宕掉了,或者当网络抖动或Redis集群主从切换导致的各种异常导致Redis方式不可行,于是,需要做降级处理,所谓降级,就是异常情况下的备胎处理方式,这里我们先假设最容易处理的一种方式,如果系统中只有一主一从两个节点,那么我们可以简单处理,让双方错开一个定时间隔,分别使用netCat命令向对方进行端口检测,如果检测到对方节点中有正在运行的服务,则不启动甚至退出自己当前的服务。如果对方节点不在运行服务,则说明自己可以“获得锁”,便可运行服务,取代对方成为主节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(final String[] args) throws InterruptedException {

// 设置定时器,定时检测运行
Timer timer = new Timer();

timer.schedule(new TimerTask() {

@Override
public void run() {
try {
runWithRedisLock(args);
} catch (Exception e) {
runWithNetCat(args);
}
}
}, 0, 10000);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static volatile boolean isRunning = false;

// 假设两台机器服务地址+端口分别为[对方:"192.168.11.20:8080", 本机:"192.168.11.21:8080"]

// 对方的服务地址
private static final String ADDRESS = "192.168.11.20";

// 对方的服务端口
private static final String PORT = "8080";

public static void runWithNetCat(String[] args) {
if (isRunning) {
ILOG.info("application is already running");
return;
}
boolean hasRunner = false;
if (netCat(ADDRESS, PORT)) {
hasRunner = true;
Application.stopApplication(args);
}
if (!hasRunner) {
Application.runApplication(args);
}
}

private static boolean netCat(String host, int port) {
try {
NetUtil.netCat(host, port, new byte[] {});
return true;
} catch (Exception e) {
ILOG.warn("net check error. ", e);
return false;
}
}

使用服务端口检测方法的优缺点

这样的做法非常容易理解且操作简单,但也存在问题,譬如当两者不能互相ping通对方时,则两者可能都以为自己可以升级为主,造成脑裂现象。

降级处理的更优实践

由上分析,我们得知依赖网络去简单做服务端口检测会存在不可靠,并且条件也限定在系统中只有一主一备两节点,如果存在多台节点服务,即一主多备时,我们还是要保证顺序一致性。基于这一点考虑,我们若使用数据库的实现方式,便可以满足实现上更简单、更可靠的降级要求。

基于Spring Boot的启动优化方案

Spring Boot提供了 CommandLineRunner 接口,实现了 CommandLineRunner 接口的 Component 会在所有 Spring Beans 都初始化之后,SpringApplication.run() 之前执行,我们可以在这个方法里hold住服务的启动,在这里边作加锁成功的判断,从而实现一种只加载容器但不启动服务的“预加载”方案,这么做是因为,我们知道,spring应用启动的过程中,容器加载是较为耗时的,这种“待机式”的方案,可以让应用先准备好,当需要启动服务时便可更快速地进入服务状态。而除 CommandLineRunner 之外,使用ApplicationRunner也可以达到相同的目的,两者差别不大。

Reference

Zookeeper分布式锁原理图:七张图彻底讲清楚ZooKeeper分布式锁的实现原理【石杉的架构笔记】

扫描二维码,分享此文章