使用Redis实现分布式消息订阅

场景:

WebSocket的Session不能序列化 无法分布式部署

思考:

消息可以被序列化存到redis中,再向每个ws服务器发送序列化后的消息,实现分布式部署

依赖引入:

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置:

application.yml 

spring:
  redis:
    database: 3 #0-15随意
    host: 127.0.0.1 #你的redis地址
    port: 6379 #端口号
    password: #密码
    timeout: 6000ms #超时时间
    lettuce:
      pool:
        max-active: 1000
        max-wait: -1ms
        max-idle: 10
        min-idle: 5

新建channel监听类:这里会处理 接收到消息执行的动作

public class MessagerListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] bytes) {  //这里处理逻辑                     
        System.out.print("收到了一条消息");    
    }
}

配置channel监听:(毕竟监听类也不知道你监听的啥啊)

@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(new MessagerListener(), new ChannelTopic("example"));
    return container;
}

这里我直接写在启动类里了,example可以随意替换为你想用的channel名称 

此处为 监听名为example的channelTopic,监听器为MessagerListener

这时对Channel的监听已经完成,但是我们还无法向这个Channel中发送任何内容,这时我们需要一个消息发送器

新建消息发送器:

由于是对Redis进行操作 这里使用RedisTemplate来操作

创建RedisConfig

@Configuration
public class RedisConfig {
    /**
     * 注入 RedisConnectionFactory 连接工厂
     */
    @Autowired
    RedisConnectionFactory factory;

    /**
     * 实例化 RedisTemplate 对象
     *
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<Object>(Object.class);
        redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
        redisTemplate.setValueSerializer(fastJsonRedisSerializer);
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
}

创建redis工具类

@Component
public class RedisUtils { 
    @Autowired 
    RedisTemplate&lt;String,Object&gt; redisTemplate; 
    public void sendMessage(String message){  
        redisTemplate.convertAndSend("example", message); //这里example替换成你定义的ChannelTopic 
    }
}

这时调用sendMessage 就可以执行MessageListener的监听方法了

但是Message类我们无法直接拿到他的内容,我们就需要使用redisTemplate解析内容了

在redis工具类中新建方法:

public String getMessage(org.springframework.data.redis.connection.Message message) {
    return redisTemplate.getStringSerializer().deserialize(message.getBody());
}

最后 在MessagerListener 中 静态注入RedisUtils类,将onMessage方法修改为:

    @Override    
    public void onMessage(Message message, byte[] bytes) {
        System.out.print(redisUtils.getMessage(message));
    }

就可以打印出你发送的内容了

下一篇