概要
やりたいこと
Athena のデータスキャン量が多い。パーティションを使っているのに。Redash がすぐ死ぬ。軽くしたい。
Athena のチューニングをしようと思い下記を確認。
Parquet フォーマットが良いらしい。
Parquet フォーマットは Kinesis Firehose で取り扱えるとのこと。
AWS::KinesisFirehose::DeliveryStream の「レコード形式を変換」を参考につくる。
Athena は列指向なので Parquet がいい
Athena が扱えるフォーマットには以下がある。
この列方向にデータを分割して格納することを垂直パーティショニングと言う。さらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っている。
詳しくは下のブログを読んでくれ!
CloudFormation で反映
AWS::KinesisFirehose::DeliveryStream を参考に AWS WAF 用に変換していく。
AWS WAF 用の Glue Table
Table 構造は AWS WAF のものにしておく。
rulegrouplist について、クラスメソッドの記事で提案されている TYPE のほうが詳細だが、自分の環境ではエラーが起きたため、公式に準拠した Table 定義にしておく。
Resources: GlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: glue-db-datacatalog GlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref GlueDatabase TableInput: Name: glue-waf-datacatalog Owner: owner Retention: 0 StorageDescriptor: Columns: - Name: timestamp Type: bigint - Name: formatversion Type: int - Name: webaclid Type: string - Name: terminatingruleid Type: string - Name: terminatingruletype Type: string - Name: action Type: string - Name: terminatingrulematchdetails Type: array<struct<conditiontype:string,location:string,matcheddata:array<string>>> - Name: httpsourcename Type: string - Name: httpsourceid Type: string - Name: rulegrouplist Type: array<string> - Name: ratebasedrulelist Type: array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>> - Name: nonterminatingmatchingrules Type: array<struct<ruleid:string,action:string>> - Name: httprequest Type: struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string> InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat Compressed: false NumberOfBuckets: -1 SerdeInfo: SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Parameters: serialization.format: '1' BucketColumns: [] SortColumns: [] StoredAsSubDirectories: false PartitionKeys: - Name: year Type: string - Name: month Type: string - Name: day Type: string - Name: hour Type: string TableType: EXTERNAL_TABLE |
Firehose 用の IAM Role
公式より、データ形式変換のために AWS Glue に Kinesis Data Firehose アクセス権を付与する必要がある。
- Effect: Allow Action: - glue:GetTable - glue:GetTableVersion - glue:GetTableVersions Resource: "*" |
全体像は下記。
FirehoseRole: Type: AWS::IAM::Role Properties: RoleName: FirehoseParquetRole Path: '/' AssumeRolePolicyDocument: # Kinesis FirehoseにIAM Roleを割り当てるため Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - sts:AssumeRole Condition: StringEquals: sts:ExternalId: !Sub ${AWS::AccountId} Policies: - PolicyName: firehose_delivery_role PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - glue:GetTable - glue:GetTableVersion - glue:GetTableVersions Resource: "*" - Effect: "Allow" Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Resource: - !Sub arn:aws:s3:::${BucketName} - !Sub arn:aws:s3:::${BucketName}/* - Effect: "Allow" Action: - kinesis:DescribeStream - kinesis:GetShardIterator - kinesis:GetRecords - kinesis:ListShards Resource: - !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME% - Effect: "Allow" Action: - kms:GenerateDataKey - kms:Decrypt Resource: - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:alias/aws/s3 Condition: StringEquals: kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com StringLike: kms:EncryptionContext:aws:s3:arn: - !Sub arn:aws:s3:::${BucketName}/* - !Sub arn:aws:s3:::${BucketName}/%FIREHOSE_BUCKET_PREFIX%* - Effect: "Allow" Action: - logs:PutLogEvents Resource: - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/aws-waf-logs-${AWS::Region}-${WebACLName}:log-stream:* - Effect: "Allow" Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration Resource: - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION% - Effect: "Allow" Action: - kms:Decrypt Resource: - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID% Condition: StringEquals: kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com StringLike: kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME% |
Kinesis Firehose
ExtendedS3DestinationConfiguration を利用し、DataFormatConversionConfiguration で OutputFormatConfiguration で Parquet を指定する。
FirehoseDeliveryStream: Type: "AWS::KinesisFirehose::DeliveryStream" Properties: DeliveryStreamName: !Sub aws-waf-logs-parquet DeliveryStreamType: DirectPut ExtendedS3DestinationConfiguration: RoleARN: !GetAtt FirehoseRole.Arn BucketARN: !Sub arn:aws:s3:::${BucketName} Prefix: !Join - '' - - "WAFLOG" - '/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/' ErrorOutputPrefix: !Join - '' - - "ERRORLOG" - '/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/' BufferingHints: SizeInMBs: 128 IntervalInSeconds: 300 CompressionFormat: UNCOMPRESSED EncryptionConfiguration: NoEncryptionConfig: NoEncryption CloudWatchLoggingOptions: Enabled: true LogGroupName: !Sub '/aws/kinesisfirehose/aws-waf-logs-${AWS::StackName}' LogStreamName: S3Delivery DataFormatConversionConfiguration: SchemaConfiguration: CatalogId: !Ref AWS::AccountId RoleARN: !GetAtt FirehoseRole.Arn DatabaseName: !Ref GlueDatabase TableName: glue-table-waflog #!Ref GlueTable Region: !Ref AWS::Region VersionId: LATEST InputFormatConfiguration: Deserializer: OpenXJsonSerDe: {} OutputFormatConfiguration: Serializer: ParquetSerDe: {} Enabled: True |
Kinesis Firehose の Conver record format の項目が下記のようになる。

S3 にエクスポートされたログを確認すると parquet になっている。

S3 Select からファイル形式を Parquet でファイルプレビューをすると json が表示されること。


コメント