shcloud/shcloud.py
Aaron Riedel f88d3828c3
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
update to allow multiple games
2025-04-15 18:05:36 +02:00

220 lines
7.9 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt
from rich.progress import Progress
from rich.pretty import pprint
from rich.prompt import Confirm
from os import environ, system, path
import subprocess
from time import sleep
import argparse
from hcloud import Client
from hcloud.images.domain import Image
from hcloud.server_types.domain import ServerType
from hcloud.ssh_keys.domain import SSHKey
from hcloud.datacenters.domain import Datacenter
from hcloud.primary_ips.domain import PrimaryIP
from hcloud.volumes.domain import Volume
from hcloud.servers.domain import ServerCreatePublicNetwork
from hcloud.locations.domain import Location
server_name = "lgsm-1"
server_game = "fctrserver"
server_type = "ccx13"
server_type_id = None
server_key = 6513932
server_image = 114690387
server_ipv4 = 11737045
server_ipv6 = 11737053
volume_id = 102426010
game_choices = ["fctrserver", "sfserver", "cs2server"]
backup_paths = {
"sfserver": "/gameserver/home/.config/Epic/FactoryGame/Saved/SaveGames/server/*",
"fctrserver": "/gameserver/fctrserver/serverfiles/save1.zip",
}
# please put the token in a file named .token in the same dir as the python script
token_file = open("%s/.token" % path.dirname(__file__), "r")
token = token_file.read().replace("\n", "")
token_file.close()
client = Client(token=token)
console = Console() # for rich module≈
# get volume
volume = Volume(volume_id)
# get server types
models = client.server_types.get_all()
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("action", type=str, help="create or delete")
parser.add_argument("-t", "--type", help="server type")
parser.add_argument(
"-g", "--game", help="game", choices=game_choices, default=server_game
)
args = parser.parse_args()
# functions
def delete_server(s):
with Progress() as progress:
check_connection = progress.add_task(
"[green]Check connection to server", total=None
)
subprocess.check_output(
f"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null shcloud.eu whoami",
shell=True,
stderr=subprocess.STDOUT,
)
progress.update(check_connection, completed=1, total=1)
task_stop_game = progress.add_task("[red]Stop Game", total=None)
subprocess.check_output(
"ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null shcloud.eu 'su -c \"cd /gameserver/%s && /gameserver/%s/%s stop\" gameserver'"
% (server_game, server_game, server_game),
shell=True,
stderr=subprocess.STDOUT,
)
progress.update(task_stop_game, completed=1, total=1)
# backup game files
if server_game in backup_paths:
task_backup = progress.add_task("[red]Backup files", total=None)
subprocess.check_output(
f"mkdir -p /root/backup/{server_game}",
shell=True,
stderr=subprocess.STDOUT,
)
subprocess.check_output(
f"scp shcloud.eu:{backup_paths[server_game]} /root/backup/{server_game}/",
shell=True,
stderr=subprocess.STDOUT,
)
progress.update(task_backup, completed=1, total=1)
response = s.shutdown()
track_progress(response.id, response.command)
response = client.volumes.detach(volume)
track_progress(response.id, response.command)
response = s.delete()
track_progress(response.id, response.command)
def check_servertype(server_type):
# return id or None
for m in models:
if m.name == server_type and m.deprecated == False:
return m.id
return None
def select_servertype():
# return id or None
table = Table(title="Server Types Hetzner Cloud")
table.add_column("ID", justify="left", style="grey46", no_wrap=True)
table.add_column("Name", justify="left", style="cyan", no_wrap=True)
table.add_column("Description", justify="left", style="orange1", no_wrap=True)
table.add_column("Cores", justify="left", style="red", no_wrap=True)
table.add_column("Memory", justify="left", style="red", no_wrap=True)
table.add_column("Disk", justify="left", style="blue", no_wrap=True)
# table.add_column("Storage Type", justify="center", style="blue", no_wrap=True)
table.add_column("CPU Type", justify="center", style="cyan2", no_wrap=True)
# table.add_column("Deprecated", justify="center", style="grey46", no_wrap=True)
table.add_column("Hourly", justify="left", style="cyan2", no_wrap=True)
table.add_column("Monthly", justify="left", style="cyan2", no_wrap=True)
for m in models:
# if str(m.cpu_type) == "shared":
table.add_row(
str(m.id),
str(m.name),
str(m.description),
str(m.cores),
str(int(m.memory)) + " GB",
str(m.disk) + " GB",
str(m.cpu_type),
format(float(m.prices[0]["price_hourly"]["gross"]), ".2f") + "",
format(float(m.prices[0]["price_monthly"]["gross"]), ".2f") + "",
)
console.print(table)
return Prompt.ask("Selection: ", default=server_type)
def track_progress(a_id, description):
with Progress() as progress:
task = progress.add_task("[cyan]%s..." % description, total=None)
while not progress.finished:
a = client.actions.get_by_id(a_id)
if a.progress != 0:
total = 100
else:
total = None
progress.update(task, completed=int(a.progress), total=total)
if a.finished != None:
progress.stop_task(task)
sleep(0.1)
return
if args.action == "delete":
with Progress() as progress:
task_search = progress.add_task("[green]Search for server", total=None)
s = client.servers.get_by_name(server_name)
progress.update(task_search, completed=1, total=1)
if s == None:
console.print("[red]:heavy_multiplication_x: Server not found")
exit(0)
else:
console.print("[green]:heavy_check_mark: Server found")
delete_server(s)
exit(0)
if args.action == "create":
# check if server model was selected
if args.type:
# get server models
server_type_id = check_servertype(args.type)
# if not found force the user to select one
while server_type_id == None:
console.print("[red]Server type not found.")
server_type_id = check_servertype(select_servertype())
else:
while server_type_id == None:
console.print("[red]Server type not found.")
server_type_id = check_servertype(select_servertype())
# check if server already exists
servers = client.servers.get_all()
for s in servers:
if s.name == server_name:
console.print("[red]server already there")
if Confirm.ask("Delete?"):
delete_server(s)
exit(0) # for now, until I have a way to wait
else:
console.print("[red]Server should not be deleted. Aboring...")
exit(0)
# create new server
response = client.servers.create(
name=server_name,
server_type=ServerType(id=server_type_id),
image=Image(id=server_image),
ssh_keys=[SSHKey(id=server_key)],
location=Location(name="fsn1"),
volumes=[Volume(id=volume_id)],
labels={"game": server_game},
user_data="#!/bin/bash\ncurl -sL ar21.de/shinit.php?GAME=%s\\&VOLUME=%s | bash"
% (server_game, volume_id),
public_net=ServerCreatePublicNetwork(
ipv4=PrimaryIP(id=server_ipv4),
ipv6=PrimaryIP(id=server_ipv6),
enable_ipv4=True,
enable_ipv6=True,
),
)
track_progress(response.action.id, response.action.command)
for a in response.next_actions:
track_progress(a.id, a.command)