common.py
import os
import pickle
# Gmail API utils
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# Request all access (permission to read/send/receive emails, manage the inbox, and more)
SCOPES = ['https://mail.google.com/']
our_email = 'our_email@gmail.com'
def gmail_authenticate():
creds = None
# the file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first time
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
# if there are no (valid) credentials availablle, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# save the credentials for the next run
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def search_messages(service, query):
result = service.users().messages().list(userId='me',q=query).execute()
messages = [ ]
if 'messages' in result:
messages.extend(result['messages'])
while 'nextPageToken' in result:
page_token = result['nextPageToken']
result = service.users().messages().list(userId='me',q=query, pageToken=page_token).execute()
if 'messages' in result:
messages.extend(result['messages'])
return messages
send_emails.py
# for getting full paths to attachements
import os
# for encoding messages in base64
from base64 import urlsafe_b64encode
# for dealing with attachement MIME types
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from mimetypes import guess_type as guess_mime_type
from common import our_email, gmail_authenticate
# Adds the attachment with the given filename to the given message
def add_attachment(message, filename):
content_type, encoding = guess_mime_type(filename)
if content_type is None or encoding is not None:
content_type = 'application/octet-stream'
main_type, sub_type = content_type.split('/', 1)
if main_type == 'text':
fp = open(filename, 'rb')
msg = MIMEText(fp.read().decode(), _subtype=sub_type)
fp.close()
elif main_type == 'image':
fp = open(filename, 'rb')
msg = MIMEImage(fp.read(), _subtype=sub_type)
fp.close()
elif main_type == 'audio':
fp = open(filename, 'rb')
msg = MIMEAudio(fp.read(), _subtype=sub_type)
fp.close()
else:
fp = open(filename, 'rb')
msg = MIMEBase(main_type, sub_type)
msg.set_payload(fp.read())
fp.close()
filename = os.path.basename(filename)
msg.add_header('Content-Disposition', 'attachment', filename=filename)
message.attach(msg)
def build_message(destination, obj, body, attachments=[]):
if not attachments: # no attachments given
message = MIMEText(body)
message['to'] = destination
message['from'] = our_email
message['subject'] = obj
else:
message = MIMEMultipart()
message['to'] = destination
message['from'] = our_email
message['subject'] = obj
message.attach(MIMEText(body))
for filename in attachments:
add_attachment(message, filename)
return {'raw': urlsafe_b64encode(message.as_bytes()).decode()}
def send_message(service, destination, obj, body, attachments=[]):
return service.users().messages().send(
userId="me",
body=build_message(destination, obj, body, attachments)
).execute()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Email Sender using Gmail API")
parser.add_argument('destination', type=str, help='The destination email address')
parser.add_argument('subject', type=str, help='The subject of the email')
parser.add_argument('body', type=str, help='The body of the email')
parser.add_argument('-f', '--files', type=str, help='email attachments', nargs='+')
args = parser.parse_args()
service = gmail_authenticate()
send_message(service, args.destination, args.subject, args.body, args.files)
read_emails.py
import os
import sys
# for encoding/decoding messages in base64
from base64 import urlsafe_b64decode
from common import gmail_authenticate, search_messages
def get_size_format(b, factor=1024, suffix="B"):
"""
Scale bytes to its proper byte format
e.g:
1253656 => '1.20MB'
1253656678 => '1.17GB'
"""
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if b < factor:
return f"{b:.2f}{unit}{suffix}"
b /= factor
return f"{b:.2f}Y{suffix}"
def clean(text):
# clean text for creating a folder
return "".join(c if c.isalnum() else "_" for c in text)
def parse_parts(service, parts, folder_name, message):
"""
Utility function that parses the content of an email partition
"""
if parts:
for part in parts:
filename = part.get("filename")
mimeType = part.get("mimeType")
body = part.get("body")
data = body.get("data")
file_size = body.get("size")
part_headers = part.get("headers")
if part.get("parts"):
# recursively call this function when we see that a part
# has parts inside
parse_parts(service, part.get("parts"), folder_name, message)
if mimeType == "text/plain":
# if the email part is text plain
if data:
text = urlsafe_b64decode(data).decode()
print(text)
elif mimeType == "text/html":
# if the email part is an HTML content
# save the HTML file and optionally open it in the browser
if not filename:
filename = "index.html"
filepath = os.path.join(folder_name, filename)
print("Saving HTML to", filepath)
with open(filepath, "wb") as f:
f.write(urlsafe_b64decode(data))
else:
# attachment other than a plain text or HTML
for part_header in part_headers:
part_header_name = part_header.get("name")
part_header_value = part_header.get("value")
if part_header_name == "Content-Disposition":
if "attachment" in part_header_value:
# we get the attachment ID
# and make another request to get the attachment itself
print("Saving the file:", filename, "size:", get_size_format(file_size))
attachment_id = body.get("attachmentId")
attachment = service.users().messages() \
.attachments().get(id=attachment_id, userId='me', messageId=message['id']).execute()
data = attachment.get("data")
filepath = os.path.join(folder_name, filename)
if data:
with open(filepath, "wb") as f:
f.write(urlsafe_b64decode(data))
def read_message(service, message):
"""
This function takes Gmail API `service` and the given `message_id` and does the following:
- Downloads the content of the email
- Prints email basic information (To, From, Subject & Date) and plain/text parts
- Creates a folder for each email based on the subject
- Downloads text/html content (if available) and saves it under the folder created as index.html
- Downloads any file that is attached to the email and saves it in the folder created
"""
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
# parts can be the message body, or attachments
payload = msg['payload']
headers = payload.get("headers")
parts = payload.get("parts")
folder_name = "email"
has_subject = False
if headers:
# this section prints email basic info & creates a folder for the email
for header in headers:
name = header.get("name")
value = header.get("value")
if name.lower() == 'from':
# we print the From address
print("From:", value)
if name.lower() == "to":
# we print the To address
print("To:", value)
if name.lower() == "subject":
# make our boolean True, the email has "subject"
has_subject = True
# make a directory with the name of the subject
folder_name = clean(value)
# we will also handle emails with the same subject name
folder_counter = 0
while os.path.isdir(folder_name):
folder_counter += 1
# we have the same folder name, add a number next to it
if folder_name[-1].isdigit() and folder_name[-2] == "_":
folder_name = f"{folder_name[:-2]}_{folder_counter}"
elif folder_name[-2:].isdigit() and folder_name[-3] == "_":
folder_name = f"{folder_name[:-3]}_{folder_counter}"
else:
folder_name = f"{folder_name}_{folder_counter}"
os.mkdir(folder_name)
print("Subject:", value)
if name.lower() == "date":
# we print the date when the message was sent
print("Date:", value)
if not has_subject:
# if the email does not have a subject, then make a folder with "email" name
# since folders are created based on subjects
if not os.path.isdir(folder_name):
os.mkdir(folder_name)
parse_parts(service, parts, folder_name, message)
print("="*50)
if __name__ == "__main__":
service = gmail_authenticate()
# get emails that match the query you specify from the command lines
results = search_messages(service, sys.argv[1])
print(f"Found {len(results)} results.")
# for each email matched, read it (output plain/text to console & save HTML and attachments)
for msg in results:
read_message(service, msg)
mark_emails.py
from common import gmail_authenticate, search_messages
def mark_as_read(service, query):
messages_to_mark = search_messages(service, query)
print(f"Matched emails: {len(messages_to_mark)}")
return service.users().messages().batchModify(
userId='me',
body={
'ids': [ msg['id'] for msg in messages_to_mark ],
'removeLabelIds': ['UNREAD']
}
).execute()
def mark_as_unread(service, query):
messages_to_mark = search_messages(service, query)
print(f"Matched emails: {len(messages_to_mark)}")
return service.users().messages().batchModify(
userId='me',
body={
'ids': [ msg['id'] for msg in messages_to_mark ],
'addLabelIds': ['UNREAD']
}
).execute()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Marks a set of emails as read or unread")
parser.add_argument('query', help='a search query that selects emails to mark')
parser.add_argument("-r", "--read", action="store_true", help='Whether to mark the message as read')
parser.add_argument("-u", "--unread", action="store_true", help='Whether to mark the message as unread')
args = parser.parse_args()
service = gmail_authenticate()
if args.read:
mark_as_read(service, args.query)
elif args.unread:
mark_as_unread(service, args.query)
delete_emails.py
from common import gmail_authenticate, search_messages
def delete_messages(service, query):
messages_to_delete = search_messages(service, query)
print(f"Deleting {len(messages_to_delete)} emails.")
# it's possible to delete a single message with the delete API, like this:
# service.users().messages().delete(userId='me', id=msg['id'])
# but it's also possible to delete all the selected messages with one query, batchDelete
return service.users().messages().batchDelete(
userId='me',
body={
'ids': [ msg['id'] for msg in messages_to_delete]
}
).execute()
if __name__ == "__main__":
import sys
service = gmail_authenticate()
delete_messages(service, sys.argv[1])