Compare commits

...

2 Commits

Author SHA1 Message Date
tiijay
115e402b0d v.0.11.3 api-server public port 8000 2025-11-21 19:07:41 +00:00
tiijay
c27b97fc71 v.0.11.2 added beautyful soup bs4 2025-11-21 15:06:04 +00:00
10 changed files with 85 additions and 64 deletions

View File

@@ -7,8 +7,9 @@
"dockerfile": "Dockerfile"
},
"runArgs": [
"--network=dev-network",
"--name=api-server"
// "--network=dev-network",
"--name=api-server",
"-p", "8000:8000"
],
// "image": "mcr.microsoft.com/devcontainers/python:1-3.12",
// "image": "python:latest",
@@ -31,11 +32,11 @@
}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [9000],
"forwardPorts": [8000],
// Use 'portsAttributes' to set default properties for specific forwarded ports.
// More info: https://containers.dev/implementors/json_reference/#port-attributes
"portsAttributes": {
"9000": {
"8000": {
"label": "API-Server Application",
"onAutoForward": "notify"
}
@@ -43,7 +44,8 @@
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "uv sync",
// "postCreateCommand": "apt-get update && apt-get install -y git && pip3 install -r requirements.txt"
"postStartCommand": "uv run app/main.py" // Add this line
// "postStartCommand": "uv run app/main.py" // Add this line
"postStartCommand": "uv run uvicorn app.main:app --reload --host 0.0.0.0 --port 8000"
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

View File

@@ -1,75 +1,71 @@
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from typing import List
from app.models import Item, dummy_items
app = FastAPI(title="FastAPI Server", version="1.0.0")
# CORS middleware to allow client requests
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "http://client:3000"],
# allow_origins=["http://localhost:3000", "http://client:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Item(BaseModel):
id: int
name: str
description: Optional[str] = None
price: float
# In-memory database
items_db = [
Item(id=1, name="Laptop", description="High-performance laptop", price=999.99),
Item(id=2, name="Mouse", description="Wireless mouse", price=29.99),
Item(id=3, name="Keyboard", description="Mechanical keyboard", price=79.99),
]
@app.get("/")
async def root():
return {"message": "FastAPI Server is running!"}
@app.get("/items", response_model=List[Item])
async def get_items():
return items_db
return dummy_items
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
item = next((item for item in items_db if item.id == item_id), None)
item = next((item for item in dummy_items if item.id == item_id), None)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.post("/items", response_model=Item)
async def create_item(item: Item):
if any(existing_item.id == item.id for existing_item in items_db):
if any(existing_item.id == item.id for existing_item in dummy_items):
raise HTTPException(status_code=400, detail="Item ID already exists")
items_db.append(item)
dummy_items.append(item)
return item
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: Item):
if item.id != item_id:
raise HTTPException(status_code=400, detail="Item ID mismatch")
for idx, existing_item in enumerate(items_db):
for idx, existing_item in enumerate(dummy_items):
if existing_item.id == item_id:
items_db[idx] = item
dummy_items[idx] = item
return item
raise HTTPException(status_code=404, detail="Item not found")
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
for idx, item in enumerate(items_db):
for idx, item in enumerate(dummy_items):
if item.id == item_id:
deleted_item = items_db.pop(idx)
deleted_item = dummy_items.pop(idx)
return {"message": f"Item {deleted_item.name} deleted successfully"}
raise HTTPException(status_code=404, detail="Item not found")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,12 @@
from .item import Item
from typing import List
# In-memory database
dummy_items: List[Item] = [
Item(id=1, name="Laptop", description="High-performance laptop", price=999.99),
Item(id=2, name="Mouse", description="Wireless mouse", price=29.99),
Item(id=3, name="Keyboard", description="Mechanical keyboard", price=79.99),
Item(id=4, name="Monitor", description='Samsung 32"', price=187.00),
]
__all__ = ["Item"]

View File

@@ -0,0 +1,9 @@
from pydantic import BaseModel
from typing import Optional
class Item(BaseModel):
id: int
name: str
description: Optional[str] = None
price: float

View File

@@ -5,6 +5,7 @@ requires-python = ">=3.11"
dependencies = [
"fastapi>=0.104.0",
"uvicorn[standard]>=0.24.0",
"beautifulsoup4>=4.14.0",
]
# [build-system]

View File

@@ -7,8 +7,9 @@
"dockerfile": "Dockerfile"
},
"runArgs": [
"--network=dev-network",
"--name=pico-client"
// "--network=dev-network",
"--name=pico-client",
"--add-host", "j-agent-01:192.168.178.105"
],
// "image": "mcr.microsoft.com/devcontainers/python:1-3.12",
// "image": "python:latest",

View File

@@ -21,14 +21,14 @@ async def weather_check_task(weather_checker: Weather_Checker):
while True:
for city in CITY_LIST:
weather_checker.check(city=city, lang="de", test_mode=True)
weather_checker.check(city=city, lang="de", test_mode=False)
print(f"Checked {city}")
await asyncio.sleep(3 * 60)
async def api_server_check_task(api_server_checker: API_Server_Checker):
while True:
rnd_item_id: int = random.randrange(4)
rnd_item_id: int = random.randrange(5)
api_server_checker.check(rnd_item_id)
await asyncio.sleep(10)
@@ -36,13 +36,9 @@ async def api_server_check_task(api_server_checker: API_Server_Checker):
async def print_time_task() -> None:
bottom_ypos = display.MATRIX_HEIGHT - display.font_height
simpleCnt: SimpleCounter = SimpleCounter()
digitalClock: DigitalClock = DigitalClock(display, 0, bottom_ypos)
while True:
dt1: str = get_datetime_string()
simpleCnt += 1
print(f"print_time_task running... {simpleCnt.value % 16} {dt1}")
while True:
await digitalClock.tick()
await asyncio.sleep(2)
@@ -58,10 +54,10 @@ async def main(
) -> None:
# Run both tasks concurrently
await asyncio.gather(
sync_ntp_time_task(),
weather_check_task(weather_checker),
# sync_ntp_time_task(),
# weather_check_task(weather_checker),
api_server_check_task(api_server_checker),
print_time_task(),
# print_time_task(),
)

View File

@@ -1,7 +1,9 @@
import urequests # type: ignore
import json
BASE_API_URL: str = "http://api-server:8000"
# BASE_API_URL: str = "http://api-server:8000"
# BASE_API_URL: str = "http://j-agent-01:8000"
BASE_API_URL: str = "http://192.168.178.105:8000"
class API_Server_Checker:
@@ -15,16 +17,24 @@ class API_Server_Checker:
else f"{BASE_API_URL}/items/{item_id}"
)
print(f"query: {items_url}")
r = urequests.get(items_url)
print("Status-Code:", r.status_code)
json_resp = r.json()
if r.status_code == 200:
print("json_resp:", json_resp)
else:
print("api-server error:", json_resp["error"]["message"])
r: any
try:
r = urequests.get(items_url)
print("Status-Code:", r.status_code)
json_resp = r.json()
r.close()
if r.status_code == 200:
print("json_resp:", json_resp)
else:
print("api-server error:", json_resp["error"]["message"])
except OSError as e:
print("Network error:", e)
except Exception as e:
print("Error:", e)
finally:
if r:
r.close()
def check(self, item_id: int):
self._get_items(item_id)

View File

@@ -123,19 +123,16 @@ class Weather_Checker:
color=colors.RAINBOW[2],
)
ypos += self.display.font_height + 1
updated_time_str: str = str(w_resp.weather.current.last_updated)[-5:]
local_time_str: str = str(w_resp.weather.location.localtime)[-5:]
self.display.write_text(
f"upd:{str(w_resp.weather.current.last_updated)[-5:]}",
f"{local_time_str} {updated_time_str}",
0,
ypos,
color=colors.RAINBOW[3],
)
ypos += self.display.font_height + 1
self.display.write_text(
f"cur:{str(w_resp.weather.location.localtime)[-5:]}",
0,
ypos,
color=colors.RAINBOW[4],
)
else:
ypos = 0
self.display.write_text(

View File

@@ -32,9 +32,6 @@ class DigitalClock:
fresh_part: str = time_str[mismatch_pos:]
part_to_clear: str = self.stored_time_str[mismatch_pos:]
textwidth: int = self._text_width(self.stored_time_str)
print(f"{self.stored_time_str}: #{textwidth}")
textwidth_untouched_part: int = self._text_width(untouched_part)
clear_x_start_pos: int = self.xpos + textwidth_untouched_part + 1