增量聚合(Incremental Aggregation)
在數據處理與分析領域,我們經常需要對海量數據進行即時或定時的彙總統計。然而,每一次都重新掃描所有歷史數據進行計算,不僅耗時,也給系統帶來沉重的負擔。增量聚合應運而生,它是一種高效的數據處理模式,其核心思想是「只處理新增數據,並用其更新總和」。
本文將針對一個具體需求:「每日統計用戶在不同時間區間(歷年、1年、半年、3個月)的存款總額」,深入探討如何設計一個健壯的增量聚合程式設計架構。
核心架構設計與數據模型
本架構的核心思想是將「歷史數據的初始化」與「每日數據的增量更新」分離處理。透過一個精密的數據模型與排程任務,實現數據的原子性更新與補救。
數據層設計
我們的架構依賴以下幾個關鍵資料表,它們各司其職,共同構建了數據流的基礎:
原始交易表:
t_cashin_proposal
(存款提案表)用途:業務交易數據的源頭。
關鍵欄位:
gts2_customer_id
(客戶 ID)、proposal_date
(提案日期)、trans_amount
(交易金額)、proposal_status
(交易狀態,'2'
代表成功)。
最終統計表:
t_customer_payment_gateway_detail
用途:儲存最終的客戶存款聚合數據。
關鍵欄位:
gts2_customer_id
、pay_switch_seq_id
(支付通道 ID)、time_period
(數據時間類型,0
為昨日,1
為歷年,以此類推)、total_amount
(總金額)。這個表設計的精妙之處在於,它以客戶和支付通道為維度,將不同時間區間的數據扁平化儲存在同一張表中。
任務執行日誌表:
t_statistics_customer_execution_log
用途:追蹤排程任務的成功執行日期,是斷點續傳的關鍵。
關鍵欄位:
group_id
(執行統計的數據組 ID)、execution_date
(成功執行的時間)。
-- 客戶支付通道詳細資訊
create table t_customer_payment_gateway_detail
(
customer_number bigint not null,
gts2_customer_id bigint not null,
gts2_account_id bigint not null,
pay_switch_seq_id bigint not null,
time_period integer not null,
total_amount numeric(10, 2) not null,
create_date timestamp with time zone default now() not null,
update_date timestamp with time zone default now() not null,
constraint t_customer_payment_gateway_detail_unique
unique (gts2_customer_id, pay_switch_seq_id, time_period)
);
comment on table t_customer_payment_gateway_detail is '客戶支付通道詳細資訊';
comment on column t_customer_payment_gateway_detail.pay_switch_seq_id is '支付通道id';
comment on column t_customer_payment_gateway_detail.time_period is '數據時間類型';
comment on column t_customer_payment_gateway_detail.total_amount is '通道存款總金額';
-- 統計客戶數據執行紀錄
create table office.t_statistics_customer_execution_log
(
group_id integer not null primary key ,
execution_date timestamp with time zone not null
);
comment on table office.t_statistics_customer_execution_log is '統計客戶數據執行紀錄';
comment on column office.t_statistics_customer_execution_log.group_id is '執行統計數據組Id';
comment on column office.t_statistics_customer_execution_log.execution_date is '成功執行時間';
核心程式邏輯與實作流程
我們的排程任務是一個每日執行的 Job,其複雜性在於如何處理新用戶與老用戶的數據,並確保異常後的數據修復。
1. 啟動與回溯檢查
啟動:任務啟動時,首先從
t_statistics_customer_execution_log
中查詢指定group_id
的最新execution_date
。斷點續傳:將最新執行日期與當前日期進行比對,計算出需要「補跑」的天數。如果沒有找到記錄(首次執行),則將起始日期設為當天。任務會進入一個迴圈,逐日處理所有缺失的數據。
2. 分批次處理:新用戶與舊用戶
為了優化效能,任務會將客戶分為兩類進行分批處理:
未統計用戶:
定義:從未在
t_customer_payment_gateway_detail
表中有過記錄的客戶。處理方式:對這些客戶,不採用增量更新。而是透過一個複雜的 SQL 查詢(如日誌中所示的
firstDepositCustomerInitData
),一次性計算其從首次存款日到當前處理日期的所有歷史數據,然後直接插入t_customer_payment_gateway_detail
表中。這是一種「一次性回溯」的初始化策略。
已統計用戶:
定義:已在
t_customer_payment_gateway_detail
中有過記錄的客戶。處理方式:這部分是真正的「增量聚合」。對於這些客戶,我們只統計他們在當前處理日期的新增存款總額,並用這個總額去更新
t_customer_payment_gateway_detail
中各個時間區間的數據。對於「年度」、「半年度」、「季度」等有時效性的統計,還要同步執行過期數據的扣除。
3. 數據更新邏輯
每日數據(
time_period = 0
):這是所有增量更新的基礎。每日任務會重新統計客戶在昨日的存款總額,並使用ON CONFLICT DO UPDATE
語法進行原子性更新或插入,確保數據的精準。歷年數據(
time_period = 1
):這部分數據是純增量。我們只需要將每日數據的總額,加到現有的歷年總額上即可。有時效性的數據(年度(2)、半年度(3)、季度(4)):這部分更新更為複雜。我們需要將每日總額加到現有總額上,同時減去對應時間區間之外的過期數據總額。這需要一個單獨的邏輯來找到並扣除過期的數據。
給測試人員的說明與測試方式
嗨,測試夥伴!
增量聚合(Incremental Aggregation)這個詞聽起來很技術,我們可以把它比喻成一個日常生活中更簡單、更容易理解的場景。
簡短比喻:家庭開銷記帳
想像一下,您在一個家庭裡負責記錄每個月的總開銷。
傳統做法(非增量聚合):
在每個月底,您會拿出所有過去一年的收據,從一月到十二月,一張一張地加總起來,得到年度總開銷。
下個月,您需要再次從一月開始,把所有收據重新加總一次。這會花費大量時間。
增量聚合做法(更有效率):
您不再從頭開始。您有一個記錄,上面寫著「到昨天為止,年度總開銷是 $120,000」。
今天,您只記錄今天的所有開銷,比如 $3,000。
然後,您只需要做一個簡單的加法:
$120,000 (舊總額) + $3,000 (今天新開銷) = $123,000 (新總額)
。這樣,您就不需要再處理那些舊的收據,效率大大提升。
將這個比喻帶回您的專案:
客戶的存款統計:我們的系統就像這位記帳員。
傳統做法:每天凌晨,我們需要重新掃描這位客戶「所有」的歷史存款紀錄,從第一筆存款開始,重新計算一次「歷年」、「1年」、「半年」、「3個月」的總額。如果客戶交易歷史很長,這會非常耗時。
增量聚合做法:我們將客戶的「歷年」、「1年」、「半年」、「3個月」的存款總額存到一個單獨的紀錄裡。每天,我們只統計「前一天」的新增存款總額。然後,我們只需要用這個「前一天的總額」,去更新我們的「歷年」、「1年」、「半年」、「3個月」的總額,就像做加法一樣。
增量聚合是一種只處理「新增加」的數據,並用其來更新「總和」的方法,它能讓我們的系統運行得更快、更有效率。
我們的數據統計任務採用了一套非常精密的增量聚合架構。其測試重點不僅是功能正確,更要驗證其在複雜情況下的數據準確性與系統健壯性。請按照以下方式進行測試,以確保其品質:
1. 數據正確性驗證
初始化測試:
場景:新增一個完全沒有統計數據的新客戶。
驗證:在任務執行後,確認該客戶在
t_customer_payment_gateway_detail
表中是否被正確初始化,且其各個時間區間的存款總額是否與手動從t_cashin_proposal
總和的結果完全一致。
每日增量測試:
場景:為一個已有統計數據的老客戶進行多筆存款。
驗證:在任務執行後,檢查該客戶在
t_customer_payment_gateway_detail
中的「昨日」總額是否與新交易總和一致。同時,驗證其他時間區間(歷年、年度、…)的數據是否在舊數據的基礎上正確地增加了這個「昨日」總額。
跨時效測試:
場景:模擬時間流逝,使得一筆交易從「季度」範圍內移出。
驗證:檢查「季度」總額是否正確地減去了該筆交易的金額。這部分可能需要修改系統時間或手動插入歷史數據進行驗證。
2. 任務健壯性與恢復能力
任務中斷與補救:
場景:在任務執行過程中,模擬服務中斷。
驗證:重新啟動服務,檢查日誌,確認任務是否能根據
t_statistics_customer_execution_log
中的記錄,自動從中斷點開始,逐日補齊所有數據。最終,所有統計數據應與未中斷情況下的結果一致。
效能測試:
場景:在測試環境中,模擬大量的交易數據(例如,一次性插入數十萬筆)。
驗證:觀察排程任務的執行時間。由於本架構對新老客戶進行了分批處理,其執行時間應該遠快於從頭計算所有歷史數據。
透過這些測試,我們不僅能確保數據的精準度,還能為系統在面對故障時的快速恢復能力提供堅實的保障。
Last updated