# utils.py
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import urllib.parse as p
import re
import os
import pickle
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]
def youtube_authenticate():
    """Authenticate against the YouTube Data API v3 and return a service object.

    Credentials cached in ``token.pickle`` are reused when possible;
    otherwise the interactive OAuth flow is run in a local browser and
    the resulting credentials are pickled for subsequent runs.
    """
    # Allow the local OAuth redirect to happen over plain HTTP.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    token_path = "token.pickle"
    credentials = None
    # token.pickle stores the user's access and refresh tokens; it is
    # created automatically the first time the authorization completes.
    if os.path.exists(token_path):
        with open(token_path, "rb") as token_file:
            credentials = pickle.load(token_file)
    # No valid cached credentials: refresh them or run the full flow.
    if not credentials or not credentials.valid:
        if credentials and credentials.expired and credentials.refresh_token:
            credentials.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            credentials = flow.run_local_server(port=0)
        # Persist the (new or refreshed) credentials for the next run.
        with open(token_path, "wb") as token_file:
            pickle.dump(credentials, token_file)
    return build("youtube", "v3", credentials=credentials)
def get_channel_details(youtube, **kwargs):
    """Fetch channel resources with statistics, snippet and contentDetails.

    Extra keyword arguments (e.g. ``id=`` or ``forUsername=``) are passed
    straight through to ``channels().list``.
    """
    request = youtube.channels().list(
        part="statistics,snippet,contentDetails", **kwargs
    )
    return request.execute()
def search(youtube, **kwargs):
    """Run a ``search().list`` request returning snippet parts.

    Query, paging and filter options are supplied via keyword arguments.
    """
    request = youtube.search().list(part="snippet", **kwargs)
    return request.execute()
def get_video_details(youtube, **kwargs):
    """Fetch video resources with snippet, contentDetails and statistics.

    Typically called with ``id=<video id>``; all keyword arguments go
    straight to ``videos().list``.
    """
    request = youtube.videos().list(
        part="snippet,contentDetails,statistics", **kwargs
    )
    return request.execute()
def print_video_infos(video_response):
    """Pretty-print the main fields of a ``videos().list`` API response.

    ``video_response`` must contain at least one item with the
    ``snippet``, ``statistics`` and ``contentDetails`` parts.
    """
    items = video_response.get("items")[0]
    # get the snippet, statistics & content details from the video response
    snippet = items["snippet"]
    statistics = items["statistics"]
    content_details = items["contentDetails"]
    # get infos from the snippet
    channel_title = snippet["channelTitle"]
    title = snippet["title"]
    description = snippet["description"]
    publish_time = snippet["publishedAt"]
    # stats counters are optional in the API response (comments or likes
    # can be disabled), so fall back to "N/A" instead of raising KeyError
    comment_count = statistics.get("commentCount", "N/A")
    like_count = statistics.get("likeCount", "N/A")
    view_count = statistics.get("viewCount", "N/A")
    # get duration from content details, in ISO-8601 form like 'PT5H50M15S';
    # parse it to something like '5:50:15'.  Use a raw string (the original
    # f-string emitted invalid '\d' escapes) and make every unit optional:
    # 'PT1H' or 'PT2M' previously crashed because the seconds group was
    # mandatory and re.search returned None.
    duration = content_details["duration"]
    match = re.search(r"PT(\d+H)?(\d+M)?(\d+S)?", duration)
    parsed_duration = match.groups() if match else ()
    # strip the trailing unit letter from each present component and join
    duration_str = ":".join(d[:-1] for d in parsed_duration if d)
    print(f"""
Title: {title}
Description: {description}
Channel Title: {channel_title}
Publish time: {publish_time}
Duration: {duration_str}
Number of comments: {comment_count}
Number of likes: {like_count}
Number of views: {view_count}
""")
def parse_channel_url(url):
    """Classify a channel ``url`` and extract its identifier.

    Returns a ``(method, identifier)`` tuple where ``method`` is one of:
    - 'c': ``identifier`` is a channel display name
    - 'channel': ``identifier`` is a channel ID
    - 'user': ``identifier`` is a username

    Raises:
        ValueError: if the URL path matches none of the known formats
        (previously the function fell through and returned None, making
        callers fail later with an opaque TypeError on tuple unpacking).
    """
    path = p.urlparse(url).path
    # the identifier is always the last path segment
    identifier = path.split("/")[-1]
    if "/c/" in path:
        return "c", identifier
    if "/channel/" in path:
        return "channel", identifier
    if "/user/" in path:
        return "user", identifier
    raise ValueError(f"Unrecognized channel URL: {url}")
def get_channel_id_by_url(youtube, url):
    """Resolve a channel ``url`` to a channel ID.

    The URL may embed the channel ID directly ('/channel/'), a username
    ('/user/') or a display name ('/c/'); the last two need an API call.

    Raises:
        Exception: when the API lookup yields no matching channel.
    """
    # classify the URL and pull out its identifier
    method, identifier = parse_channel_url(url)
    if method == "channel":
        # already a channel ID -- nothing to resolve
        return identifier
    if method == "user":
        # usernames resolve through the channels endpoint
        response = get_channel_details(youtube, forUsername=identifier)
        items = response.get("items")
        if items:
            return items[0].get("id")
    elif method == "c":
        # display names can only be resolved via search (may be inaccurate)
        response = search(youtube, q=identifier, maxResults=1)
        items = response.get("items")
        if items:
            return items[0]["snippet"]["channelId"]
    raise Exception(f"Cannot find ID:{identifier} with {method} method")
def get_video_id_by_url(url):
    """Return the video ID embedded in a YouTube video ``url``.

    Supports both the long form (``youtube.com/watch?v=<id>``) and the
    short form (``youtu.be/<id>``).

    Raises:
        ValueError: if no video ID can be extracted from the URL.
    """
    # split URL parts
    parsed_url = p.urlparse(url)
    # long form: the ID lives in the 'v' query parameter
    video_id = p.parse_qs(parsed_url.query).get("v")
    if video_id:
        return video_id[0]
    # short form: https://youtu.be/<id>
    if parsed_url.netloc.endswith("youtu.be"):
        short_id = parsed_url.path.lstrip("/")
        if short_id:
            return short_id
    raise ValueError(f"Wasn't able to parse video URL: {url}")
# video_details.py
from utils import (
youtube_authenticate,
get_video_id_by_url,
get_video_details,
print_video_infos
)
if __name__ == "__main__":
    # build an authenticated YouTube API client
    youtube = youtube_authenticate()
    video_url = "https://www.youtube.com/watch?v=jNQXAC9IVRw&ab_channel=jawed"
    # pull the video ID out of the URL, fetch its details and print them
    video_id = get_video_id_by_url(video_url)
    details_response = get_video_details(youtube, id=video_id)
    print_video_infos(details_response)
# search_by_keyword.py
from utils import (
youtube_authenticate,
get_video_details,
print_video_infos,
search
)
if __name__ == "__main__":
    # build an authenticated YouTube API client
    youtube = youtube_authenticate()
    # search for the query 'python', keeping only the top 2 results
    search_response = search(youtube, q="python", maxResults=2)
    for result in search_response.get("items"):
        # a search result only carries the video ID; fetch full details
        video_id = result["id"]["videoId"]
        details_response = get_video_details(youtube, id=video_id)
        print_video_infos(details_response)
        print("=" * 50)
# channel_details.py
from utils import (
youtube_authenticate,
get_channel_id_by_url,
get_channel_details,
get_video_details,
print_video_infos
)
def get_channel_videos(youtube, **kwargs):
    """Run a raw ``search().list`` request.

    Unlike ``search`` in utils, the caller supplies everything
    (``part``, ``channelId``, paging, ...) via keyword arguments.
    """
    request = youtube.search().list(**kwargs)
    return request.execute()
if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    channel_url = "https://www.youtube.com/channel/UC8butISFwT-Wl7EV0hUK0BQ"
    # get the channel ID from the URL
    channel_id = get_channel_id_by_url(youtube, channel_url)
    # get the channel details
    response = get_channel_details(youtube, id=channel_id)
    # extract channel infos
    snippet = response["items"][0]["snippet"]
    statistics = response["items"][0]["statistics"]
    # 'country' and 'subscriberCount' are optional fields in the API
    # response, so fall back to "N/A" instead of raising KeyError
    channel_country = snippet.get("country", "N/A")
    channel_description = snippet["description"]
    channel_creation_date = snippet["publishedAt"]
    channel_title = snippet["title"]
    channel_subscriber_count = statistics.get("subscriberCount", "N/A")
    channel_video_count = statistics["videoCount"]
    channel_view_count = statistics["viewCount"]
    print(f"""
Title: {channel_title}
Published At: {channel_creation_date}
Description: {channel_description}
Country: {channel_country}
Number of videos: {channel_video_count}
Number of subscribers: {channel_subscriber_count}
Total views: {channel_view_count}
""")
    # the following grabs channel videos, page by page
    # number of pages you want to get
    n_pages = 2
    # counting number of videos grabbed
    n_videos = 0
    next_page_token = None
    for i in range(n_pages):
        params = {
            "part": "snippet",
            "q": "",
            "channelId": channel_id,
            "type": "video",
        }
        if next_page_token:
            params["pageToken"] = next_page_token
        res = get_channel_videos(youtube, **params)
        channel_videos = res.get("items")
        for video in channel_videos:
            n_videos += 1
            video_id = video["id"]["videoId"]
            # easily construct the video URL from its ID
            video_url = f"https://www.youtube.com/watch?v={video_id}"
            video_response = get_video_details(youtube, id=video_id)
            print(f"================Video #{n_videos}================")
            # print the video details
            print_video_infos(video_response)
            print(f"Video URL: {video_url}")
            print("="*40)
        if "nextPageToken" in res:
            # proceed to the next page on the next iteration
            next_page_token = res["nextPageToken"]
        else:
            # no more pages: stop instead of re-requesting the same page
            # (previously the stale/None token caused page 1 to repeat)
            break
# comments.py
from utils import youtube_authenticate, get_video_id_by_url, get_channel_id_by_url
def get_comments(youtube, **kwargs):
    """Run a ``commentThreads().list`` request returning snippet parts.

    Filters (``videoId``, ``allThreadsRelatedToChannelId``, paging, ...)
    are supplied via keyword arguments.
    """
    request = youtube.commentThreads().list(part="snippet", **kwargs)
    return request.execute()
if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    # URL can be a channel or a video, to extract comments
    url = "https://www.youtube.com/watch?v=jNQXAC9IVRw&ab_channel=jawed"
    if "watch" in url:
        # that's a video
        video_id = get_video_id_by_url(url)
        params = {
            "videoId": video_id,
            "maxResults": 2,
            "order": "relevance",  # default is 'time' (newest first)
        }
    else:
        # should be a channel; bug fix: get_channel_id_by_url requires the
        # API client as its first argument (it was being called without it)
        channel_id = get_channel_id_by_url(youtube, url)
        params = {
            "allThreadsRelatedToChannelId": channel_id,
            "maxResults": 2,
            "order": "relevance",  # default is 'time' (newest first)
        }
    # get the first 2 pages (2 API requests)
    n_pages = 2
    for i in range(n_pages):
        # make API call to get comments (including posts & videos for channels)
        response = get_comments(youtube, **params)
        items = response.get("items")
        # if items is empty, break out of the loop
        if not items:
            break
        for item in items:
            # hoist the repeated nested lookup of the top-level comment
            top_comment = item["snippet"]["topLevelComment"]["snippet"]
            comment = top_comment["textDisplay"]
            updated_at = top_comment["updatedAt"]
            like_count = top_comment["likeCount"]
            print(f"""\
Comment: {comment}
Likes: {like_count}
Updated At: {updated_at}
==================================\
""")
        if "nextPageToken" in response:
            # pass the next page token to the following request
            params["pageToken"] = response["nextPageToken"]
        else:
            # must be the end of the comments
            break
        print("*"*70)
# You can also explore the code in this notebook.