DynamoDB


About DynamoDB
  1. What is DynamoDB?
  2. DynamoDB Features
  3. AWS Value-added Services
  4. DynamoDB: When to Use and When to Avoid

DynamoDB Basic Concepts
  1. Tables, Items, Attributes, Primary Key (Partition Key + Sort Key)

DynamoDB Data Types
  1. Scalar, Document, Set

DynamoDB Practical Approach
  1. Understanding DynamoDB Partitions
  2. Primary Key and Range Key
  3. DynamoDB Capacity Units - Charges
  4. Querying and Scanning in DynamoDB
  5. Perform DynamoDB Tasks in AWS
  6. Creating Table in DynamoDB
  7. Creating Items in DynamoDB
  8. Python Script to access DynamoDB
  9. Python Script to insert bulk data into DynamoDB
  10. Secondary Index
  11. Python Script for Secondary Index
  12. CloudWatch Monitoring for DynamoDB
  13. Use Case - Instagram Stories using DynamoDB

What is DynamoDB?

Amazon DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It’s a fully managed, multi-region, multi-active, durable database with built-in security, backup and restore, and in-memory caching for internet-scale applications. - AWS

  • White paper published - 2007
  • First release - 2012

    DynamoDB Features

    AWS Value-added Services

    DynamoDB: When to Use and When to Avoid

    DynamoDB Basic Concepts


    Data Types


    Understanding DynamoDB Partitions


    Primary Key and Range Key

    Why Do We Need a Primary Key?

    Types of Primary Keys

    Why Use a Sort Key (Range Key)?

    Example Table: Library Books

    What Can You Do with This Model?

    DynamoDB Capacity Units - Charges

    How DynamoDB Charges

    Write Capacity

    Read Capacity

    Key Recommendations

    Examples

    Querying and Scanning in DynamoDB

    Options for Reading Data

    Querying

    Scanning

    Difference Between Query and Scan

    Perform DynamoDB Tasks in AWS

    1. Find DynamoDB Service
    2. Create a Table
    3. Add the Primary Key
    4. Set the Read/Write Capacity
    5. Clean Up

    Creating Table in DynamoDB


    Creating Items in DynamoDB


    Python Script to access DynamoDB

    Perform the following tasks using Python:

    1. Create a Table

    2. Add Data to the Table

    3. Query the Table

    4. Scan the Table

    5. Clean Up

    Python Script to insert bulk data into DynamoDB

        import boto3
        import math
    
        # Initialize DynamoDB resource
        dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
        table = dynamodb.Table('Products')  # Replace 'Products' with your table name
    
        # Generate bulk data
        bulk_data = [
            {'productId': str(i), 'name': f'Product-{i}', 'price': 100 + i, 'stock': 50 + i}
            for i in range(1, 501)  # Example: Insert 500 items
        ]
    
        # Function to insert bulk data
        def insert_bulk_data(items, table, batch_size=25):
            total_items = len(items)
            num_batches = math.ceil(total_items / batch_size)
            print(f"Total items: {total_items}, Batch size: {batch_size}, Total batches: {num_batches}")
    
            for i in range(0, total_items, batch_size):
                batch = items[i:i + batch_size]
                with table.batch_writer() as batch_writer:
                    for item in batch:
                        batch_writer.put_item(Item=item)
                print(f"Batch {i // batch_size + 1}/{num_batches} inserted successfully!")
    
        # Call the function to insert data
        insert_bulk_data(bulk_data, table)
    
    
    Sample Input:
        [
            {"productId": "1", "name": "Product-1", "price": 101, "stock": 51},
            {"productId": "2", "name": "Product-2", "price": 102, "stock": 52},
            ...
            {"productId": "500", "name": "Product-500", "price": 600, "stock": 550}
        ]
    

    Sample Output Logs in CloudWatch
        Total items: 500, Batch size: 25, Total batches: 20
        Batch 1/20 inserted successfully!
        Batch 2/20 inserted successfully!
        ...
        Batch 20/20 inserted successfully!    
    

    Verification
    Scan the table
        response = table.scan()
        print(f"Total items in the table: {len(response['Items'])}")
    
    Expected Output
        Total items in the table: 500
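
    One caveat about this verification: a single scan call returns at most 1 MB of data per page, so on larger tables len(response['Items']) would undercount. A sketch that follows LastEvaluatedKey to count every item (the table object is assumed to be the one used above):

```python
def count_all_items(table):
    # Paginate the scan: each page returns at most 1 MB, so keep
    # following LastEvaluatedKey until DynamoDB stops returning one.
    total = 0
    response = table.scan(Select='COUNT')  # COUNT skips item payloads
    total += response['Count']
    while 'LastEvaluatedKey' in response:
        response = table.scan(Select='COUNT',
                              ExclusiveStartKey=response['LastEvaluatedKey'])
        total += response['Count']
    return total
```

    For the 500 small items in this example a single page suffices, so the simpler scan above gives the same answer.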
    

    Secondary Index

    Querying on the primary key is fast, but it supports only a limited set of access patterns.
    A scan can serve any access pattern, but it is slow and costly.
    There is a third option, the secondary index, which is relatively cheap and unlocks additional access patterns.

    Reading Option               Access Patterns        Cost       Speed
    Query on primary key         Limited                Low        High
    Scan on table                Many (virtually all)   Very high  Slow
    Query on secondary indexes   Moderate               Moderate   High

    Considerations for Secondary Indexes

    Sample Data

    1. Primary Index

    2. Global Secondary Index (GSI)

    3. Local Secondary Index (LSI)

    4. Scan

    Python Script for Secondary Index

    Write a script for the following tasks:
    1. Create a Table
    Python Script:
        import boto3
    
        dynamodb = boto3.client('dynamodb', region_name='us-east-1')
    
        def create_table():
            try:
                response = dynamodb.create_table(
                    TableName='MusicCollection',
                    KeySchema=[
                        {'AttributeName': 'Artist', 'KeyType': 'HASH'},  # Partition key
                        {'AttributeName': 'Song', 'KeyType': 'RANGE'}   # Sort key
                    ],
                    AttributeDefinitions=[
                        {'AttributeName': 'Artist', 'AttributeType': 'S'},
                        {'AttributeName': 'Song', 'AttributeType': 'S'},
                        {'AttributeName': 'Genre', 'AttributeType': 'S'},
                        {'AttributeName': 'Year', 'AttributeType': 'N'}
                    ],
                    ProvisionedThroughput={
                        'ReadCapacityUnits': 5,
                        'WriteCapacityUnits': 5
                    },
                    GlobalSecondaryIndexes=[
                        {
                            'IndexName': 'GenreYearIndex',
                            'KeySchema': [
                                {'AttributeName': 'Genre', 'KeyType': 'HASH'},  # GSI Partition key
                                {'AttributeName': 'Year', 'KeyType': 'RANGE'}   # GSI Sort key
                            ],
                            'Projection': {
                                'ProjectionType': 'ALL'
                            },
                            'ProvisionedThroughput': {
                                'ReadCapacityUnits': 5,
                                'WriteCapacityUnits': 5
                            }
                        }
                    ]
                )
                print("Table created successfully.")
            except Exception as e:
                print(f"Error creating table: {e}")
    
        create_table()
      
    Sample Output:
        Table created successfully.
      

    2. Add Items to the Table
    Python Script:
        def add_item():
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table('MusicCollection')
            response = table.put_item(
                Item={
                    'Artist': 'The Beatles',
                    'Song': 'Hey Jude',
                    'Album': 'The White Album',
                    'Genre': 'Rock',
                    'Year': 1968
                }
            )
            print("Item added:", response)
    
        add_item()
      
    Sample Output:
        Item added: { 'ResponseMetadata': { 'HTTPStatusCode': 200 } }
      

    3. Add Bulk Items with Batch Size = 25
    Python Script:
        def add_bulk_items():
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table('MusicCollection')
    
            items = [
                {'Artist': 'The Beatles', 'Song': f'Song {i}', 'Album': 'Album 1', 'Genre': 'Rock', 'Year': 1968 + i}
                for i in range(1, 26)
            ]
    
            with table.batch_writer() as batch:
                for item in items:
                    batch.put_item(Item=item)
            print("Bulk items added.")
    
        add_bulk_items()
      
    Sample Output:
        Bulk items added.
      

    4. Check Indexes and Run a Query
    Python Script:
        from boto3.dynamodb.conditions import Key

        def query_table():
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table('MusicCollection')
            response = table.query(
                KeyConditionExpression=Key('Artist').eq('The Beatles')
            )
            print("Query result:", response['Items'])

        query_table()
      
    Sample Output:
        Query result: [
          {'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
          ...
        ]
      

    5. Query the Global Secondary Index (GSI)
    Python Script:
        from boto3.dynamodb.conditions import Key

        def query_gsi():
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table('MusicCollection')
            response = table.query(
                IndexName='GenreYearIndex',
                KeyConditionExpression=Key('Genre').eq('Rock')
            )
            print("GSI Query result:", response['Items'])

        query_gsi()
      
    Sample Output:
        GSI Query result: [
          {'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
          ...
        ]
      

    6. Query the Local Secondary Index (LSI)

    Note: unlike a GSI, an LSI can only be defined at table creation time. This script assumes 'ArtistYearIndex' was declared as a LocalSecondaryIndex (partition key 'Artist', alternate sort key 'Year') in the create_table call above.

    Python Script:
        from boto3.dynamodb.conditions import Key

        def query_lsi():
            dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
            table = dynamodb.Table('MusicCollection')
            response = table.query(
                IndexName='ArtistYearIndex',
                KeyConditionExpression=Key('Artist').eq('The Beatles') &
                                       Key('Year').gte(1968)
            )
            print("LSI Query result:", response['Items'])

        query_lsi()
      
    Sample Output:
        LSI Query result: [
          {'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
          ...
        ]
      

    7. Clean Up
    Python Script:
        def delete_table():
            try:
                response = dynamodb.delete_table(TableName='MusicCollection')
                print("Table deleted successfully.")
            except Exception as e:
                print(f"Error deleting table: {e}")
    
        delete_table()
      
    Sample Output:
        Table deleted successfully.
      

    CloudWatch Monitoring for DynamoDB

    AWS provides several services for logging and monitoring; for DynamoDB, this section uses Amazon CloudWatch metrics and alarms.

    Steps to Create a CloudWatch Alarm for DynamoDB Table

    1. Choose the Metric

    2. Define the Alarm Condition

    3. Configure Actions

    4. Add a Name and Review
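
    These four steps can also be scripted with boto3's put_metric_alarm. Below is a sketch using the alarm values from the sample configuration in this section; the SNS topic ARN is a placeholder you would replace with your own:

```python
import boto3

def create_read_capacity_alarm(sns_topic_arn):
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
    cloudwatch.put_metric_alarm(
        # 1. Choose the metric: consumed RCUs on the MusicCollection table
        Namespace='AWS/DynamoDB',
        MetricName='ConsumedReadCapacityUnits',
        Dimensions=[{'Name': 'TableName', 'Value': 'MusicCollection'}],
        # 2. Define the condition: sum over 5 minutes greater than 500 units
        Statistic='Sum',
        Period=300,
        EvaluationPeriods=1,
        Threshold=500,
        ComparisonOperator='GreaterThanThreshold',
        # 3. Configure the action: notify an SNS topic when in ALARM state
        AlarmActions=[sns_topic_arn],
        # 4. Add a name and description, then review
        AlarmName='HighReadCapacityAlarm',
        AlarmDescription='MusicCollection read capacity above 500 units'
    )

if __name__ == '__main__':
    # Placeholder ARN; substitute a real SNS topic in your account
    create_read_capacity_alarm('arn:aws:sns:us-east-1:123456789012:MyAlarmTopic')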

    Example: CloudWatch Alarm for DynamoDB Table

    Use Case:

    Sample Alarm Configuration:

    Expected Notification:
          Subject: AWS Notification - Alarm "HighReadCapacityAlarm" in ALARM
          Body:
          Alarm Name: HighReadCapacityAlarm
          State: ALARM
          Metric: ConsumedReadCapacityUnits
          Threshold: Greater than 500 units
          DynamoDB Table: MusicCollection
        

    Verifying the Alarm


    Cleaning Up


    Use Case - Instagram Stories using DynamoDB

    We will design a DynamoDB database to store and manage Instagram stories. The project involves four steps: creating the table, populating it with sample data, querying it, and cleaning up expired stories.

    Create DynamoDB Table
        import boto3

        # Initialize DynamoDB resource
        dynamodb = boto3.resource('dynamodb')

        # Create the DynamoDB table.
        # Note: AttributeDefinitions may only declare attributes that appear in
        # a key schema or index. Non-key attributes such as story_type,
        # timestamp, is_memory, and likes_count are simply stored on each item
        # and must not be declared here (DynamoDB rejects unused definitions,
        # and BOOL is not a valid AttributeType in any case).
        table = dynamodb.create_table(
            TableName='InstagramStories',
            KeySchema=[
                {'AttributeName': 'user_id', 'KeyType': 'HASH'},    # Partition key
                {'AttributeName': 'story_id', 'KeyType': 'RANGE'}   # Sort key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'user_id', 'AttributeType': 'S'},
                {'AttributeName': 'story_id', 'AttributeType': 'S'}
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )

        print(f"Table {table.table_name} is being created...")
        

    Populate the Table with Sample Data
        from datetime import datetime

        # Add sample story data
        def add_sample_story(user_id, story_id, story_type, title, description, is_memory, likes_count):
            table.put_item(
                Item={
                    'user_id': user_id,
                    'story_id': story_id,
                    'story_type': story_type,
                    'title': title,
                    'description': description,
                    'timestamp': str(datetime.now()),
                    'is_memory': is_memory,
                    'likes_count': likes_count
                }
            )

        # Sample data for users U1 and U2
        add_sample_story('U1', 'S1', 'image', 'Sunset at the beach', 'A beautiful sunset I captured while at the beach.', False, 0)
        add_sample_story('U2', 'S2', 'video', 'Morning run', 'A short video of my morning run through the park.', True, 10)
    
        

    Query the Data
        from boto3.dynamodb.conditions import Key

        def get_user_stories(user_id):
            response = table.query(
                KeyConditionExpression=Key('user_id').eq(user_id)
            )
            return response['Items']

        # Get stories for User U1
        user_stories = get_user_stories('U1')
        print(user_stories)
        

    Automatic Cleanup for Expired Stories
        import time
        from datetime import datetime

        def remove_expired_stories():
            current_time = time.time()
            response = table.scan()
            for item in response['Items']:
                story_time = time.mktime(datetime.strptime(item['timestamp'], '%Y-%m-%d %H:%M:%S.%f').timetuple())
                if current_time - story_time > 86400 and not item['is_memory']:  # 86400 seconds = 24 hours
                    table.delete_item(
                        Key={
                            'user_id': item['user_id'],
                            'story_id': item['story_id']
                        }
                    )
                    print(f"Deleted expired story: {item['story_id']}")

        # Remove expired stories
        remove_expired_stories()