# AI Prompt: "Create Pulumi Python program for data platform infrastructure"
""" Data Platform Infrastructure with Pulumi Python """
import json

import pulumi
import pulumi_aws as aws
import pulumi_kubernetes as k8s
from pulumi import Config, Output, export

config = Config()
environment = config.require("environment")
region = config.get("region") or "us-east-1"
vpc_cidr = config.get("vpcCidr") or "10.0.0.0/16"
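
# Note: `region` is read from config but never referenced again in this program. A
# minimal sketch (an assumption, not part of the generated output) of wiring it up
# through an explicit provider that resources could opt into via
# pulumi.ResourceOptions(provider=aws_provider):
aws_provider = aws.Provider(f"{environment}-aws", region=region)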
" Environment " : environment,
" Project " : pulumi. get_project (),
" Stack " : pulumi. get_stack (),
enable_dns_hostnames = True ,
tags = { ** tags, " Name " : f " {environment} -vpc" } ,

# Create Internet Gateway
igw = aws.ec2.InternetGateway(
    f"{environment}-igw",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-igw"},
)

# Get availability zones and create public/private/database subnets across the
# first three (the AZ count of three is assumed)
azs = aws.get_availability_zones(state="available")
public_subnets = []
private_subnets = []
database_subnets = []
for i in range(3):
    public_subnet = aws.ec2.Subnet(
        f"{environment}-public-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 1}.0/24",
        availability_zone=azs.names[i],
        map_public_ip_on_launch=True,
        tags={
            **tags,
            "Name": f"{environment}-public-{i + 1}",
            "kubernetes.io/role/elb": "1",
        },
    )
    public_subnets.append(public_subnet)

    private_subnet = aws.ec2.Subnet(
        f"{environment}-private-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 11}.0/24",
        availability_zone=azs.names[i],
        tags={
            **tags,
            "Name": f"{environment}-private-{i + 1}",
            "kubernetes.io/role/internal-elb": "1",
        },
    )
    private_subnets.append(private_subnet)

    database_subnet = aws.ec2.Subnet(
        f"{environment}-database-{i + 1}",
        vpc_id=vpc.id,
        cidr_block=f"10.0.{i + 21}.0/24",
        availability_zone=azs.names[i],
        tags={**tags, "Name": f"{environment}-database-{i + 1}"},
    )
    database_subnets.append(database_subnet)
f " {environment} -nat-eip" ,
tags = { ** tags, " Name " : f " {environment} -nat-eip" } ,
nat_gateway = aws.ec2. NatGateway (
subnet_id = public_subnets [ 0 ] .id ,
tags = { ** tags, " Name " : f " {environment} -nat" } ,

# Public route table with a default route to the Internet Gateway
public_route_table = aws.ec2.RouteTable(
    f"{environment}-public-rt",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-public-rt"},
)
public_route = aws.ec2.Route(
    f"{environment}-public-route",
    route_table_id=public_route_table.id,
    destination_cidr_block="0.0.0.0/0",
    gateway_id=igw.id,
)

# Associate public subnets with public route table
for i, subnet in enumerate(public_subnets):
    aws.ec2.RouteTableAssociation(
        f"{environment}-public-rta-{i + 1}",
        subnet_id=subnet.id,
        route_table_id=public_route_table.id,
    )

# Private route table routing outbound traffic through the NAT gateway
private_route_table = aws.ec2.RouteTable(
    f"{environment}-private-rt",
    vpc_id=vpc.id,
    tags={**tags, "Name": f"{environment}-private-rt"},
)
private_route = aws.ec2.Route(
    f"{environment}-private-route",
    route_table_id=private_route_table.id,
    destination_cidr_block="0.0.0.0/0",
    nat_gateway_id=nat_gateway.id,
)

# Associate private subnets with private route table
for i, subnet in enumerate(private_subnets):
    aws.ec2.RouteTableAssociation(
        f"{environment}-private-rta-{i + 1}",
        subnet_id=subnet.id,
        route_table_id=private_route_table.id,
    )

# Security groups (the ALB listener ports 80/443 are assumed)
alb_sg = aws.ec2.SecurityGroup(
    f"{environment}-alb-sg",
    vpc_id=vpc.id,
    description="Security group for Application Load Balancer",
    ingress=[
        {"protocol": "tcp", "from_port": 80, "to_port": 80, "cidr_blocks": ["0.0.0.0/0"]},
        {"protocol": "tcp", "from_port": 443, "to_port": 443, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-alb-sg"},
)

app_sg = aws.ec2.SecurityGroup(
    f"{environment}-app-sg",
    vpc_id=vpc.id,
    description="Security group for application servers",
    ingress=[
        # Assumed: all TCP traffic originating from the ALB security group
        {"protocol": "tcp", "from_port": 0, "to_port": 65535, "security_groups": [alb_sg.id]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-app-sg"},
)

db_sg = aws.ec2.SecurityGroup(
    f"{environment}-db-sg",
    vpc_id=vpc.id,
    description="Security group for database",
    ingress=[
        # Assumed: PostgreSQL port from the application security group
        {"protocol": "tcp", "from_port": 5432, "to_port": 5432, "security_groups": [app_sg.id]},
    ],
    egress=[
        {"protocol": "-1", "from_port": 0, "to_port": 0, "cidr_blocks": ["0.0.0.0/0"]},
    ],
    tags={**tags, "Name": f"{environment}-db-sg"},
)

# RDS subnet group spanning the database subnets
db_subnet_group = aws.rds.SubnetGroup(
    f"{environment}-db-subnet-group",
    subnet_ids=[subnet.id for subnet in database_subnets],
    tags={**tags, "Name": f"{environment}-db-subnet-group"},
)

database = aws.rds.Instance(
    f"{environment}-database",
    # Engine, storage size, and admin username are assumed; the original fragment
    # only specified the instance class and backup/maintenance settings.
    engine="postgres",
    instance_class="db.t3.medium",
    allocated_storage=100,
    username="dbadmin",
    password=config.require_secret("dbPassword"),
    vpc_security_group_ids=[db_sg.id],
    db_subnet_group_name=db_subnet_group.name,
    backup_retention_period=7,
    backup_window="03:00-04:00",
    maintenance_window="sun:04:00-sun:05:00",
    skip_final_snapshot=False,
    final_snapshot_identifier=f"{environment}-db-final-snapshot",
    tags={**tags, "Name": f"{environment}-database"},
)

# S3 Buckets for Data Lake
data_lake_bucket = aws.s3.BucketV2(
    f"{environment}-data-lake",
    bucket=f"{environment}-data-lake-{pulumi.get_stack()}",
    tags=tags,
)

aws.s3.BucketVersioningV2(
    f"{environment}-data-lake-versioning",
    bucket=data_lake_bucket.id,
    versioning_configuration={"status": "Enabled"},
)

aws.s3.BucketServerSideEncryptionConfigurationV2(
    f"{environment}-data-lake-encryption",
    bucket=data_lake_bucket.id,
    rules=[{
        "apply_server_side_encryption_by_default": {
            "sse_algorithm": "AES256",
        },
    }],
)

# Lifecycle rules for data lake (transition and expiration thresholds are assumed)
aws.s3.BucketLifecycleConfigurationV2(
    f"{environment}-data-lake-lifecycle",
    bucket=data_lake_bucket.id,
    rules=[
        {
            "id": "archive-old-data",
            "status": "Enabled",
            "filter": {},
            "transitions": [
                {"days": 30, "storage_class": "STANDARD_IA"},
                {"days": 90, "storage_class": "GLACIER"},
            ],
        },
        {
            "id": "delete-temp-data",
            "status": "Enabled",
            "filter": {"prefix": "temp/"},
            "expiration": {"days": 7},
        },
    ],
)

# EMR Cluster for Big Data Processing
emr_role = aws.iam.Role(
    f"{environment}-emr-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "elasticmapreduce.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)
aws.iam.RolePolicyAttachment(
    f"{environment}-emr-policy",
    role=emr_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole",
)

emr_ec2_role = aws.iam.Role(
    f"{environment}-emr-ec2-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ec2.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)
aws.iam.RolePolicyAttachment(
    f"{environment}-emr-ec2-policy",
    role=emr_ec2_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role",
)
emr_ec2_instance_profile = aws.iam.InstanceProfile(
    f"{environment}-emr-ec2-profile",
    role=emr_ec2_role.name,
)

emr_cluster = aws.emr.Cluster(
    f"{environment}-emr",
    name=f"{environment}-emr",
    release_label="emr-6.15.0",
    applications=["Spark", "Hadoop", "Hive", "JupyterHub"],
    termination_protection=False,
    keep_job_flow_alive_when_no_steps=True,
    scale_down_behavior="TERMINATE_AT_TASK_COMPLETION",
    service_role=emr_role.arn,
    ec2_attributes={
        "subnet_id": private_subnets[0].id,
        "emr_managed_master_security_group": app_sg.id,
        "emr_managed_slave_security_group": app_sg.id,
        "instance_profile": emr_ec2_instance_profile.arn,
        "key_name": config.get("keyPairName"),
    },
    # Instance-group sizing is assumed: one master node and two core nodes
    master_instance_group={"instance_type": "m5.xlarge"},
    core_instance_group={"instance_type": "m5.xlarge", "instance_count": 2},
    configurations=json.dumps([
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "true",
                "spark.executor.memory": "4g",
                "spark.executor.cores": "2",
            },
        },
    ]),
    log_uri=Output.concat("s3://", data_lake_bucket.bucket, "/emr-logs/"),
    tags={**tags, "Name": f"{environment}-emr"},
)

# Lambda Functions for Data Processing
lambda_role = aws.iam.Role(
    f"{environment}-lambda-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

# Attach policies to Lambda role
aws.iam.RolePolicyAttachment(
    f"{environment}-lambda-basic",
    role=lambda_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)

# Lambda policy for S3 and DynamoDB access
lambda_policy = aws.iam.RolePolicy(
    f"{environment}-lambda-policy",
    role=lambda_role.id,
    policy=Output.all(data_lake_bucket.arn).apply(
        lambda args: json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],  # actions assumed
                "Resource": f"{args[0]}/*",
            }],
        })
    ),
)

data_processor = aws.lambda_.Function(
    f"{environment}-data-processor",
    # Runtime, handler, and timeout are assumed; the original fragment only
    # specified the environment variables and the code archive.
    runtime="python3.11",
    handler="handler.lambda_handler",
    role=lambda_role.arn,
    timeout=300,
    environment={
        "variables": {
            "ENVIRONMENT": environment,
            "DATA_BUCKET": data_lake_bucket.bucket,
        },
    },
    code=pulumi.AssetArchive({
        ".": pulumi.FileArchive("./lambda/data-processor"),
    }),
    tags={**tags, "Name": f"{environment}-data-processor"},
)

# EventBridge Rule for scheduled processing
processing_schedule = aws.cloudwatch.EventRule(
    f"{environment}-processing-schedule",
    schedule_expression="rate(1 hour)",
)
aws.cloudwatch.EventTarget(
    f"{environment}-processing-target",
    rule=processing_schedule.name,
    arn=data_processor.arn,
)
aws.lambda_.Permission(
    f"{environment}-processing-permission",
    action="lambda:InvokeFunction",
    function=data_processor.name,
    principal="events.amazonaws.com",
    source_arn=processing_schedule.arn,
)

# Glue Data Catalog database
glue_database = aws.glue.CatalogDatabase(
    f"{environment}-glue-db",
    name=f"{environment}_data_catalog",
    description=f"Data catalog for {environment} environment",
)

# Athena workgroup for ad hoc queries over the data lake
athena_workgroup = aws.athena.Workgroup(
    f"{environment}-athena-workgroup",
    name=f"{environment}-workgroup",
    configuration={
        "result_configuration": {
            "output_location": Output.concat("s3://", data_lake_bucket.bucket, "/athena-results/"),
        },
        "enforce_workgroup_configuration": True,
        "publish_cloudwatch_metrics_enabled": True,
    },
)

# Kinesis stream for real-time ingestion
kinesis_stream = aws.kinesis.Stream(
    f"{environment}-data-stream",
    name=f"{environment}-data-stream",
    shard_count=2,  # shard count assumed
    tags={**tags, "Name": f"{environment}-stream"},
)

# Kinesis Firehose for S3 delivery
firehose_role = aws.iam.Role(
    f"{environment}-firehose-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "firehose.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }),
)

firehose_policy = aws.iam.RolePolicy(
    f"{environment}-firehose-policy",
    role=firehose_role.id,
    policy=Output.all(data_lake_bucket.arn, kinesis_stream.arn).apply(
        lambda args: json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                # Read from the source stream
                {"Effect": "Allow", "Resource": args[1], "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ]},
                # Deliver into the data lake bucket (S3 actions assumed)
                {"Effect": "Allow", "Resource": [args[0], f"{args[0]}/*"], "Action": [
                    "s3:PutObject", "s3:GetBucketLocation", "s3:ListBucket",
                ]},
            ],
        })
    ),
)

firehose_delivery_stream = aws.kinesis.FirehoseDeliveryStream(
    f"{environment}-firehose",
    name=f"{environment}-s3-delivery",
    destination="extended_s3",
    kinesis_source_configuration={
        "kinesis_stream_arn": kinesis_stream.arn,
        "role_arn": firehose_role.arn,
    },
    extended_s3_configuration={
        "role_arn": firehose_role.arn,
        "bucket_arn": data_lake_bucket.arn,
        "prefix": "raw-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
        "error_output_prefix": "error-data/",
        "buffering_interval": 60,
        # Record format conversion requires UNCOMPRESSED here; the Parquet output
        # applies its own columnar compression.
        "compression_format": "UNCOMPRESSED",
        "data_format_conversion_configuration": {
            # JSON-in / Parquet-out serializers are assumed
            "input_format_configuration": {"deserializer": {"open_x_json_ser_de": {}}},
            "output_format_configuration": {"serializer": {"parquet_ser_de": {}}},
            "schema_configuration": {
                "database_name": glue_database.name,
                "table_name": "raw_events",
                "role_arn": firehose_role.arn,
            },
        },
    },
    tags={**tags, "Name": f"{environment}-firehose"},
)
export ( " database_endpoint " , database.endpoint )
export ( " data_lake_bucket " , data_lake_bucket.bucket )
export ( " kinesis_stream_name " , kinesis_stream.name )
export ( " emr_cluster_id " , emr_cluster.id )
export ( " athena_workgroup " , athena_workgroup.name )

# Component Resources for better organization
class DataPlatform(pulumi.ComponentResource):
    def __init__(self, name, opts=None):
        super().__init__("custom:infrastructure:DataPlatform", name, None, opts)
        # All resources created above would be children of this component.
        # This provides better organization in the Pulumi console.
        self.register_outputs({})


data_platform = DataPlatform(f"{environment}-data-platform")
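
# To actually parent resources to the component, each constructor above would take an
# explicit parent option. A minimal sketch (not part of the generated program) of how
# the VPC, for example, could be attached to the component instance:
#
#   vpc = aws.ec2.Vpc(
#       f"{environment}-vpc",
#       cidr_block=vpc_cidr,
#       enable_dns_hostnames=True,
#       opts=pulumi.ResourceOptions(parent=data_platform),
#   )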