v.0.8.3 main.py print_time_task on display

This commit is contained in:
tiijay
2025-11-19 08:31:16 +00:00
parent c817f0330a
commit 2be6d888b3
3 changed files with 116 additions and 57 deletions

View File

@@ -1,23 +1,29 @@
import urequests # type: ignore
import json
from app.classes import Weather, Location, CurrentCondition, WeatherResponse, ResponseStatus
from app.classes import (
Weather,
Location,
CurrentCondition,
WeatherResponse,
ResponseStatus,
)
from app.display import NeoPixel_64x64
import app.utils.colors as colors
from app.utils import URLEncoder, http_message, get_datetime_string
API_KEY = '3545ce42d0ba436e8dc164532250410'
ACTUAL_WEATHER_URL = 'http://api.weatherapi.com/v1/current.json?key={API_KEY}&q={city}&aqi=yes&lang={lang}'
API_KEY = "3545ce42d0ba436e8dc164532250410"
ACTUAL_WEATHER_URL = "http://api.weatherapi.com/v1/current.json?key={API_KEY}&q={city}&aqi=yes&lang={lang}"
class Weather_Checker():
class Weather_Checker:
def __init__(self, display: NeoPixel_64x64):
self.display = display
def mock_weather_data(self):
filepath = 'restapi/mock-weather.json'
filepath = "restapi/mock-weather.json"
try:
with open(filepath, 'r', encoding='utf-8') as file:
with open(filepath, "r", encoding="utf-8") as file:
return json.load(file)
except OSError: # Use OSError instead of FileNotFoundError
print(f"Error: File '{filepath}' not found.")
@@ -30,8 +36,8 @@ class Weather_Checker():
return None
def _build_weather(self, json_resp) -> Weather:
loc = Location(**json_resp['location'])
cc = CurrentCondition(**json_resp['current'])
loc = Location(**json_resp["location"])
cc = CurrentCondition(**json_resp["current"])
weather = Weather(location=loc, current=cc)
return weather
@@ -40,15 +46,14 @@ class Weather_Checker():
wr: WeatherResponse = WeatherResponse()
weather_url = ACTUAL_WEATHER_URL.format(API_KEY=API_KEY, city=city, lang=lang)
if not test_mode:
response_status: ResponseStatus = ResponseStatus()
print(f'query: {weather_url}')
print(f"query: {weather_url}")
r = urequests.get(weather_url)
print('Status-Code:', r.status_code)
print("Status-Code:", r.status_code)
json_resp = r.json()
print('json_resp:', json_resp)
print("json_resp:", json_resp)
response_status.code = r.status_code
response_status.message = http_message[r.status_code]
@@ -56,13 +61,13 @@ class Weather_Checker():
if r.status_code == 200:
wr.weather = self._build_weather(json_resp)
else:
response_status.error_text = json_resp['error']['message']
response_status.error_text = json_resp["error"]["message"]
wr.response_status = response_status
r.close()
else:
print('Status-Code: test_mode')
print("Status-Code: test_mode")
# json_resp = WEATHER_QUERY_MOCK
json_resp = self.mock_weather_data()
wr.weather = self._build_weather(json_resp)
@@ -70,46 +75,94 @@ class Weather_Checker():
return wr
def check(self, city: str, lang: str, test_mode: bool = False):
bottom_ypos = self.display.MATRIX_HEIGHT - self.display.font_height
time_str: str = get_datetime_string('time')
try:
self.display.clear()
self.display.write_text(time_str[:5], 0, bottom_ypos, color=colors.NEON_YELLOW)
city_encoded = URLEncoder.encode_utf8(city) # url_encode
w_resp: WeatherResponse = self.actual_weather(city=city_encoded, lang=lang, test_mode=test_mode)
w_resp: WeatherResponse = self.actual_weather(
city=city_encoded, lang=lang, test_mode=test_mode
)
if w_resp.response_status.code == 200:
ypos = 0
self.display.write_text(f'{str(w_resp.weather.location.name)}', 0, ypos, color=colors.RAINBOW[0])
self.display.write_text(
f"{str(w_resp.weather.location.name)}",
0,
ypos,
color=colors.RAINBOW[0],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.location.region)}', 0, ypos, color=colors.RAINBOW[0])
self.display.write_text(
f"{str(w_resp.weather.location.region)}",
0,
ypos,
color=colors.RAINBOW[0],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.location.localtime)[:10]}', 0, ypos, color=colors.RAINBOW[3])
self.display.write_text(
f"{str(w_resp.weather.location.localtime)[:10]}",
0,
ypos,
color=colors.RAINBOW[3],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.current.temp_c)}°C', 0, ypos, color=colors.RAINBOW[1])
self.display.write_text(
f"{str(w_resp.weather.current.temp_c)}°C",
0,
ypos,
color=colors.RAINBOW[1],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{str(w_resp.weather.current.condition.text)}', 0, ypos, color=colors.RAINBOW[2])
self.display.write_text(
f"{str(w_resp.weather.current.condition.text)}",
0,
ypos,
color=colors.RAINBOW[2],
)
ypos += self.display.font_height + 1
self.display.write_text(f'upd:{str(w_resp.weather.current.last_updated)[-5:]}', 0, ypos, color=colors.RAINBOW[3])
self.display.write_text(
f"upd:{str(w_resp.weather.current.last_updated)[-5:]}",
0,
ypos,
color=colors.RAINBOW[3],
)
ypos += self.display.font_height + 1
self.display.write_text(f'cur:{str(w_resp.weather.location.localtime)[-5:]}', 0, ypos, color=colors.RAINBOW[4])
self.display.write_text(
f"cur:{str(w_resp.weather.location.localtime)[-5:]}",
0,
ypos,
color=colors.RAINBOW[4],
)
else:
ypos = 0
self.display.write_text(f'Code:{w_resp.response_status.code}', 0, ypos, color=colors.RAINBOW[0])
self.display.write_text(
f"Code:{w_resp.response_status.code}",
0,
ypos,
color=colors.RAINBOW[0],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{w_resp.response_status.message}', 0, ypos, color=colors.RAINBOW[1])
self.display.write_text(
f"{w_resp.response_status.message}",
0,
ypos,
color=colors.RAINBOW[1],
)
ypos += self.display.font_height + 1
self.display.write_text(f'{w_resp.response_status.error_text}', 0, ypos, color=colors.RAINBOW[2])
self.display.write_text(
f"{w_resp.response_status.error_text}",
0,
ypos,
color=colors.RAINBOW[2],
)
# unten rechts in die Ecke
self.display.write_text('done.', 39, bottom_ypos, color=colors.NEON_GREEN)
self.display.write_text("done.", 39, bottom_ypos, color=colors.NEON_GREEN)
except OSError as e:
print(f'Error: connection closed - {e}')
print(f"Error: connection closed - {e}")
finally:
print('finally, check done.')
print("finally, check done.")

View File

@@ -4,13 +4,13 @@ import ntptime # type: ignore
_UTC_OFFSET: int = 1 * 3600 # +1 or +2 hours for CEST, adjust for your timezone
def local_time_with_offset():
def local_time_with_offset() -> int:
# Set timezone offset (in seconds)
current_time = time.time() + _UTC_OFFSET
return time.localtime(current_time)
def get_datetime_string(format="full"):
def get_datetime_string(format="full") -> str:
"""
Return date/time as string with different formats
@@ -39,7 +39,7 @@ def get_datetime_string(format="full"):
return f"Ticks: {time.ticks_ms()} ms"
def get_german_datetime():
def get_german_datetime() -> str:
"""Return German date and time"""
try:
year, month, day, hour, minute, second, weekday, yearday = (
@@ -56,7 +56,7 @@ def get_german_datetime():
return f"Zeit: {ticks} sek"
def get_german_timestamp_short():
def get_german_timestamp_short() -> str:
"""Get German timestamp with short months (for Wokwi)"""
ticks = time.ticks_ms()
day = (ticks // 86400000) % 31 + 1
@@ -83,7 +83,7 @@ def get_german_timestamp_short():
return f"{day}.{month_name} {hour:02d}:{minute:02d}"
def get_german_time_ticks():
def get_german_time_ticks() -> str:
"""Get German time using ticks (works in Wokwi)"""
ticks = time.ticks_ms()
@@ -96,7 +96,7 @@ def get_german_time_ticks():
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def get_german_date_ticks():
def get_german_date_ticks() -> str:
"""Get German date using ticks"""
ticks = time.ticks_ms()
@@ -125,7 +125,7 @@ def get_german_date_ticks():
return f"{day:02d}.{month_name}.{str(year)[-2:]}"
def sync_ntp_time():
def sync_ntp_time() -> bool:
try:
print("Syncing time via NTP...")
ntptime.settime() # Default uses pool.ntp.org

12
main.py
View File

@@ -7,7 +7,7 @@ from app.utils import (
get_datetime_string,
get_german_datetime,
) # Time-related functions
from app.utils import SimpleCounter
from app.utils import SimpleCounter, colors
import time
import uasyncio as asyncio # type: ignore
from app.web import Wlan
@@ -28,10 +28,16 @@ async def weather_check_task(weather_checker: Weather_Checker):
async def print_time_task() -> None:
simpleCnt: SimpleCounter = SimpleCounter()
while True:
dt1: int = get_datetime_string()
dt2: int = get_german_datetime()
dt1: str = get_datetime_string()
dt2: str = get_german_datetime()
simpleCnt += 1
print(f"print_time_task running... {simpleCnt.value % 16} {dt1} {dt2}")
bottom_ypos = display.MATRIX_HEIGHT - display.font_height
time_str: str = get_datetime_string("time")
display.clear_row(bottom_ypos)
display.write_text(time_str[:8], 0, bottom_ypos, color=colors.NEON_YELLOW)
await asyncio.sleep(10)