HTTP HEAD事前チェック · High-Water Mark逆順スキャン · Score閾値ノイズフィルタリング
公共データポータル 81MB CSV → PostgreSQL サーバーレス同期実践記

問題定義 — なぜ最適化が必要だったのか

建築法規レビューAIシステムで「地区詳細計画告示マッチング」機能を実装していました。VWorld APIで特定住所の地区詳細計画情報を照会すると ntfc_sn(告示シリアル番号)が返されますが、この番号だけでは実際の告示文書にアクセスできません。

eum.go.kr(土地情報ポータル)に告示リストがありますが、そのサイトのWAFがAWS IPレンジをブロックしており、直接API呼び出しが不可能でした。代替案として、公共データポータルから提供される告示リストCSVファイルをPostgreSQL DBに入れてマッチングする戦略を選択しました。

データ規模

項目
CSVファイルサイズ 約81MB
総レコード数 448,100件
うち地区詳細計画関連 約28,300件
CSV更新頻度 月1〜2回
新規追加量 数十〜数百件/月

問題は、この448K件を毎回全てダウンロードしてパースすると約52秒かかることでした。エージェントがユーザーの質問に回答する過程で52秒待たせるわけにはいきません。

制約条件 — サーバーレス環境の限界

システムはAWS Bedrock AgentCore上でサーバーレスに動作します。この環境には以下の制約があります:

ローカルファイルシステムなし:コンテナが毎回新しく起動するため、CSVをローカルにキャッシュできません。

コールドスタートの考慮:最初の呼び出し時でも合理的な応答時間が必要です。

DBが唯一の永続ストレージ:PostgreSQLだけが状態を維持できる場所です。

HTTP Range非対応:data.go.krがRangeリクエストをサポートしておらず、部分ダウンロードが不可能です。

核心的な質問:「毎月数十件しか追加されないのに、毎回448K件を全て処理する必要があるのか?」

全体アーキテクチャ — 3-Layer最適化戦略

「できる限り何もしない」を原則に3段階最適化を設計しました。各Layerで早期に抜け出すほど、全体コストが下がります。

Layer 戦略 コスト スキップ条件
Layer 1 HEADリクエスト → Content-Length比較 0.3秒 ファイルサイズが同じなら即終了
Layer 2 全体ダウンロード + 逆順ラインスキャン ~2秒 max_seq境界でスキャン中断
Layer 3 新規分のみ csv.reader パース + INSERT ~0.01秒 新規件のみ処理

この戦略の核心は「ほとんどの呼び出しがLayer 1で終わる」という点です。CSVが更新されていなければ、HEADリクエスト1回(0.3秒)で全プロセスが終了します。

呼び出しフロー:

  [HEADリクエスト 0.3秒]
       ├─ Content-Length同じ? ──→ 終了(0.3秒)     ← 90%のケース
       ├─ 異なる → [全体ダウンロード ~1.5秒]
       │              │
       │         [逆順スキャン ~0.1秒]
       │              │
       │              ├─ 新規0件? ──→ CL保存後終了(~2秒)
       │              │
       │              └─ 新規N件 → [パース+INSERT ~0.01秒] ──→ 終了(~2秒)
       └─ HEAD失敗 → 全体ダウンロード進行(fallback)

Layer 1: HEADリクエスト事前チェック

アイデア

HTTP HEADリクエストはレスポンスボディなしでヘッダーのみを返します。Content-LengthをDBに保存しておき、次回呼び出し時に比較すれば、81MBをダウンロードせずにファイルが変更されたかどうかを判断できます。

事前調査 — data.go.kr レスポンスヘッダー

実際のHEADリクエストで確認した結果:

ヘッダー 活用可能性
Content-Length 84,733,371(約81MB) O — ファイル変更検出に使用
Last-Modified (なし) X — 提供されていない
ETag (なし) X — 提供されていない

Content-Lengthのみ提供されていますが、CSVに行が追加されるとファイルサイズが必ず変わるので十分です。ETagやLast-Modifiedがあればより正確でしたが、公共データポータルの特性上、期待しにくいです。

Layer 1 実装

# app_settingsテーブルにContent-Length保存
CREATE TABLE IF NOT EXISTS app_settings (
    setting_key VARCHAR(100) PRIMARY KEY,
    setting_value TEXT,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
# HEADリクエスト → 比較 → スキップ判断
req = urllib.request.Request(url, method='HEAD')
with urllib.request.urlopen(req, timeout=30) as resp:
    remote_cl = int(resp.headers.get('Content-Length', 0))

stored_cl = get_stored_content_length()  # DBから照会

if remote_cl > 0 and remote_cl == stored_cl:
    return {"status": "ok", "message": "変更なし"}  # 0.3秒で終了

注意:HEADリクエストは失敗する可能性があるため(タイムアウト、サーバーエラーなど)、fallbackとして全体ダウンロードを進行します。最適化が失敗しても機能自体は動作する必要があります。

Layer 2: 逆順ラインスキャン(High-Water Mark)

High-Water Mark パターン

CDC(Change Data Capture)でよく使用されるパターンです。「最後に処理した地点」を記録しておき、次回はその地点以降のみ処理します。

このCSVの場合、各行の eum_url に seq=NNNNN 形式のシリアル番号があり、新しい告示が追加されるほど seq が増加します。DBから MAX(seq) を照会すれば「どこまで処理したか」が分かります。

CSV構造例:

─────────────────────────────────────────────────────
行 1:     "ソウル特別市告示 第2020-001号", ..., seq=100001
行 2:     "釜山広域市告示 第2020-002号", ..., seq=100002
...
行 448098: "京畿道告示 第2025-311号",    ..., seq=611080  ← DB max_seq
行 448099: "国土交通部告示 第2025-598号", ..., seq=611081  ← 新規
行 448100: "ソウル特別市告示 第2025-155号", ..., seq=611082  ← 新規
─────────────────────────────────────────────────────

なぜ逆順(後ろから)スキャンするのか?

正順(前から)スキャンすると448,100行を全て確認する必要があります。しかし新規行は常にファイルの最後に追加されるため、後ろからスキャンすると既存データ(seq ≤ max_seq)に遭遇した瞬間中断できます。

スキャン方向 新規0件 新規5件 新規1000件
正順(前→後) 448,100行全体スキャン 448,100行全体スキャン 448,100行全体スキャン
逆順(後→前) 1行スキャン後中断 6行スキャン後中断 1,001行スキャン後中断

月数十〜数百件追加される現実において、逆順スキャンはほぼ定数時間(O(1)に近い)で動作します。

Layer 2 実装

# DBから最後の処理地点を照会
max_seq = db.query("SELECT MAX(seq) FROM gosi_info WHERE eum_url LIKE '%seq=%'")

# CSV全体をメモリにロード(ローカルファイルなし)
csv_text = urllib.request.urlopen(url).read().decode('utf-8-sig')
lines = csv_text.split('\n')

# 後ろからスキャン — max_seq以下に遭遇したら即中断
new_line_indices = []
for i in range(len(lines) - 1, 0, -1):
    line = lines[i].strip()
    if not line:
        continue

    seq_pos = line.find('seq=')
    if seq_pos == -1:
        continue

    seq = int(line[seq_pos + 4:].split(',')[0].split('"')[0])

    if seq <= max_seq:
        break              # ここからは既にDBにある → スキャン終了

    new_line_indices.append(i)

new_line_indices.reverse()  # 元の順序に復元

核心:CSVをDictReaderで全体パースせず、raw文字列から seq= のみを抽出します。パースは新規分に対してのみ実行します。448,100行全体のDictReaderパース 1.60秒 vs 逆順スキャン 0.14秒で、11倍高速です。

Layer 3: 軽量パース + バッチINSERT

逆順スキャンで新規行のインデックスのみを確保した後、該当行のみ csv.reader でパースします。

csv.DictReader vs csv.reader

DictReaderは毎行ごとに辞書を生成するためオーバーヘッドがあります。カラム順序が固定されたCSVではインデックスベースのアクセスがより効率的です。

# CSVカラム順序: 告示番号[0], 告示日[1], URL[2], 告示名[3]
for i in new_line_indices:
    parsed = list(csv.reader([lines[i]]))[0]
    gosi_no    = parsed[0].strip()
    gosi_date  = parsed[1].strip()
    eum_url    = parsed[2].strip()
    gosi_title = parsed[3].strip()

    new_rows.append((gosi_no, gosi_date, gosi_title, eum_url))

csv.readerを使用する理由:str.split(",")の方が高速ですが、値にカンマが含まれる場合(例:「ソウル特別市江南区、瑞草区」)破損します。csv.readerはRFC 4180規格に従ってquoted値を安全に処理します。

バッチINSERT + ON CONFLICT

from psycopg2.extras import execute_batch

execute_batch(cursor,
    """INSERT INTO gosi_info (gosi_no, gosi_date, gosi_date_raw, gosi_title, eum_url,
                              data_source, updated_at, created_at)
       VALUES (%s, %s, %s, %s, %s, 'csv', NOW(), NOW())
       ON CONFLICT (eum_url) DO NOTHING""",
    new_rows,
    page_size=5000,
)

ON CONFLICT DO NOTHING:逆順スキャンが完璧に動作すれば重複INSERTは発生しませんが、eum_urlにUNIQUEインデックスを張りON CONFLICTを追加してセーフティネットを設けます。防御的プログラミングです。

PoC基盤の意思決定 — sido/sigunguカラムの削除

初期設計では gosi_info テーブルに sido(都道府県名)、sigungu(市郡区名)カラムを設け、告示番号からパースして保存する構造でした。「ソウル特別市告示 第2025-155号」から「ソウル特別市」を抽出して sido カラムに保存する方式でした。

データ品質問題の発見

実データを検証したところ、パース品質に問題がありました:

タイプ 比率
正常(都道府県のみ) ~30% 「ソウル特別市」
都道府県+市郡混合 ~65% 「京畿道龍仁市」、「忠清南道牙山市」
パース不可 ~5% 発行機関未記載

65%が誤ったデータなら、クレンジングロジックが必要です。しかし本当にこのカラムは必要でしょうか?

PoC: sidoなしでも同じ結果が得られるか?

「大邱広域市寿城区蓮湖洞 203-2」の住所に対して、ntfc_sn 27000NTC202411080001 でマッチングした結果:

検索方式 結果数 上位1件
sidoカラムフィルター 5件 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75]
gosi_no LIKEフィルター 5件 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75]
フィルターなし(日付+タイトルのみ) 50件 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75]

結論:3つの方式全て同じ上位結果を返しました。sidoカラムはSQLフィルターで候補を減らしますが、スコアベースのソートで結局同じ結果が上がってきます。特にmin_score=0.3閾値を適用すると50件→2件にノイズが除去されます。

削除のメリット

  • 前処理時間短縮:448K件に対するsido/sigungu UPDATEが不要
  • INSERT時カラム2つ削減 → 若干のI/O節約
  • データ整合性問題(65%エラー)自体が消滅
  • コード簡素化:_update_sido_sigungu()関数削除

教訓:「必要そう」でカラムを追加する前に、PoCで実際の必要性を検証してください。今回の場合、65%がエラーのデータをクレンジングするコストより、カラム自体を無くす方が良い選択でした。

Score閾値で検索ノイズ除去

DB同期を最適化した後、次の問題は検索品質でした。ntfc_snからパースした日付(±7日)と「地区詳細計画」キーワードで候補を絞り込んだ後、複数要素スコアを付けてソートする構造ですが — 閾値なしで上位10件を返すとノイズが深刻でした。

問題:上位10件中8件がノイズ

以下は「大邱広域市寿城区蓮湖洞」住所に対して閾値なしで検索した結果です:

順位 告示名(要約) score
1 大邱蓮湖公共住宅地区地区計画変更(第4次) 0.75
2 大邱蓮湖公共住宅地区地区計画変更(第4次)地形図面 0.75
3 蔚山広域市中区半邱洞一帯地区詳細計画… 0.10
4 江原道原州市杏邱洞一帯地区詳細計画… 0.10
5 慶尚南道巨済市長坪洞地区詳細計画… 0.10
(同じパターンの無関係な告示) 0.10
10 ソウル特別市瑞草区方背洞地区詳細計画… 0.10

上位2件(score=0.75)のみが実際に関連する告示で、残り8件(score=0.10)は同じ期間に告示された全く別の地域のデータです。0.10点は「日付範囲内に存在する」という基本スコアに過ぎず、実質的なマッチングではありません。

スコア構成分析

マッチングスコアは4つの要素の合算です:

要素 スコア 条件
基本(日付範囲マッチ) +0.10 告示日がntfc_sn日付±7日以内
都道府県名マッチ +0.30 gosi_noに都道府県名を含む(例:「大邱広域市」)
洞名マッチ +0.40 gosi_titleに洞名を含む(例:「蓮湖」)
市郡区ボーナス +0.20 gosi_titleに市郡区名を含む(例:「寿城区」)

スコア分布のGap:関連告示(0.75)と無関係告示(0.10)の間に明確な間隔があります。都道府県名、洞名、市郡区のいずれかがマッチすれば最低0.40以上になるため、0.10〜0.30区間は「日付だけが重なるノイズ」です。

解決:min_score=0.3閾値適用

def search_gosi_by_ntfc_sn(
    ntfc_sn: str,
    address: str = None,
    limit: int = 10,
    min_score: float = 0.3    # この1行が核心
) -> List[Dict[str, Any]]:

    # ... 候補照会 + スコア計算 ...

    for row in rows:
        score = calculate_match_score(row, sido_name, dong_name, address)
        if score >= min_score:     # 閾値未満は破棄
            results.append({...})

適用結果

min_scoreなし min_score=0.3
返却件数 10件 2件
関連告示 2件(score=0.75) 2件(score=0.75)
ノイズ 8件(score=0.10) 0件
精度(Precision) 20% 100%

なぜ0.3か:最も弱い意味あるマッチングは「都道府県名のみ一致」する場合(0.10 + 0.30 = 0.40)です。0.3はこの最小意味マッチングより若干低い値で、日付だけが重なる純粋なノイズ(0.10)を確実にフィルタリングしながらも、弱いマッチングは逃さない保守的な閾値です。実際のテストで0.10と0.40の間のスコアは観測されなかったため、0.2〜0.35範囲のどの値でも同じ効果が得られます。

教訓:検索結果をlimit数まで必ず埋めて返すより、品質基準未達の結果を一切返さない方がユーザー体験に遥かに良いです。「無いことは間違いより良い」。特にAIエージェントがこの結果を消費するパイプラインでは、ノイズは誤った推論の原因になります。

パフォーマンス測定結果

シナリオ別応答時間

シナリオ 所要時間 説明
初回フルロード 52.2秒 ダウンロード(1.5s) + パース(4.2s) + INSERT(46.4s)
増分(HEADスキップ) 0.32秒 Content-Length同一 → 即終了
増分(新規5件) 2.26秒 ダウンロード(1.5s) + スキャン(0.1s) + INSERT(0.01s)
増分(新規0件、CL変更) 2.02秒 ダウンロード(1.5s) + スキャン(0.1s) + 新規なし

Layer別時間分解

ステップ 時間 備考
HEADリクエスト 0.14秒 ネットワーク遅延のみ
全体ダウンロード(81MB) 1.5秒 サーバー→サーバー(AWS内)
逆順スキャン(448K行) 0.14秒 新規0件 → 1行のみ確認
DictReader全体パース(比較用) 1.60秒 最適化前の方式
csv.reader 5件パース < 0.01秒 無視できるレベル
execute_batch 5件INSERT < 0.01秒 無視できるレベル
execute_batch 448K件INSERT 46.4秒 初回ロード時のみ

ボトルネック分析:初回フルロードのボトルネックは46.4秒のINSERTです(全体の89%)。しかし初回ロードは1回のみ発生し、以降は増分同期(0.3〜2秒)のみ必要なため、最適化対象から除外しました。COPY FROM方式で改善可能ですが、ROIが低いです。

最適化前後比較(一般的な呼び出し)

最適化前 最適化後 改善率
変更なしの場合 52秒(毎回フルロード) 0.3秒(HEADスキップ) 173倍
新規5件の場合 52秒(フルパース+全体比較) 2.3秒(逆順スキャン+増分) 23倍

核心的な教訓

1. HTTPヘッダーを先に確認せよ

データをダウンロードする前に、HEADリクエストで「ダウンロードする必要があるか」を先に判断してください。Content-Length、ETag、Last-Modifiedのいずれか1つがあれば十分です。公共データAPIはこれらのヘッダーをあまり提供しないことが多いですが、Content-Lengthはほとんど提供されます。

2. データ特性を活用したアルゴリズムを設計せよ

「新しいデータは常に最後に追加される」というドメイン知識が逆順スキャンを可能にしました。汎用アルゴリズム(全体スキャン、ハッシュ比較)より、データ特性に合ったアルゴリズムの方が遥かに効率的です。

3. PoCで「必要そうな」機能を検証せよ

sido/sigunguカラムは直感的に必要に見えましたが、実際のPoC結果、無くても同じ結果が得られました。追加のカラム/インデックスはメンテナンスコストとデータ整合性リスクを伴います。コードを書く前にPoC一回で不要な複雑さを避けられます。

4. 最適化レイヤーを分離せよ

一度に完璧な最適化をしようとせず、レイヤー別に分離すれば各レイヤーを独立してテスト・改善できます。Layer 1が失敗してもLayer 2が動作する構造が安定的です。

5. 防御的にコーディングしつつ、パフォーマンスを犠牲にするな

ON CONFLICT DO NOTHING、HEAD失敗時のfallback、日付パース補正(「2020-03-00」→「2020-03-01」)などのセーフティネットを設けながらも、ハッピーパスのパフォーマンスを最大化しました。

6. 検索結果は量より質

閾値1つ(min_score=0.3)でPrecisionが20%から100%になりました。特にAIエージェントが結果を消費するパイプラインでは、ノイズが誤った推論につながります。「無いことは間違いより良い」という原則を適用してください。