Kinesis Firehose で AWS WAF Log を Parquet 形式で S3 にエクスポートする

2020年6月28日

概要

やりたいこと

Athena のデータスキャン量が多い。パーティションを使っているのに。Redash がすぐ死ぬ。軽くしたい。

Athena のチューニングをしようと思い下記を確認。
Parquet フォーマットが良いらしい。

Parquet フォーマットは Kinesis Firehose で取り扱えるとのこと。

AWS::KinesisFirehose::DeliveryStream の「レコード形式を変換」を参考につくる。

Athena は列指向なので Parquet がいい

Athena が扱えるフォーマットには以下がある。

  • テキストフォーマット(例:CSVJSON
  • 行指向フォーマット(例:AVRO)
  • 列指向(カラムナ)フォーマット(例:Parquet、ORC)

この列方向にデータを分割して格納することを垂直パーティショニングと言う。さらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っている。

詳しくは下のブログを読んでくれ!

CloudFormation で反映

AWS::KinesisFirehose::DeliveryStream を参考に AWS WAF 用に変換していく。

AWS WAF 用の Glue Table

Table 構造は AWS WAF のものにしておく。

rulegrouplist について、クラスメソッドの記事で提案されている TYPE のほうが詳細だが、自分の環境ではエラーが起きたため、公式に準拠した Table 定義にしておく。

Resources:
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties: 
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: glue-db-datacatalog

  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabase
      TableInput:
        Name: glue-waf-datacatalog
        Owner: owner
        Retention: 0
        StorageDescriptor:
          Columns:
          - Name: timestamp
            Type: bigint
          - Name: formatversion
            Type: int
          - Name: webaclid
            Type: string
          - Name: terminatingruleid
            Type: string
          - Name: terminatingruletype
            Type: string
          - Name: action
            Type: string
          - Name: terminatingrulematchdetails
            Type: array<struct<conditiontype:string,location:string,matcheddata:array<string>>>
          - Name: httpsourcename
            Type: string
          - Name: httpsourceid
            Type: string
          - Name: rulegrouplist
            Type: array<string>
          - Name: ratebasedrulelist
            Type: array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>
          - Name: nonterminatingmatchingrules
            Type: array<struct<ruleid:string,action:string>>
          - Name: httprequest
            Type: struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string>
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
            Parameters:
              serialization.format: '1'
          BucketColumns: []
          SortColumns: []
          StoredAsSubDirectories: false
        PartitionKeys:
        - Name: year
          Type: string
        - Name: month
          Type: string
        - Name: day
          Type: string
        - Name: hour
          Type: string
        TableType: EXTERNAL_TABLE

Firehose 用の IAM Role

公式より、データ形式変換のために AWS Glue に Kinesis Data Firehose アクセス権を付与する必要がある。

              - 
                Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"

全体像は下記。

  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: FirehoseParquetRole
      Path: '/'
      AssumeRolePolicyDocument: # Kinesis FirehoseにIAM Roleを割り当てるため
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Sub ${AWS::AccountId}
      Policies:
        - 
         PolicyName: firehose_delivery_role
         PolicyDocument:
           Version: "2012-10-17"
           Statement:
              - 
                Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              -
                Effect: "Allow"
                Action: 
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !Sub arn:aws:s3:::${BucketName}
                  - !Sub arn:aws:s3:::${BucketName}/*
              -
                Effect: "Allow"
                Action: 
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:ListShards
                Resource:
                  - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              -
                Effect: "Allow"
                Action: 
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:alias/aws/s3
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - !Sub arn:aws:s3:::${BucketName}/*
                      - !Sub arn:aws:s3:::${BucketName}/%FIREHOSE_BUCKET_PREFIX%*
              -
                Effect: "Allow"
                Action: 
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/aws-waf-logs-${AWS::Region}-${WebACLName}:log-stream:*
              -
                Effect: "Allow"
                Action: 
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource:
                  - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              -
                Effect: "Allow"
                Action: 
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%

Kinesis Firehose

ExtendedS3DestinationConfiguration を利用し、DataFormatConversionConfiguration で OutputFormatConfiguration で Parquet を指定する。

  
  FirehoseDeliveryStream:
    Type: "AWS::KinesisFirehose::DeliveryStream"
    Properties:
      DeliveryStreamName: !Sub aws-waf-logs-parquet
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt FirehoseRole.Arn
        BucketARN: !Sub arn:aws:s3:::${BucketName}
        Prefix: !Join 
          - ''
          - - "WAFLOG"
            -  '/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        ErrorOutputPrefix: !Join 
          - ''
          - - "ERRORLOG"
            -  '/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        BufferingHints:
          SizeInMBs: 128
          IntervalInSeconds: 300
        CompressionFormat: UNCOMPRESSED
        EncryptionConfiguration:
          NoEncryptionConfig: NoEncryption
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Sub '/aws/kinesisfirehose/aws-waf-logs-${AWS::StackName}'
          LogStreamName: S3Delivery
        DataFormatConversionConfiguration:
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseRole.Arn
            DatabaseName: !Ref GlueDatabase
            TableName: glue-table-waflog #!Ref GlueTable
            Region: !Ref AWS::Region
            VersionId: LATEST
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          Enabled: True

Kinesis Firehose の Conver record format の項目が下記のようになる。

S3 にエクスポートされたログを確認すると parquet になっている。

S3 Select からファイル形式を Parquet でファイルプレビューをすると json が表示されること。

参考

AWS::KinesisFirehose::DeliveryStream

AWS::Glue::Table TableInput

カラムナフォーマットのきほん 〜データウェアハウスを支える技術〜