2018-11-22 16:59:47 3

业界常用分布式锁实现

前言

    在互联网应用快速发展的今天,系统集成逐步庞大。系统内部分布式部署偏向大规模集群,加之会对外提供大量接口。对内会导致同一资源大量竞争性占用,对外则可能会导致同一业务重复调用,产生异常重复交易(虽然可以通过关系数据库来约束,但是大并发下数据库性能会成为瓶颈)。当竞争性操作产生第一想到的就是加锁,那再多个系统之间如何实现类似多线程竞争下的Synchronize呢?有需求就会有动力,这里就出现了分布式锁的概念。分布式锁顾名思义就是在处理分布式应用环境下对不同主机共享同一资源排斥手段,用于防止彼此之间的干扰以及竞争带来的性能及数据一致性问题。


锁的定义

    这里只列举常见的分布式锁类型,排他锁,以及共享锁。

    排它锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。当加锁人加锁后,只有加锁人可以对加锁内容进行读写操作,其他非加锁人统一屏蔽加锁内容,直到加锁人释放了排他锁。

    共享锁(Shared Locks,简称S锁),又称为读锁,同样是一种基本的锁类型。当加锁人加锁后,加锁人可以对加锁内容进行读写操作。其他希望读资源的也必须先加锁共享锁,但只有前面加锁的人释放锁后后一个加锁的才能进行写操作。

    锁的概念与是否分布式无关,最常见的锁多用于数据库上的读写事务操作。在处理竞争性问题上锁的概念尤为重要,包括Java里的线程锁,全局锁以及原子锁等。锁能够限制不同个体对同一资源的访问按锁的规则进行,但加锁操作本身是会损耗一定的效率和资源的(在下文实际案例中说明),所以加锁的范围务必要尽可能缩小和精确。


Zookeeper实现分布式锁

    zookeeper相信都不陌生了,zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,多用于服务注册上,号称与Dubbo支撑起阿里亿万级别的业务服务访问。zookeeper实现中有个znode的概念,znode是一个与Unix文件系统路径类似的节点,可以往节点中存取数据(可理解为一个目录文件)。当节点创建时可以通过Watcher注册事件消息到其他节点上,以此实现当连接超时、节点数据变更、子节点变更(分布式锁实现用)时可以触发相应的行为事件。正是因为zookeeper在处理分布式协调上的优异性以及内部节点Watcher的实现,使得Zookeeper拥有了分布式锁的先天条件。

    zookeeper实现排他锁

    从上面排他锁的概念知道,排它锁的核心是如何保证当前有且仅有一个事务会话获得锁,并且锁被释放后,所有正在等待获取锁的事务都能够被通知到。下面我们就来看看如何借助ZooKeeper实现排它锁。

    在上面zookeeper的概念中知道,zookeeper是通过类似文件系统的节点来实现数据存储的,并且在节点创建和数据变更时都可以对其他节点做Watcher注册,通知其他节点做状态变更。通过这个特性,我们可以对创建的节点进行锁的获取及通知其他节点操作。当需要获取排他锁时,所有客户端(资源使用方)都会试图调用Zookeeper的create()方法,在exclusive_lock(持久节点)底下创建一个子节点lock(临时节点),这个时候就可以触发Zookeeper的分布式竞争特性(Leader选举算法),会保证所有竞争的客户端中,最终只有一个客户端能创建节点成功,这时可以理解为该客户端获取到了锁,同时其他没有获取到锁的客户端就在exclusive_lock节点上注册一个节点变更的Watcher监听。

    初始化节点代码:

    public CuratorLock() throws Exception {
        RetryNTimes retry = new RetryNTimes(times, sleepMsBetweenRetries);
        client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retry);
        client.start();
        afterPropertiesSet();
    }

    private void afterPropertiesSet() throws Exception {
        if (client.checkExists().forPath(LOCK_PROJECT) == null) {
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(LOCK_PROJECT);
        }

        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("session状态发生变化:" + connectionState);
            }
        });

        PathChildrenCache childrenCache = new PathChildrenCache(client, LOCK_PROJECT, true);
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String path = pathChildrenCacheEvent.getData().getPath();
                    if (path.contains(ITEM_LOCK)) {
                        zkLockLatch.countDown();
                        System.out.println(threadName + "=====节点删除释放了锁");
                    }
                }
            }
        });
    }


    这个时候当exclusive_lock节点下子节点发生变化时,所有注册了Watcher的客户端都会接收到变化通知,这个时候就相当于释放锁操作。其他客户端就再次发起节点创建请求,以此重复实现排他锁。当获取到锁的客户端主动释放锁的时候其他客户端可以获得锁,但是这个时候会有一种情况,当获得锁的客户端宕机无法主动释放锁的时候怎么办呢?这里又提现了zookeeper对分布式节点操作的特性了,当一个节点客户端发生异常是,zookeeper会主动删除获得锁客户端的节点(可通过设置获得锁临时节点的会话失效时间防止死锁),这个时候同样会触发节点变化Watcher操作,实现锁重取。

    加锁代码

    public void lock(long waitTime) {
        lockTime = System.currentTimeMillis();
        while(true) {
            try {
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(LOCK_PROJECT + ITEM_LOCK);
                System.out.println(threadName + "=====获得了锁");
                return;
            }
            catch (Exception e) {
                System.out.println(threadName + "=====获取锁失败,等待释放锁");
                zkLockLatch = new CountDownLatch(1);

                try {
                    zkLockLatch.await(waitTime, TimeUnit.MILLISECONDS);
                    System.out.println("=====线程唤醒");
                    if (sessionTimeoutMs < (System.currentTimeMillis() - lockTime - 1000)) {
                        System.out.println("=======加锁超时,触发被动解锁");
                        lockTime = System.currentTimeMillis();
                        releaseLock();
                    }
                } catch (InterruptedException e1) {
                    throw new RuntimeException("等待锁失败", e1);
                }
            }
        }
    }

    排他锁也可以通过多临时节点实现,由于zk生成节点是有序的,通过这个有序性,节点可以只订阅前一节点的解锁操作,这样还可以防止羊群效应产生(出现大量加锁节点时释放锁会导致同时大量发送解锁操作给等待锁节点)。


    zookeeper实现共享锁

    通过上面对共享锁的概念可以知道,共享锁跟排他锁的区别是共享锁对所有加锁客户端在没有写操作时都可读。这里同样可以通过zookeeper的节点操作来实现,具体操作如下

    1、客户端在zookeeper创建完节点后需要获取shared_lock持久节点底下所有子节点,并注册Watcher监听

    2、确认自己节点在全部节点中所在的序号

    3、读请求需要没有比自己小的节点或者比自己小的节点都是读请求才行(防止不可重复读和幻读的情况),写请求需要自己是最小序号节点

    4、接收到Watcher通知后重复第2点继续判断

    5、释放锁操作与排他锁一致,通过zookeeper特性实现


    Zookeeper释放锁操作

    public void releaseLock() {
        try {
            if (client.checkExists().forPath(LOCK_PROJECT + ITEM_LOCK) != null) {
                client.delete().forPath(LOCK_PROJECT + ITEM_LOCK);
            }
            System.out.println(threadName + "=====释放锁");
        }
        catch(Exception e) {
            System.out.println(threadName + "=====释放锁失败");
        }
    }


Redis实现分布式锁

    Redis同样是高并发环境下的产物,C语言写的单线程缓存应用。在处理分布式锁下通过具有很大的优势(主要是集群情况下高可用)。

     Redis在数据缓存及Nosql数据库应用环境下可以说是最热门的存在,在处理快速存储,快速查找等应用领域非常强大,同时因为redis对缓存有效期可靠性很高,以及全局统一性(类似Session共享缓存),使得redis同样具有了分布式锁的特性。

    本文只对Redis在分布式锁上的应用实现,不会过多说明Redis的使用配置,如需了解可自行百度。

    正常情况下Redis对锁的获取流程如下:

    当客户端发起加锁请求时,会设置$timeout巡查获取间隔时间,每隔指定时长获取一次锁,直到获取成功。当客户端获取到锁后如果在指定时长内没有释放会自动释放(处理客户端宕机的情况)。

    发起加锁代码

    Jedis实现,需要避免设置key与超时时间的非原子性

public class RedisTool {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
  
    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey,
        String requestId, int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
  
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
  
        return false;
    }
}

    第一个参数为key,我们使用key来当锁,因为key是唯一的。

    第二个参数为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?

        原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。

    第三个参数为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

    第四个参数为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

    第五个参数为time,与第四个参数相呼应,代表key的过期时间。

    总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。

    lettuce实现,通过redisTemplate操作原子性加锁

private boolean tryLock(String value, long expiredTime) {
    try {
        return redisTemplate.execute((RedisCallback<Boolean>) connection -> {
            Object nativeConnection = connection.getNativeConnection();
            String redisResult = "";
            RedisSerializer<String> stringRedisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
            //lettuce连接包下序列化键值,否知无法用默认的ByteArrayCodec解析            
            byte[] keyByte = stringRedisSerializer.serialize(lockKey);
            byte[] valueByte = stringRedisSerializer.serialize(value);
            // lettuce连接包下 redis 单机模式setnx            
            if (nativeConnection instanceof RedisAsyncCommands) {
                RedisAsyncCommands commands = (RedisAsyncCommands)nativeConnection;
                //同步方法执行、setnx禁止异步
                redisResult = commands.getStatefulConnection().sync().set(keyByte, valueByte, SetArgs.Builder.nx().ex(expiredTime / 1000));
            }
            // lettuce连接包下 redis 集群模式setnx
            if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                RedisAdvancedClusterAsyncCommands clusterAsyncCommands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                redisResult = clusterAsyncCommands.getStatefulConnection().sync().set(keyByte, keyByte, SetArgs.Builder.nx().ex(expiredTime / 1000));
            }
            //返回加锁结果            
            return "OK".equalsIgnoreCase(redisResult);
        });
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

    集群模式下需要通过Redission操作实现加锁

    Redis解锁操作

public class RedisTool {
    private static final Long RELEASE_SUCCESS = 1L;
 
    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey,
        String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
 
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
 
        return false;
    }
}

    可以看到,我们解锁只需要两行代码就搞定了!第一行代码,我们写了一个简单的Lua脚本代码(确保操作事务一致性)。第二行代码,我们将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

    那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。关于非原子性会带来什么问题,那就是防止解锁操作部分异常导致的解锁失败 。那么为什么执行eval()方法可以确保原子性,源于Redis的特性,下面是官网对eval命令的部分解释:

    简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令

    附上通过redisTemplate操作Lua脚本的解锁代码

public void releaseLock(String requestId) {
    System.out.println(Thread.currentThread().getName() + "删除锁:" + requestId);
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end ";

    DefaultRedisScript<?> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptText(script);

    try {
        redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
    } catch (Exception e) { }
}


总结

    在生成环境中要使用哪个技术主要取决于具体业务范畴,不过思想是一致的,当业务量逐步庞大之后,高并发无可避免。只有在通过不断的学习积累才不至于在半夜起来重启机器!!


← Previous Next →

0 Responses to "业界常用分布式锁实现"

Leave a Reply

你没有登陆,必须填写name才能提交评论*

*

*

Copyright Obscura 2017. Design by zqliang. All Rights Reserved.Collect from 小梁
粤ICP备18034967号