Stream

Redis Stream is a data structure introduced in Redis 5.0. It is mainly used as a message queue (MQ). Redis already ships with publish/subscribe (pub/sub) for messaging, but pub/sub has a drawback: messages are not persisted, so if the network drops or the Redis server goes down, the messages are lost.

The following is my own rewrite of an existing tutorial, with some adjustments to turn it into something of my own. The complete source code can be found on GitHub.

build.gradle > dependencies

implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'

// https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
implementation 'org.apache.commons:commons-pool2:2.11.1'

// https://mvnrepository.com/artifact/com.github.javafaker/javafaker
implementation 'com.github.javafaker:javafaker:1.0.2'

yml

spring:
  application:
    name: WebStorageSystemRestApi

  redis:
    host: 127.0.0.1
    port: 6379
    password: cPKGpSGvky
    database: 0
    lettuce:
      pool:
        max-wait: 1s
        min-idle: 3
        max-active: 8
    timeout: 2s

config

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Do not use a JSON serializer here, otherwise consumption fails with: java.lang.IllegalArgumentException: Value must not be null!
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}
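To quickly verify the template and serializer setup above, a throw-away runner (hypothetical, not part of the original project) can append one plain MapRecord and read the stream back with XRANGE:

// Hypothetical smoke test for the RedisTemplate configured above:
// XADD one map entry, then XRANGE the whole stream and print it.
@Component
@RequiredArgsConstructor
public class StreamSmokeTestRunner implements ApplicationRunner {

    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public void run(ApplicationArguments args) {
        // XADD stream-001 * title test
        RecordId id = redisTemplate.opsForStream().add(StreamRecords.newRecord()
                .in("stream-001")
                .ofMap(Collections.singletonMap("title", "test")));
        System.out.println("added record " + id);

        // XRANGE stream-001 - +
        List<MapRecord<String, Object, Object>> records =
                redisTemplate.opsForStream().range("stream-001", Range.unbounded());
        records.forEach(record -> System.out.println(record.getId() + " -> " + record.getValue()));
    }
}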

stream listener

@Slf4j
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /**
     * Consumer type: standalone consumption or consumer-group consumption
     */
    private String consumerType;
    /**
     * Consumer group
     */
    private String group;
    /**
     * Name of this consumer within the group
     */
    private String consumerName;

    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }

    // would need to be injected before the commented-out acknowledge calls below can be used
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {
            log.info("[{}]: received message id:{}", consumerType, id);
        } else {
            log.info("[{}], group:{}, consumer:{}, message id:{}", consumerType, group, consumerName, id);
//            redisTemplate.opsForStream().acknowledge(stream, group, id);
        }

        // For consumer-group consumption without auto ack, acknowledge the record manually here.
        // Standalone consumption (plain XREAD) has no pending entries list, so there is nothing to acknowledge.
//         redisTemplate.opsForStream().acknowledge("key","group","recordId");
    }
}
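The container configuration below, as well as the producer and consumers later on, refer to a Constants class that is not shown in this excerpt. A minimal sketch, with the stream key and group names taken from the redis-cli initialization at the end of the post and the two labels chosen freely:

// Assumed Constants class; the stream key and group names match the redis-cli setup below,
// the two consumer-type labels are arbitrary display strings used only in log output.
public class Constants {

    public static final String STREAM_KEY_001 = "stream-001";

    public static final String group_a = "group-a";
    public static final String group_b = "group-b";

    public static final String singleConsume = "standalone consume";
    public static final String groupConsume = "group consume";
}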

stream configuration

@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Supports standalone consumption and consumer-group consumption at the same time.
     * <p>
     * Consumers can be added and removed dynamically.
     * <p>
     * A consumer that throws an exception is not removed and keeps consuming;
     * this is controlled by StreamReadRequest#cancelOnError.
     * <p>
     * Consumer groups must be created in advance (see the redis-cli initialization below).
     *
     * @return StreamMessageListenerContainer
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the Stream poll task
                        .executor(executor)
                        // Serializer for the Stream key
                        .keySerializer(RedisSerializer.string())
                        // Serializer for the field names (hash keys) of the entries behind the Stream key
                        .hashKeySerializer(RedisSerializer.string())
                        // Serializer for the field values (hash values) of the entries behind the Stream key
                        .hashValueSerializer(RedisSerializer.string())
                        // How long to block when the Stream has no messages; must be shorter than `spring.redis.timeout`
                        .pollTimeout(Duration.ofSeconds(1))
                        // HashMapper that turns an object's fields and values into a Map for ObjectRecord, e.g. a Book into a map
                        .objectMapper(new ObjectHashMapper())
                        // Handles exceptions thrown while polling or while handing a message to a listener
                        .errorHandler(new CustomErrorHandler())
                        // Converts Records read from the Stream into ObjectRecords of the type given here
                        .targetType(Book.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // Standalone consumption
        {
            // Standalone consumption reads the stream without a consumer group (comparable to XREAD);
            // I am still not sure what its practical use case is.
            streamMessageListenerContainer.receive(StreamOffset.fromStart(Constants.STREAM_KEY_001),
                    new AsyncConsumeStreamListener(Constants.singleConsume, null, null));
        }

        // Consumer groups
        {
            // Group A
            {
                // no auto ack
                streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-noAck"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-noAck"));

                // auto ack
                streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_a, "consumer-a"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-a"));

                streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-b"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-b"));


                // A single consumer can be configured individually by calling register() with a StreamReadRequest.
                // consumer-c keeps consuming even after an exception is thrown
                StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
                        .StreamReadRequest
                        .builder(StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(Constants.group_a, "consumer-c"))
                        .autoAcknowledge(true)
                        // Decides whether the consumer is cancelled when it throws an exception
                        .cancelOnError(throwable -> false)
                        .build();
                streamMessageListenerContainer.register(streamReadRequest,
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-c"));
            }

            // Group B
            {
                // auto ack
                streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_b, "consumer-1"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_b, "consumer-1"));
            }
        }
        return streamMessageListenerContainer;
    }
}
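The options above also register a CustomErrorHandler that is not shown in this excerpt. A minimal sketch, assuming all it does is log the failure:

// Assumed implementation of the error handler wired into the container options above:
// it implements Spring's org.springframework.util.ErrorHandler and only logs the error.
@Slf4j
public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        log.error("Exception while polling the stream or dispatching a record", t);
    }
}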

consumer

@Slf4j
@Component
public class NonBlockConsumer01 implements InitializingBean, DisposableBean {

    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private volatile boolean stop = false;

    @Override
    public void afterPropertiesSet() {

        // initialize the worker thread pool
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("nonblock-01");
            return thread;
        });

        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // If there is no data, block for 1s; the block time must be shorter than `spring.redis.timeout`
                .block(Duration.ofMillis(1000))
                // Block until data arrives (may throw a timeout exception)
                // .block(Duration.ofMillis(0))
                // Fetch up to 10 records per read
                .count(10);

        // Keep track of the last RecordId read (used as the XREAD offset)
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // The Stream can be read like a plain list, but entries are not removed by reading them;
                // to actually delete entries, trim the stream (see the XTRIM scheduled task below).
                // When reading with XREAD, remember the last RecordId read and use it as the offset of
                // the next read, otherwise the same entries are returned again.
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("no records fetched");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("NonBlockConsumer01 >> fetched record id:{}", objectRecord.getId());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }

    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}
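The consumer above reads with plain XREAD, so it never acknowledges anything. For comparison, the same loop can read through a consumer group (XREADGROUP) and ack each record explicitly. The following NonBlockConsumer02 is a hypothetical sketch: the consumer name consumer-manual is made up, and it competes with the container consumers in group-a for messages.

// Hypothetical group-based variant of NonBlockConsumer01: reads via XREADGROUP
// and removes each processed record from the group's pending entries list with XACK.
@Slf4j
@Component
public class NonBlockConsumer02 implements InitializingBean, DisposableBean {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private volatile boolean stop = false;

    @Override
    public void afterPropertiesSet() {
        Thread worker = new Thread(() -> {
            while (!stop) {
                // XREADGROUP GROUP group-a consumer-manual BLOCK 1000 COUNT 10 STREAMS stream-001 >
                List<ObjectRecord<String, Book>> records = redisTemplate.opsForStream()
                        .read(Book.class,
                                Consumer.from(Constants.group_a, "consumer-manual"),
                                StreamReadOptions.empty().block(Duration.ofMillis(1000)).count(10),
                                StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()));
                if (CollectionUtils.isEmpty(records)) {
                    continue;
                }
                for (ObjectRecord<String, Book> record : records) {
                    log.info("NonBlockConsumer02 >> fetched record id:{}", record.getId());
                    // XACK stream-001 group-a <id>
                    redisTemplate.opsForStream().acknowledge(Constants.STREAM_KEY_001, Constants.group_a, record.getId());
                }
            }
        }, "nonblock-02");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void destroy() {
        stop = true;
    }
}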

producer

@Slf4j
@Component
@RequiredArgsConstructor
public class StreamProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    public void sendRecord(String streamKey) {
        Book book = Book.create();
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());

        RecordId recordId = redisTemplate.opsForStream().add(record);
        log.info("Produced book:{}, add stream returned recordId:{}", book, recordId);
    }
}

entity

@Data
public class Book {
    private String title;
    private String author;
    
    public static Book create() {
        Faker faker = new Faker(Locale.JAPAN);
        com.github.javafaker.Book fakerBook = faker.book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

spring boot application runner

@Component
@RequiredArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {

    private final StreamProducer streamProducer;

    @Override
    public void run(ApplicationArguments args) {
        // produce a message and push it onto the stream every five seconds
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

spring boot schedule

@Slf4j
@Component
@RequiredArgsConstructor
public class XTrimStreamTask {
    private final RedisTemplate<String, Object> redisTemplate;

//    @Scheduled(cron ="0 0 0 * * ?")
    @Scheduled(cron ="0 * * * * ?")
    public void execute() {
        // Trim the stream so that only the latest 15 entries remain, e.g. totalMessages: 45 -> trim(15) -> removeCount = 30
        Long removeCount = redisTemplate.opsForStream().trim(Constants.STREAM_KEY_001, 15L);
        log.info("XTrimStreamTask > trimmed stream, removeCount:{}", removeCount);
    }
}
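The @Scheduled trim task only fires if scheduling is enabled somewhere. Assuming it is not enabled elsewhere, the application class needs @EnableScheduling (the class name below is made up):

// Assumed Spring Boot entry point; @EnableScheduling is required for the
// @Scheduled XTRIM task above to run.
@EnableScheduling
@SpringBootApplication
public class RedisStreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }
}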

initialize redis stream

## Connect to redis
redis-cli -h 127.0.0.1 -p 6379 -a cPKGpSGvky
## Delete stream-001
del stream-001
## MKSTREAM creates the stream automatically if it does not exist; create the two consumer groups group-a & group-b
XGROUP CREATE stream-001 group-a $ MKSTREAM
XGROUP CREATE stream-001 group-b $ MKSTREAM
## Inspect the stream
xinfo stream stream-001
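As an alternative to the redis-cli commands above, the consumer groups can also be created programmatically at startup. A sketch, assuming this bean is initialized before the listener container starts consuming (e.g. by making the container @DependsOn it):

// Assumed alternative to the redis-cli setup: create the consumer groups from code.
// XGROUP CREATE fails with BUSYGROUP when the group already exists, so errors are only logged.
@Slf4j
@Component
@RequiredArgsConstructor
public class StreamGroupInitializer implements InitializingBean {

    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public void afterPropertiesSet() {
        createGroup(Constants.STREAM_KEY_001, Constants.group_a);
        createGroup(Constants.STREAM_KEY_001, Constants.group_b);
    }

    private void createGroup(String streamKey, String group) {
        try {
            // XGROUP CREATE <key> <group> $
            redisTemplate.opsForStream().createGroup(streamKey, group);
        } catch (Exception e) {
            log.warn("could not create group {} for stream {} (it probably already exists)", group, streamKey);
        }
    }
}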

console log

2023-01-05 14:31:59.757  INFO 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : NonBlockConsumer01 >> fetched record id:1672900319756-0
2023-01-05 14:31:59.754  INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer     : Produced book:Book(title=Great Work of Time, author=久保 奈々), add stream returned recordId:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener     : [standalone consume]: received message id:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-5] c.c.r.s.c.AsyncConsumeStreamListener     : [group consume], group:group-a, consumer:consumer-c, message id:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener     : [group consume], group:group-b, consumer:consumer-1, message id:1672900319756-0
2023-01-05 14:32:00.013  INFO 21518 --- [   scheduling-1] c.caster.redis.schedule.XTrimStreamTask  : XTrimStreamTask > trimmed stream, removeCount:0
2023-01-05 14:32:00.786  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:01.805  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:02.848  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:03.878  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:04.708  INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer     : Produced book:Book(title=Ego Dominus Tuus, author=中山 海斗), add stream returned recordId:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener     : [standalone consume]: received message id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener     : [group consume], group:group-b, consumer:consumer-1, message id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : NonBlockConsumer01 >> fetched record id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-2] c.c.r.s.c.AsyncConsumeStreamListener     : [group consume], group:group-a, consumer:consumer-a, message id:1672900324711-0
2023-01-05 14:32:05.753  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:06.773  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:07.814  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
2023-01-05 14:32:08.870  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : no records fetched
