/**
 * Adds {@link Book} entries to a Redis stream through the injected {@code RedisTemplate}.
 */
public class StreamProducer {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Creates a new {@link Book} via {@link Book#create()}, appends it to the given stream
     * with an auto-generated record id, and logs the book together with the id returned
     * by the add call.
     *
     * @param streamKey key of the stream the record is added to
     */
    public void sendRecord(String streamKey) {
        Book payload = Book.create();

        // Request an auto-generated id instead of supplying one ourselves.
        RecordId autoId = RecordId.autoGenerate();
        ObjectRecord<String, Book> entry = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(payload)
                .withId(autoId);

        RecordId assignedId = redisTemplate.opsForStream().add(entry);
        log.info("產生一本書的資訊:{}, add stream return recordId:{}", payload, assignedId);
    }
}
/**
 * Simple book payload holding a title and an author.
 */
public class Book {

    private String title;
    private String author;

    /**
     * Builds a {@code Book} whose title and author come from a Faker instance
     * configured with {@link Locale#JAPAN}.
     *
     * @return a new, populated {@code Book}
     */
    public static Book create() {
        // Fully-qualified type: the Faker provider class shares this class's simple name.
        com.github.javafaker.Book generated = new Faker(Locale.JAPAN).book();

        Book result = new Book();
        result.setTitle(generated.title());
        result.setAuthor(generated.author());
        return result;
    }
}
/**
 * Application runner that starts a background schedule which periodically asks the
 * {@code StreamProducer} to add a record to the stream identified by {@code STREAM_KEY_001}.
 */
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {

    /** Delay before the first send, in seconds. */
    private static final long INITIAL_DELAY_SECONDS = 0L;

    /** Time between the start of consecutive sends, in seconds. */
    private static final long PERIOD_SECONDS = 5L;

    // Referenced by fully-qualified name so this class needs no additional import.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(CycleGeneratorStreamMessageRunner.class.getName());

    private final StreamProducer streamProducer;

    /**
     * Schedules a send every five seconds, starting immediately, on a dedicated
     * single-threaded scheduler.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        // Produce a message and push it onto the message queue every five seconds.
        // NOTE(review): the executor is never shut down here; consider tying its
        // lifecycle to the application context if a clean shutdown is required.
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::sendSafely,
                        INITIAL_DELAY_SECONDS, PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Sends one record and logs, rather than propagates, any runtime failure.
     * scheduleAtFixedRate suppresses all subsequent executions once a run throws,
     * so letting an exception escape would silently stop the producer for good.
     */
    private void sendSafely() {
        try {
            streamProducer.sendRecord(STREAM_KEY_001);
        } catch (RuntimeException e) {
            LOGGER.log(java.util.logging.Level.SEVERE,
                    "Failed to add record to stream; will retry on the next scheduled run", e);
        }
    }
}
/**
 * Scheduled task that trims the stream stored under {@code Constants.STREAM_KEY_001}
 * and logs how many entries were removed.
 */
public class XTrimStreamTask {

    /**
     * Number of most recent entries kept by each trim. Written with an uppercase
     * {@code L} suffix because a lowercase {@code l} is easily misread as the digit 1.
     */
    private static final long MAX_RETAINED_ENTRIES = 15L;

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Trims the stream down to its last {@link #MAX_RETAINED_ENTRIES} entries.
     * Example: 45 entries in total, trim(15), removeCount = 30.
     */
    // Fires at second 0 of every minute. Alternative kept for reference,
    // once a day at midnight: "0 0 0 * * ?"
    @Scheduled(cron = "0 * * * * ?")
    public void execute() {
        Long removeCount = redisTemplate.opsForStream()
                .trim(Constants.STREAM_KEY_001, MAX_RETAINED_ENTRIES);
        log.info("XTrimStreamTask > 修剪Stream removeCount:{}", removeCount);
    }
}