Text⭐さぁ、手を動かすのだ⭐

Chapter 4: dbtモデルの開発

この章では、個人の金融取引データを使ってdbtモデルを作成し、データ変換の基本的な処理を学びます。 複数のCSVデータソースをロードし、クリーニング、統合して、分析用のデータマートを構築するまでを実践していきます。

主な学習内容

  • dbtプロジェクトの初期化と構造の理解
  • 複数のCSVデータの準備とSnowflakeへのロード
  • ソーステーブルの定義
  • ステージングモデルの作成(データクリーニングと整形)
  • マートモデルの作成(複数データソースの統合と集計)
  • モデル間の依存関係の構築(ref関数)
  • dbtコマンドの実行とテスト

手順1: dbtプロジェクトの初期化

まず、新しいdbtプロジェクトを作成し、基本的な設定を行います。

dbtプロジェクト初期化
1 # dbtプロジェクトの初期化
2 dbt init dbt_tutorial
3
4 # プロジェクトディレクトリに移動
5 cd dbt_tutorial
6
7 # プロジェクト構造を確認
8 ls -la
9
10 # dbt_project.ymlの内容を確認
11 cat dbt_project.yml

プロジェクトが作成されると、以下のような構造になります:

dbtプロジェクト構造
1 dbt_tutorial/
2 ├── dbt_project.yml # プロジェクト設定ファイル
3 ├── models/ # SQLモデルファイル
4 │ └── example/
5 ├── macros/ # カスタムマクロ
6 ├── seeds/ # CSVデータファイル
7 ├── snapshots/ # スナップショット設定
8 ├── tests/ # カスタムテスト
9 ├── analyses/ # 分析用SQL
10 └── README.md

手順2: サンプルデータの準備

データ変換の練習用に、個人の金融取引に関する4種類のサンプルCSVデータを使用します。 銀行の入出金、クレジットカードの明細、Suicaの利用履歴、そして家計簿アプリのデータです。

まず、これらのCSVファイルをダウンロードします。(ファイルのダウンロードURLは後ほどこちらで設定します。) ダウンロード後、Snowflakeにデータをロードします。

サンプルテーブル作成
1 -- Snowflakeにログインして、以下のSQLを実行
2 -- サンプルデータベースとスキーマの作成
3 CREATE DATABASE IF NOT EXISTS RAW;
4 CREATE SCHEMA IF NOT EXISTS RAW.PUBLIC;
5
6 -- 銀行取引テーブル
7 CREATE OR REPLACE TABLE RAW.PUBLIC.BANK_TRANSACTIONS (
8 "日付" DATE, "摘要" STRING, "摘要内容" STRING, "支払い金額" NUMBER,
9 "預かり金額" NUMBER, "差引残高" NUMBER, "メモ" STRING,
10 "未資金化区分" STRING, "入払区分" STRING
11 );
12
13 -- クレジットカード決済テーブル
14 CREATE OR REPLACE TABLE RAW.PUBLIC.CREDIT_CARD_TRANSACTIONS (
15 "日付" DATE, "利用先" STRING, "利用者" STRING, "支払方法" STRING, "不明1" STRING,
16 "支払月" STRING, "利用金額" NUMBER, "支払総額" NUMBER, "不明2" STRING, "不明3" STRING,
17 "不明4" STRING, "不明5" STRING, "不明6" STRING
18 );
19
20 -- Suica利用履歴テーブル
21 CREATE OR REPLACE TABLE RAW.PUBLIC.SUICA_TRANSACTIONS (
22 "日付" STRING, "種別" STRING, "入場駅" STRING, "退場駅" STRING,
23 "残高" NUMBER, "利用額" NUMBER
24 );
25
26 -- 家計簿アプリデータテーブル
27 CREATE OR REPLACE TABLE RAW.PUBLIC.KAKEIBO_TRANSACTIONS (
28 "日付" DATE, "方法" STRING, "カテゴリ" STRING, "カテゴリの内訳" STRING,
29 "支払元" STRING, "入金先" STRING, "品目" STRING, "メモ" STRING, "お店" STRING,
30 "通貨" STRING, "収入" NUMBER, "支出" NUMBER
31 );

テーブルを作成したら、SnowsightのUIを使って、ダウンロードした各CSVファイルを対応するテーブルにロードしてください。 ファイル形式はヘッダーが1行あるCSVです。

手順3: ソーステーブルの定義

dbtでソーステーブル(生データ)を定義し、モデルから参照できるようにします。

ソース定義ファイル作成
1 # modelsディレクトリの構造を作成
2 mkdir -p models/staging/personal_finance
3 mkdir -p models/marts/core
4 mkdir -p models/marts/finance
5
6 # ソース定義ファイルを作成
7 cat > models/staging/personal_finance/_sources.yml << 'EOF'
8 version: 2
9
10 sources:
11 - name: personal_finance
12 description: "個人の金融取引に関する生データ"
13 database: RAW
14 schema: PUBLIC
15 tables:
16 - name: bank_transactions
17 description: "銀行の入出金データ"
18 - name: credit_card_transactions
19 description: "クレジットカードの決済データ"
20 - name: suica_transactions
21 description: "Suicaの利用履歴データ"
22 - name: kakeibo_transactions
23 description: "家計簿アプリのデータ"
24 EOF

手順4: ステージングモデルの作成

生データを正規化・クリーニングするステージングモデルを作成します。 ここではカラム名を日本語から英語に変換し、簡単なデータ整形を行います。

ステージングモデル作成
1 # 銀行取引ステージングモデル
2 cat > models/staging/personal_finance/stg_bank_transactions.sql << 'EOF'
3 select
4 "日付" as transaction_date,
5 "摘要" as transaction_category,
6 "摘要内容" as description,
7 "支払い金額" as payment_amount,
8 "預かり金額" as deposit_amount,
9 "差引残高" as balance
10 from {{ source('personal_finance', 'bank_transactions') }}
11 EOF
12
13 # クレジットカード決済ステージングモデル
14 cat > models/staging/personal_finance/stg_credit_card_transactions.sql << 'EOF'
15 select
16 "日付" as transaction_date,
17 "利用先" as store,
18 "支払方法" as payment_method,
19 "利用金額" as amount
20 from {{ source('personal_finance', 'credit_card_transactions') }}
21 EOF
22
23 # Suica利用履歴ステージングモデル
24 cat > models/staging/personal_finance/stg_suica_transactions.sql << 'EOF'
25 -- Suicaデータは年に情報が含まれていないため、ここでは2025年と仮定します。
26 select
27 to_date('2025/' || "日付", 'YYYY/MM/DD') as transaction_date,
28 "種別" as transaction_type,
29 "入場駅" as entry_station,
30 "退場駅" as exit_station,
31 "残高" as balance,
32 "利用額" as amount
33 from {{ source('personal_finance', 'suica_transactions') }}
34 EOF
35
36 # 家計簿アプリデータステージングモデル
37 cat > models/staging/personal_finance/stg_kakeibo_transactions.sql << 'EOF'
38 select
39 "日付" as transaction_date,
40 "方法" as method,
41 "カテゴリ" as category,
42 "カテゴリの内訳" as sub_category,
43 "品目" as item,
44 "お店" as store,
45 "収入" as income,
46 "支出" as expense
47 from {{ source('personal_finance', 'kakeibo_transactions') }}
48 EOF

手順5: マートモデルの作成

ビジネス価値のあるデータマートを作成します。ここでは、すべての取引を統合したファクトテーブルと、月次の財務サマリテーブルを作成します。

マートモデル作成
1 # 全取引ファクトテーブル
2 cat > models/marts/core/fct_transactions.sql << 'EOF'
3 {{ config(materialized='table') }}
4
5 with bank as (
6 select
7 transaction_date,
8 '銀行' as source,
9 coalesce(deposit_amount, 0) - coalesce(payment_amount, 0) as amount,
10 description,
11 transaction_category as category
12 from {{ ref('stg_bank_transactions') }}
13 ),
14
15 credit_card as (
16 select
17 transaction_date,
18 'クレジットカード' as source,
19 amount * -1 as amount,
20 store as description,
21 'カード利用' as category
22 from {{ ref('stg_credit_card_transactions') }}
23 ),
24
25 suica as (
26 select
27 transaction_date,
28 'Suica' as source,
29 amount,
30 case
31 when transaction_type = '物販' then '物販'
32 when transaction_type = 'バス等' then 'バス等利用'
33 when entry_station is not null then entry_station || ' -> ' || exit_station
34 else transaction_type
35 end as description,
36 '交通費' as category
37 from {{ ref('stg_suica_transactions') }}
38 ),
39
40 kakeibo as (
41 select
42 transaction_date,
43 '家計簿' as source,
44 coalesce(income, 0) - coalesce(expense, 0) as amount,
45 coalesce(item, sub_category) as description,
46 category
47 from {{ ref('stg_kakeibo_transactions') }}
48 )
49
50 select * from bank
51 union all
52 select * from credit_card
53 union all
54 select * from suica
55 union all
56 select * from kakeibo
57 EOF
58
59 # 月次財務サマリ
60 cat > models/marts/finance/monthly_summary.sql << 'EOF'
61 {{ config(materialized='table') }}
62
63 select
64 date_trunc('month', transaction_date)::date as year_month,
65 source,
66 sum(case when amount > 0 then amount else 0 end) as monthly_income,
67 sum(case when amount < 0 then amount * -1 else 0 end) as monthly_expense,
68 sum(amount) as monthly_net_change,
69 count(*) as transaction_count
70 from {{ ref('fct_transactions') }}
71 group by 1, 2
72 order by 1, 2
73 EOF

手順6: dbtモデルの実行とテスト

作成したモデルを実行し、正常に動作することを確認します。

dbtコマンド実行
1 # dbt接続確認
2 dbt debug
3
4 # 依存関係の確認 (パッケージを使用している場合)
5 # dbt deps
6
7 # 全モデルの実行
8 dbt run
9
10 # 特定のモデルの実行結果確認
11 dbt run --select stg_bank_transactions
12
13 # 特定のモデルとその依存関係の実行
14 dbt run --select +monthly_summary
15
16 # テストの実行 (今後テストを追加した場合)
17 dbt test
18
19 # ドキュメント生成
20 dbt docs generate
21
22 # ドキュメントサーバー起動(ローカルで確認)
23 dbt docs serve

手順7: 結果の確認

作成したモデルがSnowflakeで正常に動作していることを確認します。

結果確認クエリ
1 -- Snowflakeで以下のクエリを実行して結果を確認
2
3 -- 全取引ファクトテーブルの確認
4 SELECT * FROM dbt_tutorial.fct_transactions ORDER BY transaction_date DESC LIMIT 10;
5
6 -- 月次財務サマリの確認
7 SELECT * FROM dbt_tutorial.monthly_summary ORDER BY year_month, source;
Chapter 4 / 8

Sponsored by

スポンサーを募集中。紹介コンテンツもご用意しますので、ご興味あればお問い合わせください。