#!/bin/python3 from hcloud import Client from hcloud.images.domain import Image from hcloud.networks.domain import NetworkSubnet from hcloud.server_types.domain import ServerType from hcloud.locations.domain import Location from hcloud.load_balancer_types.domain import LoadBalancerType from hcloud.load_balancers.domain import ( LoadBalancerHealthCheck, LoadBalancerService, LoadBalancerHealtCheckHttp, LoadBalancerTarget, ) import sys CLIENT_ID = "s478841" SERVERS_NO = 5 TARGET_PORT = 8080 key_name = f"ssh-{CLIENT_ID}" net_name = f"webservice-net-{CLIENT_ID}" server_name = f"webservice-server-{CLIENT_ID}" load_balancer_name = f"webservice-load-balancer-{CLIENT_ID}" location = Location("hel1") ci_webservice = r"""#cloud-config runcmd: - cd /home - git clone https://git.wmi.amu.edu.pl/s478841/DPZC-Hetzner.git - cd DPZC-Hetzner/webservice-balancer - chmod +x ./webservice - ./webservice """ def create_client(creds_path): with open(creds_path, "r") as f: token = str(f.readline()).strip() client = Client(token=token) print("The CLIENT has been CREATED") return client def load_key(pub_key_path): print(f"\t\tUsing the {pub_key_path} file") with open(pub_key_path) as f: key = f.readline() print("\t\tThe SSH KEY has been LOADED") return key def create_key(client, key_name, pub_key_path): print("The SSH KEY has been...") key = client.ssh_keys.get_by_name(key_name) or None if not key: key_of_power = load_key(pub_key_path) key = client.ssh_keys.create(name=key_name, public_key=key_of_power) print("\tCREATED") return key def remove_key(client, key_name): print("The KEY has been...", end="") try: action = client.ssh_keys.delete(client.ssh_keys.get_by_name(key_name)) action.wait_until_finished() except AttributeError: pass print("DELETED") def create_network(client, net_name, ip_range): print(f"The NETWORK {net_name} has been...", end="") net = client.networks.get_by_name(net_name) or None if not net: net = client.networks.create( name=net_name, ip_range=ip_range, subnets=[ NetworkSubnet( ip_range=ip_range, network_zone="eu-central", type="cloud" ) ], ) print("CREATED") return net def remove_network(client, net_name): print(f"The NETWORK {net_name} has been...", end="") try: action = client.networks.delete(client.networks.get_by_name(net_name)) action.wait_until_finished() except AttributeError: pass print("DELETED") def create_server(client, server_name, ssh_key, vnet, location, user_data): webserver = client.servers.get_by_name(server_name) or None if webserver is not None: print(f"The WEBSERVICE {server_name} already exists") else: print(f"The WEBSERVICE {server_name}has been CREATED:", end=" ") webserver = client.servers.create( name=server_name, server_type=ServerType("cx11"), image=Image(name="ubuntu-22.04"), ssh_keys=[ssh_key], networks=[vnet], location=location, user_data=user_data, ) webserver.action.wait_until_finished() print(webserver.action.complete) return server_name def remove_server(client, server_name): print(f"The WEBSERVICE {server_name} has been...", end="") try: action = client.servers.delete(client.servers.get_by_name(server_name)) action.wait_until_finished() except AttributeError: pass print("DELETED") def create_load_balancer(client, name, location, vnet, servers): targets = [ LoadBalancerTarget( type="server", server=client.servers.get_by_name(server), use_private_ip=True, ) for server in servers ] print(f"The LOAD BALANCER {name} has been CREATED:", end=" ") lb = client.load_balancers.create( name=name, load_balancer_type=LoadBalancerType("lb11"), location=location, services=[ LoadBalancerService( protocol="http", listen_port=TARGET_PORT, destination_port=TARGET_PORT, proxyprotocol=False, health_check=LoadBalancerHealthCheck( protocol="http", port=TARGET_PORT, interval=15, timeout=10, retries=3, http=LoadBalancerHealtCheckHttp( path="/factors/6", status_codes=["2??", "3??"], tls=False ), ), ) ], targets=targets, public_interface=True, network=vnet, ) lb.action.wait_until_finished() print(lb.action.complete) return client.load_balancers.get_by_name(name) def remove_load_balancer(client, name): print(f"The LOAD BALANCER {name} has been...", end="") try: action = client.load_balancers.delete(client.load_balancers.get_by_name(name)) action.wait_until_finished() except AttributeError: pass print("DELETED") if __name__ == "__main__": client = create_client("../tokens/token_file") ssh_path = sys.argv[-2] action = sys.argv[-1] if action == "--create": ssh_key = create_key(client, key_name, ssh_path) vnet = create_network(client, net_name, "10.10.10.0/24") servers = [ create_server( client, f"{i+1}-{server_name}", ssh_key, vnet, location, ci_webservice ) for i in range(SERVERS_NO) ] load_balancer = create_load_balancer( client, load_balancer_name, location, vnet, servers ) print( f"\nYour service should be available at:\n\thttp://{load_balancer.data_model.public_net.ipv4.ip}:{TARGET_PORT}\n\n\t\tHave fun!" ) elif action == "--clean": remove_load_balancer(client, load_balancer_name) for i in range(SERVERS_NO): remove_server(client, f"{i+1}-{server_name}") remove_network(client, net_name) remove_key(client, key_name) print("\n\n\t\tIt was nice to meet you!") else: print(f"\nSorry, don't know what the'{action}' means...bye!")