redis的分片集群模式是不支持redis的事务的,因为不同的key会做hash算法,分布到不同的哈希槽内,这就破坏了redis操作的原子性,在这种模式下面是不能支持事务的,但是集群里边的每个节点是可以支持事务。
为了实现要操作的key分布到同一个节点下面,就需要引入另外的一个概念:Hash Tags
Hash Tags
首先我们需要了解分片技术的矛盾什么地方:
即要求key尽可能地分散到不同机器,又要求某些相关联的key分配到相同机器。
分片,就是一个hash的过程:对key做md5,sha1等hash算法,根据hash值分配到不同的机器上。
为了实现将key分到相同机器,就需要相同的hash值,即相同的key(改变hash算法也行,但不简单)。但key相同是不现实的,因为key都有不同的用途。
例如,user:user1:ids保存用户的tweets ID,
user:user1:tweets保存tweet的具体内容,两个key不可能同名。
仔细观察user:user1:ids和user:user1:tweets,两个key其实有相同的地方,即user1。能不能拿这一部分去计算hash呢?
这就是 Hash Tag
,允许用key的部分字符串来计算hash。
当一个key包含 {} 的时候,就不对整个key做hash,而仅对 {} 包括的字符串做hash。
假设hash算法为sha1。对user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。
上面的原理也都解释清楚了,接下来就直接上代码了
redis集群模式的配置
1、spring配置redis集群
spring-redis.xml
<!--Redis集群配置 redis 3.0 以后提供集群模式 spring-data-redis 1.8.7以后提供集群api-->
<bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration">
<property name="maxRedirects" value="3"/>
<property name="clusterNodes">
<set>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host1}" />
<constructor-arg name="port" value="${session.redis.cluster.port1}"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host2}" />
<constructor-arg name="port" value="${session.redis.cluster.port2}"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host3}" />
<constructor-arg name="port" value="${session.redis.cluster.port3}"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host4}" />
<constructor-arg name="port" value="${session.redis.cluster.port4}"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host5}" />
<constructor-arg name="port" value="${session.redis.cluster.port5}"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="${session.redis.cluster.host6}" />
<constructor-arg name="port" value="${session.redis.cluster.port6}"/>
</bean>
</set>
</property>
</bean>
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.maxIdle}"/>
<property name="maxTotal" value="${redis.maxTotal}"/>
</bean>
<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:usePool="true" >
<constructor-arg name="clusterConfig" ref="redisClusterConfiguration"/>
<constructor-arg name="poolConfig" ref="jedisPoolConfig" />
</bean>
<!--redis访问模板-->
<bean id ="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
</bean>
2、redis.properties配置
session.redis.cluster.host1=XX.XX.XX.XX
session.redis.cluster.host2=XX.XX.XX.XX
session.redis.cluster.host3=XX.XX.XX.XX
session.redis.cluster.host4=XX.XX.XX.XX
session.redis.cluster.host5=XX.XX.XX.XX
session.redis.cluster.host6=XX.XX.XX.XX
session.redis.cluster.port1=7379
session.redis.cluster.port2=7380
session.redis.cluster.port3=7381
session.redis.cluster.port4=7379
session.redis.cluster.port5=7380
session.redis.cluster.port6=7381
redis.maxIdle=20
redis.maxTotal=100
3、配置一个JedisCluster的Bean
JedisClusterConfig.java
/**
* Created by guonl
* Date 2019/2/21 1:19 PM
* Description: 从RedisClusterConfiguration中获取redis的连接参数
*/
@Configuration
public class JedisClusterConfig {
@Autowired
private RedisClusterConfiguration cacheRedisClusterConfiguration;
@Bean
public JedisCluster getJedisCluster() {
Set<HostAndPort> nodes = new HashSet<>();
Set<RedisNode> clusterNodes = cacheRedisClusterConfiguration.getClusterNodes();
for (RedisNode clusterNode : clusterNodes) {
nodes.add(new HostAndPort(clusterNode.getHost(), clusterNode.getPort()));
}
return new JedisCluster(nodes);
}
}
4、redis 集群管理事务的工具类
JedisClusterTransactionManager.java
/**
* Created by guonl
* Date 2019/2/21 12:47 PM
* Description: redis集群事务管理工具类
*/
public class JedisClusterTransactionManager {
private static ThreadLocal<Object> txThreadLocal = new ThreadLocal<>();
private static ThreadLocal<JedisCluster> clusterThreadLocal= new ThreadLocal<>();
//开启事务
public static void multi(JedisCluster jedisCluster){
clusterThreadLocal.set(jedisCluster);
}
/**
* 保存string数据类型
* @param key
* @param value
* @param seconds 过期时间
*/
public static void set(String key,String value,int seconds) {
Transaction tx = getTxByKey(key);
tx.set(key, value);
tx.expire(key,seconds);
}
/**
* 批量保存string数据类型
* @param keysvalues
*/
public static void mset(String... keysvalues) {
if(keysvalues!=null && keysvalues.length>0) {
for(int i=0;i<keysvalues.length;i+=2) {
String key = keysvalues[i];
String value = keysvalues[i+1];
Transaction tx = getTxByKey(key);
tx.set(key, value);
}
}
}
/**
* 保存hash数据类型
* @param key
* @param value
*/
public static void hset(String key, String field,String value) {
Transaction tx = getTxByKey(key);
tx.hset(key, field, value);
}
/**
* 批量保存hash数据类型
* @param key
* @param hash
*/
public static void hmset(String key,Map<String,String> hash) {
Transaction tx = getTxByKey(key);
tx.hmset(key, hash);
}
/**
* 保存list数据类型
* @param key
* @param values
*/
public static void lpush(String key,String... values) {
Transaction tx = getTxByKey(key);
tx.lpush(key, values);
}
/**
* 保存set数据类型
* @param key
* @param member
*/
public static void sadd(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.sadd(key, member);
}
/**
* 保存sorted set数据类型
* @param key
* @param scoreMembers
*/
public static void zadd(String key,Map<String,Double> scoreMembers) {
Transaction tx = getTxByKey(key);
tx.zadd(key, scoreMembers);
}
/**
* 通用删除
* @param keys
*/
public static void del(String... keys) {
if(keys!=null && keys.length>0) {
for(String key:keys) {
Transaction tx = getTxByKey(key);
tx.del(key);
}
}
}
/**
* 删除hash
* @param key
* @param field
*/
public static void hdel(String key,String... field) {
Transaction tx = getTxByKey(key);
tx.hdel(key, field);
}
/**
* 删除set
* @param key
* @param member
*/
public static void srem(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.srem(key, member);
}
/**
* 删除sorted set
* @param key
* @param member
*/
public static void zrem(String key,String... member) {
Transaction tx = getTxByKey(key);
tx.zrem(key, member);
}
/**
* 提交
*/
public static void exec() {
Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
for(Map.Entry<String,Transaction> entry:map.entrySet()) {
entry.getValue().exec();
}
}
/**
* 回滚
*/
public static void discard() {
Map<String,Transaction> map = (Map<String,Transaction> )txThreadLocal.get();
for(Map.Entry<String,Transaction> entry:map.entrySet()) {
entry.getValue().discard();
}
}
/**
* 根据key,得到事务对象
* @param key
* @return
*/
private static Transaction getTxByKey(String key) {
JedisCluster cluster = clusterThreadLocal.get();
Map<String, Transaction> res = (Map<String, Transaction>)txThreadLocal.get();
if(res==null) {
res = new HashMap<>();
}
Map<String, JedisPool> map = cluster.getClusterNodes();
for(Map.Entry<String,JedisPool> entry:map.entrySet()) {
String keyEntry = entry.getKey();
String strArray[] = keyEntry.split(":");
String host = strArray[0];
Integer port = Integer.parseInt(strArray[1]);
Jedis jedis = new Jedis(host, port);
try {
jedis.exists(key);
Transaction tx = jedis.multi();
res.put(key, tx);
txThreadLocal.set(res);
return tx;
} catch (Exception e) {
jedis.close();
}
}
return null;
}
}
5、测试
RedisTransactionTest.java
/**
* Created by guonl
* Date 2019/3/1 4:02 PM
* Description: redis集群测试
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-redis.xml")
public class RedisTransactionTest {
private static Logger logger = LoggerFactory.getLogger(RedisTransactionTest.class);
private final static String TEST_KEY_PREFIX = "test:{transaction}:redis";
@Autowired
private JedisCluster jedisCluster;
@Test
private void test() {
//开启事务
JedisClusterTransactionManager.multi(jedisCluster);
try {
for (int i = 0; i < 10; i++) {
JedisClusterTransactionManager.set(TEST_KEY_PREFIX + i, i + "", 120);
}
//事务提交
JedisClusterTransactionManager.exec();
} catch (Exception e) {
//如果异常,事务回滚
JedisClusterTransactionManager.discard();
}
//查询
for (int i = 0; i < 10; i++) {
String s = jedisCluster.get(TEST_KEY_PREFIX + i);
System.out.println("查询结果:" + s);
}
}
}