redis集群模式下对事务的支持

2019/02/20 Redis

redis的分片集群模式是不支持redis的事务的,因为不同的key会做hash算法,分布到不同的哈希槽内,这就破坏了redis操作的原子性,在这种模式下面是不能支持事务的,但是集群里边的每个节点是可以支持事务。

为了实现要操作的key分布到同一个节点下面,就需要引入另外的一个概念:Hash Tags

Hash Tags

首先我们需要了解分片技术的矛盾什么地方:
即要求key尽可能地分散到不同机器,又要求某些相关联的key分配到相同机器。

分片,就是一个hash的过程:对key做md5,sha1等hash算法,根据hash值分配到不同的机器上。
为了实现将key分到相同机器,就需要相同的hash值,即相同的key(改变hash算法也行,但不简单)。但key相同是不现实的,因为key都有不同的用途。
例如,user:user1:ids保存用户的tweets ID, user:user1:tweets保存tweet的具体内容,两个key不可能同名。
仔细观察user:user1:ids和user:user1:tweets,两个key其实有相同的地方,即user1。能不能拿这一部分去计算hash呢?
这就是 Hash Tag ,允许用key的部分字符串来计算hash。

当一个key包含 {} 的时候,就不对整个key做hash,而仅对 {} 包括的字符串做hash。

假设hash算法为sha1。对user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。

上面的原理也都解释清楚了,接下来就直接上代码了


redis集群模式的配置

1、spring配置redis集群

spring-redis.xml

<!--Redis集群配置 redis 3.0 以后提供集群模式 spring-data-redis 1.8.7以后提供集群api-->
<bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration">
 <property name="maxRedirects" value="3"/>
 <property name="clusterNodes">
 <set>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host1}" />
 <constructor-arg name="port" value="${session.redis.cluster.port1}"/>
 </bean>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host2}" />
 <constructor-arg name="port" value="${session.redis.cluster.port2}"/>
 </bean>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host3}" />
 <constructor-arg name="port" value="${session.redis.cluster.port3}"/>
 </bean>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host4}" />
 <constructor-arg name="port" value="${session.redis.cluster.port4}"/>
 </bean>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host5}" />
 <constructor-arg name="port" value="${session.redis.cluster.port5}"/>
 </bean>
 <bean class="org.springframework.data.redis.connection.RedisClusterNode">
 <constructor-arg name="host" value="${session.redis.cluster.host6}" />
 <constructor-arg name="port" value="${session.redis.cluster.port6}"/>
 </bean>
 </set>
 </property>
</bean>

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
 <property name="maxIdle" value="${redis.maxIdle}"/>
 <property name="maxTotal" value="${redis.maxTotal}"/>
</bean>

<bean id="jedisConnectionFactory"
 class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:usePool="true" >
 <constructor-arg name="clusterConfig" ref="redisClusterConfiguration"/>
 <constructor-arg name="poolConfig" ref="jedisPoolConfig" />
</bean>

<!--redis访问模板-->
<bean id ="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
 <property name="connectionFactory" ref="jedisConnectionFactory" />
</bean>

2、redis.properties配置

session.redis.cluster.host1=XX.XX.XX.XX
session.redis.cluster.host2=XX.XX.XX.XX
session.redis.cluster.host3=XX.XX.XX.XX
session.redis.cluster.host4=XX.XX.XX.XX
session.redis.cluster.host5=XX.XX.XX.XX
session.redis.cluster.host6=XX.XX.XX.XX
session.redis.cluster.port1=7379
session.redis.cluster.port2=7380
session.redis.cluster.port3=7381
session.redis.cluster.port4=7379
session.redis.cluster.port5=7380
session.redis.cluster.port6=7381

redis.maxIdle=20
redis.maxTotal=100

3、配置一个JedisCluster的Bean

JedisClusterConfig.java

/**
 * Created by guonl
 * Date 2019/2/21 1:19 PM
 * Description: 从RedisClusterConfiguration中获取redis的连接参数
 */
@Configuration
public class JedisClusterConfig {

    @Autowired
    private RedisClusterConfiguration cacheRedisClusterConfiguration;

    @Bean
    public JedisCluster getJedisCluster() {
        Set<HostAndPort> nodes = new HashSet<>();
        Set<RedisNode> clusterNodes = cacheRedisClusterConfiguration.getClusterNodes();
        for (RedisNode clusterNode : clusterNodes) {
            nodes.add(new HostAndPort(clusterNode.getHost(), clusterNode.getPort()));
        }
        return new JedisCluster(nodes);
    }
}

4、redis 集群管理事务的工具类

JedisClusterTransactionManager.java

/**
 * Created by guonl
 * Date 2019/2/21 12:47 PM
 * Description: redis集群事务管理工具类
 */
public class JedisClusterTransactionManager {

    private static ThreadLocal<Object> txThreadLocal = new ThreadLocal<>();
    private static ThreadLocal<JedisCluster> clusterThreadLocal= new ThreadLocal<>();

    //开启事务
    public static void multi(JedisCluster jedisCluster){
        clusterThreadLocal.set(jedisCluster);
    }

    /**
     * 保存string数据类型
     * @param key
     * @param value
     * @param seconds 过期时间
     */
    public static void set(String key,String value,int seconds) {
        Transaction tx = getTxByKey(key);
        tx.set(key, value);
        tx.expire(key,seconds);
    }

    /**
     * 批量保存string数据类型
     * @param keysvalues
     */
    public static void mset(String... keysvalues) {
        if(keysvalues!=null && keysvalues.length>0) {
            for(int i=0;i<keysvalues.length;i+=2) {
                String key = keysvalues[i];
                String value = keysvalues[i+1];
                Transaction tx = getTxByKey(key);
                tx.set(key, value);
            }
        }
    }

    /**
     * 保存hash数据类型
     * @param key
     * @param value
     */
    public static void hset(String key, String field,String value) {
        Transaction tx = getTxByKey(key);
        tx.hset(key, field, value);
    }

    /**
     * 批量保存hash数据类型
     * @param key
     * @param hash
     */
    public static void hmset(String key,Map<String,String> hash) {
        Transaction tx = getTxByKey(key);
        tx.hmset(key, hash);
    }

    /**
     * 保存list数据类型
     * @param key
     * @param values
     */
    public static void lpush(String key,String... values) {
        Transaction tx = getTxByKey(key);
        tx.lpush(key, values);
    }

    /**
     * 保存set数据类型
     * @param key
     * @param member
     */
    public static void sadd(String key,String... member) {
        Transaction tx = getTxByKey(key);
        tx.sadd(key, member);
    }

    /**
     * 保存sorted set数据类型
     * @param key
     * @param scoreMembers
     */
    public static void zadd(String key,Map<String,Double> scoreMembers) {
        Transaction tx = getTxByKey(key);
        tx.zadd(key, scoreMembers);
    }

    /**
     * 通用删除
     * @param keys
     */
    public static void del(String... keys) {
        if(keys!=null && keys.length>0) {
            for(String key:keys) {
                Transaction tx = getTxByKey(key);
                tx.del(key);
            }
        }
    }


    /**
     * 删除hash
     * @param key
     * @param field
     */
    public static void hdel(String key,String... field) {
        Transaction tx = getTxByKey(key);
        tx.hdel(key, field);
    }

    /**
     * 删除set
     * @param key
     * @param member
     */
    public static void srem(String key,String... member) {
        Transaction tx = getTxByKey(key);
        tx.srem(key, member);
    }

    /**
     * 删除sorted set
     * @param key
     * @param member
     */
    public static void zrem(String key,String... member) {
        Transaction tx = getTxByKey(key);
        tx.zrem(key, member);
    }


    /**
     * 提交
     */
    public static void exec() {
        Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
        for(Map.Entry<String,Transaction> entry:map.entrySet()) {
            entry.getValue().exec();
        }
    }

    /**
     * 回滚
     */
    public static void discard() {
        Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
        for(Map.Entry<String,Transaction> entry:map.entrySet()) {
            entry.getValue().discard();
        }
    }

    /**
     * 根据key,得到事务对象
     * @param key
     * @return
     */
    private static Transaction getTxByKey(String key) {
        JedisCluster cluster = clusterThreadLocal.get();
        Map<String, Transaction> res = (Map<String, Transaction>)txThreadLocal.get();
        if(res==null) {
            res = new HashMap<>();
        }
        Map<String, JedisPool> map = cluster.getClusterNodes();
        for(Map.Entry<String,JedisPool> entry:map.entrySet()) {
            String keyEntry = entry.getKey();
            String strArray[] = keyEntry.split(":");
            String host = strArray[0];
            Integer port = Integer.parseInt(strArray[1]);
            Jedis jedis = new Jedis(host, port);
            try {
                jedis.exists(key);
                Transaction tx = jedis.multi();
                res.put(key, tx);
                txThreadLocal.set(res);
                return tx;
            } catch (Exception e) {
                jedis.close();
            }
        }
        return null;
    }

}

5、测试

RedisTransactionTest.java

/**
 * Created by guonl
 * Date 2019/3/1 4:02 PM
 * Description: redis集群测试
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-redis.xml")
public class RedisTransactionTest {

    private static Logger logger = LoggerFactory.getLogger(RedisTransactionTest.class);

    private final static String TEST_KEY_PREFIX = "test:{transaction}:redis";

    @Autowired
    private JedisCluster jedisCluster;

    @Test
    private void test() {
        //开启事务
        JedisClusterTransactionManager.multi(jedisCluster);
        try {
            for (int i = 0; i < 10; i++) {
                JedisClusterTransactionManager.set(TEST_KEY_PREFIX + i, i + "", 120);
            }
            //事务提交
            JedisClusterTransactionManager.exec();
        } catch (Exception e) {
            //如果异常,事务回滚
            JedisClusterTransactionManager.discard();
        }

        //查询
        for (int i = 0; i < 10; i++) {
            String s = jedisCluster.get(TEST_KEY_PREFIX + i);
            System.out.println("查询结果:" + s);
        }

    }


}

Show Disqus Comments

Search

    Post Directory