Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。
新建SpringBoot项目
POM添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
添加配置
在application.yml
中配置redis连接信息:
server:
port: 8888
spring:
datasource:
redis:
host: 127.0.0.1
port: 6379
cache:
type: redis
创建redis配置类
package com.bystart.redis.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
template.setValueSerializer(jackson2JsonRedisSerializer);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
监听主题
处理监听到的消息
package com.bystart.redis.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RedisMessageListener implements MessageListener {
@Resource
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 解析消息体
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的主题
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 主题名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---主题---: " + channel);
System.out.println("---消息内容---: " + msg);
}
}
配置监听主题和消费者
package com.bystart.redis.config;
import com.bystart.redis.listener.RedisMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
RedisMessageListener listener){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 监听指定主题,将收到的消息移交给 RedisMessageListener 处理
container.addMessageListener(listener, new ChannelTopic("login.notice"));
return container;
}
}
推送测试
新增一个controller
:
package com.bystart.redis.controller;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
@Resource
private RedisTemplate redisTemplate;
@GetMapping
public String test(@RequestParam String message) {
redisTemplate.convertAndSend("login.notice", message);
return "ok: " + message;
}
}
访问http://127.0.0.1:8888?message=hello
返回
login.notice
---频道---: login.notice
---消息内容---: hello
评论 (0)