Text⭐さぁ、手を動かすのだ⭐

Chapter 4: dbtモデルの開発

この章では、個人の金融取引データを使ってdbtモデルを作成し、データ変換の基本的な処理を学びます。 複数のCSVデータソースをロードし、クリーニング、統合して、分析用のデータマートを構築するまでを実践していきます。

主な学習内容

  • dbtプロジェクトの初期化と構造の理解
  • 複数のCSVデータの準備とSnowflakeへのロード
  • ソーステーブルの定義
  • ステージングモデルの作成(データクリーニングと整形)
  • マートモデルの作成(複数データソースの統合と集計)
  • モデル間の依存関係の構築(ref関数)
  • dbtコマンドの実行とテスト

手順1: dbtプロジェクトの初期化

まず、新しいdbtプロジェクトを作成し、基本的な設定を行います。

dbtプロジェクト初期化
1 # dbtプロジェクトの初期化
2 dbt init dbt_tutorial
3
4 # プロジェクトディレクトリに移動
5 cd dbt_tutorial
6
7 # プロジェクト構造を確認
8 ls -la
9
10 # dbt_project.ymlの内容を確認
11 cat dbt_project.yml

プロジェクトが作成されると、以下のような構造になります:

dbtプロジェクト構造
1 dbt_tutorial/
2 ├── dbt_project.yml # プロジェクト設定ファイル
3 ├── models/ # SQLモデルファイル
4 │ └── example/
5 ├── macros/ # カスタムマクロ
6 ├── seeds/ # CSVデータファイル
7 ├── snapshots/ # スナップショット設定
8 ├── tests/ # カスタムテスト
9 ├── analyses/ # 分析用SQL
10 └── README.md

手順2: サンプルデータの準備

データ変換の練習用に、個人の金融取引に関する4種類のサンプルCSVデータを使用します。 銀行の入出金、クレジットカードの明細、Suicaの利用履歴、そして家計簿アプリのデータです。

まず、これらのCSVファイルをダウンロードします。(ファイルのダウンロードURLは後ほどこちらで設定します。) ダウンロード後、Snowflakeにデータをロードします。

サンプルテーブル作成
1 -- Snowflakeにログインして、以下のSQLを実行
2 use role dbt_role;
3
4 -- スキーマの作成
5 CREATE SCHEMA IF NOT EXISTS dbt_tutorial.personal_finance;
6
7 -- 銀行取引テーブル
8 CREATE OR REPLACE TABLE dbt_tutorial.personal_finance.bank_transactions (
9 "日付" STRING, -- 後で日付に変換します
10 "摘要" STRING,
11 "摘要内容" STRING,
12 "支払い金額" STRING, -- 後で数値に変換します
13 "預かり金額" STRING, -- 後で数値に変換します
14 "差引残高" STRING, -- 後で数値に変換します
15 "メモ" STRING,
16 "未資金化区分" STRING,
17 "入払区分" STRING
18 );
19
20 -- クレジットカード決済テーブル
21 CREATE OR REPLACE TABLE dbt_tutorial.personal_finance.credit_card_transactions (
22 transaction_date STRING, -- 後で日付に変換します
23 store STRING,
24 card_holder STRING,
25 payment_method STRING,
26 extra_field_1 STRING,
27 billing_month STRING,
28 amount STRING, -- 後で数値に変換します
29 total_amount STRING, -- 後で数値に変換します
30 extra_field_2 STRING,
31 extra_field_3 STRING,
32 extra_field_4 STRING,
33 extra_field_5 STRING,
34 extra_field_6 STRING
35 );
36
37 -- Suica利用履歴テーブル
38 CREATE OR REPLACE TABLE dbt_tutorial.personal_finance.suica_transactions (
39 "月" STRING,
40 "日" STRING,
41 "種別1" STRING,
42 "利用駅1" STRING,
43 "種別2" STRING,
44 "利用駅2" STRING,
45 "残高" STRING, -- 後で数値に変換します
46 "入金・利用額" STRING -- 後で数値に変換します
47 );
48
49 -- 家計簿アプリデータテーブル
50 CREATE OR REPLACE TABLE dbt_tutorial.personal_finance.kakeibo_transactions (
51 "日付" STRING, -- 後で日付に変換します
52 "方法" STRING,
53 "カテゴリ" STRING,
54 "カテゴリの内訳" STRING,
55 "支払元" STRING,
56 "入金先" STRING,
57 "品目" STRING,
58 "メモ" STRING,
59 "お店" STRING,
60 "通貨" STRING,
61 "収入" STRING, -- 後で数値に変換します
62 "支出" STRING -- 後で数値に変換します
63 );

テーブルを作成したら、SnowsightのUIを使って、ダウンロードした各CSVファイルを対応するテーブルにロードしてください。 ファイル形式はヘッダーが1行あるCSVです。

手順3: ソーステーブルの定義

dbtでソーステーブル(生データ)を定義し、モデルから参照できるようにします。

ソース定義ファイル作成
1 # modelsディレクトリの構造を作成
2 mkdir -p models/staging/personal_finance
3 mkdir -p models/marts/core
4 mkdir -p models/marts/finance
5
6 # ソース定義ファイルを作成
7 cat > models/staging/personal_finance/_sources.yml << 'EOF'
8 version: 2
9
10 sources:
11 - name: personal_finance
12 description: "個人の金融取引に関する生データ"
13 database: dbt_tutorial
14 schema: personal_finance
15 tables:
16 - name: bank_transactions
17 description: "銀行の入出金データ"
18 - name: credit_card_transactions
19 description: "クレジットカードの決済データ"
20 - name: suica_transactions
21 description: "Suicaの利用履歴データ"
22 - name: kakeibo_transactions
23 description: "家計簿アプリのデータ"
24 EOF

手順4: ステージングモデルの作成

生データを正規化・クリーニングするステージングモデルを作成します。 ここではカラム名を日本語から英語に変換し、簡単なデータ整形を行います。

ステージングモデル作成
1 # 銀行取引ステージングモデル
2 cat > models/staging/personal_finance/stg_bank_transactions.sql << 'EOF'
3 select
4 "日付" as transaction_date,
5 "摘要" as description,
6 "摘要内容" as transaction_category,
7 "支払い金額" as payment_amount,
8 "預かり金額" as deposit_amount,
9 "差引残高" as balance,
10 "未資金化区分" as uncapitalized_flag,
11 "入払区分" as deposit_flag
12 from {{ source('personal_finance', 'bank_transactions') }}
13 EOF
14
15 # クレジットカード決済ステージングモデル
16 cat > models/staging/personal_finance/stg_credit_card_transactions.sql << 'EOF'
17 select
18 transaction_date,
19 store,
20 payment_method,
21 amount,
22 total_amount,
23 extra_field_1,
24 extra_field_2,
25 extra_field_3,
26 extra_field_4,
27 extra_field_5,
28 extra_field_6
29 from {{ source('personal_finance', 'credit_card_transactions') }}
30 EOF
31
32 # Suica利用履歴ステージングモデル
33 cat > models/staging/personal_finance/stg_suica_transactions.sql << 'EOF'
34 select
35 -- Suicaデータは年に情報が含まれていないため、ここでは2025年と仮定します。
36 to_date('2025/' || "月" || '/' || "日", 'YYYY/MM/DD') as transaction_date,
37 "種別1" as transaction_type,
38 "利用駅1" as entry_station,
39 "種別2" as transaction_type_2,
40 "利用駅2" as transaction_station_2,
41 "残高" as balance,
42 "入金・利用額" as amount
43 from {{ source('personal_finance', 'suica_transactions') }}
44 EOF
45
46 # 家計簿アプリデータステージングモデル
47 cat > models/staging/personal_finance/stg_kakeibo_transactions.sql << 'EOF'
48 select
49 "日付" as transaction_date,
50 "方法" as method,
51 "カテゴリ" as category,
52 "カテゴリの内訳" as sub_category,
53 "品目" as item,
54 "お店" as store,
55 "収入" as income,
56 "支出" as expense
57 from {{ source('personal_finance', 'kakeibo_transactions') }}
58 EOF

手順5: マートモデルの作成

ビジネス価値のあるデータマートを作成します。ここでは、すべての取引を統合したファクトテーブルと、月次の財務サマリテーブルを作成します。

マートモデル作成
1 # 全取引ファクトテーブル
2 cat > models/marts/core/fct_transactions.sql << 'EOF'
3 {{ config(materialized='table') }}
4
5 with bank as (
6 select
7 transaction_date,
8 '銀行' as source,
9 coalesce(deposit_amount, 0) - coalesce(payment_amount, 0) as amount,
10 description,
11 transaction_category as category
12 from {{ ref('stg_bank_transactions') }}
13 ),
14
15 credit_card as (
16 select
17 transaction_date,
18 'クレジットカード' as source,
19 amount * -1 as amount,
20 store as description,
21 'カード利用' as category
22 from {{ ref('stg_credit_card_transactions') }}
23 ),
24
25 suica as (
26 select
27 transaction_date,
28 'Suica' as source,
29 amount,
30 case
31 when transaction_type = '物販' then '物販'
32 when transaction_type = 'バス等' then 'バス等利用'
33 when entry_station is not null then entry_station || ' -> ' || exit_station
34 else transaction_type
35 end as description,
36 '交通費' as category
37 from {{ ref('stg_suica_transactions') }}
38 ),
39
40 kakeibo as (
41 select
42 transaction_date,
43 '家計簿' as source,
44 coalesce(income, 0) - coalesce(expense, 0) as amount,
45 coalesce(item, sub_category) as description,
46 category
47 from {{ ref('stg_kakeibo_transactions') }}
48 )
49
50 select * from bank
51 union all
52 select * from credit_card
53 union all
54 select * from suica
55 union all
56 select * from kakeibo
57 EOF
58
59 # 月次財務サマリ
60 cat > models/marts/finance/monthly_summary.sql << 'EOF'
61 {{ config(materialized='table') }}
62
63 select
64 date_trunc('month', transaction_date)::date as year_month,
65 source,
66 sum(case when amount > 0 then amount else 0 end) as monthly_income,
67 sum(case when amount < 0 then amount * -1 else 0 end) as monthly_expense,
68 sum(amount) as monthly_net_change,
69 count(*) as transaction_count
70 from {{ ref('fct_transactions') }}
71 group by 1, 2
72 order by 1, 2
73 EOF

手順6: dbtモデルの実行とテスト

作成したモデルを実行し、正常に動作することを確認します。

dbtコマンド実行
1 # dbt接続確認
2 dbt debug
3
4 # 依存関係の確認 (パッケージを使用している場合)
5 # dbt deps
6
7 # 全モデルの実行
8 dbt run
9
10 # 特定のモデルの実行結果確認
11 dbt run --select stg_bank_transactions
12
13 # 特定のモデルとその依存関係の実行
14 dbt run --select +monthly_summary
15
16 # テストの実行 (今後テストを追加した場合)
17 dbt test
18
19 # ドキュメント生成
20 dbt docs generate
21
22 # ドキュメントサーバー起動(ローカルで確認)
23 dbt docs serve

手順7: 結果の確認

作成したモデルがSnowflakeで正常に動作していることを確認します。

結果確認クエリ
1 -- Snowflakeで以下のクエリを実行して結果を確認
2
3 -- 全取引ファクトテーブルの確認
4 SELECT * FROM dbt_tutorial.fct_transactions ORDER BY transaction_date DESC LIMIT 10;
5
6 -- 月次財務サマリの確認
7 SELECT * FROM dbt_tutorial.monthly_summary ORDER BY year_month, source;
Chapter 4 / 8

Sponsored by

スポンサーを募集中。紹介コンテンツもご用意しますので、ご興味あればお問い合わせください。