Kinesis Streams Example
This example demonstrates how to create Amazon Kinesis Data Streams using FsCDK for real-time data streaming.
What is Kinesis?
Amazon Kinesis Data Streams enables you to build custom applications that process or analyze streaming data for specialized needs. It can continuously capture and store terabytes of data per hour from hundreds of thousands of sources.
Prerequisites
- .NET 8 SDK
- AWS CDK CLI (npm install -g aws-cdk)
- AWS credentials configured (for deployment)
Basic Kinesis Stream with Lambda Consumer
#r "../src/bin/Release/net8.0/publish/Amazon.JSII.Runtime.dll"
#r "../src/bin/Release/net8.0/publish/Constructs.dll"
#r "../src/bin/Release/net8.0/publish/Amazon.CDK.Lib.dll"
#r "../src/bin/Release/net8.0/publish/System.Text.Json.dll"
#r "../src/bin/Release/net8.0/publish/FsCDK.dll"
open Amazon.CDK
open Amazon.CDK.AWS.Kinesis
open Amazon.CDK.AWS.Lambda
open Amazon.CDK.AWS.Lambda.EventSources
open FsCDK
Stream Configuration Options
On-Demand Mode
For unpredictable workloads, use on-demand capacity mode. The stream automatically scales to handle varying throughput.
When to use:
- Unpredictable traffic patterns
- Sporadic workloads
- New applications with unknown load
Pricing: Pay per GB of data written and read
stack "OnDemandKinesisStack" {
    description "Kinesis stream with on-demand capacity"
    kinesisStream "OnDemandStream" {
        streamName "on-demand-stream"
        streamMode StreamMode.ON_DEMAND
        retentionPeriod (Duration.Hours 168.) // 7 days
        encryption StreamEncryption.KMS
    }
}
Provisioned Mode (High Throughput)
Use provisioned mode with an explicit shard count for predictable, high-volume data ingestion.
When to use:
- Consistent high throughput
- Lower cost when capacity is sized for a steady load
- Need for predictable performance
Pricing: Pay per shard-hour, plus PUT payload units
stack "HighThroughputKinesisStack" {
    description "High-throughput Kinesis stream with multiple shards"
    kinesisStream "HighThroughputStream" {
        streamName "high-throughput-stream"
        shardCount 10
        retentionPeriod (Duration.Hours 168.) // 7 days
        encryption StreamEncryption.KMS
    }
}
Use Cases
1. Real-Time Analytics Pipeline
Process streaming data for real-time dashboards and metrics.
stack "AnalyticsPipelineStack" {
    description "Real-time analytics with Kinesis and Lambda"
    // Kinesis stream captures clickstream data
    let clickstream =
        kinesisStream "ClickstreamData" {
            streamName "clickstream-events"
            shardCount 5
            retentionPeriod (Duration.Hours 48.)
            encryption StreamEncryption.KMS
        }
    // Lambda processes events in real time
    let! analyticsProcessor =
        lambda "analytics-processor" {
            handler "analytics.handler"
            runtime Runtime.PYTHON_3_11
            code "./analytics-code"
            memorySize 1024
            timeout 60.0
            environment
                [ "STREAM_NAME", clickstream.StreamName
                  "METRIC_NAMESPACE", "Analytics/Clickstream" ]
            description "Processes clickstream events for real-time analytics"
        }
    // Cross-linking values between constructs like this could use a nicer API.
    clickstream.GrantReads.Add(analyticsProcessor.Role)
    analyticsProcessor.AddEventSource(
        KinesisEventSource(
            clickstream.Stream.Value,
            KinesisEventSourceProps(
                StartingPosition = StartingPosition.LATEST,
                BatchSize = 500.,
                MaxBatchingWindow = Duration.Seconds(10.),
                ParallelizationFactor = 5.
            )
        )
    )
}
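Whatever language the consumer is written in, Lambda delivers Kinesis records in the same event shape: each entry in Records carries a kinesis object whose data field holds the record payload as base64. The sketch below decodes that field in F#. The sample event is hypothetical and trimmed to the fields used, and decodeRecords is an illustrative helper, not part of FsCDK.

```fsharp
open System
open System.Text
open System.Text.Json

// Hypothetical Lambda event with one Kinesis record (trimmed to the fields used).
// "data" is the base64 encoding of {"page":"/home"}.
let sampleEvent =
    """{ "Records": [
          { "eventSource": "aws:kinesis",
            "kinesis": { "partitionKey": "user-42",
                         "sequenceNumber": "1",
                         "data": "eyJwYWdlIjoiL2hvbWUifQ==" } } ] }"""

/// Returns (partitionKey, decoded payload) for every record in the event.
let decodeRecords (eventJson: string) : (string * string) list =
    use doc = JsonDocument.Parse(eventJson)
    // The list is built eagerly, so it is complete before doc is disposed.
    [ for entry in doc.RootElement.GetProperty("Records").EnumerateArray() do
          let kinesis = entry.GetProperty("kinesis")
          let key = kinesis.GetProperty("partitionKey").GetString()
          let bytes = Convert.FromBase64String(kinesis.GetProperty("data").GetString())
          yield (key, Encoding.UTF8.GetString(bytes)) ]

for (key, payload) in decodeRecords sampleEvent do
    printfn "%s -> %s" key payload
```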
2. Event Streaming Architecture
Capture application events for multiple independent consumers.
stack "EventStreamingStack" {
    description "Event streaming with multiple consumers"
    // Central event stream
    let eventStream =
        kinesisStream "ApplicationEvents" {
            streamName "application-events"
            shardCount 3
            retentionPeriod (Duration.Hours 72.)
            encryption StreamEncryption.KMS
        }
    // Consumer 1: Event archiver
    let! archiver =
        lambda "event-archiver" {
            handler "archiver.handler"
            runtime Runtime.PYTHON_3_11
            code "./archiver-code"
            memorySize 512
            timeout 120.0
            description "Archives events to S3"
        }
    eventStream.GrantReads.Add(archiver.Role)
    archiver.AddEventSource(
        KinesisEventSource(
            eventStream.Stream.Value,
            KinesisEventSourceProps(StartingPosition = StartingPosition.TRIM_HORIZON, BatchSize = 100.)
        )
    )
    // Consumer 2: Metrics aggregator
    let! metricsAggregator =
        lambda "metrics-aggregator" {
            handler "metrics.handler"
            runtime Runtime.NODEJS_18_X
            code "./metrics-code"
            memorySize 512
            timeout 60.0
            description "Aggregates metrics from events"
        }
    eventStream.GrantReads.Add(metricsAggregator.Role)
    metricsAggregator.AddEventSource(
        KinesisEventSource(
            eventStream.Stream.Value,
            KinesisEventSourceProps(StartingPosition = StartingPosition.LATEST, BatchSize = 200.)
        )
    )
}
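Both consumers above read the same shards through standard (shared-throughput) iterators, so they split each shard's read limits of 2 MB/sec and 5 GetRecords calls per second. A rough capacity check follows as a minimal sketch. It assumes every consumer reads at full speed and polls each shard once per second (Lambda's base polling rate for standard iterators); the helper names are hypothetical.

```fsharp
// Per-shard read limits shared by all standard consumers.
let readMBpsPerShard = 2.0
let getRecordsCallsPerShardPerSec = 5

/// Read bandwidth (MB/sec, whole stream) each consumer gets when all of them
/// read at full speed and split every shard's 2 MB/sec evenly.
let perConsumerReadMBps (shards: int) (consumers: int) =
    float shards * readMBpsPerShard / float consumers

/// True when each consumer can poll every shard once per second without
/// exceeding the per-shard GetRecords call limit.
let pollingFits (consumers: int) =
    consumers <= getRecordsCallsPerShardPerSec

// EventStreamingStack: 3 shards shared by the archiver and the metrics aggregator.
let eachConsumer = perConsumerReadMBps 3 2
let fits = pollingFits 2
printfn "Each consumer: %.1f MB/sec across the stream; polling fits: %b" eachConsumer fits
```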
3. Log Aggregation
Centralize logs from multiple sources for analysis and storage.
stack "LogAggregationStack" {
    description "Centralized log aggregation with Kinesis"
    // Log aggregation stream
    let logStream =
        kinesisStream "LogAggregation" {
            streamName "application-logs"
            shardCount 4
            retentionPeriod (Duration.Hours 24.)
            encryption StreamEncryption.KMS
        }
    // Log processor
    let! logProcessor =
        lambda "log-processor" {
            handler "logs.handler"
            runtime Runtime.PYTHON_3_11
            code "./log-processor-code"
            memorySize 1024
            timeout 120.0
            environment [ "LOG_GROUP", "/aws/kinesis/logs" ]
            description "Processes and filters log data"
        }
    logStream.GrantReads.Add(logProcessor.Role)
    logProcessor.AddEventSource(
        KinesisEventSource(
            logStream.Stream.Value,
            KinesisEventSourceProps(
                StartingPosition = StartingPosition.LATEST,
                BatchSize = 500.,
                MaxBatchingWindow = Duration.Seconds(5.)
            )
        )
    )
}
Security Best Practices
Encryption
At-Rest Encryption:
- KMS encryption is enabled by default
- Use customer-managed keys for sensitive data
- Automatic key rotation available
In-Transit Encryption:
- TLS encryption for all data transmission
- HTTPS endpoints only
To encrypt with a customer-managed KMS key instead of the default, pass the key to the kinesisStream builder with the encryptionKey operation.
IAM Permissions
Grant least-privilege access to streams.
open Amazon.CDK.AWS.IAM
stack "LeastPrivilegeKinesisStack" {
    // Producer permissions: write records only
    let producerStmt =
        policyStatement {
            effect Effect.ALLOW
            actions [ "kinesis:PutRecord"; "kinesis:PutRecords" ]
            resources [ "arn:aws:kinesis:*:*:stream/my-stream" ]
        }
    role "stream-producer-role" {
        constructId "ExecutionRole"
        assumedBy (ServicePrincipal("lambda.amazonaws.com"))
        addToPolicy producerStmt
    }
    // Consumer permissions: read records and discover shards
    let consumerStmt =
        policyStatement {
            effect Effect.ALLOW
            actions
                [ "kinesis:GetRecords"
                  "kinesis:GetShardIterator"
                  "kinesis:DescribeStream"
                  "kinesis:DescribeStreamSummary"
                  "kinesis:ListShards" ]
            resources [ "arn:aws:kinesis:*:*:stream/my-stream" ]
        }
    role "stream-consumer-role" {
        assumedBy (ServicePrincipal("lambda.amazonaws.com"))
        description "Role for Kinesis stream consumer"
        addToPolicy consumerStmt
    }
}
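The statements above use wildcards for the region and account. Narrowing the resource to one concrete stream ARN tightens the grant further. A minimal sketch of the Kinesis stream ARN format follows; the account ID and region are hypothetical, and streamArn is an illustrative helper.

```fsharp
/// Builds the ARN of one Kinesis stream:
/// arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>
let streamArn (partition: string) (region: string) (accountId: string) (streamName: string) =
    sprintf "arn:%s:kinesis:%s:%s:stream/%s" partition region accountId streamName

// Hypothetical account ID and region, for illustration only.
let arn = streamArn "aws" "us-west-2" "123456789012" "my-stream"
printfn "%s" arn
```

The resulting string can stand in for the wildcard ARN in both resources lists.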
Monitoring and Observability
CloudWatch Metrics
Kinesis automatically publishes metrics for monitoring:
- IncomingBytes: Data volume ingested
- IncomingRecords: Record count ingested
- GetRecords.IteratorAgeMilliseconds: Consumer lag
- WriteProvisionedThroughputExceeded: Producer (write) throttling
- ReadProvisionedThroughputExceeded: Consumer throttling
CloudWatch Alarms
Monitor stream health with alarms:
stack "MonitoredKinesisStack" {
    description "Kinesis stream with CloudWatch monitoring"
    let monitoredStream =
        kinesisStream "MonitoredStream" {
            streamName "monitored-stream"
            shardCount 2
        }
    // Alarm for consumer lag
    cloudwatchAlarm "stream-lag-alarm" {
        description "Alert when consumers fall behind"
        metricNamespace "AWS/Kinesis"
        metricName "GetRecords.IteratorAgeMilliseconds"
        dimensions [ "StreamName", monitoredStream.StreamName ]
        statistic "Maximum"
        threshold 60000.0 // 1 minute in milliseconds
        evaluationPeriods 2
        period (Duration.Minutes(5.0))
    }
    // Alarm for write throttling
    cloudwatchAlarm "write-throttle-alarm" {
        description "Alert on write throttling"
        metricNamespace "AWS/Kinesis"
        metricName "WriteProvisionedThroughputExceeded"
        dimensions [ "StreamName", monitoredStream.StreamName ]
        statistic "Sum"
        threshold 10.0
        evaluationPeriods 1
        period (Duration.Minutes(5.0))
    }
    ()
}
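With evaluationPeriods 2 and a 5-minute period, the lag alarm fires only after the Maximum iterator age stays at or above 60,000 ms for two consecutive periods, so a single spike does not trigger it. The sketch below models that rule in plain F#. It assumes a greater-than-or-equal comparison (the CDK Alarm default, assuming FsCDK keeps it) and a datapoint in every evaluated period; missing-data handling is left out, and the sample values are hypothetical.

```fsharp
/// Simplified alarm rule: with evaluationPeriods = N (and datapointsToAlarm = N),
/// the alarm fires when each of the last N period values is >= threshold.
let alarmFires (threshold: float) (evaluationPeriods: int) (periodValues: float list) =
    let recent = periodValues |> List.rev |> List.truncate evaluationPeriods
    recent.Length = evaluationPeriods && List.forall (fun v -> v >= threshold) recent

// Hypothetical Maximum iterator age (ms) for consecutive 5-minute periods.
let spike = [ 5000.0; 90000.0; 8000.0 ]      // one slow period, then recovery
let sustained = [ 5000.0; 75000.0; 90000.0 ] // two slow periods in a row

printfn "spike: %b, sustained: %b" (alarmFires 60000.0 2 spike) (alarmFires 60000.0 2 sustained)
```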
Performance Optimization
Shard Calculation
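Calculate the required number of shards from your throughput requirements. Each shard provides:
- Incoming (write) bandwidth: 1 MB/sec per shard
- Outgoing (read) bandwidth: 2 MB/sec per shard, shared by all standard consumers
Round the result up, since shards come in whole units:
Required shards = ceil(max(incoming_write_MB_per_sec / 1, outgoing_read_MB_per_sec / 2))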
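The formula translates into a small helper. This sketch also applies the per-shard write limit of 1,000 records/sec, which the bandwidth-only formula leaves out; the function name and sample load are illustrative.

```fsharp
// Per-shard limits for a provisioned stream.
let writeMBpsPerShard = 1.0
let writeRecordsPerSecPerShard = 1000.0
let readMBpsPerShard = 2.0

/// Smallest shard count that covers write bandwidth, write record rate,
/// and read bandwidth (always at least 1).
let requiredShards (writeMBps: float) (writeRecordsPerSec: float) (readMBps: float) =
    [ writeMBps / writeMBpsPerShard
      writeRecordsPerSec / writeRecordsPerSecPerShard
      readMBps / readMBpsPerShard ]
    |> List.max
    |> ceil
    |> int
    |> max 1

// Hypothetical load: 4.5 MB/sec in, 3,000 records/sec, 12 MB/sec read by all consumers.
printfn "%d shards" (requiredShards 4.5 3000.0 12.0)
```

Here the read side (12 / 2 = 6) dominates, so 6 shards are needed even though the writes alone would fit in 5.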
Batch Processing Configuration
Optimize Lambda processing for cost and latency:
stack "OptimizedProcessingStack" {
    description "Optimized Kinesis processing with Lambda"
    let stream =
        kinesisStream "OptimizedStream" {
            streamName "optimized-stream"
            shardCount 5
        }
    let! optimizedConsumer =
        lambda "optimized-consumer" {
            handler "optimized.handler"
            runtime Runtime.PYTHON_3_11
            code "./optimized-code"
            memorySize 1024
            timeout 300.0
            reservedConcurrentExecutions 10
            description "Optimized batch processor"
        }
    stream.GrantReads.Add(optimizedConsumer.Role)
    // Optimized event source mapping
    optimizedConsumer.AddEventSource(
        KinesisEventSource(
            stream.Stream.Value,
            KinesisEventSourceProps(
                StartingPosition = StartingPosition.LATEST,
                BatchSize = 1000., // Larger batches mean fewer Lambda invocations
                MaxBatchingWindow = Duration.Seconds(10.), // Wait up to 10 s to fill a batch
                ParallelizationFactor = 5., // Up to 5 concurrent batches per shard
                RetryAttempts = 3., // Retry a failed batch up to 3 times
                MaxRecordAge = (Duration.Hours 24.), // Skip records older than 24 hours
                BisectBatchOnError = true, // On error, split the batch in two and retry each half
                ReportBatchItemFailures = true // Handler reports which records failed
            )
        )
    )
}
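ParallelizationFactor multiplies per-shard concurrency, so one event source mapping can have up to shards x factor batches in flight. For OptimizedStream that is 5 x 5 = 25, more than the 10 executions reserved for the function, so some batches can be throttled and processing may lag. A minimal sketch of the check, with hypothetical helper names:

```fsharp
/// Upper bound on concurrent batches from one Kinesis event source mapping:
/// each shard can have up to ParallelizationFactor (1-10) batches in flight.
let maxConcurrentBatches (shards: int) (parallelizationFactor: int) =
    if parallelizationFactor < 1 || parallelizationFactor > 10 then
        invalidArg (nameof parallelizationFactor) "must be between 1 and 10"
    shards * parallelizationFactor

// Values from OptimizedProcessingStack above.
let demand = maxConcurrentBatches 5 5
let reservedConcurrency = 10

if demand > reservedConcurrency then
    printfn "Up to %d batches can run at once, but only %d executions are reserved."
        demand reservedConcurrency
```

The production stack later in this page has the same gap (10 shards x 10 = 100 batches against 50 reserved executions). Raising reserved concurrency or lowering ParallelizationFactor closes it; which one fits depends on what the downstream systems can absorb.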
Complete Production Example
// Assumption: account and region come from the environment variables the CDK CLI sets
let config =
    {| Account = System.Environment.GetEnvironmentVariable("CDK_DEFAULT_ACCOUNT")
       Region = System.Environment.GetEnvironmentVariable("CDK_DEFAULT_REGION") |}
stack "ProductionKinesisStack" {
    env (
        environment {
            account config.Account
            region config.Region
        }
    )
    description "Production-ready Kinesis streaming pipeline"
    tags [ "Environment", "Production"; "Project", "DataPipeline"; "ManagedBy", "FsCDK" ]
    // Production stream with extended retention (7 days)
    let prodStream =
        kinesisStream "ProductionStream" {
            streamName "production-data-stream"
            shardCount 10
            retentionPeriod (Duration.Hours 168.)
            encryption StreamEncryption.KMS
        }
    // Producer Lambda
    let producer =
        lambda "data-producer" {
            handler "producer.handler"
            runtime Runtime.PYTHON_3_11
            code "./producer-code"
            memorySize 512
            timeout 60.0
            environment [ "STREAM_NAME", prodStream.StreamName; "BATCH_SIZE", "500" ]
            description "Produces events to Kinesis stream"
        }
    prodStream.GrantWrites.Add(producer.Function.Value.Role)
    // Consumer Lambda with tuned batch settings
    let! consumer =
        lambda "data-consumer" {
            handler "consumer.handler"
            runtime Runtime.PYTHON_3_11
            code "./consumer-code"
            memorySize 2048
            timeout 300.0
            reservedConcurrentExecutions 50
            environment [ "DESTINATION_BUCKET", "processed-data-bucket"; "BATCH_SIZE", "1000" ]
            description "Consumes and processes events from Kinesis"
        }
    prodStream.GrantReads.Add(consumer.Role)
    consumer.AddEventSource(
        KinesisEventSource(
            prodStream.Stream.Value,
            KinesisEventSourceProps(
                StartingPosition = StartingPosition.TRIM_HORIZON,
                BatchSize = 1000.,
                MaxBatchingWindow = Duration.Seconds(10.),
                ParallelizationFactor = 10.,
                RetryAttempts = 3.,
                BisectBatchOnError = true,
                ReportBatchItemFailures = true
            )
        )
    )
    // CloudWatch alarm for monitoring
    cloudwatchAlarm "production-stream-lag" {
        description "Critical: Production stream consumer lag"
        metricNamespace "AWS/Kinesis"
        metricName "GetRecords.IteratorAgeMilliseconds"
        dimensions [ "StreamName", prodStream.StreamName ]
        statistic "Maximum"
        threshold 300000.0 // 5 minutes
        evaluationPeriods 2
        period (Duration.Minutes(5.0))
    }
}
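Both the optimized and production consumers set ReportBatchItemFailures = true. With that flag, the handler is expected to return a batchItemFailures list naming the failed records, using each record's sequence number as the itemIdentifier; an empty list marks the whole batch as successful. The sketch below builds that response with System.Text.Json. The record types are illustrative and the sequence number is a made-up value.

```fsharp
open System.Text.Json

// Illustrative types matching the partial-batch response shape.
type BatchItemFailure = { itemIdentifier: string }
type BatchResponse = { batchItemFailures: BatchItemFailure[] }

/// Builds the response from the sequence numbers of records that failed.
/// An empty array tells Lambda the whole batch succeeded.
let toResponse (failedSequenceNumbers: string list) =
    { batchItemFailures =
        failedSequenceNumbers
        |> List.map (fun s -> { itemIdentifier = s })
        |> Array.ofList }

// Made-up sequence number, for illustration only.
let failed = "49590338271490256608559692538361571095921575989136588898"
printfn "%s" (JsonSerializer.Serialize(toResponse [ failed ]))
```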
Cost Optimization
Choosing the Right Mode
Provisioned Mode:
- Best for: Consistent workloads
- Cost: $0.015 per shard-hour + $0.014 per million PUT payload units
- Example: 2 shards x 24 hours x 30 days = 1,440 shard-hours x $0.015 = $21.60/month, before PUT payload charges
On-Demand Mode:
- Best for: Variable workloads
- Cost: $0.04 per GB written + $0.0125 per GB read
- Example: 100 GB written x $0.04 + 200 GB read x $0.0125 = $4.00 + $2.50 = $6.50/month
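The two examples can be reproduced with a short calculation. This sketch uses the rates quoted in this section and treats each record as billed in 25 KB PUT payload units; the function names are illustrative.

```fsharp
// Rates quoted in this section, in USD.
let shardHourRate = 0.015
let putUnitRatePerMillion = 0.014
let onDemandWritePerGB = 0.04
let onDemandReadPerGB = 0.0125

/// Provisioned: shard-hours plus PUT payload units (each record is billed in 25 KB units).
let provisionedMonthly (shards: int) (hours: float) (millionPutUnits: float) =
    float shards * hours * shardHourRate + millionPutUnits * putUnitRatePerMillion

/// On-demand: data written plus data read, per GB.
let onDemandMonthly (gbWritten: float) (gbRead: float) =
    gbWritten * onDemandWritePerGB + gbRead * onDemandReadPerGB

let provisioned = provisionedMonthly 2 (24.0 * 30.0) 0.0 // shard-hours only, as in the example
let onDemand = onDemandMonthly 100.0 200.0
printfn "Provisioned: $%.2f, on-demand: $%.2f" provisioned onDemand
```

Passing a non-zero millionPutUnits shows how record volume shifts the provisioned figure; 100 million units, for instance, adds $1.40.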
Data Retention
- 24 hours: Default, suitable for most use cases (no extra charge)
- 7 days: Extended retention ($0.023 per shard-hour extra)
- 365 days: Maximum retention ($0.033 per shard-hour extra)
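The retentionPeriod values in the stacks above are given in hours. Kinesis accepts retention periods from 24 hours up to 8,760 hours (365 days), so a quick range check before deploying can catch typos. A sketch with hypothetical helper names:

```fsharp
// Kinesis retention limits, in hours.
let minRetentionHours = 24.0
let maxRetentionHours = 8760.0 // 365 days

let isValidRetention (hours: float) =
    hours >= minRetentionHours && hours <= maxRetentionHours

/// Retention beyond the 24-hour default falls into a paid retention tier.
let hasExtraRetentionCharge (hours: float) = hours > minRetentionHours

// Retention periods used by the streams in this example.
let streams =
    [ ("OnDemandStream", 168.0)
      ("ClickstreamData", 48.0)
      ("ApplicationEvents", 72.0)
      ("LogAggregation", 24.0) ]

for (name, hours) in streams do
    printfn "%s: %.0f days, valid: %b, extra charge: %b"
        name (hours / 24.0) (isValidRetention hours) (hasExtraRetentionCharge hours)
```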
Deployment
Next Steps
- Integrate with Lambda Functions for stream processing
- Review IAM Best Practices for access control
- Explore DynamoDB for storing processed data
Resources
- AWS Kinesis Documentation
- Kinesis Data Streams Developer Guide
- Lambda Kinesis Integration
- Kinesis Best Practices
- Kinesis Pricing