增量聚合(Incremental Aggregation)

在數據處理與分析領域,我們經常需要對海量數據進行即時或定時的彙總統計。然而,每一次都重新掃描所有歷史數據進行計算,不僅耗時,也給系統帶來沉重的負擔。增量聚合應運而生,它是一種高效的數據處理模式,其核心思想是「只處理新增數據,並用其更新總和」。

本文將針對一個具體需求:「每日統計用戶在不同時間區間(歷年、1年、半年、3個月)的存款總額」,深入探討如何設計一個健壯的增量聚合程式設計架構。

核心架構設計與數據模型

本架構的核心思想是將「歷史數據的初始化」與「每日數據的增量更新」分離處理。透過一個精密的數據模型與排程任務,實現數據的原子性更新與補救。

數據層設計

我們的架構依賴以下幾個關鍵資料表,它們各司其職,共同構建了數據流的基礎:

  1. 原始交易表t_cashin_proposal (存款提案表)

    • 用途:業務交易數據的源頭。

    • 關鍵欄位gts2_customer_id(客戶 ID)、proposal_date(提案日期)、trans_amount(交易金額)、proposal_status(交易狀態,'2' 代表成功)。

  2. 最終統計表t_customer_payment_gateway_detail

    • 用途:儲存最終的客戶存款聚合數據。

    • 關鍵欄位gts2_customer_idpay_switch_seq_id(支付通道 ID)、time_period(數據時間類型,0 為昨日,1 為歷年,以此類推)、total_amount(總金額)。這個表設計的精妙之處在於,它以客戶支付通道為維度,將不同時間區間的數據扁平化儲存在同一張表中。

  3. 任務執行日誌表t_statistics_customer_execution_log

    • 用途:追蹤排程任務的成功執行日期,是斷點續傳的關鍵。

    • 關鍵欄位group_id(執行統計的數據組 ID)、execution_date(成功執行的時間)。

-- 客戶支付通道詳細資訊
create table t_customer_payment_gateway_detail
(
    customer_number   bigint                                 not null,
    gts2_customer_id  bigint                                 not null,
    gts2_account_id   bigint                                 not null,
    pay_switch_seq_id bigint                                 not null,
    time_period       integer                                not null,
    total_amount      numeric(10, 2)                         not null,
    create_date       timestamp with time zone default now() not null,
    update_date       timestamp with time zone default now() not null,
    constraint t_customer_payment_gateway_detail_unique
        unique (gts2_customer_id, pay_switch_seq_id, time_period)
);
comment on table t_customer_payment_gateway_detail is '客戶支付通道詳細資訊';
comment on column t_customer_payment_gateway_detail.pay_switch_seq_id is '支付通道id';
comment on column t_customer_payment_gateway_detail.time_period is '數據時間類型';
comment on column t_customer_payment_gateway_detail.total_amount is '通道存款總金額';

-- 統計客戶數據執行紀錄
create table office.t_statistics_customer_execution_log
(
    group_id     integer                  not null primary key ,
    execution_date timestamp with time zone not null
);

comment on table office.t_statistics_customer_execution_log is '統計客戶數據執行紀錄';
comment on column office.t_statistics_customer_execution_log.group_id is '執行統計數據組Id';
comment on column office.t_statistics_customer_execution_log.execution_date is '成功執行時間';

核心程式邏輯與實作流程

我們的排程任務是一個每日執行的 Job,其複雜性在於如何處理新用戶與老用戶的數據,並確保異常後的數據修復。

1. 啟動與回溯檢查

  • 啟動:任務啟動時,首先從 t_statistics_customer_execution_log 中查詢指定 group_id 的最新 execution_date

  • 斷點續傳:將最新執行日期與當前日期進行比對,計算出需要「補跑」的天數。如果沒有找到記錄(首次執行),則將起始日期設為當天。任務會進入一個迴圈,逐日處理所有缺失的數據。

2. 分批次處理:新用戶與舊用戶

為了優化效能,任務會將客戶分為兩類進行分批處理:

  • 未統計用戶

    • 定義:從未在 t_customer_payment_gateway_detail 表中有過記錄的客戶。

    • 處理方式:對這些客戶,不採用增量更新。而是透過一個複雜的 SQL 查詢(如日誌中所示的 firstDepositCustomerInitData),一次性計算其從首次存款日到當前處理日期的所有歷史數據,然後直接插入 t_customer_payment_gateway_detail 表中。這是一種「一次性回溯」的初始化策略。

  • 已統計用戶

    • 定義:已在 t_customer_payment_gateway_detail 中有過記錄的客戶。

    • 處理方式:這部分是真正的「增量聚合」。對於這些客戶,我們只統計他們在當前處理日期的新增存款總額,並用這個總額去更新 t_customer_payment_gateway_detail 中各個時間區間的數據。對於「年度」、「半年度」、「季度」等有時效性的統計,還要同步執行過期數據的扣除

3. 數據更新邏輯

  • 每日數據(time_period = 0:這是所有增量更新的基礎。每日任務會重新統計客戶在昨日的存款總額,並使用 ON CONFLICT DO UPDATE 語法進行原子性更新或插入,確保數據的精準。

  • 歷年數據(time_period = 1:這部分數據是純增量。我們只需要將每日數據的總額,加到現有的歷年總額上即可。

  • 有時效性的數據(年度(2)、半年度(3)、季度(4)):這部分更新更為複雜。我們需要將每日總額加到現有總額上,同時減去對應時間區間之外的過期數據總額。這需要一個單獨的邏輯來找到並扣除過期的數據。

給測試人員的說明與測試方式

嗨,測試夥伴!

增量聚合(Incremental Aggregation)這個詞聽起來很技術,我們可以把它比喻成一個日常生活中更簡單、更容易理解的場景。

簡短比喻:家庭開銷記帳

想像一下,您在一個家庭裡負責記錄每個月的總開銷。

  • 傳統做法(非增量聚合):

    • 在每個月底,您會拿出所有過去一年的收據,從一月到十二月,一張一張地加總起來,得到年度總開銷。

    • 下個月,您需要再次從一月開始,把所有收據重新加總一次。這會花費大量時間。

  • 增量聚合做法(更有效率):

    • 您不再從頭開始。您有一個記錄,上面寫著「到昨天為止,年度總開銷是 $120,000」。

    • 今天,您只記錄今天的所有開銷,比如 $3,000。

    • 然後,您只需要做一個簡單的加法:$120,000 (舊總額) + $3,000 (今天新開銷) = $123,000 (新總額)

    • 這樣,您就不需要再處理那些舊的收據,效率大大提升。

將這個比喻帶回您的專案:

  • 客戶的存款統計:我們的系統就像這位記帳員。

  • 傳統做法:每天凌晨,我們需要重新掃描這位客戶「所有」的歷史存款紀錄,從第一筆存款開始,重新計算一次「歷年」、「1年」、「半年」、「3個月」的總額。如果客戶交易歷史很長,這會非常耗時。

  • 增量聚合做法:我們將客戶的「歷年」、「1年」、「半年」、「3個月」的存款總額存到一個單獨的紀錄裡。每天,我們只統計「前一天」的新增存款總額。然後,我們只需要用這個「前一天的總額」,去更新我們的「歷年」、「1年」、「半年」、「3個月」的總額,就像做加法一樣。

增量聚合是一種只處理「新增加」的數據,並用其來更新「總和」的方法,它能讓我們的系統運行得更快、更有效率。

我們的數據統計任務採用了一套非常精密的增量聚合架構。其測試重點不僅是功能正確,更要驗證其在複雜情況下的數據準確性系統健壯性。請按照以下方式進行測試,以確保其品質:

1. 數據正確性驗證

  • 初始化測試

    • 場景:新增一個完全沒有統計數據的新客戶。

    • 驗證:在任務執行後,確認該客戶在 t_customer_payment_gateway_detail 表中是否被正確初始化,且其各個時間區間的存款總額是否與手動從 t_cashin_proposal 總和的結果完全一致。

  • 每日增量測試

    • 場景:為一個已有統計數據的老客戶進行多筆存款。

    • 驗證:在任務執行後,檢查該客戶在 t_customer_payment_gateway_detail 中的「昨日」總額是否與新交易總和一致。同時,驗證其他時間區間(歷年、年度、…)的數據是否在舊數據的基礎上正確地增加了這個「昨日」總額。

  • 跨時效測試

    • 場景:模擬時間流逝,使得一筆交易從「季度」範圍內移出。

    • 驗證:檢查「季度」總額是否正確地減去了該筆交易的金額。這部分可能需要修改系統時間或手動插入歷史數據進行驗證。

2. 任務健壯性與恢復能力

  • 任務中斷與補救

    • 場景:在任務執行過程中,模擬服務中斷。

    • 驗證:重新啟動服務,檢查日誌,確認任務是否能根據 t_statistics_customer_execution_log 中的記錄,自動從中斷點開始,逐日補齊所有數據。最終,所有統計數據應與未中斷情況下的結果一致。

  • 效能測試

    • 場景:在測試環境中,模擬大量的交易數據(例如,一次性插入數十萬筆)。

    • 驗證:觀察排程任務的執行時間。由於本架構對新老客戶進行了分批處理,其執行時間應該遠快於從頭計算所有歷史數據。

透過這些測試,我們不僅能確保數據的精準度,還能為系統在面對故障時的快速恢復能力提供堅實的保障。

Last updated