# Copyright 2016 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import pydoc import sys from kubernetes import client PYDOC_RETURN_LABEL = ":return:" PYDOC_FOLLOW_PARAM = ":param bool follow:" # Removing this suffix from return type name should give us event's object # type. e.g., if list_namespaces() returns "NamespaceList" type, # then list_namespaces(watch=true) returns a stream of events with objects # of type "Namespace". In case this assumption is not true, user should # provide return_type to Watch class's __init__. TYPE_LIST_SUFFIX = "List" PY2 = sys.version_info[0] == 2 if PY2: import httplib HTTP_STATUS_GONE = httplib.GONE else: import http HTTP_STATUS_GONE = http.HTTPStatus.GONE class SimpleNamespace: def __init__(self, **kwargs): self.__dict__.update(kwargs) def _find_return_type(func): for line in pydoc.getdoc(func).splitlines(): if line.startswith(PYDOC_RETURN_LABEL): return line[len(PYDOC_RETURN_LABEL):].strip() return "" def iter_resp_lines(resp): buffer = bytearray() for segment in resp.stream(amt=None, decode_content=False): # Append the segment (chunk) to the buffer # # Performance note: depending on contents of buffer and the type+value of segment, # encoding segment into the buffer could be a wasteful step. The approach used here # simplifies the logic farther down, but in the future it may be reasonable to # sacrifice readability for performance. if isinstance(segment, bytes): buffer.extend(segment) elif isinstance(segment, str): buffer.extend(segment.encode("utf-8")) else: raise TypeError( f"Received invalid segment type, {type(segment)}, from stream. Accepts only 'str' or 'bytes'.") # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) next_newline = buffer.find(b'\n') while next_newline != -1: # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character line = buffer[:next_newline].decode( "utf-8", errors="replace") buffer = buffer[next_newline+1:] if line: yield line next_newline = buffer.find(b'\n') class Watch(object): def __init__(self, return_type=None): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() self.resource_version = None def stop(self): self._stop = True def get_return_type(self, func): if self._raw_return_type: return self._raw_return_type return_type = _find_return_type(func) if return_type.endswith(TYPE_LIST_SUFFIX): return return_type[:-len(TYPE_LIST_SUFFIX)] return return_type def get_watch_argument_name(self, func): if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func): return 'follow' else: return 'watch' def unmarshal_event(self, data, return_type): js = json.loads(data) js['raw_object'] = js['object'] # BOOKMARK event is treated the same as ERROR for a quick fix of # decoding exception # TODO: make use of the resource_version in BOOKMARK event for more # efficient WATCH if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': obj = SimpleNamespace(data=json.dumps(js['raw_object'])) js['object'] = self._api_client.deserialize(obj, return_type) if hasattr(js['object'], 'metadata'): self.resource_version = js['object'].metadata.resource_version # For custom objects that we don't have model defined, json # deserialization results in dictionary elif (isinstance(js['object'], dict) and 'metadata' in js['object'] and 'resourceVersion' in js['object']['metadata']): self.resource_version = js['object']['metadata'][ 'resourceVersion'] return js def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. Note that watching an API resource can expire. The method tries to resume automatically once from the last result, but if that last result is too old as well, an `ApiException` exception will be thrown with ``code`` 410. In that case you have to recover yourself, probably by listing the API resource to obtain the latest state and then watching from that state on by setting ``resource_version`` to one returned from listing. :param func: The API function pointer. Any parameter to the function can be passed after this parameter. :return: Event object with these keys: 'type': The type of event such as "ADDED", "DELETED", etc. 'raw_object': a dict representing the watched object. 'object': A model representation of raw_object. The name of model will be determined based on the func's doc string. If it cannot be determined, 'object' value will be the same as 'raw_object'. Example: v1 = kubernetes.client.CoreV1Api() watch = kubernetes.watch.Watch() for e in watch.stream(v1.list_namespace, resource_version=1127): type_ = e['type'] object_ = e['object'] # object is one of type return_type raw_object = e['raw_object'] # raw_object is a dict ... if should_stop: watch.stop() """ self._stop = False return_type = self.get_return_type(func) watch_arg = self.get_watch_argument_name(func) kwargs[watch_arg] = True kwargs['_preload_content'] = False if 'resource_version' in kwargs: self.resource_version = kwargs['resource_version'] # Do not attempt retries if user specifies a timeout. # We want to ensure we are returning within that timeout. disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False while True: resp = func(*args, **kwargs) try: for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log if watch_arg == "watch": event = self.unmarshal_event(line, return_type) if isinstance(event, dict) \ and event['type'] == 'ERROR': obj = event['raw_object'] # Current request expired, let's retry, (if enabled) # but only if we have not already retried. if not disable_retries and not retry_after_410 and \ obj['code'] == HTTP_STATUS_GONE: retry_after_410 = True break else: reason = "%s: %s" % ( obj['reason'], obj['message']) raise client.rest.ApiException( status=obj['code'], reason=reason) else: retry_after_410 = False yield event else: yield line if self._stop: break finally: resp.close() resp.release_conn() if self.resource_version is not None: kwargs['resource_version'] = self.resource_version else: self._stop = True if self._stop or disable_retries: break