This repository has been archived on 2025-01-23. You can view files and clone it, but cannot push or open issues or pull requests.
traefik-certmanager/kubernetes/base/dynamic/test_client.py

572 lines
20 KiB
Python
Raw Permalink Normal View History

# Copyright 2019 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import uuid
from kubernetes.e2e_test import base
from kubernetes.client import api_client
from . import DynamicClient
from .resource import ResourceInstance, ResourceField
from .exceptions import ResourceNotFoundError
def short_uuid():
id = str(uuid.uuid4())
return id[-12:]
class TestDynamicClient(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.config = base.get_e2e_configuration()
def test_cluster_custom_resources(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ClusterChangeMe')
crd_api = client.resources.get(
api_version='apiextensions.k8s.io/v1beta1',
kind='CustomResourceDefinition')
name = 'clusterchangemes.apps.example.com'
crd_manifest = {
'apiVersion': 'apiextensions.k8s.io/v1beta1',
'kind': 'CustomResourceDefinition',
'metadata': {
'name': name,
},
'spec': {
'group': 'apps.example.com',
'names': {
'kind': 'ClusterChangeMe',
'listKind': 'ClusterChangeMeList',
'plural': 'clusterchangemes',
'singular': 'clusterchangeme',
},
'scope': 'Cluster',
'version': 'v1',
'subresources': {
'status': {}
}
}
}
resp = crd_api.create(crd_manifest)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
resp = crd_api.get(
name=name,
)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
try:
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ClusterChangeMe')
except ResourceNotFoundError:
# Need to wait a sec for the discovery layer to get updated
time.sleep(2)
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ClusterChangeMe')
resp = changeme_api.get()
self.assertEqual(resp.items, [])
changeme_name = 'custom-resource' + short_uuid()
changeme_manifest = {
'apiVersion': 'apps.example.com/v1',
'kind': 'ClusterChangeMe',
'metadata': {
'name': changeme_name,
},
'spec': {}
}
resp = changeme_api.create(body=changeme_manifest)
self.assertEqual(resp.metadata.name, changeme_name)
resp = changeme_api.get(name=changeme_name)
self.assertEqual(resp.metadata.name, changeme_name)
changeme_manifest['spec']['size'] = 3
resp = changeme_api.patch(
body=changeme_manifest,
content_type='application/merge-patch+json'
)
self.assertEqual(resp.spec.size, 3)
resp = changeme_api.get(name=changeme_name)
self.assertEqual(resp.spec.size, 3)
resp = changeme_api.get()
self.assertEqual(len(resp.items), 1)
resp = changeme_api.delete(
name=changeme_name,
)
resp = changeme_api.get()
self.assertEqual(len(resp.items), 0)
resp = crd_api.delete(
name=name,
)
time.sleep(2)
client.resources.invalidate_cache()
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ClusterChangeMe')
def test_async_namespaced_custom_resources(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
crd_api = client.resources.get(
api_version='apiextensions.k8s.io/v1beta1',
kind='CustomResourceDefinition')
name = 'changemes.apps.example.com'
crd_manifest = {
'apiVersion': 'apiextensions.k8s.io/v1beta1',
'kind': 'CustomResourceDefinition',
'metadata': {
'name': name,
},
'spec': {
'group': 'apps.example.com',
'names': {
'kind': 'ChangeMe',
'listKind': 'ChangeMeList',
'plural': 'changemes',
'singular': 'changeme',
},
'scope': 'Namespaced',
'version': 'v1',
'subresources': {
'status': {}
}
}
}
async_resp = crd_api.create(crd_manifest, async_req=True)
self.assertEqual(name, async_resp.metadata.name)
self.assertTrue(async_resp.status)
async_resp = crd_api.get(
name=name,
async_req=True
)
self.assertEqual(name, async_resp.metadata.name)
self.assertTrue(async_resp.status)
try:
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
except ResourceNotFoundError:
# Need to wait a sec for the discovery layer to get updated
time.sleep(2)
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
async_resp = changeme_api.get(async_req=True)
self.assertEqual(async_resp.items, [])
changeme_name = 'custom-resource' + short_uuid()
changeme_manifest = {
'apiVersion': 'apps.example.com/v1',
'kind': 'ChangeMe',
'metadata': {
'name': changeme_name,
},
'spec': {}
}
async_resp = changeme_api.create(body=changeme_manifest, namespace='default', async_req=True)
self.assertEqual(async_resp.metadata.name, changeme_name)
async_resp = changeme_api.get(name=changeme_name, namespace='default', async_req=True)
self.assertEqual(async_resp.metadata.name, changeme_name)
changeme_manifest['spec']['size'] = 3
async_resp = changeme_api.patch(
body=changeme_manifest,
namespace='default',
content_type='application/merge-patch+json',
async_req=True
)
self.assertEqual(async_resp.spec.size, 3)
async_resp = changeme_api.get(name=changeme_name, namespace='default', async_req=True)
self.assertEqual(async_resp.spec.size, 3)
async_resp = changeme_api.get(namespace='default', async_req=True)
self.assertEqual(len(async_resp.items), 1)
async_resp = changeme_api.get(async_req=True)
self.assertEqual(len(async_resp.items), 1)
async_resp = changeme_api.delete(
name=changeme_name,
namespace='default',
async_req=True
)
async_resp = changeme_api.get(namespace='default', async_req=True)
self.assertEqual(len(async_resp.items), 0)
async_resp = changeme_api.get(async_req=True)
self.assertEqual(len(async_resp.items), 0)
async_resp = crd_api.delete(
name=name,
async_req=True
)
time.sleep(2)
client.resources.invalidate_cache()
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
def test_namespaced_custom_resources(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
crd_api = client.resources.get(
api_version='apiextensions.k8s.io/v1beta1',
kind='CustomResourceDefinition')
name = 'changemes.apps.example.com'
crd_manifest = {
'apiVersion': 'apiextensions.k8s.io/v1beta1',
'kind': 'CustomResourceDefinition',
'metadata': {
'name': name,
},
'spec': {
'group': 'apps.example.com',
'names': {
'kind': 'ChangeMe',
'listKind': 'ChangeMeList',
'plural': 'changemes',
'singular': 'changeme',
},
'scope': 'Namespaced',
'version': 'v1',
'subresources': {
'status': {}
}
}
}
resp = crd_api.create(crd_manifest)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
resp = crd_api.get(
name=name,
)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
try:
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
except ResourceNotFoundError:
# Need to wait a sec for the discovery layer to get updated
time.sleep(2)
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
resp = changeme_api.get()
self.assertEqual(resp.items, [])
changeme_name = 'custom-resource' + short_uuid()
changeme_manifest = {
'apiVersion': 'apps.example.com/v1',
'kind': 'ChangeMe',
'metadata': {
'name': changeme_name,
},
'spec': {}
}
resp = changeme_api.create(body=changeme_manifest, namespace='default')
self.assertEqual(resp.metadata.name, changeme_name)
resp = changeme_api.get(name=changeme_name, namespace='default')
self.assertEqual(resp.metadata.name, changeme_name)
changeme_manifest['spec']['size'] = 3
resp = changeme_api.patch(
body=changeme_manifest,
namespace='default',
content_type='application/merge-patch+json'
)
self.assertEqual(resp.spec.size, 3)
resp = changeme_api.get(name=changeme_name, namespace='default')
self.assertEqual(resp.spec.size, 3)
resp = changeme_api.get(namespace='default')
self.assertEqual(len(resp.items), 1)
resp = changeme_api.get()
self.assertEqual(len(resp.items), 1)
resp = changeme_api.delete(
name=changeme_name,
namespace='default'
)
resp = changeme_api.get(namespace='default')
self.assertEqual(len(resp.items), 0)
resp = changeme_api.get()
self.assertEqual(len(resp.items), 0)
resp = crd_api.delete(
name=name,
)
time.sleep(2)
client.resources.invalidate_cache()
with self.assertRaises(ResourceNotFoundError):
changeme_api = client.resources.get(
api_version='apps.example.com/v1', kind='ChangeMe')
def test_service_apis(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(api_version='v1', kind='Service')
name = 'frontend-' + short_uuid()
service_manifest = {'apiVersion': 'v1',
'kind': 'Service',
'metadata': {'labels': {'name': name},
'name': name,
'resourceversion': 'v1'},
'spec': {'ports': [{'name': 'port',
'port': 80,
'protocol': 'TCP',
'targetPort': 80}],
'selector': {'name': name}}}
resp = api.create(
body=service_manifest,
namespace='default'
)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
resp = api.get(
name=name,
namespace='default'
)
self.assertEqual(name, resp.metadata.name)
self.assertTrue(resp.status)
service_manifest['spec']['ports'] = [{'name': 'new',
'port': 8080,
'protocol': 'TCP',
'targetPort': 8080}]
resp = api.patch(
body=service_manifest,
name=name,
namespace='default'
)
self.assertEqual(2, len(resp.spec.ports))
self.assertTrue(resp.status)
resp = api.delete(
name=name, body={},
namespace='default'
)
def test_replication_controller_apis(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(
api_version='v1', kind='ReplicationController')
name = 'frontend-' + short_uuid()
rc_manifest = {
'apiVersion': 'v1',
'kind': 'ReplicationController',
'metadata': {'labels': {'name': name},
'name': name},
'spec': {'replicas': 2,
'selector': {'name': name},
'template': {'metadata': {
'labels': {'name': name}},
'spec': {'containers': [{
'image': 'nginx',
'name': 'nginx',
'ports': [{'containerPort': 80,
'protocol': 'TCP'}]}]}}}}
resp = api.create(
body=rc_manifest, namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertEqual(2, resp.spec.replicas)
resp = api.get(
name=name, namespace='default')
self.assertEqual(name, resp.metadata.name)
self.assertEqual(2, resp.spec.replicas)
api.delete(
name=name,
namespace='default',
propagation_policy='Background')
def test_configmap_apis(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(api_version='v1', kind='ConfigMap')
name = 'test-configmap-' + short_uuid()
test_configmap = {
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": name,
"labels": {
"e2e-test": "true",
},
},
"data": {
"config.json": "{\"command\":\"/usr/bin/mysqld_safe\"}",
"frontend.cnf": "[mysqld]\nbind-address = 10.0.0.3\n"
}
}
resp = api.create(
body=test_configmap, namespace='default'
)
self.assertEqual(name, resp.metadata.name)
resp = api.get(
name=name, namespace='default', label_selector="e2e-test=true")
self.assertEqual(name, resp.metadata.name)
count = 0
for _ in client.watch(api, timeout=10, namespace="default", name=name):
count += 1
self.assertTrue(count > 0, msg="no events received for watch")
test_configmap['data']['config.json'] = "{}"
resp = api.patch(
name=name, namespace='default', body=test_configmap)
resp = api.delete(
name=name, body={}, namespace='default')
resp = api.get(
namespace='default',
pretty=True,
label_selector="e2e-test=true")
self.assertEqual([], resp.items)
def test_node_apis(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(api_version='v1', kind='Node')
for item in api.get().items:
node = api.get(name=item.metadata.name)
self.assertTrue(len(dict(node.metadata.labels)) > 0)
# test_node_apis_partial_object_metadata lists all nodes in the cluster,
# but only retrieves object metadata
def test_node_apis_partial_object_metadata(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(api_version='v1', kind='Node')
params = {
'header_params': {
'Accept': 'application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io'}}
resp = api.get(**params)
self.assertEqual('PartialObjectMetadataList', resp.kind)
self.assertEqual('meta.k8s.io/v1', resp.apiVersion)
params = {
'header_params': {
'aCcePt': 'application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io'}}
resp = api.get(**params)
self.assertEqual('PartialObjectMetadataList', resp.kind)
self.assertEqual('meta.k8s.io/v1', resp.apiVersion)
def test_server_side_apply_api(self):
client = DynamicClient(api_client.ApiClient(configuration=self.config))
api = client.resources.get(
api_version='v1', kind='Pod')
name = 'pod-' + short_uuid()
pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'labels': {'name': name},
'name': name},
'spec': {'containers': [{
'image': 'nginx',
'name': 'nginx',
'ports': [{'containerPort': 80,
'protocol': 'TCP'}]}]}}
resp = api.server_side_apply(
namespace='default', body=pod_manifest,
field_manager='kubernetes-unittests', dry_run="All")
self.assertEqual('kubernetes-unittests', resp.metadata.managedFields[0].manager)
class TestDynamicClientSerialization(unittest.TestCase):
@classmethod
def setUpClass(cls):
config = base.get_e2e_configuration()
cls.client = DynamicClient(api_client.ApiClient(configuration=config))
cls.pod_manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': 'foo-pod'},
'spec': {'containers': [{'name': "main", 'image': "busybox"}]},
}
def test_dict_type(self):
self.assertEqual(self.client.serialize_body(self.pod_manifest), self.pod_manifest)
def test_resource_instance_type(self):
inst = ResourceInstance(self.client, self.pod_manifest)
self.assertEqual(self.client.serialize_body(inst), self.pod_manifest)
def test_resource_field(self):
"""`ResourceField` is a special type which overwrites `__getattr__` method to return `None`
when a non-existent attribute was accessed. which means it can pass any `hasattr(...)` tests.
"""
params = {
"foo": "bar",
"self": True
}
res = ResourceField(params=params)
self.assertEqual(res["foo"], params["foo"])
self.assertEqual(res["self"], params["self"])
self.assertEqual(self.client.serialize_body(res), params)