Start of work to bring everything together into one program

This commit is contained in:
Bradley Bickford 2024-06-30 11:21:39 -04:00
parent 1d3782b31c
commit b132f3c28d
5 changed files with 216 additions and 2 deletions

10
config.json Normal file
View File

@ -0,0 +1,10 @@
{
"schedulingCalendarToObserve": "Experimental Calendar",
"streamsToMaintain": [
{
"name": "Steeple Cam Live Stream",
"description": "Live stream from the Steeple Cam of China Baptist Church",
"streamKeyName": "Steeple Cam Stream"
}
]
}

View File

@ -3,6 +3,7 @@ from __future__ import print_function
import datetime import datetime
import os.path import os.path
import argparse import argparse
import time
from google.auth.transport.requests import Request from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
@ -57,7 +58,7 @@ def get_calendar_events(calendar_service, calendar_id, num_events):
calendarId=calendar_id, calendarId=calendar_id,
timeMin=now, timeMin=now,
maxResults=num_events, maxResults=num_events,
singleEvents=True, singleEvents=True,
orderBy='startTime' orderBy='startTime'
).execute() ).execute()

View File

@ -94,7 +94,17 @@ def main():
if len(source) == 0: if len(source) == 0:
print("The source name specified does not exist") print("The source name specified does not exist")
sys.exit(1) sys.exit(1)
all_categories = list_all_video_categories(youtube_service)
desired_category_id = [element["id"] for element in all_categories if element["name"] == args.category]
if len(desired_category_id) == 0:
print("The category name specified does not exist")
sys.exit(1)
desired_category_id = desired_category_id[0]
startTime = datetime.strptime(args.starttime, "%m-%d-%Y %H:%M") startTime = datetime.strptime(args.starttime, "%m-%d-%Y %H:%M")
actualStartTime = startTime - timedelta(hours=0, minutes=5) actualStartTime = startTime - timedelta(hours=0, minutes=5)
@ -128,6 +138,23 @@ def main():
).execute() ).execute()
print(json.dumps(response, indent=4)) print(json.dumps(response, indent=4))
print("----------------------------")
print("----------------------------")
print("----------------------------")
print("----------------------------")
response = youtube_service.videos().update(
part = "id,snippet",
body = {
"id": response["id"],
"snippet": {
"categoryId": desired_category_id,
"title": args.name
}
}
).execute()
print(json.dumps(response, indent=4))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

36
reauthorize.py Normal file
View File

@ -0,0 +1,36 @@
from __future__ import print_function
import os.path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
SCOPES = [
'https://www.googleapis.com/auth/calendar.readonly',
"https://www.googleapis.com/auth/youtube"
]
def authorize():
creds = None
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=7384)
with open('token.json', 'w') as token:
token.write(creds.to_json())
return creds
def main():
creds = authorize()
if __name__ == '__main__':
main()

140
service_orchestrator.py Normal file
View File

@ -0,0 +1,140 @@
from __future__ import print_function
import datetime
import os.path
import argparse
import json
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
SCOPES = [
'https://www.googleapis.com/auth/calendar.readonly',
"https://www.googleapis.com/auth/youtube"
]
def authorize():
creds = None
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
creds = flow.run_local_server(port=7384)
with open('token.json', 'w') as token:
token.write(creds.to_json())
return creds
def construct_calendar_service(creds):
return build('calendar', 'v3', credentials=creds)
def list_all_calendars(calendar_service):
page_token = None
list_of_entries = []
while True:
calendar_list = calendar_service.calendarList().list(pageToken=page_token).execute()
list_of_entries.extend(calendar_list['items'])
page_token = calendar_list.get('nextPageToken')
if not page_token:
break
return list_of_entries
def get_calendar_events(calendar_service, calendar_id, num_events):
now = datetime.datetime.utcnow().isoformat() + "Z"
events_result = calendar_service.events().list(
calendarId=calendar_id,
timeMin=now,
maxResults=num_events,
singleEvents=True,
orderBy='startTime'
).execute()
events = events_result.get('items', [])
return events
def construct_youtube_service(creds):
return build('youtube', 'v3', credentials=creds)
def list_all_stream_sources(youtube_service):
page_token = None
list_of_entries = []
while True:
stream_list = youtube_service.liveStreams().list(
part="snippet",
mine=True,
pageToken=page_token
).execute()
list_of_entries.extend(stream_list["items"])
page_token = stream_list.get("nextPageToken")
if not page_token:
break
return list_of_entries
def list_all_video_categories(youtube_service):
category_list = youtube_service.videoCategories().list(
part="snippet",
regionCode="US"
).execute()
return [
{
"name": element["snippet"]["title"],
"id": element["id"]
}
for element in category_list["items"] if element["snippet"]["assignable"]
]
def list_all_live_broadcasts(youtube_service, broadcastStatus):
page_token = None
list_of_entries = []
while True:
broadcast_list = youtube_service.liveBroadcasts().list(
part="snippet",
broadcastStatus=broadcastStatus,
pageToken=page_token
).execute()
list_of_entries.extend(broadcast_list["items"])
page_token = broadcast_list.get("nextPageToken")
if not page_token:
break
return list_of_entries
def main():
with open("config.json", "r") as config_file:
config_json = json.loads(config_file.read())
creds = authorize()
calendar_service = construct_calendar_service(creds)
youtube_service = construct_youtube_service(creds)