要解决什么问题?

现在在互联网的场景中大家的服务应该都是进行分布式部署的,分布式锁和我们之前Java应用中的锁其实都是为了解决多线程的问题。只是说在以前单机部署情况下使用的如 SynchronizedLock 等锁在分布式的场景下某些业务无法满足,需要我们使用分布式锁来解决这一分布式竞争的情况。

分布式锁会出现哪样的一些问题?

互斥(只有一个客户端能获取锁)

互斥是最基本的、在同一时刻只能有一个客户端能够获取到锁,这个通过数据库、RedisZookeeper等特性都能够保证。

不能死锁

死锁、顾名思义,就是说锁一直都不被释放,导致其他线程或任务一直都阻塞在这里,永远都不能获取到锁(如果是阻塞锁可能会使任务一直堆积)。

不能死锁很多人也会回答说给我们的锁来设置一个过期时间(比如说Redis锁)

这边给一个我们线上出现死锁的情况,我们来看一下这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
public boolean tryLock(String key, int timeToLive) throws Exception {
if (timeToLive <= 0) {
timeToLive = TTL;
}

Long v = jedisCluster.incr(key);
if (target.equals(v)) {
jedisCluster.expire(key, timeToLive);
return true;
}
return false;
}

我们看到这段代码中是使用JedisCluster.incr(key)来创建分布式锁的,如果说获取到锁了那么返回结果应该是1L,成功后便设置锁的过期时间,这段代码看起来是没问题,但是在我们线上的情况竟然发现了死锁!

这是什么情况呢,某天发现该分布式锁下面代码逻辑都没有再执行,到redis里面查看了该key的值发现竟然有7w多,明显是执行了7w多次tryLock最终还是没有能够获取到锁,但是难道一次代码逻辑需要执行这么多次吗?很明显不是,按道理来讲如果锁了一定时间后这个key应该是会过期的,我们的timeToLive设置的值是60s,不会说一直在这里不释放的。

后面查看应用日志发现我们的redis在某1s内突然失去了连接(具体原因不明), 于是恍然大悟, 在我们tryLock的逻辑中是先设置key再去设置过期时间,这两者并不是一个原子性的操作,可能刚好在这个时间点我们设置好了key的值但是没来得及设置过期时间 redis 失去了连接,导致我们的过期时间设置失败,所以这个key一直没能够释放。

明白了原因我们就知道这种分布式锁的实现方式其实是存在明显漏洞的,要保证给锁设置值和过期时间是一个原子性的操作才能够保证不被死锁。

我们把这个方式直接改成了使用redis命令来设置:

SET my:lock 随机值 NX PX 30000

执行这个命令就 ok。

  • NX:表示只有 key 不存在的时候才会设置成功。(如果此时 redis 中存在这个 key,那么设置失败,返回 nil)
  • PX 30000:意思是 30s 后锁自动释放。别人创建的时候如果发现已经有了就不能加锁了。

容错

我这边理解的容错可能需要应对以下几种情况:

  • 分布式锁依赖的中间件不可用(如RedisZookeeper数据库等)
  • 应用突然宕机(包括应用发布过程)
  • 中间件集群(如Redis集群中只要大部分节点创建了这把锁就可以,N个Redis节点,需要 (N/2 + 1) 个节点以上)
  • 过期时间超时后不能让该线程解锁

我这边讲一下最后一个点,也就是说如果分布式锁过期了但是该线程中的任务还没有结束,当该线程的执行逻辑结束之后其实之前他自己获得的锁已经被释放了,这时候它如果再去做解锁的操作其实是将其他线程获取的锁给释放掉了,这种情况肯定是会有问题的。

所以在分布式锁释放的时候我们必须保证一个线程释放的锁必须是这个线程获得的,针对Redis分布式锁来说我们可以使用lua脚本来做释放锁的操作:

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

这里我们需要判断value,这个value是我们在tryLock的时候去创建的,我们可以使用UUID之类的随机数(要确保每次创建的都不会一致)来创建、然后在删除的时候去判断这个value,不相同说明不是该线程创建的。

实现分布式锁的几种方式

其实实现分布式锁的方式有很多种,基于Redis的我上面基本都介绍了,我这边着重讲一下基于Zookeeper来实现分布式锁。

zk 分布式锁,其实可以做的比较简单,就是某个节点尝试创建临时 znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个 znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
public class ZooKeeperSession {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private ZooKeeper zookeeper;
private CountDownLatch latch;

public ZooKeeperSession() {
try {
this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());
try {
connectedSemaphore.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("ZooKeeper session established......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取分布式锁
*
* @param productId
*/
public Boolean acquireDistributedLock(Long productId) {
String path = "/product-lock-" + productId;

try {
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
while (true) {
try {
// 相当于是给node注册一个监听器,去看看这个监听器是否存在
Stat stat = zk.exists(path, true);

if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception ee) {
continue;
}
}

}
return true;
}

/**
* 释放掉一个分布式锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
String path = "/product-lock-" + productId;
try {
zookeeper.delete(path, -1);
System.out.println("release the lock for product[id=" + productId + "]......");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 建立zk session的watcher
*
*/
private class ZooKeeperWatcher implements Watcher {

public void process(WatchedEvent event) {
System.out.println("Receive watched event: " + event.getState());

if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}

if (this.latch != null) {
this.latch.countDown();
}
}

}

/**
* 封装单例的静态内部类
*
*/
private static class Singleton {

private static ZooKeeperSession instance;

static {
instance = new ZooKeeperSession();
}

public static ZooKeeperSession getInstance() {
return instance;
}

}

/**
* 获取单例
*
* @return
*/
public static ZooKeeperSession getInstance() {
return Singleton.getInstance();
}

/**
* 初始化单例的便捷方法
*/
public static void init() {
getInstance();
}

}

也可以采用另一种方式,创建临时顺序节点:

如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class ZooKeeperDistributedLock implements Watcher {

private ZooKeeper zk;
private String locksRoot = "/locks";
private String productId;
private String waitNode;
private String lockNode;
private CountDownLatch latch;
private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;

public ZooKeeperDistributedLock(String productId) {
this.productId = productId;
try {
String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
zk = new ZooKeeper(address, sessionTimeout, this);
connectedLatch.await();
} catch (IOException e) {
throw new LockException(e);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
return;
}

if (this.latch != null) {
this.latch.countDown();
}
}

public void acquireDistributedLock() {
try {
if (this.tryLock()) {
return;
} else {
waitForLock(waitNode, sessionTimeout);
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}

public boolean tryLock() {
try {
// 传入进去的locksRoot + “/” + productId
// 假设productId代表了一个商品id,比如说1
// locksRoot = locks
// /locks/10000000000,/locks/10000000001,/locks/10000000002
lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 看看刚创建的节点是不是最小的节点
// locks:10000000000,10000000001,10000000002
List<String> locks = zk.getChildren(locksRoot, false);
Collections.sort(locks);

if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}

//如果不是最小的节点,找到比自己小1的节点
int previousLockIndex = -1;
for(int i = 0; i < locks.size(); i++) {
if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
previousLockIndex = i - 1;
break;
}
}

this.waitNode = locks.get(previousLockIndex);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}

private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}

public void unlock() {
try {
// 删除/locks/10000000000节点
// 删除/locks/10000000001节点
System.out.println("unlock " + lockNode);
zk.delete(lockNode, -1);
lockNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;

public LockException(String e) {
super(e);
}

public LockException(Exception e) {
super(e);
}
}
}

基于RedisZk 来实现分布式锁其实有一个最本质的区别:

  • redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
  • zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。

另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。

但是,在我们高并发的场景下,基于Zookeeper实现的分布式锁的qps不能太高,其性能是要低于Redis的,所以总的来说,基于ZookeeperRedis 实现的分布式锁各有优缺点,大家在选择的时候可以根据业务具体去看哪一种符合你的业务场景。