こんにちは。estie のデータマネジメント事業部に所属しています万代です。今回は estie レジリサーチの公開に寄せて、その裏側の技術的な内容についてのブログです(estie レジリサーチの説明については PMの勝田のブログをぜひご覧ください!)。
私は普段はオフィスの募集情報(このビルのこの階の広さ何坪の区画は入居者をいくらで募集しているよ、のような情報)を扱っています。世の中にオフィスビルはたくさんあり、estie マーケット調査が扱っているオフィスビルは2024年10月現在8万棟以上、募集情報は200万件以上と大量のデータを扱っています。
しかしながら日本にはオフィスビルよりもずっと多くの住宅が存在します!estie レジリサーチで取り扱うデータも estie マーケット調査(オフィスを扱うプロダクト)とは比較にならないような量があり、執筆時点では建物情報(マンション・アパートの基本情報)は全国220万棟、それらに対して900万件もの募集情報が存在するという規模の大きいものになっています。それらを扱う中でデータを効率的に処理するための工夫が必要だったため、このブログで簡単に紹介できればと思います。
問題設定
estie はさまざまな形でデータ提携を行っており、estie レジリサーチで利用するデータも提携先からいただいています。estie レジリサーチで使うデータは物件の住所、間取り、階数や賃料をはじめとするさまざまな種類のものがありますが、そのほとんどは estie が管理している Snowflake へ Snowpipe 経由で直接インポートされます。
ただ間取り図面や内装写真などの画像データは URL の形で受領しており、URL へアクセスしてから画像をダウンロードし、estie が管理している AWS S3 バケットへ保存する必要があります。提携いただいているデータの量は凄まじく、今回対象とする画像データはさまざまなデータ提携先からのものを含めて数千万〜億枚のオーダーで受領しています。よって逐次的に一つの計算機(計算ノード)でダウンロードしようとすると、一枚0.1秒で処理できるとしても数週間オーダーで処理時間がかかってしまい、期待されている時間およびコストで処理が終わりません。なのでなんとか高速に処理が終了するようにアーキテクチャーを考えなければなりません。
このブログではそのような大量の画像データをどう処理したかについて書いていこうと思います。
処理概要
ここからはどのような処理を実装したかについて記述します。その前により詳細な前提についても触れておきます。
estie レジリサーチで利用するデータは画像データを含め CSV 形式で毎日データをもらっています。そのうち画像レコードに関しては、物件ID (bukken_id
), 画像タイプ (image_type
), 画像URL (url
) という主要な3つのカラムが存在します。画像タイプとはその画像が間取り画像なのか内装画像なのかなどを表す enum 値です。estie では Snowflake 上でデータ基盤を構築しているので、Snowpipe を利用していただいた画像レコードを簡単に取り込むことができます。
またプロダクト上で使いやすい形でデータを提供するために、dbt を利用してデータパイプラインを構築しています。そのデータパイプラインも Snowflake 上に構築しているため、画像をダウンロードした後に、「この物件IDの画像はこのS3パスにあり、画像タイプはこれです」といった情報をまた Snowflake に戻す必要があります。
よって「(1) Snowflake から画像URLを取得し、(2) 画像を S3 にダウンロードしたあと、(3) 物件 ID ごとにどんな画像がどの S3 パスに存在するかを Snowflake に書き込む」という流れの処理を実装するという問題になりました。以下の図は今回最終的に実装した処理の全体像を表しています。
以降の節で一つ一つ解説していきます。
(1) Snowflake から画像 URL を取得する
この処理は特に考えることもなく、Snowflake からデータを普通に取得するだけです。
実装には Python を利用したのですが、以前ブログ(Python経由でSnowflakeのデータ操作をやってみた - estie inside blog) にも書いたとおり snowflake-connector-python を利用することもできます。
今回は Snowpark と呼ばれる Spark ライクな Snowflake 用ライブラリを利用してデータの取得を行いました。Snowpark は estie の中でもいろいろなところで利用されています。
以下のようなコードで Snowflake 上に存在する photo_table から所望のデータを取得することができます。今回は SQL を直接記述する形で実装しました。
session = ... df = session.sql(f""" SELECT bukken_id, URL, image_type FROM photo_table """)
この df
の中にダウンロードする必要がある画像の URL があり、それらを全て処理していきます。
(2) 画像を S3 にダウンロードする
先述の通り逐次的に一つの計算ノードでダウンロードするとものすごく時間がかかるのでなんとかしたい箇所です。複数の計算機を利用して並列にダウンロード処理を行いたいので、今回は分散キューを利用して処理を並列化しました。
AWS で分散キューといえば AWS SQS ですね。これを利用してダウンロード処理を複数の計算機で行えるようにしました。一つの計算ノード (enqueuer) がダウンロードしたい画像のURLをその他の情報とともに SQS に投入します。以下の JSON のようなデータをメッセージとしてキューに入れます。
{ "URL": "http://example.com/test.jpg", "bukken_id": 12345, "image_type": 2 }
これは Python で AWS を操作する際にはお馴染みの boto3 を利用して簡単に行うことができます。以下のような処理で行えるでしょう。
sqs_client = boto3.client("sqs") for row in df: sqs_client.send_message(QueueUrl="...", MessageBody=json.dumps(row))
enqueuer が投入したデータを SQS キューから取り出し、実際にダウンロードしていきます。
この処理を担当する計算ノード (worker) は、SQS キューからデータを取り出し、画像をダウンロードし、AWS S3 に保存します。
worker ノード一つ一つは AWS ECS のタスクとして実装しました。ECS タスクのマネージは AWS Step Functions を利用して、並列度(一度に立ち上がる worker ノード数)を制御しています。Step Functions も SQS にデータが投入されたタイミングで起動するように、AWS EventBridge を利用して制御しています。
(3) 物件 ID ごとにどんな画像がどの S3 パスに存在するかを Snowflake に書き込む
プロダクト上で特定の物件にアクセスした際にその物件に紐づく画像を表示させたいので、物件IDと画像の保存場所をテーブルとして保持したいです。単純にダウンロードした後にMySQL などの RDS へレコードを挿入するということも考えられるのですが、前述の通り estie では Snowflake + dbt でデータパイプラインを構築しているため、Snowflake 上にデータを保存するような処理を考えたいです。
ここで問題になるのが、Snowflake に対する OLTP 的な処理は非常に遅い、ということです。
Snowflake は一度に多くのレコードを取り扱うということは得意なのですが、画像をダウンロードした後に Snowflake へレコードを挿入するということを繰り返す、といったような処理は非常に時間がかかります。
よってバッファリングのようなことを考えたくなるのですが、今回は AWS のマネージドサービスである Amazon Data Firehose を利用して Snowflake へデータを投入するという方法を考えました。
Amazon Data Firehose (旧 Kinesis Firehose) とは公式ページでは以下のようなものだと説明されています。
所有するカスタムHTTPエンドポイントまたはHTTPエンドポイントにリアルタイムのストリーミングデータを配信するためのフルマネージドサービスです。
要は S3 や Redshift などへいい感じにストリーミングデータを送る仕組みです。今回はこの仕組みを利用し、複数のノードからデータを Firehose へ投入し、バッファリングしてから永続化するようにしました。
以下の図のように Firehose へ JSON 形式のレコードを投入し、 いくらかバッファリングした後、S3 に JSONL 形式でデータを保存して Snowpipe を利用して Snowflake へデータを投入する方法を今回は実装しました。
(2) も含めた worker 側の処理は以下のようなものになります。
sqs_client = boto3.client("sqs") s3_client = boto3.client("s3") firehose_client = boto3.client("firehose") def process(): while True: record = sqs_client.get() image = download_image(record["URL"]) path = calc_path(image) # 適当に S3 へのパスを生成する s3_client.put_object(BUCKET, path, image) # kinesis へデータを投入 kinesis_client.put_record( DeliveryStreamName="...", Record={"Data": json.dumps({ "bukken_id": record["bukken_id"], "s3_path": path, "image_type": record["image_type"] }).encode() } )
処理の高速化
以上のような実装で画像ダウンロード処理の並列化をすることができました。
ただ上記のような実装だと処理が期待した通りの速さになりませんでした。実際の処理中では単純な画像ダウンロードだけではなく、別の処理も行なっているのですが、それを加えると 100 images/s 程度の速度しか出ていませんでした(後述の並列数で実行した場合)。コスト的な要件もあり、これより早い処理を求められていました。そこでプロファイリングしつつ処理の高速化を試み、その中でいくつか効果があったものについて以降の節で記載します。
画像ダウンロードの非同期処理化
プロファイルをとってみると worker 側は画像のダウンロードに思ったより時間がかかっていることがわかりました。手っ取り早く高速化するために asyncio の非同期処理を利用して画像ダウンロードをするように変更しました。これに伴い SQS からメッセージを取得する際にも複数のメッセージを一度に取得するように変更しました。
本番コードを一部抜粋したものが以下のスニペットです。 aiohttp
というライブラリを利用することで簡単に実装することができました。
import aiohttp async def download_image(session: aiohttp.ClientSession, url: str) -> bytes | None: try: async with session.get(url) as response: res = await response.read() return res except aiohttp.ClientError as e: logger.warning(f"Failed to download image: {e}", extra={"url": url, "process_type": "download"}) return None async def download(messages: list[QueueMessage[PhotoRecord]]) -> list[WorkerQueueMessage]: async with aiohttp.ClientSession() as session: tasks = [asyncio.ensure_future(download_image(session, message.body.url)) for message in messages] images = await asyncio.gather(*tasks) return [(message, image) for (message, image) in zip(messages, images)]
enqueuer の高速化
enqueuer は単純な処理で特に高速化する必要もなかったと思っていたのですが、 worker 側の高速化をした後にテスト実行すると enqueuer 側の処理が律速になってしまうことがわかりました。元々の実装では Snowflake から取り出したレコードを一つずつ SQS へ投入するようになっていましたが、これを thread 並列化しました。
import threading as t def enqueue_worker(queue: Queue) -> None: sqs_client = boto3.client("sqs") while (d := queue.get()) is not None: sqs_client.send_message(QueueUrl="...", MessageBody=json.dumps(row)) def main(): queue = Queue() workers = [ t.Thread(target=target, args=args, daemon=daemon) for args in [(queue) for _ in range(n_thread)] ] for worker in workers: worker.start() for row in df: queue.put_nowait(row) # 終了したら thread も終了させる for _ in range(len(workers)): queue.put_nowait(None) for worker in workers: worker.join()
これにより幾分か enqueuer の処理を早くすることができました。
他方で SQS へデータを投入する際に複数のレコードを一つのメッセージにするという方式も考えられると思うのですが、デッドレターキュー (DLQ) による処理の再実行の際に考えることが増えてしまうというデメリットがあります。DLQ とは処理に失敗したメッセージを溜めておくことができる別の SQS キューで、コード修正などした後に再度 DLQ 内のメッセージを処理することができます。一つのメッセージに複数のレコードを入れてしまうと、一つのレコードが失敗したら DLQ にメッセージを移すのか、そうでないのか、再実行の際には成功しているレコードが混じるがそれはどうするか、などいろいろ考えることが増えてしまうため、今回は一メッセージにつき一つのレコードという方針で実装しました。
パラメータの調整
ここは正直なところ調整にあまり時間がかけられていないのですが、設定するべきパラメータがいくつか存在します。以下のように設定しました。
- enqueuer の thread 数: 16
- worker の並列数: 50
- SQS から一度に取得するメッセージの数: 10
- ECS タスク (worker) の vCPU/メモリ数: 0.5/1024
上記の条件のもと実行すると 675 images/s 程度で処理することができ、約6倍の高速化をすることができました!
実装してみた感想
AWS のマネージドサービスはやはり使いやすい
今回いろいろな AWS マネージドサービスを利用したのですがやはり使いやすいなと感じました。特にサービス間の連携は使いやすく考えられており設定がしやすかったです。estie では基本的に全ての AWS 上のインフラを terraform で管理しているのですが、terraform でも管理しやすくて実装はスムーズでした。
Snowflake と AWS が loosely-coupled だったので実装がシンプルだった
Snowflake からデータを取得する際には Snowpark を、Snowflake へデータを投入する際には S3 経由の Snowpipe を利用したため実装が比較的シンプルになりました。AWS で動く処理は Snowflake からレコードを取ってくるとき以外は Snowflake を意識せずに実装することができたので、テストなどもしやすかったです。このような構成にできたのは以前ブログ
(Terraform で Snowpipe を美しく構築する方法が知りたい! - estie inside blog)で紹介したような Snowflake へのデータ取り込みの仕組みが整えられていたからなのでデータ基盤チームに感謝です。
またデータの抽出処理が Snowflake で完結することも良かったと思います。例として「すでにダウンロードが完了している画像は処理対象から外す」というような処理も Snowpark で記述できるので Python の dict を取り回すといったようなことをする必要がありませんでした。
大規模処理のテストの難しさを感じた
ローカル環境での開発では主に localstack を利用して AWS 環境を手元で構築し、デバッグを含めたテスト実行を行なっていました。ただこれだと実際のスペックで動作をさせることがもちろんできません。
それによる困難の例としては SQS への enqueue のパフォーマンスボトルネックがあります。ローカル環境ではもちろん 50 並列でプロセスを走らせることは無理なので、実際に AWS で動かすまでは SQS のメッセージがすぐに枯渇してしまい、enqueue の速度が律速であることに気づけませんでした。
楽しい!
どのようなアーキテクチャーを設計するべきか、CPU バウンドな処理・IO バウンドな処理はどれか、ボトルネックになっている箇所はどこか、などいろいろ考えるべきことがあり、それらを踏まえて実装することは非常に楽しかったです。なかなか estie では扱ってこなかったような規模のデータであり、住宅データの量の凄さを味わえました。今後このようなチャレンジングなデータを扱えるということは大変そうでもありつつ面白そうに思います。
最後に
今回は大規模なデータを分散キューを利用して処理するということをお話ししました。実のところ執筆時点では estie レジリサーチに画像はまだ提供できていないのですが、これからすぐに提供できる予定です。
上記に出てきたような、大規模でチャレンジングなデータを扱うためにクラウド上のインフラを整備したり Snowflake を利用して処理を書いたり Python でプログラミングをしたりすることが好きな方、ぜひカジュアル面談しましょう。バックエンドエンジニア(データ)や SRE へのご応募もお待ちしています。