Merge pull request #396 from aceinnolab/bug/#392

Use gust speed provided by own api (see #392)
This commit is contained in:
Ace
2025-09-27 16:21:44 +02:00
committed by GitHub

View File

@@ -1,20 +1,20 @@
""" """
Inkycal opwenweather API abstraction Inkycal OpenWeatherMap API abstraction module
- retrieves free weather data from OWM 2.5 API endpoints (given provided API key) - Retrieves free weather data from OWM 2.5/3.0 API endpoints (with provided API key)
- handles unit, language and timezone conversions - Handles temperature and wind unit conversions
- provides ready-to-use current weather, hourly and daily forecasts - Converts data to a standardized timezone and language
- Returns ready-to-use weather structures for current, hourly, and daily forecasts
""" """
import json import json
import logging import logging
from datetime import datetime from datetime import datetime, timedelta
from datetime import timedelta from typing import Dict, List, Literal
from typing import Dict
from typing import List
from typing import Literal
import requests import requests
from dateutil import tz from dateutil import tz
# Type annotations for strict typing
TEMP_UNITS = Literal["celsius", "fahrenheit"] TEMP_UNITS = Literal["celsius", "fahrenheit"]
WIND_UNITS = Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"] WIND_UNITS = Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
WEATHER_TYPE = Literal["current", "forecast"] WEATHER_TYPE = Literal["current", "forecast"]
@@ -27,15 +27,16 @@ logger.setLevel(level=logging.INFO)
def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool: def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
# Check if the timestamp is within the range """Check if the given timestamp lies between start_time and end_time."""
return start_time <= timestamp <= end_time return start_time <= timestamp <= end_time
def get_json_from_url(request_url): def get_json_from_url(request_url):
"""Performs an HTTP GET request and returns the parsed JSON response."""
response = requests.get(request_url) response = requests.get(request_url)
if not response.ok: if not response.ok:
raise AssertionError( raise AssertionError(
f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}" f"Failure getting weather: code {response.status_code}. Reason: {response.text}"
) )
return json.loads(response.text) return json.loads(response.text)
@@ -44,280 +45,162 @@ class OpenWeatherMap:
def __init__(self, api_key: str, city_id: int = None, lat: float = None, lon: float = None, def __init__(self, api_key: str, city_id: int = None, lat: float = None, lon: float = None,
api_version: API_VERSIONS = "2.5", temp_unit: TEMP_UNITS = "celsius", api_version: API_VERSIONS = "2.5", temp_unit: TEMP_UNITS = "celsius",
wind_unit: WIND_UNITS = "meters_sec", language: str = "en", tz_name: str = "UTC") -> None: wind_unit: WIND_UNITS = "meters_sec", language: str = "en", tz_name: str = "UTC") -> None:
"""
Initializes the OWM wrapper with localization settings.
Chooses API version, units, location and timezone preferences.
"""
self.api_key = api_key self.api_key = api_key
self.temp_unit = temp_unit self.temp_unit = temp_unit
self.wind_unit = wind_unit self.wind_unit = wind_unit
self.language = language self.language = language
self._api_version = api_version self._api_version = api_version
if self._api_version == "3.0":
assert type(lat) is float and type(lon) is float
self.location_substring = (
f"lat={str(lat)}&lon={str(lon)}" if (lat is not None and lon is not None) else f"id={str(city_id)}"
)
self.tz_zone = tz.gettz(tz_name) if self._api_version == "3.0":
logger.info( assert isinstance(lat, float) and isinstance(lon, float)
f"OWM wrapper initialized for API version {self._api_version}, language {self.language} and timezone {tz_name}."
self.location_substring = (
f"lat={lat}&lon={lon}" if (lat and lon) else f"id={city_id}"
) )
self.tz_zone = tz.gettz(tz_name)
logger.info(f"OWM wrapper initialized with API v{self._api_version}, lang={language}, tz={tz_name}.")
def get_weather_data_from_owm(self, weather: WEATHER_TYPE): def get_weather_data_from_owm(self, weather: WEATHER_TYPE):
# Gets current weather or forecast from the configured OWM API. """Gets either current or forecast weather data."""
if weather == "current": if weather == "current":
# Gets current weather status from the 2.5 API: https://openweathermap.org/current url = f"{API_BASE_URL}/2.5/weather?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
# This is primarily using the 2.5 API since the 3.0 API actually has less info data = get_json_from_url(url)
weather_url = f"{API_BASE_URL}/2.5/weather?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
weather_data = get_json_from_url(weather_url)
# Only if we do have a 3.0 API-enabled key, we can also get the UVI reading from that endpoint: https://openweathermap.org/api/one-call-3
if self._api_version == "3.0": if self._api_version == "3.0":
weather_url = f"{API_BASE_URL}/3.0/onecall?{self.location_substring}&appid={self.api_key}&exclude=minutely,hourly,daily&units=Metric&lang={self.language}" uvi_url = f"{API_BASE_URL}/3.0/onecall?{self.location_substring}&appid={self.api_key}&exclude=minutely,hourly,daily&units=Metric&lang={self.language}"
weather_data["uvi"] = get_json_from_url(weather_url)["current"]["uvi"] data["uvi"] = get_json_from_url(uvi_url)["current"].get("uvi")
elif weather == "forecast": elif weather == "forecast":
# Gets weather forecasts from the 2.5 API: https://openweathermap.org/forecast5 url = f"{API_BASE_URL}/2.5/forecast?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}"
# This is only using the 2.5 API since the 3.0 API actually has less info data = get_json_from_url(url)["list"]
weather_url = f"{API_BASE_URL}/2.5/forecast?{self.location_substring}&appid={self.api_key}&units=Metric&lang={self.language}" return data
weather_data = get_json_from_url(weather_url)["list"]
return weather_data
def get_current_weather(self) -> Dict: def get_current_weather(self) -> Dict:
""" """
Decodes the OWM current weather data for our purposes Fetches and processes current weather data.
:return: Includes gust fallback and unit conversions.
Current weather as dictionary
""" """
data = self.get_weather_data_from_owm("current")
wind_data = data.get("wind", {})
base_speed = wind_data.get("speed", 0.0)
gust_speed = wind_data.get("gust", base_speed)
converted_gust = self.get_converted_windspeed(gust_speed)
current_data = self.get_weather_data_from_owm(weather="current") weather = {
"detailed_status": data["weather"][0]["description"],
"weather_icon_name": data["weather"][0]["icon"],
"temp": self.get_converted_temperature(data["main"]["temp"]),
"temp_feels_like": self.get_converted_temperature(data["main"]["feels_like"]),
"min_temp": self.get_converted_temperature(data["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(data["main"]["temp_max"]),
"humidity": data["main"]["humidity"],
"wind": converted_gust,
"wind_gust": converted_gust,
"uvi": data.get("uvi"),
"sunrise": datetime.fromtimestamp(data["sys"]["sunrise"], tz=self.tz_zone),
"sunset": datetime.fromtimestamp(data["sys"]["sunset"], tz=self.tz_zone),
}
current_weather = {} return weather
current_weather["detailed_status"] = current_data["weather"][0]["description"]
current_weather["weather_icon_name"] = current_data["weather"][0]["icon"]
current_weather["temp"] = self.get_converted_temperature(
current_data["main"]["temp"]
) # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
current_weather["temp_feels_like"] = self.get_converted_temperature(current_data["main"]["feels_like"])
current_weather["min_temp"] = self.get_converted_temperature(current_data["main"]["temp_min"])
current_weather["max_temp"] = self.get_converted_temperature(current_data["main"]["temp_max"])
current_weather["humidity"] = current_data["main"]["humidity"] # OWM Unit: % rH
current_weather["wind"] = self.get_converted_windspeed(
current_data["wind"]["speed"]
) # OWM Unit Default: meter/sec, Metric: meter/sec
if "gust" in current_data["wind"]:
current_weather["wind_gust"] = self.get_converted_windspeed(current_data["wind"]["gust"])
else:
logger.info(
f"OpenWeatherMap response did not contain a wind gust speed. Using base wind: {current_weather['wind']} m/s."
)
current_weather["wind_gust"] = current_weather["wind"]
if "uvi" in current_data: # this is only supported in v3.0 API
current_weather["uvi"] = current_data["uvi"]
else:
current_weather["uvi"] = None
current_weather["sunrise"] = datetime.fromtimestamp(
current_data["sys"]["sunrise"], tz=self.tz_zone
) # unix timestamp -> to our timezone
current_weather["sunset"] = datetime.fromtimestamp(current_data["sys"]["sunset"], tz=self.tz_zone)
self.current_weather = current_weather
return current_weather
def get_weather_forecast(self) -> List[Dict]: def get_weather_forecast(self) -> List[Dict]:
""" """
Decodes the OWM weather forecast for our purposes Parses OWM 5-day / 3-hour forecast into a list of hourly dictionaries.
What you get is a list of 40 forecasts for 3-hour time slices, totaling to 5 days.
:return:
Forecasts data dictionary
""" """
# forecasts = self.get_weather_data_from_owm("forecast")
forecast_data = self.get_weather_data_from_owm(weather="forecast") hourly = []
# Add forecast data to hourly_data_dict list of dictionaries for f in forecasts:
hourly_forecasts = [] rain = f.get("rain", {}).get("3h", 0.0)
for forecast in forecast_data: snow = f.get("snow", {}).get("3h", 0.0)
# calculate combined precipitation (snow + rain) precip_mm = rain + snow
precip_mm = 0.0
if "rain" in forecast.keys():
precip_mm = +forecast["rain"]["3h"] # OWM Unit: mm
if "snow" in forecast.keys():
precip_mm = +forecast["snow"]["3h"] # OWM Unit: mm
hourly_forecasts.append(
{
"temp": self.get_converted_temperature(
forecast["main"]["temp"]
), # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit
"min_temp": self.get_converted_temperature(forecast["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(forecast["main"]["temp_max"]),
"precip_3h_mm": precip_mm,
"wind": self.get_converted_windspeed(
forecast["wind"]["speed"]
), # OWM Unit Default: meter/sec, Metric: meter/sec, Imperial: miles/hour
"wind_gust": self.get_converted_windspeed(forecast["wind"]["gust"]),
"pressure": forecast["main"]["pressure"], # OWM Unit: hPa
"humidity": forecast["main"]["humidity"], # OWM Unit: % rH
"precip_probability": forecast["pop"]
* 100.0, # OWM value is unitless, directly converting to % scale
"icon": forecast["weather"][0]["icon"],
"datetime": datetime.fromtimestamp(forecast["dt"], tz=self.tz_zone),
}
)
logger.debug(
f"Added rain forecast at {datetime.fromtimestamp(forecast['dt'], tz=self.tz_zone)}: {precip_mm}"
)
self.hourly_forecasts = hourly_forecasts hourly.append({
"temp": self.get_converted_temperature(f["main"]["temp"]),
"min_temp": self.get_converted_temperature(f["main"]["temp_min"]),
"max_temp": self.get_converted_temperature(f["main"]["temp_max"]),
"precip_3h_mm": precip_mm,
"wind": self.get_converted_windspeed(f["wind"]["speed"]),
"wind_gust": self.get_converted_windspeed(f["wind"].get("gust", f["wind"]["speed"])),
"pressure": f["main"]["pressure"],
"humidity": f["main"]["humidity"],
"precip_probability": f.get("pop", 0.0) * 100.0,
"icon": f["weather"][0]["icon"],
"datetime": datetime.fromtimestamp(f["dt"], tz=self.tz_zone),
})
return self.hourly_forecasts return hourly
def get_forecast_for_day(self, days_from_today: int) -> Dict: def get_forecast_for_day(self, days_from_today: int) -> Dict:
""" """
Get temperature range, rain and most frequent icon code Aggregates hourly data into daily summary with min/max temp, precip and icon.
for the day that is days_from_today away.
"Today" is based on our local system timezone.
:param days_from_today:
should be int from 0-4: e.g. 2 -> 2 days from today
:return:
Forecast dictionary
""" """
# Make sure hourly forecasts are up-to-date forecasts = self.get_weather_forecast()
_ = self.get_weather_forecast() now = datetime.now(tz=self.tz_zone)
start = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_from_today)
end = start + timedelta(days=1)
# Calculate the start and end times for the specified number of days from now daily = [f for f in forecasts if start <= f["datetime"] < end]
current_time = datetime.now(tz=self.tz_zone) if not daily:
start_time = ( daily.append(forecasts[0]) # fallback to first forecast
(current_time + timedelta(days=days_from_today))
.replace(hour=0, minute=0, second=0, microsecond=0)
.astimezone(tz=self.tz_zone)
)
end_time = (start_time + timedelta(days=1)).astimezone(tz=self.tz_zone)
# Get all the forecasts for that day's time range temps = [f["temp"] for f in daily]
forecasts = [ rain = sum(f["precip_3h_mm"] for f in daily)
f icons = [f["icon"] for f in daily if f["icon"]]
for f in self.hourly_forecasts icon = max(set(icons), key=icons.count)
if is_timestamp_within_range(timestamp=f["datetime"], start_time=start_time, end_time=end_time)
]
# In case the next available forecast is already for the next day, use that one for the less than 3 remaining hours of today return {
if not forecasts: "datetime": start,
forecasts.append(self.hourly_forecasts[0])
# Get rain and temperatures for that day
temps = [f["temp"] for f in forecasts]
rain = sum([f["precip_3h_mm"] for f in forecasts])
# Get all weather icon codes for this day
icons = [f["icon"] for f in forecasts]
day_icons = [icon for icon in icons if "d" in icon]
# Use the day icons if possible
icon = max(set(day_icons), key=icons.count) if len(day_icons) > 0 else max(set(icons), key=icons.count)
# Return a dict with that day's data
day_data = {
"datetime": start_time,
"icon": icon, "icon": icon,
"temp_min": min(temps), "temp_min": min(temps),
"temp_max": max(temps), "temp_max": max(temps),
"precip_mm": rain, "precip_mm": rain
} }
return day_data
def get_converted_temperature(self, value: float) -> float: def get_converted_temperature(self, value: float) -> float:
if self.temp_unit == "fahrenheit": return self.celsius_to_fahrenheit(value) if self.temp_unit == "fahrenheit" else value
value = self.celsius_to_fahrenheit(value)
return value
def get_converted_windspeed(self, value: float) -> float: def get_converted_windspeed(self, value: float) -> float:
Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"]
if self.wind_unit == "km_hour": if self.wind_unit == "km_hour":
value = self.celsius_to_fahrenheit(value) return self.mps_to_kph(value)
elif self.wind_unit == "km_hour": if self.wind_unit == "miles_hour":
value = self.mps_to_kph(value) return self.mps_to_mph(value)
elif self.wind_unit == "miles_hour": if self.wind_unit == "knots":
value = self.mps_to_mph(value) return self.mps_to_knots(value)
elif self.wind_unit == "knots": if self.wind_unit == "beaufort":
value = self.mps_to_knots(value) return self.mps_to_beaufort(value)
elif self.wind_unit == "beaufort": return value # default is meters/sec
value = self.mps_to_beaufort(value)
return value
@staticmethod @staticmethod
def mps_to_beaufort(meters_per_second: float) -> int: def mps_to_beaufort(mps: float) -> int:
"""Map meters per second to the beaufort scale.
Args:
meters_per_second:
float representing meters per seconds
Returns:
an integer of the beaufort scale mapping the input
"""
thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7] thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7]
return next((i for i, threshold in enumerate(thresholds) if meters_per_second < threshold), 12) return next((i for i, t in enumerate(thresholds) if mps < t), 12)
@staticmethod @staticmethod
def mps_to_mph(meters_per_second: float) -> float: def mps_to_mph(mps: float) -> float:
"""Map meters per second to miles per hour return mps * 2.23694
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in miles per hour.
"""
# 1 m/s is approximately equal to 2.23694 mph
miles_per_hour = meters_per_second * 2.23694
return miles_per_hour
@staticmethod @staticmethod
def mps_to_kph(meters_per_second: float) -> float: def mps_to_kph(mps: float) -> float:
"""Map meters per second to kilometers per hour return mps * 3.6
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in kilometers per hour.
"""
# 1 m/s is equal to 3.6 km/h
kph = meters_per_second * 3.6
return kph
@staticmethod @staticmethod
def mps_to_knots(meters_per_second: float) -> float: def mps_to_knots(mps: float) -> float:
"""Map meters per second to knots (nautical miles per hour) return mps * 1.94384
Args:
meters_per_second:
float representing meters per seconds.
Returns:
float representing the input value in knots.
"""
# 1 m/s is equal to 1.94384 knots
knots = meters_per_second * 1.94384
return knots
@staticmethod @staticmethod
def celsius_to_fahrenheit(celsius: int or float) -> float: def celsius_to_fahrenheit(c: float) -> float:
"""Converts the given temperate from degrees Celsius to Fahrenheit.""" return c * 9.0 / 5.0 + 32.0
fahrenheit = (float(celsius) * 9.0 / 5.0) + 32.0
return fahrenheit
def main(): def main():
"""Main function, only used for testing purposes""" # Simple test entry point
key = "" key = ""
city = 2643743 city = 2643743 # London
lang = "de" owm = OpenWeatherMap(api_key=key, city_id=city, language="de", tz_name="Europe/Berlin")
owm = OpenWeatherMap(api_key=key, city_id=city, language=lang, tz="Europe/Berlin") print(owm.get_current_weather())
current_weather = owm.get_current_weather()
print(current_weather)
_ = owm.get_weather_forecast()
print(owm.get_forecast_for_day(days_from_today=2)) print(owm.get_forecast_for_day(days_from_today=2))