概要
やりたいこと
Athena のデータスキャン量が多い。パーティションを使っているのに。Redash がすぐ死ぬ。軽くしたい。
Athena のチューニングをしようと思い下記を確認。
Parquet フォーマットが良いらしい。
Parquet フォーマットは Kinesis Firehose で取り扱えるとのこと。
AWS::KinesisFirehose::DeliveryStream の「レコード形式を変換」を参考につくる。
Athena は列指向なので Parquet がいい
Athena が扱えるフォーマットには以下がある。
この列方向にデータを分割して格納することを垂直パーティショニングと言う。さらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っている。
詳しくは下のブログを読んでくれ!
CloudFormation で反映
AWS::KinesisFirehose::DeliveryStream を参考に AWS WAF 用に変換していく。
AWS WAF 用の Glue Table
Table 構造は AWS WAF のものにしておく。
rulegrouplist について、クラスメソッドの記事で提案されている TYPE のほうが詳細だが、自分の環境ではエラーが起きたため、公式に準拠した Table 定義にしておく。
Resources:
GlueDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: glue-db-datacatalog
GlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref GlueDatabase
TableInput:
Name: glue-waf-datacatalog
Owner: owner
Retention: 0
StorageDescriptor:
Columns:
- Name: timestamp
Type: bigint
- Name: formatversion
Type: int
- Name: webaclid
Type: string
- Name: terminatingruleid
Type: string
- Name: terminatingruletype
Type: string
- Name: action
Type: string
- Name: terminatingrulematchdetails
Type: array<struct<conditiontype:string,location:string,matcheddata:array<string>>>
- Name: httpsourcename
Type: string
- Name: httpsourceid
Type: string
- Name: rulegrouplist
Type: array<string>
- Name: ratebasedrulelist
Type: array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>
- Name: nonterminatingmatchingrules
Type: array<struct<ruleid:string,action:string>>
- Name: httprequest
Type: struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string>
InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
Compressed: false
NumberOfBuckets: -1
SerdeInfo:
SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
Parameters:
serialization.format: '1'
BucketColumns: []
SortColumns: []
StoredAsSubDirectories: false
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
- Name: hour
Type: string
TableType: EXTERNAL_TABLE
Firehose 用の IAM Role
公式より、データ形式変換のために AWS Glue に Kinesis Data Firehose アクセス権を付与する必要がある。
-
Effect: Allow
Action:
- glue:GetTable
- glue:GetTableVersion
- glue:GetTableVersions
Resource: "*"
全体像は下記。
FirehoseRole:
Type: AWS::IAM::Role
Properties:
RoleName: FirehoseParquetRole
Path: '/'
AssumeRolePolicyDocument: # Kinesis FirehoseにIAM Roleを割り当てるため
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- firehose.amazonaws.com
Action:
- sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Sub ${AWS::AccountId}
Policies:
-
PolicyName: firehose_delivery_role
PolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: Allow
Action:
- glue:GetTable
- glue:GetTableVersion
- glue:GetTableVersions
Resource: "*"
-
Effect: "Allow"
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${BucketName}
- !Sub arn:aws:s3:::${BucketName}/*
-
Effect: "Allow"
Action:
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:ListShards
Resource:
- !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
-
Effect: "Allow"
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource:
- !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:alias/aws/s3
Condition:
StringEquals:
kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
StringLike:
kms:EncryptionContext:aws:s3:arn:
- !Sub arn:aws:s3:::${BucketName}/*
- !Sub arn:aws:s3:::${BucketName}/%FIREHOSE_BUCKET_PREFIX%*
-
Effect: "Allow"
Action:
- logs:PutLogEvents
Resource:
- !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/aws-waf-logs-${AWS::Region}-${WebACLName}:log-stream:*
-
Effect: "Allow"
Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource:
- !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
-
Effect: "Allow"
Action:
- kms:Decrypt
Resource:
- !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
Condition:
StringEquals:
kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
StringLike:
kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
Kinesis Firehose
ExtendedS3DestinationConfiguration を利用し、DataFormatConversionConfiguration で OutputFormatConfiguration で Parquet を指定する。
FirehoseDeliveryStream:
Type: "AWS::KinesisFirehose::DeliveryStream"
Properties:
DeliveryStreamName: !Sub aws-waf-logs-parquet
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
RoleARN: !GetAtt FirehoseRole.Arn
BucketARN: !Sub arn:aws:s3:::${BucketName}
Prefix: !Join
- ''
- - "WAFLOG"
- '/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
ErrorOutputPrefix: !Join
- ''
- - "ERRORLOG"
- '/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
BufferingHints:
SizeInMBs: 128
IntervalInSeconds: 300
CompressionFormat: UNCOMPRESSED
EncryptionConfiguration:
NoEncryptionConfig: NoEncryption
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Sub '/aws/kinesisfirehose/aws-waf-logs-${AWS::StackName}'
LogStreamName: S3Delivery
DataFormatConversionConfiguration:
SchemaConfiguration:
CatalogId: !Ref AWS::AccountId
RoleARN: !GetAtt FirehoseRole.Arn
DatabaseName: !Ref GlueDatabase
TableName: glue-table-waflog #!Ref GlueTable
Region: !Ref AWS::Region
VersionId: LATEST
InputFormatConfiguration:
Deserializer:
OpenXJsonSerDe: {}
OutputFormatConfiguration:
Serializer:
ParquetSerDe: {}
Enabled: True
Kinesis Firehose の Conver record format の項目が下記のようになる。
S3 にエクスポートされたログを確認すると parquet になっている。
S3 Select からファイル形式を Parquet でファイルプレビューをすると json が表示されること。
コメント