Stream
Redis Stream 是 Redis 5.0 版本新增加的數據結構。 Redis Stream 主要用於消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。
Last updated
Redis Stream 是 Redis 5.0 版本新增加的數據結構。 Redis Stream 主要用於消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。
Last updated
以下都是我根據國人的教學再重新寫一遍,做一些調整轉換成自己的東西。 完整內容可以去上面找。
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
// https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
implementation 'org.apache.commons:commons-pool2:2.11.1'
// https://mvnrepository.com/artifact/com.github.javafaker/javafaker
implementation 'com.github.javafaker:javafaker:1.0.2'
spring:
application:
name: WebStorageSystemRestApi
redis:
host: 127.0.0.1
port: 6379
password: cPKGpSGvky
database: 0
lettuce:
pool:
max-wait: 1s
min-idle: 3
max-active: 8
timeout: 2s
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 這個地方不可使用 json 序列化,否則會有問題,會出現一個 java.lang.IllegalArgumentException: Value must not be null! 錯誤
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
}
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
/**
* 消費者類型:獨立消費、消費群組
*/
private String consumerType;
/**
* 消費組
*/
private String group;
/**
* 消費組中的某個消費者
*/
private String consumerName;
public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, Book> message) {
String stream = message.getStream();
RecordId id = message.getId();
Book value = message.getValue();
if (StringUtils.isBlank(group)) {
log.info("[{}]: 接收到一個消息 id:{}", consumerType, id);
} else {
log.info("[{}], group:{}, consume:{}, 消息 id:{}", consumerType, group, consumerName, id);
// redisTemplate.opsForStream().acknowledge(stream, group, id);
}
// 當是消費組消費時,如果不是自動ack,則需要在這個地方手動ack, 但獨立消費不知如何手動ack
// redisTemplate.opsForStream().acknowledge("key","group","recordId");
}
}
public class RedisStreamConfiguration {
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* 可以同時支援 獨立消費 和 消費者組 消費
* <p>
* 可以支援動態的 增加和刪除 消費者
* <p>
* <p>
* 支援消費者發生異常後,還可以繼續消費,消費者不會被剔除,通過StreamReadRequest的cancelOnError來實現
* </p>
* 消費組需要預先創建出來
*
* @return StreamMessageListenerContainer
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多獲取多少條消息
.batchSize(10)
// 運行 Stream 的 poll task
.executor(executor)
// 可以理解為 Stream Key 的序列化方式
.keySerializer(RedisSerializer.string())
// 可以理解為 Stream 後方的字段的 key 的序列化方式
.hashKeySerializer(RedisSerializer.string())
// 可以理解為 Stream 後方的字段的 value 的序列化方式
.hashValueSerializer(RedisSerializer.string())
// Stream 中沒有消息時,阻塞多長時間,需要比 `spring.redis.timeout` 的時間小
.pollTimeout(Duration.ofSeconds(1))
// ObjectRecord 時,將 對象的 filed 和 value 轉換成一個 Map 比如:將Book對象轉換成map
.objectMapper(new ObjectHashMapper())
// 獲取消息的過程或獲取到消息給具體的消息者處理的過程中,發生了異常的處理
.errorHandler(new CustomErrorHandler())
// 將發送到Stream中的Record轉換成ObjectRecord,轉換成具體的類型是這個地方指定的類型
.targetType(Book.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 獨立消費
{
// 目前還不清楚此獨立消費, 使用場景是什麼.
streamMessageListenerContainer.receive(StreamOffset.fromStart(Constants.STREAM_KEY_001),
new AsyncConsumeStreamListener(Constants.singleConsume, null, null));
}
// 消費群組
{
// 群組A
{
// 不自動ack
streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-noAck"),
StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-noAck"));
// 自動 ack
streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_a, "consumer-a"),
StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-a"));
streamMessageListenerContainer.receive(Consumer.from(Constants.group_a, "consumer-b"),
StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-b"));
// 可自定義 consumer 消費動作 > 如果需要對某個消費者進行自定義配置時, 需使用register方法的時, 傳入`StreamReadRequest`對象.
// consumer-c,在消費發生異常時,還可以繼續消費
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
.StreamReadRequest
.builder(StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()))
.consumer(Consumer.from(Constants.group_a, "consumer-c"))
.autoAcknowledge(true)
// 如果消費者發生了異常,判斷是否取消消費者消費
.cancelOnError(throwable -> false)
.build();
streamMessageListenerContainer.register(streamReadRequest,
new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_a, "consumer-c"));
}
// 群組B
{
// 自動ack
streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.group_b, "consumer-1"),
StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.lastConsumed()),
new AsyncConsumeStreamListener(Constants.groupConsume, Constants.group_b, "consumer-1"));
}
}
return streamMessageListenerContainer;
}
}
public class NonBlockConsumer01 implements InitializingBean, DisposableBean {
private ThreadPoolExecutor threadPoolExecutor;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private volatile boolean stop = false;
@Override
public void afterPropertiesSet() {
// 初始化線程池
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("nonblock-01");
return thread;
});
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
// 如果沒有數據,則阻塞1s 阻塞時間需要小於`spring.redis.timeout`配置的時間
.block(Duration.ofMillis(1000))
// 一直阻塞直到獲取數據,可能會報超時異常
// .block(Duration.ofMillis(0))
// 1次獲取10個數據
.count(10);
// 紀錄最後一次 recordId
StringBuilder readOffset = new StringBuilder("0-0");
threadPoolExecutor.execute(() -> {
while (!stop) {
// 此時Stream可以理解成普通的list,但是Stream中的消息在讀取後不會消失, 如果需要刪除需再透過 修剪stream 方法移除stream 內容
// 使用XREAD讀取數據時,需要記錄下最後一次讀取到offset,然後當作下次讀取的offset,否則讀取出來的數據會有問題
List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions, StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(objectRecords)) {
log.warn("沒有獲取到數據");
continue;
}
for (ObjectRecord<String, Book> objectRecord : objectRecords) {
log.info("NonBlockConsumer01 >> 獲取到的數據信息 id:{}", objectRecord.getId());
readOffset.setLength(0);
readOffset.append(objectRecord.getId());
}
}
});
}
@Override
public void destroy() throws Exception {
stop = true;
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
}
public class StreamProducer {
private final RedisTemplate<String, Object> redisTemplate;
public void sendRecord(String streamKey) {
Book book = Book.create();
ObjectRecord<String, Book> record = StreamRecords.newRecord()
.in(streamKey)
.ofObject(book)
.withId(RecordId.autoGenerate());
RecordId recordId = redisTemplate.opsForStream().add(record);
log.info("產生一本書的資訊:{}, add stream return recordId:{}", book, recordId);
}
}
public class Book {
private String title;
private String author;
public static Book create() {
Faker faker = new Faker(Locale.JAPAN);
com.github.javafaker.Book fakerBook = faker.book();
Book book = new Book();
book.setTitle(fakerBook.title());
book.setAuthor(fakerBook.author());
return book;
}
}
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
private final StreamProducer streamProducer;
@Override
public void run(ApplicationArguments args) {
// 每五秒產生訊息塞入訊息佇列
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
0, 5, TimeUnit.SECONDS);
}
}
public class XTrimStreamTask {
private final RedisTemplate<String, Object> redisTemplate;
// @Scheduled(cron ="0 0 0 * * ?")
@Scheduled(cron ="0 * * * * ?")
public void execute() {
// 修剪stream 只留下最後15筆資料 Ex. totalMessage:45 -> trim(15) -> removeCount=30
Long removeCount = redisTemplate.opsForStream().trim(Constants.STREAM_KEY_001, 15l);
log.info("XTrimStreamTask > 修剪Stream removeCount:{}", removeCount);
}
}
## 連線 redis
redis-cli -h 127.0.0.1 -p 6379 -a cPKGpSGvky
## 刪除 stream-001
del stream-001
## mkstream 表示如果這個Stream不存在,則會自動創建出來。 創建兩個群組 group-a & group-b
XGROUP CREATE stream-001 group-a $ mkstream
## xinfo stream stream-001
XGROUP CREATE stream-001 group-b $ mkstream
## 查看 stream 狀況
xinfo stream stream-001
2023-01-05 14:31:59.757 INFO 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : NonBlockConsumer01 >> 獲取到的數據信息 id:1672900319756-0
2023-01-05 14:31:59.754 INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer : 產生一本書的資訊:Book(title=Great Work of Time, author=久保 奈々), add stream return recordId:1672900319756-0
2023-01-05 14:31:59.758 INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener : [獨立消費]: 接收到一個消息 id:1672900319756-0
2023-01-05 14:31:59.758 INFO 21518 --- [ream-consumer-5] c.c.r.s.c.AsyncConsumeStreamListener : [消費群組], group:group-a, consume:consumer-c, 消息 id:1672900319756-0
2023-01-05 14:31:59.758 INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener : [消費群組], group:group-b, consume:consumer-1, 消息 id:1672900319756-0
2023-01-05 14:32:00.013 INFO 21518 --- [ scheduling-1] c.caster.redis.schedule.XTrimStreamTask : XTrimStreamTask > 修剪Stream removeCount:0
2023-01-05 14:32:00.786 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:01.805 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:02.848 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:03.878 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:04.708 INFO 21518 --- [pool-1-thread-1] c.c.r.stream.producer.StreamProducer : 產生一本書的資訊:Book(title=Ego Dominus Tuus, author=中山 海斗), add stream return recordId:1672900324711-0
2023-01-05 14:32:04.708 INFO 21518 --- [ream-consumer-1] c.c.r.s.c.AsyncConsumeStreamListener : [獨立消費]: 接收到一個消息 id:1672900324711-0
2023-01-05 14:32:04.708 INFO 21518 --- [ream-consumer-6] c.c.r.s.c.AsyncConsumeStreamListener : [消費群組], group:group-b, consume:consumer-1, 消息 id:1672900324711-0
2023-01-05 14:32:04.708 INFO 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : NonBlockConsumer01 >> 獲取到的數據信息 id:1672900324711-0
2023-01-05 14:32:04.708 INFO 21518 --- [ream-consumer-2] c.c.r.s.c.AsyncConsumeStreamListener : [消費群組], group:group-a, consume:consumer-a, 消息 id:1672900324711-0
2023-01-05 14:32:05.753 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:06.773 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:07.814 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據
2023-01-05 14:32:08.870 WARN 21518 --- [ nonblock-01] c.c.r.s.c.impl.NonBlockConsumer01 : 沒有獲取到數據