#!/usr/bin/env python3
"""Create or delete a single game server on Hetzner Cloud.

Usage:
    <script> create [-t SERVER_TYPE] [-g GAME]
    <script> delete [-g GAME]

``create`` provisions the server named by ``server_name`` with the configured
image, SSH key, primary IPs and data volume.  ``delete`` stops the game over
SSH, copies the save files listed in ``backup_paths`` to the local machine,
then shuts the server down, detaches the volume and deletes the server.

The API token is read from a file named ``.token`` that must sit in the same
directory as this script.
"""
import argparse
import subprocess
import sys
from os import environ, makedirs, path, system
from time import sleep

from hcloud import Client
from hcloud.datacenters.domain import Datacenter
from hcloud.images.domain import Image
from hcloud.locations.domain import Location
from hcloud.primary_ips.domain import PrimaryIP
from hcloud.server_types.domain import ServerType
from hcloud.servers.domain import ServerCreatePublicNetwork
from hcloud.ssh_keys.domain import SSHKey
from hcloud.volumes.domain import Volume
from rich.console import Console
from rich.pretty import pprint
from rich.progress import Progress
from rich.prompt import Confirm, Prompt
from rich.table import Table

# --- configuration ---------------------------------------------------------
server_name = "lgsm-1"
server_game = "fctrserver"  # default game; replaced by --game after parsing
server_type = "ccx13"  # default offered by the interactive type prompt
server_type_id = None
server_key = 6513932
server_image = 114690387
server_ipv4 = 11737045
server_ipv6 = 11737053
volume_id = 102426010
game_choices = ["fctrserver", "sfserver", "cs2server"]
# Remote paths that are copied to the local backup directory before deletion.
# Games without an entry here are deleted without a backup.
backup_paths = {
    "sfserver": "/gameserver/home/.config/Epic/FactoryGame/Saved/SaveGames/server/*",
    "fctrserver": "/gameserver/fctrserver/serverfiles/save1.zip",
}
server_host = "shcloud.eu"
backup_root = "/root/backup"
# Host key checking is disabled for every connection to server_host.
ssh_options = [
    "-o",
    "StrictHostKeyChecking=no",
    "-o",
    "UserKnownHostsFile=/dev/null",
]

# --- parse arguments -------------------------------------------------------
# Parsed before any file or network access so that --help and usage errors
# work without a token.
parser = argparse.ArgumentParser()
parser.add_argument(
    "action", type=str, choices=["create", "delete"], help="create or delete"
)
parser.add_argument("-t", "--type", help="server type")
parser.add_argument(
    "-g", "--game", help="game", choices=game_choices, default=server_game
)
args = parser.parse_args()
# Honour --game everywhere the game name is used below.
server_game = args.game

# --- API client ------------------------------------------------------------
# please put the token in a file named .token in the same dir as the python script
# abspath() keeps the lookup next to the script even when __file__ is relative.
token_path = path.join(path.dirname(path.abspath(__file__)), ".token")
with open(token_path, "r") as token_file:
    token = token_file.read().strip()

client = Client(token=token)
console = Console()  # for rich module

# get volume
volume = Volume(volume_id)

# get server types
models = client.server_types.get_all()


# --- functions -------------------------------------------------------------
def _run(command):
    """Run *command* (an argument list) without a shell and return its output.

    stderr is merged into stdout.  Raises ``subprocess.CalledProcessError``
    when the command exits with a non-zero status.
    """
    return subprocess.check_output(command, stderr=subprocess.STDOUT)


def delete_server(s):
    """Stop the game, back up its save files and delete server *s*.

    Steps, in order: verify SSH connectivity, stop the game as the
    ``gameserver`` user, copy the save files for ``server_game`` (if it has
    an entry in ``backup_paths``), then shut down the server, detach the
    data volume and delete the server.

    Raises ``subprocess.CalledProcessError`` if any SSH/SCP step fails; in
    that case the server is left untouched.
    """
    with Progress() as progress:
        check_connection = progress.add_task(
            "[green]Check connection to server", total=None
        )
        _run(["ssh", *ssh_options, server_host, "whoami"])
        progress.update(check_connection, completed=1, total=1)

        task_stop_game = progress.add_task("[red]Stop Game", total=None)
        stop_command = (
            f'su -c "cd /gameserver/{server_game} && '
            f'/gameserver/{server_game}/{server_game} stop" gameserver'
        )
        _run(["ssh", *ssh_options, server_host, stop_command])
        progress.update(task_stop_game, completed=1, total=1)

        # backup game files
        if server_game in backup_paths:
            task_backup = progress.add_task("[red]Backup files", total=None)
            backup_dir = f"{backup_root}/{server_game}"
            makedirs(backup_dir, exist_ok=True)
            # Any wildcard in the remote path is passed through to scp as-is.
            _run(
                [
                    "scp",
                    *ssh_options,
                    f"{server_host}:{backup_paths[server_game]}",
                    f"{backup_dir}/",
                ]
            )
            progress.update(task_backup, completed=1, total=1)

    # The cloud actions run after the Progress display above has been closed,
    # because track_progress() opens its own Progress display.
    response = s.shutdown()
    track_progress(response.id, response.command)
    response = client.volumes.detach(volume)
    track_progress(response.id, response.command)
    response = s.delete()
    track_progress(response.id, response.command)


def check_servertype(server_type):
    """Return the id of the non-deprecated server type named *server_type*.

    Returns ``None`` when no entry in ``models`` matches.
    """
    return next(
        (m.id for m in models if m.name == server_type and m.deprecated is False),
        None,
    )


def select_servertype():
    """Print a table of all server types and ask the user to pick one.

    Returns the text the user entered (``server_type`` when they just press
    Enter).  The answer is not validated here; pass it to
    ``check_servertype``.
    """
    table = Table(title="Server Types Hetzner Cloud")
    columns = [
        ("ID", "left", "grey46"),
        ("Name", "left", "cyan"),
        ("Description", "left", "orange1"),
        ("Cores", "left", "red"),
        ("Memory", "left", "red"),
        ("Disk", "left", "blue"),
        ("CPU Type", "center", "cyan2"),
        ("Hourly", "left", "cyan2"),
        ("Monthly", "left", "cyan2"),
    ]
    for header, justify, style in columns:
        table.add_column(header, justify=justify, style=style, no_wrap=True)

    for m in models:
        # Only the first price entry of each server type is shown.
        price = m.prices[0]
        table.add_row(
            str(m.id),
            str(m.name),
            str(m.description),
            str(m.cores),
            f"{int(m.memory)} GB",
            f"{m.disk} GB",
            str(m.cpu_type),
            f"{float(price['price_hourly']['gross']):.2f} €",
            f"{float(price['price_monthly']['gross']):.2f} €",
        )
    console.print(table)
    return Prompt.ask("Selection: ", default=server_type)


def track_progress(a_id, description, interval=0.5):
    """Show a progress bar for the API action *a_id* until it has finished.

    *description* is used as the bar label.  The action is polled every
    *interval* seconds so the API is not queried in a tight loop.  A failed
    action is reported on the console; no exception is raised for it.
    """
    action = None
    with Progress() as progress:
        task = progress.add_task("[cyan]%s..." % description, total=None)
        while not progress.finished:
            action = client.actions.get_by_id(a_id)
            # The bar stays indeterminate until the API reports progress.
            total = 100 if action.progress != 0 else None
            progress.update(task, completed=int(action.progress), total=total)
            if action.finished is not None:
                progress.stop_task(task)
                break
            sleep(interval)
    if action is not None and action.status == "error":
        console.print(f"[red]Action '{description}' failed: {action.error}")


def resolve_server_type_id(requested):
    """Return a valid server type id, prompting until one is chosen.

    *requested* is the name given with ``--type`` (or ``None``).  "Server
    type not found." is printed only after a lookup has actually failed.
    """
    if requested:
        type_id = check_servertype(requested)
        if type_id is not None:
            return type_id
        console.print("[red]Server type not found.")
    while True:
        type_id = check_servertype(select_servertype())
        if type_id is not None:
            return type_id
        console.print("[red]Server type not found.")


# --- actions ---------------------------------------------------------------
if args.action == "delete":
    with Progress() as progress:
        task_search = progress.add_task("[green]Search for server", total=None)
        s = client.servers.get_by_name(server_name)
        progress.update(task_search, completed=1, total=1)
    # Evaluated after the display above is closed; delete_server() opens its own.
    if s is None:
        console.print("[red]:heavy_multiplication_x: Server not found")
        sys.exit(0)
    console.print("[green]:heavy_check_mark: Server found")
    delete_server(s)
    sys.exit(0)

if args.action == "create":
    # check if server model was selected, otherwise let the user choose
    server_type_id = resolve_server_type_id(args.type)

    # check if server already exists
    existing = client.servers.get_by_name(server_name)
    if existing is not None:
        console.print("[red]server already there")
        if Confirm.ask("Delete?"):
            delete_server(existing)
            sys.exit(0)  # for now, until I have a way to wait
        console.print("[red]Server should not be deleted. Aborting...")
        sys.exit(0)

    # create new server
    response = client.servers.create(
        name=server_name,
        server_type=ServerType(id=server_type_id),
        image=Image(id=server_image),
        ssh_keys=[SSHKey(id=server_key)],
        location=Location(name="fsn1"),
        volumes=[Volume(id=volume_id)],
        labels={"game": server_game},
        # Cloud-init script: fetches the init script for this game/volume and runs it.
        user_data=(
            "#!/bin/bash\n"
            f"curl -sL ar21.de/shinit.php?GAME={server_game}\\&VOLUME={volume_id} | bash"
        ),
        public_net=ServerCreatePublicNetwork(
            ipv4=PrimaryIP(id=server_ipv4),
            ipv6=PrimaryIP(id=server_ipv6),
            enable_ipv4=True,
            enable_ipv6=True,
        ),
    )
    track_progress(response.action.id, response.action.command)
    for a in response.next_actions:
        track_progress(a.id, a.command)