HTTP HEAD事前チェック · High-Water Mark逆順スキャン · Score閾値ノイズフィルタリング
公共データポータル 81MB CSV → PostgreSQL サーバーレス同期実践記
問題定義 — なぜ最適化が必要だったのか
建築法規レビューAIシステムで「地区詳細計画告示マッチング」機能を実装していました。VWorld APIで特定住所の地区詳細計画情報を照会すると ntfc_sn(告示シリアル番号)が返されますが、この番号だけでは実際の告示文書にアクセスできません。
eum.go.kr(土地情報ポータル)に告示リストがありますが、そのサイトのWAFがAWS IPレンジをブロックしており、直接API呼び出しが不可能でした。代替案として、公共データポータルから提供される告示リストCSVファイルをPostgreSQL DBに入れてマッチングする戦略を選択しました。
データ規模
| 項目 | 値 |
|---|---|
| CSVファイルサイズ | 約81MB |
| 総レコード数 | 448,100件 |
| うち地区詳細計画関連 | 約28,300件 |
| CSV更新頻度 | 月1〜2回 |
| 新規追加量 | 数十〜数百件/月 |
問題は、この448K件を毎回全てダウンロードしてパースすると約52秒かかることでした。エージェントがユーザーの質問に回答する過程で52秒待たせるわけにはいきません。
制約条件 — サーバーレス環境の限界
システムはAWS Bedrock AgentCore上でサーバーレスに動作します。この環境には以下の制約があります:
ローカルファイルシステムなし:コンテナが毎回新しく起動するため、CSVをローカルにキャッシュできません。
コールドスタートの考慮:最初の呼び出し時でも合理的な応答時間が必要です。
DBが唯一の永続ストレージ:PostgreSQLだけが状態を維持できる場所です。
HTTP Range非対応:data.go.krがRangeリクエストをサポートしておらず、部分ダウンロードが不可能です。
核心的な質問:「毎月数十件しか追加されないのに、毎回448K件を全て処理する必要があるのか?」
全体アーキテクチャ — 3-Layer最適化戦略
「できる限り何もしない」を原則に3段階最適化を設計しました。各Layerで早期に抜け出すほど、全体コストが下がります。
| Layer | 戦略 | コスト | スキップ条件 |
|---|---|---|---|
| Layer 1 | HEADリクエスト → Content-Length比較 | 0.3秒 | ファイルサイズが同じなら即終了 |
| Layer 2 | 全体ダウンロード + 逆順ラインスキャン | ~2秒 | max_seq境界でスキャン中断 |
| Layer 3 | 新規分のみ csv.reader パース + INSERT | ~0.01秒 | 新規件のみ処理 |
この戦略の核心は「ほとんどの呼び出しがLayer 1で終わる」という点です。CSVが更新されていなければ、HEADリクエスト1回(0.3秒)で全プロセスが終了します。
呼び出しフロー:
[HEADリクエスト 0.3秒]
│
├─ Content-Length同じ? ──→ 終了(0.3秒) ← 90%のケース
│
├─ 異なる → [全体ダウンロード ~1.5秒]
│ │
│ [逆順スキャン ~0.1秒]
│ │
│ ├─ 新規0件? ──→ CL保存後終了(~2秒)
│ │
│ └─ 新規N件 → [パース+INSERT ~0.01秒] ──→ 終了(~2秒)
│
└─ HEAD失敗 → 全体ダウンロード進行(fallback)
Layer 1: HEADリクエスト事前チェック
アイデア
HTTP HEADリクエストはレスポンスボディなしでヘッダーのみを返します。Content-LengthをDBに保存しておき、次回呼び出し時に比較すれば、81MBをダウンロードせずにファイルが変更されたかどうかを判断できます。
事前調査 — data.go.kr レスポンスヘッダー
実際のHEADリクエストで確認した結果:
| ヘッダー | 値 | 活用可能性 |
|---|---|---|
| Content-Length | 84,733,371(約81MB) | O — ファイル変更検出に使用 |
| Last-Modified | (なし) | X — 提供されていない |
| ETag | (なし) | X — 提供されていない |
Content-Lengthのみ提供されていますが、CSVに行が追加されるとファイルサイズが必ず変わるので十分です。ETagやLast-Modifiedがあればより正確でしたが、公共データポータルの特性上、期待しにくいです。
Layer 1 実装
# app_settingsテーブルにContent-Length保存
CREATE TABLE IF NOT EXISTS app_settings (
setting_key VARCHAR(100) PRIMARY KEY,
setting_value TEXT,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
# HEADリクエスト → 比較 → スキップ判断
req = urllib.request.Request(url, method='HEAD')
with urllib.request.urlopen(req, timeout=30) as resp:
remote_cl = int(resp.headers.get('Content-Length', 0))
stored_cl = get_stored_content_length() # DBから照会
if remote_cl > 0 and remote_cl == stored_cl:
return {"status": "ok", "message": "変更なし"} # 0.3秒で終了
注意:HEADリクエストは失敗する可能性があるため(タイムアウト、サーバーエラーなど)、fallbackとして全体ダウンロードを進行します。最適化が失敗しても機能自体は動作する必要があります。
Layer 2: 逆順ラインスキャン(High-Water Mark)
High-Water Mark パターン
CDC(Change Data Capture)でよく使用されるパターンです。「最後に処理した地点」を記録しておき、次回はその地点以降のみ処理します。
このCSVの場合、各行の eum_url に seq=NNNNN 形式のシリアル番号があり、新しい告示が追加されるほど seq が増加します。DBから MAX(seq) を照会すれば「どこまで処理したか」が分かります。
CSV構造例:
─────────────────────────────────────────────────────
行 1: "ソウル特別市告示 第2020-001号", ..., seq=100001
行 2: "釜山広域市告示 第2020-002号", ..., seq=100002
...
行 448098: "京畿道告示 第2025-311号", ..., seq=611080 ← DB max_seq
行 448099: "国土交通部告示 第2025-598号", ..., seq=611081 ← 新規
行 448100: "ソウル特別市告示 第2025-155号", ..., seq=611082 ← 新規
─────────────────────────────────────────────────────
なぜ逆順(後ろから)スキャンするのか?
正順(前から)スキャンすると448,100行を全て確認する必要があります。しかし新規行は常にファイルの最後に追加されるため、後ろからスキャンすると既存データ(seq ≤ max_seq)に遭遇した瞬間中断できます。
| スキャン方向 | 新規0件 | 新規5件 | 新規1000件 |
|---|---|---|---|
| 正順(前→後) | 448,100行全体スキャン | 448,100行全体スキャン | 448,100行全体スキャン |
| 逆順(後→前) | 1行スキャン後中断 | 6行スキャン後中断 | 1,001行スキャン後中断 |
月数十〜数百件追加される現実において、逆順スキャンはほぼ定数時間(O(1)に近い)で動作します。
Layer 2 実装
# DBから最後の処理地点を照会
max_seq = db.query("SELECT MAX(seq) FROM gosi_info WHERE eum_url LIKE '%seq=%'")
# CSV全体をメモリにロード(ローカルファイルなし)
csv_text = urllib.request.urlopen(url).read().decode('utf-8-sig')
lines = csv_text.split('\n')
# 後ろからスキャン — max_seq以下に遭遇したら即中断
new_line_indices = []
for i in range(len(lines) - 1, 0, -1):
line = lines[i].strip()
if not line:
continue
seq_pos = line.find('seq=')
if seq_pos == -1:
continue
seq = int(line[seq_pos + 4:].split(',')[0].split('"')[0])
if seq <= max_seq:
break # ここからは既にDBにある → スキャン終了
new_line_indices.append(i)
new_line_indices.reverse() # 元の順序に復元
核心:CSVをDictReaderで全体パースせず、raw文字列から seq= のみを抽出します。パースは新規分に対してのみ実行します。448,100行全体のDictReaderパース 1.60秒 vs 逆順スキャン 0.14秒で、11倍高速です。
Layer 3: 軽量パース + バッチINSERT
逆順スキャンで新規行のインデックスのみを確保した後、該当行のみ csv.reader でパースします。
csv.DictReader vs csv.reader
DictReaderは毎行ごとに辞書を生成するためオーバーヘッドがあります。カラム順序が固定されたCSVではインデックスベースのアクセスがより効率的です。
# CSVカラム順序: 告示番号[0], 告示日[1], URL[2], 告示名[3]
for i in new_line_indices:
parsed = list(csv.reader([lines[i]]))[0]
gosi_no = parsed[0].strip()
gosi_date = parsed[1].strip()
eum_url = parsed[2].strip()
gosi_title = parsed[3].strip()
new_rows.append((gosi_no, gosi_date, gosi_title, eum_url))
csv.readerを使用する理由:str.split(",")の方が高速ですが、値にカンマが含まれる場合(例:「ソウル特別市江南区、瑞草区」)破損します。csv.readerはRFC 4180規格に従ってquoted値を安全に処理します。
バッチINSERT + ON CONFLICT
from psycopg2.extras import execute_batch
execute_batch(cursor,
"""INSERT INTO gosi_info (gosi_no, gosi_date, gosi_date_raw, gosi_title, eum_url,
data_source, updated_at, created_at)
VALUES (%s, %s, %s, %s, %s, 'csv', NOW(), NOW())
ON CONFLICT (eum_url) DO NOTHING""",
new_rows,
page_size=5000,
)
ON CONFLICT DO NOTHING:逆順スキャンが完璧に動作すれば重複INSERTは発生しませんが、eum_urlにUNIQUEインデックスを張りON CONFLICTを追加してセーフティネットを設けます。防御的プログラミングです。
PoC基盤の意思決定 — sido/sigunguカラムの削除
初期設計では gosi_info テーブルに sido(都道府県名)、sigungu(市郡区名)カラムを設け、告示番号からパースして保存する構造でした。「ソウル特別市告示 第2025-155号」から「ソウル特別市」を抽出して sido カラムに保存する方式でした。
データ品質問題の発見
実データを検証したところ、パース品質に問題がありました:
| タイプ | 比率 | 例 |
|---|---|---|
| 正常(都道府県のみ) | ~30% | 「ソウル特別市」 |
| 都道府県+市郡混合 | ~65% | 「京畿道龍仁市」、「忠清南道牙山市」 |
| パース不可 | ~5% | 発行機関未記載 |
65%が誤ったデータなら、クレンジングロジックが必要です。しかし本当にこのカラムは必要でしょうか?
PoC: sidoなしでも同じ結果が得られるか?
「大邱広域市寿城区蓮湖洞 203-2」の住所に対して、ntfc_sn 27000NTC202411080001 でマッチングした結果:
| 検索方式 | 結果数 | 上位1件 |
|---|---|---|
| sidoカラムフィルター | 5件 | 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75] |
| gosi_no LIKEフィルター | 5件 | 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75] |
| フィルターなし(日付+タイトルのみ) | 50件 | 大邱蓮湖公共住宅地区地区計画変更(第4次) [score=0.75] |
結論:3つの方式全て同じ上位結果を返しました。sidoカラムはSQLフィルターで候補を減らしますが、スコアベースのソートで結局同じ結果が上がってきます。特にmin_score=0.3閾値を適用すると50件→2件にノイズが除去されます。
削除のメリット
- 前処理時間短縮:448K件に対するsido/sigungu UPDATEが不要
- INSERT時カラム2つ削減 → 若干のI/O節約
- データ整合性問題(65%エラー)自体が消滅
- コード簡素化:_update_sido_sigungu()関数削除
教訓:「必要そう」でカラムを追加する前に、PoCで実際の必要性を検証してください。今回の場合、65%がエラーのデータをクレンジングするコストより、カラム自体を無くす方が良い選択でした。
Score閾値で検索ノイズ除去
DB同期を最適化した後、次の問題は検索品質でした。ntfc_snからパースした日付(±7日)と「地区詳細計画」キーワードで候補を絞り込んだ後、複数要素スコアを付けてソートする構造ですが — 閾値なしで上位10件を返すとノイズが深刻でした。
問題:上位10件中8件がノイズ
以下は「大邱広域市寿城区蓮湖洞」住所に対して閾値なしで検索した結果です:
| 順位 | 告示名(要約) | score |
|---|---|---|
| 1 | 大邱蓮湖公共住宅地区地区計画変更(第4次) | 0.75 |
| 2 | 大邱蓮湖公共住宅地区地区計画変更(第4次)地形図面 | 0.75 |
| 3 | 蔚山広域市中区半邱洞一帯地区詳細計画… | 0.10 |
| 4 | 江原道原州市杏邱洞一帯地区詳細計画… | 0.10 |
| 5 | 慶尚南道巨済市長坪洞地区詳細計画… | 0.10 |
| … | (同じパターンの無関係な告示) | 0.10 |
| 10 | ソウル特別市瑞草区方背洞地区詳細計画… | 0.10 |
上位2件(score=0.75)のみが実際に関連する告示で、残り8件(score=0.10)は同じ期間に告示された全く別の地域のデータです。0.10点は「日付範囲内に存在する」という基本スコアに過ぎず、実質的なマッチングではありません。
スコア構成分析
マッチングスコアは4つの要素の合算です:
| 要素 | スコア | 条件 |
|---|---|---|
| 基本(日付範囲マッチ) | +0.10 | 告示日がntfc_sn日付±7日以内 |
| 都道府県名マッチ | +0.30 | gosi_noに都道府県名を含む(例:「大邱広域市」) |
| 洞名マッチ | +0.40 | gosi_titleに洞名を含む(例:「蓮湖」) |
| 市郡区ボーナス | +0.20 | gosi_titleに市郡区名を含む(例:「寿城区」) |
スコア分布のGap:関連告示(0.75)と無関係告示(0.10)の間に明確な間隔があります。都道府県名、洞名、市郡区のいずれかがマッチすれば最低0.40以上になるため、0.10〜0.30区間は「日付だけが重なるノイズ」です。
解決:min_score=0.3閾値適用
def search_gosi_by_ntfc_sn(
ntfc_sn: str,
address: str = None,
limit: int = 10,
min_score: float = 0.3 # この1行が核心
) -> List[Dict[str, Any]]:
# ... 候補照会 + スコア計算 ...
for row in rows:
score = calculate_match_score(row, sido_name, dong_name, address)
if score >= min_score: # 閾値未満は破棄
results.append({...})
適用結果
| min_scoreなし | min_score=0.3 | |
|---|---|---|
| 返却件数 | 10件 | 2件 |
| 関連告示 | 2件(score=0.75) | 2件(score=0.75) |
| ノイズ | 8件(score=0.10) | 0件 |
| 精度(Precision) | 20% | 100% |
なぜ0.3か:最も弱い意味あるマッチングは「都道府県名のみ一致」する場合(0.10 + 0.30 = 0.40)です。0.3はこの最小意味マッチングより若干低い値で、日付だけが重なる純粋なノイズ(0.10)を確実にフィルタリングしながらも、弱いマッチングは逃さない保守的な閾値です。実際のテストで0.10と0.40の間のスコアは観測されなかったため、0.2〜0.35範囲のどの値でも同じ効果が得られます。
教訓:検索結果をlimit数まで必ず埋めて返すより、品質基準未達の結果を一切返さない方がユーザー体験に遥かに良いです。「無いことは間違いより良い」。特にAIエージェントがこの結果を消費するパイプラインでは、ノイズは誤った推論の原因になります。
パフォーマンス測定結果
シナリオ別応答時間
| シナリオ | 所要時間 | 説明 |
|---|---|---|
| 初回フルロード | 52.2秒 | ダウンロード(1.5s) + パース(4.2s) + INSERT(46.4s) |
| 増分(HEADスキップ) | 0.32秒 | Content-Length同一 → 即終了 |
| 増分(新規5件) | 2.26秒 | ダウンロード(1.5s) + スキャン(0.1s) + INSERT(0.01s) |
| 増分(新規0件、CL変更) | 2.02秒 | ダウンロード(1.5s) + スキャン(0.1s) + 新規なし |
Layer別時間分解
| ステップ | 時間 | 備考 |
|---|---|---|
| HEADリクエスト | 0.14秒 | ネットワーク遅延のみ |
| 全体ダウンロード(81MB) | 1.5秒 | サーバー→サーバー(AWS内) |
| 逆順スキャン(448K行) | 0.14秒 | 新規0件 → 1行のみ確認 |
| DictReader全体パース(比較用) | 1.60秒 | 最適化前の方式 |
| csv.reader 5件パース | < 0.01秒 | 無視できるレベル |
| execute_batch 5件INSERT | < 0.01秒 | 無視できるレベル |
| execute_batch 448K件INSERT | 46.4秒 | 初回ロード時のみ |
ボトルネック分析:初回フルロードのボトルネックは46.4秒のINSERTです(全体の89%)。しかし初回ロードは1回のみ発生し、以降は増分同期(0.3〜2秒)のみ必要なため、最適化対象から除外しました。COPY FROM方式で改善可能ですが、ROIが低いです。
最適化前後比較(一般的な呼び出し)
| 最適化前 | 最適化後 | 改善率 | |
|---|---|---|---|
| 変更なしの場合 | 52秒(毎回フルロード) | 0.3秒(HEADスキップ) | 173倍 |
| 新規5件の場合 | 52秒(フルパース+全体比較) | 2.3秒(逆順スキャン+増分) | 23倍 |
核心的な教訓
1. HTTPヘッダーを先に確認せよ
データをダウンロードする前に、HEADリクエストで「ダウンロードする必要があるか」を先に判断してください。Content-Length、ETag、Last-Modifiedのいずれか1つがあれば十分です。公共データAPIはこれらのヘッダーをあまり提供しないことが多いですが、Content-Lengthはほとんど提供されます。
2. データ特性を活用したアルゴリズムを設計せよ
「新しいデータは常に最後に追加される」というドメイン知識が逆順スキャンを可能にしました。汎用アルゴリズム(全体スキャン、ハッシュ比較)より、データ特性に合ったアルゴリズムの方が遥かに効率的です。
3. PoCで「必要そうな」機能を検証せよ
sido/sigunguカラムは直感的に必要に見えましたが、実際のPoC結果、無くても同じ結果が得られました。追加のカラム/インデックスはメンテナンスコストとデータ整合性リスクを伴います。コードを書く前にPoC一回で不要な複雑さを避けられます。
4. 最適化レイヤーを分離せよ
一度に完璧な最適化をしようとせず、レイヤー別に分離すれば各レイヤーを独立してテスト・改善できます。Layer 1が失敗してもLayer 2が動作する構造が安定的です。
5. 防御的にコーディングしつつ、パフォーマンスを犠牲にするな
ON CONFLICT DO NOTHING、HEAD失敗時のfallback、日付パース補正(「2020-03-00」→「2020-03-01」)などのセーフティネットを設けながらも、ハッピーパスのパフォーマンスを最大化しました。
6. 検索結果は量より質
閾値1つ(min_score=0.3)でPrecisionが20%から100%になりました。特にAIエージェントが結果を消費するパイプラインでは、ノイズが誤った推論につながります。「無いことは間違いより良い」という原則を適用してください。