""" VOLLSTÄNDIGER CORIOLIS PROVIDER für STACKIT IaaS v2 Diese Datei implementiert die tatsächlichen Coriolis-Basisklassen aus dem Repository https://github.com/cloudbase/coriolis. HINWEIS: - Diese Datei sollte in `coriolis_stackit/provider.py` liegen. - Die Klasse heißt `StackitProvider`, passend zur `setup.py`. - Die Wrapper-Klassen (`StackitInstance` etc.) müssen noch vervollständigt werden (siehe TODOs). """ import base64 import json import textwrap import time import uuid import requests try: import boto3 from botocore.exceptions import ClientError except ImportError: boto3 = None ClientError = Exception # --- ECHTE Coriolis Basisklassen-Importe --- # Diese Pfade basieren auf der Struktur im Coriolis GitHub Repo. # Wenn Ihr Plugin korrekt installiert ist, sollte Python diese finden. try: from coriolis.provider.base import BaseProvider from coriolis.provider.resource_utils import ( BaseAbstractInstance, BaseAbstractVolume, BaseAbstractSnapshot, BaseAbstractNetwork, BaseAbstractSubnet, BaseAbstractImage, BaseAbstractFlavor ) from coriolis.exceptions import ( ProviderAuthenticationError, ProviderConnectionError, ProviderError, ObjectNotFoundError ) except ImportError: print("WARNUNG: Coriolis-Basisklassen nicht gefunden. " "Dies ist normal, wenn außerhalb einer Coriolis-Umgebung entwickelt wird.") print("Verwende Dummy-Platzhalter-Klassen für die Entwicklung.") # --- Dummy-Klassen für die Offline-Entwicklung --- # Diese werden verwendet, wenn die Coriolis-Pakete nicht installiert sind, # damit der Code zumindest geladen werden kann. class BaseProvider: def __init__(self, connection_details): self.connection = connection_details # .get('connection', {}) self.provider_name = "dummy" # connection_details.get('provider') self.client = self._get_client("http://dummy.url") def _get_client(self, api_url): # Dummy-Client für Offline-Entwicklung print("WARNUNG: Erstelle Dummy-Client") return None class BaseAbstractInstance: def __init__(self, provider, data): self._provider = provider self._data = data class BaseAbstractVolume(BaseAbstractInstance): pass class BaseAbstractSnapshot(BaseAbstractInstance): pass class BaseAbstractNetwork(BaseAbstractInstance): pass class BaseAbstractSubnet(BaseAbstractInstance): pass class BaseAbstractImage(BaseAbstractInstance): pass class BaseAbstractFlavor(BaseAbstractInstance): pass class ProviderAuthenticationError(Exception): pass class ProviderConnectionError(Exception): pass class ProviderError(Exception): pass class ObjectNotFoundError(Exception): pass # --- STACKIT API Client --- class StackitAPIClient: """ Eine Wrapper-Klasse für die STACKIT IaaS API v2. Verwaltet die Authentifizierung (Bearer Token) und API-Aufrufe mit der erforderlichen Projekt-ID. 
""" def __init__( self, api_url, auth_url, client_id, client_secret, project_id, *, region=None, include_project_prefix=True, project_segment="projects" ): self.base_url = api_url.rstrip('/') self.auth_url = auth_url self.client_id = client_id self.client_secret = client_secret self.project_id = project_id self.region = region self.include_project_prefix = include_project_prefix self.project_segment = project_segment.rstrip('/') if project_segment else "projects" self.session = requests.Session() self.token = None self.token_expires_at = time.time() try: self._login() # Beim Initialisieren erstes Token holen except requests.exceptions.RequestException as e: print(f"STACKIT API: Initiale Authentifizierung fehlgeschlagen: {e}") raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}") def _login(self): """Holt ein neues OAuth 2.0 Bearer Token.""" print(f"STACKIT API: Authentifizierung für {self.base_url} wird durchgeführt...") try: payload = { 'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': self.client_secret } # STACKIT Identity API erwartet application/x-www-form-urlencoded headers = {'Content-Type': 'application/x-www-form-urlencoded'} response = requests.post(self.auth_url, data=payload, headers=headers) response.raise_for_status() # Löst HTTPError bei 4xx/5xx aus data = response.json() access_token = data.get('access_token') if not access_token: raise ProviderAuthenticationError("Kein 'access_token' in STACKIT Auth-Antwort gefunden.") self.token = access_token # 60 Sekunden Puffer self.token_expires_at = time.time() + data.get('expires_in', 3600) - 60 self.session.headers.update({ 'Authorization': f'Bearer {self.token}', 'Content-Type': 'application/json' }) print("STACKIT API: Authentifizierung erfolgreich.") except requests.exceptions.RequestException as e: # Dies ist ein kritischer Fehler. raise ProviderAuthenticationError(f"STACKIT Auth-Fehler: {e}") def _check_token(self): """Prüft, ob das Token abgelaufen ist und erneuert es ggf.""" if time.time() >= self.token_expires_at: print(f"STACKIT API: Token für {self.base_url} abgelaufen, wird erneuert...") self._login() # _login wirft ProviderAuthenticationError bei Fehlern def _build_url(self, path): """Baut die vollständige API-URL inkl. Projekt- und optional Region-ID.""" if not path.startswith("/"): path = f"/{path}" prefix = "" if self.include_project_prefix: prefix = f"/{self.project_segment}/{self.project_id}" if self.region: prefix += f"/regions/{self.region}" return f"{self.base_url}{prefix}{path}" def _request(self, method, path, **kwargs): """Zentrale Request-Methode mit Fehlerbehandlung.""" self._check_token() url = self._build_url(path) try: response = self.session.request(method, url, **kwargs) # Fehlerhafte Anfragen (4xx, 5xx) lösen hier eine Exception aus response.raise_for_status() # Für 204 (No Content) o.ä., z.B. 

    def _request(self, method, path, **kwargs):
        """Central request method with error handling."""
        self._check_token()
        url = self._build_url(path)
        try:
            response = self.session.request(method, url, **kwargs)
            # Failed requests (4xx, 5xx) raise an exception here
            response.raise_for_status()

            # For 204 (No Content) and the like, e.g. on DELETE
            if not response.content:
                return {'status_code': response.status_code,
                        'response_body': 'OK'}
            return response.json()

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                raise ObjectNotFoundError(
                    f"Object not found at {method} {url}")
            elif e.response.status_code in (401, 403):
                # The token may have been invalidated server-side
                print(f"STACKIT API: Got 401/403 at {url}, "
                      "re-authenticating...")
                try:
                    self._login()
                    # Retry the request (once only)
                    response = self.session.request(method, url, **kwargs)
                    response.raise_for_status()
                    if not response.content:
                        return {'status_code': response.status_code,
                                'response_body': 'OK'}
                    return response.json()
                except Exception as retry_e:
                    raise ProviderAuthenticationError(
                        f"Auth error (even after retry) at {method} {url}: "
                        f"{retry_e}")
            else:
                # Other API errors (e.g. 400, 500)
                raise ProviderError(
                    f"STACKIT API error ({e.response.status_code}) "
                    f"at {method} {url}: {e.response.text}")
        except requests.exceptions.ConnectionError as e:
            raise ProviderConnectionError(f"Connection error to {url}: {e}")
        except Exception as e:
            raise ProviderError(f"General error: {e}")

    # --- HTTP methods (GET, POST, PUT, DELETE) ---
    def get(self, path, params=None):
        return self._request('GET', path, params=params)

    def post(self, path, data=None):
        # 'is not None' so that empty dicts are still serialized as '{}'
        return self._request(
            'POST', path,
            data=json.dumps(data) if data is not None else None)

    def put(self, path, data=None):
        return self._request(
            'PUT', path,
            data=json.dumps(data) if data is not None else None)

    def delete(self, path):
        return self._request('DELETE', path)
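

# Usage sketch (hypothetical endpoint and credential values, for illustration
# only):
#
#     client = StackitAPIClient(
#         api_url="https://iaas.api.example/v2",
#         auth_url="https://auth.api.example/token",
#         client_id="...", client_secret="...",
#         project_id="p-123", region="eu01")
#     for server in client.get("/instances").get("instances", []):
#         print(server["id"], server.get("name"))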


# --- Coriolis wrapper classes ---
# These classes MUST implement the properties that Coriolis defines
# as @abstractmethod in its base classes (e.g. BaseAbstractInstance).

class StackitInstance(BaseAbstractInstance):
    """Wrapper for a STACKIT VM instance."""

    def __init__(self, provider, data):
        super(StackitInstance, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        # STACKIT API: 'id'
        return self._data.get('id')

    @property
    def name(self):
        # STACKIT API: 'name'
        return self._data.get('name')

    @property
    def status(self):
        # STACKIT API: 'status' (e.g. 'CREATING', 'RUNNING', 'STOPPED',
        # 'FAILED'). Coriolis expects status strings such as 'RUNNING',
        # 'STOPPED', 'ERROR'.
        stackit_status = self._data.get('status')
        # Mapping:
        mapping = {
            'RUNNING': 'RUNNING',
            'STOPPED': 'STOPPED',
            'CREATING': 'BUILDING',
            'FAILED': 'ERROR',
            'DELETING': 'DELETING',
        }
        # Fall back to the original status or UNKNOWN
        return mapping.get(stackit_status, stackit_status or "UNKNOWN")

    @property
    def flavor_id(self):
        # STACKIT API: 'flavorId'
        return self._data.get('flavorId')

    @property
    def image_id(self):
        # STACKIT API: 'imageId'
        return self._data.get('imageId')

    # --- Optional, but recommended properties ---
    @property
    def availability_zone(self):
        # STACKIT API: 'availabilityZone'
        return self._data.get('availabilityZone')

    @property
    def networks(self):
        # STACKIT API: 'networkIds' (list of IDs)
        # Coriolis often expects a structured list here;
        # for a start, a plain list of IDs is enough.
        return self._data.get('networkIds', [])

    @property
    def volumes(self):
        # STACKIT API: 'volumeIds' (list of IDs)
        return self._data.get('volumeIds', [])


class StackitVolume(BaseAbstractVolume):
    """Wrapper for a STACKIT volume."""

    def __init__(self, provider, data):
        super(StackitVolume, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        # STACKIT API: 'id'
        return self._data.get('id')

    @property
    def name(self):
        # STACKIT API: 'name'
        return self._data.get('name')

    @property
    def size(self):
        # STACKIT API: 'size' (in GB)
        # Coriolis expects the size in GB
        return self._data.get('size')

    @property
    def status(self):
        # STACKIT API: 'status'
        # Coriolis expects 'AVAILABLE', 'CREATING', 'IN-USE', 'ERROR'
        stackit_status = self._data.get('status')
        # TODO: status mapping (if needed)
        mapping = {
            'AVAILABLE': 'AVAILABLE',
            'CREATING': 'CREATING',
            'IN_USE': 'IN-USE',  # assumption
            'FAILED': 'ERROR',
        }
        return mapping.get(stackit_status, stackit_status or "UNKNOWN")

    @property
    def attachments(self):
        # STACKIT API: 'attachedTo' (a single instanceId)
        # Coriolis expects a list of
        # { "instance_id": "...", "device": "..." }
        instance_id = self._data.get('attachedTo')
        if instance_id:
            # Device info is missing from the STACKIT API response
            return [{"instance_id": instance_id, "device": None}]
        return []

    # --- Optional properties ---
    @property
    def volume_type(self):
        # STACKIT API: 'type'
        return self._data.get('type')  # e.g. 'storage_premium_perf1'


class StackitSnapshot(BaseAbstractSnapshot):
    """Wrapper for a STACKIT snapshot."""

    def __init__(self, provider, data):
        super(StackitSnapshot, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        # STACKIT API: 'id'
        return self._data.get('id')

    @property
    def name(self):
        # STACKIT API: 'name'
        return self._data.get('name')

    @property
    def size(self):
        # STACKIT API: 'size' (in GB)
        return self._data.get('size')  # size in GB

    @property
    def status(self):
        # STACKIT API: 'status'
        # Coriolis expects 'AVAILABLE', 'CREATING', 'ERROR'
        stackit_status = self._data.get('status')
        mapping = {
            'AVAILABLE': 'AVAILABLE',
            'CREATING': 'CREATING',
            'FAILED': 'ERROR',
        }
        return mapping.get(stackit_status, stackit_status or "UNKNOWN")

    @property
    def source_volume_id(self):
        # STACKIT API: 'sourceVolumeId'
        return self._data.get('sourceVolumeId')


# --- Wrappers for Flavor, Image, Network (completed) ---

class StackitFlavor(BaseAbstractFlavor):
    def __init__(self, provider, data):
        super(StackitFlavor, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        # STACKIT API: 'id'
        return self._data.get('id')

    @property
    def name(self):
        # STACKIT API: 'name'
        return self._data.get('name')

    @property
    def vcpus(self):
        # STACKIT API: 'cpu'
        return self._data.get('cpu')

    @property
    def ram(self):
        # STACKIT API: 'memory' (in GB)
        # Coriolis expects RAM in MB
        mem_gb = self._data.get('memory', 0)
        return mem_gb * 1024

    @property
    def disk(self):
        # STACKIT API: 'disk' (in GB)
        return self._data.get('disk')


class StackitImage(BaseAbstractImage):
    def __init__(self, provider, data):
        super(StackitImage, self).__init__(provider, data)
        self._data = data

    @property
    def id(self):
        # STACKIT API: 'id'
        return self._data.get('id')

    @property
    def name(self):
        # STACKIT API: 'name'
        return self._data.get('name')

    @property
    def status(self):
        # STACKIT API: has no status field. We assume 'AVAILABLE'.
        return "AVAILABLE"

    @property
    def min_disk(self):
        # STACKIT API: 'minDisk' (in GB)
        return self._data.get('minDisk')

    @property
    def min_ram(self):
        # STACKIT API: 'minRam' (in GB)
        # Coriolis expects MB
        ram_gb = self._data.get('minRam', 0)
        return ram_gb * 1024
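

# Illustration (hypothetical response payload): the wrappers simply adapt
# raw API dicts to the property names Coriolis expects, e.g.:
#
#     vol = StackitVolume(provider, {"id": "vol-1", "size": 50,
#                                    "status": "IN_USE",
#                                    "attachedTo": "srv-1"})
#     vol.status       # -> "IN-USE"
#     vol.attachments  # -> [{"instance_id": "srv-1", "device": None}]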
return "AVAILABLE" @property def min_disk(self): # STACKIT API: 'minDisk' (in GB) return self._data.get('minDisk') @property def min_ram(self): # STACKIT API: 'minRam' (in GB) # Coriolis erwartet MB ram_gb = self._data.get('minRam', 0) return ram_gb * 1024 class StackitNetwork(BaseAbstractNetwork): def __init__(self, provider, data): super(StackitNetwork, self).__init__(provider, data) self._data = data @property def id(self): # STACKIT API: 'id' return self._data.get('id') @property def name(self): # STACKIT API: 'name' return self._data.get('name') # Coriolis' BaseAbstractNetwork erwartet Subnetz-Infos @property def subnets(self): # STACKIT IaaS v2 hat keine expliziten Subnetze, # aber das Network-Objekt hat 'cidrRange'. # Wir erstellen ein "Dummy"-Subnetz-Objekt daraus. cidr = self._data.get('cidrRange') if cidr: # Erstellen eines Dummy-Subnetz-Dicts subnet_data = { 'id': f"{self.id}-subnet", # Künstliche ID 'name': f"{self.name}-subnet", 'cidr': cidr, 'network_id': self.id } # Wir verwenden die Basis-Subnetz-Klasse von Coriolis return [BaseAbstractSubnet(self._provider, subnet_data)] return [] # --- DER EIGENTLICHE PROVIDER --- class StackitProvider(BaseProvider): """ Coriolis Provider für STACKIT IaaS v2. """ # Name, wie er in der setup.py registriert ist PROVIDER_NAME = "stackit_v2" def __init__(self, connection_details): # 'connection_details' ist das volle Dict aus der .endpoint-Datei # 'connection' ist der Schlüssel darin super(StackitProvider, self).__init__( connection_details.get('connection', {})) self.provider_name = connection_details.get('provider') # "stackit_v2" # --- STACKIT API Client Initialisierung (NEU) --- conn = self.connection self.iaas_api_url = conn.get('api_url') self.auth_url = conn.get('auth_url') self.client_id = conn.get('client_id') self.client_secret = conn.get('client_secret') self.project_id = conn.get('project_id') self.region = conn.get('region') # NEU: URL für die Object Storage Control Plane API self.object_storage_api_url = conn.get('object_storage_api_url') # Datentransfer-Konfiguration self.migration_bucket_name = conn.get('migration_bucket_name', "coriolis-migration-data") self.worker_image_id = conn.get('worker_image_id') self.worker_flavor_id = conn.get('worker_flavor_id') self.worker_network_id = conn.get('worker_network_id') self.worker_availability_zone = conn.get('worker_availability_zone') self.worker_security_group_ids = conn.get('worker_security_group_ids', []) self.worker_keypair_name = conn.get('worker_keypair_name') self.worker_metadata = conn.get('worker_metadata', {}) self.worker_user = conn.get('worker_user', "ubuntu") self.worker_boot_volume_size = conn.get('worker_boot_volume_size', 20) self.worker_cleanup = conn.get('worker_auto_cleanup', True) self.object_storage_s3_endpoint = conn.get('object_storage_s3_endpoint_url') self.run_command_api_url = conn.get('run_command_api_url') self.run_command_template = conn.get('run_command_template', "RunShellScript") self.run_command_timeout = conn.get('run_command_timeout', 7200) self.os_morphing_assets = conn.get('os_morphing_assets', {}) if isinstance(self.worker_security_group_ids, str): self.worker_security_group_ids = [self.worker_security_group_ids] if not isinstance(self.worker_metadata, dict): self.worker_metadata = {} if not all([self.iaas_api_url, self.auth_url, self.client_id, self.client_secret, self.project_id, self.object_storage_api_url, self.region]): raise ProviderConnectionError( "api_url, auth_url, client_id, client_secret, project_id, region " "und 

    def _get_client(self, api_url, region=None, project_segment="projects"):
        """Creates the STACKIT API client for a given API URL."""
        # Parameters are now passed in from __init__
        return StackitAPIClient(
            api_url, self.auth_url, self.client_id,
            self.client_secret, self.project_id,
            region=region, project_segment=project_segment)

    def authenticate(self):
        """
        Tests the connection.
        _get_client() has already made the first login attempt;
        we issue a simple read call to be sure.
        """
        try:
            # Test 1: IaaS API
            # STACKIT API: GET /projects/{projectId}/flavors?limit=1
            self.client.get("/flavors", params={'limit': 1})
            print("STACKIT IaaS API: authentication and connection OK.")

            # NEW: Test 2: Object Storage API
            # STACKIT OS API: GET /projects/{projectId}/buckets?limit=1
            self.os_client.get("/buckets", params={'limit': 1})
            print("STACKIT Object Storage API: authentication and "
                  "connection OK.")
        except (ProviderConnectionError, ProviderAuthenticationError) as e:
            # These errors are already raised by _get_client or _request
            print(f"STACKIT API: Authentication failed: {e}")
            raise
        except Exception as e:
            # Fallback
            raise ProviderError(
                f"Unexpected error during authentication: {e}")

    # --- Inventory (list methods) ---

    def list_instances(self, filter_opts=None):
        """Returns a list of VMs."""
        # STACKIT API: GET /projects/{projectId}/instances
        api_data = self.client.get("/instances")
        instances = []
        # 'instances' is the key in the STACKIT API response
        for instance_data in api_data.get('instances', []):
            instances.append(StackitInstance(self, instance_data))
        return instances

    def get_instance(self, instance_id):
        """Returns details for a single VM."""
        try:
            # STACKIT API: GET /projects/{projectId}/instances/{instanceId}
            instance_data = self.client.get(f"/instances/{instance_id}")
            return StackitInstance(self, instance_data)
        except ObjectNotFoundError:
            raise ObjectNotFoundError(f"Instance {instance_id} not found")

    def list_volumes(self, filter_opts=None):
        """Returns a list of volumes."""
        # STACKIT API: GET /projects/{projectId}/volumes
        api_data = self.client.get("/volumes")
        volumes = []
        for volume_data in api_data.get('volumes', []):
            volumes.append(StackitVolume(self, volume_data))
        return volumes

    def get_volume(self, volume_id):
        try:
            # STACKIT API: GET /projects/{projectId}/volumes/{volumeId}
            volume_data = self.client.get(f"/volumes/{volume_id}")
            return StackitVolume(self, volume_data)
        except ObjectNotFoundError:
            raise ObjectNotFoundError(f"Volume {volume_id} not found")

    def list_snapshots(self, filter_opts=None):
        # STACKIT API: GET /projects/{projectId}/snapshots
        api_data = self.client.get("/snapshots")
        snapshots = []
        for snap_data in api_data.get('snapshots', []):
            snapshots.append(StackitSnapshot(self, snap_data))
        return snapshots

    def get_snapshot(self, snapshot_id):
        try:
            # STACKIT API: GET /projects/{projectId}/snapshots/{snapshotId}
            snap_data = self.client.get(f"/snapshots/{snapshot_id}")
            return StackitSnapshot(self, snap_data)
        except ObjectNotFoundError:
            raise ObjectNotFoundError(f"Snapshot {snapshot_id} not found")
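
    # Inventory usage sketch (assumes a valid endpoint dict as shown above):
    #
    #     provider = StackitProvider(endpoint_dict)
    #     provider.authenticate()
    #     for inst in provider.list_instances():
    #         print(inst.id, inst.name, inst.status)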

    def list_networks(self, filter_opts=None):
        # STACKIT API: GET /projects/{projectId}/networks
        api_data = self.client.get("/networks")
        networks = []
        for net_data in api_data.get('networks', []):
            networks.append(StackitNetwork(self, net_data))
        return networks

    def list_subnets(self, network_id=None, filter_opts=None):
        # STACKIT IaaS v2 has no separate subnets.
        # We return the "dummy" subnets built in StackitNetwork.
        print("INFO: list_subnets() returns subnets derived from "
              "list_networks().")
        all_subnets = []
        networks = self.list_networks(filter_opts=filter_opts)
        for net in networks:
            if network_id and net.id != network_id:
                continue
            all_subnets.extend(net.subnets)
        return all_subnets

    def list_images(self, filter_opts=None):
        # STACKIT API: GET /projects/{projectId}/images
        api_data = self.client.get("/images")
        images = []
        for image_data in api_data.get('images', []):
            images.append(StackitImage(self, image_data))
        return images

    def list_flavors(self, filter_opts=None):
        # STACKIT API: GET /projects/{projectId}/flavors
        api_data = self.client.get("/flavors")
        flavors = []
        for flavor_data in api_data.get('flavors', []):
            flavors.append(StackitFlavor(self, flavor_data))
        return flavors

    # --- Lifecycle management ---

    def power_on_instance(self, instance_id):
        """Powers an instance on."""
        # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/start
        task = self.client.post(f"/instances/{instance_id}/start")
        self._wait_for_instance_status(
            instance_id,
            target_states=("RUNNING",),
            error_states=("ERROR", "FAILED"))
        return task

    def power_off_instance(self, instance_id):
        """Powers an instance off."""
        # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/stop
        task = self.client.post(f"/instances/{instance_id}/stop")
        self._wait_for_instance_status(
            instance_id,
            target_states=("STOPPED",),
            error_states=("ERROR", "FAILED"))
        return task

    def get_instance_status(self, instance_id):
        """Fetches the current status of a VM (e.g. 'RUNNING', 'STOPPED')."""
        return self.get_instance(instance_id).status

    # --- Storage operations ---

    def create_volume(self, name, size_gb, description=None,
                      volume_type=None, snapshot_id=None, image_id=None,
                      availability_zone=None):
        """Creates a new volume."""
        # STACKIT API: POST /projects/{projectId}/volumes
        payload = {
            'name': name,
            'size': size_gb,
            'type': volume_type,
            'availabilityZone': availability_zone,
        }
        # The STACKIT API apparently does not support
        # 'create from snapshot' or 'create from image' here
        if snapshot_id or image_id:
            raise NotImplementedError(
                "STACKIT API does not support create_volume from "
                "snapshot/image.")

        # Strip None values so the API does not interpret them as 'null'
        payload = {k: v for k, v in payload.items() if v is not None}

        volume_data = self.client.post("/volumes", data=payload)
        volume = StackitVolume(self, volume_data)
        self._wait_for_volume_status(volume.id)
        return volume

    def delete_volume(self, volume_id):
        """Deletes a volume."""
        # STACKIT API: DELETE /projects/{projectId}/volumes/{volumeId}
        self.client.delete(f"/volumes/{volume_id}")
        return True

    def attach_volume(self, instance_id, volume_id, device_path=None):
        """Attaches a volume to an instance."""
        # STACKIT API: POST /projects/{projectId}/instances/{instanceId}/volumes
        # The API docs are unclear here. Assumption: the body is:
        payload = {"volumeId": volume_id}
        self.client.post(f"/instances/{instance_id}/volumes", data=payload)
        return True
        # print("WARNING: attach_volume (after creation) does not appear "
        #       "to be supported by STACKIT IaaS v2.")
        # raise NotImplementedError(
        #     "attach_volume does not appear to be supported.")

    def detach_volume(self, instance_id, volume_id):
        """Detaches a volume from an instance."""
        # STACKIT API:
        # DELETE /projects/{projectId}/instances/{instanceId}/volumes/{volumeId}
        self.client.delete(f"/instances/{instance_id}/volumes/{volume_id}")
        return True
        # print("WARNING: detach_volume does not appear to be supported "
        #       "by STACKIT IaaS v2.")
        # raise NotImplementedError(
        #     "detach_volume does not appear to be supported.")
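
    # Attach/detach flow sketch (illustrative IDs; the wait helper is defined
    # further below in this class):
    #
    #     provider.attach_volume(instance_id, volume_id)
    #     ...  # use the volume inside the instance
    #     provider.detach_volume(instance_id, volume_id)
    #     provider._wait_for_volume_status(volume_id)  # back to AVAILABLE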

    def create_snapshot(self, volume_id, snapshot_name, description=None,
                        force=False):
        """Creates a snapshot."""
        # STACKIT API: POST /projects/{projectId}/volumes/{volumeId}/snapshot
        payload = {'name': snapshot_name}
        snapshot_data = self.client.post(
            f"/volumes/{volume_id}/snapshot", data=payload)
        snapshot = StackitSnapshot(self, snapshot_data)
        self._wait_for_snapshot_status(snapshot.id)
        return snapshot

    def delete_snapshot(self, snapshot_id):
        """Deletes a snapshot."""
        # STACKIT API: DELETE /projects/{projectId}/snapshots/{snapshotId}
        self.client.delete(f"/snapshots/{snapshot_id}")
        self._wait_for_snapshot_deletion(snapshot_id)
        return True

    def get_snapshot_status(self, snapshot_id):
        """Fetches the status of a snapshot."""
        return self.get_snapshot(snapshot_id).status

    # --- Data transfer & OS morphing (VERY COMPLEX) ---
    # (Updated with Object Storage API logic)

    def export_snapshot_to_url(self, snapshot_id, *args, **kwargs):
        """Exports a snapshot into an S3-compatible bucket."""
        bucket_override = kwargs.get("bucket_name")
        object_name = kwargs.get("object_name")
        timeout = kwargs.get("timeout", 7200)

        snapshot = self.get_snapshot(snapshot_id)
        bucket_name = bucket_override or self.migration_bucket_name
        bucket = self._get_or_create_migration_bucket(bucket_name)
        # NOTE: the caller is responsible for deleting these credentials
        # again via _delete_temp_s3_credentials() once the transfer is done.
        s3_creds = self._create_temp_s3_credentials()
        object_name = object_name or f"{snapshot.id}-{int(time.time())}.img"
        job_name = f"coriolis-export-{uuid.uuid4().hex[:8]}"
        endpoint_url = self._determine_s3_endpoint(bucket)

        worker_volume = None
        worker_instance = None
        try:
            worker_volume = self._create_volume_from_source(
                name=f"{job_name}-src",
                source_type="snapshot",
                source_id=snapshot_id,
                availability_zone=getattr(
                    snapshot, "availability_zone", None)
            )
            job_payload = {
                "operation": "export",
                "source_volume_id": worker_volume.id,
                "s3_target": {
                    "bucket": bucket_name,
                    "object": object_name,
                    "endpoint_url": endpoint_url,
                    "access_key": s3_creds["access_key"],
                    "secret_key": s3_creds["secret_key"]
                }
            }
            user_data = self._render_worker_user_data(job_payload)
            worker_instance = self._launch_worker_instance(
                job_name=job_name,
                user_data=user_data,
                attached_volume_ids=[worker_volume.id]
            )
            self._execute_worker_job(
                worker_instance, job_payload, timeout=timeout)

            if worker_volume:
                self.detach_volume(worker_instance.id, worker_volume.id)
                self._wait_for_volume_status(worker_volume.id)

            self._verify_object_exists(
                bucket_name, object_name, endpoint_url, s3_creds)

            s3_url = self._build_bucket_object_url(bucket, object_name)
            result = {
                "object_url": s3_url,
                "bucket": bucket_name,
                "object": object_name,
                "credentials": s3_creds,
                "worker_instance_id": (
                    worker_instance.id if worker_instance else None),
                "worker_volume_id": (
                    worker_volume.id if worker_volume else None)
            }
            return result
        finally:
            if self.worker_cleanup:
                if worker_instance:
                    try:
                        if worker_volume:
                            self.detach_volume(
                                worker_instance.id, worker_volume.id)
                    except Exception:
                        pass
                    self._cleanup_worker_instance(worker_instance.id)
                if worker_volume:
                    self._delete_volume_safe(worker_volume.id)
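
    # Export usage sketch (illustrative snapshot ID): the returned dict
    # carries everything a downstream import needs, including the temporary
    # S3 credentials.
    #
    #     result = provider.export_snapshot_to_url("snap-1234")
    #     print(result["object_url"], result["bucket"], result["object"])
    #     ...  # after the import has consumed the object:
    #     provider._delete_temp_s3_credentials(result["credentials"]["id"])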

    def create_volume_from_url(self, name, size_gb, image_url,
                               *args, **kwargs):
        """Creates a volume that is filled with data from an external URL."""
        timeout = kwargs.get("timeout", 7200)
        source_credentials = kwargs.get("source_credentials", {})
        availability_zone = kwargs.get(
            "availability_zone", self.worker_availability_zone)

        target_volume = self.create_volume(
            name=name,
            size_gb=size_gb,
            availability_zone=availability_zone
        )
        self._wait_for_volume_status(target_volume.id)

        job_name = f"coriolis-import-{uuid.uuid4().hex[:8]}"
        worker_instance = None
        try:
            if source_credentials.get("type") == "s3" or (
                    source_credentials.get("bucket")
                    and source_credentials.get("object")):
                source_cfg = {
                    "type": "s3",
                    "bucket": source_credentials.get("bucket"),
                    "object": source_credentials.get("object"),
                    "endpoint_url": source_credentials.get("endpoint_url"),
                    "access_key": source_credentials.get("access_key"),
                    "secret_key": source_credentials.get("secret_key")
                }
            else:
                source_cfg = {
                    "type": "http",
                    "url": image_url
                }

            job_payload = {
                "operation": "import",
                "target_volume_id": target_volume.id,
                "source": source_cfg
            }
            user_data = self._render_worker_user_data(job_payload)
            worker_instance = self._launch_worker_instance(
                job_name=job_name,
                user_data=user_data,
                attached_volume_ids=[target_volume.id]
            )
            self._execute_worker_job(
                worker_instance, job_payload, timeout=timeout)

            self.detach_volume(worker_instance.id, target_volume.id)
            self._wait_for_volume_status(target_volume.id)
            return self.get_volume(target_volume.id)
        finally:
            if worker_instance and self.worker_cleanup:
                try:
                    self.detach_volume(worker_instance.id, target_volume.id)
                except Exception:
                    pass
                self._cleanup_worker_instance(worker_instance.id)

    # --- OS morphing via Run-Command ---

    def prepare_instance_for_os_morphing(
            self, volume_id, os_type="linux",
            network_configuration=None, timeout=5400):
        """Resets network/cloud-init configuration on the volume."""
        payload = {
            "network": network_configuration or {
                "dhcp4": True,
                "dhcp6": True,
                "interface": "eth0"
            }
        }
        return self._run_os_morphing_phase(
            volume_id=volume_id,
            os_type=os_type,
            phase="prepare",
            extra_payload=payload,
            timeout=timeout)

    def inject_drivers(
            self, volume_id, os_type="linux", drivers=None, timeout=5400):
        """Installs VirtIO drivers and rebuilds the initramfs."""
        payload = {
            "drivers": drivers or {
                "virtio": True
            }
        }
        return self._run_os_morphing_phase(
            volume_id=volume_id,
            os_type=os_type,
            phase="drivers",
            extra_payload=payload,
            timeout=timeout)
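
    # Import usage sketch (illustrative values), feeding the result of an
    # earlier export back in as an S3 source:
    #
    #     volume = provider.create_volume_from_url(
    #         "migrated-root", 50, image_url=None,
    #         source_credentials={
    #             "type": "s3",
    #             "bucket": result["bucket"], "object": result["object"],
    #             "endpoint_url": result["credentials"]["s3_endpoint_url"],
    #             "access_key": result["credentials"]["access_key"],
    #             "secret_key": result["credentials"]["secret_key"]})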
""" self._wait_for_volume_status(volume_id) job_payload = { "operation": "os_morphing", "phase": phase, "os_type": (os_type or "linux").lower(), "volume_id": volume_id } assets_cfg = self._get_os_morphing_assets(os_type) if assets_cfg: job_payload["assets"] = assets_cfg if extra_payload: job_payload.update(extra_payload) user_data = self._render_worker_user_data(job_payload) job_name = f"coriolis-morph-{phase}-{uuid.uuid4().hex[:8]}" worker_instance = None try: worker_instance = self._launch_worker_instance( job_name=job_name, user_data=user_data, attached_volume_ids=[volume_id]) self._execute_worker_job( worker_instance, job_payload, timeout=timeout) try: self.detach_volume(worker_instance.id, volume_id) except Exception: pass self._wait_for_volume_status(volume_id) return { "status": "completed", "phase": phase, "volume_id": volume_id } finally: if worker_instance and self.worker_cleanup: try: self.detach_volume(worker_instance.id, volume_id) except Exception: pass self._cleanup_worker_instance(worker_instance.id) # --- Instanz-Erstellung --- def create_instance(self, name, image_id, flavor_id, network_id, availability_zone=None, root_volume_size=None, volumes=None, boot_volume=None, **kwargs): """Erstellt eine neue Instanz.""" # STACKIT API: POST /projects/{projectId}/instances # 'volumes' ist in Coriolis eine Liste von { "id": "vol-id", "device": "/dev/vdb" } # STACKIT erwartet nur eine Liste von Volume-IDs. volume_ids = [v['id'] for v in volumes] if volumes else [] payload = { "name": name, "flavorId": flavor_id, "networkIds": [network_id], "volumeIds": volume_ids, "availabilityZone": availability_zone, } if boot_volume: payload["bootVolume"] = boot_volume else: payload["imageId"] = image_id metadata = kwargs.get("metadata") if metadata: payload["metadata"] = metadata security_group_ids = kwargs.get("security_group_ids") if security_group_ids: payload["securityGroupIds"] = security_group_ids keypair_name = kwargs.get("keypair_name") if keypair_name: payload["keypairName"] = keypair_name user_data = kwargs.get("user_data") if user_data: payload["userData"] = self._encode_user_data(user_data) # TODO: STACKIT API Doku für 'create' ist unklar bzgl. # root_volume_size. Es gibt kein 'root_volume_size'-Feld. # Evtl. muss ein Volume separat erstellt und als 'bootable' markiert werden? if root_volume_size and not boot_volume: print(f"WARNUNG: 'root_volume_size' ({root_volume_size}GB) wird " "ohne 'bootVolume' nicht berücksichtigt.") # Entferne None-Werte payload = {k: v for k, v in payload.items() if v is not None} try: instance_data = self.client.post("/instances", data=payload) return StackitInstance(self, instance_data) except Exception as e: raise ProviderError(f"Fehler bei create_instance: {e}") def delete_instance(self, instance_id): """Löscht eine Instanz.""" # STACKIT API: DELETE /projects/{projectId}/instances/{instanceId} try: self.client.delete(f"/instances/{instance_id}") return True except Exception as e: raise ProviderError(f"Fehler bei delete_instance: {e}") # --- Datentransfer-Helfer (NEU) --- # Implementiert mit der Object Storage Control Plane API def _get_or_create_migration_bucket(self, bucket_name): """ Prüft, ob ein Bucket existiert. Wenn nicht, erstellt er ihn. Verwendet den Object Storage Client (self.os_client). 
""" try: # STACKIT OS API: GET /v2/project/{projectId}/regions/{region}/bucket/{bucketName} bucket_data = self.os_client.get(f"/bucket/{bucket_name}") print(f"STACKIT OS: Migration-Bucket '{bucket_name}' gefunden.") return bucket_data except ObjectNotFoundError: print(f"STACKIT OS: Migration-Bucket '{bucket_name}' nicht gefunden. Erstelle...") try: # STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/bucket/{bucketName} payload = { "name": bucket_name } new_bucket_data = self.os_client.post(f"/bucket/{bucket_name}", data=payload) print(f"STACKIT OS: Bucket '{bucket_name}' erstellt.") return new_bucket_data except Exception as e: raise ProviderError(f"Fehler beim Erstellen des Buckets '{bucket_name}': {e}") except Exception as e: raise ProviderError(f"Fehler beim Prüfen des Buckets '{bucket_name}': {e}") def _create_temp_s3_credentials(self): """ Erstellt temporäre S3-Zugangsdaten für den Datentransfer. """ print("STACKIT OS: Erstelle temporäre S3-Zugangsdaten...") try: # STACKIT OS API: POST /v2/project/{projectId}/regions/{region}/access-key payload = {} creds_data = self.os_client.post("/access-key", data=payload) # Wichtig: Wir brauchen 'accessKey', 'secretAccessKey' und 's3Endpoint' s3_creds = { "id": creds_data.get("keyId"), "access_key": creds_data.get("accessKey"), "secret_key": creds_data.get("secretAccessKey"), "s3_endpoint_url": self.object_storage_s3_endpoint } if not all([s3_creds["id"], s3_creds["access_key"], s3_creds["secret_key"]]): raise ProviderError("Unvollständige S3-Zugangsdaten von STACKIT API erhalten.") print("STACKIT OS: Temporäre S3-Zugangsdaten erstellt.") return s3_creds except Exception as e: raise ProviderError(f"Fehler beim Erstellen der S3-Zugangsdaten: {e}") def _delete_temp_s3_credentials(self, credential_id): """Löscht temporäre S3-Zugangsdaten.""" print(f"STACKIT OS: Lösche temporäre S3-Zugangsdaten (ID: {credential_id})...") try: # STACKIT OS API: DELETE /v2/project/{projectId}/regions/{region}/access-key/{credentialId} self.os_client.delete(f"/access-key/{credential_id}") print("STACKIT OS: S3-Zugangsdaten gelöscht.") return True except Exception as e: # Nicht-kritischer Fehler (Cleanup) print(f"WARNUNG: Löschen der S3-Zugangsdaten {credential_id} fehlgeschlagen: {e}") return False def _ensure_run_command_available(self): if not self.run_client: raise ProviderError( "run_command_api_url ist nicht konfiguriert. 
" "Bitte ergänzen Sie die Endpoint-Datei um 'run_command_api_url'.") def _run_worker_command(self, instance_id, script, *, command_template=None, timeout=None, poll_interval=5): """Führt ein Skript über die STACKIT Run-Command API aus.""" self._ensure_run_command_available() template = command_template or self.run_command_template or "RunShellScript" payload = { "commandTemplateName": template, "parameters": { "script": script } } response = self.run_client.post( f"/servers/{instance_id}/commands", data=payload) command_id = response.get("id") if command_id is None: raise ProviderError( f"Run-Command API hat keine commandId geliefert: {response}") timeout = timeout or self.run_command_timeout or 3600 start = time.time() details = None while True: details = self.run_client.get( f"/servers/{instance_id}/commands/{command_id}") status = details.get("status") if status in ("completed", "failed"): break if time.time() - start >= timeout: raise ProviderError( f"Run-Command Zeitüberschreitung nach {timeout}s " f"(commandId={command_id}, letzter Status={status}).") time.sleep(poll_interval) exit_code = details.get("exitCode") if status != "completed" or (exit_code not in (None, 0)): output = details.get("output") raise ProviderError( f"Worker-Befehl fehlgeschlagen (Status={status}, " f"ExitCode={exit_code}). Ausgabe: {output}") return details def _build_worker_job_script(self, job_payload): """Erzeugt ein Shell-Skript, das den Transfer-Job via Python ausführt.""" job_literal = json.dumps(job_payload) python_code_template = """ import json import os import subprocess import sys import urllib.request try: import boto3 except ImportError: boto3 = None JOB = __JOB_PAYLOAD__ def download_asset(asset_cfg, destination): if not asset_cfg: return None source_type = asset_cfg.get("type", "s3") os.makedirs(os.path.dirname(destination), exist_ok=True) if source_type == "s3": if boto3 is None: raise RuntimeError( "boto3 wird benötigt, um Assets aus S3 zu laden.") session = boto3.session.Session( aws_access_key_id=asset_cfg.get("access_key"), aws_secret_access_key=asset_cfg.get("secret_key")) client = session.client( "s3", endpoint_url=asset_cfg.get("endpoint_url")) client.download_file( asset_cfg["bucket"], asset_cfg["object"], destination) return destination elif source_type == "http": url = asset_cfg.get("url") if not url: raise RuntimeError("HTTP Asset ohne URL angegeben.") with urllib.request.urlopen(url) as resp, open( destination, "wb") as dst: while True: chunk = resp.read(4 * 1024 * 1024) if not chunk: break dst.write(chunk) return destination else: raise RuntimeError( f"Unbekannter Asset-Typ: {source_type}") def _lsblk(): data = subprocess.check_output( ["lsblk", "-J", "-o", "NAME,TYPE,MOUNTPOINT,FSTYPE"] ).decode("utf-8") return json.loads(data) def _has_mount(entry): if entry.get("mountpoint"): return True for child in entry.get("children") or []: if _has_mount(child): return True return False def find_data_device(): lsblk = _lsblk() for dev in lsblk.get("blockdevices", []): if dev.get("type") != "disk": continue if _has_mount(dev): continue return "/dev/" + dev["name"] raise RuntimeError("Kein zusätzliches Daten-Volume gefunden.") def get_disk_entry(device_path): device_name = device_path.replace("/dev/", "") lsblk = _lsblk() for dev in lsblk.get("blockdevices", []): if dev.get("name") == device_name: return dev return None def find_linux_root_partition(disk_entry): candidates = (disk_entry or {}).get("children") or [] fs_preference = ("ext4", "ext3", "xfs", "btrfs") for preferred in fs_preference: 
for child in candidates: if (child.get("type") == "part" and (child.get("fstype") or "").lower() == preferred): return "/dev/" + child["name"] for child in candidates: if child.get("type") == "part": return "/dev/" + child["name"] return "/dev/" + (disk_entry or {}).get("name", "") def _mount(target, mountpoint, options=None): os.makedirs(mountpoint, exist_ok=True) cmd = ["mount"] if options: cmd.extend(["-o", options]) cmd.extend([target, mountpoint]) subprocess.run(cmd, check=True) def _umount(target): if os.path.ismount(target): subprocess.run(["umount", "-lf", target], check=False) def _bind_mount(base, name): src = f"/{name}" dst = os.path.join(base, name) os.makedirs(dst, exist_ok=True) subprocess.run(["mount", "--bind", src, dst], check=True) return dst def _unbind_mount(path): if os.path.ismount(path): subprocess.run(["umount", "-lf", path], check=False) def _write_file(path, content): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as stream: stream.write(content) def _append_line(path, line): with open(path, "a", encoding="utf-8") as stream: stream.write(line + "\\n") def _ensure_cloud_init(mount_dir): cfg_dir = os.path.join(mount_dir, "etc/cloud/cloud.cfg.d") if os.path.isdir(os.path.join(mount_dir, "etc/cloud")): content = \"\"\"# Coriolis injected config datasource_list: [ ConfigDrive, Ec2, NoCloud ] \"\"\" _write_file(os.path.join(cfg_dir, "90-coriolis.cfg"), content) def _configure_netplan(mount_dir, network): eth_name = network.get("interface", "eth0") dhcp4 = str(network.get("dhcp4", True)).lower() dhcp6 = str(network.get("dhcp6", True)).lower() netplan_yaml = f\"\"\"network: version: 2 renderer: networkd ethernets: {eth_name}: dhcp4: {dhcp4} dhcp6: {dhcp6} \"\"\" _write_file(os.path.join( mount_dir, "etc/netplan/99-coriolis.yaml"), netplan_yaml) def _configure_interfaces_file(mount_dir, network): content = \"\"\"auto lo iface lo inet loopback auto eth0 iface eth0 inet dhcp \"\"\" _write_file(os.path.join(mount_dir, "etc/network/interfaces"), content) def _configure_sysconfig_network(mount_dir, network): content = \"\"\"DEVICE=eth0 BOOTPROTO=dhcp ONBOOT=yes TYPE=Ethernet PEERDNS=yes NM_CONTROLLED=no \"\"\" scripts_dir = os.path.join( mount_dir, "etc/sysconfig/network-scripts") _write_file(os.path.join(scripts_dir, "ifcfg-eth0"), content) def _ensure_grub_cmdline(mount_dir): grub_path = os.path.join(mount_dir, "etc/default/grub") if not os.path.exists(grub_path): return with open(grub_path, "r", encoding="utf-8") as stream: data = stream.read() if "GRUB_CMDLINE_LINUX" not in data: data += "\\nGRUB_CMDLINE_LINUX=\"\"\\n" addition = "net.ifnames=0 biosdevname=0" if addition not in data: data = data.replace( "GRUB_CMDLINE_LINUX=\"", f"GRUB_CMDLINE_LINUX=\"{addition} ") _write_file(grub_path, data) def _cleanup_persistent_rules(mount_dir): udev_path = os.path.join( mount_dir, "etc/udev/rules.d/70-persistent-net.rules") if os.path.exists(udev_path): os.remove(udev_path) def _prep_linux_network(mount_dir, network): _cleanup_persistent_rules(mount_dir) _ensure_grub_cmdline(mount_dir) _ensure_cloud_init(mount_dir) if os.path.isdir(os.path.join(mount_dir, "etc/netplan")): _configure_netplan(mount_dir, network) elif os.path.isdir( os.path.join(mount_dir, "etc/sysconfig/network-scripts")): _configure_sysconfig_network(mount_dir, network) else: _configure_interfaces_file(mount_dir, network) def _write_kernel_module_overrides(mount_dir): modules_dir = os.path.join( mount_dir, "etc/modules-load.d") content = \"\"\"virtio virtio_blk virtio_pci 
virtio_net virtio_scsi \"\"\" _write_file(os.path.join( modules_dir, "coriolis-virtio.conf"), content) dracut_dir = os.path.join( mount_dir, "etc/dracut.conf.d") if os.path.isdir(os.path.join(mount_dir, "etc/dracut.conf.d")): dracut_cfg = 'add_drivers+=\" virtio virtio_blk virtio_pci virtio_net virtio_scsi \"\\n' _write_file(os.path.join( dracut_dir, "coriolis-virtio.conf"), dracut_cfg) def _chroot_exec(mount_dir, cmd): binds = [] for name in ("dev", "proc", "sys"): binds.append(_bind_mount(mount_dir, name)) env = os.environ.copy() env["PATH"] = "/usr/sbin:/usr/bin:/sbin:/bin" try: subprocess.run( ["chroot", mount_dir] + cmd, check=True, env=env) finally: for target in reversed(binds): _unbind_mount(target) def _rebuild_initramfs(mount_dir): if os.path.exists(os.path.join( mount_dir, "usr/sbin/update-initramfs")): _chroot_exec(mount_dir, ["update-initramfs", "-u"]) elif os.path.exists(os.path.join( mount_dir, "usr/bin/dracut")) or os.path.exists( os.path.join(mount_dir, "usr/sbin/dracut")): _chroot_exec(mount_dir, ["dracut", "-f"]) def _linux_os_morph(device, job): assets_cfg = (job.get("assets") or {}) script_cfg = assets_cfg.get("script") if script_cfg: script_path = download_asset( script_cfg, "/var/lib/coriolis/assets/linux-script.sh") os.chmod(script_path, 0o755) env = os.environ.copy() env["CORIOLIS_DEVICE"] = device env["CORIOLIS_JOB_JSON"] = json.dumps(job) subprocess.run( [script_path, device], check=True, env=env) return disk_entry = get_disk_entry(device) if not disk_entry: raise RuntimeError("lsblk Daten für device wurden nicht gefunden.") root_part = find_linux_root_partition(disk_entry) if not root_part or not os.path.exists(root_part): raise RuntimeError("Konnte Root-Partition nicht bestimmen.") mount_dir = "/mnt/coriolis-root" _mount(root_part, mount_dir) try: network = job.get("network") or {"dhcp4": True, "dhcp6": True} _prep_linux_network(mount_dir, network) phase = job.get("phase", "prepare") if phase == "drivers": _write_kernel_module_overrides(mount_dir) _rebuild_initramfs(mount_dir) finally: _umount(mount_dir) def upload_to_s3(device, cfg): if boto3 is None: raise RuntimeError("boto3 ist nicht installiert (S3-Upload).") session = boto3.session.Session( aws_access_key_id=cfg["access_key"], aws_secret_access_key=cfg["secret_key"] ) client = session.client("s3", endpoint_url=cfg.get("endpoint_url")) with open(device, "rb") as src: client.upload_fileobj(src, cfg["bucket"], cfg["object"]) def download_from_s3(device, cfg): if boto3 is None: raise RuntimeError("boto3 ist nicht installiert (S3-Download).") session = boto3.session.Session( aws_access_key_id=cfg["access_key"], aws_secret_access_key=cfg["secret_key"] ) client = session.client("s3", endpoint_url=cfg.get("endpoint_url")) with open(device, "wb") as dst: client.download_fileobj(cfg["bucket"], cfg["object"], dst) def download_from_url(device, url): with urllib.request.urlopen(url) as resp, open(device, "wb") as dst: while True: chunk = resp.read(1024 * 1024) if not chunk: break dst.write(chunk) def wipe_device(device): subprocess.run(["wipefs", "--all", device], check=False) subprocess.run(["sgdisk", "--zap-all", device], check=False) def _windows_os_morph(device, job): assets_cfg = (job.get("assets") or {}) script_cfg = assets_cfg.get("script") if not script_cfg: raise RuntimeError( "Windows OS-Morphing erfordert ein Script-Asset.") script_path = download_asset( script_cfg, "/var/lib/coriolis/assets/windows-script.sh") os.chmod(script_path, 0o755) env = os.environ.copy() env["CORIOLIS_DEVICE"] = device 
env["CORIOLIS_JOB_JSON"] = json.dumps(job) subprocess.run( [script_path, device], check=True, env=env) def run_job(): device = find_data_device() operation = JOB.get("operation") if operation == "export": upload_to_s3(device, JOB["s3_target"]) elif operation == "import": wipe_device(device) source = JOB.get("source", {}) if source.get("type") == "s3": download_from_s3(device, source) elif source.get("type") == "http": download_from_url(device, source["url"]) else: raise RuntimeError(f"Unsupported source type: {source}") elif operation == "os_morphing": os_type = (JOB.get("os_type") or "linux").lower() if os_type == "linux": _linux_os_morph(device, JOB) elif os_type == "windows": _windows_os_morph(device, JOB) else: raise RuntimeError( f"Unbekannte OS-Morphing Plattform: {os_type}") else: raise RuntimeError(f"Unbekannte Job-Operation: {operation}") os.sync() if __name__ == "__main__": try: run_job() except Exception as exc: print(f"[coriolis-worker] Fehler: {exc}", file=sys.stderr) raise """ python_code = textwrap.dedent(python_code_template).replace("__JOB_PAYLOAD__", job_literal) script = textwrap.dedent(f"""\ #!/bin/bash set -euo pipefail python3 - <<'PY' {python_code} PY """) return script def _execute_worker_job(self, instance, job_payload, *, timeout=None): script = self._build_worker_job_script(job_payload) return self._run_worker_command( instance.id, script, timeout=timeout or self.run_command_timeout) # --- Allgemeine Helfer für Worker- und Transfer-Workflows --- def _ensure_worker_prerequisites(self): missing = [] if not self.worker_image_id: missing.append("worker_image_id") if not self.worker_flavor_id: missing.append("worker_flavor_id") if not self.worker_network_id: missing.append("worker_network_id") if missing: raise ProviderError( f"Worker-Konfiguration unvollständig. 

    def _get_os_morphing_assets(self, os_type):
        if not self.os_morphing_assets:
            return {}
        return self.os_morphing_assets.get((os_type or "linux").lower(), {})

    def _build_bucket_object_url(self, bucket_data, object_name):
        """Derives an S3-compatible URL from the bucket data."""
        virtual_host_url = bucket_data.get("urlVirtualHostedStyle")
        path_url = bucket_data.get("urlPathStyle")
        if virtual_host_url:
            base = virtual_host_url.rstrip('/')
            return f"{base}/{object_name}"
        if path_url:
            base = path_url.rstrip('/')
            return f"{base}/{object_name}"
        # Fall back to the generically configured endpoint
        endpoint = (self.object_storage_s3_endpoint
                    or self.object_storage_api_url)
        return (f"{endpoint.rstrip('/')}/{bucket_data.get('name')}/"
                f"{object_name}")

    def _determine_s3_endpoint(self, bucket_data):
        if self.object_storage_s3_endpoint:
            return self.object_storage_s3_endpoint
        virtual_host_url = bucket_data.get("urlVirtualHostedStyle")
        if virtual_host_url:
            # https://bucket.region.endpoint -> extract the endpoint
            parts = virtual_host_url.split("://", 1)
            scheme = "https"
            host = parts[-1]
            if len(parts) == 2:
                scheme = parts[0]
            # bucket-name.rest -> drop the first segment
            host_bits = host.split('.', 1)
            if len(host_bits) == 2:
                host = host_bits[1]
            return f"{scheme}://{host}"
        return self.object_storage_api_url

    def _render_worker_user_data(self, job_payload):
        """Builds a minimal cloud-init script describing the job."""
        job_json = json.dumps(job_payload)
        script = f"""#cloud-config
write_files:
  - path: /var/lib/coriolis/job.json
    permissions: '0600'
    owner: root:root
    content: |
      {job_json}
runcmd:
  - [ bash, -c, "echo 'Coriolis job description written to /var/lib/coriolis/job.json'" ]
"""
        return script

    def _encode_user_data(self, user_data_str):
        encoded = base64.b64encode(
            user_data_str.encode("utf-8")).decode("ascii")
        return encoded

    def _wait_for_instance_status(
        self, instance_id, target_states=None, error_states=None,
        timeout=3600, poll_interval=15
    ):
        target_states = target_states or ("STOPPED", "DELETED")
        error_states = error_states or ("ERROR", "FAILED")
        start = time.time()
        while True:
            instance = self.get_instance(instance_id)
            status = instance.status
            if status in target_states:
                return status
            if status in error_states:
                raise ProviderError(
                    f"Instance {instance_id} reached error state "
                    f"'{status}'.")
            if time.time() - start > timeout:
                raise ProviderError(
                    f"Timed out waiting for instance {instance_id} "
                    f"({timeout}s).")
            time.sleep(poll_interval)

    def _wait_for_volume_status(
        self, volume_id, target_states=None, error_states=None,
        timeout=1800, poll_interval=10
    ):
        target_states = target_states or ("AVAILABLE",)
        error_states = error_states or ("ERROR", "FAILED")
        start = time.time()
        while True:
            volume = self.get_volume(volume_id)
            status = volume.status
            if status in target_states:
                return status
            if status in error_states:
                raise ProviderError(
                    f"Volume {volume_id} reached error state '{status}'.")
            if time.time() - start > timeout:
                raise ProviderError(
                    f"Timed out waiting for volume {volume_id} "
                    f"({timeout}s).")
            time.sleep(poll_interval)

    def _wait_for_snapshot_status(
        self, snapshot_id, target_states=None, error_states=None,
        timeout=1800, poll_interval=10
    ):
        target_states = target_states or ("AVAILABLE",)
        error_states = error_states or ("ERROR", "FAILED")
        start = time.time()
        while True:
            snapshot = self.get_snapshot(snapshot_id)
            status = snapshot.status
            if status in target_states:
                return status
            if status in error_states:
                raise ProviderError(
                    f"Snapshot {snapshot_id} reached error state "
                    f"'{status}'.")
            if time.time() - start > timeout:
                raise ProviderError(
                    f"Timed out waiting for snapshot {snapshot_id} "
                    f"({timeout}s).")
            time.sleep(poll_interval)
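
    # _determine_s3_endpoint illustration (hypothetical bucket data; assumes
    # object_storage_s3_endpoint is unset):
    #
    #     bucket = {"urlVirtualHostedStyle":
    #               "https://my-bucket.object-storage.example"}
    #     provider._determine_s3_endpoint(bucket)
    #     # -> "https://object-storage.example"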
f"Snapshot {snapshot_id} hat Fehlerstatus '{status}' erreicht.") if time.time() - start > timeout: raise ProviderError( f"Wartezeit für Snapshot {snapshot_id} überschritten ({timeout}s).") time.sleep(poll_interval) def _wait_for_snapshot_deletion( self, snapshot_id, timeout=900, poll_interval=10 ): start = time.time() while True: try: self.get_snapshot(snapshot_id) except ObjectNotFoundError: return True if time.time() - start > timeout: raise ProviderError( f"Snapshot {snapshot_id} wurde nicht innerhalb von {timeout}s gelöscht.") time.sleep(poll_interval) def _create_volume_from_source( self, *, name, source_type, source_id, size_gb=None, availability_zone=None, performance_class=None ): payload = { "name": name, "availabilityZone": availability_zone or self.worker_availability_zone, "performanceClass": performance_class, "size": size_gb, "source": { "id": source_id, "type": source_type } } payload = {k: v for k, v in payload.items() if v is not None} volume_data = self.client.post("/volumes", data=payload) volume = StackitVolume(self, volume_data) self._wait_for_volume_status(volume.id) return volume def _launch_worker_instance( self, *, job_name, user_data, attached_volume_ids=None ): self._ensure_worker_prerequisites() payload_volumes = [{"id": vol_id} for vol_id in (attached_volume_ids or [])] metadata = dict(self.worker_metadata or {}) metadata.update({ "coriolis_job": job_name, }) instance = self.create_instance( name=job_name, image_id=self.worker_image_id, flavor_id=self.worker_flavor_id, network_id=self.worker_network_id, availability_zone=self.worker_availability_zone, volumes=payload_volumes or None, metadata=metadata, user_data=user_data, keypair_name=self.worker_keypair_name, security_group_ids=self.worker_security_group_ids ) try: self._wait_for_instance_status( instance.id, target_states=("RUNNING",), error_states=("ERROR", "FAILED"), timeout=900) except ProviderError as exc: print(f"WARNUNG: Worker-Instanz {instance.id} hat RUNNING nicht erreicht: {exc}") return instance def _cleanup_worker_instance(self, instance_id): try: self.delete_instance(instance_id) except Exception as exc: print(f"WARNUNG: Worker-Instanz {instance_id} konnte nicht gelöscht werden: {exc}") def _delete_volume_safe(self, volume_id): try: self.delete_volume(volume_id) except Exception as exc: print(f"WARNUNG: Volume {volume_id} konnte nicht gelöscht werden: {exc}") def _verify_object_exists(self, bucket_name, object_name, endpoint_url, creds): if not boto3: print("WARNUNG: boto3 nicht verfügbar - kann Objektverifikation nicht durchführen.") return True session = boto3.session.Session( aws_access_key_id=creds["access_key"], aws_secret_access_key=creds["secret_key"] ) client = session.client("s3", endpoint_url=endpoint_url) try: client.head_object(Bucket=bucket_name, Key=object_name) return True except ClientError as exc: raise ProviderError( f"S3-Objekt {bucket_name}/{object_name} konnte nicht verifiziert werden: {exc}" )