K

Create a custom LlamaIndex loader for Google Calendar

Create a custom loader for LlamaIndex to pull and process more data from Google Calendar's API.

Posted: May 16, 2023
Find more posts about:

The published GoogleCalendarReader only accesses events from the ‘primary’ calendar; that is, the calendar of the user whose credentials are used.

We need the calendar events for any users we specify, so I’ve written a new loader that takes a list of users’ email addresses and pulls the events from each of their calendars.

"""Google Calendar data loader. Pulls in events from the Google Calendar API for specified users."""

import datetime
import os
from typing import Any, List, Optional, Union

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document

# OAuth scope requested for the Calendar API (the read-only calendar scope).
# GoogleCalendarReader._get_credentials passes it both when loading the stored
# token and when running the interactive login flow.
SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"]


class GoogleCalendarReader(BaseReader):
    """Google Calendar data loader.

    Pulls in events from the Google Calendar API for specified users and
    turns each event into a ``Document`` whose text is a one-line,
    comma-separated description of the event.

    """

    # Timestamp layout used for the ``timeMin``/``timeMax`` query parameters.
    # The trailing "Z" is a literal, so the (naive) date bounds are sent to the
    # API labelled as UTC.
    _TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def load_data(
        self,
        users: Optional[Union[str, List[str]]] = None,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
        end_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """Load data from users' calendars.

        Args:
            users (Optional[Union[str, List[str]]]): the users' email
                addresses (calendar IDs) to load data for. A single string is
                treated as a one-element list. Defaults to ``["primary"]``
                (own calendar) when omitted or None.
            number_of_results (Optional[int]): passed as ``maxResults`` to the
                request made for each calendar, so the limit applies per
                calendar rather than to the combined result. Defaults to 100.
            start_date (Optional[Union[str, datetime.date]]): the start date
                to return events from, as a date or an ISO ``YYYY-MM-DD``
                string. Defaults to today.
            end_date (Optional[Union[str, datetime.date]]): the end date to
                return events from, as a date or an ISO ``YYYY-MM-DD`` string.
                Defaults to 7 days from start_date.

        Returns:
            List[Document]: one document per event, in the order the calendars
            were queried. Empty if no calendar returned any event.
        """

        from googleapiclient.discovery import build

        # Normalise ``users`` up front: a None default avoids a shared mutable
        # default list, and wrapping a bare string prevents iterating over its
        # characters as if each one were a calendar ID.
        if users is None:
            calendar_ids = ["primary"]
        elif isinstance(users, str):
            calendar_ids = [users]
        else:
            calendar_ids = list(users)

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        start_date = self._coerce_date(start_date, datetime.date.today())
        end_date = self._coerce_date(
            end_date, start_date + datetime.timedelta(days=7)
        )

        # The window runs from the first instant of start_date to the last
        # instant of end_date, so both days are fully included.
        time_min = datetime.datetime.combine(
            start_date, datetime.time.min
        ).strftime(self._TIME_FORMAT)
        time_max = datetime.datetime.combine(
            end_date, datetime.time.max
        ).strftime(self._TIME_FORMAT)

        events: List[dict] = []
        for calendar_id in calendar_ids:
            response = (
                service.events()
                .list(
                    calendarId=calendar_id,
                    timeMin=time_min,
                    timeMax=time_max,
                    maxResults=number_of_results,
                    singleEvents=True,
                    orderBy="startTime",
                )
                .execute()
            )
            events.extend(response.get("items", []))

        if not events:
            print("No upcoming events found.")
            return []

        return [
            Document(self._format_event(event), event["id"])
            for event in events
        ]

    @staticmethod
    def _coerce_date(
        value: Optional[Union[str, datetime.date]],
        default: datetime.date,
    ) -> datetime.date:
        """Return ``value`` as a date: ``default`` for None, parsed if a string."""
        if value is None:
            return default
        if isinstance(value, str):
            return datetime.date.fromisoformat(value)
        return value

    @staticmethod
    def _event_time(event: dict, key: str) -> str:
        """Return the ``dateTime`` (or, failing that, ``date``) under ``event[key]``.

        Falls back to "N/A" instead of raising when the entry is missing, so a
        single sparse event does not abort the whole load.
        """
        when = event.get(key) or {}
        if "dateTime" in when:
            return when["dateTime"]
        return when.get("date", "N/A")

    @classmethod
    def _format_event(cls, event: dict) -> str:
        """Build the comma-separated text description of a single event."""
        parts = [f"Status: {event.get('status', 'N/A')}"]

        # NOTE(review): "title" is kept for backward compatibility; confirm
        # whether the API ever returns it alongside "summary".
        title = event.get("title", None)
        if title is not None:
            parts.append(f"Title: {title}")

        summary = event.get("summary", None)
        if summary is not None:
            parts.append(f"Summary: {summary}")

        parts.append(f"Start time: {cls._event_time(event, 'start')}")
        parts.append(f"End time: {cls._event_time(event, 'end')}")

        organizer = event.get("organizer", {})
        display_name = organizer.get("displayName", "N/A")
        email = organizer.get("email", "N/A")
        if display_name == "N/A":
            parts.append(f"Organizer: {email}")
        else:
            parts.append(f"Organizer: {display_name} ({email})")

        return ", ".join(parts)

    def _get_credentials(self) -> Any:
        """Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time. Both token.json and credentials.json are opened by relative path,
        i.e. resolved against the current working directory.

        Returns:
            Credentials, the obtained credential.
        """
        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)

        # Stored credentials that are still valid need no further work.
        if creds and creds.valid:
            return creds

        if creds and creds.expired and creds.refresh_token:
            # Expired but refreshable: renew without user interaction.
            creds.refresh(Request())
        else:
            # No usable credentials available: let the user log in.
            flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run
        with open("token.json", "w") as token:
            token.write(creds.to_json())

        return creds


if __name__ == "__main__":
    # Manual smoke test: load events with the default arguments and dump the
    # resulting documents to stdout.
    documents = GoogleCalendarReader().load_data()
    print(documents)