v.0.11.2 added beautyful soup bs4

This commit is contained in:
tiijay
2025-11-21 15:06:04 +00:00
parent 3a8fb95f4f
commit c27b97fc71
9 changed files with 73 additions and 60 deletions

View File

@@ -43,7 +43,8 @@
// Use 'postCreateCommand' to run commands after the container is created. // Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "uv sync", "postCreateCommand": "uv sync",
// "postCreateCommand": "apt-get update && apt-get install -y git && pip3 install -r requirements.txt" // "postCreateCommand": "apt-get update && apt-get install -y git && pip3 install -r requirements.txt"
"postStartCommand": "uv run app/main.py" // Add this line // "postStartCommand": "uv run app/main.py" // Add this line
"postStartCommand": "uv run uvicorn app.main:app --reload --host 0.0.0.0 --port 8000"
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root" // "remoteUser": "root"
} }

View File

@@ -1,75 +1,71 @@
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from typing import List
from typing import List, Optional
from app.models import Item, dummy_items
app = FastAPI(title="FastAPI Server", version="1.0.0") app = FastAPI(title="FastAPI Server", version="1.0.0")
# CORS middleware to allow client requests # CORS middleware to allow client requests
app.add_middleware( # app.add_middleware(
CORSMiddleware, # CORSMiddleware,
allow_origins=["http://localhost:3000", "http://client:3000"], # # allow_origins=["http://localhost:3000", "http://client:3000"],
allow_credentials=True, # allow_credentials=True,
allow_methods=["*"], # allow_methods=["*"],
allow_headers=["*"], # allow_headers=["*"],
) # )
class Item(BaseModel):
id: int
name: str
description: Optional[str] = None
price: float
# In-memory database
items_db = [
Item(id=1, name="Laptop", description="High-performance laptop", price=999.99),
Item(id=2, name="Mouse", description="Wireless mouse", price=29.99),
Item(id=3, name="Keyboard", description="Mechanical keyboard", price=79.99),
]
@app.get("/") @app.get("/")
async def root(): async def root():
return {"message": "FastAPI Server is running!"} return {"message": "FastAPI Server is running!"}
@app.get("/items", response_model=List[Item]) @app.get("/items", response_model=List[Item])
async def get_items(): async def get_items():
return items_db return dummy_items
@app.get("/items/{item_id}", response_model=Item) @app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int): async def get_item(item_id: int):
item = next((item for item in items_db if item.id == item_id), None) item = next((item for item in dummy_items if item.id == item_id), None)
if item is None: if item is None:
raise HTTPException(status_code=404, detail="Item not found") raise HTTPException(status_code=404, detail="Item not found")
return item return item
@app.post("/items", response_model=Item) @app.post("/items", response_model=Item)
async def create_item(item: Item): async def create_item(item: Item):
if any(existing_item.id == item.id for existing_item in items_db): if any(existing_item.id == item.id for existing_item in dummy_items):
raise HTTPException(status_code=400, detail="Item ID already exists") raise HTTPException(status_code=400, detail="Item ID already exists")
items_db.append(item) dummy_items.append(item)
return item return item
@app.put("/items/{item_id}", response_model=Item) @app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: Item): async def update_item(item_id: int, item: Item):
if item.id != item_id: if item.id != item_id:
raise HTTPException(status_code=400, detail="Item ID mismatch") raise HTTPException(status_code=400, detail="Item ID mismatch")
for idx, existing_item in enumerate(items_db): for idx, existing_item in enumerate(dummy_items):
if existing_item.id == item_id: if existing_item.id == item_id:
items_db[idx] = item dummy_items[idx] = item
return item return item
raise HTTPException(status_code=404, detail="Item not found") raise HTTPException(status_code=404, detail="Item not found")
@app.delete("/items/{item_id}") @app.delete("/items/{item_id}")
async def delete_item(item_id: int): async def delete_item(item_id: int):
for idx, item in enumerate(items_db): for idx, item in enumerate(dummy_items):
if item.id == item_id: if item.id == item_id:
deleted_item = items_db.pop(idx) deleted_item = dummy_items.pop(idx)
return {"message": f"Item {deleted_item.name} deleted successfully"} return {"message": f"Item {deleted_item.name} deleted successfully"}
raise HTTPException(status_code=404, detail="Item not found") raise HTTPException(status_code=404, detail="Item not found")
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,12 @@
from .item import Item
from typing import List
# In-memory database
dummy_items: List[Item] = [
Item(id=1, name="Laptop", description="High-performance laptop", price=999.99),
Item(id=2, name="Mouse", description="Wireless mouse", price=29.99),
Item(id=3, name="Keyboard", description="Mechanical keyboard", price=79.99),
Item(id=4, name="Monitor", description='Samsung 32"', price=187.00),
]
__all__ = ["Item"]

View File

@@ -0,0 +1,9 @@
from pydantic import BaseModel
from typing import Optional
class Item(BaseModel):
id: int
name: str
description: Optional[str] = None
price: float

View File

@@ -5,6 +5,7 @@ requires-python = ">=3.11"
dependencies = [ dependencies = [
"fastapi>=0.104.0", "fastapi>=0.104.0",
"uvicorn[standard]>=0.24.0", "uvicorn[standard]>=0.24.0",
"beautifulsoup4>=4.14.0",
] ]
# [build-system] # [build-system]

View File

@@ -21,14 +21,14 @@ async def weather_check_task(weather_checker: Weather_Checker):
while True: while True:
for city in CITY_LIST: for city in CITY_LIST:
weather_checker.check(city=city, lang="de", test_mode=True) weather_checker.check(city=city, lang="de", test_mode=False)
print(f"Checked {city}") print(f"Checked {city}")
await asyncio.sleep(3 * 60) await asyncio.sleep(3 * 60)
async def api_server_check_task(api_server_checker: API_Server_Checker): async def api_server_check_task(api_server_checker: API_Server_Checker):
while True: while True:
rnd_item_id: int = random.randrange(4) rnd_item_id: int = random.randrange(5)
api_server_checker.check(rnd_item_id) api_server_checker.check(rnd_item_id)
await asyncio.sleep(10) await asyncio.sleep(10)
@@ -36,13 +36,9 @@ async def api_server_check_task(api_server_checker: API_Server_Checker):
async def print_time_task() -> None: async def print_time_task() -> None:
bottom_ypos = display.MATRIX_HEIGHT - display.font_height bottom_ypos = display.MATRIX_HEIGHT - display.font_height
simpleCnt: SimpleCounter = SimpleCounter()
digitalClock: DigitalClock = DigitalClock(display, 0, bottom_ypos) digitalClock: DigitalClock = DigitalClock(display, 0, bottom_ypos)
while True:
dt1: str = get_datetime_string()
simpleCnt += 1
print(f"print_time_task running... {simpleCnt.value % 16} {dt1}")
while True:
await digitalClock.tick() await digitalClock.tick()
await asyncio.sleep(2) await asyncio.sleep(2)

View File

@@ -15,16 +15,20 @@ class API_Server_Checker:
else f"{BASE_API_URL}/items/{item_id}" else f"{BASE_API_URL}/items/{item_id}"
) )
print(f"query: {items_url}") print(f"query: {items_url}")
r = urequests.get(items_url)
print("Status-Code:", r.status_code)
json_resp = r.json()
if r.status_code == 200: try:
print("json_resp:", json_resp) r = urequests.get(items_url)
else: print("Status-Code:", r.status_code)
print("api-server error:", json_resp["error"]["message"]) json_resp = r.json()
r.close() if r.status_code == 200:
print("json_resp:", json_resp)
else:
print("api-server error:", json_resp["error"]["message"])
except OSError as e:
print(f"api-server call failed: {e}")
finally:
r.close()
def check(self, item_id: int): def check(self, item_id: int):
self._get_items(item_id) self._get_items(item_id)

View File

@@ -123,19 +123,16 @@ class Weather_Checker:
color=colors.RAINBOW[2], color=colors.RAINBOW[2],
) )
ypos += self.display.font_height + 1 ypos += self.display.font_height + 1
updated_time_str: str = str(w_resp.weather.current.last_updated)[-5:]
local_time_str: str = str(w_resp.weather.location.localtime)[-5:]
self.display.write_text( self.display.write_text(
f"upd:{str(w_resp.weather.current.last_updated)[-5:]}", f"{local_time_str} {updated_time_str}",
0, 0,
ypos, ypos,
color=colors.RAINBOW[3], color=colors.RAINBOW[3],
) )
ypos += self.display.font_height + 1
self.display.write_text(
f"cur:{str(w_resp.weather.location.localtime)[-5:]}",
0,
ypos,
color=colors.RAINBOW[4],
)
else: else:
ypos = 0 ypos = 0
self.display.write_text( self.display.write_text(

View File

@@ -32,9 +32,6 @@ class DigitalClock:
fresh_part: str = time_str[mismatch_pos:] fresh_part: str = time_str[mismatch_pos:]
part_to_clear: str = self.stored_time_str[mismatch_pos:] part_to_clear: str = self.stored_time_str[mismatch_pos:]
textwidth: int = self._text_width(self.stored_time_str)
print(f"{self.stored_time_str}: #{textwidth}")
textwidth_untouched_part: int = self._text_width(untouched_part) textwidth_untouched_part: int = self._text_width(untouched_part)
clear_x_start_pos: int = self.xpos + textwidth_untouched_part + 1 clear_x_start_pos: int = self.xpos + textwidth_untouched_part + 1