Cloud Armor ログを BigQuery で検索する

2021年2月24日

概要

やりたいこと

Cloud Armor ログを BigQuery で検索する。

前回

BigQuery パーティション分割テーブル

パーティション分割テーブルは、BigQuery をパーティションと呼ばれるセグメントにテーブルを分割することで、クエリのパフォーマンスを向上させる。BigQuery はクエリで読み取るバイト数の従量課金制のため、コストを減らすことができる。

BigQuery テーブルを分割する方法は以下。

取り込み時間: データの取り込み(読み込み)時間や受信時間に基づいてテーブルが分割されます。
日付 / タイムスタンプ / 日時TIMESTAMP 列、DATE 列、または DATETIME 列に基づいてテーブルが分割されます。
整数範囲: テーブルは整数列に基づいて分割されます

今回は TIMESTAMP 列で分割されている。

Cloud Armor ログの BigQuery でのスキーマ

下記のようなスキーマとして BigQuery にエクスポートされている。

ログ情報

jsonPayload が WAF のメイン情報部分。

  • status_details : レスポンスコードを説明するテキスト
  • enforced_security_policy : 実際のリクエストの状態
    • outcome : 結果(ACCEPT, DENY, UNKNOWN_OUTCOME)
    • configured_action : outcome と同じ
    • priority : ルールの優先度
    • preconfigured_expr_ids : ルールをトリガーしたすべての事前構成済み WAF ルール式の ID
  • previewSecurityPolicy : プレビューモードの場合のリクエストの状態
フィールド名タイプ
logNameSTRING
resourceRECORD
resource. typeSTRING
resource. labelsRECORD
resource.labels. project_idSTRING
resource.labels. url_map_nameSTRING
resource.labels. zoneSTRING
resource.labels. forwarding_rule_nameSTRING
resource.labels. backend_service_nameSTRING
resource.labels. target_proxy_nameSTRING
textPayloadSTRING
jsonpayload_type_loadbalancerlogentryRECORD
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicyRECORD
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.priorityFLOAT
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.nameSTRING
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcomeSTRING
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.configuredactionSTRING
jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.preconfiguredexpridsSTRING
jsonpayload_type_loadbalancerlogentry._typeSTRING
jsonpayload_type_loadbalancerlogentry.statusdetailsSTRING
jsonpayload_type_loadbalancerlogentry. previewsecuritypolicyRECORD
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. preconfiguredexpridsSTRING
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. prioritySTRING
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. outcomeSTRING
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. configuredactionSTRING
jsonpayload_type_loadbalancerlogentry.previewsecuritypolicy. nameSTRING
timestampTIMESTAMP
receiveTimestampTIMESTAMP
severitySTRING
insertIdSTRING
httpRequestRECORD
httpRequest. requestMethodSTRING
httpRequest. requestUrlSTRING
httpRequest. requestSizeINTEGER
httpRequest. statusINTEGER
httpRequest. responseSizeINTEGER
httpRequest. userAgentSTRING
httpRequest. remoteIpSTRING
httpRequest. serverIpSTRING
httpRequest. refererSTRING
httpRequest. latencyFLOAT
httpRequest. cacheLookupBOOLEAN
httpRequest. cacheHitBOOLEAN
httpRequest. cacheValidatedWithOriginServerBOOLEAN
httpRequest. cacheFillBytesINTEGER
httpRequest. protocolSTRING
operationRECORD
operation. idSTRING
operation. producerSTRING
operation. firstBOOLEAN
operation. lastBOOLEAN
traceSTRING
spanIdSTRING
traceSampledBOOLEAN
sourceLocationRECORD
sourceLocation. fileSTRING
sourceLocation. lineINTEGER
sourceLocation. functionSTRING

クエリ

日付指定

日付

SELECT
  count(timestamp)
FROM
  partition_waf.requests
WHERE
  DATE(timestamp) = '2021-01-23'

範囲

SELECT
  count(timestamp)
FROM
  partition_waf.requests
WHERE
  DATE(timestamp) BETWEEN '2021-01-23' AND '2021-01-24'

WAF 検知・ブロック

WAF が記録した

SELECT
  *
FROM
  partition_waf.requests
WHERE
  severity = 'WARNING'

WAF が検知したがブロックしなかった

SELECT
  *
FROM
  partition_waf.requests
WHERE
  severity = 'WARNING'
  AND jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'ACCEPT'

WAF がブロックした

SELECT
  count(*)
FROM
  partition_waf.requests
WHERE 
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome = 'DENY'

ステータスコード一覧

SELECT
  jsonpayload_type_loadbalancerlogentry.statusdetails,
  count(timestamp) AS cnt
FROM
  partition_waf.requests
GROUP BY 
  jsonpayload_type_loadbalancerlogentry.statusdetails
ORDER BY 
  cnt DESC

集計

検知ルール一覧

SELECT
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
  count(timestamp) AS cnt
FROM
  `no_partition.requests_202101*`
GROUP BY
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY 
  cnt DESC

severity, outcome, statusdetailの関係

SELECT
  severity,
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
  jsonpayload_type_loadbalancerlogentry.statusdetails,
  count(timestamp) AS cnt
FROM
  partition_waf.requests
GROUP BY
  severity,
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.outcome,
  jsonpayload_type_loadbalancerlogentry.statusdetails	
ORDER BY
  severity,
  cnt DESC

時間ごとのルール検知数

SELECT
  SUBSTR(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', timestamp, 'Asia/Tokyo'), 0, 10) AS time,
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name,
  count(timestamp) AS cnt
FROM
  `no_partition.requests_202101*`
GROUP BY
  time,
  jsonpayload_type_loadbalancerlogentry.enforcedsecuritypolicy.name
ORDER BY
  time,
  cnt DESC

エラー

_PARTITIONTIME がない

パーティション分割テーブルとなる。

取り込み時間パーティション分割テーブルには _PARTITIONTIME という疑似列があるらしいが、ない。

分割フィールドを確認する。

今回は timestamp で

SELECT
  *
FROM
  partition_waf.requests
WHERE 
 timestamp = '2021-01-23'

スキャン量が日毎で分割されていることを確認。

参考

パーティション分割テーブルのクエリ

パーティション分割テーブルの概要