# Source code for omniduct.filesystems.webhdfs

import posixpath
import random
from functools import partial

from interface_meta import override

from .base import FileSystemClient, FileSystemFileDesc
from .local import LocalFsClient

# Python 2 compatibility shim (not an import): evaluating the bare name
# `FileNotFoundError` raises `NameError` on interpreters where that builtin
# does not exist, in which case the name is aliased to `IOError` so the rest
# of this module can raise `FileNotFoundError` unconditionally.
try:
    FileNotFoundError
except NameError:
    FileNotFoundError = IOError


class WebHdfsClient(FileSystemClient):
    """
    This Duct connects to an Apache WebHDFS server using the `pywebhdfs` library.

    Attributes:
        namenodes (list<str>): A list of hosts that are acting as namenodes for
            the HDFS cluster in form "<hostname>:<port>".
    """

    PROTOCOLS = ['webhdfs']
    DEFAULT_PORT = 50070

    @override
    def _init(self, namenodes=None, auto_conf=False, auto_conf_cluster=None,
              auto_conf_path=None, **kwargs):
        """
        namenodes (list<str>): A list of hosts that are acting as namenodes for
            the HDFS cluster in form "<hostname>:<port>".
        auto_conf (bool): Whether to automatically extract host, port and
            namenode information from Cloudera configuration files. If True,
            automatically extracted values will override other passed values.
        auto_conf_cluster (str): The name of the cluster for which to extract
            configuration.
        auto_conf_path (str): The path of the `hdfs-site.xml` file in which the
            HDFS configuration is stored (on the remote filesystem if `remote`
            is specified, and on the local filesystem otherwise). Defaults to
            '/etc/hadoop/conf.cloudera.hdfs2/hdfs-site.xml'.
        **kwargs (dict): Additional arguments to pass onto the WebHdfs client.
        """
        self.namenodes = namenodes

        if auto_conf:
            from ._webhdfs_helpers import CdhHdfsConfParser

            assert auto_conf_cluster is not None, "You must specify a cluster via `auto_conf_cluster` for auto-detection to work."

            def get_host_and_set_namenodes(duct, cluster, conf_path):
                # Parse the configuration through the remote filesystem when
                # one is configured, otherwise through the local filesystem.
                conf_parser = CdhHdfsConfParser(duct.remote or LocalFsClient(), conf_path=conf_path)
                duct.namenodes = conf_parser.namenodes(cluster)
                return random.choice(duct.namenodes)

            # `_host` is stored as a callable taking the duct as its remaining
            # argument, so the configuration file is only read when the
            # callable is eventually invoked rather than here in `_init`.
            self._host = partial(
                get_host_and_set_namenodes,
                cluster=auto_conf_cluster,
                conf_path=auto_conf_path,
            )
        elif not self._host and namenodes:
            # No explicit host was given: pick one of the supplied namenodes.
            self._host = random.choice(self.namenodes)

        self.__webhdfs = None
        self.__webhdfs_kwargs = kwargs
        self.prepared_fields += ('namenodes',)

    @override
    def _connect(self):
        # Imported lazily so the helper module (and its dependencies) is only
        # loaded when a connection is actually attempted.
        from ._webhdfs_helpers import OmniductPyWebHdfsClient

        self.__webhdfs = OmniductPyWebHdfsClient(
            host=self._host,
            port=self._port,
            remote=self.remote,
            namenodes=self.namenodes,
            user_name=self.username,
            **self.__webhdfs_kwargs
        )

    @override
    def _is_connected(self):
        # Connected means: the remote (if any) is connected and a client
        # object has been created by `_connect`.
        try:
            if self.remote and not self.remote.is_connected():
                return False
            return self.__webhdfs is not None
        except Exception:
            # Any ordinary failure while probing is reported as "not
            # connected". `KeyboardInterrupt`/`SystemExit` are deliberately
            # allowed to propagate (the previous bare `except:` hid them).
            return False

    @override
    def _disconnect(self):
        # Dropping the client reference is all that is done to disconnect.
        self.__webhdfs = None

    # Path properties and helpers

    @override
    def _path_home(self):
        return self.__webhdfs.get_home_directory()

    @override
    def _path_separator(self):
        return '/'

    # File node properties

    @override
    def _exists(self, path):
        from pywebhdfs.errors import FileNotFound

        # A successful status lookup means the path exists; `FileNotFound`
        # from the client means it does not.
        try:
            self.__webhdfs.get_file_dir_status(path)
            return True
        except FileNotFound:
            return False

    @override
    def _isdir(self, path):
        from pywebhdfs.errors import FileNotFound

        # Missing paths are reported as "not a directory" rather than raising.
        try:
            stats = self.__webhdfs.get_file_dir_status(path)
            return stats['FileStatus']['type'] == 'DIRECTORY'
        except FileNotFound:
            return False

    @override
    def _isfile(self, path):
        from pywebhdfs.errors import FileNotFound

        # Missing paths are reported as "not a file" rather than raising.
        try:
            stats = self.__webhdfs.get_file_dir_status(path)
            return stats['FileStatus']['type'] == 'FILE'
        except FileNotFound:
            return False

    # Directory handling and enumeration

    @override
    def _dir(self, path):
        # Lazily yields one `FileSystemFileDesc` per entry listed under `path`.
        files = self.__webhdfs.list_dir(path)
        for f in files['FileStatuses']['FileStatus']:
            yield FileSystemFileDesc(
                fs=self,
                path=posixpath.join(path, f['pathSuffix']),
                name=f['pathSuffix'],
                type=f['type'].lower(),
                bytes=f['length'],
                owner=f['owner'],
                group=f['group'],
                last_modified=f['modificationTime'],
                last_accessed=f['accessTime'],
                permissions=f['permission'],
                replication=f['replication']
            )

    @override
    def _mkdir(self, path, recursive, exist_ok):
        # Creates a directory at `path`.
        #
        # Raises IOError if `recursive` is false and the parent directory does
        # not exist, or if `exist_ok` is false and `path` already exists.
        #
        # The parent of `path` is its *dirname*. The previous implementation
        # used the basename (the final component), so the non-recursive check
        # tested the wrong path. `normpath` strips any trailing separator so
        # that '/a/b/' has parent '/a' rather than '/a/b'.
        parent = posixpath.dirname(posixpath.normpath(path))
        # An empty `parent` means `path` has no directory component, so there
        # is no parent path to verify.
        if not recursive and parent and not self._isdir(parent):
            raise IOError("No parent directory found for {}.".format(path))
        if not exist_ok and self._exists(path):
            raise IOError("Path already exists at {}.".format(path))
        self.__webhdfs.make_dir(path)

    @override
    def _remove(self, path, recursive):
        self.__webhdfs.delete_file_dir(path, recursive)

    # File handling

    @override
    def _file_read_(self, path, size=-1, offset=0, binary=False):
        # Reads up to `size` bytes from `path` starting at `offset`; a
        # negative `size` requests everything from `offset` onwards. Returns
        # the raw result when `binary` is true, otherwise decodes it as UTF-8.
        #
        # Raises FileNotFoundError if `path` is not an existing file.
        if not self.isfile(path):
            raise FileNotFoundError("File `{}` does not exist.".format(path))

        # NOTE(review): 'null' is presumably interpreted by the client/server
        # as "no length limit" -- confirm against the pywebhdfs helper.
        read = self.__webhdfs.read_file(path, offset=offset, length='null' if size < 0 else size)
        if not binary:
            read = read.decode('utf-8')
        return read

    @override
    def _file_append_(self, path, s, binary):
        # `binary` is accepted for interface compatibility but not used here.
        return self.__webhdfs.append_file(path, s)

    @override
    def _file_write_(self, path, s, binary):
        # Always overwrites any existing file; `binary` is not used here.
        return self.__webhdfs.create_file(path, s, overwrite=True)