概要
やりたいこと
Cloud Armor ログを BigQuery で検索する。
前回
BigQuery パーティション分割テーブル
パーティション分割テーブルは、BigQuery をパーティションと呼ばれるセグメントにテーブルを分割することで、クエリのパフォーマンスを向上させる。BigQuery はクエリで読み取るバイト数の従量課金制のため、コストを減らすことができる。
BigQuery テーブルを分割する方法は以下。
取り込み時間: データの取り込み(読み込み)時間や受信時間に基づいてテーブルが分割されます。
日付 / タイムスタンプ / 日時:TIMESTAMP
列、DATE
列、またはDATETIME
列に基づいてテーブルが分割されます。
整数範囲: テーブルは整数列に基づいて分割されます
今回は TIMESTAMP 列で分割されている。
Cloud Armor ログの BigQuery でのスキーマ
下記のようなスキーマとして BigQuery にエクスポートされている。
ログ情報
jsonPayload が WAF のメイン情報部分。
- status_details : レスポンスコードを説明するテキスト
- enforced_security_policy : 実際のリクエストの状態
- outcome : 結果(ACCEPT, DENY, UNKNOWN_OUTCOME)
- configured_action : outcome と同じ
- priority : ルールの優先度
- preconfigured_expr_ids : ルールをトリガーしたすべての事前構成済み WAF ルール式の ID
- previewSecurityPolicy : プレビューモードの場合のリクエストの状態
フィールド名 | タイプ |
logName | STRING |
resource | RECORD |
resource. type | STRING |
resource. labels | RECORD |
resource.labels. project_id | STRING |
resource.labels. url_map_name | STRING |
resource.labels. zone | STRING |
resource.labels. forwarding_rule_name | STRING |
resource.labels. backend_service_name | STRING |
resource.labels. target_proxy_name | STRING |
textPayload | STRING |
jsonpayload_type_loadbalancerlogentry | RECORD |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy | RECORD |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.priority | FLOAT |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.configuredaction | STRING |
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.preconfiguredexprids | STRING |
jsonpayload_type_loadbalancerlogentry._type | STRING |
jsonpayload_type_loadbalancerlogentry.statusdetails | STRING |
jsonpayload_type_loadbalancerlogentry. previewsecuritypolicy | RECORD |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. preconfiguredexprids | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. priority | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. outcome | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. configuredaction | STRING |
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. name | STRING |
timestamp | TIMESTAMP |
receiveTimestamp | TIMESTAMP |
severity | STRING |
insertId | STRING |
httpRequest | RECORD |
httpRequest. requestMethod | STRING |
httpRequest. requestUrl | STRING |
httpRequest. requestSize | INTEGER |
httpRequest. status | INTEGER |
httpRequest. responseSize | INTEGER |
httpRequest. userAgent | STRING |
httpRequest. remoteIp | STRING |
httpRequest. serverIp | STRING |
httpRequest. referer | STRING |
httpRequest. latency | FLOAT |
httpRequest. cacheLookup | BOOLEAN |
httpRequest. cacheHit | BOOLEAN |
httpRequest. cacheValidatedWithOriginServer | BOOLEAN |
httpRequest. cacheFillBytes | INTEGER |
httpRequest. protocol | STRING |
operation | RECORD |
operation. id | STRING |
operation. producer | STRING |
operation. first | BOOLEAN |
operation. last | BOOLEAN |
trace | STRING |
spanId | STRING |
traceSampled | BOOLEAN |
sourceLocation | RECORD |
sourceLocation. file | STRING |
sourceLocation. line | INTEGER |
sourceLocation. function | STRING |
クエリ
日付指定
日付
SELECT
count(timestamp)
FROM
partition_waf.requests
WHERE
DATE(timestamp) = '2021-01-23'
範囲
SELECT
count(timestamp)
FROM
partition_waf.requests
WHERE
DATE(timestamp) BETWEEN '2021-01-23' AND '2021-01-24'
WAF 検知・ブロック
WAF が記録した
SELECT
*
FROM
partition_waf.requests
WHERE
severity = 'WARNING'
WAF が検知したがブロックしなかった
SELECT
*
FROM
partition_waf.requests
WHERE
severity = 'WARNING'
AND jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'ACCEPT'
WAF がブロックした
SELECT
count(*)
FROM
partition_waf.requests
WHERE
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'DENY'
ステータスコード一覧
SELECT
jsonpayload_type_loadbalancerlogentry.statusdetails,
count(timestamp) AS cnt
FROM
partition_waf.requests
GROUP BY
jsonpayload_type_loadbalancerlogentry.statusdetails
ORDER BY
cnt DESC
集計
検知ルール一覧
SELECT
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
count(timestamp) AS cnt
FROM
`no_partition.requests_202101*`
GROUP BY
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY
cnt DESC
severity, outcome, statusdetailの関係
SELECT
severity,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
jsonpayload_type_loadbalancerlogentry.statusdetails,
count(timestamp) AS cnt
FROM
partition_waf.requests
GROUP BY
severity,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
jsonpayload_type_loadbalancerlogentry.statusdetails
ORDER BY
severity,
cnt DESC
時間ごとのルール検知数
SELECT
SUBSTR(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp, 'Asia/Tokyo'), 0, 10) AS time,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
count(timestamp) AS cnt
FROM
`no_partition.requests_202101*`
GROUP BY
time,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY
time,
cnt DESC
エラー
_PARTITIONTIME がない
パーティション分割テーブルとなる。
取り込み時間パーティション分割テーブルには _PARTITIONTIME
という疑似列があるらしいが、ない。
分割フィールドを確認する。
今回は timestamp で
SELECT
*
FROM
partition_waf.requests
WHERE
timestamp = '2021-01-23'
スキャン量が日毎で分割されていることを確認。
コメント