mirror of
https://gitea.publichub.eu/oscar.krause/fastapi-dls.git
synced 2024-11-22 14:28:48 +00:00
55 lines
1.9 KiB
Python
55 lines
1.9 KiB
Python
def load_file(filename) -> bytes:
|
|
with open(filename, 'rb') as file:
|
|
content = file.read()
|
|
return content
|
|
|
|
|
|
def load_key(filename) -> "RsaKey":
|
|
try:
|
|
# Crypto | Cryptodome on Debian
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.PublicKey.RSA import RsaKey
|
|
except ModuleNotFoundError:
|
|
from Cryptodome.PublicKey import RSA
|
|
from Cryptodome.PublicKey.RSA import RsaKey
|
|
|
|
return RSA.import_key(extern_key=load_file(filename), passphrase=None)
|
|
|
|
|
|
def generate_key() -> "RsaKey":
|
|
try:
|
|
# Crypto | Cryptodome on Debian
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.PublicKey.RSA import RsaKey
|
|
except ModuleNotFoundError:
|
|
from Cryptodome.PublicKey import RSA
|
|
from Cryptodome.PublicKey.RSA import RsaKey
|
|
|
|
return RSA.generate(bits=2048)
|
|
|
|
|
|
def ha_replicate(logger: "logging.Logger", ha_replicate: str, ha_role: str, version: str, dls_url: str, dls_port: int, site_key_xid: str, instance_ref: str, origins: list, leases: list) -> bool:
|
|
from datetime import datetime
|
|
import httpx
|
|
|
|
if f'{dls_url}:{dls_port}' == ha_replicate:
|
|
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": can\'t replicate to itself')
|
|
return False
|
|
|
|
data = {
|
|
'VERSION': str(version),
|
|
'HA_REPLICATE': f'{dls_url}:{dls_port}',
|
|
'SITE_KEY_XID': str(site_key_xid),
|
|
'INSTANCE_REF': str(instance_ref),
|
|
'origins': origins,
|
|
'leases': leases,
|
|
'sync_timestamp': datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
r = httpx.put(f'https://{ha_replicate}/-/ha/replicate', json=data, verify=False)
|
|
if r.status_code == 202:
|
|
logger.info(f'Successfully replicated this node ({ha_role}) to "{ha_replicate}".')
|
|
return True
|
|
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": {r.status_code} - {r.content}')
|
|
return False
|