From 3096de34649d62087ab78411bb5fe377b08b4da2 Mon Sep 17 00:00:00 2001 From: "luigi.tortora" Date: Mon, 10 Nov 2025 13:10:58 +0000 Subject: [PATCH] Upload files to "Coriolis_stackit" --- Coriolis_stackit/__init__.py | Bin 0 -> 1024 bytes Coriolis_stackit/provider.py | 1802 ++++++++++++++++++++++++++++++++++ 2 files changed, 1802 insertions(+) create mode 100644 Coriolis_stackit/__init__.py create mode 100644 Coriolis_stackit/provider.py diff --git a/Coriolis_stackit/__init__.py b/Coriolis_stackit/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..06d7405020018ddf3cacee90fd4af10487da3d20 GIT binary patch literal 1024 ScmZQz7zLvtFd70QH3R?z00031 literal 0 HcmV?d00001 diff --git a/Coriolis_stackit/provider.py b/Coriolis_stackit/provider.py new file mode 100644 index 0000000..0ca0044 --- /dev/null +++ b/Coriolis_stackit/provider.py @@ -0,0 +1,1802 @@ +""" +VOLLSTÄNDIGER CORIOLIS PROVIDER für STACKIT IaaS v2 + +Diese Datei implementiert die tatsächlichen Coriolis-Basisklassen +aus dem Repository https://github.com/cloudbase/coriolis. + +HINWEIS: +- Diese Datei sollte in `coriolis_stackit/provider.py` liegen. +- Die Klasse heißt `StackitProvider`, passend zur `setup.py`. +- Die Wrapper-Klassen (`StackitInstance` etc.) müssen noch + vervollständigt werden (siehe TODOs). +""" + +import base64 +import json +import textwrap +import time +import uuid + +import requests + +try: + import boto3 + from botocore.exceptions import ClientError +except ImportError: + boto3 = None + ClientError = Exception + +# --- ECHTE Coriolis Basisklassen-Importe --- +# Diese Pfade basieren auf der Struktur im Coriolis GitHub Repo. +# Wenn Ihr Plugin korrekt installiert ist, sollte Python diese finden. + +try: + from coriolis.provider.base import BaseProvider + from coriolis.provider.resource_utils import ( + BaseAbstractInstance, + BaseAbstractVolume, + BaseAbstractSnapshot, + BaseAbstractNetwork, + BaseAbstractSubnet, + BaseAbstractImage, + BaseAbstractFlavor + ) + from coriolis.exceptions import ( + ProviderAuthenticationError, + ProviderConnectionError, + ProviderError, + ObjectNotFoundError + ) +except ImportError: + print("WARNUNG: Coriolis-Basisklassen nicht gefunden. " + "Dies ist normal, wenn außerhalb einer Coriolis-Umgebung entwickelt wird.") + print("Verwende Dummy-Platzhalter-Klassen für die Entwicklung.") + + # --- Dummy-Klassen für die Offline-Entwicklung --- + # Diese werden verwendet, wenn die Coriolis-Pakete nicht installiert sind, + # damit der Code zumindest geladen werden kann. + class BaseProvider: + def __init__(self, connection_details): + self.connection = connection_details # .get('connection', {}) + self.provider_name = "dummy" # connection_details.get('provider') + self.client = self._get_client("http://dummy.url") + + def _get_client(self, api_url): + # Dummy-Client für Offline-Entwicklung + print("WARNUNG: Erstelle Dummy-Client") + return None + + class BaseAbstractInstance: + def __init__(self, provider, data): + self._provider = provider + self._data = data + + class BaseAbstractVolume(BaseAbstractInstance): pass + class BaseAbstractSnapshot(BaseAbstractInstance): pass + class BaseAbstractNetwork(BaseAbstractInstance): pass + class BaseAbstractSubnet(BaseAbstractInstance): pass + class BaseAbstractImage(BaseAbstractInstance): pass + class BaseAbstractFlavor(BaseAbstractInstance): pass + + class ProviderAuthenticationError(Exception): pass + class ProviderConnectionError(Exception): pass + class ProviderError(Exception): pass + class ObjectNotFoundError(Exception): pass + + +# --- STACKIT API Client --- + +class StackitAPIClient: + """ + Eine Wrapper-Klasse für die STACKIT IaaS API v2. + Verwaltet die Authentifizierung (Bearer Token) und + API-Aufrufe mit der erforderlichen Projekt-ID. + """ + def __init__( + self, + api_url, + auth_url, + client_id, + client_secret, + project_id, + *, + region=None, + include_project_prefix=True, + project_segment="projects" + ): + self.base_url = api_url.rstrip('/') + self.auth_url = auth_url + self.client_id = client_id + self.client_secret = client_secret + self.project_id = project_id + self.region = region + self.include_project_prefix = include_project_prefix + self.project_segment = project_segment.rstrip('/') if project_segment else "projects" + + self.session = requests.Session() + self.token = None + self.token_expires_at = time.time() + + try: + self._login() # Beim Initialisieren erstes Token holen + except requests.exceptions.RequestException as e: + print(f"STACKIT API: Initiale Authentifizierung fehlgeschlagen: {e}") + raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}") + + def _login(self): + """Holt ein neues OAuth 2.0 Bearer Token.""" + print(f"STACKIT API: Authentifizierung für {self.base_url} wird durchgeführt...") + try: + payload = { + 'grant_type': 'client_credentials', + 'client_id': self.client_id, + 'client_secret': self.client_secret + } + # STACKIT Identity API erwartet application/x-www-form-urlencoded + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + + response = requests.post(self.auth_url, data=payload, headers=headers) + response.raise_for_status() # Löst HTTPError bei 4xx/5xx aus + + data = response.json() + access_token = data.get('access_token') + if not access_token: + raise ProviderAuthenticationError("Kein 'access_token' in STACKIT Auth-Antwort gefunden.") + + self.token = access_token + # 60 Sekunden Puffer + self.token_expires_at = time.time() + data.get('expires_in', 3600) - 60 + + self.session.headers.update({ + 'Authorization': f'Bearer {self.token}', + 'Content-Type': 'application/json' + }) + print("STACKIT API: Authentifizierung erfolgreich.") + + except requests.exceptions.RequestException as e: + # Dies ist ein kritischer Fehler. + raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}") + + def _check_token(self): + """Prüft, ob das Token abgelaufen ist und erneuert es ggf.""" + if time.time() >= self.token_expires_at: + print(f"STACKIT API: Token für {self.base_url} abgelaufen, wird erneuert...") + self._login() # _login wirft ProviderAuthenticationError bei Fehlern + + def _build_url(self, path): + """Baut die vollständige API-URL inkl. Projekt- und optional Region-ID.""" + if not path.startswith("/"): + path = f"/{path}" + prefix = "" + if self.include_project_prefix: + prefix = f"/{self.project_segment}/{self.project_id}" + if self.region: + prefix += f"/regions/{self.region}" + return f"{self.base_url}{prefix}{path}" + + def _request(self, method, path, **kwargs): + """Zentrale Request-Methode mit Fehlerbehandlung.""" + self._check_token() + url = self._build_url(path) + + try: + response = self.session.request(method, url, **kwargs) + # Fehlerhafte Anfragen (4xx, 5xx) lösen hier eine Exception aus + response.raise_for_status() + + # Für 204 (No Content) o.ä., z.B. bei DELETE + if not response.content: + return {'status_code': response.status_code, 'response_body': 'OK'} + + return response.json() + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + raise ObjectNotFoundError(f"Objekt nicht gefunden bei {method} {url}") + elif e.response.status_code in (401, 403): + # Token könnte serverseitig invalidiert worden sein + print(f"STACKIT API: 401/403 bei {url} erhalten, versuche erneute Authentifizierung...") + try: + self._login() + # Anfrage erneut versuchen (nur einmal) + response = self.session.request(method, url, **kwargs) + response.raise_for_status() + if not response.content: + return {'status_code': response.status_code, 'response_body': 'OK'} + return response.json() + except Exception as retry_e: + raise ProviderAuthenticationError(f"Auth-Fehler (auch nach Retry) bei {method} {url}: {retry_e}") + else: + # Andere API-Fehler (z.B. 400, 500) + raise ProviderError(f"STACKIT API Fehler ({e.response.status_code}) " + f"bei {method} {url}: {e.response.text}") + except requests.exceptions.ConnectionError as e: + raise ProviderConnectionError(f"Verbindungsfehler zu {url}: {e}") + except Exception as e: + raise ProviderError(f"Allgemeiner Fehler: {e}") + + # --- HTTP Methoden (GET, POST, PUT, DELETE) --- + def get(self, path, params=None): + return self._request('GET', path, params=params) + + def post(self, path, data=None): + return self._request('POST', path, data=json.dumps(data) if data else None) + + def put(self, path, data=None): + return self._request('PUT', path, data=json.dumps(data)) + + def delete(self, path): + return self._request('DELETE', path) + + +# --- Coriolis Wrapper-Klassen --- +# Diese Klassen MÜSSEN die Properties implementieren, +# die Coriolis in seinen Basisklassen (z.B. BaseAbstractInstance) +# als @abstractmethod definiert. + +class StackitInstance(BaseAbstractInstance): + """Wrapper für eine STACKIT VM-Instanz.""" + def __init__(self, provider, data): + super(StackitInstance, self).__init__(provider, data) + self._data = data + + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + + @property + def status(self): + # STACKIT API: 'status' (z.B. 'CREATING', 'RUNNING', 'STOPPED', 'FAILED') + # Coriolis erwartet Status-Strings wie 'RUNNING', 'STOPPED', 'ERROR' + stackit_status = self._data.get('status') + + # Mapping: + mapping = { + 'RUNNING': 'RUNNING', + 'STOPPED': 'STOPPED', + 'CREATING': 'BUILDING', + 'FAILED': 'ERROR', + 'DELETING': 'DELETING', + } + # Fallback auf den Originalstatus oder UNKNOWN + return mapping.get(stackit_status, stackit_status or "UNKNOWN") + + @property + def flavor_id(self): + # STACKIT API: 'flavorId' + return self._data.get('flavorId') + + @property + def image_id(self): + # STACKIT API: 'imageId' + return self._data.get('imageId') + + # --- Optionale, aber empfohlene Properties --- + @property + def availability_zone(self): + # STACKIT API: 'availabilityZone' + return self._data.get('availabilityZone') + + @property + def networks(self): + # STACKIT API: 'networkIds' (Liste von IDs) + # Coriolis erwartet hier oft eine strukturierte Liste + # Für den Anfang reicht eine einfache Liste von IDs. + return self._data.get('networkIds', []) + + @property + def volumes(self): + # STACKIT API: 'volumeIds' (Liste von IDs) + return self._data.get('volumeIds', []) + + +class StackitVolume(BaseAbstractVolume): + """Wrapper für ein STACKIT Volume.""" + def __init__(self, provider, data): + super(StackitVolume, self).__init__(provider, data) + self._data = data + + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + + @property + def size(self): + # STACKIT API: 'size' (in GB) + # Coriolis erwartet die Größe in GB + return self._data.get('size') + + @property + def status(self): + # STACKIT API: 'status' + # Coriolis erwartet 'AVAILABLE', 'CREATING', 'IN-USE', 'ERROR' + stackit_status = self._data.get('status') + # TODO: Status-Mapping (falls nötig) + mapping = { + 'AVAILABLE': 'AVAILABLE', + 'CREATING': 'CREATING', + 'IN_USE': 'IN-USE', # Annahme + 'FAILED': 'ERROR', + } + return mapping.get(stackit_status, stackit_status or "UNKNOWN") + + @property + def attachments(self): + # STACKIT API: 'attachedTo' (eine einzelne instanceId) + # Coriolis erwartet eine Liste von { "instance_id": "...", "device": "..." } + instance_id = self._data.get('attachedTo') + if instance_id: + # Device-Info fehlt in der STACKIT API-Antwort + return [{"instance_id": instance_id, "device": None}] + return [] + + # --- Optionale Properties --- + @property + def volume_type(self): + # STACKIT API: 'type' + return self._data.get('type') # z.B. 'storage_premium_perf1' + + +class StackitSnapshot(BaseAbstractSnapshot): + """Wrapper für einen STACKIT Snapshot.""" + def __init__(self, provider, data): + super(StackitSnapshot, self).__init__(provider, data) + self._data = data + + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + + @property + def size(self): + # STACKIT API: 'size' (in GB) + return self._data.get('size') # Größe in GB + + @property + def status(self): + # STACKIT API: 'status' + # Coriolis erwartet 'AVAILABLE', 'CREATING', 'ERROR' + stackit_status = self._data.get('status') + mapping = { + 'AVAILABLE': 'AVAILABLE', + 'CREATING': 'CREATING', + 'FAILED': 'ERROR', + } + return mapping.get(stackit_status, stackit_status or "UNKNOWN") + + @property + def source_volume_id(self): + # STACKIT API: 'sourceVolumeId' + return self._data.get('sourceVolumeId') + + +# --- Wrapper für Flavor, Image, Network (Vervollständigt) --- + +class StackitFlavor(BaseAbstractFlavor): + def __init__(self, provider, data): + super(StackitFlavor, self).__init__(provider, data) + self._data = data + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + @property + def vcpus(self): + # STACKIT API: 'cpu' + return self._data.get('cpu') + @property + def ram(self): + # STACKIT API: 'memory' (in GB) + # Coriolis erwartet RAM in MB + mem_gb = self._data.get('memory', 0) + return mem_gb * 1024 + @property + def disk(self): + # STACKIT API: 'disk' (in GB) + return self._data.get('disk') + + +class StackitImage(BaseAbstractImage): + def __init__(self, provider, data): + super(StackitImage, self).__init__(provider, data) + self._data = data + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + @property + def status(self): + # STACKIT API: Hat keinen Status. Wir nehmen 'AVAILABLE' an. + return "AVAILABLE" + @property + def min_disk(self): + # STACKIT API: 'minDisk' (in GB) + return self._data.get('minDisk') + @property + def min_ram(self): + # STACKIT API: 'minRam' (in GB) + # Coriolis erwartet MB + ram_gb = self._data.get('minRam', 0) + return ram_gb * 1024 + + +class StackitNetwork(BaseAbstractNetwork): + def __init__(self, provider, data): + super(StackitNetwork, self).__init__(provider, data) + self._data = data + @property + def id(self): + # STACKIT API: 'id' + return self._data.get('id') + @property + def name(self): + # STACKIT API: 'name' + return self._data.get('name') + + # Coriolis' BaseAbstractNetwork erwartet Subnetz-Infos + @property + def subnets(self): + # STACKIT IaaS v2 hat keine expliziten Subnetze, + # aber das Network-Objekt hat 'cidrRange'. + # Wir erstellen ein "Dummy"-Subnetz-Objekt daraus. + cidr = self._data.get('cidrRange') + if cidr: + # Erstellen eines Dummy-Subnetz-Dicts + subnet_data = { + 'id': f"{self.id}-subnet", # Künstliche ID + 'name': f"{self.name}-subnet", + 'cidr': cidr, + 'network_id': self.id + } + # Wir verwenden die Basis-Subnetz-Klasse von Coriolis + return [BaseAbstractSubnet(self._provider, subnet_data)] + return [] + + +# --- DER EIGENTLICHE PROVIDER --- + +class StackitProvider(BaseProvider): + """ + Coriolis Provider für STACKIT IaaS v2. + """ + + # Name, wie er in der setup.py registriert ist + PROVIDER_NAME = "stackit_v2" + + def __init__(self, connection_details): + # 'connection_details' ist das volle Dict aus der .endpoint-Datei + # 'connection' ist der Schlüssel darin + super(StackitProvider, self).__init__( + connection_details.get('connection', {})) + + self.provider_name = connection_details.get('provider') # "stackit_v2" + + # --- STACKIT API Client Initialisierung (NEU) --- + conn = self.connection + self.iaas_api_url = conn.get('api_url') + self.auth_url = conn.get('auth_url') + self.client_id = conn.get('client_id') + self.client_secret = conn.get('client_secret') + self.project_id = conn.get('project_id') + self.region = conn.get('region') + + # NEU: URL für die Object Storage Control Plane API + self.object_storage_api_url = conn.get('object_storage_api_url') + + # Datentransfer-Konfiguration + self.migration_bucket_name = conn.get('migration_bucket_name', "coriolis-migration-data") + self.worker_image_id = conn.get('worker_image_id') + self.worker_flavor_id = conn.get('worker_flavor_id') + self.worker_network_id = conn.get('worker_network_id') + self.worker_availability_zone = conn.get('worker_availability_zone') + self.worker_security_group_ids = conn.get('worker_security_group_ids', []) + self.worker_keypair_name = conn.get('worker_keypair_name') + self.worker_metadata = conn.get('worker_metadata', {}) + self.worker_user = conn.get('worker_user', "ubuntu") + self.worker_boot_volume_size = conn.get('worker_boot_volume_size', 20) + self.worker_cleanup = conn.get('worker_auto_cleanup', True) + self.object_storage_s3_endpoint = conn.get('object_storage_s3_endpoint_url') + + self.run_command_api_url = conn.get('run_command_api_url') + self.run_command_template = conn.get('run_command_template', "RunShellScript") + self.run_command_timeout = conn.get('run_command_timeout', 7200) + self.os_morphing_assets = conn.get('os_morphing_assets', {}) + + if isinstance(self.worker_security_group_ids, str): + self.worker_security_group_ids = [self.worker_security_group_ids] + if not isinstance(self.worker_metadata, dict): + self.worker_metadata = {} + + if not all([self.iaas_api_url, self.auth_url, self.client_id, + self.client_secret, self.project_id, self.object_storage_api_url, self.region]): + raise ProviderConnectionError( + "api_url, auth_url, client_id, client_secret, project_id, region " + "und object_storage_api_url sind in der .endpoint-Datei erforderlich.") + + # Client für IaaS API + self.client = self._get_client(self.iaas_api_url, region=self.region) + # NEU: Zweiter Client für Object Storage API (Control Plane) + self.os_client = self._get_client( + self.object_storage_api_url, + region=self.region, + project_segment="project") + self.run_client = None + if self.run_command_api_url: + self.run_client = self._get_client( + self.run_command_api_url, + region=self.region, + project_segment="projects") + + + def _get_client(self, api_url, region=None, project_segment="projects"): + """Erstellt den STACKIT API Client für eine gegebene API-URL.""" + # Parameter werden jetzt von __init__ übergeben + return StackitAPIClient( + api_url, + self.auth_url, + self.client_id, + self.client_secret, + self.project_id, + region=region, + project_segment=project_segment) + + def authenticate(self): + """ + Testet die Verbindung. _get_client() hat bereits + den ersten Login-Versuch unternommen. Wir machen + einen einfachen Lese-Aufruf, um sicherzugehen. + """ + try: + # Test 1: IaaS API + # STACKIT API: GET /projects/{projectId}/flavors?limit=1 + self.client.get("/flavors", params={'limit': 1}) + print("STACKIT IaaS API: Authentifizierung und Verbindung erfolgreich.") + + # NEU: Test 2: Object Storage API + # STACKIT OS API: GET /projects/{projectId}/buckets?limit=1 + self.os_client.get("/buckets", params={'limit': 1}) + print("STACKIT Object Storage API: Authentifizierung und Verbindung erfolgreich.") + + except (ProviderConnectionError, ProviderAuthenticationError) as e: + # Diese Fehler werden bereits von _get_client oder _request ausgelöst + print(f"STACKIT API: Authentifizierung fehlgeschlagen: {e}") + raise + except Exception as e: + # Fallback + raise ProviderError(f"Unerwarteter Fehler bei Authentifizierung: {e}") + + # --- Inventarisierung (Listen-Methoden) --- + + def list_instances(self, filter_opts=None): + """Gibt eine Liste von VMs zurück.""" + # STACKIT API: GET /projects/{projectId}/instances + api_data = self.client.get("/instances") + + instances = [] + # 'instances' ist der Schlüssel in der STACKIT API-Antwort + for instance_data in api_data.get('instances', []): + instances.append(StackitInstance(self, instance_data)) + + return instances + + def get_instance(self, instance_id): + """Gibt Details zu einer einzelnen VM zurück.""" + try: + # STACKIT API: GET /projects/{projectId}/instances/{instanceId} + instance_data = self.client.get(f"/instances/{instance_id}") + return StackitInstance(self, instance_data) + except ObjectNotFoundError: + raise ObjectNotFoundError(f"Instanz {instance_id} nicht gefunden") + + def list_volumes(self, filter_opts=None): + """Gibt eine Liste von Volumes zurück.""" + # STACKIT API: GET /projects/{projectId}/volumes + api_data = self.client.get("/volumes") + volumes = [] + for volume_data in api_data.get('volumes', []): + volumes.append(StackitVolume(self, volume_data)) + return volumes + + def get_volume(self, volume_id): + try: + # STACKIT API: GET /projects/{projectId}/volumes/{volumeId} + volume_data = self.client.get(f"/volumes/{volume_id}") + return StackitVolume(self, volume_data) + except ObjectNotFoundError: + raise ObjectNotFoundError(f"Volume {volume_id} nicht gefunden") + + def list_snapshots(self, filter_opts=None): + # STACKIT API: GET /projects/{projectId}/snapshots + api_data = self.client.get("/snapshots") + snapshots = [] + for snap_data in api_data.get('snapshots', []): + snapshots.append(StackitSnapshot(self, snap_data)) + return snapshots + + def get_snapshot(self, snapshot_id): + try: + # STACKIT API: GET /projects/{projectId}/snapshots/{snapshotId} + snap_data = self.client.get(f"/snapshots/{snapshot_id}") + return StackitSnapshot(self, snap_data) + except ObjectNotFoundError: + raise ObjectNotFoundError(f"Snapshot {snapshot_id} nicht gefunden") + + def list_networks(self, filter_opts=None): + # STACKIT API: GET /projects/{projectId}/networks + api_data = self.client.get("/networks") + networks = [] + for net_data in api_data.get('networks', []): + networks.append(StackitNetwork(self, net_data)) + return networks + + def list_subnets(self, network_id=None, filter_opts=None): + # STACKIT IaaS v2 hat keine separaten Subnetze. + # Wir geben die "Dummy"-Subnetze zurück, die in StackitNetwork erstellt wurden. + print("INFO: list_subnets() gibt abgeleitete Subnetze von list_networks() zurück.") + all_subnets = [] + networks = self.list_networks(filter_opts=filter_opts) + for net in networks: + if network_id and net.id != network_id: + continue + all_subnets.extend(net.subnets) + return all_subnets + + def list_images(self, filter_opts=None): + # STACKIT API: GET /projects/{projectId}/images + api_data = self.client.get("/images") + images = [] + for image_data in api_data.get('images', []): + images.append(StackitImage(self, image_data)) + return images + + def list_flavors(self, filter_opts=None): + # STACKIT API: GET /projects/{projectId}/flavors + api_data = self.client.get("/flavors") + flavors = [] + for flavor_data in api_data.get('flavors', []): + flavors.append(StackitFlavor(self, flavor_data)) + return flavors + + # --- Lebenszyklus-Management --- + + def power_on_instance(self, instance_id): + """Schaltet eine Instanz ein.""" + # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/start + task = self.client.post( + f"/instances/{instance_id}/start") + self._wait_for_instance_status( + instance_id, + target_states=("RUNNING",), + error_states=("ERROR", "FAILED")) + return task + + def power_off_instance(self, instance_id): + """Schaltet eine Instanz aus.""" + # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/stop + task = self.client.post( + f"/instances/{instance_id}/stop") + self._wait_for_instance_status( + instance_id, + target_states=("STOPPED",), + error_states=("ERROR", "FAILED")) + return task + + def get_instance_status(self, instance_id): + """Holt den aktuellen Status einer VM (z.B. 'RUNNING', 'STOPPED').""" + return self.get_instance(instance_id).status + + # --- Storage-Operationen --- + + def create_volume(self, name, size_gb, description=None, volume_type=None, + snapshot_id=None, image_id=None, availability_zone=None): + """Erstellt ein neues Volume.""" + # STACKIT API: POST /projects/{projectId}/volumes + payload = { + 'name': name, + 'size': size_gb, + 'type': volume_type, + 'availabilityZone': availability_zone, + } + + # STACKIT API unterstützt scheinbar nicht 'create from snapshot' oder 'image' + if snapshot_id or image_id: + raise NotImplementedError("STACKIT API unterstützt create_volume " + "aus Snapshot/Image nicht.") + + # Entferne None-Werte, damit die API sie nicht als 'null' interpretiert + payload = {k: v for k, v in payload.items() if v is not None} + + volume_data = self.client.post("/volumes", data=payload) + volume = StackitVolume(self, volume_data) + self._wait_for_volume_status(volume.id) + return volume + + def delete_volume(self, volume_id): + """Löscht ein Volume.""" + # STACKIT API: DELETE /projects/{projectId}/volumes/{volumeId} + self.client.delete(f"/volumes/{volume_id}") + return True + + def attach_volume(self, instance_id, volume_id, device_path=None): + """Hängt ein Volume an eine Instanz an.""" + # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/volumes + # Die API-Doku ist hier unklar. Ich nehme an, der Body ist: + payload = { "volumeId": volume_id } + self.client.post(f"/instances/{instance_id}/volumes", data=payload) + return True + # print(f"WARNUNG: attach_volume (nachträglich) scheint von STACKIT IaaS v2 nicht unterstützt zu werden.") + # raise NotImplementedError("attach_volume scheint nicht unterstützt zu werden.") + + def detach_volume(self, instance_id, volume_id): + """Hängt ein Volume von einer Instanz ab.""" + # STACKIT API: DELETE /projects/{projectId}/instances/{instanceId}/volumes/{volumeId} + self.client.delete(f"/instances/{instance_id}/volumes/{volume_id}") + return True + # print(f"WARNUNG: detach_volume scheint von STACKIT IaaS v2 nicht unterstützt zu werden.") + # raise NotImplementedError("detach_volume scheint nicht unterstützt zu werden.") + + def create_snapshot(self, volume_id, snapshot_name, description=None, force=False): + """Erstellt einen Snapshot.""" + # STACKIT API: POST /projects/{projectId}/volumes/{volumeId}/snapshot + payload = { 'name': snapshot_name } + snapshot_data = self.client.post(f"/volumes/{volume_id}/snapshot", data=payload) + snapshot = StackitSnapshot(self, snapshot_data) + self._wait_for_snapshot_status(snapshot.id) + return snapshot + + def delete_snapshot(self, snapshot_id): + """Löscht einen Snapshot.""" + # STACKIT API: DELETE /projects/{projectId}/snapshots/{snapshotId} + self.client.delete(f"/snapshots/{snapshot_id}") + self._wait_for_snapshot_deletion(snapshot_id) + return True + + def get_snapshot_status(self, snapshot_id): + """Holt den Status eines Snapshots.""" + return self.get_snapshot(snapshot_id).status + + # --- Datentransfer & OS-Morphing (SEHR KOMPLEX) --- + # (Aktualisiert mit Object Storage API-Logik) + + def export_snapshot_to_url(self, snapshot_id, *args, **kwargs): + """Exportiert einen Snapshot in einen S3-kompatiblen Bucket.""" + bucket_override = kwargs.get("bucket_name") + object_name = kwargs.get("object_name") + timeout = kwargs.get("timeout", 7200) + + snapshot = self.get_snapshot(snapshot_id) + bucket_name = bucket_override or self.migration_bucket_name + bucket = self._get_or_create_migration_bucket(bucket_name) + s3_creds = self._create_temp_s3_credentials() + object_name = object_name or f"{snapshot.id}-{int(time.time())}.img" + job_name = f"coriolis-export-{uuid.uuid4().hex[:8]}" + endpoint_url = self._determine_s3_endpoint(bucket) + + worker_volume = None + worker_instance = None + try: + worker_volume = self._create_volume_from_source( + name=f"{job_name}-src", + source_type="snapshot", + source_id=snapshot_id, + availability_zone=getattr(snapshot, "availability_zone", None) + ) + job_payload = { + "operation": "export", + "source_volume_id": worker_volume.id, + "s3_target": { + "bucket": bucket_name, + "object": object_name, + "endpoint_url": endpoint_url, + "access_key": s3_creds["access_key"], + "secret_key": s3_creds["secret_key"] + } + } + user_data = self._render_worker_user_data(job_payload) + worker_instance = self._launch_worker_instance( + job_name=job_name, + user_data=user_data, + attached_volume_ids=[worker_volume.id] + ) + self._execute_worker_job(worker_instance, job_payload, timeout=timeout) + if worker_volume: + self.detach_volume(worker_instance.id, worker_volume.id) + self._wait_for_volume_status(worker_volume.id) + self._verify_object_exists(bucket_name, object_name, endpoint_url, s3_creds) + + s3_url = self._build_bucket_object_url(bucket, object_name) + result = { + "object_url": s3_url, + "bucket": bucket_name, + "object": object_name, + "credentials": s3_creds, + "worker_instance_id": worker_instance.id if worker_instance else None, + "worker_volume_id": worker_volume.id if worker_volume else None + } + return result + finally: + if worker_instance and self.worker_cleanup: + try: + if worker_volume: + self.detach_volume(worker_instance.id, worker_volume.id) + except Exception: + pass + self._cleanup_worker_instance(worker_instance.id) + if worker_volume: + self._delete_volume_safe(worker_volume.id) + + def create_volume_from_url(self, name, size_gb, image_url, *args, **kwargs): + """Erstellt ein Volume, das mit den Daten eines externen URLs befüllt wird.""" + timeout = kwargs.get("timeout", 7200) + source_credentials = kwargs.get("source_credentials", {}) + availability_zone = kwargs.get("availability_zone", self.worker_availability_zone) + + target_volume = self.create_volume( + name=name, + size_gb=size_gb, + availability_zone=availability_zone + ) + self._wait_for_volume_status(target_volume.id) + job_name = f"coriolis-import-{uuid.uuid4().hex[:8]}" + worker_instance = None + try: + if source_credentials.get("type") == "s3" or ( + source_credentials.get("bucket") and source_credentials.get("object")): + source_cfg = { + "type": "s3", + "bucket": source_credentials.get("bucket"), + "object": source_credentials.get("object"), + "endpoint_url": source_credentials.get("endpoint_url"), + "access_key": source_credentials.get("access_key"), + "secret_key": source_credentials.get("secret_key") + } + else: + source_cfg = { + "type": "http", + "url": image_url + } + job_payload = { + "operation": "import", + "target_volume_id": target_volume.id, + "source": source_cfg + } + user_data = self._render_worker_user_data(job_payload) + worker_instance = self._launch_worker_instance( + job_name=job_name, + user_data=user_data, + attached_volume_ids=[target_volume.id] + ) + self._execute_worker_job(worker_instance, job_payload, timeout=timeout) + self.detach_volume(worker_instance.id, target_volume.id) + self._wait_for_volume_status(target_volume.id) + + return self.get_volume(target_volume.id) + finally: + if worker_instance and self.worker_cleanup: + try: + self.detach_volume(worker_instance.id, target_volume.id) + except Exception: + pass + self._cleanup_worker_instance(worker_instance.id) + + # --- OS-Morphing via Run-Command --- + + def prepare_instance_for_os_morphing( + self, volume_id, os_type="linux", network_configuration=None, + timeout=5400): + """Setzt Netzwerk-/Cloud-Init-Konfigurationen auf dem Volume zurück.""" + payload = { + "network": network_configuration or { + "dhcp4": True, + "dhcp6": True, + "interface": "eth0" + } + } + return self._run_os_morphing_phase( + volume_id=volume_id, + os_type=os_type, + phase="prepare", + extra_payload=payload, + timeout=timeout) + + def inject_drivers( + self, volume_id, os_type="linux", drivers=None, timeout=5400): + """Installiert VirtIO-Treiber und baut die initramfs neu.""" + payload = { + "drivers": drivers or { + "virtio": True + } + } + return self._run_os_morphing_phase( + volume_id=volume_id, + os_type=os_type, + phase="drivers", + extra_payload=payload, + timeout=timeout) + + def _run_os_morphing_phase( + self, volume_id, os_type, phase, extra_payload=None, + timeout=5400): + """ + Startet eine Worker-VM, hängt das Volume an und führt die Morphing- + Phase über die Run-Command API aus. + """ + self._wait_for_volume_status(volume_id) + job_payload = { + "operation": "os_morphing", + "phase": phase, + "os_type": (os_type or "linux").lower(), + "volume_id": volume_id + } + assets_cfg = self._get_os_morphing_assets(os_type) + if assets_cfg: + job_payload["assets"] = assets_cfg + if extra_payload: + job_payload.update(extra_payload) + + user_data = self._render_worker_user_data(job_payload) + job_name = f"coriolis-morph-{phase}-{uuid.uuid4().hex[:8]}" + + worker_instance = None + try: + worker_instance = self._launch_worker_instance( + job_name=job_name, + user_data=user_data, + attached_volume_ids=[volume_id]) + self._execute_worker_job( + worker_instance, + job_payload, + timeout=timeout) + try: + self.detach_volume(worker_instance.id, volume_id) + except Exception: + pass + self._wait_for_volume_status(volume_id) + return { + "status": "completed", + "phase": phase, + "volume_id": volume_id + } + finally: + if worker_instance and self.worker_cleanup: + try: + self.detach_volume(worker_instance.id, volume_id) + except Exception: + pass + self._cleanup_worker_instance(worker_instance.id) + + # --- Instanz-Erstellung --- + + def create_instance(self, name, image_id, flavor_id, network_id, + availability_zone=None, root_volume_size=None, + volumes=None, boot_volume=None, **kwargs): + """Erstellt eine neue Instanz.""" + # STACKIT API: POST /projects/{projectId}/instances + + # 'volumes' ist in Coriolis eine Liste von { "id": "vol-id", "device": "/dev/vdb" } + # STACKIT erwartet nur eine Liste von Volume-IDs. + volume_ids = [v['id'] for v in volumes] if volumes else [] + + payload = { + "name": name, + "flavorId": flavor_id, + "networkIds": [network_id], + "volumeIds": volume_ids, + "availabilityZone": availability_zone, + } + if boot_volume: + payload["bootVolume"] = boot_volume + else: + payload["imageId"] = image_id + metadata = kwargs.get("metadata") + if metadata: + payload["metadata"] = metadata + security_group_ids = kwargs.get("security_group_ids") + if security_group_ids: + payload["securityGroupIds"] = security_group_ids + keypair_name = kwargs.get("keypair_name") + if keypair_name: + payload["keypairName"] = keypair_name + user_data = kwargs.get("user_data") + if user_data: + payload["userData"] = self._encode_user_data(user_data) + + # TODO: STACKIT API Doku für 'create' ist unklar bzgl. + # root_volume_size. Es gibt kein 'root_volume_size'-Feld. + # Evtl. muss ein Volume separat erstellt und als 'bootable' markiert werden? + if root_volume_size and not boot_volume: + print(f"WARNUNG: 'root_volume_size' ({root_volume_size}GB) wird " + "ohne 'bootVolume' nicht berücksichtigt.") + + # Entferne None-Werte + payload = {k: v for k, v in payload.items() if v is not None} + + try: + instance_data = self.client.post("/instances", data=payload) + return StackitInstance(self, instance_data) + except Exception as e: + raise ProviderError(f"Fehler bei create_instance: {e}") + + def delete_instance(self, instance_id): + """Löscht eine Instanz.""" + # STACKIT API: DELETE /projects/{projectId}/instances/{instanceId} + try: + self.client.delete(f"/instances/{instance_id}") + return True + except Exception as e: + raise ProviderError(f"Fehler bei delete_instance: {e}") + + # --- Datentransfer-Helfer (NEU) --- + # Implementiert mit der Object Storage Control Plane API + + def _get_or_create_migration_bucket(self, bucket_name): + """ + Prüft, ob ein Bucket existiert. Wenn nicht, erstellt er ihn. + Verwendet den Object Storage Client (self.os_client). + """ + try: + # STACKIT OS API: GET /v2/project/{projectId}/regions/{region}/bucket/{bucketName} + bucket_data = self.os_client.get(f"/bucket/{bucket_name}") + print(f"STACKIT OS: Migration-Bucket '{bucket_name}' gefunden.") + return bucket_data + except ObjectNotFoundError: + print(f"STACKIT OS: Migration-Bucket '{bucket_name}' nicht gefunden. Erstelle...") + try: + # STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/bucket/{bucketName} + payload = { "name": bucket_name } + new_bucket_data = self.os_client.post(f"/bucket/{bucket_name}", data=payload) + print(f"STACKIT OS: Bucket '{bucket_name}' erstellt.") + return new_bucket_data + except Exception as e: + raise ProviderError(f"Fehler beim Erstellen des Buckets '{bucket_name}': {e}") + except Exception as e: + raise ProviderError(f"Fehler beim Prüfen des Buckets '{bucket_name}': {e}") + + def _create_temp_s3_credentials(self): + """ + Erstellt temporäre S3-Zugangsdaten für den Datentransfer. + """ + print("STACKIT OS: Erstelle temporäre S3-Zugangsdaten...") + try: + # STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/access-key + payload = {} + creds_data = self.os_client.post("/access-key", data=payload) + + # Wichtig: Wir brauchen 'accessKey', 'secretAccessKey' und 's3Endpoint' + s3_creds = { + "id": creds_data.get("keyId"), + "access_key": creds_data.get("accessKey"), + "secret_key": creds_data.get("secretAccessKey"), + "s3_endpoint_url": self.object_storage_s3_endpoint + } + + if not all([s3_creds["id"], s3_creds["access_key"], s3_creds["secret_key"]]): + raise ProviderError("Unvollständige S3-Zugangsdaten von STACKIT API erhalten.") + + print("STACKIT OS: Temporäre S3-Zugangsdaten erstellt.") + return s3_creds + + except Exception as e: + raise ProviderError(f"Fehler beim Erstellen der S3-Zugangsdaten: {e}") + + def _delete_temp_s3_credentials(self, credential_id): + """Löscht temporäre S3-Zugangsdaten.""" + print(f"STACKIT OS: Lösche temporäre S3-Zugangsdaten (ID: {credential_id})...") + try: + # STACKIT OS API: DELETE /v2/project/{projectId}/regions/{region}/access-key/{credentialId} + self.os_client.delete(f"/access-key/{credential_id}") + print("STACKIT OS: S3-Zugangsdaten gelöscht.") + return True + except Exception as e: + # Nicht-kritischer Fehler (Cleanup) + print(f"WARNUNG: Löschen der S3-Zugangsdaten {credential_id} fehlgeschlagen: {e}") + return False + + def _ensure_run_command_available(self): + if not self.run_client: + raise ProviderError( + "run_command_api_url ist nicht konfiguriert. " + "Bitte ergänzen Sie die Endpoint-Datei um 'run_command_api_url'.") + + def _run_worker_command(self, instance_id, script, *, command_template=None, + timeout=None, poll_interval=5): + """Führt ein Skript über die STACKIT Run-Command API aus.""" + self._ensure_run_command_available() + template = command_template or self.run_command_template or "RunShellScript" + payload = { + "commandTemplateName": template, + "parameters": { + "script": script + } + } + response = self.run_client.post( + f"/servers/{instance_id}/commands", + data=payload) + command_id = response.get("id") + if command_id is None: + raise ProviderError( + f"Run-Command API hat keine commandId geliefert: {response}") + + timeout = timeout or self.run_command_timeout or 3600 + start = time.time() + details = None + while True: + details = self.run_client.get( + f"/servers/{instance_id}/commands/{command_id}") + status = details.get("status") + if status in ("completed", "failed"): + break + if time.time() - start >= timeout: + raise ProviderError( + f"Run-Command Zeitüberschreitung nach {timeout}s " + f"(commandId={command_id}, letzter Status={status}).") + time.sleep(poll_interval) + + exit_code = details.get("exitCode") + if status != "completed" or (exit_code not in (None, 0)): + output = details.get("output") + raise ProviderError( + f"Worker-Befehl fehlgeschlagen (Status={status}, " + f"ExitCode={exit_code}). Ausgabe: {output}") + return details + + def _build_worker_job_script(self, job_payload): + """Erzeugt ein Shell-Skript, das den Transfer-Job via Python ausführt.""" + job_literal = json.dumps(job_payload) + python_code_template = """ + import json + import os + import subprocess + import sys + import urllib.request + + try: + import boto3 + except ImportError: + boto3 = None + + JOB = __JOB_PAYLOAD__ + + def download_asset(asset_cfg, destination): + if not asset_cfg: + return None + source_type = asset_cfg.get("type", "s3") + os.makedirs(os.path.dirname(destination), exist_ok=True) + if source_type == "s3": + if boto3 is None: + raise RuntimeError( + "boto3 wird benötigt, um Assets aus S3 zu laden.") + session = boto3.session.Session( + aws_access_key_id=asset_cfg.get("access_key"), + aws_secret_access_key=asset_cfg.get("secret_key")) + client = session.client( + "s3", endpoint_url=asset_cfg.get("endpoint_url")) + client.download_file( + asset_cfg["bucket"], + asset_cfg["object"], + destination) + return destination + elif source_type == "http": + url = asset_cfg.get("url") + if not url: + raise RuntimeError("HTTP Asset ohne URL angegeben.") + with urllib.request.urlopen(url) as resp, open( + destination, "wb") as dst: + while True: + chunk = resp.read(4 * 1024 * 1024) + if not chunk: + break + dst.write(chunk) + return destination + else: + raise RuntimeError( + f"Unbekannter Asset-Typ: {source_type}") + + def _lsblk(): + data = subprocess.check_output( + ["lsblk", "-J", "-o", "NAME,TYPE,MOUNTPOINT,FSTYPE"] + ).decode("utf-8") + return json.loads(data) + + def _has_mount(entry): + if entry.get("mountpoint"): + return True + for child in entry.get("children") or []: + if _has_mount(child): + return True + return False + + def find_data_device(): + lsblk = _lsblk() + for dev in lsblk.get("blockdevices", []): + if dev.get("type") != "disk": + continue + if _has_mount(dev): + continue + return "/dev/" + dev["name"] + raise RuntimeError("Kein zusätzliches Daten-Volume gefunden.") + + def get_disk_entry(device_path): + device_name = device_path.replace("/dev/", "") + lsblk = _lsblk() + for dev in lsblk.get("blockdevices", []): + if dev.get("name") == device_name: + return dev + return None + + def find_linux_root_partition(disk_entry): + candidates = (disk_entry or {}).get("children") or [] + fs_preference = ("ext4", "ext3", "xfs", "btrfs") + for preferred in fs_preference: + for child in candidates: + if (child.get("type") == "part" and + (child.get("fstype") or "").lower() == preferred): + return "/dev/" + child["name"] + for child in candidates: + if child.get("type") == "part": + return "/dev/" + child["name"] + return "/dev/" + (disk_entry or {}).get("name", "") + + def _mount(target, mountpoint, options=None): + os.makedirs(mountpoint, exist_ok=True) + cmd = ["mount"] + if options: + cmd.extend(["-o", options]) + cmd.extend([target, mountpoint]) + subprocess.run(cmd, check=True) + + def _umount(target): + if os.path.ismount(target): + subprocess.run(["umount", "-lf", target], check=False) + + def _bind_mount(base, name): + src = f"/{name}" + dst = os.path.join(base, name) + os.makedirs(dst, exist_ok=True) + subprocess.run(["mount", "--bind", src, dst], check=True) + return dst + + def _unbind_mount(path): + if os.path.ismount(path): + subprocess.run(["umount", "-lf", path], check=False) + + def _write_file(path, content): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as stream: + stream.write(content) + + def _append_line(path, line): + with open(path, "a", encoding="utf-8") as stream: + stream.write(line + "\\n") + + def _ensure_cloud_init(mount_dir): + cfg_dir = os.path.join(mount_dir, "etc/cloud/cloud.cfg.d") + if os.path.isdir(os.path.join(mount_dir, "etc/cloud")): + content = \"\"\"# Coriolis injected config +datasource_list: [ ConfigDrive, Ec2, NoCloud ] +\"\"\" + _write_file(os.path.join(cfg_dir, "90-coriolis.cfg"), content) + + def _configure_netplan(mount_dir, network): + eth_name = network.get("interface", "eth0") + dhcp4 = str(network.get("dhcp4", True)).lower() + dhcp6 = str(network.get("dhcp6", True)).lower() + netplan_yaml = f\"\"\"network: + version: 2 + renderer: networkd + ethernets: + {eth_name}: + dhcp4: {dhcp4} + dhcp6: {dhcp6} +\"\"\" + _write_file(os.path.join( + mount_dir, "etc/netplan/99-coriolis.yaml"), netplan_yaml) + + def _configure_interfaces_file(mount_dir, network): + content = \"\"\"auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet dhcp +\"\"\" + _write_file(os.path.join(mount_dir, "etc/network/interfaces"), content) + + def _configure_sysconfig_network(mount_dir, network): + content = \"\"\"DEVICE=eth0 +BOOTPROTO=dhcp +ONBOOT=yes +TYPE=Ethernet +PEERDNS=yes +NM_CONTROLLED=no +\"\"\" + scripts_dir = os.path.join( + mount_dir, "etc/sysconfig/network-scripts") + _write_file(os.path.join(scripts_dir, "ifcfg-eth0"), content) + + def _ensure_grub_cmdline(mount_dir): + grub_path = os.path.join(mount_dir, "etc/default/grub") + if not os.path.exists(grub_path): + return + with open(grub_path, "r", encoding="utf-8") as stream: + data = stream.read() + if "GRUB_CMDLINE_LINUX" not in data: + data += "\\nGRUB_CMDLINE_LINUX=\"\"\\n" + addition = "net.ifnames=0 biosdevname=0" + if addition not in data: + data = data.replace( + "GRUB_CMDLINE_LINUX=\"", + f"GRUB_CMDLINE_LINUX=\"{addition} ") + _write_file(grub_path, data) + + def _cleanup_persistent_rules(mount_dir): + udev_path = os.path.join( + mount_dir, "etc/udev/rules.d/70-persistent-net.rules") + if os.path.exists(udev_path): + os.remove(udev_path) + + def _prep_linux_network(mount_dir, network): + _cleanup_persistent_rules(mount_dir) + _ensure_grub_cmdline(mount_dir) + _ensure_cloud_init(mount_dir) + if os.path.isdir(os.path.join(mount_dir, "etc/netplan")): + _configure_netplan(mount_dir, network) + elif os.path.isdir( + os.path.join(mount_dir, "etc/sysconfig/network-scripts")): + _configure_sysconfig_network(mount_dir, network) + else: + _configure_interfaces_file(mount_dir, network) + + def _write_kernel_module_overrides(mount_dir): + modules_dir = os.path.join( + mount_dir, "etc/modules-load.d") + content = \"\"\"virtio +virtio_blk +virtio_pci +virtio_net +virtio_scsi +\"\"\" + _write_file(os.path.join( + modules_dir, "coriolis-virtio.conf"), content) + dracut_dir = os.path.join( + mount_dir, "etc/dracut.conf.d") + if os.path.isdir(os.path.join(mount_dir, "etc/dracut.conf.d")): + dracut_cfg = 'add_drivers+=\" virtio virtio_blk virtio_pci virtio_net virtio_scsi \"\\n' + _write_file(os.path.join( + dracut_dir, "coriolis-virtio.conf"), dracut_cfg) + + def _chroot_exec(mount_dir, cmd): + binds = [] + for name in ("dev", "proc", "sys"): + binds.append(_bind_mount(mount_dir, name)) + env = os.environ.copy() + env["PATH"] = "/usr/sbin:/usr/bin:/sbin:/bin" + try: + subprocess.run( + ["chroot", mount_dir] + cmd, + check=True, env=env) + finally: + for target in reversed(binds): + _unbind_mount(target) + + def _rebuild_initramfs(mount_dir): + if os.path.exists(os.path.join( + mount_dir, "usr/sbin/update-initramfs")): + _chroot_exec(mount_dir, ["update-initramfs", "-u"]) + elif os.path.exists(os.path.join( + mount_dir, "usr/bin/dracut")) or os.path.exists( + os.path.join(mount_dir, "usr/sbin/dracut")): + _chroot_exec(mount_dir, ["dracut", "-f"]) + + def _linux_os_morph(device, job): + assets_cfg = (job.get("assets") or {}) + script_cfg = assets_cfg.get("script") + if script_cfg: + script_path = download_asset( + script_cfg, + "/var/lib/coriolis/assets/linux-script.sh") + os.chmod(script_path, 0o755) + env = os.environ.copy() + env["CORIOLIS_DEVICE"] = device + env["CORIOLIS_JOB_JSON"] = json.dumps(job) + subprocess.run( + [script_path, device], + check=True, + env=env) + return + disk_entry = get_disk_entry(device) + if not disk_entry: + raise RuntimeError("lsblk Daten für device wurden nicht gefunden.") + root_part = find_linux_root_partition(disk_entry) + if not root_part or not os.path.exists(root_part): + raise RuntimeError("Konnte Root-Partition nicht bestimmen.") + mount_dir = "/mnt/coriolis-root" + _mount(root_part, mount_dir) + try: + network = job.get("network") or {"dhcp4": True, "dhcp6": True} + _prep_linux_network(mount_dir, network) + phase = job.get("phase", "prepare") + if phase == "drivers": + _write_kernel_module_overrides(mount_dir) + _rebuild_initramfs(mount_dir) + finally: + _umount(mount_dir) + + def upload_to_s3(device, cfg): + if boto3 is None: + raise RuntimeError("boto3 ist nicht installiert (S3-Upload).") + session = boto3.session.Session( + aws_access_key_id=cfg["access_key"], + aws_secret_access_key=cfg["secret_key"] + ) + client = session.client("s3", endpoint_url=cfg.get("endpoint_url")) + with open(device, "rb") as src: + client.upload_fileobj(src, cfg["bucket"], cfg["object"]) + + def download_from_s3(device, cfg): + if boto3 is None: + raise RuntimeError("boto3 ist nicht installiert (S3-Download).") + session = boto3.session.Session( + aws_access_key_id=cfg["access_key"], + aws_secret_access_key=cfg["secret_key"] + ) + client = session.client("s3", endpoint_url=cfg.get("endpoint_url")) + with open(device, "wb") as dst: + client.download_fileobj(cfg["bucket"], cfg["object"], dst) + + def download_from_url(device, url): + with urllib.request.urlopen(url) as resp, open(device, "wb") as dst: + while True: + chunk = resp.read(1024 * 1024) + if not chunk: + break + dst.write(chunk) + + def wipe_device(device): + subprocess.run(["wipefs", "--all", device], check=False) + subprocess.run(["sgdisk", "--zap-all", device], check=False) + + def _windows_os_morph(device, job): + assets_cfg = (job.get("assets") or {}) + script_cfg = assets_cfg.get("script") + if not script_cfg: + raise RuntimeError( + "Windows OS-Morphing erfordert ein Script-Asset.") + script_path = download_asset( + script_cfg, + "/var/lib/coriolis/assets/windows-script.sh") + os.chmod(script_path, 0o755) + env = os.environ.copy() + env["CORIOLIS_DEVICE"] = device + env["CORIOLIS_JOB_JSON"] = json.dumps(job) + subprocess.run( + [script_path, device], + check=True, + env=env) + + def run_job(): + device = find_data_device() + operation = JOB.get("operation") + if operation == "export": + upload_to_s3(device, JOB["s3_target"]) + elif operation == "import": + wipe_device(device) + source = JOB.get("source", {}) + if source.get("type") == "s3": + download_from_s3(device, source) + elif source.get("type") == "http": + download_from_url(device, source["url"]) + else: + raise RuntimeError(f"Unsupported source type: {source}") + elif operation == "os_morphing": + os_type = (JOB.get("os_type") or "linux").lower() + if os_type == "linux": + _linux_os_morph(device, JOB) + elif os_type == "windows": + _windows_os_morph(device, JOB) + else: + raise RuntimeError( + f"Unbekannte OS-Morphing Plattform: {os_type}") + else: + raise RuntimeError(f"Unbekannte Job-Operation: {operation}") + os.sync() + + if __name__ == "__main__": + try: + run_job() + except Exception as exc: + print(f"[coriolis-worker] Fehler: {exc}", file=sys.stderr) + raise + """ + python_code = textwrap.dedent(python_code_template).replace("__JOB_PAYLOAD__", job_literal) + script = textwrap.dedent(f"""\ + #!/bin/bash + set -euo pipefail + python3 - <<'PY' + {python_code} + PY + """) + return script + + def _execute_worker_job(self, instance, job_payload, *, timeout=None): + script = self._build_worker_job_script(job_payload) + return self._run_worker_command( + instance.id, + script, + timeout=timeout or self.run_command_timeout) + + # --- Allgemeine Helfer für Worker- und Transfer-Workflows --- + + def _ensure_worker_prerequisites(self): + missing = [] + if not self.worker_image_id: + missing.append("worker_image_id") + if not self.worker_flavor_id: + missing.append("worker_flavor_id") + if not self.worker_network_id: + missing.append("worker_network_id") + if missing: + raise ProviderError( + f"Worker-Konfiguration unvollständig. Folgende Felder fehlen in der Endpoint-Datei: {', '.join(missing)}" + ) + + def _get_os_morphing_assets(self, os_type): + if not self.os_morphing_assets: + return {} + return self.os_morphing_assets.get((os_type or "linux").lower(), {}) + + def _build_bucket_object_url(self, bucket_data, object_name): + """Leitet aus den Bucket-Daten einen S3-kompatiblen URL ab.""" + virtual_host_url = bucket_data.get("urlVirtualHostedStyle") + path_url = bucket_data.get("urlPathStyle") + if virtual_host_url: + base = virtual_host_url.rstrip('/') + return f"{base}/{object_name}" + if path_url: + base = path_url.rstrip('/') + return f"{base}/{object_name}" + # Fallback auf generisch konfigurierten Endpoint + endpoint = self.object_storage_s3_endpoint or self.object_storage_api_url + return f"{endpoint.rstrip('/')}/{bucket_data.get('name')}/{object_name}" + + def _determine_s3_endpoint(self, bucket_data): + if self.object_storage_s3_endpoint: + return self.object_storage_s3_endpoint + virtual_host_url = bucket_data.get("urlVirtualHostedStyle") + if virtual_host_url: + # https://bucket.region.endpoint -> extrahiere endpoint + parts = virtual_host_url.split("://", 1) + scheme = "https" + host = parts[-1] + if len(parts) == 2: + scheme = parts[0] + # bucket-name.first -> entferne ersten Abschnitt + host_bits = host.split('.', 1) + if len(host_bits) == 2: + host = host_bits[1] + return f"{scheme}://{host}" + return self.object_storage_api_url + + def _render_worker_user_data(self, job_payload): + """Erstellt ein minimales cloud-init Skript, das den Job beschreibt.""" + job_json = json.dumps(job_payload) + script = f"""#cloud-config +write_files: + - path: /var/lib/coriolis/job.json + permissions: '0600' + owner: root:root + content: | + {job_json} +runcmd: + - [ bash, -c, "echo 'Coriolis job description written to /var/lib/coriolis/job.json'" ] +""" + return script + + def _encode_user_data(self, user_data_str): + encoded = base64.b64encode(user_data_str.encode("utf-8")).decode("ascii") + return encoded + + def _wait_for_instance_status( + self, + instance_id, + target_states=None, + error_states=None, + timeout=3600, + poll_interval=15 + ): + target_states = target_states or ("STOPPED", "DELETED") + error_states = error_states or ("ERROR", "FAILED") + start = time.time() + while True: + instance = self.get_instance(instance_id) + status = instance.status + if status in target_states: + return status + if status in error_states: + raise ProviderError( + f"Instanz {instance_id} hat Fehlerstatus '{status}' erreicht.") + if time.time() - start > timeout: + raise ProviderError( + f"Wartezeit für Instanz {instance_id} überschritten ({timeout}s).") + time.sleep(poll_interval) + + def _wait_for_volume_status( + self, + volume_id, + target_states=None, + error_states=None, + timeout=1800, + poll_interval=10 + ): + target_states = target_states or ("AVAILABLE",) + error_states = error_states or ("ERROR", "FAILED") + start = time.time() + while True: + volume = self.get_volume(volume_id) + status = volume.status + if status in target_states: + return status + if status in error_states: + raise ProviderError( + f"Volume {volume_id} hat Fehlerstatus '{status}' erreicht.") + if time.time() - start > timeout: + raise ProviderError( + f"Wartezeit für Volume {volume_id} überschritten ({timeout}s).") + time.sleep(poll_interval) + + def _wait_for_snapshot_status( + self, + snapshot_id, + target_states=None, + error_states=None, + timeout=1800, + poll_interval=10 + ): + target_states = target_states or ("AVAILABLE",) + error_states = error_states or ("ERROR", "FAILED") + start = time.time() + while True: + snapshot = self.get_snapshot(snapshot_id) + status = snapshot.status + if status in target_states: + return status + if status in error_states: + raise ProviderError( + f"Snapshot {snapshot_id} hat Fehlerstatus '{status}' erreicht.") + if time.time() - start > timeout: + raise ProviderError( + f"Wartezeit für Snapshot {snapshot_id} überschritten ({timeout}s).") + time.sleep(poll_interval) + + def _wait_for_snapshot_deletion( + self, + snapshot_id, + timeout=900, + poll_interval=10 + ): + start = time.time() + while True: + try: + self.get_snapshot(snapshot_id) + except ObjectNotFoundError: + return True + if time.time() - start > timeout: + raise ProviderError( + f"Snapshot {snapshot_id} wurde nicht innerhalb von {timeout}s gelöscht.") + time.sleep(poll_interval) + + def _create_volume_from_source( + self, + *, + name, + source_type, + source_id, + size_gb=None, + availability_zone=None, + performance_class=None + ): + payload = { + "name": name, + "availabilityZone": availability_zone or self.worker_availability_zone, + "performanceClass": performance_class, + "size": size_gb, + "source": { + "id": source_id, + "type": source_type + } + } + payload = {k: v for k, v in payload.items() if v is not None} + volume_data = self.client.post("/volumes", data=payload) + volume = StackitVolume(self, volume_data) + self._wait_for_volume_status(volume.id) + return volume + + def _launch_worker_instance( + self, + *, + job_name, + user_data, + attached_volume_ids=None + ): + self._ensure_worker_prerequisites() + payload_volumes = [{"id": vol_id} for vol_id in (attached_volume_ids or [])] + metadata = dict(self.worker_metadata or {}) + metadata.update({ + "coriolis_job": job_name, + }) + + instance = self.create_instance( + name=job_name, + image_id=self.worker_image_id, + flavor_id=self.worker_flavor_id, + network_id=self.worker_network_id, + availability_zone=self.worker_availability_zone, + volumes=payload_volumes or None, + metadata=metadata, + user_data=user_data, + keypair_name=self.worker_keypair_name, + security_group_ids=self.worker_security_group_ids + ) + try: + self._wait_for_instance_status( + instance.id, + target_states=("RUNNING",), + error_states=("ERROR", "FAILED"), + timeout=900) + except ProviderError as exc: + print(f"WARNUNG: Worker-Instanz {instance.id} hat RUNNING nicht erreicht: {exc}") + return instance + + def _cleanup_worker_instance(self, instance_id): + try: + self.delete_instance(instance_id) + except Exception as exc: + print(f"WARNUNG: Worker-Instanz {instance_id} konnte nicht gelöscht werden: {exc}") + + def _delete_volume_safe(self, volume_id): + try: + self.delete_volume(volume_id) + except Exception as exc: + print(f"WARNUNG: Volume {volume_id} konnte nicht gelöscht werden: {exc}") + + def _verify_object_exists(self, bucket_name, object_name, endpoint_url, creds): + if not boto3: + print("WARNUNG: boto3 nicht verfügbar - kann Objektverifikation nicht durchführen.") + return True + session = boto3.session.Session( + aws_access_key_id=creds["access_key"], + aws_secret_access_key=creds["secret_key"] + ) + client = session.client("s3", endpoint_url=endpoint_url) + try: + client.head_object(Bucket=bucket_name, Key=object_name) + return True + except ClientError as exc: + raise ProviderError( + f"S3-Objekt {bucket_name}/{object_name} konnte nicht verifiziert werden: {exc}" + )