This repository has been archived on 2025-01-23. You can view files and clone it, but cannot push or open issues or pull requests.
traefik-certmanager/kubernetes/base/config/kube_config_test.py

1915 lines
70 KiB
Python

# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import datetime
import io
import json
import os
from pprint import pprint
import shutil
import tempfile
import unittest
from collections import namedtuple
from unittest import mock
import yaml
from six import PY3, next
from kubernetes.client import Configuration
from .config_exception import ConfigException
from .dateutil import UTC, format_rfc3339, parse_rfc3339
from .kube_config import (ENV_KUBECONFIG_PATH_SEPARATOR, CommandTokenSource,
ConfigNode, FileOrData, KubeConfigLoader,
KubeConfigMerger, _cleanup_temp_files,
_create_temp_file_with_content,
_get_kube_config_loader,
_get_kube_config_loader_for_yaml_file,
list_kube_config_contexts, load_kube_config,
load_kube_config_from_dict, new_client_from_config, new_client_from_config_dict)
BEARER_TOKEN_FORMAT = "Bearer %s"
EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
PAST_EXPIRY_TIMEDELTA = 2
# should be more than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
FUTURE_EXPIRY_TIMEDELTA = 60
NON_EXISTING_FILE = "zz_non_existing_file_472398324"
def _base64(string):
return base64.standard_b64encode(string.encode()).decode()
def _urlsafe_unpadded_b64encode(string):
return base64.urlsafe_b64encode(string.encode()).decode().rstrip('=')
def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)
def _get_expiry(loader, active_context):
expired_gcp_conf = (item for item in loader._config.value.get("users")
if item.get("name") == active_context)
return next(expired_gcp_conf).get("user").get("auth-provider") \
.get("config").get("expiry")
def _raise_exception(st):
raise Exception(st)
TEST_FILE_KEY = "file"
TEST_DATA_KEY = "data"
TEST_FILENAME = "test-filename"
TEST_DATA = "test-data"
TEST_DATA_BASE64 = _base64(TEST_DATA)
TEST_ANOTHER_DATA = "another-test-data"
TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA)
TEST_HOST = "test-host"
TEST_USERNAME = "me"
TEST_PASSWORD = "pass"
# token for me:pass
TEST_BASIC_TOKEN = "Basic bWU6cGFzcw=="
DATETIME_EXPIRY_PAST = datetime.datetime.now(tz=UTC
).replace(tzinfo=None) - datetime.timedelta(minutes=PAST_EXPIRY_TIMEDELTA)
DATETIME_EXPIRY_FUTURE = datetime.datetime.now(tz=UTC
).replace(tzinfo=None) + datetime.timedelta(minutes=FUTURE_EXPIRY_TIMEDELTA)
TEST_TOKEN_EXPIRY_PAST = _format_expiry_datetime(DATETIME_EXPIRY_PAST)
TEST_SSL_HOST = "https://test-host"
TEST_CERTIFICATE_AUTH = "cert-auth"
TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH)
TEST_CLIENT_KEY = "client-key"
TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY)
TEST_CLIENT_CERT = "client-cert"
TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT)
TEST_TLS_SERVER_NAME = "kubernetes.io"
TEST_OIDC_TOKEN = "test-oidc-token"
TEST_OIDC_INFO = "{\"name\": \"test\"}"
TEST_OIDC_BASE = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO)
])
TEST_OIDC_LOGIN = ".".join([
TEST_OIDC_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64)
])
TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN
TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}"
TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode(
TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP)
TEST_OIDC_EXPIRED_LOGIN = ".".join([
TEST_OIDC_EXP_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"),
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
"aaaaa",
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH)
TEST_AZURE_LOGIN = TEST_OIDC_LOGIN
TEST_AZURE_TOKEN = "test-azure-token"
TEST_AZURE_TOKEN_FULL = "Bearer " + TEST_AZURE_TOKEN
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._temp_files = []
def tearDown(self):
for f in self._temp_files:
os.remove(f)
def _create_temp_file(self, content=""):
handler, name = tempfile.mkstemp()
self._temp_files.append(name)
os.write(handler, str.encode(content))
os.close(handler)
return name
def expect_exception(self, func, message_part, *args, **kwargs):
with self.assertRaises(ConfigException) as context:
func(*args, **kwargs)
self.assertIn(message_part, str(context.exception))
class TestFileOrData(BaseTestCase):
@staticmethod
def get_file_content(filename):
with open(filename) as f:
return f.read()
def test_file_given_file(self):
temp_filename = _create_temp_file_with_content(TEST_DATA)
obj = {TEST_FILE_KEY: temp_filename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_non_existing_file(self):
temp_filename = NON_EXISTING_FILE
obj = {TEST_FILE_KEY: temp_filename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.expect_exception(t.as_file, "does not exist")
def test_file_given_data(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_data_no_base64(self):
obj = {TEST_DATA_KEY: TEST_DATA}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY, base64_file_content=False)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_data_given_data(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_data_given_file(self):
obj = {
TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_data_given_file_no_base64(self):
obj = {
TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
base64_file_content=False)
self.assertEqual(TEST_DATA, t.as_data())
def test_data_given_file_and_data(self):
obj = {
TEST_DATA_KEY: TEST_DATA_BASE64,
TEST_FILE_KEY: self._create_temp_file(
content=TEST_ANOTHER_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA_BASE64, t.as_data())
def test_file_given_file_and_data(self):
obj = {
TEST_DATA_KEY: TEST_DATA_BASE64,
TEST_FILE_KEY: self._create_temp_file(
content=TEST_ANOTHER_DATA)}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_with_custom_dirname(self):
tempfile = self._create_temp_file(content=TEST_DATA)
tempfile_dir = os.path.dirname(tempfile)
tempfile_basename = os.path.basename(tempfile)
obj = {TEST_FILE_KEY: tempfile_basename}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
file_base_path=tempfile_dir)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_create_temp_file_with_content(self):
self.assertEqual(TEST_DATA,
self.get_file_content(
_create_temp_file_with_content(TEST_DATA)))
_cleanup_temp_files()
def test_file_given_data_bytes(self):
obj = {TEST_DATA_KEY: TEST_DATA_BASE64.encode()}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_data_bytes_no_base64(self):
obj = {TEST_DATA_KEY: TEST_DATA.encode()}
t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY, base64_file_content=False)
self.assertEqual(TEST_DATA, self.get_file_content(t.as_file()))
def test_file_given_no_object(self):
t = FileOrData(obj=None, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(t.as_file(), None)
def test_file_given_no_object_data(self):
t = FileOrData(obj=None, file_key_name=TEST_FILE_KEY,
data_key_name=TEST_DATA_KEY)
self.assertEqual(t.as_data(), None)
class TestConfigNode(BaseTestCase):
test_obj = {"key1": "test", "key2": ["a", "b", "c"],
"key3": {"inner_key": "inner_value"},
"with_names": [{"name": "test_name", "value": "test_value"},
{"name": "test_name2",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}],
"with_names_dup": [
{"name": "test_name", "value": "test_value"},
{"name": "test_name",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}
]}
def setUp(self):
super(TestConfigNode, self).setUp()
self.node = ConfigNode("test_obj", self.test_obj)
def test_normal_map_array_operations(self):
self.assertEqual("test", self.node['key1'])
self.assertEqual(5, len(self.node))
self.assertEqual("test_obj/key2", self.node['key2'].name)
self.assertEqual(["a", "b", "c"], self.node['key2'].value)
self.assertEqual("b", self.node['key2'][1])
self.assertEqual(3, len(self.node['key2']))
self.assertEqual("test_obj/key3", self.node['key3'].name)
self.assertEqual({"inner_key": "inner_value"},
self.node['key3'].value)
self.assertEqual("inner_value", self.node['key3']["inner_key"])
self.assertEqual(1, len(self.node['key3']))
def test_get_with_name(self):
node = self.node["with_names"]
self.assertEqual(
"test_value",
node.get_with_name("test_name")["value"])
self.assertTrue(
isinstance(node.get_with_name("test_name2"), ConfigNode))
self.assertTrue(
isinstance(node.get_with_name("test_name3"), ConfigNode))
self.assertEqual("test_obj/with_names[name=test_name2]",
node.get_with_name("test_name2").name)
self.assertEqual("test_obj/with_names[name=test_name3]",
node.get_with_name("test_name3").name)
def test_key_does_not_exists(self):
self.expect_exception(lambda: self.node['not-exists-key'],
"Expected key not-exists-key in test_obj")
self.expect_exception(lambda: self.node['key3']['not-exists-key'],
"Expected key not-exists-key in test_obj/key3")
def test_get_with_name_on_invalid_object(self):
self.expect_exception(
lambda: self.node['key2'].get_with_name('no-name'),
"Expected all values in test_obj/key2 list to have \'name\' key")
def test_get_with_name_on_non_list_object(self):
self.expect_exception(
lambda: self.node['key3'].get_with_name('no-name'),
"Expected test_obj/key3 to be a list")
def test_get_with_name_on_name_does_not_exists(self):
self.expect_exception(
lambda: self.node['with_names'].get_with_name('no-name'),
"Expected object with name no-name in test_obj/with_names list")
def test_get_with_name_on_duplicate_name(self):
self.expect_exception(
lambda: self.node['with_names_dup'].get_with_name('test_name'),
"Expected only one object with name test_name in "
"test_obj/with_names_dup list")
class FakeConfig:
FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"]
IGNORE_KEYS = ["refresh_api_key_hook"]
def __init__(self, token=None, **kwargs):
self.api_key = {}
# Provided by the OpenAPI-generated Configuration class
self.refresh_api_key_hook = None
if token:
self.api_key['authorization'] = token
self.__dict__.update(kwargs)
def __eq__(self, other):
if len(self.__dict__) != len(other.__dict__):
return
for k, v in self.__dict__.items():
if k in self.IGNORE_KEYS:
continue
if k not in other.__dict__:
return
if k in self.FILE_KEYS:
if v and other.__dict__[k]:
try:
with open(v) as f1, open(other.__dict__[k]) as f2:
if f1.read() != f2.read():
return
except OSError:
# fall back to only compare filenames in case we are
# testing the passing of filenames to the config
if other.__dict__[k] != v:
return
else:
if other.__dict__[k] != v:
return
else:
if other.__dict__[k] != v:
return
return True
def __repr__(self):
rep = "\n"
for k, v in self.__dict__.items():
val = v
if k in self.FILE_KEYS:
try:
with open(v) as f:
val = "FILE: %s" % str.decode(f.read())
except OSError as e:
val = "ERROR: %s" % str(e)
rep += "\t%s: %s\n" % (k, val)
return "Config(%s\n)" % rep
class TestKubeConfigLoader(BaseTestCase):
TEST_KUBE_CONFIG = {
"current-context": "no_user",
"contexts": [
{
"name": "no_user",
"context": {
"cluster": "default"
}
},
{
"name": "simple_token",
"context": {
"cluster": "default",
"user": "simple_token"
}
},
{
"name": "gcp",
"context": {
"cluster": "default",
"user": "gcp"
}
},
{
"name": "expired_gcp",
"context": {
"cluster": "default",
"user": "expired_gcp"
}
},
{
"name": "expired_gcp_refresh",
"context": {
"cluster": "default",
"user": "expired_gcp_refresh"
}
},
{
"name": "oidc",
"context": {
"cluster": "default",
"user": "oidc"
}
},
{
"name": "azure",
"context": {
"cluster": "default",
"user": "azure"
}
},
{
"name": "azure_num",
"context": {
"cluster": "default",
"user": "azure_num"
}
},
{
"name": "azure_str",
"context": {
"cluster": "default",
"user": "azure_str"
}
},
{
"name": "azure_num_error",
"context": {
"cluster": "default",
"user": "azure_str_error"
}
},
{
"name": "azure_str_error",
"context": {
"cluster": "default",
"user": "azure_str_error"
}
},
{
"name": "expired_oidc",
"context": {
"cluster": "default",
"user": "expired_oidc"
}
},
{
"name": "expired_oidc_with_idp_ca_file",
"context": {
"cluster": "default",
"user": "expired_oidc_with_idp_ca_file"
}
},
{
"name": "expired_oidc_nocert",
"context": {
"cluster": "default",
"user": "expired_oidc_nocert"
}
},
{
"name": "oidc_contains_reserved_character",
"context": {
"cluster": "default",
"user": "oidc_contains_reserved_character"
}
},
{
"name": "oidc_invalid_padding_length",
"context": {
"cluster": "default",
"user": "oidc_invalid_padding_length"
}
},
{
"name": "user_pass",
"context": {
"cluster": "default",
"user": "user_pass"
}
},
{
"name": "ssl",
"context": {
"cluster": "ssl",
"user": "ssl"
}
},
{
"name": "no_ssl_verification",
"context": {
"cluster": "no_ssl_verification",
"user": "ssl"
}
},
{
"name": "ssl-no_file",
"context": {
"cluster": "ssl-no_file",
"user": "ssl-no_file"
}
},
{
"name": "ssl-local-file",
"context": {
"cluster": "ssl-local-file",
"user": "ssl-local-file"
}
},
{
"name": "non_existing_user",
"context": {
"cluster": "default",
"user": "non_existing_user"
}
},
{
"name": "exec_cred_user",
"context": {
"cluster": "default",
"user": "exec_cred_user"
}
},
{
"name": "exec_cred_user_certificate",
"context": {
"cluster": "ssl",
"user": "exec_cred_user_certificate"
}
},
{
"name": "contexttestcmdpath",
"context": {
"cluster": "clustertestcmdpath",
"user": "usertestcmdpath"
}
},
{
"name": "contexttestcmdpathempty",
"context": {
"cluster": "clustertestcmdpath",
"user": "usertestcmdpathempty"
}
},
{
"name": "contexttestcmdpathscope",
"context": {
"cluster": "clustertestcmdpath",
"user": "usertestcmdpathscope"
}
},
{
"name": "tls-server-name",
"context": {
"cluster": "tls-server-name",
"user": "ssl"
}
},
],
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
{
"name": "ssl-no_file",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority": TEST_CERTIFICATE_AUTH,
}
},
{
"name": "ssl-local-file",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority": "cert_test",
}
},
{
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
"insecure-skip-tls-verify": False,
}
},
{
"name": "no_ssl_verification",
"cluster": {
"server": TEST_SSL_HOST,
"insecure-skip-tls-verify": True,
}
},
{
"name": "clustertestcmdpath",
"cluster": {}
},
{
"name": "tls-server-name",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
"insecure-skip-tls-verify": False,
"tls-server-name": TEST_TLS_SERVER_NAME,
}
},
],
"users": [
{
"name": "simple_token",
"user": {
"token": TEST_DATA_BASE64,
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "gcp",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
}
},
"token": TEST_DATA_BASE64, # should be ignored
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "expired_gcp",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": TEST_TOKEN_EXPIRY_PAST, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
# Duplicated from "expired_gcp" so test_load_gcp_token_with_refresh
# is isolated from test_gcp_get_api_key_with_prefix.
{
"name": "expired_gcp_refresh",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": TEST_TOKEN_EXPIRY_PAST, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "oidc",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"id-token": TEST_OIDC_LOGIN
}
}
}
},
{
"name": "azure",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "00000002-0000-0000-c000-"
"000000000000",
"environment": "AzurePublicCloud",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_num",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "00000002-0000-0000-c000-"
"000000000000",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "156207275",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_str",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "00000002-0000-0000-c000-"
"000000000000",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "2018-10-18 00:52:29.044727",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_str_error",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "00000002-0000-0000-c000-"
"000000000000",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "2018-10-18 00:52",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "azure_num_error",
"user": {
"auth-provider": {
"config": {
"access-token": TEST_AZURE_TOKEN,
"apiserver-id": "00000002-0000-0000-c000-"
"000000000000",
"environment": "AzurePublicCloud",
"expires-in": "0",
"expires-on": "-1",
"refresh-token": "refreshToken",
"tenant-id": "9d2ac018-e843-4e14-9e2b-4e0ddac75433"
},
"name": "azure"
}
}
},
{
"name": "expired_oidc",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority-data": TEST_OIDC_CA,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "expired_oidc_with_idp_ca_file",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority": TEST_CERTIFICATE_AUTH,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "expired_oidc_nocert",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "oidc_contains_reserved_character",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "oidc_invalid_padding_length",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_INVALID_PADDING_LENGTH,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "user_pass",
"user": {
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
{
"name": "ssl-no_file",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate": TEST_CLIENT_CERT,
"client-key": TEST_CLIENT_KEY,
}
},
{
"name": "ssl-local-file",
"user": {
"tokenFile": "token_file",
"client-certificate": "client_cert",
"client-key": "client_key",
}
},
{
"name": "ssl",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate-data": TEST_CLIENT_CERT_BASE64,
"client-key-data": TEST_CLIENT_KEY_BASE64,
}
},
{
"name": "exec_cred_user",
"user": {
"exec": {
"apiVersion": "client.authentication.k8s.io/v1beta1",
"command": "aws-iam-authenticator",
"args": ["token", "-i", "dummy-cluster"]
}
}
},
{
"name": "exec_cred_user_certificate",
"user": {
"exec": {
"apiVersion": "client.authentication.k8s.io/v1beta1",
"command": "custom-certificate-authenticator",
"args": []
}
}
},
{
"name": "usertestcmdpath",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"cmd-path": "cmdtorun"
}
}
}
},
{
"name": "usertestcmdpathempty",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"cmd-path": ""
}
}
}
},
{
"name": "usertestcmdpathscope",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"cmd-path": "cmd",
"scopes": "scope"
}
}
}
}
]
}
def test_no_user_context(self):
expected = FakeConfig(host=TEST_HOST)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="no_user").load_and_set(actual)
self.assertEqual(expected, actual)
def test_simple_token(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="simple_token").load_and_set(actual)
self.assertEqual(expected, actual)
def test_load_user_token(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="simple_token")
self.assertTrue(loader._load_user_token())
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64, loader.token)
def test_gcp_no_refresh(self):
fake_config = FakeConfig()
self.assertIsNone(fake_config.refresh_api_key_hook)
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="gcp",
get_google_credentials=lambda: _raise_exception(
"SHOULD NOT BE CALLED")).load_and_set(fake_config)
# Should now be populated with a gcp token fetcher.
self.assertIsNotNone(fake_config.refresh_api_key_hook)
self.assertEqual(TEST_HOST, fake_config.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
fake_config.api_key['authorization'])
def test_load_gcp_token_no_refresh(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="gcp",
get_google_credentials=lambda: _raise_exception(
"SHOULD NOT BE CALLED"))
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
loader.token)
def test_load_gcp_token_with_refresh(self):
def cred(): return None
cred.token = TEST_ANOTHER_DATA_BASE64
cred.expiry = datetime.datetime.now(tz=UTC).replace(tzinfo=None)
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp",
get_google_credentials=lambda: cred)
original_expiry = _get_expiry(loader, "expired_gcp")
self.assertTrue(loader._load_auth_provider_token())
new_expiry = _get_expiry(loader, "expired_gcp")
# assert that the configs expiry actually updates
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)
def test_gcp_refresh_api_key_hook(self):
class cred_old:
token = TEST_DATA_BASE64
expiry = DATETIME_EXPIRY_PAST
class cred_new:
token = TEST_ANOTHER_DATA_BASE64
expiry = DATETIME_EXPIRY_FUTURE
fake_config = FakeConfig()
_get_google_credentials = mock.Mock()
_get_google_credentials.side_effect = [cred_old, cred_new]
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp_refresh",
get_google_credentials=_get_google_credentials)
loader.load_and_set(fake_config)
original_expiry = _get_expiry(loader, "expired_gcp_refresh")
# Refresh the GCP token.
fake_config.refresh_api_key_hook(fake_config)
new_expiry = _get_expiry(loader, "expired_gcp_refresh")
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)
def test_oidc_no_refresh(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual(TEST_OIDC_TOKEN, loader.token)
@mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token')
@mock.patch('kubernetes.config.kube_config.ApiClient.request')
def test_oidc_with_refresh(self, mock_ApiClient, mock_OAuth2Session):
mock_response = mock.MagicMock()
type(mock_response).status = mock.PropertyMock(
return_value=200
)
type(mock_response).data = mock.PropertyMock(
return_value=json.dumps({
"token_endpoint": "https://example.org/identity/token"
})
)
mock_ApiClient.return_value = mock_response
mock_OAuth2Session.return_value = {"id_token": "abc123",
"refresh_token": "newtoken123"}
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_oidc",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual("Bearer abc123", loader.token)
@mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token')
@mock.patch('kubernetes.config.kube_config.ApiClient.request')
def test_oidc_with_idp_ca_file_refresh(self, mock_ApiClient, mock_OAuth2Session):
mock_response = mock.MagicMock()
type(mock_response).status = mock.PropertyMock(
return_value=200
)
type(mock_response).data = mock.PropertyMock(
return_value=json.dumps({
"token_endpoint": "https://example.org/identity/token"
})
)
mock_ApiClient.return_value = mock_response
mock_OAuth2Session.return_value = {"id_token": "abc123",
"refresh_token": "newtoken123"}
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_oidc_with_idp_ca_file",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual("Bearer abc123", loader.token)
@mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token')
@mock.patch('kubernetes.config.kube_config.ApiClient.request')
def test_oidc_with_refresh_nocert(
self, mock_ApiClient, mock_OAuth2Session):
mock_response = mock.MagicMock()
type(mock_response).status = mock.PropertyMock(
return_value=200
)
type(mock_response).data = mock.PropertyMock(
return_value=json.dumps({
"token_endpoint": "https://example.org/identity/token"
})
)
mock_ApiClient.return_value = mock_response
mock_OAuth2Session.return_value = {"id_token": "abc123",
"refresh_token": "newtoken123"}
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_oidc_nocert",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual("Bearer abc123", loader.token)
def test_oidc_fails_if_contains_reserved_chars(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_contains_reserved_character",
)
self.assertEqual(
loader._load_oid_token("oidc_contains_reserved_character"),
None,
)
def test_oidc_fails_if_invalid_padding_length(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_invalid_padding_length",
)
self.assertEqual(
loader._load_oid_token("oidc_invalid_padding_length"),
None,
)
def test_azure_no_refresh(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure",
)
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual(TEST_AZURE_TOKEN_FULL, loader.token)
def test_azure_with_expired_num(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_num",
)
provider = loader._user['auth-provider']
self.assertTrue(loader._azure_is_expired(provider))
def test_azure_with_expired_str(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_str",
)
provider = loader._user['auth-provider']
self.assertTrue(loader._azure_is_expired(provider))
def test_azure_with_expired_str_error(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_str_error",
)
provider = loader._user['auth-provider']
self.assertRaises(ValueError, loader._azure_is_expired, provider)
def test_azure_with_expired_int_error(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="azure_num_error",
)
provider = loader._user['auth-provider']
self.assertRaises(ValueError, loader._azure_is_expired, provider)
def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="user_pass").load_and_set(actual)
self.assertEqual(expected, actual)
def test_load_user_pass_token(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="user_pass")
self.assertTrue(loader._load_user_pass_token())
self.assertEqual(TEST_BASIC_TOKEN, loader.token)
def test_ssl_no_cert_files(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl-no_file")
self.expect_exception(
loader.load_and_set,
"does not exist",
FakeConfig())
def test_ssl(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True
)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl").load_and_set(actual)
self.assertEqual(expected, actual)
def test_ssl_no_verification(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
verify_ssl=False,
ssl_ca_cert=None,
)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="no_ssl_verification").load_and_set(actual)
self.assertEqual(expected, actual)
def test_tls_server_name(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True,
tls_server_name=TEST_TLS_SERVER_NAME
)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="tls-server-name").load_and_set(actual)
self.assertEqual(expected, actual)
def test_list_contexts(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="no_user")
actual_contexts = loader.list_contexts()
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
for actual in actual_contexts:
expected = expected_contexts.get_with_name(actual['name'])
self.assertEqual(expected.value, actual)
def test_current_context(self):
loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG)
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
self.assertEqual(expected_contexts.get_with_name("no_user").value,
loader.current_context)
def test_set_active_context(self):
loader = KubeConfigLoader(config_dict=self.TEST_KUBE_CONFIG)
loader.set_active_context("ssl")
expected_contexts = ConfigNode("", self.TEST_KUBE_CONFIG)['contexts']
self.assertEqual(expected_contexts.get_with_name("ssl").value,
loader.current_context)
def test_ssl_with_relative_ssl_files(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH)
)
try:
temp_dir = tempfile.mkdtemp()
actual = FakeConfig()
with open(os.path.join(temp_dir, "cert_test"), "wb") as fd:
fd.write(TEST_CERTIFICATE_AUTH.encode())
with open(os.path.join(temp_dir, "client_cert"), "wb") as fd:
fd.write(TEST_CLIENT_CERT.encode())
with open(os.path.join(temp_dir, "client_key"), "wb") as fd:
fd.write(TEST_CLIENT_KEY.encode())
with open(os.path.join(temp_dir, "token_file"), "wb") as fd:
fd.write(TEST_DATA_BASE64.encode())
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="ssl-local-file",
config_base_path=temp_dir).load_and_set(actual)
self.assertEqual(expected, actual)
finally:
shutil.rmtree(temp_dir)
def test_load_kube_config_from_file_path(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = FakeConfig()
load_kube_config(config_file=config_file, context="simple_token",
client_configuration=actual)
self.assertEqual(expected, actual)
def test_load_kube_config_from_file_like_object(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file_like_object = io.StringIO()
# py3 (won't have unicode) vs py2 (requires it)
try:
unicode('')
config_file_like_object.write(
unicode(
yaml.safe_dump(
self.TEST_KUBE_CONFIG),
errors='replace'))
except NameError:
config_file_like_object.write(
yaml.safe_dump(
self.TEST_KUBE_CONFIG))
actual = FakeConfig()
load_kube_config(
config_file=config_file_like_object,
context="simple_token",
client_configuration=actual)
self.assertEqual(expected, actual)
def test_load_kube_config_from_dict(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
actual = FakeConfig()
load_kube_config_from_dict(config_dict=self.TEST_KUBE_CONFIG,
context="simple_token",
client_configuration=actual)
self.assertEqual(expected, actual)
def test_load_kube_config_from_dict_with_temp_file_path(self):
expected = FakeConfig(
host=TEST_SSL_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True
)
actual = FakeConfig()
tmp_path = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))),
'tmp_file_path_test')
load_kube_config_from_dict(config_dict=self.TEST_KUBE_CONFIG,
context="ssl",
client_configuration=actual,
temp_file_path=tmp_path)
self.assertFalse(True if not os.listdir(tmp_path) else False)
self.assertEqual(expected, actual)
_cleanup_temp_files()
def test_load_kube_config_from_empty_file_like_object(self):
config_file_like_object = io.StringIO()
self.assertRaises(
ConfigException,
load_kube_config,
config_file_like_object)
def test_load_kube_config_from_empty_file(self):
config_file = self._create_temp_file(
yaml.safe_dump(None))
self.assertRaises(
ConfigException,
load_kube_config,
config_file)
def test_list_kube_config_contexts(self):
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
contexts, active_context = list_kube_config_contexts(
config_file=config_file)
self.assertDictEqual(self.TEST_KUBE_CONFIG['contexts'][0],
active_context)
if PY3:
self.assertCountEqual(self.TEST_KUBE_CONFIG['contexts'],
contexts)
else:
self.assertItemsEqual(self.TEST_KUBE_CONFIG['contexts'],
contexts)
def test_new_client_from_config(self):
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
client = new_client_from_config(
config_file=config_file, context="simple_token")
self.assertEqual(TEST_HOST, client.configuration.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
client.configuration.api_key['authorization'])
def test_new_client_from_config_dict(self):
client = new_client_from_config_dict(
config_dict=self.TEST_KUBE_CONFIG, context="simple_token")
self.assertEqual(TEST_HOST, client.configuration.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
client.configuration.api_key['authorization'])
def test_no_users_section(self):
expected = FakeConfig(host=TEST_HOST)
actual = FakeConfig()
test_kube_config = self.TEST_KUBE_CONFIG.copy()
del test_kube_config['users']
KubeConfigLoader(
config_dict=test_kube_config,
active_context="gcp").load_and_set(actual)
self.assertEqual(expected, actual)
def test_non_existing_user(self):
expected = FakeConfig(host=TEST_HOST)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="non_existing_user").load_and_set(actual)
self.assertEqual(expected, actual)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run')
def test_user_exec_auth(self, mock):
token = "dummy"
mock.return_value = {
"token": token
}
expected = FakeConfig(host=TEST_HOST, api_key={
"authorization": BEARER_TOKEN_FORMAT % token})
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user").load_and_set(actual)
self.assertEqual(expected, actual)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run')
def test_user_exec_auth_with_expiry(self, mock):
expired_token = "expired"
current_token = "current"
mock.side_effect = [
{
"token": expired_token,
"expirationTimestamp": format_rfc3339(DATETIME_EXPIRY_PAST)
},
{
"token": current_token,
"expirationTimestamp": format_rfc3339(DATETIME_EXPIRY_FUTURE)
}
]
fake_config = FakeConfig()
self.assertIsNone(fake_config.refresh_api_key_hook)
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user").load_and_set(fake_config)
# The kube config should use the first token returned from the
# exec provider.
self.assertEqual(fake_config.api_key["authorization"],
BEARER_TOKEN_FORMAT % expired_token)
# Should now be populated with a method to refresh expired tokens.
self.assertIsNotNone(fake_config.refresh_api_key_hook)
# Refresh the token; the kube config should be updated.
fake_config.refresh_api_key_hook(fake_config)
self.assertEqual(fake_config.api_key["authorization"],
BEARER_TOKEN_FORMAT % current_token)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run')
def test_user_exec_auth_certificates(self, mock):
mock.return_value = {
"clientCertificateData": TEST_CLIENT_CERT,
"clientKeyData": TEST_CLIENT_KEY,
}
expected = FakeConfig(
host=TEST_SSL_HOST,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user_certificate").load_and_set(actual)
self.assertEqual(expected, actual)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run', autospec=True)
def test_user_exec_cwd(self, mock):
capture = {}
def capture_cwd(exec_provider):
capture['cwd'] = exec_provider.cwd
mock.side_effect = capture_cwd
expected = "/some/random/path"
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user",
config_base_path=expected).load_and_set(FakeConfig())
self.assertEqual(expected, capture['cwd'])
def test_user_cmd_path(self):
A = namedtuple('A', ['token', 'expiry'])
token = "dummy"
return_value = A(token, parse_rfc3339(datetime.datetime.now()))
CommandTokenSource.token = mock.Mock(return_value=return_value)
expected = FakeConfig(api_key={
"authorization": BEARER_TOKEN_FORMAT % token})
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="contexttestcmdpath").load_and_set(actual)
self.assertEqual(expected, actual)
def test_user_cmd_path_empty(self):
A = namedtuple('A', ['token', 'expiry'])
token = "dummy"
return_value = A(token, parse_rfc3339(datetime.datetime.now()))
CommandTokenSource.token = mock.Mock(return_value=return_value)
expected = FakeConfig(api_key={
"authorization": BEARER_TOKEN_FORMAT % token})
actual = FakeConfig()
self.expect_exception(lambda: KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="contexttestcmdpathempty").load_and_set(actual),
"missing access token cmd "
"(cmd-path is an empty string in your kubeconfig file)")
def test_user_cmd_path_with_scope(self):
A = namedtuple('A', ['token', 'expiry'])
token = "dummy"
return_value = A(token, parse_rfc3339(datetime.datetime.now()))
CommandTokenSource.token = mock.Mock(return_value=return_value)
expected = FakeConfig(api_key={
"authorization": BEARER_TOKEN_FORMAT % token})
actual = FakeConfig()
self.expect_exception(lambda: KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="contexttestcmdpathscope").load_and_set(actual),
"scopes can only be used when kubectl is using "
"a gcp service account key")
def test__get_kube_config_loader_for_yaml_file_no_persist(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = _get_kube_config_loader_for_yaml_file(config_file)
self.assertIsNone(actual._config_persister)
def test__get_kube_config_loader_for_yaml_file_persist(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = _get_kube_config_loader_for_yaml_file(config_file,
persist_config=True)
self.assertTrue(callable(actual._config_persister))
self.assertEqual(actual._config_persister.__name__, "save_changes")
def test__get_kube_config_loader_file_no_persist(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = _get_kube_config_loader(filename=config_file)
self.assertIsNone(actual._config_persister)
def test__get_kube_config_loader_file_persist(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
config_file = self._create_temp_file(
yaml.safe_dump(self.TEST_KUBE_CONFIG))
actual = _get_kube_config_loader(filename=config_file,
persist_config=True)
self.assertTrue(callable(actual._config_persister))
self.assertEqual(actual._config_persister.__name__, "save_changes")
def test__get_kube_config_loader_dict_no_persist(self):
expected = FakeConfig(host=TEST_HOST,
token=BEARER_TOKEN_FORMAT % TEST_DATA_BASE64)
actual = _get_kube_config_loader(
config_dict=self.TEST_KUBE_CONFIG)
self.assertIsNone(actual._config_persister)
class TestKubernetesClientConfiguration(BaseTestCase):
# Verifies properties of kubernetes.client.Configuration.
# These tests guard against changes to the upstream configuration class,
# since GCP and Exec authorization use refresh_api_key_hook to refresh
# their tokens regularly.
def test_refresh_api_key_hook_exists(self):
self.assertTrue(hasattr(Configuration(), 'refresh_api_key_hook'))
def test_get_api_key_calls_refresh_api_key_hook(self):
identifier = 'authorization'
expected_token = 'expected_token'
old_token = 'old_token'
config = Configuration(
api_key={identifier: old_token},
api_key_prefix={identifier: 'Bearer'}
)
def refresh_api_key_hook(client_config):
self.assertEqual(client_config, config)
client_config.api_key[identifier] = expected_token
config.refresh_api_key_hook = refresh_api_key_hook
self.assertEqual('Bearer ' + expected_token,
config.get_api_key_with_prefix(identifier))
class TestKubeConfigMerger(BaseTestCase):
TEST_KUBE_CONFIG_SET1 = [{
"current-context": "no_user",
"contexts": [
{
"name": "no_user",
"context": {
"cluster": "default"
}
},
],
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
],
"users": []
}, {
"current-context": "",
"contexts": [
{
"name": "ssl",
"context": {
"cluster": "ssl",
"user": "ssl"
}
},
{
"name": "simple_token",
"context": {
"cluster": "default",
"user": "simple_token"
}
},
],
"clusters": [
{
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
}
},
],
"users": [
{
"name": "ssl",
"user": {
"token": TEST_DATA_BASE64,
"client-certificate-data": TEST_CLIENT_CERT_BASE64,
"client-key-data": TEST_CLIENT_KEY_BASE64,
}
},
]
}, {
"current-context": "no_user",
"contexts": [
{
"name": "expired_oidc",
"context": {
"cluster": "default",
"user": "expired_oidc"
}
},
{
"name": "ssl",
"context": {
"cluster": "skipped-part2-defined-this-context",
"user": "skipped"
}
},
],
"clusters": [
],
"users": [
{
"name": "expired_oidc",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority-data": TEST_OIDC_CA,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "simple_token",
"user": {
"token": TEST_DATA_BASE64,
"username": TEST_USERNAME, # should be ignored
"password": TEST_PASSWORD, # should be ignored
}
},
]
}, {
"current-context": "no_user",
}, {
# Config with user having cmd-path
"contexts": [
{
"name": "contexttestcmdpath",
"context": {
"cluster": "clustertestcmdpath",
"user": "usertestcmdpath"
}
}
],
"clusters": [
{
"name": "clustertestcmdpath",
"cluster": {}
}
],
"users": [
{
"name": "usertestcmdpath",
"user": {
"auth-provider": {
"name": "gcp",
"config": {
"cmd-path": "cmdtorun"
}
}
}
}
]
}, {
"current-context": "no_user",
"contexts": [
{
"name": "no_user",
"context": {
"cluster": "default"
}
},
],
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
],
"users": None
}]
# 3 parts with different keys/data to merge
TEST_KUBE_CONFIG_SET2 = [{
"clusters": [
{
"name": "default",
"cluster": {
"server": TEST_HOST
}
},
],
}, {
"current-context": "simple_token",
"contexts": [
{
"name": "simple_token",
"context": {
"cluster": "default",
"user": "simple_token"
}
},
],
}, {
"users": [
{
"name": "simple_token",
"user": {
"token": TEST_DATA_BASE64,
"username": TEST_USERNAME,
"password": TEST_PASSWORD,
}
},
]
}]
def _create_multi_config(self, parts):
files = []
for part in parts:
files.append(self._create_temp_file(yaml.safe_dump(part)))
return ENV_KUBECONFIG_PATH_SEPARATOR.join(files)
def test_list_kube_config_contexts(self):
kubeconfigs = self._create_multi_config(self.TEST_KUBE_CONFIG_SET1)
expected_contexts = [
{'context': {'cluster': 'default'}, 'name': 'no_user'},
{'context': {'cluster': 'ssl', 'user': 'ssl'}, 'name': 'ssl'},
{'context': {'cluster': 'default', 'user': 'simple_token'},
'name': 'simple_token'},
{'context': {'cluster': 'default', 'user': 'expired_oidc'},
'name': 'expired_oidc'},
{'context': {'cluster': 'clustertestcmdpath',
'user': 'usertestcmdpath'},
'name': 'contexttestcmdpath'}]
contexts, active_context = list_kube_config_contexts(
config_file=kubeconfigs)
self.assertEqual(contexts, expected_contexts)
self.assertEqual(active_context, expected_contexts[0])
def test_new_client_from_config(self):
kubeconfigs = self._create_multi_config(self.TEST_KUBE_CONFIG_SET1)
client = new_client_from_config(
config_file=kubeconfigs, context="simple_token")
self.assertEqual(TEST_HOST, client.configuration.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
client.configuration.api_key['authorization'])
def test_merge_with_context_in_different_file(self):
kubeconfigs = self._create_multi_config(self.TEST_KUBE_CONFIG_SET2)
client = new_client_from_config(config_file=kubeconfigs)
expected_contexts = [
{'context': {'cluster': 'default', 'user': 'simple_token'},
'name': 'simple_token'}
]
contexts, active_context = list_kube_config_contexts(
config_file=kubeconfigs)
self.assertEqual(contexts, expected_contexts)
self.assertEqual(active_context, expected_contexts[0])
self.assertEqual(TEST_HOST, client.configuration.host)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_DATA_BASE64,
client.configuration.api_key['authorization'])
def test_save_changes(self):
kubeconfigs = self._create_multi_config(self.TEST_KUBE_CONFIG_SET1)
# load configuration, update token, save config
kconf = KubeConfigMerger(kubeconfigs)
user = kconf.config['users'].get_with_name('expired_oidc')['user']
provider = user['auth-provider']['config']
provider.value['id-token'] = "token-changed"
kconf.save_changes()
# re-read configuration
kconf = KubeConfigMerger(kubeconfigs)
user = kconf.config['users'].get_with_name('expired_oidc')['user']
provider = user['auth-provider']['config']
# new token
self.assertEqual(provider.value['id-token'], "token-changed")
if __name__ == '__main__':
unittest.main()