Kinesis Firehose で AWS WAF Log を Parquet 形式で S3 にエクスポートする

概要

やりたいこと

Athena のデータスキャン量が多い。パーティションを使っているのに。Redash がすぐ死ぬ。軽くしたい。

Athena のチューニングをしようと思い下記を確認。
Parquet フォーマットが良いらしい。

Parquet フォーマットは Kinesis Firehose で取り扱えるとのこと。

AWS::KinesisFirehose::DeliveryStream の「レコード形式を変換」を参考につくる。

Athena は列指向なので Parquet がいい

Athena が扱えるフォーマットには以下がある。

  • テキストフォーマット(例:CSVJSON
  • 行指向フォーマット(例:AVRO)
  • 列指向(カラムナ)フォーマット(例:Parquet、ORC)

この列方向にデータを分割して格納することを垂直パーティショニングと言う。さらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っている。

詳しくは下のブログを読んでくれ!

CloudFormation で反映

AWS::KinesisFirehose::DeliveryStream を参考に AWS WAF 用に変換していく。

AWS WAF 用の Glue Table

Table 構造は AWS WAF のものにしておく。

rulegrouplist について、クラスメソッドの記事で提案されている TYPE のほうが詳細だが、自分の環境ではエラーが起きたため、公式に準拠した Table 定義にしておく。

Resources:
  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: glue-db-datacatalog
 
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDatabase
      TableInput:
        Name: glue-waf-datacatalog
        Owner: owner
        Retention: 0
        StorageDescriptor:
          Columns:
          - Name: timestamp
            Type: bigint
          - Name: formatversion
            Type: int
          - Name: webaclid
            Type: string
          - Name: terminatingruleid
            Type: string
          - Name: terminatingruletype
            Type: string
          - Name: action
            Type: string
          - Name: terminatingrulematchdetails
            Type: array<struct<conditiontype:string,location:string,matcheddata:array<string>>>
          - Name: httpsourcename
            Type: string
          - Name: httpsourceid
            Type: string
          - Name: rulegrouplist
            Type: array<string>
          - Name: ratebasedrulelist
            Type: array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>
          - Name: nonterminatingmatchingrules
            Type: array<struct<ruleid:string,action:string>>
          - Name: httprequest
            Type: struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string>
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          Compressed: false
          NumberOfBuckets: -1
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
            Parameters:
              serialization.format: '1'
          BucketColumns: []
          SortColumns: []
          StoredAsSubDirectories: false
        PartitionKeys:
        - Name: year
          Type: string
        - Name: month
          Type: string
        - Name: day
          Type: string
        - Name: hour
          Type: string
        TableType: EXTERNAL_TABLE

Firehose 用の IAM Role

公式より、データ形式変換のために AWS Glue に Kinesis Data Firehose アクセス権を付与する必要がある。

-
  Effect: Allow
  Action:
    - glue:GetTable
    - glue:GetTableVersion
    - glue:GetTableVersions
  Resource: "*"

全体像は下記。

FirehoseRole:
  Type: AWS::IAM::Role
  Properties:
    RoleName: FirehoseParquetRole
    Path: '/'
    AssumeRolePolicyDocument: # Kinesis FirehoseにIAM Roleを割り当てるため
      Version: 2012-10-17
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - firehose.amazonaws.com
          Action:
            - sts:AssumeRole
          Condition:
            StringEquals:
              sts:ExternalId: !Sub ${AWS::AccountId}
    Policies:
      -
       PolicyName: firehose_delivery_role
       PolicyDocument:
         Version: "2012-10-17"
         Statement:
            -
              Effect: Allow
              Action:
                - glue:GetTable
                - glue:GetTableVersion
                - glue:GetTableVersions
              Resource: "*"
            -
              Effect: "Allow"
              Action:
                - s3:AbortMultipartUpload
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:PutObject
              Resource:
                - !Sub arn:aws:s3:::${BucketName}
                - !Sub arn:aws:s3:::${BucketName}/*
            -
              Effect: "Allow"
              Action:
                - kinesis:DescribeStream
                - kinesis:GetShardIterator
                - kinesis:GetRecords
                - kinesis:ListShards
              Resource:
                - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
            -
              Effect: "Allow"
              Action:
                - kms:GenerateDataKey
                - kms:Decrypt
              Resource:
                - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:alias/aws/s3
              Condition:
                StringEquals:
                  kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                StringLike:
                  kms:EncryptionContext:aws:s3:arn:
                    - !Sub arn:aws:s3:::${BucketName}/*
                    - !Sub arn:aws:s3:::${BucketName}/%FIREHOSE_BUCKET_PREFIX%*
            -
              Effect: "Allow"
              Action:
                - logs:PutLogEvents
              Resource:
                - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/aws-waf-logs-${AWS::Region}-${WebACLName}:log-stream:*
            -
              Effect: "Allow"
              Action:
                - lambda:InvokeFunction
                - lambda:GetFunctionConfiguration
              Resource:
                - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
            -
              Effect: "Allow"
              Action:
                - kms:Decrypt
              Resource:
                - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
              Condition:
                StringEquals:
                  kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                StringLike:
                  kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%

Kinesis Firehose

ExtendedS3DestinationConfiguration を利用し、DataFormatConversionConfiguration で OutputFormatConfiguration で Parquet を指定する。

FirehoseDeliveryStream:
  Type: "AWS::KinesisFirehose::DeliveryStream"
  Properties:
    DeliveryStreamName: !Sub aws-waf-logs-parquet
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      RoleARN: !GetAtt FirehoseRole.Arn
      BucketARN: !Sub arn:aws:s3:::${BucketName}
      Prefix: !Join
        - ''
        - - "WAFLOG"
          -  '/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
      ErrorOutputPrefix: !Join
        - ''
        - - "ERRORLOG"
          -  '/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
      BufferingHints:
        SizeInMBs: 128
        IntervalInSeconds: 300
      CompressionFormat: UNCOMPRESSED
      EncryptionConfiguration:
        NoEncryptionConfig: NoEncryption
      CloudWatchLoggingOptions:
        Enabled: true
        LogGroupName: !Sub '/aws/kinesisfirehose/aws-waf-logs-${AWS::StackName}'
        LogStreamName: S3Delivery
      DataFormatConversionConfiguration:
        SchemaConfiguration:
          CatalogId: !Ref AWS::AccountId
          RoleARN: !GetAtt FirehoseRole.Arn
          DatabaseName: !Ref GlueDatabase
          TableName: glue-table-waflog #!Ref GlueTable
          Region: !Ref AWS::Region
          VersionId: LATEST
        InputFormatConfiguration:
          Deserializer:
            OpenXJsonSerDe: {}
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe: {}
        Enabled: True

Kinesis Firehose の Conver record format の項目が下記のようになる。

S3 にエクスポートされたログを確認すると parquet になっている。

S3 Select からファイル形式を Parquet でファイルプレビューをすると json が表示されること。

参考

AWS::KinesisFirehose::DeliveryStream

AWS::Glue::Table TableInput

カラムナフォーマットのきほん 〜データウェアハウスを支える技術〜

コメント

タイトルとURLをコピーしました