Cloud Armor ログを BigQuery で検索する
目次
概要
やりたいこと
Cloud Armor ログを BigQuery で検索する。
前回
BigQuery パーティション分割テーブル
パーティション分割テーブルは、BigQuery をパーティションと呼ばれるセグメントにテーブルを分割することで、クエリのパフォーマンスを向上させる。BigQuery はクエリで読み取るバイト数の従量課金制のため、コストを減らすことができる。
BigQuery テーブルを分割する方法は以下。
取り込み時間: データの取り込み(読み込み)時間や受信時間に基づいてテーブルが分割されます。
日付 / タイムスタンプ / 日時:TIMESTAMP
列、DATE
列、またはDATETIME
列に基づいてテーブルが分割されます。
整数範囲: テーブルは整数列に基づいて分割されます
今回は TIMESTAMP 列で分割されている。
Cloud Armor ログの BigQuery でのスキーマ
下記のようなスキーマとして BigQuery にエクスポートされている。
ログ情報
jsonPayload が WAF のメイン情報部分。
- status_details : レスポンスコードを説明するテキスト
- enforced_security_policy : 実際のリクエストの状態
- outcome : 結果(ACCEPT, DENY, UNKNOWN_OUTCOME)
- configured_action : outcome と同じ
- priority : ルールの優先度
- preconfigured_expr_ids : ルールをトリガーしたすべての事前構成済み WAF ルール式の ID
- previewSecurityPolicy : プレビューモードの場合のリクエストの状態
フィールド名 | タイプ |
logName | STRING |
resource | RECORD |
resource. type | STRING |
resource. labels | RECORD |
resource.labels. project_id | STRING |
resource.labels. url_map_name | STRING |
resource.labels. zone | STRING |
resource.labels. forwarding_rule_name | STRING |
resource.labels. backend_service_name | STRING |
resource.labels. target_proxy_name | STRING |
textPayload | STRING |
jsonpayload_type_loadbalancerlogentry | RECORD |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy | RECORD |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.priority | FLOAT |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.configuredaction | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.preconfiguredexprids | STRING |
jsonpayload_type_loadbalancerlogentry._type | STRING |
jsonpayload_type_loadbalancerlogentry.statusdetails | STRING |
jsonpayload_type_loadbalancerlogentry. previewsecuritypolicy | RECORD |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. preconfiguredexprids | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. priority | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. outcome | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. configuredaction | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. name | STRING |
timestamp | TIMESTAMP |
receiveTimestamp | TIMESTAMP |
severity | STRING |
insertId | STRING |
httpRequest | RECORD |
httpRequest. requestMethod | STRING |
httpRequest. requestUrl | STRING |
httpRequest. requestSize | INTEGER |
httpRequest. status | INTEGER |
httpRequest. responseSize | INTEGER |
httpRequest. userAgent | STRING |
httpRequest. remoteIp | STRING |
httpRequest. serverIp | STRING |
httpRequest. referer | STRING |
httpRequest. latency | FLOAT |
httpRequest. cacheLookup | BOOLEAN |
httpRequest. cacheHit | BOOLEAN |
httpRequest. cacheValidatedWithOriginServer | BOOLEAN |
httpRequest. cacheFillBytes | INTEGER |
httpRequest. protocol | STRING |
operation | RECORD |
operation. id | STRING |
operation. producer | STRING |
operation. first | BOOLEAN |
operation. last | BOOLEAN |
trace | STRING |
spanId | STRING |
traceSampled | BOOLEAN |
sourceLocation | RECORD |
sourceLocation. file | STRING |
sourceLocation. line | INTEGER |
sourceLocation. function | STRING |
クエリ
日付指定
日付
SELECT count(timestamp) FROM partition_waf.requests WHERE DATE(timestamp) = '2021-01-23'
範囲
SELECT count(timestamp) FROM partition_waf.requests WHERE DATE(timestamp) BETWEEN '2021-01-23' AND '2021-01-24'
WAF 検知・ブロック
WAF が記録した
SELECT * FROM partition_waf.requests WHERE severity = 'WARNING'
WAF が検知したがブロックしなかった
SELECT * FROM partition_waf.requests WHERE severity = 'WARNING' AND jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'ACCEPT'
WAF がブロックした
SELECT count(*) FROM partition_waf.requests WHERE jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'DENY'
ステータスコード一覧
SELECT jsonpayload_type_loadbalancerlogentry.statusdetails, count(timestamp) AS cnt FROM partition_waf.requests GROUP BY jsonpayload_type_loadbalancerlogentry.statusdetails ORDER BY cnt DESC
集計
検知ルール一覧
SELECT jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name, count(timestamp) AS cnt FROM `no_partition.requests_202101*` GROUP BY jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name ORDER BY cnt DESC
severity, outcome, statusdetailの関係
SELECT severity, jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome, jsonpayload_type_loadbalancerlogentry.statusdetails, count(timestamp) AS cnt FROM partition_waf.requests GROUP BY severity, jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome, jsonpayload_type_loadbalancerlogentry.statusdetails ORDER BY severity, cnt DESC
時間ごとのルール検知数
SELECT SUBSTR(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp, 'Asia/Tokyo'), 0, 10) AS time, jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name, count(timestamp) AS cnt FROM `no_partition.requests_202101*` GROUP BY time, jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name ORDER BY time, cnt DESC
エラー
_PARTITIONTIME がない
パーティション分割テーブルとなる。

取り込み時間パーティション分割テーブルには _PARTITIONTIME
という疑似列があるらしいが、ない。

分割フィールドを確認する。

今回は timestamp で
SELECT * FROM partition_waf.requests WHERE timestamp = '2021-01-23'
スキャン量が日毎で分割されていることを確認。
ディスカッション
コメント一覧
まだ、コメントがありません