概要
やりたいこと
Cloud Armor ログを BigQuery で検索する。
前回
BigQuery パーティション分割テーブル
パーティション分割テーブルは、BigQuery をパーティションと呼ばれるセグメントにテーブルを分割することで、クエリのパフォーマンスを向上させる。BigQuery はクエリで読み取るバイト数の従量課金制のため、コストを減らすことができる。
BigQuery テーブルを分割する方法は以下。
取り込み時間: データの取り込み(読み込み)時間や受信時間に基づいてテーブルが分割されます。
日付 / タイムスタンプ / 日時:TIMESTAMP列、DATE列、またはDATETIME列に基づいてテーブルが分割されます。
整数範囲: テーブルは整数列に基づいて分割されます
今回は TIMESTAMP 列で分割されている。
Cloud Armor ログの BigQuery でのスキーマ
下記のようなスキーマとして BigQuery にエクスポートされている。
ログ情報
jsonPayload が WAF のメイン情報部分。
- status_details : レスポンスコードを説明するテキスト
- enforced_security_policy : 実際のリクエストの状態
- outcome : 結果(ACCEPT, DENY, UNKNOWN_OUTCOME)
- configured_action : outcome と同じ
- priority : ルールの優先度
- preconfigured_expr_ids : ルールをトリガーしたすべての事前構成済み WAF ルール式の ID
- previewSecurityPolicy : プレビューモードの場合のリクエストの状態
| フィールド名 | タイプ |
| logName | STRING |
| resource | RECORD |
| resource. type | STRING |
| resource. labels | RECORD |
| resource.labels. project_id | STRING |
| resource.labels. url_map_name | STRING |
| resource.labels. zone | STRING |
| resource.labels. forwarding_rule_name | STRING |
| resource.labels. backend_service_name | STRING |
| resource.labels. target_proxy_name | STRING |
| textPayload | STRING |
| jsonpayload_type_loadbalancerlogentry | RECORD |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy | RECORD |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.priority | FLOAT |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name | STRING |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome | STRING |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.configuredaction | STRING |
| jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.preconfiguredexprids | STRING |
| jsonpayload_type_loadbalancerlogentry._type | STRING |
| jsonpayload_type_loadbalancerlogentry.statusdetails | STRING |
| jsonpayload_type_loadbalancerlogentry. previewsecuritypolicy | RECORD |
| jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. preconfiguredexprids | STRING |
| jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. priority | STRING |
| jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. outcome | STRING |
| jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. configuredaction | STRING |
| jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. name | STRING |
| timestamp | TIMESTAMP |
| receiveTimestamp | TIMESTAMP |
| severity | STRING |
| insertId | STRING |
| httpRequest | RECORD |
| httpRequest. requestMethod | STRING |
| httpRequest. requestUrl | STRING |
| httpRequest. requestSize | INTEGER |
| httpRequest. status | INTEGER |
| httpRequest. responseSize | INTEGER |
| httpRequest. userAgent | STRING |
| httpRequest. remoteIp | STRING |
| httpRequest. serverIp | STRING |
| httpRequest. referer | STRING |
| httpRequest. latency | FLOAT |
| httpRequest. cacheLookup | BOOLEAN |
| httpRequest. cacheHit | BOOLEAN |
| httpRequest. cacheValidatedWithOriginServer | BOOLEAN |
| httpRequest. cacheFillBytes | INTEGER |
| httpRequest. protocol | STRING |
| operation | RECORD |
| operation. id | STRING |
| operation. producer | STRING |
| operation. first | BOOLEAN |
| operation. last | BOOLEAN |
| trace | STRING |
| spanId | STRING |
| traceSampled | BOOLEAN |
| sourceLocation | RECORD |
| sourceLocation. file | STRING |
| sourceLocation. line | INTEGER |
| sourceLocation. function | STRING |
クエリ
日付指定
日付
SELECT
count(timestamp)
FROM
partition_waf.requests
WHERE
DATE(timestamp) = '2021-01-23'
範囲
SELECT
count(timestamp)
FROM
partition_waf.requests
WHERE
DATE(timestamp) BETWEEN '2021-01-23' AND '2021-01-24'
WAF 検知・ブロック
WAF が記録した
SELECT
*
FROM
partition_waf.requests
WHERE
severity = 'WARNING'
WAF が検知したがブロックしなかった
SELECT
*
FROM
partition_waf.requests
WHERE
severity = 'WARNING'
AND jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'ACCEPT'
WAF がブロックした
SELECT
count(*)
FROM
partition_waf.requests
WHERE
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'DENY'
ステータスコード一覧
SELECT
jsonpayload_type_loadbalancerlogentry.statusdetails,
count(timestamp) AS cnt
FROM
partition_waf.requests
GROUP BY
jsonpayload_type_loadbalancerlogentry.statusdetails
ORDER BY
cnt DESC
集計
検知ルール一覧
SELECT
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
count(timestamp) AS cnt
FROM
`no_partition.requests_202101*`
GROUP BY
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY
cnt DESC
severity, outcome, statusdetailの関係
SELECT
severity,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
jsonpayload_type_loadbalancerlogentry.statusdetails,
count(timestamp) AS cnt
FROM
partition_waf.requests
GROUP BY
severity,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
jsonpayload_type_loadbalancerlogentry.statusdetails
ORDER BY
severity,
cnt DESC
時間ごとのルール検知数
SELECT
SUBSTR(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp, 'Asia/Tokyo'), 0, 10) AS time,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
count(timestamp) AS cnt
FROM
`no_partition.requests_202101*`
GROUP BY
time,
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY
time,
cnt DESC
エラー
_PARTITIONTIME がない
パーティション分割テーブルとなる。

取り込み時間パーティション分割テーブルには _PARTITIONTIME という疑似列があるらしいが、ない。

分割フィールドを確認する。

今回は timestamp で
SELECT
*
FROM
partition_waf.requests
WHERE
timestamp = '2021-01-23'
スキャン量が日毎で分割されていることを確認。




コメント