This commit is contained in:
2025-11-21 22:43:31 +01:00
72 changed files with 1544 additions and 546 deletions

View File

@@ -1,20 +1,20 @@
"""
Inkycal opwenweather API abstraction
- retrieves free weather data from OWM 2.5 API endpoints (given provided API key)
- handles unit, language and timezone conversions
- provides ready-to-use current weather, hourly and daily forecasts
Inkycal OpenWeatherMap API abstraction module
- Retrieves free weather data from OWM 2.5/3.0 API endpoints (with provided API key)
- Handles temperature and wind unit conversions
- Converts data to a standardized timezone and language
- Returns ready-to-use weather structures for current, hourly, and daily forecasts
"""
import json
import logging
from datetime import datetime
from datetime import timedelta
from typing import Dict
from typing import List
from typing import Literal
from datetime import datetime, timedelta
from typing import Dict, List, Literal
import requests
from dateutil import tz
# Type annotations for strict typing
TEMP_UNITS = Literal["celsius", "fahrenheit"]
WIND_UNITS = Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
WEATHER_TYPE = Literal["current", "forecast"]
@@ -27,15 +27,16 @@ logger.setLevel(level=logging.INFO)
def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
# Check if the timestamp is within the range
"""Check if the given timestamp lies between start_time and end_time."""
return start_time <= timestamp <= end_time
def get_json_from_url(request_url):
"""Performs an HTTP GET request and returns the parsed JSON response."""
response = requests.get(request_url)
if not response.ok:
raise AssertionError(
f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}"
f"Failure getting weather: code {response.status_code}. Reason: {response.text}"
)
return json.loads(response.text)
@@ -44,280 +45,162 @@ class OpenWeatherMap:
def __init__(self, api_key: str, city_id: int = None, lat: float = None, lon: float = None,
api_version: API_VERSIONS = "2.5", temp_unit: TEMP_UNITS = "celsius",
wind_unit: WIND_UNITS = "meters_sec", language: str = "en", tz_name: str = "UTC") -> None:
"""
Initializes the OWM wrapper with localization settings.
Chooses API version, units, location and timezone preferences.
"""
self.api_key = api_key
self.temp_unit = temp_unit
self.wind_unit = wind_unit
self.language = language
self._api_version = api_version
if self._api_version == "3.0":
assert type(lat) is float and type(lon) is float
self.location_substring = (
f"lat={str(lat)}&lon={str(lon)}" if (lat is not None and lon is not None) else f"id={str(city_id)}"
)
self.tz_zone = tz.gettz(tz_name)
logger.info(
f"OWM wrapper initialized for API version {self._api_version}, language {self.language} and timezone {tz_name}."
if self._api_version == "3.0":
assert isinstance(lat, float) and isinstance(lon, float)
self.location_substring = (
f"lat={lat}&lon={lon}" if (lat and lon) else f"id={city_id}"
)
self.tz_zone = tz.gettz(tz_name)
logger.info(f"OWM wrapper initialized with API v{self._api_version}, lang={language}, tz={tz_name}.")
def get_weather_data_from_owm(self, weather: WEATHER_TYPE):
# Gets current weather or forecast from the configured OWM API.
"""Gets either current or forecast weather data."""
if weather == "current":
# Gets current weather status from the 2.5 API: https://openweathermap.org/current
# This is primarily using the 2.5 API since the 3.0 API actually has less info
weather_url = f"{API_BASE_URL}/2.5/weather?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
weather_data = get_json_from_url(weather_url)
# Only if we do have a 3.0 API-enabled key, we can also get the UVI reading from that endpoint: https://openweathermap.org/api/one-call-3
url = f"{API_BASE_URL}/2.5/weather?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
data = get_json_from_url(url)
if self._api_version == "3.0":
weather_url = f"{API_BASE_URL}/3.0/onecall?{self.location_substring}&appid={self.api_key}&exclude=minutely,hourly,daily&units=Metric&lang={self.language}"
weather_data["uvi"] = get_json_from_url(weather_url)["current"]["uvi"]
uvi_url = f"{API_BASE_URL}/3.0/onecall?{self.location_substring}&appid={self.api_key}&exclude=minutely,hourly,daily&units=Metric&lang={self.language}"
data["uvi"] = get_json_from_url(uvi_url)["current"].get("uvi")
elif weather == "forecast":
# Gets weather forecasts from the 2.5 API: https://openweathermap.org/forecast5
# This is only using the 2.5 API since the 3.0 API actually has less info
weather_url = f"{API_BASE_URL}/2.5/forecast?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
weather_data = get_json_from_url(weather_url)["list"]
return weather_data
url = f"{API_BASE_URL}/2.5/forecast?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
data = get_json_from_url(url)["list"]
return data
def get_current_weather(self) -> Dict:
"""
Decodes the OWM current weather data for our purposes
:return:
Current weather as dictionary
Fetches and processes current weather data.
Includes gust fallback and unit conversions.
"""
data = self.get_weather_data_from_owm("current")
wind_data = data.get("wind", {})
base_speed = wind_data.get("speed", 0.0)
gust_speed = wind_data.get("gust", base_speed)
converted_gust = self.get_converted_windspeed(gust_speed)
current_data = self.get_weather_data_from_owm(weather="current")
weather = {
"detailed_status": data["weather"][0]["description"],
"weather_icon_name": data["weather"][0]["icon"],
"temp": self.get_converted_temperature(data["main"]["temp"]),
"temp_feels_like": self.get_converted_temperature(data["main"]["feels_like"]),
"min_temp": self.get_converted_temperature(data["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(data["main"]["temp_max"]),
"humidity": data["main"]["humidity"],
"wind": converted_gust,
"wind_gust": converted_gust,
"uvi": data.get("uvi"),
"sunrise": datetime.fromtimestamp(data["sys"]["sunrise"], tz=self.tz_zone),
"sunset": datetime.fromtimestamp(data["sys"]["sunset"], tz=self.tz_zone),
}
current_weather = {}
current_weather["detailed_status"] = current_data["weather"][0]["description"]
current_weather["weather_icon_name"] = current_data["weather"][0]["icon"]
current_weather["temp"] = self.get_converted_temperature(
current_data["main"]["temp"]
) # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
current_weather["temp_feels_like"] = self.get_converted_temperature(current_data["main"]["feels_like"])
current_weather["min_temp"] = self.get_converted_temperature(current_data["main"]["temp_min"])
current_weather["max_temp"] = self.get_converted_temperature(current_data["main"]["temp_max"])
current_weather["humidity"] = current_data["main"]["humidity"] # OWM Unit: % rH
current_weather["wind"] = self.get_converted_windspeed(
current_data["wind"]["speed"]
) # OWM Unit Default: meter/sec, Metric: meter/sec
if "gust" in current_data["wind"]:
current_weather["wind_gust"] = self.get_converted_windspeed(current_data["wind"]["gust"])
else:
logger.info(
f"OpenWeatherMap response did not contain a wind gust speed. Using base wind: {current_weather['wind']} m/s."
)
current_weather["wind_gust"] = current_weather["wind"]
if "uvi" in current_data: # this is only supported in v3.0 API
current_weather["uvi"] = current_data["uvi"]
else:
current_weather["uvi"] = None
current_weather["sunrise"] = datetime.fromtimestamp(
current_data["sys"]["sunrise"], tz=self.tz_zone
) # unix timestamp -> to our timezone
current_weather["sunset"] = datetime.fromtimestamp(current_data["sys"]["sunset"], tz=self.tz_zone)
self.current_weather = current_weather
return current_weather
return weather
def get_weather_forecast(self) -> List[Dict]:
"""
Decodes the OWM weather forecast for our purposes
What you get is a list of 40 forecasts for 3-hour time slices, totaling to 5 days.
:return:
Forecasts data dictionary
Parses OWM 5-day / 3-hour forecast into a list of hourly dictionaries.
"""
#
forecast_data = self.get_weather_data_from_owm(weather="forecast")
forecasts = self.get_weather_data_from_owm("forecast")
hourly = []
# Add forecast data to hourly_data_dict list of dictionaries
hourly_forecasts = []
for forecast in forecast_data:
# calculate combined precipitation (snow + rain)
precip_mm = 0.0
if "rain" in forecast.keys():
precip_mm = +forecast["rain"]["3h"] # OWM Unit: mm
if "snow" in forecast.keys():
precip_mm = +forecast["snow"]["3h"] # OWM Unit: mm
hourly_forecasts.append(
{
"temp": self.get_converted_temperature(
forecast["main"]["temp"]
), # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
"min_temp": self.get_converted_temperature(forecast["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(forecast["main"]["temp_max"]),
"precip_3h_mm": precip_mm,
"wind": self.get_converted_windspeed(
forecast["wind"]["speed"]
), # OWM Unit Default: meter/sec, Metric: meter/sec, Imperial: miles/hour
"wind_gust": self.get_converted_windspeed(forecast["wind"]["gust"]),
"pressure": forecast["main"]["pressure"], # OWM Unit: hPa
"humidity": forecast["main"]["humidity"], # OWM Unit: % rH
"precip_probability": forecast["pop"]
* 100.0, # OWM value is unitless, directly converting to % scale
"icon": forecast["weather"][0]["icon"],
"datetime": datetime.fromtimestamp(forecast["dt"], tz=self.tz_zone),
}
)
logger.debug(
f"Added rain forecast at {datetime.fromtimestamp(forecast['dt'], tz=self.tz_zone)}: {precip_mm}"
)
for f in forecasts:
rain = f.get("rain", {}).get("3h", 0.0)
snow = f.get("snow", {}).get("3h", 0.0)
precip_mm = rain + snow
self.hourly_forecasts = hourly_forecasts
hourly.append({
"temp": self.get_converted_temperature(f["main"]["temp"]),
"min_temp": self.get_converted_temperature(f["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(f["main"]["temp_max"]),
"precip_3h_mm": precip_mm,
"wind": self.get_converted_windspeed(f["wind"]["speed"]),
"wind_gust": self.get_converted_windspeed(f["wind"].get("gust", f["wind"]["speed"])),
"pressure": f["main"]["pressure"],
"humidity": f["main"]["humidity"],
"precip_probability": f.get("pop", 0.0) * 100.0,
"icon": f["weather"][0]["icon"],
"datetime": datetime.fromtimestamp(f["dt"], tz=self.tz_zone),
})
return self.hourly_forecasts
return hourly
def get_forecast_for_day(self, days_from_today: int) -> Dict:
"""
Get temperature range, rain and most frequent icon code
for the day that is days_from_today away.
"Today" is based on our local system timezone.
:param days_from_today:
should be int from 0-4: e.g. 2 -> 2 days from today
:return:
Forecast dictionary
Aggregates hourly data into daily summary with min/max temp, precip and icon.
"""
# Make sure hourly forecasts are up-to-date
_ = self.get_weather_forecast()
forecasts = self.get_weather_forecast()
now = datetime.now(tz=self.tz_zone)
start = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_from_today)
end = start + timedelta(days=1)
# Calculate the start and end times for the specified number of days from now
current_time = datetime.now(tz=self.tz_zone)
start_time = (
(current_time + timedelta(days=days_from_today))
.replace(hour=0, minute=0, second=0, microsecond=0)
.astimezone(tz=self.tz_zone)
)
end_time = (start_time + timedelta(days=1)).astimezone(tz=self.tz_zone)
daily = [f for f in forecasts if start <= f["datetime"] < end]
if not daily:
daily.append(forecasts[0]) # fallback to first forecast
# Get all the forecasts for that day's time range
forecasts = [
f
for f in self.hourly_forecasts
if is_timestamp_within_range(timestamp=f["datetime"], start_time=start_time, end_time=end_time)
]
temps = [f["temp"] for f in daily]
rain = sum(f["precip_3h_mm"] for f in daily)
icons = [f["icon"] for f in daily if f["icon"]]
icon = max(set(icons), key=icons.count)
# In case the next available forecast is already for the next day, use that one for the less than 3 remaining hours of today
if not forecasts:
forecasts.append(self.hourly_forecasts[0])
# Get rain and temperatures for that day
temps = [f["temp"] for f in forecasts]
rain = sum([f["precip_3h_mm"] for f in forecasts])
# Get all weather icon codes for this day
icons = [f["icon"] for f in forecasts]
day_icons = [icon for icon in icons if "d" in icon]
# Use the day icons if possible
icon = max(set(day_icons), key=icons.count) if len(day_icons) > 0 else max(set(icons), key=icons.count)
# Return a dict with that day's data
day_data = {
"datetime": start_time,
return {
"datetime": start,
"icon": icon,
"temp_min": min(temps),
"temp_max": max(temps),
"precip_mm": rain,
"precip_mm": rain
}
return day_data
def get_converted_temperature(self, value: float) -> float:
if self.temp_unit == "fahrenheit":
value = self.celsius_to_fahrenheit(value)
return value
return self.celsius_to_fahrenheit(value) if self.temp_unit == "fahrenheit" else value
def get_converted_windspeed(self, value: float) -> float:
Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
if self.wind_unit == "km_hour":
value = self.celsius_to_fahrenheit(value)
elif self.wind_unit == "km_hour":
value = self.mps_to_kph(value)
elif self.wind_unit == "miles_hour":
value = self.mps_to_mph(value)
elif self.wind_unit == "knots":
value = self.mps_to_knots(value)
elif self.wind_unit == "beaufort":
value = self.mps_to_beaufort(value)
return value
return self.mps_to_kph(value)
if self.wind_unit == "miles_hour":
return self.mps_to_mph(value)
if self.wind_unit == "knots":
return self.mps_to_knots(value)
if self.wind_unit == "beaufort":
return self.mps_to_beaufort(value)
return value # default is meters/sec
@staticmethod
def mps_to_beaufort(meters_per_second: float) -> int:
"""Map meters per second to the beaufort scale.
Args:
meters_per_second:
float representing meters per seconds
Returns:
an integer of the beaufort scale mapping the input
"""
def mps_to_beaufort(mps: float) -> int:
thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7]
return next((i for i, threshold in enumerate(thresholds) if meters_per_second < threshold), 12)
return next((i for i, t in enumerate(thresholds) if mps < t), 12)
@staticmethod
def mps_to_mph(meters_per_second: float) -> float:
"""Map meters per second to miles per hour
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in miles per hour.
"""
# 1 m/s is approximately equal to 2.23694 mph
miles_per_hour = meters_per_second * 2.23694
return miles_per_hour
def mps_to_mph(mps: float) -> float:
return mps * 2.23694
@staticmethod
def mps_to_kph(meters_per_second: float) -> float:
"""Map meters per second to kilometers per hour
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in kilometers per hour.
"""
# 1 m/s is equal to 3.6 km/h
kph = meters_per_second * 3.6
return kph
def mps_to_kph(mps: float) -> float:
return mps * 3.6
@staticmethod
def mps_to_knots(meters_per_second: float) -> float:
"""Map meters per second to knots (nautical miles per hour)
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in knots.
"""
# 1 m/s is equal to 1.94384 knots
knots = meters_per_second * 1.94384
return knots
def mps_to_knots(mps: float) -> float:
return mps * 1.94384
@staticmethod
def celsius_to_fahrenheit(celsius: int or float) -> float:
"""Converts the given temperate from degrees Celsius to Fahrenheit."""
fahrenheit = (float(celsius) * 9.0 / 5.0) + 32.0
return fahrenheit
def celsius_to_fahrenheit(c: float) -> float:
return c * 9.0 / 5.0 + 32.0
def main():
"""Main function, only used for testing purposes"""
# Simple test entry point
key = ""
city = 2643743
lang = "de"
owm = OpenWeatherMap(api_key=key, city_id=city, language=lang, tz="Europe/Berlin")
current_weather = owm.get_current_weather()
print(current_weather)
_ = owm.get_weather_forecast()
city = 2643743 # London
owm = OpenWeatherMap(api_key=key, city_id=city, language="de", tz_name="Europe/Berlin")
print(owm.get_current_weather())
print(owm.get_forecast_for_day(days_from_today=2))

View File

@@ -165,7 +165,12 @@ class Inkycal:
self.pisugar = PiSugar()
self.battery_capacity = self.pisugar.get_battery()
logger.info(f"PiSugar battery capacity: {self.battery_capacity}%")
if not self.battery_capacity:
logger.warning("[PISUGAR] Could not get battery capacity! Is the board off? Setting battery capacity to 0%")
self.battery_capacity = 100
else:
logger.info(f"PiSugar battery capacity: {self.battery_capacity}%")
if self.battery_capacity < 20:
logger.warning("Battery capacity is below 20%!")
@@ -342,8 +347,12 @@ class Inkycal:
logger.info("All images generated successfully!")
del errors
if self.battery_capacity < 20:
self.info += "Low battery! "
if self.use_pi_sugar:
self.battery_capacity = self.pisugar.get_battery() or 0
if self.battery_capacity < 20:
self.info += f"Low battery! ({self.battery_capacity})% "
else:
self.info += f"Battery: {self.battery_capacity}% "
# Assemble image from each module - add info section if specified
self._assemble()

7
inkycal/modules/inky_image.py Executable file → Normal file
View File

@@ -169,6 +169,13 @@ class Inkyimage:
logger.error("no height of width specified")
return
current_width, current_height = self.image.size
# Skip if dimensions are the same
if width == current_width and height == current_height:
logger.info(f"Image already correct size ({width}x{height}), skipping resize")
return
image = self.image
if width:

View File

@@ -114,7 +114,7 @@ class Feeds(inkycal_module):
# if "description" in posts:
if parsed_feeds:
parsed_feeds = [i.split("\n") for i in parsed_feeds][0]
parsed_feeds = [i.split("\n") for i in parsed_feeds]
parsed_feeds = [i for i in parsed_feeds if i]
# Shuffle the list to prevent showing the same content
@@ -129,7 +129,7 @@ class Feeds(inkycal_module):
filtered_feeds, counter = [], 0
for posts in parsed_feeds:
wrapped = text_wrap(posts, font=self.font, max_width=line_width)
wrapped = text_wrap(posts[0], font=self.font, max_width=line_width)
counter += len(wrapped)
if counter < max_lines:
filtered_feeds.append(wrapped)

View File

@@ -78,13 +78,17 @@ class TextToDisplay(inkycal_module):
with open(self.filepath, 'r') as file:
file_content = file.read()
fitted_content = text_wrap(file_content, font=self.font, max_width=im_width)
# Split content by lines if not making a request
if not self.make_request:
lines = file_content.split('\n')
else:
lines = text_wrap(file_content, font=self.font, max_width=im_width)
# Trim down the list to the max number of lines
del fitted_content[max_lines:]
del lines[max_lines:]
# Write feeds on image
for index, line in enumerate(fitted_content):
for index, line in enumerate(lines):
write(
im_black,
line_positions[index],

View File

@@ -3,11 +3,16 @@ Inkycal Todoist Module
Copyright by aceinnolab
"""
import arrow
import json
import os
import time
from datetime import datetime
from inkycal.modules.template import inkycal_module
from inkycal.custom import *
from todoist_api_python.api import TodoistAPI
import requests.exceptions
logger = logging.getLogger(__name__)
@@ -29,6 +34,10 @@ class Todoist(inkycal_module):
'project_filter': {
"label": "Show Todos only from following project (separated by a comma). Leave empty to show " +
"todos from all projects",
},
'show_priority': {
"label": "Show priority indicators for tasks (P1, P2, P3)",
"default": True
}
}
@@ -53,8 +62,15 @@ class Todoist(inkycal_module):
else:
self.project_filter = config['project_filter']
# Priority display option
self.show_priority = config.get('show_priority', True)
self._api = TodoistAPI(config['api_key'])
# Cache file path for storing last successful response
self.cache_file = os.path.join(os.path.dirname(__file__), '..', '..', 'temp', 'todoist_cache.json')
os.makedirs(os.path.dirname(self.cache_file), exist_ok=True)
# give an OK message
logger.debug(f'{__name__} loaded')
@@ -63,6 +79,93 @@ class Todoist(inkycal_module):
if not isinstance(self.api_key, str):
print('api_key has to be a string: "Yourtopsecretkey123" ')
def _fetch_with_retry(self, fetch_func, max_retries=3):
"""Fetch data with retry logic and exponential backoff"""
for attempt in range(max_retries):
try:
return fetch_func()
except requests.exceptions.HTTPError as e:
if e.response.status_code in [502, 503, 504]: # Retry on server errors
if attempt < max_retries - 1:
delay = (2 ** attempt) # Exponential backoff: 1s, 2s, 4s
logger.warning(f"API request failed (attempt {attempt + 1}/{max_retries}), retrying in {delay}s...")
time.sleep(delay)
continue
raise
except requests.exceptions.ConnectionError:
if attempt < max_retries - 1:
delay = (2 ** attempt)
logger.warning(f"Connection error (attempt {attempt + 1}/{max_retries}), retrying in {delay}s...")
time.sleep(delay)
continue
raise
raise Exception("Max retries exceeded")
def _save_cache(self, projects, tasks):
"""Save API response to cache file"""
try:
cache_data = {
'timestamp': datetime.now().isoformat(),
'projects': [{'id': p.id, 'name': p.name} for p in projects],
'tasks': [{
'content': t.content,
'project_id': t.project_id,
'priority': t.priority,
'due': {'date': t.due.date} if t.due else None
} for t in tasks]
}
with open(self.cache_file, 'w') as f:
json.dump(cache_data, f)
logger.debug("Saved Todoist data to cache")
except Exception as e:
logger.warning(f"Failed to save cache: {e}")
def _load_cache(self):
"""Load cached API response"""
try:
if os.path.exists(self.cache_file):
with open(self.cache_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load cache: {e}")
return None
def _create_error_image(self, im_size, error_msg=None, cached_data=None):
"""Create an error message image when API fails"""
im_width, im_height = im_size
im_black = Image.new('RGB', size=im_size, color='white')
im_colour = Image.new('RGB', size=im_size, color='white')
# Display error message
line_spacing = 1
text_bbox_height = self.font.getbbox("hg")
line_height = text_bbox_height[3] + line_spacing
messages = []
if error_msg:
messages.append("Todoist temporarily unavailable")
if cached_data and 'timestamp' in cached_data:
timestamp = arrow.get(cached_data['timestamp']).format('D-MMM-YY HH:mm')
messages.append(f"Showing cached data from:")
messages.append(timestamp)
else:
messages.append("No cached data available")
messages.append("Please check your connection")
# Center the messages vertically
total_height = len(messages) * line_height
start_y = (im_height - total_height) // 2
for i, msg in enumerate(messages):
y_pos = start_y + (i * line_height)
# First line in red (colour image), rest in black
target_image = im_colour if i == 0 else im_black
write(target_image, (0, y_pos), (im_width, line_height),
msg, font=self.font, alignment='center')
return im_black, im_colour
def generate_image(self):
"""Generate image for this module"""
@@ -77,11 +180,45 @@ class Todoist(inkycal_module):
im_colour = Image.new('RGB', size=im_size, color='white')
# Check if internet is available
if internet_available():
logger.info('Connection test passed')
if not internet_available():
logger.error("Network not reachable. Trying to use cached data.")
cached_data = self._load_cache()
if cached_data:
# Process cached data below
all_projects = [type('Project', (), p) for p in cached_data['projects']]
all_active_tasks = [type('Task', (), {
'content': t['content'],
'project_id': t['project_id'],
'priority': t['priority'],
'due': type('Due', (), {'date': t['due']['date']}) if t['due'] else None
}) for t in cached_data['tasks']]
else:
return self._create_error_image(im_size, "Network error", None)
else:
logger.error("Network not reachable. Please check your connection.")
raise NetworkNotReachableError
logger.info('Connection test passed')
# Try to fetch fresh data from API
try:
all_projects = self._fetch_with_retry(self._api.get_projects)
all_active_tasks = self._fetch_with_retry(self._api.get_tasks)
# Save to cache on successful fetch
self._save_cache(all_projects, all_active_tasks)
except Exception as e:
logger.error(f"Failed to fetch Todoist data: {e}")
# Try to use cached data
cached_data = self._load_cache()
if cached_data:
logger.info("Using cached Todoist data")
all_projects = [type('Project', (), p) for p in cached_data['projects']]
all_active_tasks = [type('Task', (), {
'content': t['content'],
'project_id': t['project_id'],
'priority': t['priority'],
'due': type('Due', (), {'date': t['due']['date']}) if t['due'] else None
}) for t in cached_data['tasks']]
else:
# No cached data available, show error
return self._create_error_image(im_size, str(e), None)
# Set some parameters for formatting todos
line_spacing = 1
@@ -97,10 +234,8 @@ class Todoist(inkycal_module):
line_positions = [
(0, spacing_top + _ * line_height) for _ in range(max_lines)]
# Get all projects by name and id
all_projects = self._api.get_projects()
# Process the fetched or cached data
filtered_project_ids_and_names = {project.id: project.name for project in all_projects}
all_active_tasks = self._api.get_tasks()
logger.debug(f"all_projects: {all_projects}")
print(f"all_projects: {all_projects}")
@@ -126,26 +261,57 @@ class Todoist(inkycal_module):
all_active_tasks = [task for task in all_active_tasks if task.project_id in filtered_project_ids]
# Simplify the tasks for faster processing
simplified = [
{
simplified = []
for task in all_active_tasks:
# Format priority indicator using circle symbols
priority_text = ""
if self.show_priority and task.priority > 1:
# Todoist uses reversed priority (4 = highest, 1 = lowest)
if task.priority == 4: # P1 - filled circle (red)
priority_text = "" # Filled circle for highest priority
elif task.priority == 3: # P2 - filled circle (black)
priority_text = "" # Filled circle for high priority
elif task.priority == 2: # P3 - empty circle (black)
priority_text = "" # Empty circle for medium priority
# Check if task is overdue
# Parse date in local timezone to ensure correct comparison
due_date = arrow.get(task.due.date, "YYYY-MM-DD").replace(tzinfo='local') if task.due else None
today = arrow.now('local').floor('day')
is_overdue = due_date and due_date < today if due_date else False
# Format due date display
if due_date:
if due_date.floor('day') == today:
due_display = "TODAY"
else:
due_display = due_date.format("D-MMM-YY")
else:
due_display = ""
simplified.append({
'name': task.content,
'due': arrow.get(task.due.date, "YYYY-MM-DD").format("D-MMM-YY") if task.due else "",
'due': due_display,
'due_date': due_date,
'is_overdue': is_overdue,
'priority': task.priority,
'priority_text': priority_text,
'project': filtered_project_ids_and_names[task.project_id]
}
for task in all_active_tasks
]
})
logger.debug(f'simplified: {simplified}')
project_lengths = []
due_lengths = []
priority_lengths = []
for task in simplified:
if task["project"]:
project_lengths.append(int(self.font.getlength(task['project']) * 1.1))
if task["due"]:
due_lengths.append(int(self.font.getlength(task['due']) * 1.1))
if task["priority_text"]:
priority_lengths.append(int(self.font.getlength(task['priority_text']) * 1.1))
# Get maximum width of project names for selected font
project_offset = int(max(project_lengths)) if project_lengths else 0
@@ -153,6 +319,9 @@ class Todoist(inkycal_module):
# Get maximum width of project dues for selected font
due_offset = int(max(due_lengths)) if due_lengths else 0
# Get maximum width of priority indicators
priority_offset = int(max(priority_lengths)) if priority_lengths else 0
# create a dict with names of filtered groups
groups = {group_name:[] for group_name in filtered_project_ids_and_names.values()}
for task in simplified:
@@ -160,6 +329,16 @@ class Todoist(inkycal_module):
if group_of_current_task in groups:
groups[group_of_current_task].append(task)
# Sort tasks within each project group by due date first, then priority
for project_name in groups:
groups[project_name].sort(
key=lambda task: (
task['due_date'] is None, # Tasks with dates come first
task['due_date'] if task['due_date'] else arrow.get('9999-12-31'), # Sort by date
-task['priority'] # Then by priority (higher priority first)
)
)
logger.debug(f"grouped: {groups}")
# Add the parsed todos on the image
@@ -179,18 +358,30 @@ class Todoist(inkycal_module):
# Add todos due if not empty
if todo['due']:
# Show overdue dates in red, normal dates in black
due_image = im_colour if todo.get('is_overdue', False) else im_black
write(
im_black,
due_image,
(line_x + project_offset, line_y),
(due_offset, line_height),
todo['due'], font=self.font, alignment='left')
# Add priority indicator if present
if todo['priority_text']:
# P1 (priority 4) in red, P2 and P3 in black
priority_image = im_colour if todo['priority'] == 4 else im_black
write(
priority_image,
(line_x + project_offset + due_offset, line_y),
(priority_offset, line_height),
todo['priority_text'], font=self.font, alignment='left')
if todo['name']:
# Add todos name
write(
im_black,
(line_x + project_offset + due_offset, line_y),
(im_width - project_offset - due_offset, line_height),
(line_x + project_offset + due_offset + priority_offset, line_y),
(im_width - project_offset - due_offset - priority_offset, line_height),
todo['name'], font=self.font, alignment='left')
cursor += 1