mirror of
https://gitea.publichub.eu/oscar.krause/fastapi-dls.git
synced 2025-11-03 17:36:07 +00:00
implemented ha endpoints and configuration
This commit is contained in:
113
app/main.py
113
app/main.py
@@ -6,7 +6,7 @@ from os.path import join, dirname
|
||||
from os import getenv as env
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, BackgroundTasks
|
||||
from fastapi.requests import Request
|
||||
from json import loads as json_loads
|
||||
from datetime import datetime, timedelta
|
||||
@@ -19,7 +19,7 @@ from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLRe
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from util import load_key, load_file
|
||||
from util import load_key, load_file, ha_replicate
|
||||
from orm import Origin, Lease, init as db_init, migrate
|
||||
|
||||
load_dotenv('../version.env')
|
||||
@@ -36,6 +36,7 @@ db_init(db), migrate(db)
|
||||
# everything prefixed with "INSTANCE_*" is used as "SERVICE_INSTANCE_*" or "SI_*" in official dls service
|
||||
DLS_URL = str(env('DLS_URL', 'localhost'))
|
||||
DLS_PORT = int(env('DLS_PORT', '443'))
|
||||
HA_REPLICATE, HA_ROLE = str(env('HA_REPLICATE', None)), str(env('HA_ROLE', None))
|
||||
SITE_KEY_XID = str(env('SITE_KEY_XID', '00000000-0000-0000-0000-000000000000'))
|
||||
INSTANCE_REF = str(env('INSTANCE_REF', '10000000-0000-0000-0000-000000000001'))
|
||||
ALLOTMENT_REF = str(env('ALLOTMENT_REF', '20000000-0000-0000-0000-000000000001'))
|
||||
@@ -199,6 +200,36 @@ async def _client_token():
|
||||
cur_time = datetime.utcnow()
|
||||
exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA
|
||||
|
||||
if HA_REPLICATE is not None and HA_ROLE.lower() == "secondary":
|
||||
return RedirectResponse(f'https://{HA_REPLICATE}/-/client-token')
|
||||
|
||||
idx_port, idx_node = 0, 0
|
||||
|
||||
def create_svc_port_set(port: int):
|
||||
idx = idx_port
|
||||
return {
|
||||
"idx": idx,
|
||||
"d_name": "DLS",
|
||||
"svc_port_map": [{"service": "auth", "port": port}, {"service": "lease", "port": port}]
|
||||
}
|
||||
|
||||
def create_node_url(url: str, svc_port_set_idx: int):
|
||||
idx = idx_node
|
||||
return {"idx": idx, "url": url, "url_qr": url, "svc_port_set_idx": svc_port_set_idx}
|
||||
|
||||
service_instance_configuration = {
|
||||
"nls_service_instance_ref": INSTANCE_REF,
|
||||
"svc_port_set_list": [create_svc_port_set(DLS_PORT)],
|
||||
"node_url_list": [create_node_url(DLS_URL, idx_port)]
|
||||
}
|
||||
idx_port += 1
|
||||
idx_node += 1
|
||||
|
||||
if HA_REPLICATE is not None and HA_ROLE.lower() == "primary":
|
||||
SEC_URL, SEC_PORT, *invalid = HA_REPLICATE.split(':')
|
||||
service_instance_configuration['svc_port_set_list'].append(create_svc_port_set(SEC_PORT))
|
||||
service_instance_configuration['node_url_list'].append(create_node_url(SEC_URL, idx_port))
|
||||
|
||||
payload = {
|
||||
"jti": str(uuid4()),
|
||||
"iss": "NLS Service Instance",
|
||||
@@ -209,17 +240,7 @@ async def _client_token():
|
||||
"update_mode": "ABSOLUTE",
|
||||
"scope_ref_list": [ALLOTMENT_REF],
|
||||
"fulfillment_class_ref_list": [],
|
||||
"service_instance_configuration": {
|
||||
"nls_service_instance_ref": INSTANCE_REF,
|
||||
"svc_port_set_list": [
|
||||
{
|
||||
"idx": 0,
|
||||
"d_name": "DLS",
|
||||
"svc_port_map": [{"service": "auth", "port": DLS_PORT}, {"service": "lease", "port": DLS_PORT}]
|
||||
}
|
||||
],
|
||||
"node_url_list": [{"idx": 0, "url": DLS_URL, "url_qr": DLS_URL, "svc_port_set_idx": 0}]
|
||||
},
|
||||
"service_instance_configuration": service_instance_configuration,
|
||||
"service_instance_public_key_configuration": {
|
||||
"service_instance_public_key_me": {
|
||||
"mod": hex(INSTANCE_KEY_PUB.public_key().n)[2:],
|
||||
@@ -239,6 +260,62 @@ async def _client_token():
|
||||
return response
|
||||
|
||||
|
||||
@app.get('/-/ha/replicate', summary='* HA Sync')
|
||||
async def _ha_replicate_to_ha(request: Request, background_tasks: BackgroundTasks):
|
||||
if HA_REPLICATE is None:
|
||||
logger.warning('HA replicate endpoint triggerd, but no value for "HA_REPLICATE" is set!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'no value for "HA_REPLICATE" set'})
|
||||
background_tasks.add_task(ha_replicate, logger, HA_REPLICATE, HA_ROLE, VERSION, DLS_URL, DLS_PORT, SITE_KEY_XID, INSTANCE_REF)
|
||||
return JSONr(status_code=202, content=None)
|
||||
|
||||
|
||||
@app.put('/-/ha/replicate', summary='* HA Sync')
|
||||
async def _ha_replicate_by_ha(request: Request):
|
||||
j, cur_time = json_loads((await request.body()).decode('utf-8')), datetime.utcnow()
|
||||
|
||||
if HA_REPLICATE is None:
|
||||
logger.warning(f'HA replicate endpoint triggerd, but no value for "HA_REPLICATE" is set!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'no value for "HA_REPLICATE" set'})
|
||||
|
||||
version = j.get('VERSION')
|
||||
if version != VERSION:
|
||||
logger.error(f'Version missmatch on HA replication task!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'Missmatch for "VERSION"'})
|
||||
|
||||
site_key_xid = j.get('SITE_KEY_XID')
|
||||
if site_key_xid != SITE_KEY_XID:
|
||||
logger.error(f'Site-Key missmatch on HA replication task!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'Missmatch for "SITE_KEY_XID"'})
|
||||
|
||||
instance_ref = j.get('INSTANCE_REF')
|
||||
if instance_ref != INSTANCE_REF:
|
||||
logger.error(f'Version missmatch on HA replication task!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'Missmatch for "INSTANCE_REF"'})
|
||||
|
||||
remote_time, max_seconds_behind = j.get('cur_time'), 30
|
||||
if remote_time <= cur_time - timedelta(seconds=max_seconds_behind):
|
||||
logger.error(f'Request time more than {max_seconds_behind}s behind!')
|
||||
return JSONr(status_code=503, content={'status': 503, 'detail': 'Request time behind'})
|
||||
|
||||
origins, leases = j.get('origins'), j.get('leases')
|
||||
for origin in origins:
|
||||
origin_ref = origin.get('origin_ref')
|
||||
logging.info(f'> [ ha ]: origin {origin_ref}')
|
||||
data = Origin.deserialize(origin)
|
||||
Origin.create_or_update(db, data)
|
||||
|
||||
for lease in leases:
|
||||
lease_ref = lease.get('lease_ref')
|
||||
x = Lease.find_by_lease_ref(db, lease_ref)
|
||||
if x.lease_updated > remote_time:
|
||||
continue
|
||||
logging.info(f'> [ ha ]: lease {lease_ref}')
|
||||
data = Lease.deserialize(lease)
|
||||
Lease.create_or_update(db, data)
|
||||
|
||||
return JSONr(status_code=202, content=None)
|
||||
|
||||
|
||||
# venv/lib/python3.9/site-packages/nls_services_auth/test/test_origins_controller.py
|
||||
@app.post('/auth/v1/origin', description='find or create an origin')
|
||||
async def auth_v1_origin(request: Request):
|
||||
@@ -545,6 +622,16 @@ async def app_on_startup():
|
||||
Your client-token file (.tok) is valid for {str(CLIENT_TOKEN_EXPIRE_DELTA)}.
|
||||
''')
|
||||
|
||||
if HA_REPLICATE is not None:
|
||||
from hashlib import md5
|
||||
logger.info(f'''
|
||||
HA mode is enabled. Make sure theses md5-hashes matches on all your nodes:
|
||||
- jwt_encode_key md5: "{str(md5(jwt_encode_key))}"
|
||||
- jwt_decode_key md5: "{str(md5(jwt_decode_key))}"
|
||||
|
||||
This node ({HA_ROLE}) listens to "https://{DLS_URL}:{DLS_PORT}" and replicates to "https://{HA_REPLICATE}".
|
||||
''')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import uvicorn
|
||||
|
||||
22
app/util.py
22
app/util.py
@@ -26,3 +26,25 @@ def generate_key() -> "RsaKey":
|
||||
from Cryptodome.PublicKey.RSA import RsaKey
|
||||
|
||||
return RSA.generate(bits=2048)
|
||||
|
||||
|
||||
def ha_replicate(logger: "logging.Logger", ha_replicate: str, ha_role: str, version: str, dls_url: str, dls_port: int, site_key_xid: str, instance_ref: str, origins: list["Origin"], leases: list["Lease"]):
|
||||
from datetime import datetime
|
||||
import httpx
|
||||
|
||||
data = {
|
||||
'VERSION': str(version),
|
||||
'HA_REPLICATE': f'{dls_url}:{dls_port}',
|
||||
'SITE_KEY_XID': str(site_key_xid),
|
||||
'INSTANCE_REF': str(instance_ref),
|
||||
'origins': [origin.serialize() for origin in origins],
|
||||
'leases': [lease.serialize() for lease in leases],
|
||||
'cur_time': datetime.utcnow(),
|
||||
}
|
||||
|
||||
r = httpx.put(f'https://{ha_replicate}/-/ha/replicate', json=data)
|
||||
if r.status_code == 202:
|
||||
logger.info(f'Successfully replicated this node ({ha_role}) to "{ha_replicate}".')
|
||||
else:
|
||||
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": {r.status_code} - {r.content}')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user