⭐Now, let's get hands-on⭐

Chapter 8: Improving and optimizing models

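This chapter covers improving the performance of the dbt models you have built, adding tests, writing documentation, and optimizing monitoring and operations. You will pick up the practical techniques needed to run these models in a production environment.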

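Key topics

  • Performance optimization (clustering)
  • Data quality tests
  • Comprehensive documentation
  • Data lineage visualization
  • Incremental models
  • Macros and custom functions
  • Monitoring and alerting
  • Performance monitoring and tuning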

Step 1: Performance optimization

Implement optimization techniques for processing large volumes of data efficiently.

Clustering key configuration
-- Set a clustering key
-- Update models/marts/core/customer_orders.sql
-- Note: clustering keys mainly pay off on very large (multi-terabyte) tables, and a unique,
-- high-cardinality leading key such as customer_id prunes poorly; on a small table,
-- automatic reclustering can cost more than it saves.
{{ config(
    materialized='table',
    cluster_by=['customer_id', 'last_order_date']
) }}

WITH customer_order_summary AS (
    SELECT
        c.customer_id,
        c.first_name,
        c.last_name,
        c.email,
        c.city,
        c.state,
        COUNT(DISTINCT o.order_id) AS total_orders,
        -- COALESCE so that customers without orders get 0 instead of NULL
        COALESCE(SUM(o.total_amount), 0) AS total_spent,
        AVG(o.total_amount) AS avg_order_value,
        MIN(o.order_date) AS first_order_date,
        MAX(o.order_date) AS last_order_date,
        DATEDIFF('day', MIN(o.order_date), MAX(o.order_date)) AS customer_lifetime_days
    FROM {{ ref('stg_customers') }} c
    LEFT JOIN {{ ref('stg_orders') }} o ON c.customer_id = o.customer_id
    GROUP BY 1, 2, 3, 4, 5, 6
)

SELECT
    *,
    CASE
        WHEN total_orders >= 3 THEN 'High Value'
        WHEN total_orders >= 2 THEN 'Medium Value'
        -- 0 or 1 orders (customers without any orders also land here)
        ELSE 'Low Value'
    END AS customer_segment
FROM customer_order_summary
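Why clustering can help: Snowflake keeps metadata such as each column's minimum and maximum value per micro-partition and skips partitions whose range cannot match a filter. The Python sketch below is a toy simulation under that simplifying assumption only (min/max pruning, fixed-size partitions, synthetic data); `build_partitions` and `partitions_scanned` are hypothetical helpers. It counts how many partitions a filter on `last_order_date` would touch when rows sit in arrival order versus sorted by that column.

```python
import random
from dataclasses import dataclass

@dataclass
class MicroPartition:
    min_day: int  # smallest last_order_date in the partition (day of year)
    max_day: int  # largest last_order_date in the partition

def build_partitions(days, rows_per_partition):
    """Split rows into fixed-size partitions, keeping only min/max metadata (simplified)."""
    return [
        MicroPartition(min(days[i:i + rows_per_partition]), max(days[i:i + rows_per_partition]))
        for i in range(0, len(days), rows_per_partition)
    ]

def partitions_scanned(partitions, lo, hi):
    """Partitions whose [min, max] range overlaps the filter range cannot be pruned."""
    return sum(1 for p in partitions if p.max_day >= lo and p.min_day <= hi)

random.seed(42)
days = [random.randrange(365) for _ in range(10_000)]  # synthetic last_order_date values

unclustered = build_partitions(days, 100)        # rows stored in arrival order
clustered = build_partitions(sorted(days), 100)  # rows co-located by last_order_date

# Filter on the last 30 days of the year (day 335 to 364)
print("unclustered:", partitions_scanned(unclustered, 335, 364), "of", len(unclustered))
print("clustered:  ", partitions_scanned(clustered, 335, 364), "of", len(clustered))
```

With random arrival order nearly every partition spans the whole year, so almost nothing is pruned; sorting by the filtered column narrows each partition's range. By the same reasoning, a date filter gains little when the leading clustering key is a unique column.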
Incremental model
-- Implementing an incremental model
-- models/marts/finance/daily_sales_incremental.sql
{{ config(
    materialized='incremental',
    unique_key='order_date',
    on_schema_change='append_new_columns'
) }}

SELECT
    o.order_date,
    COUNT(DISTINCT o.order_id) AS total_orders,
    COUNT(DISTINCT o.customer_id) AS unique_customers,
    SUM(o.total_amount) AS daily_revenue,
    AVG(o.total_amount) AS avg_order_value
FROM {{ ref('stg_orders') }} o
WHERE o.status = 'completed'
{% if is_incremental() %}
    -- On incremental runs, recompute only the 7 days before the latest date already loaded,
    -- so that late-arriving or late-completed orders are picked up; existing rows with the
    -- same order_date are replaced via unique_key
    AND o.order_date >= (SELECT MAX(order_date) - INTERVAL '7 days' FROM {{ this }})
{% endif %}
GROUP BY o.order_date
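The sketch below replays the incremental logic with Python's built-in sqlite3 module so the effect of the 7-day lookback can be observed without a warehouse. It is an approximation: dbt-snowflake's default strategy with a unique_key is a MERGE, emulated here as delete-then-insert on `order_date`, and SQLite's `date(..., '-7 days')` stands in for `MAX(order_date) - INTERVAL '7 days'`. The function names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE stg_orders (
    order_id INTEGER, customer_id INTEGER, order_date TEXT, total_amount REAL, status TEXT)""")

# Same aggregation as the dbt model; {extra_filter} plays the role of the is_incremental() block
AGG_SQL = """
SELECT order_date,
       COUNT(DISTINCT order_id)    AS total_orders,
       COUNT(DISTINCT customer_id) AS unique_customers,
       SUM(total_amount)           AS daily_revenue,
       AVG(total_amount)           AS avg_order_value
FROM stg_orders
WHERE status = 'completed' {extra_filter}
GROUP BY order_date
"""

def full_refresh():
    """First run: build the whole table."""
    conn.execute("DROP TABLE IF EXISTS daily_sales_incremental")
    conn.execute("CREATE TABLE daily_sales_incremental AS " + AGG_SQL.format(extra_filter=""))

def incremental_run(lookback_days=7):
    """Later runs: recompute recent dates only, then replace rows that share an order_date."""
    window = ("AND order_date >= (SELECT date(MAX(order_date), ?) "
              "FROM daily_sales_incremental)")
    new_rows = conn.execute(AGG_SQL.format(extra_filter=window),
                            (f"-{lookback_days} days",)).fetchall()
    conn.executemany("DELETE FROM daily_sales_incremental WHERE order_date = ?",
                     [(row[0],) for row in new_rows])
    conn.executemany("INSERT INTO daily_sales_incremental VALUES (?, ?, ?, ?, ?)", new_rows)

conn.executemany("INSERT INTO stg_orders VALUES (?, ?, ?, ?, ?)", [
    (1, 10, "2024-01-01", 100.0, "completed"),
    (2, 11, "2024-01-10", 200.0, "completed"),
    (3, 12, "2024-01-08", 50.0, "pending"),
])
full_refresh()

# Order 3 is completed late, a new day arrives, and a late order lands outside the window
conn.execute("UPDATE stg_orders SET status = 'completed' WHERE order_id = 3")
conn.executemany("INSERT INTO stg_orders VALUES (?, ?, ?, ?, ?)", [
    (4, 10, "2024-01-11", 300.0, "completed"),
    (5, 13, "2024-01-01", 80.0, "completed"),
])
incremental_run()

result = dict(conn.execute("SELECT order_date, total_orders FROM daily_sales_incremental"))
print(sorted(result.items()))
```

Order 3 is picked up because 2024-01-08 falls inside the window, while the late order 5 on 2024-01-01 does not, so that day keeps its old total until the next `dbt run --full-refresh`.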

Step 2: Comprehensive data quality tests

Implement a detailed test suite to ensure data quality.

Extended data quality tests
# Extending models/staging/ecommerce/_sources.yml
version: 2

sources:
  - name: ecommerce
    description: "Raw e-commerce data"
    database: raw_data
    schema: ecommerce
    tables:
      - name: customers
        description: "Customer master"
        columns:
          - name: customer_id
            description: "Customer ID"
            tests:
              - unique
              - not_null
          - name: email
            description: "Email address"
            tests:
              - not_null
              - unique
              # dbt_utils has no email-format test; use a regex check from dbt_expectations
              - dbt_expectations.expect_column_values_to_match_regex:
                  regex: "^[^@]+@[^@]+[.][^@]+$"
          - name: created_at
            description: "Creation timestamp"
            tests:
              - not_null
              # column_type must match the type name the warehouse reports
              # (on Snowflake this is usually timestamp_ntz rather than timestamp)
              - dbt_expectations.expect_column_values_to_be_of_type:
                  column_type: timestamp_ntz

      - name: orders
        description: "Orders table"
        columns:
          - name: order_id
            description: "Order ID"
            tests:
              - unique
              - not_null
          - name: customer_id
            description: "Customer ID"
            tests:
              - not_null
              - relationships:
                  to: source('ecommerce', 'customers')
                  field: customer_id
          - name: total_amount
            description: "Order total amount"
            tests:
              - not_null
              - dbt_expectations.expect_column_values_to_be_between:
                  min_value: 0
                  max_value: 10000
          - name: status
            description: "Order status"
            tests:
              - not_null
              - accepted_values:
                  values: ['pending', 'processing', 'completed', 'cancelled']
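dbt compiles each generic test into a SELECT that returns the offending rows, and the test passes when that query returns nothing. The sketch below reproduces simplified versions of those queries for `not_null`, `unique`, `accepted_values`, and `relationships` against toy tables in sqlite3; the SQL that dbt actually generates differs in detail, and the table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (customer_id INTEGER, email TEXT);
CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, status TEXT);
INSERT INTO customers VALUES (1, 'a@example.com'), (2, NULL);
INSERT INTO orders VALUES (100, 1, 'completed'), (101, 3, 'shipped');
""")

# Simplified shapes of the test queries; each returns the rows that violate the rule
TESTS = {
    "not_null(customers.email)":
        "SELECT * FROM customers WHERE email IS NULL",
    "unique(orders.order_id)":
        "SELECT order_id FROM orders GROUP BY order_id HAVING COUNT(*) > 1",
    "accepted_values(orders.status)":
        "SELECT * FROM orders "
        "WHERE status NOT IN ('pending', 'processing', 'completed', 'cancelled')",
    "relationships(orders.customer_id -> customers.customer_id)":
        """SELECT o.customer_id FROM orders o
           LEFT JOIN customers c ON o.customer_id = c.customer_id
           WHERE o.customer_id IS NOT NULL AND c.customer_id IS NULL""",
}

results = {name: len(conn.execute(sql).fetchall()) for name, sql in TESTS.items()}
for name, failures in results.items():
    print(f"{'PASS' if failures == 0 else 'FAIL'} {name} ({failures} failing rows)")
```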
Custom test implementation
-- Creating custom (singular) tests: each file is a query that returns the failing rows,
-- so the test passes when the query returns zero rows.

-- tests/assert_positive_revenue.sql
SELECT *
FROM {{ ref('daily_sales') }}
WHERE daily_revenue <= 0

-- tests/assert_customer_order_consistency.sql
-- Checks that the per-customer order counts in customer_orders match stg_orders
WITH order_counts AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS order_count_from_orders
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
),
customer_summary_counts AS (
    SELECT
        customer_id,
        total_orders AS order_count_from_summary
    FROM {{ ref('customer_orders') }}
)

SELECT
    COALESCE(o.customer_id, c.customer_id) AS customer_id,
    o.order_count_from_orders,
    c.order_count_from_summary
FROM order_counts o
FULL OUTER JOIN customer_summary_counts c
    ON o.customer_id = c.customer_id
-- Treat a missing side as 0 orders: customers without orders have total_orders = 0 in
-- customer_orders but no rows in stg_orders and must not be reported as failures.
-- Orders whose customer is missing from customer_orders are still reported.
WHERE COALESCE(o.order_count_from_orders, 0) <> COALESCE(c.order_count_from_summary, 0)
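A customer with no orders appears in `customer_orders` with `total_orders = 0` but has no rows in `stg_orders`. The sqlite3 sketch below compares a check that flags any NULL side with a NULL-safe `COALESCE(..., 0)` comparison on toy data (customer 12 has no orders; customer 99 has an order but no summary row). Because SQLite only gained `FULL OUTER JOIN` in version 3.39, the sketch builds the key set with `UNION` and two `LEFT JOIN`s, which yields the same rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stg_orders (order_id INTEGER, customer_id INTEGER);
CREATE TABLE customer_orders (customer_id INTEGER, total_orders INTEGER);
INSERT INTO stg_orders VALUES (1, 10), (2, 10), (3, 11), (4, 99);
INSERT INTO customer_orders VALUES (10, 2), (11, 1), (12, 0);
""")

BASE = """
WITH order_counts AS (
    SELECT customer_id, COUNT(DISTINCT order_id) AS n FROM stg_orders GROUP BY customer_id
),
summary AS (
    SELECT customer_id, total_orders AS n FROM customer_orders
),
all_keys AS (
    SELECT customer_id FROM order_counts UNION SELECT customer_id FROM summary
)
SELECT k.customer_id
FROM all_keys k
LEFT JOIN order_counts o ON k.customer_id = o.customer_id
LEFT JOIN summary c ON k.customer_id = c.customer_id
WHERE {condition}
ORDER BY k.customer_id
"""
NAIVE = "o.n != c.n OR o.n IS NULL OR c.n IS NULL"   # flags zero-order customers too
NULL_SAFE = "COALESCE(o.n, 0) <> COALESCE(c.n, 0)"   # a missing side counts as 0 orders

def failing_customers(condition):
    return [row[0] for row in conn.execute(BASE.format(condition=condition))]

print("naive:    ", failing_customers(NAIVE))
print("null-safe:", failing_customers(NULL_SAFE))
```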

Step 3: Comprehensive documentation

Write detailed documentation to make the models easier to understand and maintain.

Model documentation definitions
# models/marts/core/_models.yml
version: 2

models:
  - name: customer_orders
    description: |
      Order summary per customer.
      A mart table for analyzing each customer's order history, revenue, and behavior patterns.
      Used to measure the effect of marketing campaigns and for customer segmentation.

    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - unique
          - not_null

      - name: total_orders
        description: "Total number of orders placed by the customer"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000

      - name: total_spent
        description: "Total amount spent by the customer (JPY)"
        tests:
          # Requires COALESCE(SUM(...), 0) in the model; otherwise customers without orders are NULL
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0

      - name: customer_segment
        description: |
          Customer segment:
          - High Value: 3 or more orders
          - Medium Value: 2 orders
          - Low Value: 0 or 1 orders
        tests:
          - not_null
          - accepted_values:
              values: ['High Value', 'Medium Value', 'Low Value']

  - name: product_performance
    description: |
      Performance analysis per product.
      A mart table for analyzing each product's sales, profitability, and popularity.
      Used for product strategy and inventory management decisions.

    columns:
      - name: product_id
        description: "Unique product identifier"
        tests:
          - unique
          - not_null

      - name: total_revenue
        description: "Total revenue for the product"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0

      - name: margin_percent
        description: "Product margin (percent)"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100
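To keep documentation complete as the project grows, one option is to scan the `target/manifest.json` artifact that dbt writes (for example during `dbt compile` or `dbt docs generate`). The sketch assumes the manifest's `nodes` map, where each model node carries `resource_type`, `name`, `description`, and a `columns` map of the columns declared in YAML; `find_undocumented` is a hypothetical helper. Columns that exist in the table but are not declared in YAML are absent from the manifest, so this check does not catch them.

```python
import json
from pathlib import Path

def find_undocumented(manifest: dict) -> list:
    """Return (model, column) pairs with empty descriptions; column is None for the model itself."""
    missing = []
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") != "model":
            continue  # skip tests, seeds, snapshots, etc.
        name = node.get("name")
        if not (node.get("description") or "").strip():
            missing.append((name, None))
        for column_name, column in node.get("columns", {}).items():
            if not (column.get("description") or "").strip():
                missing.append((name, column_name))
    return missing

manifest_path = Path("target/manifest.json")
if manifest_path.exists():
    for model, column in find_undocumented(json.loads(manifest_path.read_text())):
        print(f"missing description: {model}" + (f".{column}" if column else ""))
```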

Step 4: Using macros and custom functions

Implement reusable logic as macros to reduce code duplication.

Custom macro implementation
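-- macros/get_customer_segment.sql
{% macro get_customer_segment(order_count_column) %}
    CASE
        WHEN {{ order_count_column }} >= 3 THEN 'High Value'
        WHEN {{ order_count_column }} >= 2 THEN 'Medium Value'
        ELSE 'Low Value'
    END
{% endmacro %}

-- macros/format_currency.sql
-- currency is a Jinja argument, so branch on it at compile time with a Jinja if block;
-- writing it into a SQL comparison would render as an unquoted identifier (JPY = 'JPY').
-- Snowflake has no FORMAT(number, decimals); TO_VARCHAR with a number format is used instead.
{% macro format_currency(amount_column, currency='JPY') %}
    {%- if currency == 'JPY' -%}
        '¥' || TRIM(TO_VARCHAR({{ amount_column }}, '999,999,999,990'))
    {%- elif currency == 'USD' -%}
        '$' || TRIM(TO_VARCHAR({{ amount_column }}, '999,999,999,990.00'))
    {%- else -%}
        TRIM(TO_VARCHAR({{ amount_column }}, '999,999,999,990.00'))
    {%- endif -%}
{% endmacro %}

-- macros/safe_divide.sql
-- Arguments are parenthesized so that expressions such as a + b are divided as a whole
{% macro safe_divide(numerator, denominator) %}
    CASE
        WHEN ({{ denominator }}) = 0 THEN NULL
        ELSE ({{ numerator }}) / ({{ denominator }})
    END
{% endmacro %}

-- macros/get_date_spine.sql
-- SEQ4() can leave gaps, so ROW_NUMBER() provides a gap-free 0, 1, 2, ... offset.
-- GENERATOR's ROWCOUNT must be a constant, so pass literal dates (not column references).
{% macro get_date_spine(start_date, end_date) %}
    WITH date_spine AS (
        SELECT
            DATEADD('day', ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1, '{{ start_date }}'::DATE) AS date_day
        FROM TABLE(GENERATOR(ROWCOUNT => DATEDIFF('day', '{{ start_date }}', '{{ end_date }}') + 1))
    )
    SELECT date_day FROM date_spine
{% endmacro %}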
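Plain-Python mirrors of three of these macros can help pin down their expected results before writing tests against them. The functions below are hypothetical reference implementations written for this illustration, not something dbt provides; `None` stands in for SQL NULL.

```python
from datetime import date, timedelta
from typing import List, Optional

def safe_divide(numerator: Optional[float], denominator: Optional[float]) -> Optional[float]:
    """Mirror of safe_divide: None (SQL NULL) when the denominator is 0 or any input is NULL."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return numerator / denominator

def customer_segment(order_count: int) -> str:
    """Mirror of get_customer_segment; note that 0 orders also maps to 'Low Value'."""
    if order_count >= 3:
        return "High Value"
    if order_count >= 2:
        return "Medium Value"
    return "Low Value"

def date_spine(start: date, end: date) -> List[date]:
    """Mirror of get_date_spine: one row per day, start and end both included."""
    return [start + timedelta(days=offset) for offset in range((end - start).days + 1)]

print(safe_divide(3000, 0), customer_segment(0), len(date_spine(date(2024, 1, 1), date(2024, 1, 31))))
# prints: None Low Value 31
```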
Macro usage example
-- Improving the model with macros
-- models/marts/core/customer_orders_improved.sql
{{ config(materialized='table') }}

WITH customer_order_summary AS (
    SELECT
        c.customer_id,
        c.first_name,
        c.last_name,
        c.email,
        c.city,
        c.state,
        COUNT(DISTINCT o.order_id) AS total_orders,
        SUM(o.total_amount) AS total_spent,
        -- safe_divide returns NULL instead of dividing by zero for customers without orders
        {{ safe_divide('SUM(o.total_amount)', 'COUNT(DISTINCT o.order_id)') }} AS avg_order_value,
        MIN(o.order_date) AS first_order_date,
        MAX(o.order_date) AS last_order_date,
        DATEDIFF('day', MIN(o.order_date), MAX(o.order_date)) AS customer_lifetime_days
    FROM {{ ref('stg_customers') }} c
    LEFT JOIN {{ ref('stg_orders') }} o ON c.customer_id = o.customer_id
    GROUP BY 1, 2, 3, 4, 5, 6
)

SELECT
    *,
    {{ get_customer_segment('total_orders') }} AS customer_segment,
    {{ format_currency('total_spent') }} AS total_spent_formatted,
    {{ format_currency('avg_order_value') }} AS avg_order_value_formatted
FROM customer_order_summary
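What a customer without orders looks like after the LEFT JOIN is worth checking, because it drives the NULL handling above. The sqlite3 sketch below runs the expanded SQL on two toy customers (customer 2 has no orders): `COUNT(DISTINCT ...)` gives 0, `SUM` gives NULL, the safe_divide expansion gives NULL, and the segment falls into 'Low Value'. The table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stg_customers (customer_id INTEGER);
CREATE TABLE stg_orders (order_id INTEGER, customer_id INTEGER, total_amount REAL);
INSERT INTO stg_customers VALUES (1), (2);
INSERT INTO stg_orders VALUES (10, 1, 1200), (11, 1, 800);
""")

rows = conn.execute("""
SELECT
    c.customer_id,
    COUNT(DISTINCT o.order_id) AS total_orders,
    SUM(o.total_amount)        AS total_spent,
    -- what safe_divide('SUM(...)', 'COUNT(DISTINCT ...)') expands to
    CASE WHEN COUNT(DISTINCT o.order_id) = 0 THEN NULL
         ELSE SUM(o.total_amount) / COUNT(DISTINCT o.order_id) END AS avg_order_value,
    -- what get_customer_segment('total_orders') expands to
    CASE WHEN COUNT(DISTINCT o.order_id) >= 3 THEN 'High Value'
         WHEN COUNT(DISTINCT o.order_id) >= 2 THEN 'Medium Value'
         ELSE 'Low Value' END AS customer_segment
FROM stg_customers c
LEFT JOIN stg_orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id
ORDER BY c.customer_id
""").fetchall()

for row in rows:
    print(row)
```

The NULL `total_spent` for customer 2 is why a `not_null` test on that column needs `COALESCE(SUM(...), 0)` in the model.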

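Step 5: Monitoring and alerting

Monitor the health of the data pipeline and implement alert statuses so that problems are detected early.

Monitoring metrics implementation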
-- models/monitoring/data_quality_metrics.sql
{{ config(materialized='table') }}

WITH quality_metrics AS (
    -- Record counts and simple quality indicators per table.
    -- Every branch of a UNION ALL must return the same columns, so metrics that do not
    -- apply to a table are NULL. COUNT_IF is Snowflake's conditional count (the aggregate
    -- FILTER (WHERE ...) clause is not available there).
    SELECT
        'customers' AS table_name,
        COUNT(*) AS record_count,
        COUNT_IF(email IS NULL) AS null_email_count,
        COUNT(DISTINCT email) AS unique_email_count,
        CAST(NULL AS NUMBER) AS invalid_amount_count,
        CAST(NULL AS NUMBER) AS unique_customer_count,
        CURRENT_TIMESTAMP() AS measured_at
    FROM {{ ref('stg_customers') }}

    UNION ALL

    SELECT
        'orders' AS table_name,
        COUNT(*) AS record_count,
        CAST(NULL AS NUMBER) AS null_email_count,
        CAST(NULL AS NUMBER) AS unique_email_count,
        COUNT_IF(total_amount <= 0) AS invalid_amount_count,
        COUNT(DISTINCT customer_id) AS unique_customer_count,
        CURRENT_TIMESTAMP() AS measured_at
    FROM {{ ref('stg_orders') }}
),

quality_alerts AS (
    SELECT
        *,
        CASE
            WHEN table_name = 'customers' AND record_count < 100 THEN 'ALERT: Low customer count'
            WHEN table_name = 'customers' AND null_email_count > 0 THEN 'ALERT: Null emails detected'
            WHEN table_name = 'orders' AND invalid_amount_count > 0 THEN 'ALERT: Invalid order amounts'
            ELSE 'OK'
        END AS quality_status
    FROM quality_metrics
)

SELECT * FROM quality_alerts

-- models/monitoring/pipeline_health.sql
{{ config(materialized='view') }}

SELECT
    'dbt_run' AS check_type,
    CASE
        WHEN COUNT(*) > 0 THEN 'HEALTHY'
        ELSE 'FAILED'
    END AS status,
    CURRENT_TIMESTAMP() AS last_checked
FROM {{ ref('customer_orders') }}

UNION ALL

SELECT
    'data_freshness' AS check_type,
    CASE
        WHEN MAX(order_date) >= CURRENT_DATE() - INTERVAL '7 days' THEN 'HEALTHY'
        ELSE 'STALE'
    END AS status,
    CURRENT_TIMESTAMP() AS last_checked
FROM {{ ref('stg_orders') }}
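These models record a status but do not send anything. A minimal sketch of the missing alerting step, assuming a scheduled script reads `data_quality_metrics` after `dbt run` and passes every non-OK row to a notifier. sqlite3 stands in for the warehouse connection, and `collect_alerts`, `run_alerting`, and `notify` are hypothetical; in practice `notify` would post to Slack, e-mail, or a similar channel.

```python
import sqlite3
from typing import Callable, List, Tuple

def collect_alerts(conn: sqlite3.Connection) -> List[Tuple[str, str]]:
    """Rows of data_quality_metrics whose quality_status is anything other than 'OK'."""
    return conn.execute(
        "SELECT table_name, quality_status FROM data_quality_metrics "
        "WHERE quality_status <> 'OK' ORDER BY table_name"
    ).fetchall()

def run_alerting(conn: sqlite3.Connection, notify: Callable[[str], None]) -> int:
    """Send one message per problem row and return how many were sent."""
    alerts = collect_alerts(conn)
    for table_name, status in alerts:
        notify(f"[data quality] {table_name}: {status}")
    return len(alerts)

# Stand-in for the table built by models/monitoring/data_quality_metrics.sql
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE data_quality_metrics (table_name TEXT, quality_status TEXT);
INSERT INTO data_quality_metrics VALUES
    ('customers', 'OK'),
    ('orders', 'ALERT: Invalid order amounts');
""")
sent = run_alerting(conn, notify=print)  # swap print for a real sender
```

Passing the notifier in as a function keeps the check testable without network access.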

Step 6: Performance monitoring and tuning

Monitor query execution time and resource usage, and tune the models based on what you find.

Performance monitoring
# Shell: run dbt as usual; per-model execution times are written to target/run_results.json
dbt run

# Analyze a specific model in detail
dbt run --select customer_orders --profiles-dir ~/.dbt --profile snowflake_profile

-- SQL (run in Snowflake): check the query plan
EXPLAIN SELECT * FROM dbt_tutorial.customer_orders WHERE customer_segment = 'High Value';

-- SQL (run in Snowflake): check resource usage
-- ACCOUNT_USAGE views can lag by up to about 45 minutes; times are in milliseconds.
-- This view reports cloud-services credits per query, not warehouse compute credits.
SELECT
    query_text,
    execution_time,
    total_elapsed_time,
    warehouse_size,
    credits_used_cloud_services,
    bytes_scanned
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text ILIKE '%customer_orders%'
  AND start_time >= CURRENT_DATE() - INTERVAL '1 day'
ORDER BY execution_time DESC;
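dbt records per-node timings in `target/run_results.json` after each invocation; each entry in its `results` list has a `unique_id` and an `execution_time` in seconds. A short sketch that lists the slowest nodes from that file (`slowest_nodes` is a hypothetical helper name):

```python
import json
from pathlib import Path

def slowest_nodes(run_results: dict, top_n: int = 5):
    """Return (unique_id, execution_time_seconds) pairs, slowest first."""
    timings = [(r["unique_id"], r.get("execution_time") or 0.0)
               for r in run_results.get("results", [])]
    return sorted(timings, key=lambda t: t[1], reverse=True)[:top_n]

results_path = Path("target/run_results.json")  # rewritten by each dbt run / test / build
if results_path.exists():
    for unique_id, seconds in slowest_nodes(json.loads(results_path.read_text())):
        print(f"{seconds:8.2f}s  {unique_id}")
```

Because the file is overwritten on every invocation, copying it somewhere after each scheduled run is one way to keep a timing history.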
Automated performance test
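-- Automating the performance test
-- tests/performance/test_query_performance.sql
-- Goal: make sure an important query finishes within 5 seconds.
-- CURRENT_TIMESTAMP() returns the same value for an entire statement, so a query cannot
-- time itself. This test reads Snowflake's query history instead and fails (returns rows)
-- when a recent query on customer_orders took longer than 5 seconds.
-- Requires access to SNOWFLAKE.ACCOUNT_USAGE, whose data can lag by up to about 45 minutes.
SELECT
    query_id,
    query_text,
    total_elapsed_time / 1000 AS elapsed_seconds
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text ILIKE '%customer_orders%'
  AND query_text ILIKE '%High Value%'
  AND query_text NOT ILIKE '%QUERY_HISTORY%'  -- exclude this test's own queries
  AND start_time >= DATEADD('day', -1, CURRENT_TIMESTAMP())
  AND total_elapsed_time > 5000  -- milliseconds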

# packages.yml (add the dbt packages used in this chapter)
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  - package: calogica/dbt_expectations
    version: 0.9.0
  - package: dbt-labs/codegen
    version: 0.11.0
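Since a statement cannot measure its own duration, elapsed time has to be taken from outside the query, for example in a CI step that runs the query and checks a time budget. A sketch of that pattern with `time.perf_counter`; sqlite3 and the synthetic `customer_orders` table are stand-ins for a real Snowflake connection, and `timed_query` is a hypothetical helper.

```python
import sqlite3
import time

def timed_query(conn, sql, max_seconds):
    """Run sql, measure wall-clock time on the client, and report whether it met the budget."""
    start = time.perf_counter()
    rows = conn.execute(sql).fetchall()
    elapsed = time.perf_counter() - start
    return rows, elapsed, elapsed <= max_seconds

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_orders (customer_id INTEGER, customer_segment TEXT)")
conn.executemany(
    "INSERT INTO customer_orders VALUES (?, ?)",
    [(i, "High Value" if i % 10 == 0 else "Low Value") for i in range(10_000)],
)

rows, elapsed, ok = timed_query(
    conn, "SELECT * FROM customer_orders WHERE customer_segment = 'High Value'", max_seconds=5)
print(len(rows), f"{elapsed:.4f}s", "PASS" if ok else "FAIL")
```

Client-side timing includes network and fetch time, which is usually what a dashboard user experiences; warehouse-side figures such as `total_elapsed_time` isolate the database's share.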

Step 7: Integrated testing and reporting

Test all of the improvements together and produce an operational report.

Running the full test suite
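# Run everything end to end
dbt deps   # install packages
dbt seed   # load seed data
dbt run    # build all models
dbt test   # run all tests

# Store failing rows for inspection
dbt test --store-failures

-- SQL: failing rows are stored one table per test, by default in the
-- <target schema>_dbt_test__audit schema (here dbt_tutorial_dbt_test__audit)
SHOW TABLES IN SCHEMA dbt_tutorial_dbt_test__audit;

# Generate and browse the documentation (the docs site includes the lineage graph)
dbt docs generate
dbt docs serve --port 8080

# Data profiling report: print_profile_docs is not built into dbt; it comes from the
# dbt_profiler package (data-mie/dbt_profiler), which must be added to packages.yml first
dbt run-operation print_profile_docs --args '{"relation_name": "customer_orders"}'

# Inspect the dependency graph from the command line (upstream and downstream of a model)
dbt ls --select +customer_orders+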
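For the operational report, the statuses that `dbt test` records in `target/run_results.json` can be summarized. For test nodes the status is one of `pass`, `fail`, `warn`, `error`, or `skipped`, and `failures` holds the number of failing rows. `summarize_tests` is a hypothetical helper.

```python
import json
from collections import Counter
from pathlib import Path

def summarize_tests(run_results: dict):
    """Count results by status and list every test that did not pass."""
    results = run_results.get("results", [])
    counts = Counter(r["status"] for r in results)
    problems = [(r["unique_id"], r["status"], r.get("failures"))
                for r in results if r["status"] in ("fail", "warn", "error")]
    return counts, problems

results_path = Path("target/run_results.json")
if results_path.exists():
    counts, problems = summarize_tests(json.loads(results_path.read_text()))
    print(dict(counts))
    for unique_id, status, failures in problems:
        print(f"{status.upper():<5} {unique_id} (failing rows: {failures})")
```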