# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import unittest
from unittest import mock

from .config_exception import ConfigException
from .exec_provider import ExecProvider
from .kube_config import ConfigNode


class ExecProviderTest(unittest.TestCase):
    """Tests for ExecProvider with ``subprocess.Popen`` replaced by a mock."""

    def setUp(self):
        """Build the exec configurations and the well-formed plugin output."""
        self.input_ok = ConfigNode('test', {
            'command': 'aws-iam-authenticator',
            'args': ['token', '-i', 'dummy'],
            'apiVersion': 'client.authentication.k8s.io/v1beta1',
            'env': None
        })
        self.input_with_cluster = ConfigNode('test', {
            'command': 'aws-iam-authenticator',
            'args': ['token', '-i', 'dummy'],
            'apiVersion': 'client.authentication.k8s.io/v1beta1',
            'provideClusterInfo': True,
            'env': None
        })
        self.output_ok = """
        {
            "apiVersion": "client.authentication.k8s.io/v1beta1",
            "kind": "ExecCredential",
            "status": {
                "token": "dummy"
            }
        }
        """

    @staticmethod
    def _stub_process(mock_popen, returncode=0, stdout=''):
        """Configure the object returned by the patched ``Popen``.

        Sets the value returned by ``wait()`` to *returncode* and the value
        returned by ``communicate()`` to ``(stdout, '')``, then returns the
        configured process mock.
        """
        process = mock_popen.return_value
        process.wait.return_value = returncode
        process.communicate.return_value = (stdout, '')
        return process

    def test_missing_input_keys(self):
        """Constructing from an incomplete exec config raises."""
        exec_configs = [ConfigNode('test1', {}),
                        ConfigNode('test2', {'command': ''}),
                        ConfigNode('test3', {'apiVersion': ''})]
        for exec_config in exec_configs:
            # subTest lets every bad config be checked and reported on its
            # own instead of stopping at the first failing case.
            with self.subTest(exec_config=exec_config):
                with self.assertRaises(ConfigException) as context:
                    ExecProvider(exec_config, None)
                self.assertIn('exec: malformed request. missing key',
                              context.exception.args[0])

    @mock.patch('subprocess.Popen')
    def test_error_code_returned(self, mock_popen):
        """A non-zero value from ``wait()`` is reported as an error."""
        exit_code = 1
        self._stub_process(mock_popen, returncode=exit_code)
        with self.assertRaises(ConfigException) as context:
            ep = ExecProvider(self.input_ok, None)
            ep.run()
        self.assertIn('exec: process returned %d' % exit_code,
                      context.exception.args[0])

    @mock.patch('subprocess.Popen')
    def test_nonjson_output_returned(self, mock_popen):
        """Empty (non-JSON) process output is reported as a decode error."""
        self._stub_process(mock_popen)
        with self.assertRaises(ConfigException) as context:
            ep = ExecProvider(self.input_ok, None)
            ep.run()
        self.assertIn('exec: failed to decode process output',
                      context.exception.args[0])

    @mock.patch('subprocess.Popen')
    def test_missing_output_keys(self, mock_popen):
        """Output lacking apiVersion, kind or status is rejected."""
        outputs = [
            """
            {
                "kind": "ExecCredential",
                "status": {
                    "token": "dummy"
                }
            }
            """,
            """
            {
                "apiVersion": "client.authentication.k8s.io/v1beta1",
                "status": {
                    "token": "dummy"
                }
            }
            """,
            """
            {
                "apiVersion": "client.authentication.k8s.io/v1beta1",
                "kind": "ExecCredential"
            }
            """
        ]
        for output in outputs:
            with self.subTest(output=output):
                self._stub_process(mock_popen, stdout=output)
                with self.assertRaises(ConfigException) as context:
                    ep = ExecProvider(self.input_ok, None)
                    ep.run()
                self.assertIn('exec: malformed response. missing key',
                              context.exception.args[0])

    @mock.patch('subprocess.Popen')
    def test_mismatched_api_version(self, mock_popen):
        """Output whose apiVersion differs from the configured one fails."""
        wrong_api_version = 'client.authentication.k8s.io/v1'
        output = """
        {
            "apiVersion": "%s",
            "kind": "ExecCredential",
            "status": {
                "token": "dummy"
            }
        }
        """ % wrong_api_version
        self._stub_process(mock_popen, stdout=output)
        with self.assertRaises(ConfigException) as context:
            ep = ExecProvider(self.input_ok, None)
            ep.run()
        self.assertIn(
            'exec: plugin api version %s does not match' %
            wrong_api_version,
            context.exception.args[0])

    @mock.patch('subprocess.Popen')
    def test_ok_01(self, mock_popen):
        """Well-formed output yields a dict containing ``token``."""
        self._stub_process(mock_popen, stdout=self.output_ok)
        ep = ExecProvider(self.input_ok, None)
        result = ep.run()
        self.assertIsInstance(result, dict)
        self.assertIn('token', result)

    @mock.patch('subprocess.Popen')
    def test_run_in_dir(self, mock_popen):
        """The directory given to ExecProvider is passed to Popen as cwd."""
        self._stub_process(mock_popen, stdout=self.output_ok)
        ep = ExecProvider(self.input_ok, '/some/directory')
        ep.run()
        self.assertEqual(mock_popen.call_args[1]['cwd'], '/some/directory')

    @mock.patch('subprocess.Popen')
    def test_ok_no_console_attached(self, mock_popen):
        """run() still succeeds while ``sys.stdout`` is None."""
        self._stub_process(mock_popen, stdout=self.output_ok)
        mock_stdout = mock.patch(
            'sys.stdout', new=None)  # Simulate detached console
        with mock_stdout:
            ep = ExecProvider(self.input_ok, None)
            result = ep.run()
            self.assertIsInstance(result, dict)
            self.assertIn('token', result)

    @mock.patch('subprocess.Popen')
    def test_with_cluster_info(self, mock_popen):
        """Cluster info is exported to the process via KUBERNETES_EXEC_INFO."""
        self._stub_process(mock_popen, stdout=self.output_ok)
        ep = ExecProvider(self.input_with_cluster, None,
                          {'server': 'name.company.com'})
        result = ep.run()
        self.assertIsInstance(result, dict)
        self.assertIn('token', result)

        # call_args[1] is the keyword-argument dict of the Popen call; the
        # index form also works on interpreters older than Python 3.8.
        popen_kwargs = mock_popen.call_args[1]
        obj = json.loads(popen_kwargs['env']['KUBERNETES_EXEC_INFO'])
        self.assertEqual(obj['spec']['cluster']['server'],
                         'name.company.com')


if __name__ == '__main__':
    unittest.main()