Terraform で Snowpipe を美しく構築する方法が知りたい!

データマネジメントプラットフォーム (DMP) チームのソフトウェアエンジニアの和田(@wxy_zzz)です。

この記事の内容

さーて、今回の estie inside blog は

  • 実現したいこと: 簡単に安全に Snowflake にデータを取り込みたい
  • こんな風に Terraform で作ってみた。が……?
  • 良い方法を知っている方、お話ししましょう!

という内容になっております!それでは本編どうぞ!

実現したいこと:
簡単に安全に Snowflake にデータを取り込みたい

estie では、プロダクトのログデータや、外部から取得したデータ、更にそれらを加工した結果などを、ndjson 形式や csv 形式のテキストファイルデータとして AWS S3 上に配置して管理することが多いです。これらの S3 上のデータを利用するために、S3 から Snowflake にデータを取り込みたいという場面が良くあります。

Snowflake には外部ステージ上のファイルを自動的に Snowflake に取り込むために Snowpipe という仕組みが用意されており、estie ではこの機能を利用してデータ基盤を構築しています(ざっくりとした estie のデータ基盤についてはこちらをご覧ください → コンパウンドスタートアップを目指す estie のデータ基盤の現状 - estie inside blog

Snowflake の公式ドキュメントが充実していることもあり、Snowflake の Admin 権限を持っている DMP チームがとりあえず Snowpipe が動作する状態にするのは比較的簡単です。ただ、estie はコンパウンドスタートアップを目指して複数のプロダクトが同時に立ち上がろうとしている状態にあり、各々のチームが試行錯誤を行っている状態です。データの取り込みもその試行錯誤の一部に当たるため、できるだけ各チームが主体になって(DMP チームがブロッカーにならずに)設定が完了できるような状態が望ましいと考えています。そのような状態を作るために、強い権限を広く配布して「自由にやってください」というのは意図しないデータアクセスが行われてしまう危険性があって避けたいです。一方で、一つ一つの設定に関して「しっかり考えながら設定してください」というのも現場に対する負荷が高く、避けたい選択肢です。

こういった背景の中で実現したいことは

AWS 側でも Snowflake 側でも意図しないデータアクセスがないような状態を維持しながら、各プロダクトのエンジニアがなるべく簡単に Snowpipe の設定を完了できるようにする

です。

こんな風に Terraform で作ってみた。が……?

AWS と Snowflake の連携方針

色々議論した結果、AWS 側の IAM ロール・ポリシー設定と Snowflake 側のストレージ統合の設定部分を都度調整するのは負荷が高そうという結論に至りました。それぞれのチームが必要になったタイミングで AWS と Snowflake の両方のリソースを個別に作成していくと、それらの設定とその後の使われ方についての管理でミスが発生する可能性が上がることが懸念されました。

AWS 側を管理している SRE チームと Snowflake を管理している DMP チームで認識を揃えて管理していくためには、可変な部分を減らしたい。そこで現状は、Snowflake にデータを入れるための共通的に利用する AWS S3 を用意して、その S3 と Snowflake との接続設定は事前に済ませておき、S3 にデータを置くときの key 設定と、Snowflake 側で作成するステージの key 設定をレビューすることで、データアクセスのセキュリティを担保する方針としています。

この記事では、主に右側の Snowflake 側の設定について詳細を書いています。

設定の管理

Snowflake のリソースは基本的に Terraform Snowflake Provider を使ってコード管理しています。

Snowflake に S3 からデータを取り込む Snowpipe を設定したいエンジニアは、Terraform の設定をコピペしてチョロっと設定を変更して Pull Request を上げるような運用を想定しています。

なお、この記事を書いている時点では

を使っています。

実際に作成した Terraform module

ndjson 形式のファイルを取り込む Terraform module

実際に作成した module はこんな感じです(一部細かい設定項目を省略しています)

variable "database_name" { type = string }
variable "schema_name" { type = string }
variable "table_name" { type = string } # stage と pipe も同じ名前で作成される
variable "stage_url" { type = string }
variable "storage_integration" { type = string }
variable "owner_role" { type = string } # 作成する table へアクセス可能な role

resource "snowflake_table" "ndjson_table" {
  database = var.database_name
  schema   = var.schema_name
  name     = var.table_name
  column {
    name     = "RAW_DATA"
    type     = "OBJECT"
    comment  = "そのままの JSON"
  }
  column {
    name = "_FILENAME"
    type = "VARCHAR(16777216)"
  }
  column {
    name = "_FILE_ROW_NUMBER"
    type = "NUMBER(38,0)"
  }
  column {
    name = "_FILE_CONTENT_KEY"
    type = "VARCHAR(16777216)"
  }
  column {
    name = "_FILE_LAST_MODIFIED"
    type = "TIMESTAMP_NTZ(3)"
  }
  column {
    name = "_START_SCAN_TIME"
    type = "TIMESTAMP_LTZ(9)"
  }
}

resource "snowflake_grant_privileges_to_account_role" "ndjson_table" {
  privileges        = ["SELECT"]
  account_role_name = var.owner_role
  on_schema_object {
    object_type = "TABLE"
    object_name = "${var.database_name}.${var.schema_name}.${snowflake_table.ndjson_table.name}"
  }
}

resource "snowflake_stage" "ndjson_stage" {
  database            = var.database_name
  schema              = var.schema_name
  name                = var.table_name
  storage_integration = var.storage_integration
  url                 = var.stage_url
  file_format         = "TYPE = JSON NULL_IF = []"
}

resource "snowflake_pipe" "ndjson_pipe" {
  depends_on  = [snowflake_table.ndjson_table, snowflake_stage.ndjson_stage]
  database    = var.database_name
  schema      = var.schema_name
  name        = var.table_name
  auto_ingest = true
  copy_statement = templatefile("${path.module}/copy_statement.sql", {
    database = var.database_name
    schema   = var.schema_name
    stage    = var.table_name
    table    = var.table_name
  })
}

ロードに使用する copy_statement は以下のように作っています。

COPY INTO ${database}.${schema}.${table} FROM (
  SELECT
      $1::OBJECT
    , METADATA$FILENAME
    , METADATA$FILE_ROW_NUMBER
    , METADATA$FILE_CONTENT_KEY
    , METADATA$FILE_LAST_MODIFIED
    , METADATA$START_SCAN_TIME
  FROM @${database}.${schema}.${stage}
)
FILE_FORMAT = (TYPE = JSON)

module の中で、最低限のテーブル、外部ステージ、アクセス設定、パイプが定義してあり、利用者側は

module "hoge" {
  source              = < ndjson 取り込み設定用 module path >
  database_name       = <自分のチームのデータベース>
  schema_name         = <適当なスキーマを指定>
  table_name          = < NDJSON を取り込むテーブル名>
  stage_url           = < S3 の key >
  storage_integration = <共通で使うストレージ統合。コピペで OK >
  owner_role          = <アクセスを許可する role >
}

これだけ設定すれば、必要な設定が完了するようにしています。

メリデメあるのですが、json をパースせずに object 型のまま取り込むようにしたことで、ndjson 形式でありさえすればどんなデータでも取り込めるようにしています。json の要素の追加や削除などにもある程度耐性があり、ちょっとデータの形式を変えてもパース時にある程度対応可能です。(ただし、利用者側で改めて json をパースする必要があり、それはデメリット)

利用者が何も意識せずともファイルのメタデータも含めて pipe が動作するようになるのが地味に便利で、エラー調査の際に助かったりします。

module のブロックだけをコピペすればいいので比較的気軽に Pull Request が出しやすくなったのではないかと思っています。レビュー時も確認するところが少ないのでかなり楽です。

csv 形式のファイルを取り込む Terraform module

実際に作成した module はこんな感じです(一部細かい設定項目を省略しています)

variable "database_name" { type = string }
variable "schema_name" { type = string }
variable "table_name" { type = string } # stage と pipe も同じ名前で作成される
variable "table_columns" { type = list(object({ name = string, comment = string })) }
variable "stage_url" { type = string }
variable "stage_file_format" { type = string }
variable "storage_integration" { type = string }
variable "pipe_copy_statement" { type = string }
variable "owner_role" { type = string } # 作成する table へアクセス可能な role

resource "snowflake_table" "csv_table" {
  database = var.database_name
  schema   = var.schema_name
  name     = var.table_name
  dynamic "column" {
    # リストの順序を維持するため、インデックスを 0 padding した 4 桁の文字列をキーにしている
    for_each = { for i, v in var.table_columns : format("%.4d", i) => v }
    content {
      name    = column.value.name
      type    = "VARCHAR(16777216)"
      comment = column.value.comment
    }
  }
  column {
    name     = "_FILENAME"
    type     = "VARCHAR(16777216)"
  }
  column {
    name     = "_FILE_ROW_NUMBER"
    type     = "NUMBER(38,0)"
  }
  column {
    name     = "_FILE_CONTENT_KEY"
    type     = "VARCHAR(16777216)"
  }
  column {
    name     = "_FILE_LAST_MODIFIED"
    type     = "TIMESTAMP_NTZ(3)"
  }
  column {
    name     = "_START_SCAN_TIME"
    type     = "TIMESTAMP_LTZ(9)"
  }
}

resource "snowflake_grant_privileges_to_account_role" "csv_table" {
  depends_on        = [snowflake_table.csv_table, ]
  privileges        = ["SELECT"]
  account_role_name = var.owner_role
  on_schema_object {
    object_type = "TABLE"
    object_name = "${var.database_name}.${var.schema_name}.${snowflake_table.csv_table.name}"
  }
}

resource "snowflake_stage" "csv_stage" {
  database            = var.database_name
  schema              = var.schema_name
  name                = var.table_name
  storage_integration = var.storage_integration
  url                 = var.stage_url
  file_format         = var.stage_file_format
}

resource "snowflake_pipe" "csv_pipe" {
  depends_on     = [snowflake_table.csv_table, snowflake_stage.csv_stage]
  database       = var.database_name
  schema         = var.schema_name
  name           = var.table_name
  auto_ingest    = true
  copy_statement = var.pipe_copy_statement
}

利用者側は、

module "hoge" {
  source        = < csv 取り込み設定用 module path >
  database_name = <自分のチームのデータベース>
  schema_name   = <適当なスキーマを指定>
  table_name    = < csv を取り込むテーブル名>
  table_columns = [
    { name = "a", comment = "foo", },
    { name = "b", comment = "bar", },
    { name = "c", comment = "baz", },
  ]
  stage_file_format   = "FIELD_OPTIONALLY_ENCLOSED_BY = \\\" NULL_IF = []"
  stage_url           = < S3 の key >
  storage_integration = <共通で使うストレージ統合。コピペで OK >
  owner_role          = <アクセスを許可する role >
  pipe_copy_statement = <<-EOT
COPY INTO <database_name>.<schema_name>.<table_name> FROM (
  SELECT
      $1 AS "a"
    , $2 AS "b"
    , $3 AS "c"
    , METADATA$FILENAME AS _FILENAME
    , METADATA$FILE_ROW_NUMBER AS _FILE_ROW_NUMBER
    , METADATA$FILE_CONTENT_KEY AS _FILE_CONTENT_KEY
    , METADATA$FILE_LAST_MODIFIED AS _FILE_LAST_MODIFIED
    , METADATA$START_SCAN_TIME AS _START_SCAN_TIME
  FROM @<database_name>.<schema_name>.<table_name> -- table 名と stage 名は同じ
)
FILE_FORMAT = (
  TYPE = CSV
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  NULL_IF = ('')
)
;
  EOT
}

というような設定をします。こ、これは苦し紛れ感が凄い……

ndjson を取り込む module と同じような思想で作り始めたのですが、ファイルのメタデータも取り込みつつ、ヘッダのない csv データにも対応したい(そういうものが存在してしまっている)、というのを満たそうとしたところ、このような形に……

禍々しさ故に csv ではない形式でデータファイルを保存しようという気を起こさせるくらいしかメリットは見いだせていない状態です。

まとめ

Snowpipe で S3 上のファイルを取り込むための各種リソースを、なるべく簡単に設定するための Terraform module を紹介しました。

ndjson を取り込むための module はファイルメタデータも自動的に取り込み対象にできるので、比較的嬉しい部分があるのですが、同じようなことを csv の取り込みでも実現しようとした結果、あまりうまくいきませんでした。もっと美しく管理したい……

良い方法を知っている方、お話ししましょう!

今回は、estie の Snowflake で使っている、S3 上のデータを取り込む Snowpipe を構築する Terraform の module を紹介してみました。率直に言って、自分でもあまり完成度に納得していない(特に csv 取り込み module)のですが、今のところクリティカルには困っていないので一旦はこれで運用しています。

ただ、絶対もっと良い方法はありそうな気がしています!もしもっと良い方法をご存知でしたら是非カジュアルにお話しさせていただけると嬉しいです!採用選考に乗らなくたって全然良いです!

こちらから!

hrmos.co

© 2019- estie, inc.