from __future__ import print_function from datetime import datetime, timedelta import pytz import os.path import json import argparse import sys from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError from pprint import pprint SCOPES = ["https://www.googleapis.com/auth/youtube"] def authorize(): creds = None if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('token.json', SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES) creds = flow.run_local_server(port=7384) with open('token.json', 'w') as token: token.write(creds.to_json()) return creds def construct_youtube_service(creds): return build('youtube', 'v3', credentials=creds) def list_all_stream_sources(youtube_service): page_token = None list_of_entries = [] while True: stream_list = youtube_service.liveStreams().list( part="snippet", mine=True, pageToken=page_token ).execute() list_of_entries.extend(stream_list["items"]) page_token = stream_list.get("nextPageToken") if not page_token: break return list_of_entries def list_all_video_categories(youtube_service): category_list = youtube_service.videoCategories().list( part="snippet", regionCode="US" ).execute() return [ { "name": element["snippet"]["title"], "id": element["id"] } for element in category_list["items"] if element["snippet"]["assignable"] ] def main(): argument_parser = argparse.ArgumentParser(description="Insert a new live API broadcast") argument_parser.add_argument("name", help="The name of the Broadcast to create") argument_parser.add_argument("desc", help="The description of the Broadcast") argument_parser.add_argument("stream", help="The name of the stream to use for the Broadcast") argument_parser.add_argument("starttime", help="A MM-DD-YYYY HH:mm formatted date for the start time") argument_parser.add_argument("endtime", help="A MM-DD-YYYY HH:mm formatted date for the end time") argument_parser.add_argument("privacysetting", help="public, unlisted, or private to indicate who can see the coming stream") argument_parser.add_argument("category", help="The category you want to apply to the stream") argument_parser.add_argument("-a", "--autostartstop", help="Should the stream automatically start and stop", action="store_true") argument_parser.add_argument("-t", "--timezone", help="Set the timezone that the script should use for building broadcasts", default="America/New_York") args = argument_parser.parse_args() creds = authorize() youtube_service = construct_youtube_service(creds) all_sources = list_all_stream_sources(youtube_service) source = [element for element in all_sources if element["snippet"]["title"] == args.stream] if len(source) == 0: print("The source name specified does not exist") sys.exit(1) startTime = datetime.strptime(args.starttime, "%m-%d-%Y %H:%M") actualStartTime = startTime - timedelta(hours=0, minutes=5) endTime = datetime.strptime(args.endtime, "%m-%d-%Y %H:%M") actualEndTime = endTime + timedelta(hours=0, minutes=5) mytz = pytz.timezone(args.timezone) response = youtube_service.liveBroadcasts().insert( part = "id,snippet,contentDetails,status", body = { "snippet": { "title": args.name, "description": args.desc, "scheduledStartTime": mytz.localize(startTime).isoformat(), "actualStartTime": mytz.localize(actualStartTime).isoformat(), "scheduledEndTime": mytz.localize(endTime).isoformat(), "actualEndTime": mytz.localize(actualEndTime).isoformat() }, "contentDetails": { "enableAutoStart": args.autostartstop, "enableAutoStop": args.autostartstop, "enableDvr": True, "boundStreamId": source[0]["id"] }, "status": { "privacyStatus": args.privacysetting, "selfDeclaredMadeForKids": False } } ).execute() print(json.dumps(response, indent=4)) if __name__ == '__main__': main()