# Coriolis/Coriolis_stackit/provider.py
# (stray file-viewer metadata converted to comments: 1802 lines, 70 KiB, Python)

"""
VOLLSTÄNDIGER CORIOLIS PROVIDER für STACKIT IaaS v2
Diese Datei implementiert die tatsächlichen Coriolis-Basisklassen
aus dem Repository https://github.com/cloudbase/coriolis.
HINWEIS:
- Diese Datei sollte in `coriolis_stackit/provider.py` liegen.
- Die Klasse heißt `StackitProvider`, passend zur `setup.py`.
- Die Wrapper-Klassen (`StackitInstance` etc.) müssen noch
vervollständigt werden (siehe TODOs).
"""
import base64
import json
import textwrap
import time
import uuid
import requests
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
boto3 = None
ClientError = Exception
# --- REAL Coriolis base-class imports ---
# These paths are based on the layout of the Coriolis GitHub repo.
# If the plugin is installed correctly, Python should resolve them.
try:
    from coriolis.provider.base import BaseProvider
    from coriolis.provider.resource_utils import (
        BaseAbstractInstance,
        BaseAbstractVolume,
        BaseAbstractSnapshot,
        BaseAbstractNetwork,
        BaseAbstractSubnet,
        BaseAbstractImage,
        BaseAbstractFlavor
    )
    from coriolis.exceptions import (
        ProviderAuthenticationError,
        ProviderConnectionError,
        ProviderError,
        ObjectNotFoundError
    )
except ImportError:
    print("WARNUNG: Coriolis-Basisklassen nicht gefunden. "
          "Dies ist normal, wenn außerhalb einer Coriolis-Umgebung entwickelt wird.")
    print("Verwende Dummy-Platzhalter-Klassen für die Entwicklung.")
    # --- Dummy classes for offline development ---
    # Used when the Coriolis packages are not installed, so that this
    # module can at least be imported and worked on.
    class BaseProvider:
        # Minimal stand-in for coriolis.provider.base.BaseProvider.
        def __init__(self, connection_details):
            self.connection = connection_details  # NOTE(review): real base may apply .get('connection', {}) here
            self.provider_name = "dummy"  # NOTE(review): real base presumably reads connection_details.get('provider')
            self.client = self._get_client("http://dummy.url")
        def _get_client(self, api_url):
            # Dummy client; StackitProvider overrides this with a real one.
            print("WARNUNG: Erstelle Dummy-Client")
            return None
    class BaseAbstractInstance:
        # Generic resource wrapper: keeps the owning provider and raw API payload.
        def __init__(self, provider, data):
            self._provider = provider
            self._data = data
    class BaseAbstractVolume(BaseAbstractInstance): pass
    class BaseAbstractSnapshot(BaseAbstractInstance): pass
    class BaseAbstractNetwork(BaseAbstractInstance): pass
    class BaseAbstractSubnet(BaseAbstractInstance): pass
    class BaseAbstractImage(BaseAbstractInstance): pass
    class BaseAbstractFlavor(BaseAbstractInstance): pass
    class ProviderAuthenticationError(Exception): pass
    class ProviderConnectionError(Exception): pass
    class ProviderError(Exception): pass
    class ObjectNotFoundError(Exception): pass
# --- STACKIT API Client ---
class StackitAPIClient:
    """
    Wrapper class for the STACKIT IaaS v2 REST API.

    Manages OAuth 2.0 client-credentials authentication (Bearer token,
    renewed automatically shortly before expiry) and issues API calls
    with the required project (and optional region) path prefix.
    """
    def __init__(
        self,
        api_url,
        auth_url,
        client_id,
        client_secret,
        project_id,
        *,
        region=None,
        include_project_prefix=True,
        project_segment="projects",
        request_timeout=300
    ):
        """Create the client and fetch an initial token.

        :param api_url: API base URL (trailing slash is stripped).
        :param auth_url: OAuth2 token endpoint.
        :param project_id: STACKIT project all requests are scoped to.
        :param region: optional region inserted as ``/regions/{region}``.
        :param include_project_prefix: prepend ``/{project_segment}/{id}``.
        :param project_segment: path segment name ("projects" or "project").
        :param request_timeout: per-request timeout in seconds (new,
            keyword-only; previously requests could hang forever).
        :raises ProviderAuthenticationError: if the initial login fails.
        """
        self.base_url = api_url.rstrip('/')
        self.auth_url = auth_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.project_id = project_id
        self.region = region
        self.include_project_prefix = include_project_prefix
        self.project_segment = project_segment.rstrip('/') if project_segment else "projects"
        self.request_timeout = request_timeout
        self.session = requests.Session()
        self.token = None
        self.token_expires_at = time.time()
        try:
            self._login()  # fetch the first token right away
        except requests.exceptions.RequestException as e:
            print(f"STACKIT API: Initiale Authentifizierung fehlgeschlagen: {e}")
            raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}")

    def _login(self):
        """Fetch a new OAuth 2.0 Bearer token and install it on the session.

        :raises ProviderAuthenticationError: on any transport or protocol
            failure, or when the response carries no ``access_token``.
        """
        print(f"STACKIT API: Authentifizierung für {self.base_url} wird durchgeführt...")
        try:
            payload = {
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret
            }
            # STACKIT Identity API expects application/x-www-form-urlencoded
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            response = requests.post(
                self.auth_url, data=payload, headers=headers,
                timeout=self.request_timeout)
            response.raise_for_status()  # raises HTTPError on 4xx/5xx
            data = response.json()
            access_token = data.get('access_token')
            if not access_token:
                raise ProviderAuthenticationError("Kein 'access_token' in STACKIT Auth-Antwort gefunden.")
            self.token = access_token
            # 60 second safety margin before the advertised expiry
            self.token_expires_at = time.time() + data.get('expires_in', 3600) - 60
            self.session.headers.update({
                'Authorization': f'Bearer {self.token}',
                'Content-Type': 'application/json'
            })
            print("STACKIT API: Authentifizierung erfolgreich.")
        except requests.exceptions.RequestException as e:
            # A failed login is always fatal for the calling operation.
            raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}")

    def _check_token(self):
        """Renew the token if it has (almost) expired."""
        if time.time() >= self.token_expires_at:
            print(f"STACKIT API: Token für {self.base_url} abgelaufen, wird erneuert...")
            self._login()  # raises ProviderAuthenticationError on failure

    def _build_url(self, path):
        """Build the full API URL including project and optional region."""
        if not path.startswith("/"):
            path = f"/{path}"
        prefix = ""
        if self.include_project_prefix:
            prefix = f"/{self.project_segment}/{self.project_id}"
            if self.region:
                prefix += f"/regions/{self.region}"
        return f"{self.base_url}{prefix}{path}"

    def _send(self, method, url, **kwargs):
        """Issue one HTTP request and return the parsed body.

        Shared by the first attempt and the post-relogin retry so both
        paths handle empty (e.g. 204) responses identically.
        """
        kwargs.setdefault('timeout', self.request_timeout)
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        # E.g. 204 No Content on DELETE: synthesize a small result dict.
        if not response.content:
            return {'status_code': response.status_code, 'response_body': 'OK'}
        return response.json()

    def _request(self, method, path, **kwargs):
        """Central request method with error translation.

        :raises ObjectNotFoundError: on HTTP 404.
        :raises ProviderAuthenticationError: on 401/403 even after one
            re-login retry.
        :raises ProviderConnectionError: on transport-level failures.
        :raises ProviderError: on all other API errors.
        """
        self._check_token()
        url = self._build_url(path)
        try:
            return self._send(method, url, **kwargs)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                raise ObjectNotFoundError(f"Objekt nicht gefunden bei {method} {url}")
            elif e.response.status_code in (401, 403):
                # The token may have been invalidated server-side;
                # re-authenticate and retry exactly once.
                print(f"STACKIT API: 401/403 bei {url} erhalten, versuche erneute Authentifizierung...")
                try:
                    self._login()
                    return self._send(method, url, **kwargs)
                except Exception as retry_e:
                    raise ProviderAuthenticationError(f"Auth-Fehler (auch nach Retry) bei {method} {url}: {retry_e}")
            else:
                # Other API errors (e.g. 400, 500)
                raise ProviderError(f"STACKIT API Fehler ({e.response.status_code}) "
                                    f"bei {method} {url}: {e.response.text}")
        except requests.exceptions.ConnectionError as e:
            raise ProviderConnectionError(f"Verbindungsfehler zu {url}: {e}")
        except Exception as e:
            # Catch-all (e.g. JSON decoding errors, timeouts)
            raise ProviderError(f"Allgemeiner Fehler: {e}")

    # --- HTTP methods (GET, POST, PUT, DELETE) ---
    def get(self, path, params=None):
        return self._request('GET', path, params=params)

    def post(self, path, data=None):
        # BUGFIX: use `is not None` so an explicitly passed empty dict is
        # still serialized and sent as "{}" instead of no body at all.
        return self._request('POST', path, data=json.dumps(data) if data is not None else None)

    def put(self, path, data=None):
        # NOTE: unlike post(), a missing body is serialized as "null";
        # kept for backward compatibility.
        return self._request('PUT', path, data=json.dumps(data))

    def delete(self, path):
        return self._request('DELETE', path)
# --- Coriolis Wrapper-Klassen ---
# Diese Klassen MÜSSEN die Properties implementieren,
# die Coriolis in seinen Basisklassen (z.B. BaseAbstractInstance)
# als @abstractmethod definiert.
class StackitInstance(BaseAbstractInstance):
    """Wrapper around a single STACKIT VM instance."""

    # STACKIT instance state -> status string expected by Coriolis
    _STATUS_MAP = {
        'RUNNING': 'RUNNING',
        'STOPPED': 'STOPPED',
        'CREATING': 'BUILDING',
        'FAILED': 'ERROR',
        'DELETING': 'DELETING',
    }

    def __init__(self, provider, data):
        super(StackitInstance, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def status(self):
        """Instance state translated to Coriolis conventions.

        Unmapped STACKIT states pass through unchanged; a missing state
        is reported as "UNKNOWN".
        """
        raw = self._data.get('status')
        return self._STATUS_MAP.get(raw, raw or "UNKNOWN")

    @property
    def flavor_id(self):
        """STACKIT API field: 'flavorId'."""
        return self._data.get('flavorId')

    @property
    def image_id(self):
        """STACKIT API field: 'imageId'."""
        return self._data.get('imageId')

    # --- Optional but recommended properties ---
    @property
    def availability_zone(self):
        """STACKIT API field: 'availabilityZone'."""
        return self._data.get('availabilityZone')

    @property
    def networks(self):
        """Plain list of attached network ids (STACKIT field: 'networkIds')."""
        return self._data.get('networkIds', [])

    @property
    def volumes(self):
        """Plain list of attached volume ids (STACKIT field: 'volumeIds')."""
        return self._data.get('volumeIds', [])
class StackitVolume(BaseAbstractVolume):
    """Wrapper around a single STACKIT block-storage volume."""

    # STACKIT volume state -> status string expected by Coriolis
    _STATUS_MAP = {
        'AVAILABLE': 'AVAILABLE',
        'CREATING': 'CREATING',
        'IN_USE': 'IN-USE',  # assumed spelling -- TODO confirm against the API
        'FAILED': 'ERROR',
    }

    def __init__(self, provider, data):
        super(StackitVolume, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def size(self):
        """Volume size in GB (Coriolis expects GB as well)."""
        return self._data.get('size')

    @property
    def status(self):
        """Volume state translated to Coriolis conventions."""
        raw = self._data.get('status')
        return self._STATUS_MAP.get(raw, raw or "UNKNOWN")

    @property
    def attachments(self):
        """Attachment list in the Coriolis shape.

        STACKIT reports a single 'attachedTo' instance id; Coriolis
        expects [{"instance_id": ..., "device": ...}].  The device path
        is not exposed by the STACKIT API, so it is always None.
        """
        attached = self._data.get('attachedTo')
        if not attached:
            return []
        return [{"instance_id": attached, "device": None}]

    # --- Optional properties ---
    @property
    def volume_type(self):
        """STACKIT API field: 'type' (e.g. 'storage_premium_perf1')."""
        return self._data.get('type')
class StackitSnapshot(BaseAbstractSnapshot):
    """Wrapper around a single STACKIT volume snapshot."""

    # STACKIT snapshot state -> status string expected by Coriolis
    _STATUS_MAP = {
        'AVAILABLE': 'AVAILABLE',
        'CREATING': 'CREATING',
        'FAILED': 'ERROR',
    }

    def __init__(self, provider, data):
        super(StackitSnapshot, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def size(self):
        """Snapshot size in GB (STACKIT API field: 'size')."""
        return self._data.get('size')

    @property
    def status(self):
        """Snapshot state translated to Coriolis conventions."""
        raw = self._data.get('status')
        return self._STATUS_MAP.get(raw, raw or "UNKNOWN")

    @property
    def source_volume_id(self):
        """STACKIT API field: 'sourceVolumeId'."""
        return self._data.get('sourceVolumeId')
# --- Wrapper für Flavor, Image, Network (Vervollständigt) ---
class StackitFlavor(BaseAbstractFlavor):
    """Wrapper around a STACKIT machine flavor."""

    def __init__(self, provider, data):
        super(StackitFlavor, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def vcpus(self):
        """Number of vCPUs (STACKIT API field: 'cpu')."""
        return self._data.get('cpu')

    @property
    def ram(self):
        """RAM in MB; STACKIT reports 'memory' in GB, Coriolis wants MB."""
        return self._data.get('memory', 0) * 1024

    @property
    def disk(self):
        """Disk size in GB (STACKIT API field: 'disk')."""
        return self._data.get('disk')
class StackitImage(BaseAbstractImage):
    """Wrapper around a STACKIT OS image."""

    def __init__(self, provider, data):
        super(StackitImage, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def status(self):
        # The STACKIT image object carries no status field; listed
        # images are assumed to be usable.
        return "AVAILABLE"

    @property
    def min_disk(self):
        """Minimum disk size in GB (STACKIT API field: 'minDisk')."""
        return self._data.get('minDisk')

    @property
    def min_ram(self):
        """Minimum RAM in MB; STACKIT reports 'minRam' in GB."""
        return self._data.get('minRam', 0) * 1024
class StackitNetwork(BaseAbstractNetwork):
    """Wrapper around a STACKIT network."""

    def __init__(self, provider, data):
        super(StackitNetwork, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        """STACKIT API field: 'id'."""
        return self._data.get('id')

    @property
    def name(self):
        """STACKIT API field: 'name'."""
        return self._data.get('name')

    @property
    def subnets(self):
        """Synthesized subnet list for Coriolis' BaseAbstractNetwork contract.

        STACKIT IaaS v2 has no standalone subnet objects, but a network
        carries a 'cidrRange'.  When present, a single surrogate subnet
        is derived from it; otherwise the list is empty.
        """
        cidr = self._data.get('cidrRange')
        if not cidr:
            return []
        surrogate = {
            'id': f"{self.id}-subnet",      # synthetic id
            'name': f"{self.name}-subnet",
            'cidr': cidr,
            'network_id': self.id
        }
        # Hand the dict to Coriolis' generic subnet base class.
        return [BaseAbstractSubnet(self._provider, surrogate)]
# --- DER EIGENTLICHE PROVIDER ---
class StackitProvider(BaseProvider):
"""
Coriolis Provider für STACKIT IaaS v2.
"""
# Name, wie er in der setup.py registriert ist
PROVIDER_NAME = "stackit_v2"
def __init__(self, connection_details):
    """Initialize the provider from a Coriolis `.endpoint` definition.

    `connection_details` is the full dict from the .endpoint file; the
    actual credentials/config live under its 'connection' key.  Builds
    one API client each for the IaaS API, the Object Storage control
    plane and (optionally) the Run-Command API.

    :raises ProviderConnectionError: if any mandatory connection
        setting (api_url, auth_url, client_id, client_secret,
        project_id, region, object_storage_api_url) is missing.
    """
    super(StackitProvider, self).__init__(
        connection_details.get('connection', {}))
    self.provider_name = connection_details.get('provider')  # "stackit_v2"
    # --- STACKIT API client configuration ---
    conn = self.connection
    self.iaas_api_url = conn.get('api_url')
    self.auth_url = conn.get('auth_url')
    self.client_id = conn.get('client_id')
    self.client_secret = conn.get('client_secret')
    self.project_id = conn.get('project_id')
    self.region = conn.get('region')
    # URL of the Object Storage control-plane API
    self.object_storage_api_url = conn.get('object_storage_api_url')
    # Data-transfer configuration (worker VM used by export/import jobs)
    self.migration_bucket_name = conn.get('migration_bucket_name', "coriolis-migration-data")
    self.worker_image_id = conn.get('worker_image_id')
    self.worker_flavor_id = conn.get('worker_flavor_id')
    self.worker_network_id = conn.get('worker_network_id')
    self.worker_availability_zone = conn.get('worker_availability_zone')
    self.worker_security_group_ids = conn.get('worker_security_group_ids', [])
    self.worker_keypair_name = conn.get('worker_keypair_name')
    self.worker_metadata = conn.get('worker_metadata', {})
    self.worker_user = conn.get('worker_user', "ubuntu")
    self.worker_boot_volume_size = conn.get('worker_boot_volume_size', 20)
    self.worker_cleanup = conn.get('worker_auto_cleanup', True)
    self.object_storage_s3_endpoint = conn.get('object_storage_s3_endpoint_url')
    self.run_command_api_url = conn.get('run_command_api_url')
    self.run_command_template = conn.get('run_command_template', "RunShellScript")
    self.run_command_timeout = conn.get('run_command_timeout', 7200)
    self.os_morphing_assets = conn.get('os_morphing_assets', {})
    # Normalize: a single security-group id may be given as a bare string.
    if isinstance(self.worker_security_group_ids, str):
        self.worker_security_group_ids = [self.worker_security_group_ids]
    # Normalize: ignore malformed (non-dict) worker metadata.
    if not isinstance(self.worker_metadata, dict):
        self.worker_metadata = {}
    if not all([self.iaas_api_url, self.auth_url, self.client_id,
                self.client_secret, self.project_id, self.object_storage_api_url, self.region]):
        raise ProviderConnectionError(
            "api_url, auth_url, client_id, client_secret, project_id, region "
            "und object_storage_api_url sind in der .endpoint-Datei erforderlich.")
    # Client for the IaaS API
    self.client = self._get_client(self.iaas_api_url, region=self.region)
    # Second client for the Object Storage control plane.  Its paths use
    # the singular "project" segment, unlike the IaaS API.
    self.os_client = self._get_client(
        self.object_storage_api_url,
        region=self.region,
        project_segment="project")
    # Optional client for the Run-Command API (used for OS morphing).
    self.run_client = None
    if self.run_command_api_url:
        self.run_client = self._get_client(
            self.run_command_api_url,
            region=self.region,
            project_segment="projects")
def _get_client(self, api_url, region=None, project_segment="projects"):
    """Build a StackitAPIClient for the given API base URL.

    Credentials and project id come from the provider configuration
    gathered in __init__; only the base URL, region and project path
    segment differ between the individual STACKIT APIs.
    """
    client_options = {
        "region": region,
        "project_segment": project_segment,
    }
    return StackitAPIClient(
        api_url,
        self.auth_url,
        self.client_id,
        self.client_secret,
        self.project_id,
        **client_options)
def authenticate(self):
    """Verify connectivity to both STACKIT APIs.

    _get_client() already performed the initial token login; this makes
    one cheap read call per API to be sure requests actually work.

    :raises ProviderConnectionError/ProviderAuthenticationError: passed
        through from the client layer.
    :raises ProviderError: for any unexpected failure.
    """
    probes = (
        # (client, path, success message)
        (self.client, "/flavors",
         "STACKIT IaaS API: Authentifizierung und Verbindung erfolgreich."),
        (self.os_client, "/buckets",
         "STACKIT Object Storage API: Authentifizierung und Verbindung erfolgreich."),
    )
    try:
        for api_client, probe_path, ok_message in probes:
            api_client.get(probe_path, params={'limit': 1})
            print(ok_message)
    except (ProviderConnectionError, ProviderAuthenticationError) as e:
        # Already translated by the client layer -- log and re-raise.
        print(f"STACKIT API: Authentifizierung fehlgeschlagen: {e}")
        raise
    except Exception as e:
        raise ProviderError(f"Unerwarteter Fehler bei Authentifizierung: {e}")
# --- Inventarisierung (Listen-Methoden) ---
def list_instances(self, filter_opts=None):
    """Return every VM in the project as StackitInstance wrappers."""
    # STACKIT API: GET /projects/{projectId}/instances
    payload = self.client.get("/instances")
    return [StackitInstance(self, item)
            for item in payload.get('instances', [])]
def get_instance(self, instance_id):
    """Return details of a single VM.

    :raises ObjectNotFoundError: if no instance with that id exists.
    """
    # STACKIT API: GET /projects/{projectId}/instances/{instanceId}
    try:
        payload = self.client.get(f"/instances/{instance_id}")
    except ObjectNotFoundError:
        raise ObjectNotFoundError(f"Instanz {instance_id} nicht gefunden")
    return StackitInstance(self, payload)
def list_volumes(self, filter_opts=None):
    """Return every volume in the project as StackitVolume wrappers."""
    # STACKIT API: GET /projects/{projectId}/volumes
    payload = self.client.get("/volumes")
    return [StackitVolume(self, item)
            for item in payload.get('volumes', [])]
def get_volume(self, volume_id):
    """Return details of a single volume.

    :raises ObjectNotFoundError: if no volume with that id exists.
    """
    # STACKIT API: GET /projects/{projectId}/volumes/{volumeId}
    try:
        payload = self.client.get(f"/volumes/{volume_id}")
    except ObjectNotFoundError:
        raise ObjectNotFoundError(f"Volume {volume_id} nicht gefunden")
    return StackitVolume(self, payload)
def list_snapshots(self, filter_opts=None):
    """Return every snapshot in the project as StackitSnapshot wrappers."""
    # STACKIT API: GET /projects/{projectId}/snapshots
    payload = self.client.get("/snapshots")
    return [StackitSnapshot(self, item)
            for item in payload.get('snapshots', [])]
def get_snapshot(self, snapshot_id):
    """Return details of a single snapshot.

    :raises ObjectNotFoundError: if no snapshot with that id exists.
    """
    # STACKIT API: GET /projects/{projectId}/snapshots/{snapshotId}
    try:
        payload = self.client.get(f"/snapshots/{snapshot_id}")
    except ObjectNotFoundError:
        raise ObjectNotFoundError(f"Snapshot {snapshot_id} nicht gefunden")
    return StackitSnapshot(self, payload)
def list_networks(self, filter_opts=None):
    """Return every network in the project as StackitNetwork wrappers."""
    # STACKIT API: GET /projects/{projectId}/networks
    payload = self.client.get("/networks")
    return [StackitNetwork(self, item)
            for item in payload.get('networks', [])]
def list_subnets(self, network_id=None, filter_opts=None):
    """Return the surrogate subnets derived from the project's networks.

    STACKIT IaaS v2 has no standalone subnet resource; each network
    synthesizes at most one subnet from its CIDR range (see
    StackitNetwork.subnets).  With `network_id` set, only that
    network's subnets are returned.
    """
    print("INFO: list_subnets() gibt abgeleitete Subnetze von list_networks() zurück.")
    derived = []
    for network in self.list_networks(filter_opts=filter_opts):
        if not network_id or network.id == network_id:
            derived.extend(network.subnets)
    return derived
def list_images(self, filter_opts=None):
    """Return every image in the project as StackitImage wrappers."""
    # STACKIT API: GET /projects/{projectId}/images
    payload = self.client.get("/images")
    return [StackitImage(self, item)
            for item in payload.get('images', [])]
def list_flavors(self, filter_opts=None):
    """Return every flavor in the project as StackitFlavor wrappers."""
    # STACKIT API: GET /projects/{projectId}/flavors
    payload = self.client.get("/flavors")
    return [StackitFlavor(self, item)
            for item in payload.get('flavors', [])]
# --- Lebenszyklus-Management ---
def power_on_instance(self, instance_id):
    """Start an instance and block until it reports RUNNING.

    Returns the raw response body of the start call.
    """
    # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/start
    start_response = self.client.post(f"/instances/{instance_id}/start")
    self._wait_for_instance_status(
        instance_id,
        target_states=("RUNNING",),
        error_states=("ERROR", "FAILED"))
    return start_response
def power_off_instance(self, instance_id):
    """Stop an instance and block until it reports STOPPED.

    Returns the raw response body of the stop call.
    """
    # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/stop
    stop_response = self.client.post(f"/instances/{instance_id}/stop")
    self._wait_for_instance_status(
        instance_id,
        target_states=("STOPPED",),
        error_states=("ERROR", "FAILED"))
    return stop_response
def get_instance_status(self, instance_id):
    """Return the mapped status string of a VM (e.g. 'RUNNING', 'STOPPED')."""
    instance = self.get_instance(instance_id)
    return instance.status
# --- Storage-Operationen ---
def create_volume(self, name, size_gb, description=None, volume_type=None,
                  snapshot_id=None, image_id=None, availability_zone=None):
    """Create an empty volume and wait until it is usable.

    `description` is accepted for interface compatibility but not
    forwarded (the STACKIT create call has no matching field).

    :raises NotImplementedError: for snapshot_id/image_id sources; the
        STACKIT API does not appear to support creating from either.
    """
    # STACKIT API: POST /projects/{projectId}/volumes
    if snapshot_id or image_id:
        raise NotImplementedError("STACKIT API unterstützt create_volume "
                                  "aus Snapshot/Image nicht.")
    requested = {
        'name': name,
        'size': size_gb,
        'type': volume_type,
        'availabilityZone': availability_zone,
    }
    # Drop unset fields so the API does not receive explicit nulls.
    body = {key: value for key, value in requested.items() if value is not None}
    created = StackitVolume(self, self.client.post("/volumes", data=body))
    self._wait_for_volume_status(created.id)
    return created
def delete_volume(self, volume_id):
    """Delete a volume.  Returns True on success."""
    # STACKIT API: DELETE /projects/{projectId}/volumes/{volumeId}
    self.client.delete(f"/volumes/{volume_id}")
    return True
def attach_volume(self, instance_id, volume_id, device_path=None):
    """Attach a volume to an instance.  Returns True on success.

    `device_path` is accepted for interface compatibility only; the
    request does not forward it.
    """
    # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/volumes
    # NOTE(review): the request-body shape is assumed from sparse API
    # docs -- confirm {"volumeId": ...} against the current spec.
    body = {"volumeId": volume_id}
    self.client.post(f"/instances/{instance_id}/volumes", data=body)
    return True
def detach_volume(self, instance_id, volume_id):
    """Detach a volume from an instance.  Returns True on success."""
    # STACKIT API: DELETE /projects/{projectId}/instances/{instanceId}/volumes/{volumeId}
    self.client.delete(f"/instances/{instance_id}/volumes/{volume_id}")
    return True
def create_snapshot(self, volume_id, snapshot_name, description=None, force=False):
    """Snapshot a volume and wait until the snapshot is ready.

    `description` and `force` are accepted for interface compatibility
    but have no STACKIT counterpart and are ignored.
    """
    # STACKIT API: POST /projects/{projectId}/volumes/{volumeId}/snapshot
    body = {'name': snapshot_name}
    created = StackitSnapshot(
        self, self.client.post(f"/volumes/{volume_id}/snapshot", data=body))
    self._wait_for_snapshot_status(created.id)
    return created
def delete_snapshot(self, snapshot_id):
    """Delete a snapshot and wait until it is gone.  Returns True."""
    # STACKIT API: DELETE /projects/{projectId}/snapshots/{snapshotId}
    self.client.delete(f"/snapshots/{snapshot_id}")
    self._wait_for_snapshot_deletion(snapshot_id)
    return True
def get_snapshot_status(self, snapshot_id):
    """Return the mapped status string of a snapshot."""
    snapshot = self.get_snapshot(snapshot_id)
    return snapshot.status
# --- Datentransfer & OS-Morphing (SEHR KOMPLEX) ---
# (Aktualisiert mit Object Storage API-Logik)
def export_snapshot_to_url(self, snapshot_id, *args, **kwargs):
    """Export a snapshot into an S3-compatible bucket.

    Clones the snapshot into a scratch volume, attaches it to a
    temporary worker VM that streams the data into the bucket, then
    verifies the object exists and returns its location together with
    the S3 credentials used.

    Recognized kwargs: bucket_name (override of the configured
    migration bucket), object_name (target key; defaults to
    "<snapshot-id>-<timestamp>.img"), timeout (worker job timeout in
    seconds, default 7200).

    :returns: dict with object_url, bucket, object, credentials and the
        worker instance/volume ids.
    """
    bucket_override = kwargs.get("bucket_name")
    object_name = kwargs.get("object_name")
    timeout = kwargs.get("timeout", 7200)
    snapshot = self.get_snapshot(snapshot_id)
    bucket_name = bucket_override or self.migration_bucket_name
    bucket = self._get_or_create_migration_bucket(bucket_name)
    # NOTE(review): these temporary S3 credentials are never revoked
    # afterwards -- consider deleting the access key once the transfer
    # is finished.
    s3_creds = self._create_temp_s3_credentials()
    object_name = object_name or f"{snapshot.id}-{int(time.time())}.img"
    job_name = f"coriolis-export-{uuid.uuid4().hex[:8]}"
    endpoint_url = self._determine_s3_endpoint(bucket)
    worker_volume = None
    worker_instance = None
    try:
        # Materialize the snapshot as a volume the worker can read from.
        worker_volume = self._create_volume_from_source(
            name=f"{job_name}-src",
            source_type="snapshot",
            source_id=snapshot_id,
            availability_zone=getattr(snapshot, "availability_zone", None)
        )
        job_payload = {
            "operation": "export",
            "source_volume_id": worker_volume.id,
            "s3_target": {
                "bucket": bucket_name,
                "object": object_name,
                "endpoint_url": endpoint_url,
                "access_key": s3_creds["access_key"],
                "secret_key": s3_creds["secret_key"]
            }
        }
        user_data = self._render_worker_user_data(job_payload)
        worker_instance = self._launch_worker_instance(
            job_name=job_name,
            user_data=user_data,
            attached_volume_ids=[worker_volume.id]
        )
        self._execute_worker_job(worker_instance, job_payload, timeout=timeout)
        # Detach before verification so the volume settles again.
        if worker_volume:
            self.detach_volume(worker_instance.id, worker_volume.id)
            self._wait_for_volume_status(worker_volume.id)
        self._verify_object_exists(bucket_name, object_name, endpoint_url, s3_creds)
        s3_url = self._build_bucket_object_url(bucket, object_name)
        result = {
            "object_url": s3_url,
            "bucket": bucket_name,
            "object": object_name,
            "credentials": s3_creds,
            "worker_instance_id": worker_instance.id if worker_instance else None,
            "worker_volume_id": worker_volume.id if worker_volume else None
        }
        return result
    finally:
        # Best-effort teardown of the worker VM and the scratch volume.
        # The detach may already have happened on the success path; a
        # second attempt is swallowed deliberately.  Skipped entirely
        # when worker_auto_cleanup is disabled.
        if worker_instance and self.worker_cleanup:
            try:
                if worker_volume:
                    self.detach_volume(worker_instance.id, worker_volume.id)
            except Exception:
                pass
            self._cleanup_worker_instance(worker_instance.id)
            if worker_volume:
                self._delete_volume_safe(worker_volume.id)
def create_volume_from_url(self, name, size_gb, image_url, *args, **kwargs):
    """Create a volume filled with the data behind an external URL.

    A fresh volume is created and attached to a temporary worker VM,
    which copies the data in -- either from an S3 object (when
    `source_credentials` describe one) or via plain HTTP from
    `image_url`.

    Recognized kwargs: timeout (worker job timeout in seconds, default
    7200), source_credentials (dict; the S3 form needs bucket/object
    and optionally endpoint_url/access_key/secret_key),
    availability_zone (defaults to the configured worker AZ).

    :returns: the re-fetched StackitVolume.
    """
    timeout = kwargs.get("timeout", 7200)
    source_credentials = kwargs.get("source_credentials", {})
    availability_zone = kwargs.get("availability_zone", self.worker_availability_zone)
    # NOTE(review): the target volume is not deleted when the import
    # fails -- callers have to clean it up themselves.
    target_volume = self.create_volume(
        name=name,
        size_gb=size_gb,
        availability_zone=availability_zone
    )
    self._wait_for_volume_status(target_volume.id)
    job_name = f"coriolis-import-{uuid.uuid4().hex[:8]}"
    worker_instance = None
    try:
        # Prefer the S3 source whenever the credentials look like one.
        if source_credentials.get("type") == "s3" or (
                source_credentials.get("bucket") and source_credentials.get("object")):
            source_cfg = {
                "type": "s3",
                "bucket": source_credentials.get("bucket"),
                "object": source_credentials.get("object"),
                "endpoint_url": source_credentials.get("endpoint_url"),
                "access_key": source_credentials.get("access_key"),
                "secret_key": source_credentials.get("secret_key")
            }
        else:
            # Fall back to a plain HTTP download of `image_url`.
            source_cfg = {
                "type": "http",
                "url": image_url
            }
        job_payload = {
            "operation": "import",
            "target_volume_id": target_volume.id,
            "source": source_cfg
        }
        user_data = self._render_worker_user_data(job_payload)
        worker_instance = self._launch_worker_instance(
            job_name=job_name,
            user_data=user_data,
            attached_volume_ids=[target_volume.id]
        )
        self._execute_worker_job(worker_instance, job_payload, timeout=timeout)
        self.detach_volume(worker_instance.id, target_volume.id)
        self._wait_for_volume_status(target_volume.id)
        # Re-read the volume so the caller sees its settled state.
        return self.get_volume(target_volume.id)
    finally:
        # Best-effort worker teardown; the detach may already have
        # happened on the success path, so failures are swallowed.
        if worker_instance and self.worker_cleanup:
            try:
                self.detach_volume(worker_instance.id, target_volume.id)
            except Exception:
                pass
            self._cleanup_worker_instance(worker_instance.id)
# --- OS-Morphing via Run-Command ---
def prepare_instance_for_os_morphing(
        self, volume_id, os_type="linux", network_configuration=None,
        timeout=5400):
    """Reset network/cloud-init configuration on the given volume.

    Runs the "prepare" OS-morphing phase on a temporary worker VM.
    When no (non-empty) network configuration is supplied, a simple
    DHCP-on-eth0 default is used.
    """
    if not network_configuration:
        network_configuration = {
            "dhcp4": True,
            "dhcp6": True,
            "interface": "eth0"
        }
    return self._run_os_morphing_phase(
        volume_id=volume_id,
        os_type=os_type,
        phase="prepare",
        extra_payload={"network": network_configuration},
        timeout=timeout)
def inject_drivers(
        self, volume_id, os_type="linux", drivers=None, timeout=5400):
    """Install VirtIO drivers on the volume and rebuild the initramfs.

    Runs the "drivers" OS-morphing phase on a temporary worker VM.
    Without an explicit (non-empty) `drivers` dict, only VirtIO is
    requested.
    """
    driver_cfg = drivers or {"virtio": True}
    return self._run_os_morphing_phase(
        volume_id=volume_id,
        os_type=os_type,
        phase="drivers",
        extra_payload={"drivers": driver_cfg},
        timeout=timeout)
def _run_os_morphing_phase(
        self, volume_id, os_type, phase, extra_payload=None,
        timeout=5400):
    """
    Run one OS-morphing phase against a volume.

    Boots a worker VM with the volume attached and executes the phase
    through the worker job machinery / Run-Command API.

    :param volume_id: volume holding the guest OS to be morphed.
    :param os_type: e.g. "linux"/"windows"; lower-cased, None -> linux.
    :param phase: phase identifier ("prepare", "drivers", ...) placed
        into the job payload.
    :param extra_payload: phase-specific keys merged into the payload
        (may overwrite the base keys).
    :param timeout: worker job timeout in seconds.
    :returns: dict with status/phase/volume_id on success.
    """
    # Make sure the volume is settled before attaching it.
    self._wait_for_volume_status(volume_id)
    job_payload = {
        "operation": "os_morphing",
        "phase": phase,
        "os_type": (os_type or "linux").lower(),
        "volume_id": volume_id
    }
    # Optional OS-specific assets (from the endpoint configuration).
    assets_cfg = self._get_os_morphing_assets(os_type)
    if assets_cfg:
        job_payload["assets"] = assets_cfg
    if extra_payload:
        job_payload.update(extra_payload)
    user_data = self._render_worker_user_data(job_payload)
    job_name = f"coriolis-morph-{phase}-{uuid.uuid4().hex[:8]}"
    worker_instance = None
    try:
        worker_instance = self._launch_worker_instance(
            job_name=job_name,
            user_data=user_data,
            attached_volume_ids=[volume_id])
        self._execute_worker_job(
            worker_instance,
            job_payload,
            timeout=timeout)
        # Detach is best-effort here; the finally block retries it.
        try:
            self.detach_volume(worker_instance.id, volume_id)
        except Exception:
            pass
        self._wait_for_volume_status(volume_id)
        return {
            "status": "completed",
            "phase": phase,
            "volume_id": volume_id
        }
    finally:
        # Best-effort worker teardown (skipped when auto-cleanup is off).
        if worker_instance and self.worker_cleanup:
            try:
                self.detach_volume(worker_instance.id, volume_id)
            except Exception:
                pass
            self._cleanup_worker_instance(worker_instance.id)
# --- Instanz-Erstellung ---
def create_instance(self, name, image_id, flavor_id, network_id,
                    availability_zone=None, root_volume_size=None,
                    volumes=None, boot_volume=None, **kwargs):
    """Create a new instance.

    `volumes` follows the Coriolis shape, a list of
    {"id": ..., "device": ...}; STACKIT takes only the ids, so any
    device hints are dropped.  When `boot_volume` is given it takes
    precedence over `image_id`.

    Supported kwargs: metadata, security_group_ids, keypair_name,
    user_data (encoded via _encode_user_data before sending).

    :raises ProviderError: on API failure.
    """
    # STACKIT API: POST /projects/{projectId}/instances
    volume_ids = [v['id'] for v in volumes] if volumes else []
    payload = {
        "name": name,
        "flavorId": flavor_id,
        "networkIds": [network_id],
        "volumeIds": volume_ids,
        "availabilityZone": availability_zone,
    }
    if boot_volume:
        payload["bootVolume"] = boot_volume
    else:
        payload["imageId"] = image_id
    metadata = kwargs.get("metadata")
    if metadata:
        payload["metadata"] = metadata
    security_group_ids = kwargs.get("security_group_ids")
    if security_group_ids:
        payload["securityGroupIds"] = security_group_ids
    keypair_name = kwargs.get("keypair_name")
    if keypair_name:
        payload["keypairName"] = keypair_name
    user_data = kwargs.get("user_data")
    if user_data:
        payload["userData"] = self._encode_user_data(user_data)
    # The STACKIT create API exposes no root-volume-size field; a boot
    # volume has to be prepared separately and passed via `boot_volume`.
    if root_volume_size and not boot_volume:
        print(f"WARNUNG: 'root_volume_size' ({root_volume_size}GB) wird "
              "ohne 'bootVolume' nicht berücksichtigt.")
    # Drop unset fields so the API does not receive explicit nulls.
    payload = {k: v for k, v in payload.items() if v is not None}
    try:
        instance_data = self.client.post("/instances", data=payload)
        return StackitInstance(self, instance_data)
    except ProviderError:
        # BUGFIX: the client layer already raised a descriptive
        # ProviderError (status code, URL, body) -- re-wrapping it
        # discarded that detail.  Callers still catch ProviderError.
        raise
    except Exception as e:
        raise ProviderError(f"Fehler bei create_instance: {e}")
def delete_instance(self, instance_id):
"""Löscht eine Instanz."""
# STACKIT API: DELETE /projects/{projectId}/instances/{instanceId}
try:
self.client.delete(f"/instances/{instance_id}")
return True
except Exception as e:
raise ProviderError(f"Fehler bei delete_instance: {e}")
# --- Datentransfer-Helfer (NEU) ---
# Implementiert mit der Object Storage Control Plane API
def _get_or_create_migration_bucket(self, bucket_name):
"""
Prüft, ob ein Bucket existiert. Wenn nicht, erstellt er ihn.
Verwendet den Object Storage Client (self.os_client).
"""
try:
# STACKIT OS API: GET /v2/project/{projectId}/regions/{region}/bucket/{bucketName}
bucket_data = self.os_client.get(f"/bucket/{bucket_name}")
print(f"STACKIT OS: Migration-Bucket '{bucket_name}' gefunden.")
return bucket_data
except ObjectNotFoundError:
print(f"STACKIT OS: Migration-Bucket '{bucket_name}' nicht gefunden. Erstelle...")
try:
# STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/bucket/{bucketName}
payload = { "name": bucket_name }
new_bucket_data = self.os_client.post(f"/bucket/{bucket_name}", data=payload)
print(f"STACKIT OS: Bucket '{bucket_name}' erstellt.")
return new_bucket_data
except Exception as e:
raise ProviderError(f"Fehler beim Erstellen des Buckets '{bucket_name}': {e}")
except Exception as e:
raise ProviderError(f"Fehler beim Prüfen des Buckets '{bucket_name}': {e}")
def _create_temp_s3_credentials(self):
"""
Erstellt temporäre S3-Zugangsdaten für den Datentransfer.
"""
print("STACKIT OS: Erstelle temporäre S3-Zugangsdaten...")
try:
# STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/access-key
payload = {}
creds_data = self.os_client.post("/access-key", data=payload)
# Wichtig: Wir brauchen 'accessKey', 'secretAccessKey' und 's3Endpoint'
s3_creds = {
"id": creds_data.get("keyId"),
"access_key": creds_data.get("accessKey"),
"secret_key": creds_data.get("secretAccessKey"),
"s3_endpoint_url": self.object_storage_s3_endpoint
}
if not all([s3_creds["id"], s3_creds["access_key"], s3_creds["secret_key"]]):
raise ProviderError("Unvollständige S3-Zugangsdaten von STACKIT API erhalten.")
print("STACKIT OS: Temporäre S3-Zugangsdaten erstellt.")
return s3_creds
except Exception as e:
raise ProviderError(f"Fehler beim Erstellen der S3-Zugangsdaten: {e}")
def _delete_temp_s3_credentials(self, credential_id):
"""Löscht temporäre S3-Zugangsdaten."""
print(f"STACKIT OS: Lösche temporäre S3-Zugangsdaten (ID: {credential_id})...")
try:
# STACKIT OS API: DELETE /v2/project/{projectId}/regions/{region}/access-key/{credentialId}
self.os_client.delete(f"/access-key/{credential_id}")
print("STACKIT OS: S3-Zugangsdaten gelöscht.")
return True
except Exception as e:
# Nicht-kritischer Fehler (Cleanup)
print(f"WARNUNG: Löschen der S3-Zugangsdaten {credential_id} fehlgeschlagen: {e}")
return False
def _ensure_run_command_available(self):
if not self.run_client:
raise ProviderError(
"run_command_api_url ist nicht konfiguriert. "
"Bitte ergänzen Sie die Endpoint-Datei um 'run_command_api_url'.")
    def _run_worker_command(self, instance_id, script, *, command_template=None,
                            timeout=None, poll_interval=5):
        """Execute a shell script on a server via the STACKIT Run-Command API.

        Submits the script as a command, then polls the command resource
        until its status is "completed" or "failed", or the timeout elapses.

        :param instance_id: server to run the command on.
        :param script: script body passed as the template's "script" parameter.
        :param command_template: Run-Command template name; falls back to
            ``self.run_command_template`` and finally "RunShellScript".
        :param timeout: overall polling timeout in seconds; falls back to
            ``self.run_command_timeout`` and finally 3600.
        :param poll_interval: seconds to sleep between status polls.
        :returns: the final command detail document returned by the API.
        :raises ProviderError: when the API returns no command ID, the
            timeout elapses, or the command finishes unsuccessfully.
        """
        self._ensure_run_command_available()
        template = command_template or self.run_command_template or "RunShellScript"
        payload = {
            "commandTemplateName": template,
            "parameters": {
                "script": script
            }
        }
        response = self.run_client.post(
            f"/servers/{instance_id}/commands",
            data=payload)
        command_id = response.get("id")
        if command_id is None:
            raise ProviderError(
                f"Run-Command API hat keine commandId geliefert: {response}")
        timeout = timeout or self.run_command_timeout or 3600
        start = time.time()
        details = None
        # Poll until the command reaches a terminal state or we time out.
        while True:
            details = self.run_client.get(
                f"/servers/{instance_id}/commands/{command_id}")
            status = details.get("status")
            if status in ("completed", "failed"):
                break
            if time.time() - start >= timeout:
                raise ProviderError(
                    f"Run-Command Zeitüberschreitung nach {timeout}s "
                    f"(commandId={command_id}, letzter Status={status}).")
            time.sleep(poll_interval)
        exit_code = details.get("exitCode")
        # A missing exitCode (None) is accepted as success as long as the
        # status itself is "completed".
        if status != "completed" or (exit_code not in (None, 0)):
            output = details.get("output")
            raise ProviderError(
                f"Worker-Befehl fehlgeschlagen (Status={status}, "
                f"ExitCode={exit_code}). Ausgabe: {output}")
        return details
def _build_worker_job_script(self, job_payload):
"""Erzeugt ein Shell-Skript, das den Transfer-Job via Python ausführt."""
job_literal = json.dumps(job_payload)
python_code_template = """
import json
import os
import subprocess
import sys
import urllib.request
try:
import boto3
except ImportError:
boto3 = None
JOB = __JOB_PAYLOAD__
def download_asset(asset_cfg, destination):
if not asset_cfg:
return None
source_type = asset_cfg.get("type", "s3")
os.makedirs(os.path.dirname(destination), exist_ok=True)
if source_type == "s3":
if boto3 is None:
raise RuntimeError(
"boto3 wird benötigt, um Assets aus S3 zu laden.")
session = boto3.session.Session(
aws_access_key_id=asset_cfg.get("access_key"),
aws_secret_access_key=asset_cfg.get("secret_key"))
client = session.client(
"s3", endpoint_url=asset_cfg.get("endpoint_url"))
client.download_file(
asset_cfg["bucket"],
asset_cfg["object"],
destination)
return destination
elif source_type == "http":
url = asset_cfg.get("url")
if not url:
raise RuntimeError("HTTP Asset ohne URL angegeben.")
with urllib.request.urlopen(url) as resp, open(
destination, "wb") as dst:
while True:
chunk = resp.read(4 * 1024 * 1024)
if not chunk:
break
dst.write(chunk)
return destination
else:
raise RuntimeError(
f"Unbekannter Asset-Typ: {source_type}")
def _lsblk():
data = subprocess.check_output(
["lsblk", "-J", "-o", "NAME,TYPE,MOUNTPOINT,FSTYPE"]
).decode("utf-8")
return json.loads(data)
def _has_mount(entry):
if entry.get("mountpoint"):
return True
for child in entry.get("children") or []:
if _has_mount(child):
return True
return False
def find_data_device():
lsblk = _lsblk()
for dev in lsblk.get("blockdevices", []):
if dev.get("type") != "disk":
continue
if _has_mount(dev):
continue
return "/dev/" + dev["name"]
raise RuntimeError("Kein zusätzliches Daten-Volume gefunden.")
def get_disk_entry(device_path):
device_name = device_path.replace("/dev/", "")
lsblk = _lsblk()
for dev in lsblk.get("blockdevices", []):
if dev.get("name") == device_name:
return dev
return None
def find_linux_root_partition(disk_entry):
candidates = (disk_entry or {}).get("children") or []
fs_preference = ("ext4", "ext3", "xfs", "btrfs")
for preferred in fs_preference:
for child in candidates:
if (child.get("type") == "part" and
(child.get("fstype") or "").lower() == preferred):
return "/dev/" + child["name"]
for child in candidates:
if child.get("type") == "part":
return "/dev/" + child["name"]
return "/dev/" + (disk_entry or {}).get("name", "")
def _mount(target, mountpoint, options=None):
os.makedirs(mountpoint, exist_ok=True)
cmd = ["mount"]
if options:
cmd.extend(["-o", options])
cmd.extend([target, mountpoint])
subprocess.run(cmd, check=True)
def _umount(target):
if os.path.ismount(target):
subprocess.run(["umount", "-lf", target], check=False)
def _bind_mount(base, name):
src = f"/{name}"
dst = os.path.join(base, name)
os.makedirs(dst, exist_ok=True)
subprocess.run(["mount", "--bind", src, dst], check=True)
return dst
def _unbind_mount(path):
if os.path.ismount(path):
subprocess.run(["umount", "-lf", path], check=False)
def _write_file(path, content):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as stream:
stream.write(content)
def _append_line(path, line):
with open(path, "a", encoding="utf-8") as stream:
stream.write(line + "\\n")
def _ensure_cloud_init(mount_dir):
cfg_dir = os.path.join(mount_dir, "etc/cloud/cloud.cfg.d")
if os.path.isdir(os.path.join(mount_dir, "etc/cloud")):
content = \"\"\"# Coriolis injected config
datasource_list: [ ConfigDrive, Ec2, NoCloud ]
\"\"\"
_write_file(os.path.join(cfg_dir, "90-coriolis.cfg"), content)
def _configure_netplan(mount_dir, network):
eth_name = network.get("interface", "eth0")
dhcp4 = str(network.get("dhcp4", True)).lower()
dhcp6 = str(network.get("dhcp6", True)).lower()
netplan_yaml = f\"\"\"network:
version: 2
renderer: networkd
ethernets:
{eth_name}:
dhcp4: {dhcp4}
dhcp6: {dhcp6}
\"\"\"
_write_file(os.path.join(
mount_dir, "etc/netplan/99-coriolis.yaml"), netplan_yaml)
def _configure_interfaces_file(mount_dir, network):
content = \"\"\"auto lo
iface lo inet loopback
auto eth0
iface eth0 inet dhcp
\"\"\"
_write_file(os.path.join(mount_dir, "etc/network/interfaces"), content)
def _configure_sysconfig_network(mount_dir, network):
content = \"\"\"DEVICE=eth0
BOOTPROTO=dhcp
ONBOOT=yes
TYPE=Ethernet
PEERDNS=yes
NM_CONTROLLED=no
\"\"\"
scripts_dir = os.path.join(
mount_dir, "etc/sysconfig/network-scripts")
_write_file(os.path.join(scripts_dir, "ifcfg-eth0"), content)
def _ensure_grub_cmdline(mount_dir):
grub_path = os.path.join(mount_dir, "etc/default/grub")
if not os.path.exists(grub_path):
return
with open(grub_path, "r", encoding="utf-8") as stream:
data = stream.read()
if "GRUB_CMDLINE_LINUX" not in data:
data += "\\nGRUB_CMDLINE_LINUX=\"\"\\n"
addition = "net.ifnames=0 biosdevname=0"
if addition not in data:
data = data.replace(
"GRUB_CMDLINE_LINUX=\"",
f"GRUB_CMDLINE_LINUX=\"{addition} ")
_write_file(grub_path, data)
def _cleanup_persistent_rules(mount_dir):
udev_path = os.path.join(
mount_dir, "etc/udev/rules.d/70-persistent-net.rules")
if os.path.exists(udev_path):
os.remove(udev_path)
def _prep_linux_network(mount_dir, network):
_cleanup_persistent_rules(mount_dir)
_ensure_grub_cmdline(mount_dir)
_ensure_cloud_init(mount_dir)
if os.path.isdir(os.path.join(mount_dir, "etc/netplan")):
_configure_netplan(mount_dir, network)
elif os.path.isdir(
os.path.join(mount_dir, "etc/sysconfig/network-scripts")):
_configure_sysconfig_network(mount_dir, network)
else:
_configure_interfaces_file(mount_dir, network)
def _write_kernel_module_overrides(mount_dir):
modules_dir = os.path.join(
mount_dir, "etc/modules-load.d")
content = \"\"\"virtio
virtio_blk
virtio_pci
virtio_net
virtio_scsi
\"\"\"
_write_file(os.path.join(
modules_dir, "coriolis-virtio.conf"), content)
dracut_dir = os.path.join(
mount_dir, "etc/dracut.conf.d")
if os.path.isdir(os.path.join(mount_dir, "etc/dracut.conf.d")):
dracut_cfg = 'add_drivers+=\" virtio virtio_blk virtio_pci virtio_net virtio_scsi \"\\n'
_write_file(os.path.join(
dracut_dir, "coriolis-virtio.conf"), dracut_cfg)
def _chroot_exec(mount_dir, cmd):
binds = []
for name in ("dev", "proc", "sys"):
binds.append(_bind_mount(mount_dir, name))
env = os.environ.copy()
env["PATH"] = "/usr/sbin:/usr/bin:/sbin:/bin"
try:
subprocess.run(
["chroot", mount_dir] + cmd,
check=True, env=env)
finally:
for target in reversed(binds):
_unbind_mount(target)
def _rebuild_initramfs(mount_dir):
if os.path.exists(os.path.join(
mount_dir, "usr/sbin/update-initramfs")):
_chroot_exec(mount_dir, ["update-initramfs", "-u"])
elif os.path.exists(os.path.join(
mount_dir, "usr/bin/dracut")) or os.path.exists(
os.path.join(mount_dir, "usr/sbin/dracut")):
_chroot_exec(mount_dir, ["dracut", "-f"])
def _linux_os_morph(device, job):
assets_cfg = (job.get("assets") or {})
script_cfg = assets_cfg.get("script")
if script_cfg:
script_path = download_asset(
script_cfg,
"/var/lib/coriolis/assets/linux-script.sh")
os.chmod(script_path, 0o755)
env = os.environ.copy()
env["CORIOLIS_DEVICE"] = device
env["CORIOLIS_JOB_JSON"] = json.dumps(job)
subprocess.run(
[script_path, device],
check=True,
env=env)
return
disk_entry = get_disk_entry(device)
if not disk_entry:
raise RuntimeError("lsblk Daten für device wurden nicht gefunden.")
root_part = find_linux_root_partition(disk_entry)
if not root_part or not os.path.exists(root_part):
raise RuntimeError("Konnte Root-Partition nicht bestimmen.")
mount_dir = "/mnt/coriolis-root"
_mount(root_part, mount_dir)
try:
network = job.get("network") or {"dhcp4": True, "dhcp6": True}
_prep_linux_network(mount_dir, network)
phase = job.get("phase", "prepare")
if phase == "drivers":
_write_kernel_module_overrides(mount_dir)
_rebuild_initramfs(mount_dir)
finally:
_umount(mount_dir)
def upload_to_s3(device, cfg):
if boto3 is None:
raise RuntimeError("boto3 ist nicht installiert (S3-Upload).")
session = boto3.session.Session(
aws_access_key_id=cfg["access_key"],
aws_secret_access_key=cfg["secret_key"]
)
client = session.client("s3", endpoint_url=cfg.get("endpoint_url"))
with open(device, "rb") as src:
client.upload_fileobj(src, cfg["bucket"], cfg["object"])
def download_from_s3(device, cfg):
if boto3 is None:
raise RuntimeError("boto3 ist nicht installiert (S3-Download).")
session = boto3.session.Session(
aws_access_key_id=cfg["access_key"],
aws_secret_access_key=cfg["secret_key"]
)
client = session.client("s3", endpoint_url=cfg.get("endpoint_url"))
with open(device, "wb") as dst:
client.download_fileobj(cfg["bucket"], cfg["object"], dst)
def download_from_url(device, url):
with urllib.request.urlopen(url) as resp, open(device, "wb") as dst:
while True:
chunk = resp.read(1024 * 1024)
if not chunk:
break
dst.write(chunk)
def wipe_device(device):
subprocess.run(["wipefs", "--all", device], check=False)
subprocess.run(["sgdisk", "--zap-all", device], check=False)
def _windows_os_morph(device, job):
assets_cfg = (job.get("assets") or {})
script_cfg = assets_cfg.get("script")
if not script_cfg:
raise RuntimeError(
"Windows OS-Morphing erfordert ein Script-Asset.")
script_path = download_asset(
script_cfg,
"/var/lib/coriolis/assets/windows-script.sh")
os.chmod(script_path, 0o755)
env = os.environ.copy()
env["CORIOLIS_DEVICE"] = device
env["CORIOLIS_JOB_JSON"] = json.dumps(job)
subprocess.run(
[script_path, device],
check=True,
env=env)
def run_job():
device = find_data_device()
operation = JOB.get("operation")
if operation == "export":
upload_to_s3(device, JOB["s3_target"])
elif operation == "import":
wipe_device(device)
source = JOB.get("source", {})
if source.get("type") == "s3":
download_from_s3(device, source)
elif source.get("type") == "http":
download_from_url(device, source["url"])
else:
raise RuntimeError(f"Unsupported source type: {source}")
elif operation == "os_morphing":
os_type = (JOB.get("os_type") or "linux").lower()
if os_type == "linux":
_linux_os_morph(device, JOB)
elif os_type == "windows":
_windows_os_morph(device, JOB)
else:
raise RuntimeError(
f"Unbekannte OS-Morphing Plattform: {os_type}")
else:
raise RuntimeError(f"Unbekannte Job-Operation: {operation}")
os.sync()
if __name__ == "__main__":
try:
run_job()
except Exception as exc:
print(f"[coriolis-worker] Fehler: {exc}", file=sys.stderr)
raise
"""
python_code = textwrap.dedent(python_code_template).replace("__JOB_PAYLOAD__", job_literal)
script = textwrap.dedent(f"""\
#!/bin/bash
set -euo pipefail
python3 - <<'PY'
{python_code}
PY
""")
return script
def _execute_worker_job(self, instance, job_payload, *, timeout=None):
script = self._build_worker_job_script(job_payload)
return self._run_worker_command(
instance.id,
script,
timeout=timeout or self.run_command_timeout)
# --- Allgemeine Helfer für Worker- und Transfer-Workflows ---
def _ensure_worker_prerequisites(self):
missing = []
if not self.worker_image_id:
missing.append("worker_image_id")
if not self.worker_flavor_id:
missing.append("worker_flavor_id")
if not self.worker_network_id:
missing.append("worker_network_id")
if missing:
raise ProviderError(
f"Worker-Konfiguration unvollständig. Folgende Felder fehlen in der Endpoint-Datei: {', '.join(missing)}"
)
def _get_os_morphing_assets(self, os_type):
if not self.os_morphing_assets:
return {}
return self.os_morphing_assets.get((os_type or "linux").lower(), {})
def _build_bucket_object_url(self, bucket_data, object_name):
"""Leitet aus den Bucket-Daten einen S3-kompatiblen URL ab."""
virtual_host_url = bucket_data.get("urlVirtualHostedStyle")
path_url = bucket_data.get("urlPathStyle")
if virtual_host_url:
base = virtual_host_url.rstrip('/')
return f"{base}/{object_name}"
if path_url:
base = path_url.rstrip('/')
return f"{base}/{object_name}"
# Fallback auf generisch konfigurierten Endpoint
endpoint = self.object_storage_s3_endpoint or self.object_storage_api_url
return f"{endpoint.rstrip('/')}/{bucket_data.get('name')}/{object_name}"
def _determine_s3_endpoint(self, bucket_data):
if self.object_storage_s3_endpoint:
return self.object_storage_s3_endpoint
virtual_host_url = bucket_data.get("urlVirtualHostedStyle")
if virtual_host_url:
# https://bucket.region.endpoint -> extrahiere endpoint
parts = virtual_host_url.split("://", 1)
scheme = "https"
host = parts[-1]
if len(parts) == 2:
scheme = parts[0]
# bucket-name.first -> entferne ersten Abschnitt
host_bits = host.split('.', 1)
if len(host_bits) == 2:
host = host_bits[1]
return f"{scheme}://{host}"
return self.object_storage_api_url
def _render_worker_user_data(self, job_payload):
"""Erstellt ein minimales cloud-init Skript, das den Job beschreibt."""
job_json = json.dumps(job_payload)
script = f"""#cloud-config
write_files:
- path: /var/lib/coriolis/job.json
permissions: '0600'
owner: root:root
content: |
{job_json}
runcmd:
- [ bash, -c, "echo 'Coriolis job description written to /var/lib/coriolis/job.json'" ]
"""
return script
def _encode_user_data(self, user_data_str):
encoded = base64.b64encode(user_data_str.encode("utf-8")).decode("ascii")
return encoded
def _wait_for_instance_status(
self,
instance_id,
target_states=None,
error_states=None,
timeout=3600,
poll_interval=15
):
target_states = target_states or ("STOPPED", "DELETED")
error_states = error_states or ("ERROR", "FAILED")
start = time.time()
while True:
instance = self.get_instance(instance_id)
status = instance.status
if status in target_states:
return status
if status in error_states:
raise ProviderError(
f"Instanz {instance_id} hat Fehlerstatus '{status}' erreicht.")
if time.time() - start > timeout:
raise ProviderError(
f"Wartezeit für Instanz {instance_id} überschritten ({timeout}s).")
time.sleep(poll_interval)
def _wait_for_volume_status(
self,
volume_id,
target_states=None,
error_states=None,
timeout=1800,
poll_interval=10
):
target_states = target_states or ("AVAILABLE",)
error_states = error_states or ("ERROR", "FAILED")
start = time.time()
while True:
volume = self.get_volume(volume_id)
status = volume.status
if status in target_states:
return status
if status in error_states:
raise ProviderError(
f"Volume {volume_id} hat Fehlerstatus '{status}' erreicht.")
if time.time() - start > timeout:
raise ProviderError(
f"Wartezeit für Volume {volume_id} überschritten ({timeout}s).")
time.sleep(poll_interval)
def _wait_for_snapshot_status(
self,
snapshot_id,
target_states=None,
error_states=None,
timeout=1800,
poll_interval=10
):
target_states = target_states or ("AVAILABLE",)
error_states = error_states or ("ERROR", "FAILED")
start = time.time()
while True:
snapshot = self.get_snapshot(snapshot_id)
status = snapshot.status
if status in target_states:
return status
if status in error_states:
raise ProviderError(
f"Snapshot {snapshot_id} hat Fehlerstatus '{status}' erreicht.")
if time.time() - start > timeout:
raise ProviderError(
f"Wartezeit für Snapshot {snapshot_id} überschritten ({timeout}s).")
time.sleep(poll_interval)
def _wait_for_snapshot_deletion(
self,
snapshot_id,
timeout=900,
poll_interval=10
):
start = time.time()
while True:
try:
self.get_snapshot(snapshot_id)
except ObjectNotFoundError:
return True
if time.time() - start > timeout:
raise ProviderError(
f"Snapshot {snapshot_id} wurde nicht innerhalb von {timeout}s gelöscht.")
time.sleep(poll_interval)
def _create_volume_from_source(
self,
*,
name,
source_type,
source_id,
size_gb=None,
availability_zone=None,
performance_class=None
):
payload = {
"name": name,
"availabilityZone": availability_zone or self.worker_availability_zone,
"performanceClass": performance_class,
"size": size_gb,
"source": {
"id": source_id,
"type": source_type
}
}
payload = {k: v for k, v in payload.items() if v is not None}
volume_data = self.client.post("/volumes", data=payload)
volume = StackitVolume(self, volume_data)
self._wait_for_volume_status(volume.id)
return volume
def _launch_worker_instance(
self,
*,
job_name,
user_data,
attached_volume_ids=None
):
self._ensure_worker_prerequisites()
payload_volumes = [{"id": vol_id} for vol_id in (attached_volume_ids or [])]
metadata = dict(self.worker_metadata or {})
metadata.update({
"coriolis_job": job_name,
})
instance = self.create_instance(
name=job_name,
image_id=self.worker_image_id,
flavor_id=self.worker_flavor_id,
network_id=self.worker_network_id,
availability_zone=self.worker_availability_zone,
volumes=payload_volumes or None,
metadata=metadata,
user_data=user_data,
keypair_name=self.worker_keypair_name,
security_group_ids=self.worker_security_group_ids
)
try:
self._wait_for_instance_status(
instance.id,
target_states=("RUNNING",),
error_states=("ERROR", "FAILED"),
timeout=900)
except ProviderError as exc:
print(f"WARNUNG: Worker-Instanz {instance.id} hat RUNNING nicht erreicht: {exc}")
return instance
def _cleanup_worker_instance(self, instance_id):
try:
self.delete_instance(instance_id)
except Exception as exc:
print(f"WARNUNG: Worker-Instanz {instance_id} konnte nicht gelöscht werden: {exc}")
def _delete_volume_safe(self, volume_id):
try:
self.delete_volume(volume_id)
except Exception as exc:
print(f"WARNUNG: Volume {volume_id} konnte nicht gelöscht werden: {exc}")
def _verify_object_exists(self, bucket_name, object_name, endpoint_url, creds):
if not boto3:
print("WARNUNG: boto3 nicht verfügbar - kann Objektverifikation nicht durchführen.")
return True
session = boto3.session.Session(
aws_access_key_id=creds["access_key"],
aws_secret_access_key=creds["secret_key"]
)
client = session.client("s3", endpoint_url=endpoint_url)
try:
client.head_object(Bucket=bucket_name, Key=object_name)
return True
except ClientError as exc:
raise ProviderError(
f"S3-Objekt {bucket_name}/{object_name} konnte nicht verifiziert werden: {exc}"
)