from pymongo import MongoClient
# --- Connecting -------------------------------------------------------------
# Every assignment below is an alternative way to construct a client; the
# name ``client`` is rebound each time.

# Local server: host and port passed as separate arguments ...
client = MongoClient("localhost", 27017)
# ... or the same target expressed as a connection URI.
client = MongoClient("mongodb://localhost:27017/")

# Remote server with credentials embedded in the URI (placeholder values).
client = MongoClient("mongodb://username:password@host:port/")

# MongoDB Atlas cluster addressed through an SRV connection string.
client = MongoClient("mongodb+srv://username:password@cluster.mongodb.net/")

# Keyword-argument form, with the authentication settings spelled out.
client = MongoClient(
    host="localhost",
    port=27017,
    username="user",
    password="pass",
    authSource="admin",
    authMechanism="SCRAM-SHA-256",
)
# Access Database and Collection
# Select a database with subscript syntax ...
db = client["database_name"]
# ... or with attribute syntax.
db = client.database_name

# Select a collection with subscript syntax ...
collection = db["collection_name"]
# ... or with attribute syntax.
collection = db.collection_name
# Context-manager form: the client is closed when the ``with`` block exits.
# The client and database are bound to their own names here so that the
# module-level ``client`` used by the statements below is not replaced by a
# client that has already been closed.
with MongoClient("mongodb://localhost:27017/") as scoped_client:
    scoped_db = scoped_client.mydb
    # operations here
# Names of all databases on the server.
client.list_database_names()

# Bind ``db`` to the database named "new_database".
db = client["new_database"]

# Drop a database by name.
client.drop_database("database_name")

# Names of the collections in the current database.
db.list_collection_names()

# Create a collection explicitly ...
db.create_collection("collection_name")
# ... or with options: a capped collection bounded by ``size`` and ``max``.
db.create_collection(
    "collection_name",
    capped=True,
    size=100000,
    max=100,
)

# Drop a collection (attribute access) ...
db.collection_name.drop()
# ... or the same thing with subscript access.
db["collection_name"].drop()

# Rename a collection.
db.old_name.rename("new_name")
# Insert a single document.
doc = {"name": "username", "age": 28, "city": "cityname"}
result = collection.insert_one(doc)
# The result exposes the new document's id and the acknowledgement flag.
print(result.inserted_id)
print(result.acknowledged)

# Insert several documents in one call.
docs = [
    {"name": "username1", "age": 25},
    {"name": "username2", "age": 35},
    {"name": "username3", "age": 28},
]
result = collection.insert_many(docs)
# One id per inserted document.
print(result.inserted_ids)

# Supply an explicit ``_id`` rather than a generated one.
doc = {"_id": "custom_id", "name": "username"}
collection.insert_one(doc)
# find_one: first document in the collection.
doc = collection.find_one()
# find_one: first document matching a filter.
doc = collection.find_one({"name": "username"})
# find_one with a projection: keep ``name`` and ``age``, drop ``_id``.
doc = collection.find_one(
    {"name": "username"},
    {"_id": 0, "name": 1, "age": 1},
)

# find: iterate over every document.
cursor = collection.find()
for doc in cursor:
    print(doc)

# find with a filter.
cursor = collection.find({"age": {"$gte": 25}})
# find with a filter and a projection.
cursor = collection.find(
    {"age": {"$gte": 25}},
    {"name": 1, "age": 1, "_id": 0},
)

# Materialise the whole result set as a list.
docs = list(collection.find())
# --- Comparison operators ---
collection.find({"age": {"$eq": 30}})  # age == 30
collection.find({"age": {"$ne": 30}})  # age != 30
collection.find({"age": {"$gt": 30}})  # age > 30
collection.find({"age": {"$gte": 30}})  # age >= 30
collection.find({"age": {"$lt": 30}})  # age < 30
collection.find({"age": {"$lte": 30}})  # age <= 30
collection.find({"age": {"$in": [25, 30, 35]}})  # age is one of the values
collection.find({"age": {"$nin": [25, 30, 35]}})  # age is none of the values

# --- Logical operators ---
# Every listed condition must hold.
collection.find({"$and": [{"age": {"$gte": 25}}, {"age": {"$lte": 35}}]})
# At least one listed condition must hold.
collection.find({"$or": [{"name": "John"}, {"name": "Jane"}]})
# None of the listed conditions may hold.
collection.find({"$nor": [{"age": {"$lt": 25}}, {"age": {"$gt": 35}}]})
# Negation of a single operator expression.
collection.find({"age": {"$not": {"$gte": 30}}})

# --- Element operators ---
collection.find({"field": {"$exists": True}})  # field is present
collection.find({"field": {"$type": "string"}})  # field has the given type

# --- Array operators ---
# Array contains all of the listed values.
collection.find({"tags": {"$all": ["python", "mongodb"]}})
# At least one array element satisfies every condition.
collection.find({"tags": {"$elemMatch": {"$gte": 80, "$lt": 90}}})
# Array has exactly this many elements.
collection.find({"tags": {"$size": 3}})

# --- Regular expressions ---
collection.find({"name": {"$regex": "^J"}})
collection.find({"name": {"$regex": "john", "$options": "i"}})  # ignore case
# Cap the number of returned documents.
cursor = collection.find().limit(10)
# Skip over the first documents.
cursor = collection.find().skip(5)

# Sorting: direction 1 is ascending, -1 is descending.
cursor = collection.find().sort("age", 1)
# Sort on several keys by passing (field, direction) pairs.
cursor = collection.find().sort([("age", -1), ("name", 1)])

# Count the documents matching a filter, and estimate the collection total.
count = collection.count_documents({"age": {"$gte": 25}})
total = collection.estimated_document_count()

# Distinct values of one field.
values = collection.distinct("city")
# update_one: set specific fields on the first matching document.
result = collection.update_one(
    {"name": "username"},
    {"$set": {"age": 31, "city": "cityname"}},
)
print(result.matched_count)
print(result.modified_count)

# update_one with another operator: add 1 to ``age``.
collection.update_one(
    {"name": "username"},
    {"$inc": {"age": 1}},
)

# update_many: apply the change to every matching document.
result = collection.update_many(
    {"age": {"$lt": 30}},
    {"$set": {"status": "young"}},
)
print(result.matched_count)
print(result.modified_count)

# replace_one: swap the whole matching document for a new one.
result = collection.replace_one(
    {"name": "username"},
    {"name": "username", "age": 31, "city": "cityname"},
)
# Reference list of update documents. Each line is a bare expression shown
# for illustration only; nothing is sent to the server here.

# --- Field operators ---
{"$set": {"field": "value"}}  # assign a value
{"$unset": {"field": ""}}  # remove the field
{"$rename": {"old": "new"}}  # rename the field
{"$inc": {"counter": 1}}  # add to a number
{"$mul": {"price": 1.1}}  # multiply a number
{"$min": {"score": 50}}  # set only if the given value is lower than the current one
{"$max": {"score": 100}}  # set only if the given value is higher than the current one
{"$currentDate": {"lastModified": True}}  # set to the current date

# --- Array operators ---
{"$push": {"tags": "python"}}  # append one element
{"$push": {"tags": {"$each": ["python", "mongodb"]}}}  # append several elements
{"$pop": {"tags": 1}}  # remove the last (1) or first (-1) element
{"$pull": {"tags": "python"}}  # remove matching elements
{"$pullAll": {"tags": ["python", "js"]}}  # remove all of the listed values
{"$addToSet": {"tags": "python"}}  # append only if not already present
# Upsert (Update or Insert)
# Upsert: update the matching document, or insert one when nothing matches.
collection.update_one(
    {"name": "username"},
    {"$set": {"age": 30}},
    upsert=True,
)

# delete_one: remove the first matching document.
result = collection.delete_one({"name": "username"})
print(result.deleted_count)

# delete_many: remove every matching document.
result = collection.delete_many({"age": {"$lt": 25}})
print(result.deleted_count)

# An empty filter matches everything, so this empties the collection.
result = collection.delete_many({})
# Aggregation: keep documents with age >= 25, count them per city, and list
# the cities from the highest count to the lowest.
pipeline = [
    {"$match": {"age": {"$gte": 25}}},
    {"$group": {"_id": "$city", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
results = collection.aggregate(pipeline)
for doc in results:
    print(doc)
# Reference list of aggregation stages. Each one is a bare expression shown
# for illustration only; none of them is executed against the server here.

# $match: filter documents.
{"$match": {"age": {"$gte": 25}}}

# $group: group documents by a key and compute accumulators per group.
{
    "$group": {
        "_id": "$city",
        "count": {"$sum": 1},
        "avgAge": {"$avg": "$age"},
        "maxAge": {"$max": "$age"},
        "minAge": {"$min": "$age"},
    }
}

# $project: select fields ...
{"$project": {"name": 1, "age": 1, "_id": 0}}
# ... or reshape them, here deriving ``ageGroup`` from a condition on ``age``.
{
    "$project": {
        "name": 1,
        "ageGroup": {"$cond": [{"$gte": ["$age", 30]}, "senior", "junior"]},
    }
}

# $sort: order documents.
{"$sort": {"age": -1}}

# $limit: cap the number of documents.
{"$limit": 10}

# $skip: skip over leading documents.
{"$skip": 5}

# $unwind: deconstruct an array field.
{"$unwind": "$tags"}

# $lookup: join with another collection.
{
    "$lookup": {
        "from": "other_collection",
        "localField": "user_id",
        "foreignField": "_id",
        "as": "user_info",
    }
}

# $addFields: add new fields to each document.
{"$addFields": {"fullName": {"$concat": ["$firstName", " ", "$lastName"]}}}

# $count: count the documents into the named field.
{"$count": "total"}

# $out: write the results to a collection.
{"$out": "new_collection"}
# Common pipeline recipes. ``pipeline`` is rebound by each recipe, so only
# the last one (pagination) remains bound afterwards.

# Number of documents per category.
pipeline = [
    {"$group": {"_id": "$category", "count": {"$sum": 1}}},
]

# Average salary per department.
pipeline = [
    {"$group": {"_id": "$department", "avgSalary": {"$avg": "$salary"}}},
]

# Top 10 documents by score.
pipeline = [
    {"$sort": {"score": -1}},
    {"$limit": 10},
]

# Pagination: skip the preceding pages, then take one page of documents.
page = 2
page_size = 10
pipeline = [
    {"$skip": (page - 1) * page_size},
    {"$limit": page_size},
]
# --- Creating indexes ---
# Single field, by name ...
collection.create_index("name")
# ... or as (field, direction) pairs: 1 is ascending, -1 is descending.
collection.create_index([("name", 1)])

# Compound index over two fields.
collection.create_index([("name", 1), ("age", -1)])

# Unique index.
collection.create_index("email", unique=True)

# Text index.
collection.create_index([("description", "text")])

# TTL index: documents are removed automatically after the given time.
collection.create_index("createdAt", expireAfterSeconds=3600)

# Partial index: only documents matching the filter expression are indexed.
collection.create_index(
    [("email", 1)],
    partialFilterExpression={"age": {"$gte": 18}},
)

# --- Inspecting indexes ---
indexes = collection.list_indexes()
for index in indexes:
    print(index)

# --- Dropping indexes ---
collection.drop_index("name_1")  # by index name
collection.drop_index([("name", 1)])  # by key specification
collection.drop_indexes()  # every index except the one on _id

# Index details.
collection.index_information()
from pymongo import InsertOne , DeleteMany , ReplaceOne , UpdateOne
# A mixed batch of write operations submitted together.
operations = [
    InsertOne({"name": "username1", "age": 25}),
    InsertOne({"name": "username2", "age": 30}),
    UpdateOne({"name": "username3"}, {"$set": {"age": 28}}),
    DeleteMany({"age": {"$lt": 20}}),
    ReplaceOne({"name": "username4"}, {"name": "username5", "age": 35}),
]
result = collection.bulk_write(operations)
# Per-category counters reported for the batch.
print(result.inserted_count)
print(result.modified_count)
print(result.deleted_count)

# Ordered execution stops at the first error ...
result = collection.bulk_write(operations, ordered=True)
# ... while unordered execution carries on past errors.
result = collection.bulk_write(operations, ordered=False)
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client.mydb

# Multi-document transaction: move 100 from account A to account B and log
# the transfer. Every operation passes ``session`` so that it takes part in
# the transaction.
with client.start_session() as session, session.start_transaction():
    collection1 = db.accounts
    collection2 = db.logs

    # Debit account A.
    collection1.update_one(
        {"account": "A"},
        {"$inc": {"balance": -100}},
        session=session,
    )
    # Credit account B.
    collection1.update_one(
        {"account": "B"},
        {"$inc": {"balance": 100}},
        session=session,
    )
    # Record the transfer.
    collection2.insert_one(
        {"action": "transfer", "amount": 100},
        session=session,
    )
# Transaction with Error Handling
# Transaction with error reporting.
#
# The ``start_transaction()`` context manager aborts the transaction itself
# when its block exits with an exception. The handler therefore only reports
# the failure: calling ``session.abort_transaction()`` again at that point
# would raise, because the transaction has already been aborted.
with client.start_session() as session:
    try:
        with session.start_transaction():
            # operations
            collection.insert_one({"name": "test"}, session=session)
            # More operations...
    except Exception as e:
        print(f"Transaction aborted: { e } ")
# 12. GridFS (File Storage)
from gridfs import GridFS
# GridFS handle bound to the current database.
fs = GridFS(db)

# Store a file read from disk.
with open("image.jpg", "rb") as f:
    file_id = fs.put(f, filename="image.jpg", content_type="image/jpeg")

# Store raw bytes together with custom metadata.
file_id = fs.put(
    b"data",
    filename="data.txt",
    metadata={"author": "username", "tags": ["important"]},
)

# Fetch a stored file by id and read its contents.
file = fs.get(file_id)
data = file.read()

# Look a file up by filename ...
file = fs.find_one({"filename": "image.jpg"})
# ... and write its contents to disk.
with open("output.jpg", "wb") as f:
    f.write(file.read())

# List every stored file.
for grid_file in fs.find():
    print(grid_file.filename, grid_file._id)

# Delete a stored file by id.
fs.delete(file_id)
# 13. Change Streams (Watch for Changes)
# Watch every change on the collection.
with collection.watch() as stream:
    for change in stream:
        print(change)

# Watch only insert events by passing a filtering pipeline.
pipeline = [{"$match": {"operationType": "insert"}}]
with collection.watch(pipeline) as stream:
    for change in stream:
        print(change)

# Watch at database level instead of a single collection.
with db.watch() as stream:
    for change in stream:
        print(change)
# Existence check: count at most one match ...
exists = collection.count_documents({"name": "username"}, limit=1) > 0
# ... or test whether find_one returns a document.
exists = collection.find_one({"name": "username"}) is not None

# Number of documents: count them ...
size = collection.count_documents({})
# ... or read the ``count`` field of the collStats command result.
size = db.command("collStats", "collection_name")["count"]

# Run the validate command on a collection.
result = db.command("validate", "collection_name")

# Server version.
info = client.server_info()
print(info["version"])

# Connectivity check using the ping command.
try:
    client.admin.command("ping")
    print("Connected successfully")
except Exception as e:
    print(f"Connection failed: { e } ")
# Pagination
def paginate(collection, page=1, per_page=10):
    """Return one page of documents together with the total document count.

    Args:
        collection: Collection-like object providing ``find()`` and
            ``count_documents()``.
        page: 1-based page number.
        per_page: Maximum number of documents on a page.

    Returns:
        A ``(documents, total)`` tuple: the list of documents on the
        requested page and the number of documents in the collection.

    Raises:
        ValueError: If ``page`` or ``per_page`` is smaller than 1.
    """
    # A page below 1 would produce a negative skip, and a limit of 0 means
    # "no limit", which would silently return the whole collection.
    if page < 1:
        raise ValueError("page must be >= 1")
    if per_page < 1:
        raise ValueError("per_page must be >= 1")

    skip = (page - 1) * per_page
    # Sort on _id so consecutive pages are stable and do not overlap;
    # skip/limit without an explicit order gives no such guarantee.
    cursor = collection.find().sort("_id", 1).skip(skip).limit(per_page)
    total = collection.count_documents({})
    return list(cursor), total
# Text search needs a text index on the searched field, so create it first.
collection.create_index([("description", "text")])

# Run the text search.
results = collection.find({"$text": {"$search": "python mongodb"}})
from pymongo import ReturnDocument

# Auto-increment counter: bump ``sequence`` and read the new value in one
# operation. ReturnDocument.AFTER states the intent explicitly instead of
# relying on the boolean value behind the enum.
result = collection.find_one_and_update(
    {"_id": "counter"},
    {"$inc": {"sequence": 1}},
    upsert=True,
    return_document=ReturnDocument.AFTER,
)
next_id = result["sequence"]

# Get-or-create in a single upsert. A separate find_one followed by
# insert_one lets two concurrent callers both see "missing" and both insert.
# With $setOnInsert the extra field is written only when the document is
# created; an existing document is returned unchanged.
doc = collection.find_one_and_update(
    {"email": "user@example.com"},
    {"$setOnInsert": {"name": "User"}},
    upsert=True,
    return_document=ReturnDocument.AFTER,
)
from pymongo .errors import (
ConnectionFailure ,
DuplicateKeyError ,
BulkWriteError ,
ServerSelectionTimeoutError
)
# Handle the most specific errors first. ServerSelectionTimeoutError is a
# subclass of ConnectionFailure, so it has to be listed before it; in the
# opposite order the ConnectionFailure clause catches it and the timeout
# handler can never run.
try:
    collection.insert_one({"_id": 1, "name": "username"})
except DuplicateKeyError:
    print("Document with this ID already exists")
except ServerSelectionTimeoutError:
    print("Server selection timeout")
except ConnectionFailure:
    print("Failed to connect to MongoDB")
except Exception as e:
    print(f"An error occurred: { e } ")
# 17. Configuration Options
# Connection-pool settings.
client = MongoClient(
    "mongodb://localhost:27017/",
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
)

# Timeout settings, all in milliseconds.
client = MongoClient(
    "mongodb://localhost:27017/",
    serverSelectionTimeoutMS=5000,
    socketTimeoutMS=10000,
    connectTimeoutMS=10000,
)
from pymongo import WriteConcern

# Collection handle with an explicit write concern.
collection = db.get_collection(
    "mycollection",
    write_concern=WriteConcern(w="majority", j=True, wtimeout=5000),
)

from pymongo import ReadPreference

# Collection handle with an explicit read preference.
collection = db.get_collection(
    "mycollection",
    read_preference=ReadPreference.SECONDARY,
)
from datetime import datetime, timedelta, timezone

# Timezone-aware UTC datetimes are used throughout. ``datetime.utcnow()`` is
# deprecated since Python 3.12 and returns a naive value that is easily
# mistaken for local time.

# Insert a document stamped with the current UTC time.
collection.insert_one({
    "name": "username",
    "createdAt": datetime.now(timezone.utc),
})

# Documents created within the last 24 hours.
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
collection.find({"createdAt": {"$gte": yesterday}})

# Date range. ``$lt`` makes the upper bound exclusive, so documents stamped
# 2024-12-31 00:00 UTC or later are not matched.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 12, 31, tzinfo=timezone.utc)
collection.find({
    "createdAt": {
        "$gte": start,
        "$lt": end,
    },
})