# Stream

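Everything below is my own rewrite of a tutorial written by a fellow countryman, adjusted and adapted into my own notes.\
The complete code is available on [GitHub](https://github.com/MinchangHsu/redis-stream).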

#### build.gradle > dependencies

```groovy
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'

// https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
implementation 'org.apache.commons:commons-pool2:2.11.1'

// https://mvnrepository.com/artifact/com.github.javafaker/javafaker
implementation 'com.github.javafaker:javafaker:1.0.2'
```

#### yml

```yaml
spring:
  application:
    name: WebStorageSystemRestApi

  redis:
    host: 127.0.0.1
    port: 6379
    password: cPKGpSGvky
    database: 0
    lettuce:
      pool:
        max-wait: 1s
        min-idle: 3
        max-active: 8
    timeout: 2s
```

#### config

```java
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Do not use JSON serialization for hash values here; it fails with java.lang.IllegalArgumentException: Value must not be null!
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}
```

#### stream listener

```java
@Slf4j
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /**
     * Consumer type: standalone consumption or consumer group
     */
    private String consumerType;
    /**
     * Consumer group
     */
    private String group;
    /**
     * Name of one consumer within the group
     */
    private String consumerName;

    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }

    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
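        if (StringUtils.isBlank(group)) {
            // Standalone consumer; the log text reads "received a message"
            log.info("[{}]: 接收到一個消息 id:{}", consumerType, id);
        } else {
            log.info("[{}], group:{},  consume:{}, 消息 id:{}", consumerType, group, consumerName, id);
//            redisTemplate.opsForStream().acknowledge(stream, group, id);
        }

        // When consuming through a consumer group without auto-ack, acknowledge manually here.
        // Standalone consumption has no pending entries, so there is nothing to acknowledge (XACK only applies to groups).
        // Note: redisTemplate is never assigned in this class, so it must be supplied before enabling the call below.
//         redisTemplate.opsForStream().acknowledge("key","group","recordId");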
    }
}
```

#### stream configuration

```java
@Configuration
public class RedisStreamConfiguration {

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

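    /**
     * Supports both standalone consumption and consumer-group consumption.
     * <p>
     * Consumers can be added and removed dynamically.
     * <p>
     * A consumer can keep consuming after it throws an exception instead of being
     * removed; this is controlled by cancelOnError on StreamReadRequest.
     * <p>
     * The consumer groups must be created in advance.
     *
     * @return StreamMessageListenerContainer
     */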
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the stream poll tasks
                        .executor(executor)
                        // Serializer for the stream key
                        .keySerializer(RedisSerializer.string())
                        // Serializer for the field names of each stream entry
                        .hashKeySerializer(RedisSerializer.string())
                        // Serializer for the field values of each stream entry
                        .hashValueSerializer(RedisSerializer.string())
                        // How long to block when the stream has no messages; must be shorter than `spring.redis.timeout`
                        .pollTimeout(Duration.ofSeconds(1))
                        // For ObjectRecord: converts an object's fields and values to a Map, for example a Book to a map
                        .objectMapper(new ObjectHashMapper())
                        // Handles exceptions raised while fetching messages or while handing them to a listener
                        .errorHandler(new CustomErrorHandler())
                        // Target type that each Record read from the stream is converted to as an ObjectRecord
                        .targetType(Book.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // Standalone consumption
        {
            // The use case for standalone consumption is not yet clear to me.
            streamMessageListenerContainer.receive(StreamOffset.fromStart(Constants.STREAM_KEY_001),
                    new AsyncConsumeStreamListener(Constants.singleConsume, null, null));
        }

        // Consumer groups
        {
            // Group A
            {
                // No automatic ack
                streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-noAck"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-noAck"));

                // Automatic ack
                streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_a, "consumer-a"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-a"));

                streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-b"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-b"));


                // To customize a single consumer's behavior, call register and pass a `StreamReadRequest`.
                // consumer-c keeps consuming even after an exception is thrown during consumption
                StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
                        .StreamReadRequest
                        .builder(StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()))
                        .consumer(Consumer.from(Constants.group_a, "consumer-c"))
                        .autoAcknowledge(true)
                        // Decides whether to cancel this consumer when it throws; returning false keeps it running
                        .cancelOnError(throwable -> false)
                        .build();
                streamMessageListenerContainer.register(streamReadRequest,
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-c"));
            }

            // Group B
            {
                // Automatic ack
                streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_b, "consumer-1"),
                        StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
                        new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_b, "consumer-1"));
            }
        }
        return streamMessageListenerContainer;
    }
}
```
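The `cancelOnError(throwable -> false)` setting on consumer-c can be hard to picture from the builder call alone. The following is a minimal sketch of the idea using only the JDK: a poll loop asks a `Predicate<Throwable>` whether a listener failure should end the subscription. `CancelOnErrorSketch`, `poll`, and `failOnBad` are hypothetical names for illustration and are not Spring Data Redis code; the real container does considerably more.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in for a stream poll loop; not Spring Data Redis code. */
final class CancelOnErrorSketch {

    /**
     * Delivers each message to the listener and returns how many were handled
     * successfully before the loop ended or was cancelled.
     */
    static int poll(List<String> messages, Consumer<String> listener, Predicate<Throwable> cancelOnError) {
        int handled = 0;
        for (String message : messages) {
            try {
                listener.accept(message);
                handled++;
            } catch (RuntimeException ex) {
                if (cancelOnError.test(ex)) {
                    break; // predicate returned true: stop consuming
                }
                // predicate returned false: skip this message and keep consuming
            }
        }
        return handled;
    }

    /** Example listener that fails on one specific message. */
    static void failOnBad(String message) {
        if ("bad".equals(message)) {
            throw new IllegalStateException("cannot process " + message);
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("a", "bad", "c");
        // Keeps going after the failure: prints 2
        System.out.println(poll(messages, CancelOnErrorSketch::failOnBad, t -> false));
        // Stops at the failure: prints 1
        System.out.println(poll(messages, CancelOnErrorSketch::failOnBad, t -> true));
    }
}
```

With a predicate that always returns `false`, a failing message is skipped and later messages are still delivered, which is the behavior the consumer-c registration asks for.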

#### consumer

```java
@Slf4j
@Component
public class NonBlockConsumer01 implements InitializingBean, DisposableBean {

    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private volatile boolean stop = false;

    @Override
    public void afterPropertiesSet() {

        // Initialize the thread pool
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("nonblock-01");
            return thread;
        });

        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // Block for 1s when there is no data; the block time must be shorter than `spring.redis.timeout`
                .block(Duration.ofMillis(1000))
                // Block until data arrives; this may throw a timeout exception
                // .block(Duration.ofMillis(0))
                // Fetch up to 10 entries per read
                .count(10);

        // Tracks the recordId of the last entry read
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
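                // Here the stream behaves like a plain list, except that entries are not removed once read; to delete them, trim the stream separately
                // With XREAD, remember the ID of the last entry read and use it as the offset of the next read; otherwise the same entries are read again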
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("沒有獲取到數據");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("NonBlockConsumer01 >> 獲取到的數據信息 id:{}", objectRecord.getId());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }

    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}
```
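The offset bookkeeping in the loop above can be illustrated without a Redis server. The sketch below assumes a hypothetical in-memory stand-in (`OffsetTrackingSketch`, `readAfter`, and `compareIds` are names made up for this example): entries stay in the stream after being read, and each read returns only entries whose ID is strictly greater than the offset passed in. IDs of the form `<millis>-<sequence>` are compared numerically, not as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a Redis stream, used only to show offset tracking. */
final class OffsetTrackingSketch {

    /** Compares "<millis>-<sequence>" IDs numerically: millis first, then sequence. */
    static int compareIds(String left, String right) {
        String[] l = left.split("-", 2);
        String[] r = right.split("-", 2);
        int byMillis = Long.compare(Long.parseLong(l[0]), Long.parseLong(r[0]));
        return byMillis != 0 ? byMillis : Long.compare(Long.parseLong(l[1]), Long.parseLong(r[1]));
    }

    private final TreeMap<String, String> entries = new TreeMap<>(OffsetTrackingSketch::compareIds);

    void add(String id, String value) {
        entries.put(id, value);
    }

    /** Returns up to count IDs strictly greater than offset; nothing is removed from the stream. */
    List<String> readAfter(String offset, int count) {
        List<String> ids = new ArrayList<>();
        for (String id : entries.tailMap(offset, false).keySet()) {
            if (ids.size() == count) {
                break;
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        OffsetTrackingSketch stream = new OffsetTrackingSketch();
        stream.add("1672900319756-0", "first book");
        stream.add("1672900324711-0", "second book");

        String offset = "0-0"; // start from the beginning, as the consumer above does
        List<String> batch = stream.readAfter(offset, 10);
        while (!batch.isEmpty()) {
            System.out.println("read " + batch);
            offset = batch.get(batch.size() - 1); // remember the last ID for the next read
            batch = stream.readAfter(offset, 10);
        }
    }
}
```

If the offset were left at `0-0`, every call to `readAfter` would return the same two entries again, which is the problem the consumer avoids by updating `readOffset`.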

#### producer

```java
@Slf4j
@Component
@RequiredArgsConstructor
public class StreamProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    public void sendRecord(String streamKey) {
        Book book = Book.create();
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());

        RecordId recordId = redisTemplate.opsForStream().add(record);
        log.info("產生一本書的資訊:{}, add stream return recordId:{}", book, recordId);
    }
}
```
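Before an `ObjectRecord` is written, the object is flattened into field/value pairs, which is the job the `ObjectHashMapper` comment in the stream configuration describes. The sketch below shows the general idea with plain reflection. It is only an approximation under stated assumptions: `HashMappingSketch`, `SampleBook`, and `toHash` are hypothetical names, and the real mapper handles more cases (nested objects, type information, byte-level serialization).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of object-to-hash flattening; not the Spring Data Redis implementation. */
final class HashMappingSketch {

    /** Hypothetical payload with the same two fields as the Book entity. */
    static final class SampleBook {
        private final String title;
        private final String author;

        SampleBook(String title, String author) {
            this.title = title;
            this.author = author;
        }
    }

    /** Flattens the non-null instance fields of an object into a name-to-string map. */
    static Map<String, String> toHash(Object source) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (Field field : source.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // skip constants and compiler-generated fields
            }
            field.setAccessible(true);
            try {
                Object value = field.get(source);
                if (value != null) {
                    hash.put(field.getName(), String.valueOf(value));
                }
            } catch (IllegalAccessException ex) {
                throw new IllegalStateException("cannot read field " + field.getName(), ex);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, String> hash = toHash(new SampleBook("Great Work of Time", "Some Author"));
        // Each map entry corresponds to one field/value pair of the stream entry
        System.out.println(hash);
    }
}
```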

#### entity

```java
@Data
public class Book {
    private String title;
    private String author;
    
    public static Book create() {
        Faker faker = new Faker(Locale.JAPAN);
        com.github.javafaker.Book fakerBook = faker.book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}
```

#### spring boot application runner

```java
@Component
@RequiredArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {

    private final StreamProducer streamProducer;

    @Override
    public void run(ApplicationArguments args) {
        // Produce one message every five seconds and add it to the stream
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}
```

#### spring boot schedule

```java
@Slf4j
@Component
@RequiredArgsConstructor
public class XTrimStreamTask {
    private final RedisTemplate<String, Object> redisTemplate;

//    @Scheduled(cron ="0 0 0 * * ?")
    @Scheduled(cron ="0 * * * * ?")
    public void execute() {
        // Trim the stream, keeping only the newest 15 entries. Example: 45 entries -> trim(15) -> removeCount = 30
        Long removeCount = redisTemplate.opsForStream().trim(Constants.STREAM_KEY_001, 15L);
        log.info("XTrimStreamTask > 修剪Stream removeCount:{}", removeCount);
    }
}
```
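The arithmetic in the comment (45 entries, keep 15, 30 removed) can be checked with a small in-memory model. This is a sketch under the assumption that trimming drops the oldest entries first until at most the requested number remain; `TrimSketch` and its `trim` method are hypothetical and do not call Redis.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical in-memory model of trimming a stream to a maximum length. */
final class TrimSketch {

    /** Removes the oldest entries until at most maxLen remain; returns how many were removed. */
    static long trim(Deque<String> stream, long maxLen) {
        long removed = 0;
        while (stream.size() > maxLen) {
            stream.pollFirst(); // the head of the deque is the oldest entry
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<String> stream = new ArrayDeque<>();
        for (int i = 1; i <= 45; i++) {
            stream.addLast("entry-" + i);
        }
        System.out.println(trim(stream, 15)); // prints 30
        System.out.println(stream.size());    // prints 15
        System.out.println(trim(stream, 15)); // prints 0, nothing left to remove
    }
}
```

A result of 0 on a second run is what the `removeCount:0` line in the console log corresponds to: the stream held no more than 15 entries when the task ran.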

#### initializer redis stream

```powershell
## Connect to redis
redis-cli -h 127.0.0.1 -p 6379 -a cPKGpSGvky
## Delete stream-001
del stream-001
## mkstream creates the stream automatically if it does not exist. Create two groups: group-a and group-b
XGROUP CREATE stream-001 group-a $ mkstream
## xinfo stream stream-001
XGROUP CREATE stream-001 group-b $ mkstream
## Inspect the stream
xinfo stream stream-001
```

#### console log

```log
2023-01-05 14:31:59.757  INFO 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : NonBlockConsumer01 >> 獲取到的數據信息 id:1672900319756-0
2023-01-05 14:31:59.754  INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer     : 產生一本書的資訊:Book(title=Great Work of Time, author=久保 奈々), add stream return recordId:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener     : [獨立消費]: 接收到一個消息 id:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-5] c.c.r.s.c.AsyncConsumeStreamListener     : [消費群組], group:group-a,  consume:consumer-c, 消息 id:1672900319756-0
2023-01-05 14:31:59.758  INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener     : [消費群組], group:group-b,  consume:consumer-1, 消息 id:1672900319756-0
2023-01-05 14:32:00.013  INFO 21518 --- [   scheduling-1] c.caster.redis.schedule.XTrimStreamTask  : XTrimStreamTask > 修剪Stream removeCount:0
2023-01-05 14:32:00.786  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:01.805  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:02.848  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:03.878  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:04.708  INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer     : 產生一本書的資訊:Book(title=Ego Dominus Tuus, author=中山 海斗), add stream return recordId:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener     : [獨立消費]: 接收到一個消息 id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener     : [消費群組], group:group-b,  consume:consumer-1, 消息 id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : NonBlockConsumer01 >> 獲取到的數據信息 id:1672900324711-0
2023-01-05 14:32:04.708  INFO 21518 --- [ream-consumer-2] c.c.r.s.c.AsyncConsumeStreamListener     : [消費群組], group:group-a,  consume:consumer-a, 消息 id:1672900324711-0
2023-01-05 14:32:05.753  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:06.773  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:07.814  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
2023-01-05 14:32:08.870  WARN 21518 --- [    nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01        : 沒有獲取到數據
```
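The log shows the delivery pattern of consumer groups: every group sees each message, but within group-a only one consumer handles a given message (consumer-c for the first, consumer-a for the second). The sketch below models that behavior in memory. It is a simplified illustration with hypothetical names (`ConsumerGroupSketch`, `readGroup`, `pendingCount`), assuming each group keeps its own delivery cursor and that entries read without auto-ack stay pending until acknowledged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of consumer-group delivery; not a Redis client. */
final class ConsumerGroupSketch {

    private final List<String> entries = new ArrayList<>();                          // the stream itself
    private final Map<String, Integer> cursorByGroup = new HashMap<>();              // next index per group
    private final Map<String, Map<String, String>> pendingByGroup = new HashMap<>(); // group -> (entry -> consumer)

    /** Creates a group that only sees entries added after this call (like the "$" offset). */
    void createGroup(String group) {
        cursorByGroup.put(group, entries.size());
        pendingByGroup.put(group, new LinkedHashMap<String, String>());
    }

    void add(String entryId) {
        entries.add(entryId);
    }

    /** Hands the group's next undelivered entry to the consumer, or returns null if there is none. */
    String readGroup(String group, String consumer, boolean autoAck) {
        int cursor = cursorByGroup.get(group);
        if (cursor >= entries.size()) {
            return null;
        }
        String entryId = entries.get(cursor);
        cursorByGroup.put(group, cursor + 1);
        if (!autoAck) {
            pendingByGroup.get(group).put(entryId, consumer); // stays pending until ack
        }
        return entryId;
    }

    /** Acknowledges an entry; returns true if it was pending in the group. */
    boolean ack(String group, String entryId) {
        return pendingByGroup.get(group).remove(entryId) != null;
    }

    int pendingCount(String group) {
        return pendingByGroup.get(group).size();
    }

    public static void main(String[] args) {
        ConsumerGroupSketch stream = new ConsumerGroupSketch();
        stream.createGroup("group-a");
        stream.createGroup("group-b");
        stream.add("1672900319756-0");

        System.out.println(stream.readGroup("group-a", "consumer-noAck", false)); // gets the entry
        System.out.println(stream.readGroup("group-a", "consumer-a", true));      // null: already delivered in group-a
        System.out.println(stream.readGroup("group-b", "consumer-1", true));      // group-b gets its own copy
        System.out.println(stream.pendingCount("group-a"));                       // 1 until acknowledged
        System.out.println(stream.ack("group-a", "1672900319756-0"));             // true
    }
}
```

Which consumer inside a group receives a message depends on which one reads first, so the model leaves that choice to the caller rather than rotating consumers.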

#### References

1. [Basic introduction](https://www.runoob.com/redis/redis-stream.html)
2. [Official site](https://redis.io/commands/)


