Files
weather-info/app/tryout/weather_checker.py
2025-11-14 12:39:47 +01:00

115 lines
5.1 KiB
Python

import urequests # type: ignore
import json
from app.classes import Weather, Location, CurrentCondition, WeatherResponse, ResponseStatus
from app.display import NeoPixel_64x64
import app.utils.colors as colors
from app.utils import URLEncoder, http_message, get_datetime_string
API_KEY = '3545ce42d0ba436e8dc164532250410'
ACTUAL_WEATHER_URL = 'http://api.weatherapi.com/v1/current.json?key={API_KEY}&q={city}&aqi=yes&lang={lang}'
class Weather_Checker():
def __init__(self, display: NeoPixel_64x64):
self.display = display
def mock_weather_data(self):
filepath = 'restapi/mock-weather.json'
try:
with open(filepath, 'r', encoding='utf-8') as file:
return json.load(file)
except OSError: # Use OSError instead of FileNotFoundError
print(f"Error: File '{filepath}' not found.")
return None
except FileNotFoundError:
print(f"Error: File '{filepath}' not found.")
return None
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in '{filepath}': {e}")
return None
def _build_weather(self, json_resp) -> Weather:
loc = Location(**json_resp['location'])
cc = CurrentCondition(**json_resp['current'])
weather = Weather(location=loc, current=cc)
return weather
def actual_weather(self, city, lang, test_mode=False) -> WeatherResponse:
wr : WeatherResponse = WeatherResponse()
weather_url = ACTUAL_WEATHER_URL.format(API_KEY=API_KEY, city=city, lang=lang)
if not test_mode:
response_status: ResponseStatus = ResponseStatus()
print(f'query: {weather_url}')
r = urequests.get(weather_url)
print('Status-Code:', r.status_code)
json_resp = r.json()
print('json_resp:', json_resp)
response_status.code = r.status_code
response_status.message = http_message[r.status_code]
if r.status_code == 200:
wr.weather = self._build_weather(json_resp)
else:
response_status.error_text = json_resp['error']['message']
wr.response_status = response_status
r.close()
else:
print('Status-Code: test_mode')
# json_resp = WEATHER_QUERY_MOCK
json_resp = self.mock_weather_data()
wr.weather = self._build_weather(json_resp)
wr.response_status = ResponseStatus( code=200, message="OK (Test-Mode)")
return wr
def check(self, city:str, lang:str, test_mode: bool = False):
bottom_ypos = self.display.MATRIX_HEIGHT - self.display.font_height
time_str: str = get_datetime_string('time')
try:
self.display.clear()
self.display.write_text(time_str[:5], 0, bottom_ypos, color=colors.NEON_YELLOW)
city_encoded = URLEncoder.encode_utf8(city) # url_encode
w_resp: WeatherResponse = self.actual_weather(city=city_encoded, lang=lang, test_mode=test_mode)
if w_resp.response_status.code == 200:
ypos = 0
self.display.write_text(f'{str(w_resp.weather.location.name)}', 0, ypos, color=colors.RAINBOW[0])
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.location.region)}', 0, ypos, color=colors.RAINBOW[0])
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.location.localtime)[:10]}', 0, ypos, color=colors.RAINBOW[3])
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.current.temp_c)}°C', 0, ypos, color=colors.RAINBOW[1])
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.current.condition.text)}', 0, ypos, color=colors.RAINBOW[2])
ypos += self.display.font_height + 1
self.display.write_text(f'upd:{str(w_resp.weather.current.last_updated)[-5:]}', 0, ypos, color=colors.RAINBOW[3])
ypos += self.display.font_height + 1
self.display.write_text(f'cur:{str(w_resp.weather.location.localtime)[-5:]}', 0, ypos, color=colors.RAINBOW[4])
else:
ypos = 0
self.display.write_text(f'Code:{w_resp.response_status.code}', 0, ypos, color=colors.RAINBOW[0])
ypos += self.display.font_height + 1
self.display.write_text(f'{w_resp.response_status.message}', 0, ypos, color=colors.RAINBOW[1])
ypos += self.display.font_height + 1
self.display.write_text(f'{w_resp.response_status.error_text}', 0, ypos, color=colors.RAINBOW[2])
# unten rechts in die Ecke
self.display.write_text('done.', 39, bottom_ypos, color=colors.NEON_GREEN)
except OSError as e:
print(f'Error: connection closed - {e}')
finally:
print('finally done.')