Amazon DynamoDB is a key-value and document database, which delivers single-digit millisecond performance at any scale.
It’s a fully managed, multi-region, multi-active, durable database with built-in security,
backup and restoration, and in-memory caching for internet-scale applications. - AWS
{"order_id": "12345", "customer_name": "Alice", "total_amount": 150}
"customer_name": "Alice"is an attribute with key "customer_name" and value "Alice".
{"order_id": "12345"}{"user_id": "abc123", "order_date": "2025-01-18"}{"order_id": "12345", "customer_name": "Alice", "total_amount": 150}{"order_id": "12346", "customer_name": "Bob", "total_amount": 200, "items": ["item1", "item2"]}true or false.
null{"name": "John", "age": 30}[1, "apple", 3.14]{1, 2, 3}{"apple", "banana", "orange"}{, } ExampleTable.UserID, and choose its data type (String, Number, or Binary).OrderID, with its data type.55
aws dynamodb create-table \
--table-name UserDetails \
--attribute-definitions \
AttributeName=userId,AttributeType=S \
--key-schema \
AttributeName=userId,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1
{
"TableName": "UserDetails",
"Item": {
"userId": {"S": "12345"},
"name": {"S": "John Doe"},
"email": {"S": "johndoe@example.com"}
}
}
{
"RequestItems": {
"UserDetails": [
{
"PutRequest": {
"Item": {
"userId": {"S": "12345"},
"name": {"S": "Alice"}
}
}
},
{
"PutRequest": {
"Item": {
"userId": {"S": "67890"},
"name": {"S": "Bob"}
}
}
}
]
}
}
Perform following task using python:
import boto3
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.create_table(
TableName='Products',
KeySchema=[
{'AttributeName': 'productId', 'KeyType': 'HASH'} # Partition key
],
AttributeDefinitions=[
{'AttributeName': 'productId', 'AttributeType': 'S'}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
table.wait_until_exists()
print("Table created successfully!")
put_item method to add items.
table.put_item(
Item={
'productId': '101',
'name': 'Laptop',
'price': 1500,
'stock': 10
}
)
print("Item added successfully!")
query method to retrieve specific data using the partition key.
response = table.query(
KeyConditionExpression=Key('productId').eq('101')
)
print("Query Result:", response['Items'])
productId is returned.scan method to retrieve all data from the table.
response = table.scan()
print("Scan Result:", response['Items'])
delete_table method to delete the table.
table.delete()
print("Table deleted successfully!")
import boto3
import math
# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('Products') # Replace 'Products' with your table name
# Generate bulk data
bulk_data = [
{'productId': str(i), 'name': f'Product-{i}', 'price': 100 + i, 'stock': 50 + i}
for i in range(1, 501) # Example: Insert 500 items
]
# Function to insert bulk data
def insert_bulk_data(items, table, batch_size=25):
total_items = len(items)
num_batches = math.ceil(total_items / batch_size)
print(f"Total items: {total_items}, Batch size: {batch_size}, Total batches: {num_batches}")
for i in range(0, total_items, batch_size):
batch = items[i:i + batch_size]
with table.batch_writer() as batch_writer:
for item in batch:
batch_writer.put_item(Item=item)
print(f"Batch {i // batch_size + 1}/{num_batches} inserted successfully!")
# Call the function to insert data
insert_bulk_data(bulk_data, table)
[
{"productId": "1", "name": "Product-1", "price": 101, "stock": 51},
{"productId": "2", "name": "Product-2", "price": 102, "stock": 52},
...
{"productId": "500", "name": "Product-500", "price": 600, "stock": 550}
]
Total items: 500, Batch size: 25, Total batches: 20
Batch 1/20 inserted successfully!
Batch 2/20 inserted successfully!
...
Batch 20/20 inserted successfully!
response = table.scan()
print(f"Total items in the table: {len(response['Items'])}")
Expected Output
Total items in the table: 500
Querying on the primary key is fast, but it can only give us a limited set of access patterns.
The scan can give us any access pattern, but it is slow and costly.
There is a third option, using a secondary index, which is relatively cheap and provides us with more access patterns.
| Reading Option | Access Patterns | Cost | Speed |
|---|---|---|---|
| Query on primary key | Limited | Low | High |
| Scan on table | Many (virtually all) | Very high | Slow |
| Query on secondary indexes | Moderate | Moderate | High |
{
"KeyConditionExpression": "Artist = :artist",
"ExpressionAttributeValues": {
":artist": "The Beatles"
}
}
{
"IndexName": "GenreYearIndex",
"KeyConditionExpression": "Genre = :genre",
"ExpressionAttributeValues": {
":genre": "Rock"
}
}
{
"IndexName": "ArtistYearIndex",
"KeyConditionExpression": "Artist = :artist",
"ExpressionAttributeValues": {
":artist": "The Beatles"
}
}
{
"FilterExpression": "Year > :year",
"ExpressionAttributeValues": {
":year": 2010
}
}
import boto3
dynamodb = boto3.client('dynamodb', region_name='us-east-1')
def create_table():
try:
response = dynamodb.create_table(
TableName='MusicCollection',
KeySchema=[
{'AttributeName': 'Artist', 'KeyType': 'HASH'}, # Partition key
{'AttributeName': 'Song', 'KeyType': 'RANGE'} # Sort key
],
AttributeDefinitions=[
{'AttributeName': 'Artist', 'AttributeType': 'S'},
{'AttributeName': 'Song', 'AttributeType': 'S'},
{'AttributeName': 'Genre', 'AttributeType': 'S'},
{'AttributeName': 'Year', 'AttributeType': 'N'}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
},
GlobalSecondaryIndexes=[
{
'IndexName': 'GenreYearIndex',
'KeySchema': [
{'AttributeName': 'Genre', 'KeyType': 'HASH'}, # GSI Partition key
{'AttributeName': 'Year', 'KeyType': 'RANGE'} # GSI Sort key
],
'Projection': {
'ProjectionType': 'ALL'
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
}
]
)
print("Table created successfully.")
except Exception as e:
print(f"Error creating table: {e}")
create_table()
Sample Output:
Table created successfully.
def add_item():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('MusicCollection')
response = table.put_item(
Item={
'Artist': 'The Beatles',
'Song': 'Hey Jude',
'Album': 'The White Album',
'Genre': 'Rock',
'Year': 1968
}
)
print("Item added:", response)
add_item()
Sample Output:
Item added: { 'ResponseMetadata': { 'HTTPStatusCode': 200 } }
def add_bulk_items():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('MusicCollection')
items = [
{'Artist': 'The Beatles', 'Song': f'Song {i}', 'Album': 'Album 1', 'Genre': 'Rock', 'Year': 1968 + i}
for i in range(1, 26)
]
with table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
print("Bulk items added.")
add_bulk_items()
Sample Output:
Bulk items added.
def query_table():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('MusicCollection')
response = table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('Artist').eq('The Beatles')
)
print("Query result:", response['Items'])
query_table()
Sample Output:
Query result: [
{'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
...
]
def query_gsi():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('MusicCollection')
response = table.query(
IndexName='GenreYearIndex',
KeyConditionExpression=boto3.dynamodb.conditions.Key('Genre').eq('Rock')
)
print("GSI Query result:", response['Items'])
query_gsi()
Sample Output:
GSI Query result: [
{'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
...
]
def query_lsi():
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('MusicCollection')
response = table.query(
IndexName='ArtistYearIndex',
KeyConditionExpression=boto3.dynamodb.conditions.Key('Artist').eq('The Beatles') &
boto3.dynamodb.conditions.Key('Year').gte(1968)
)
print("LSI Query result:", response['Items'])
query_lsi()
Sample Output:
LSI Query result: [
{'Artist': 'The Beatles', 'Song': 'Hey Jude', 'Album': 'The White Album', 'Genre': 'Rock', 'Year': 1968},
...
]
def delete_table():
try:
response = dynamodb.delete_table(TableName='MusicCollection')
print("Table deleted successfully.")
except Exception as e:
print(f"Error deleting table: {e}")
delete_table()
Sample Output:
Table deleted successfully.
Subject: AWS Notification - Alarm "HighReadCapacityAlarm" in ALARM
Body:
Alarm Name: HighReadCapacityAlarm
State: ALARM
Metric: ConsumedReadCapacityUnits
Threshold: Greater than 500 units
DynamoDB Table: MusicCollection
To create a DynamoDB database for the Instagram Stories feature, follow these steps:
We will design a DynamoDB database to store and manage Instagram stories. The project involves three primary steps:
import boto3
# Initialize DynamoDB client
dynamodb = boto3.resource('dynamodb')
# Create the DynamoDB table
table = dynamodb.create_table(
TableName='InstagramStories',
KeySchema=[
{
'AttributeName': 'user_id', # Partition Key
'KeyType': 'HASH'
},
{
'AttributeName': 'story_id', # Sort Key
'KeyType': 'RANGE'
}
],
AttributeDefinitions=[
{
'AttributeName': 'user_id',
'AttributeType': 'S'
},
{
'AttributeName': 'story_id',
'AttributeType': 'S'
},
{
'AttributeName': 'story_type',
'AttributeType': 'S'
},
{
'AttributeName': 'timestamp',
'AttributeType': 'S'
},
{
'AttributeName': 'is_memory',
'AttributeType': 'BOOL'
},
{
'AttributeName': 'likes_count',
'AttributeType': 'N'
}
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
print(f"Table {table.table_name} is being created...")
# Add sample story data
def add_sample_story(user_id, story_id, story_type, title, description, is_memory, likes_count):
table.put_item(
Item={
'user_id': user_id,
'story_id': story_id,
'story_type': story_type,
'title': title,
'description': description,
'timestamp': str(datetime.now()),
'is_memory': is_memory,
'likes_count': likes_count
}
)
# Sample data for User U1 and U2
add_sample_story('U1', 'S1', 'image', 'Sunset at the beach', 'A beautiful sunset I captured while at the beach.', False, 0)
add_sample_story('U2', 'S2', 'video', 'Morning run', 'A short video of my morning run through the park.', True, 10)
def get_user_stories(user_id):
response = table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('user_id').eq(user_id)
)
return response['Items']
# Get stories for User U1
user_stories = get_user_stories('U1')
print(user_stories)
import time
def remove_expired_stories():
current_time = time.time()
response = table.scan()
for item in response['Items']:
story_time = time.mktime(datetime.strptime(item['timestamp'], '%Y-%m-%d %H:%M:%S.%f').timetuple())
if current_time - story_time > 86400 and not item['is_memory']: # 86400 seconds = 24 hours
table.delete_item(
Key={
'user_id': item['user_id'],
'story_id': item['story_id']
}
)
print(f"Deleted expired story: {item['story_id']}")
# Remove expired stories
remove_expired_stories()