From c1c9a1d6635295cbeff68e9e059207422c533fc3 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 28 Jul 2016 13:43:53 +0100 Subject: [PATCH 01/15] Get consul running on acceptance test clusters --- .../installation/enabling-control-service.rst | 9 ++ docs/installation/install-node.rst | 12 +++ flocker/provision/_install.py | 99 +++++++++++++------ flocker/provision/_tasks.py | 4 + 4 files changed, 96 insertions(+), 28 deletions(-) diff --git a/docs/installation/enabling-control-service.rst b/docs/installation/enabling-control-service.rst index 4af81d5d5b..b290d7a4ec 100644 --- a/docs/installation/enabling-control-service.rst +++ b/docs/installation/enabling-control-service.rst @@ -53,6 +53,15 @@ On AWS, an external firewall is used instead, which will need to be configured s Ubuntu 16.04 ============ +In a previous step we started a ``consul`` "server" on each of the nodes. +Now we will run ``consul join`` on one of the nodes, in order to introduce all the members of the consul cluster. + +.. task:: consul_join 192.0.2.101 192.0.2.102 192.0.2.103 + :prompt: [root@control-node]# + +With the consul database running we can now start the control service. +It will create a unique Flocker key in the database and store the Flocker configuration there. + .. task:: enable_flocker_control ubuntu-16.04 :prompt: [root@control-node]# diff --git a/docs/installation/install-node.rst b/docs/installation/install-node.rst index 7d8a174b91..c5920c53c0 100644 --- a/docs/installation/install-node.rst +++ b/docs/installation/install-node.rst @@ -169,6 +169,18 @@ Installing on Ubuntu 16.04 .. XXX FLOC-3454 to create a task directive for installing the plugin +#. **Install** ``consul`` **(or a supported distributed key / value database):** + + Launch ``consul`` on each of the nodes by running the official `Hashicorp Consul Docker image `_. + + * Supply the number of ``consul`` servers that you expect to run in your cluster. 
+ XXX: Should probably run up to 5 consul servers and agents on the rest. + + * Supply the local IP address which ``consul`` will use when connecting to other servers in the cluster. + + .. task:: consul_start 3 192.0.2.100 + :prompt: [root@ubuntu]# + #. **Repeat the previous steps for all other nodes:** Log into your other nodes as root, and then complete step 2 and 3 until all the nodes in your cluster have installed the ``clusterhq-flocker-node`` and the optional ``clusterhq-flocker-docker-plugin`` package. diff --git a/flocker/provision/_install.py b/flocker/provision/_install.py index 06e83db983..7294232f56 100644 --- a/flocker/provision/_install.py +++ b/flocker/provision/_install.py @@ -1429,6 +1429,38 @@ def task_install_docker_plugin( ) +def task_consul_start(bootstrap_expect, advertise_address): + """ + Install consul, + """ + return sequence([ + run_from_args(['docker', 'pull', 'consul']), + run_from_args([ + 'docker', 'run', + '--detach', + '--name', 'consul_server', + '--net', 'host', + + 'consul', 'agent', + '-server', + '-bootstrap-expect', str(bootstrap_expect), + '-advertise', advertise_address, + ]), + ]) + + +def task_consul_join(*consul_addresses): + """ + Join up the consul cluster. 
+ """ + return sequence([ + run_from_args(( + 'docker', 'run', + '--rm', '--net', 'host', + 'consul', 'join') + consul_addresses), + ]) + + ACCEPTANCE_IMAGES = [ "postgres:latest", "clusterhq/mongodb:latest", @@ -1593,7 +1625,7 @@ def configure_cluster( ), ]) for certnkey, node in zip(cluster.certificates.nodes, cluster.agent_nodes) - ]) + ]), ]) @@ -1765,36 +1797,47 @@ def configure_node( if provider == "managed": setup_action = 'restart' + tasks = [ + task_install_node_certificates( + cluster.certificates.cluster.certificate, + certnkey.certificate, + certnkey.key), + task_install_api_certificates( + cluster.certificates.user.certificate, + cluster.certificates.user.key), + task_enable_docker(node.distribution), + if_firewall_available( + node.distribution, + open_firewall_for_docker_api(node.distribution), + ), + task_configure_flocker_agent( + control_node=cluster.control_node.address, + dataset_backend=cluster.dataset_backend, + dataset_backend_configuration=( + dataset_backend_configuration + ), + logging_config=logging_config, + ), + task_enable_docker_plugin(node.distribution), + task_enable_flocker_agent( + distribution=node.distribution, + action=setup_action, + ), + task_consul_start( + bootstrap_expect=min(3, len(cluster.all_nodes)), + advertise_address=node.address, + ), + ] + + if node is not cluster.control_node: + tasks.append( + task_consul_join(cluster.control_node.address), + ) + return run_remotely( username='root', address=node.address, - commands=sequence([ - task_install_node_certificates( - cluster.certificates.cluster.certificate, - certnkey.certificate, - certnkey.key), - task_install_api_certificates( - cluster.certificates.user.certificate, - cluster.certificates.user.key), - task_enable_docker(node.distribution), - if_firewall_available( - node.distribution, - open_firewall_for_docker_api(node.distribution), - ), - task_configure_flocker_agent( - control_node=cluster.control_node.address, - dataset_backend=cluster.dataset_backend, - 
dataset_backend_configuration=( - dataset_backend_configuration - ), - logging_config=logging_config, - ), - task_enable_docker_plugin(node.distribution), - task_enable_flocker_agent( - distribution=node.distribution, - action=setup_action, - ), - ]), + commands=sequence(tasks), ) diff --git a/flocker/provision/_tasks.py b/flocker/provision/_tasks.py index da729324a7..bc033863f4 100644 --- a/flocker/provision/_tasks.py +++ b/flocker/provision/_tasks.py @@ -18,6 +18,8 @@ task_enable_flocker_agent, task_open_control_firewall, task_install_zfs, + task_consul_start, + task_consul_join, ) __all__ = [ @@ -34,4 +36,6 @@ 'task_enable_flocker_agent', 'task_open_control_firewall', 'task_install_zfs', + 'task_consul_start', + 'task_consul_join', ] From 0eb683736890be9a2685bb4cf3a899c0ea5726a9 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 28 Jul 2016 16:51:12 +0100 Subject: [PATCH 02/15] A simpler way to bootstrap the consul cluster. This works with the add-cluster-node script. --- .../installation/enabling-control-service.rst | 7 +-- docs/installation/install-node.rst | 19 +++++-- flocker/provision/_install.py | 53 +++++++++++-------- 3 files changed, 47 insertions(+), 32 deletions(-) diff --git a/docs/installation/enabling-control-service.rst b/docs/installation/enabling-control-service.rst index b290d7a4ec..5799814fc4 100644 --- a/docs/installation/enabling-control-service.rst +++ b/docs/installation/enabling-control-service.rst @@ -53,12 +53,7 @@ On AWS, an external firewall is used instead, which will need to be configured s Ubuntu 16.04 ============ -In a previous step we started a ``consul`` "server" on each of the nodes. -Now we will run ``consul join`` on one of the nodes, in order to introduce all the members of the consul cluster. - -.. task:: consul_join 192.0.2.101 192.0.2.102 192.0.2.103 - :prompt: [root@control-node]# - +In a previous step we started a ``consul`` "server" on each of the nodes and introduced them to form a cluster. 
With the consul database running we can now start the control service. It will create a unique Flocker key in the database and store the Flocker configuration there. diff --git a/docs/installation/install-node.rst b/docs/installation/install-node.rst index c5920c53c0..5e3aafa6d4 100644 --- a/docs/installation/install-node.rst +++ b/docs/installation/install-node.rst @@ -173,12 +173,23 @@ Installing on Ubuntu 16.04 Launch ``consul`` on each of the nodes by running the official `Hashicorp Consul Docker image `_. - * Supply the number of ``consul`` servers that you expect to run in your cluster. - XXX: Should probably run up to 5 consul servers and agents on the rest. - * Supply the local IP address which ``consul`` will use when connecting to other servers in the cluster. - .. task:: consul_start 3 192.0.2.100 + * On the first ``consul`` server, add the ``-bootstrap`` argument. + This will be the server to which you join all the subsequent servers in the cluster. + + .. task:: consul_start True 192.0.2.101 + :prompt: [root@ubuntu]# + + * On all subsequent nodes run the ``consul`` server, without the ``-bootstrap`` argument. + And join the server to the first by running ``consul join`` as follows: + + .. task:: consul_start True 192.0.2.102 + :prompt: [root@ubuntu]# + + Now we will run ``consul join`` in order to introduce this server to the bootstrap server and hence all other members of the consul cluster. + + .. task:: consul_join 192.0.2.101 :prompt: [root@ubuntu]# #. **Repeat the previous steps for all other nodes:** diff --git a/flocker/provision/_install.py b/flocker/provision/_install.py index 7294232f56..c7efdf7d96 100644 --- a/flocker/provision/_install.py +++ b/flocker/provision/_install.py @@ -1429,35 +1429,37 @@ def task_install_docker_plugin( ) -def task_consul_start(bootstrap_expect, advertise_address): +def task_consul_start(bootstrap, advertise_address): """ - Install consul, + Start consul. 
""" + consul_server_command = [ + 'docker', 'run', + '--detach', + '--name', 'consul_server', + '--net', 'host', + + 'consul', 'agent', + '-server', + '-advertise', advertise_address, + ] + if bootstrap: + consul_server_command.append('-bootstrap') return sequence([ run_from_args(['docker', 'pull', 'consul']), - run_from_args([ - 'docker', 'run', - '--detach', - '--name', 'consul_server', - '--net', 'host', - - 'consul', 'agent', - '-server', - '-bootstrap-expect', str(bootstrap_expect), - '-advertise', advertise_address, - ]), + run_from_args(consul_server_command), ]) -def task_consul_join(*consul_addresses): +def task_consul_join(bootstrap_address): """ Join up the consul cluster. """ return sequence([ - run_from_args(( + run_from_args([ 'docker', 'run', '--rm', '--net', 'host', - 'consul', 'join') + consul_addresses), + 'consul', 'join', bootstrap_address]), ]) @@ -1823,16 +1825,23 @@ def configure_node( distribution=node.distribution, action=setup_action, ), - task_consul_start( - bootstrap_expect=min(3, len(cluster.all_nodes)), - advertise_address=node.address, - ), ] - if node is not cluster.control_node: + if node is cluster.control_node: tasks.append( - task_consul_join(cluster.control_node.address), + task_consul_start( + bootstrap=True, + advertise_address=node.address, + ) ) + else: + tasks.extend([ + task_consul_start( + bootstrap=False, + advertise_address=node.address, + ), + task_consul_join(cluster.control_node.address), + ]) return run_remotely( username='root', From b4b5bdb7dba56d5ec60e8eb5c62446befa6fe046 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Fri, 29 Jul 2016 17:01:02 +0100 Subject: [PATCH 03/15] Factor out the configuration store operations so that I can implement a consul store. 
--- flocker/control/_persistence.py | 250 +++++++++++++++-------- flocker/control/script.py | 53 +++-- flocker/control/test/test_httpapi.py | 10 +- flocker/control/test/test_persistence.py | 77 ++++--- flocker/control/testtools.py | 9 +- 5 files changed, 260 insertions(+), 139 deletions(-) diff --git a/flocker/control/_persistence.py b/flocker/control/_persistence.py index 42a0cc513b..539a541006 100644 --- a/flocker/control/_persistence.py +++ b/flocker/control/_persistence.py @@ -13,16 +13,19 @@ from collections import Set, Mapping, Iterable from eliot import Logger, write_traceback, MessageType, Field, ActionType +from eliot.twisted import DeferredContext -from pyrsistent import PRecord, PVector, PMap, PSet, pmap, PClass +from pyrsistent import PRecord, PVector, PMap, PSet, pmap, PClass, field from pytz import UTC from twisted.python.filepath import FilePath from twisted.application.service import Service, MultiService -from twisted.internet.defer import succeed +from twisted.internet.defer import succeed, maybeDeferred from twisted.internet.task import LoopingCall +from zope.interface import Interface, implementer + from weakref import WeakKeyDictionary from ._model import ( @@ -585,6 +588,86 @@ def update_leases(transform, persistence_service): return succeed(new_leases) +class IConfigurationStore(Interface): + """ + """ + def get_content(): + """ + """ + + def set_content(): + """ + """ + + +def _process_v1_config(directory, config_path): + """ + Check if a v1 configuration file exists and upgrade it if necessary. + After upgrade, the v1 configuration file is retained with an archived + file name, which ensures the data is not lost but we do not override + a newer configuration version next time the service starts. + """ + v1_config_path = directory.child(b"current_configuration.v1.json") + v1_archived_path = directory.child(b"current_configuration.v1.old.json") + # Check for a v1 config and upgrade to latest if found. 
+ if v1_config_path.exists(): + v1_json = v1_config_path.getContent() + with _LOG_UPGRADE(configuration=v1_json, + source_version=1, + target_version=_CONFIG_VERSION): + updated_json = migrate_configuration( + 1, _CONFIG_VERSION, v1_json, + ConfigurationMigration + ) + config_path.setContent(updated_json) + v1_config_path.moveTo(v1_archived_path) + + +@implementer(IConfigurationStore) +class FilePathConfigurationStore(PClass): + path = field(mandatory=True) + + @classmethod + def from_directory(cls, directory): + if not directory.exists(): + directory.makedirs() + path = directory.child("current_configuration.json") + if not path.exists(): + path.touch() + # Version 1 configurations are a special case. They do not store + # any version information in the configuration data itself, rather they + # can only be identified by the use of the file name + # current_configuration.v1.json + # Therefore we check for a version 1 configuration file and if it is + # found, the config is upgraded, written to current_configuration.json + # and the old file archived as current_configuration.v1.old.json + _process_v1_config(directory, path) + return cls(path=path) + + def get_content_sync(self): + return self.path.getContent() + + def get_content(self): + return succeed(self.get_content_sync()) + + def set_content(self, content): + return succeed(self.path.setContent(content)) + + +def load_and_upgrade(config_json): + config_dict = loads(config_json) + config_version = config_dict['version'] + if config_version < _CONFIG_VERSION: + with _LOG_UPGRADE(configuration=config_json, + source_version=config_version, + target_version=_CONFIG_VERSION): + config_json = migrate_configuration( + config_version, _CONFIG_VERSION, + config_json, ConfigurationMigration) + config = wire_decode(config_json) + return config.deployment + + class ConfigurationPersistenceService(MultiService): """ Persist configuration to disk, and load it back. 
@@ -593,53 +676,35 @@ class ConfigurationPersistenceService(MultiService): :ivar bytes _hash: A SHA256 hash of the configuration. """ logger = Logger() + _deployment = None + _hash = None - def __init__(self, reactor, path): + def __init__(self, reactor, configuration_saver=None, + initial_deployment=None): """ :param reactor: Reactor to use for thread pool. - :param FilePath path: Directory where desired deployment will be - persisted. """ MultiService.__init__(self) - self._path = path - self._config_path = self._path.child(b"current_configuration.json") + if configuration_saver is None: + configuration_saver = lambda deployment_data: None + self._configuration_save = configuration_saver self._change_callbacks = [] + if initial_deployment is None: + initial_deployment = Deployment() + initial_deployment_data = self._encode_deployment(initial_deployment) + self._hash = self._hash_deployment_data(initial_deployment_data) + self._deployment = initial_deployment LeaseService(reactor, self).setServiceParent(self) def startService(self): - if not self._path.exists(): - self._path.makedirs() - self.load_configuration() + # Register the flocker-control service on this node + # curl -X PUT + # -d '{"Name": "flocker-control", + # "Check": {"tcp": "localhost:4523", + # "interval": "10s", "timeout": "1s"}}' + # http://localhost:8500/v1/agent/service/register MultiService.startService(self) - _LOG_STARTUP(configuration=self.get()).write(self.logger) - - def _process_v1_config(self, file_name, archive_name): - """ - Check if a v1 configuration file exists and upgrade it if necessary. - After upgrade, the v1 configuration file is retained with an archived - file name, which ensures the data is not lost but we do not override - a newer configuration version next time the service starts. - - :param bytes file_name: The expected file name of a version 1 - configuration. 
- :param bytes archive_name: The file name to which a version 1 - configuration should be moved after it has been processed. - """ - v1_config_path = self._path.child(file_name) - v1_archived_path = self._path.child(archive_name) - # Check for a v1 config and upgrade to latest if found. - if v1_config_path.exists(): - v1_json = v1_config_path.getContent() - with _LOG_UPGRADE(self.logger, - configuration=v1_json, - source_version=1, - target_version=_CONFIG_VERSION): - updated_json = migrate_configuration( - 1, _CONFIG_VERSION, v1_json, - ConfigurationMigration - ) - self._config_path.setContent(updated_json) - v1_config_path.moveTo(v1_archived_path) + _LOG_STARTUP(configuration=self.get()).write() def configuration_hash(self): """ @@ -647,43 +712,46 @@ def configuration_hash(self): """ return self._hash - def load_configuration(self): + @classmethod + def from_json_bytes(cls, reactor, json_bytes, configuration_saver): + if json_bytes: + initial_deployment = load_and_upgrade(json_bytes) + else: + initial_deployment = None + + return cls( + reactor=reactor, + configuration_saver=configuration_saver, + initial_deployment=initial_deployment, + ) + + @classmethod + def from_configuration_store(cls, reactor, configuration_store): """ Load the persisted configuration, upgrading the configuration format if an older version is detected. """ - # Version 1 configurations are a special case. 
They do not store - # any version information in the configuration data itself, rather they - # can only be identified by the use of the file name - # current_configuration.v1.json - # Therefore we check for a version 1 configuration file and if it is - # found, the config is upgraded, written to current_configuration.json - # and the old file archived as current_configuration.v1.old.json - self._process_v1_config( - file_name=b"current_configuration.v1.json", - archive_name=b"current_configuration.v1.old.json" - ) + d = configuration_store.get_content() - # We can now safely attempt to detect and process a >v1 configuration - # file as normal. - if self._config_path.exists(): - config_json = self._config_path.getContent() - config_dict = loads(config_json) - config_version = config_dict['version'] - if config_version < _CONFIG_VERSION: - with _LOG_UPGRADE(self.logger, - configuration=config_json, - source_version=config_version, - target_version=_CONFIG_VERSION): - config_json = migrate_configuration( - config_version, _CONFIG_VERSION, - config_json, ConfigurationMigration) - config = wire_decode(config_json) - self._deployment = config.deployment - self._sync_save(config.deployment) - else: - self._deployment = Deployment() - self._sync_save(self._deployment) + def load(json_bytes): + return cls.from_json_bytes( + reactor=reactor, + json_bytes=json_bytes, + configuration_saver=configuration_store.set_content, + ) + d.addCallback(load) + return d + + @classmethod + def from_directory(cls, reactor, directory): + configuration_store = FilePathConfigurationStore.from_directory( + directory + ) + return cls.from_json_bytes( + reactor=reactor, + json_bytes=configuration_store.get_content_sync(), + configuration_saver=configuration_store.set_content, + ) def register(self, change_callback): """ @@ -694,14 +762,15 @@ def register(self, change_callback): """ self._change_callbacks.append(change_callback) - def _sync_save(self, deployment): - """ - Save and flush new 
configuration to disk synchronously. - """ - config = Configuration(version=_CONFIG_VERSION, deployment=deployment) - data = wire_encode(config) - self._hash = b16encode(mmh3_hash_bytes(data)).lower() - self._config_path.setContent(data) + def _encode_deployment(self, deployment): + config = Configuration( + version=_CONFIG_VERSION, + deployment=deployment + ) + return wire_encode(config) + + def _hash_deployment_data(self, deployment_data): + return b16encode(mmh3_hash_bytes(deployment_data)).lower() def save(self, deployment): """ @@ -710,12 +779,10 @@ def save(self, deployment): :return Deferred: Fires when write is finished. """ if deployment == self._deployment: - _LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED().write(self.logger) + _LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED().write() return succeed(None) - with _LOG_SAVE(self.logger, configuration=deployment): - self._sync_save(deployment) - self._deployment = deployment + def finish(ignored): # At some future point this will likely involve talking to a # distributed system (e.g. ZooKeeper or etcd), so the API doesn't # guarantee immediate saving of the data. @@ -725,9 +792,24 @@ def save(self, deployment): except: # Second argument will be ignored in next Eliot release, so # not bothering with particular value. - write_traceback(self.logger, u"") + write_traceback() return succeed(None) + with _LOG_SAVE(configuration=deployment) as action: + deployment_data = self._encode_deployment(deployment) + self._hash = self._hash_deployment_data(deployment_data) + self._deployment = deployment + d = maybeDeferred( + self._configuration_save, + deployment_data + ) + + with action.context(): + d = DeferredContext(d) + d.addCallback(finish) + d.addActionFinish() + return d.result + def get(self): """ Retrieve current configuration. 
diff --git a/flocker/control/script.py b/flocker/control/script.py index 7cf45c606c..ee53dd2edb 100644 --- a/flocker/control/script.py +++ b/flocker/control/script.py @@ -12,12 +12,16 @@ from twisted.python.usage import Options from twisted.internet.endpoints import serverFromString +from twisted.internet.defer import maybeDeferred from twisted.python.filepath import FilePath from twisted.application.service import MultiService from twisted.internet.ssl import Certificate from .httpapi import create_api_service, REST_API_PORT -from ._persistence import ConfigurationPersistenceService +from ._persistence import ( + ConfigurationPersistenceService, + FilePathConfigurationStore, +) from ._clusterstate import ClusterStateService from ..common.script import ( flocker_standard_options, FlockerScriptRunner, main_for_service, @@ -63,23 +67,36 @@ def main(self, reactor, options): control_credential = ControlCredential.from_path( certificates_path, b"service") - top_service = MultiService() - persistence = ConfigurationPersistenceService( - reactor, options["data-path"]) - persistence.setServiceParent(top_service) - cluster_state = ClusterStateService(reactor) - cluster_state.setServiceParent(top_service) - api_service = create_api_service( - persistence, cluster_state, serverFromString( - reactor, options["port"]), - rest_api_context_factory(ca, control_credential)) - api_service.setServiceParent(top_service) - amp_service = ControlAMPService( - reactor, cluster_state, persistence, serverFromString( - reactor, options["agent-port"]), - amp_server_context_factory(ca, control_credential)) - amp_service.setServiceParent(top_service) - return main_for_service(reactor, top_service) + d = maybeDeferred( + FilePathConfigurationStore.from_directory, + options["data-path"], + ) + + def make_persistence_service(configuration_store): + return ConfigurationPersistenceService.from_configuration_store( + reactor, + configuration_store + ) + d.addCallback(make_persistence_service) + + def 
start_services(persistence_service): + top_service = MultiService() + persistence_service.setServiceParent(top_service) + cluster_state = ClusterStateService(reactor) + cluster_state.setServiceParent(top_service) + api_service = create_api_service( + persistence_service, cluster_state, serverFromString( + reactor, options["port"]), + rest_api_context_factory(ca, control_credential)) + api_service.setServiceParent(top_service) + amp_service = ControlAMPService( + reactor, cluster_state, persistence_service, serverFromString( + reactor, options["agent-port"]), + amp_server_context_factory(ca, control_credential)) + amp_service.setServiceParent(top_service) + return main_for_service(reactor, top_service) + d.addCallback(start_services) + return d def flocker_control_main(): diff --git a/flocker/control/test/test_httpapi.py b/flocker/control/test/test_httpapi.py index 30ae4d9d2d..96d08bae5c 100644 --- a/flocker/control/test/test_httpapi.py +++ b/flocker/control/test/test_httpapi.py @@ -76,8 +76,10 @@ def initialize(self): """ Create initial objects for the ``ConfigurationAPIUserV1``. 
""" - self.persistence_service = ConfigurationPersistenceService( - reactor, FilePath(self.mktemp())) + service = ConfigurationPersistenceService.from_directory( + reactor, FilePath(self.mktemp()) + ) + self.persistence_service = service self.persistence_service.startService() self.cluster_state_service = ClusterStateService(Clock()) self.cluster_state_service.startService() @@ -2600,7 +2602,9 @@ def test_returns_service(self): reactor = MemoryReactor() endpoint = TCP4ServerEndpoint(reactor, 6789) verifyObject(IService, create_api_service( - ConfigurationPersistenceService(reactor, FilePath(self.mktemp())), + ConfigurationPersistenceService.from_directory( + reactor, FilePath(self.mktemp()) + ), ClusterStateService(reactor), endpoint, ClientContextFactory())) diff --git a/flocker/control/test/test_persistence.py b/flocker/control/test/test_persistence.py index 85fa06789b..35e207f66e 100644 --- a/flocker/control/test/test_persistence.py +++ b/flocker/control/test/test_persistence.py @@ -12,7 +12,10 @@ from pytz import UTC from eliot.testing import ( - validate_logging, assertHasMessage, assertHasAction, capture_logging) + assertHasMessage, + assertHasAction, + capture_logging +) from hypothesis import given from hypothesis import strategies as st @@ -93,7 +96,8 @@ def setUp(self): super(LeasesTests, self).setUp() self.clock = Clock() self.persistence_service = ConfigurationPersistenceService( - self.clock, FilePath(self.mktemp())) + reactor=self.clock, + ) self.persistence_service.startService() self.addCleanup(self.persistence_service.stopService) @@ -204,7 +208,7 @@ class ConfigurationPersistenceServiceTests(AsyncTestCase): """ Tests for ``ConfigurationPersistenceService``. """ - def service(self, path, logger=None): + def service(self, path): """ Start a service, schedule its stop. @@ -213,9 +217,10 @@ def service(self, path, logger=None): :return: Started ``ConfigurationPersistenceService``. 
""" - service = ConfigurationPersistenceService(reactor, path) - if logger is not None: - self.patch(service, "logger", logger) + service = ConfigurationPersistenceService.from_directory( + reactor=reactor, + directory=path, + ) service.startService() self.addCleanup(service.stopService) return service @@ -244,10 +249,10 @@ def test_file_is_created(self): self.service(path) self.assertTrue(path.child(b"current_configuration.json").exists()) - @validate_logging(assertHasAction, _LOG_UPGRADE, succeeded=True, - startFields=dict(configuration=V1_TEST_DEPLOYMENT_JSON, - source_version=1, - target_version=_CONFIG_VERSION)) + @capture_logging(assertHasAction, _LOG_UPGRADE, succeeded=True, + startFields=dict(configuration=V1_TEST_DEPLOYMENT_JSON, + source_version=1, + target_version=_CONFIG_VERSION)) def test_v1_file_creates_updated_file(self, logger): """ If a version 1 configuration file exists under name @@ -258,13 +263,13 @@ def test_v1_file_creates_updated_file(self, logger): path.makedirs() v1_config_file = path.child(b"current_configuration.v1.json") v1_config_file.setContent(V1_TEST_DEPLOYMENT_JSON) - self.service(path, logger) + self.service(path) self.assertTrue(path.child(b"current_configuration.json").exists()) - @validate_logging(assertHasAction, _LOG_UPGRADE, succeeded=True, - startFields=dict(configuration=V1_TEST_DEPLOYMENT_JSON, - source_version=1, - target_version=_CONFIG_VERSION)) + @capture_logging(assertHasAction, _LOG_UPGRADE, succeeded=True, + startFields=dict(configuration=V1_TEST_DEPLOYMENT_JSON, + source_version=1, + target_version=_CONFIG_VERSION)) def test_v1_file_archived(self, logger): """ If a version 1 configuration file exists, it is archived with a @@ -275,7 +280,7 @@ def test_v1_file_archived(self, logger): path.makedirs() v1_config_file = path.child(b"current_configuration.v1.json") v1_config_file.setContent(V1_TEST_DEPLOYMENT_JSON) - self.service(path, logger) + self.service(path) self.assertEqual( (True, False), ( @@ -315,33 +320,35 @@ 
def test_current_configuration_unchanged(self): loaded_configuration = wire_decode(config_path.getContent()) self.assertEqual(loaded_configuration, persisted_configuration) - @validate_logging(assertHasAction, _LOG_SAVE, succeeded=True, - startFields=dict(configuration=LATEST_TEST_DEPLOYMENT)) + @capture_logging(assertHasAction, _LOG_SAVE, succeeded=True, + startFields=dict(configuration=LATEST_TEST_DEPLOYMENT)) def test_save_then_get(self, logger): """ A configuration that was saved can subsequently retrieved. """ - service = self.service(FilePath(self.mktemp()), logger) - logger.reset() + service = self.service(FilePath(self.mktemp())) d = service.save(LATEST_TEST_DEPLOYMENT) d.addCallback(lambda _: service.get()) d.addCallback(self.assertEqual, LATEST_TEST_DEPLOYMENT) return d - @validate_logging(assertHasMessage, _LOG_STARTUP, - fields=dict(configuration=LATEST_TEST_DEPLOYMENT)) + @capture_logging(assertHasMessage, _LOG_STARTUP, + fields=dict(configuration=LATEST_TEST_DEPLOYMENT)) def test_persist_across_restarts(self, logger): """ A configuration that was saved can be loaded from a new service. """ path = FilePath(self.mktemp()) - service = ConfigurationPersistenceService(reactor, path) + service = ConfigurationPersistenceService.from_directory( + reactor, path + ) service.startService() + logger.reset() d = service.save(LATEST_TEST_DEPLOYMENT) d.addCallback(lambda _: service.stopService()) def retrieve_in_new_service(_): - new_service = self.service(path, logger) + new_service = self.service(path) self.assertEqual(new_service.get(), LATEST_TEST_DEPLOYMENT) d.addCallback(retrieve_in_new_service) return d @@ -371,14 +378,14 @@ def saved_again(_): d.addCallback(saved_again) return d - @validate_logging( + @capture_logging( lambda test, logger: test.assertEqual(len(logger.flush_tracebacks(ZeroDivisionError)), 1)) def test_register_for_callback_failure(self, logger): """ Failed callbacks don't prevent later callbacks from being called. 
""" - service = self.service(FilePath(self.mktemp()), logger) + service = self.service(FilePath(self.mktemp())) callbacks = [] service.register(lambda: 1/0) service.register(lambda: callbacks.append(1)) @@ -389,13 +396,13 @@ def saved(_): d.addCallback(saved) return d - @validate_logging(assertHasMessage, _LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED) + @capture_logging(assertHasMessage, _LOG_UNCHANGED_DEPLOYMENT_NOT_SAVED) def test_callback_not_called_for_unchanged_deployment(self, logger): """ If the old deployment and the new deployment are equivalent, registered callbacks are not called. """ - service = self.service(FilePath(self.mktemp()), logger) + service = self.service(FilePath(self.mktemp())) state = [] @@ -424,7 +431,7 @@ def test_success_returned_for_unchanged_deployment(self): ``save`` returns a ``Deferred`` that fires with ``None`` when called with a deployment that is the same as the already-saved deployment. """ - service = self.service(FilePath(self.mktemp()), None) + service = self.service(FilePath(self.mktemp())) old_saving = service.save(LATEST_TEST_DEPLOYMENT) @@ -456,7 +463,9 @@ def test_hash_on_startup(self): An empty configuration can be hashed. """ path = FilePath(self.mktemp()) - service = ConfigurationPersistenceService(reactor, path) + service = ConfigurationPersistenceService.from_directory( + reactor, path + ) service.startService() self.addCleanup(service.stopService) @@ -468,7 +477,9 @@ def test_hash_on_save(self): The configuration hash changes when a new version is saved. """ path = FilePath(self.mktemp()) - service = ConfigurationPersistenceService(reactor, path) + service = ConfigurationPersistenceService.from_directory( + reactor, path + ) service.startService() self.addCleanup(service.stopService) original = self.get_hash(service) @@ -485,7 +496,9 @@ def test_hash_persists_across_restarts(self): A configuration that was saved can be loaded from a new service. 
""" path = FilePath(self.mktemp()) - service = ConfigurationPersistenceService(reactor, path) + service = ConfigurationPersistenceService.from_directory( + reactor, path + ) service.startService() self.addCleanup(service.stopService) d = service.save(LATEST_TEST_DEPLOYMENT) diff --git a/flocker/control/testtools.py b/flocker/control/testtools.py index 520b415a80..0fe5e7a824 100644 --- a/flocker/control/testtools.py +++ b/flocker/control/testtools.py @@ -16,7 +16,10 @@ from ..testtools import TestCase from ._clusterstate import ClusterStateService -from ._persistence import ConfigurationPersistenceService +from ._persistence import ( + ConfigurationPersistenceService, + FilePathConfigurationStore +) from ._protocol import ( ControlAMPService, ControlAMP, ) @@ -137,7 +140,9 @@ def build_control_amp_service(test_case, reactor=None): cluster_state.startService() test_case.addCleanup(cluster_state.stopService) persistence_service = ConfigurationPersistenceService( - reactor, test_case.make_temporary_directory()) + reactor=reactor, + configuration_saver=lambda deployment_data: None, + ) persistence_service.startService() test_case.addCleanup(persistence_service.stopService) return ControlAMPService( From 7692dd08852067b63c494cdb34ae3c186743a941 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Fri, 29 Jul 2016 17:49:49 +0100 Subject: [PATCH 04/15] Start writing and testing a Consul config store --- flocker/control/_consul.py | 28 ++++++++++++++++++++ flocker/control/test/test_consul.py | 40 +++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 flocker/control/_consul.py create mode 100644 flocker/control/test/test_consul.py diff --git a/flocker/control/_consul.py b/flocker/control/_consul.py new file mode 100644 index 0000000000..231cef7dc7 --- /dev/null +++ b/flocker/control/_consul.py @@ -0,0 +1,28 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. + +""" +Persistence of cluster configuration to consul. 
+""" +from pyrsistent import PClass, field +from treq import json_content, content +import treq +from zope.interface import implementer + +from ._persistence import IConfigurationStore + + +CONFIG_URL = ( + "http://localhost:8500/v1/kv" + "/com.clusterhq/flocker/current_configuration" +) + + +@implementer(IConfigurationStore) +class ConsulConfigurationStore(PClass): + def get_content(self): + d = treq.get(CONFIG_URL) + d.addCallback(json_content) + return d + + def set_content(self, content): + return diff --git a/flocker/control/test/test_consul.py b/flocker/control/test/test_consul.py new file mode 100644 index 0000000000..0bfeb00ce0 --- /dev/null +++ b/flocker/control/test/test_consul.py @@ -0,0 +1,40 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. +""" +Tests for ``flocker.control._consul``. +""" +from subprocess import check_output +import time + +from ...testtools import AsyncTestCase, random_name + +from .._consul import ConsulConfigurationStore + + +def consul_server_for_test(test_case): + container_name = random_name(test_case) + container_id = check_output([ + 'docker', 'run', + '--detach', + '--net', 'host', + '--name', container_name, + 'consul', + 'agent', + '-advertise', '127.0.0.1', + '-dev' + ]).rstrip() + + test_case.addCleanup( + check_output, + ['docker', 'rm', '--force', container_id] + ) + # XXX Wait for consul port to be listening. + time.sleep(1) + + +class ConsulTests(AsyncTestCase): + def test_get_content_empty(self): + consul_server_for_test(self) + store = ConsulConfigurationStore() + d = store.get_content() + d.addCallback(self.assertEqual, '') + return d From 80b76fa0bfc5d7222fddee1c6c4fbed07acce726 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Mon, 1 Aug 2016 15:18:15 +0100 Subject: [PATCH 05/15] Some tests and a treq implementation copied from the apiclient package. 
--- flocker/control/_consul.py | 151 ++++++++++++++++++++++++++-- flocker/control/test/test_consul.py | 51 +++++++++- 2 files changed, 188 insertions(+), 14 deletions(-) diff --git a/flocker/control/_consul.py b/flocker/control/_consul.py index 231cef7dc7..b16ddfb9d0 100644 --- a/flocker/control/_consul.py +++ b/flocker/control/_consul.py @@ -3,26 +3,157 @@ """ Persistence of cluster configuration to consul. """ -from pyrsistent import PClass, field +import base64 + +from eliot import ActionType, Field +from eliot.twisted import DeferredContext from treq import json_content, content import treq +from twisted.web.http import ( + OK, NOT_FOUND +) from zope.interface import implementer from ._persistence import IConfigurationStore -CONFIG_URL = ( - "http://localhost:8500/v1/kv" - "/com.clusterhq/flocker/current_configuration" -) +BASE_URL = b"http://localhost:8500/v1/kv" +CONFIG_PATH = b"/com.clusterhq/flocker/current_configuration" + +_LOG_HTTP_REQUEST = ActionType( + "flocker:control:consul", + [Field.forTypes("url", [bytes, unicode], "Request URL."), + Field.forTypes("method", [bytes, unicode], "Request method."), + Field("request_body", lambda o: o, "Request JSON body.")], + [Field.forTypes("response_code", [int], "Response code."), + Field("response_body", lambda o: o, "JSON response body.")], + "A HTTP request.") + + +class ResponseError(Exception): + """ + An unexpected response from the REST API. + """ + def __init__(self, code, body): + Exception.__init__(self, "Unexpected response code {}:\n{}\n".format( + code, body)) + self.code = code + + +class NotFound(Exception): + """ + Result was not found. + """ @implementer(IConfigurationStore) -class ConsulConfigurationStore(PClass): +class ConsulConfigurationStore(object): + _base_url = BASE_URL + _treq = treq + + def _request_with_headers( + self, method, path, body, success_codes, error_codes=None): + """ + Send a HTTP request to the Flocker API, return decoded JSON body and + headers. 
+ + :param bytes method: HTTP method, e.g. PUT. + :param bytes path: Path to add to base URL. + :param body: If not ``None``, JSON encode this and send as the + body of the request. + :param set success_codes: Expected success response codes. + :param error_codes: Mapping from HTTP response code to exception to be + raised if it is present, or ``None`` to set no errors. + :return: ``Deferred`` firing a tuple of (decoded JSON, + response headers). + """ + url = self._base_url + path + action = _LOG_HTTP_REQUEST(url=url, method=method, request_body=body) + + if error_codes is None: + error_codes = {} + + def error(body, code): + if code in error_codes: + raise error_codes[code](body) + raise ResponseError(code, body) + + def got_response(response): + if response.code in success_codes: + action.addSuccessFields(response_code=response.code) + d = json_content(response) + d.addCallback(lambda decoded_body: + (decoded_body, response.headers)) + return d + else: + d = content(response) + d.addCallback(error, response.code) + return d + headers = {} + data = None + if body is not None: + headers["content-type"] = b"application/json" + data = body + + with action.context(): + request = DeferredContext(self._treq.request( + method, url, data=data, headers=headers, + persistent=False, + )) + request.addCallback(got_response) + + def got_body(result): + action.addSuccessFields(response_body=result[0]) + return result + request.addCallback(got_body) + request.addActionFinish() + return request.result + + def _request(self, *args, **kwargs): + """ + Send a HTTP request to the Flocker API, return decoded JSON body. + + Takes the same arguments as ``_request_with_headers``. + + :return: ``Deferred`` firing with decoded JSON. 
+ """ + return self._request_with_headers(*args, **kwargs).addCallback( + lambda t: t[0]) + + def initialize(self): + """ + """ + d = self.get_content() + + def set_if_missing(failure): + failure.trap(NotFound) + return self.set_content(b"") + d.addErrback(set_if_missing) + return d + def get_content(self): - d = treq.get(CONFIG_URL) - d.addCallback(json_content) + d = self._request( + b"GET", + CONFIG_PATH, + None, + {OK}, + error_codes={NOT_FOUND: NotFound} + ) + + def decode(result): + value = result[0]['Value'] + if value is None: + return b"" + else: + return base64.decodestring(value) + + d.addCallback(decode) return d - def set_content(self, content): - return + def set_content(self, content_bytes): + return self._request( + b"PUT", + CONFIG_PATH, + content_bytes, + success_codes={OK}, + ) diff --git a/flocker/control/test/test_consul.py b/flocker/control/test/test_consul.py index 0bfeb00ce0..ebf9437917 100644 --- a/flocker/control/test/test_consul.py +++ b/flocker/control/test/test_consul.py @@ -7,7 +7,7 @@ from ...testtools import AsyncTestCase, random_name -from .._consul import ConsulConfigurationStore +from .._consul import ConsulConfigurationStore, NotFound def consul_server_for_test(test_case): @@ -28,13 +28,56 @@ def consul_server_for_test(test_case): ['docker', 'rm', '--force', container_id] ) # XXX Wait for consul port to be listening. - time.sleep(1) + time.sleep(2) class ConsulTests(AsyncTestCase): - def test_get_content_empty(self): + def setUp(self): + super(ConsulTests, self).setUp() consul_server_for_test(self) + + def test_uninitialized(self): + """ + ``get_content`` raises ``NotFound`` if the configuration store key does + not exist. + """ store = ConsulConfigurationStore() d = store.get_content() - d.addCallback(self.assertEqual, '') + d = self.assertFailure(d, NotFound) + return d + + def test_initialize_empty(self): + """ + ``initialize`` creates the key with an empty value. 
+ """ + store = ConsulConfigurationStore() + d = store.initialize() + d.addCallback(lambda ignored: store.get_content()) + d.addCallback(self.assertEqual, b"") + return d + + def test_set_and_get(self): + """ + ``set_content`` sets the value and the value can be retrieved by + ``get_content``. + """ + expected_value = random_name(self).encode('utf8') + store = ConsulConfigurationStore() + d = store.initialize() + d.addCallback(lambda ignored: store.set_content(expected_value)) + d.addCallback(lambda ignored: store.get_content()) + d.addCallback(self.assertEqual, expected_value) + return d + + def test_initialize_non_empty(self): + """ + ``initialize`` does not overwrite an existing value. + """ + expected_value = random_name(self).encode('utf8') + store = ConsulConfigurationStore() + d = store.initialize() + d.addCallback(lambda ignored: store.set_content(expected_value)) + d.addCallback(lambda ignored: store.initialize()) + d.addCallback(lambda ignored: store.get_content()) + d.addCallback(self.assertEqual, expected_value) return d From 277a3bed5e4bc81f3de9504bfcadb420fc96aaab Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Mon, 1 Aug 2016 16:40:13 +0100 Subject: [PATCH 06/15] loop until the consul server is 'ready' --- flocker/control/_consul.py | 36 ++++++++++++++++----- flocker/control/test/test_consul.py | 49 +++++++++++++++++------------ 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/flocker/control/_consul.py b/flocker/control/_consul.py index b16ddfb9d0..ec7760f0b9 100644 --- a/flocker/control/_consul.py +++ b/flocker/control/_consul.py @@ -7,6 +7,7 @@ from eliot import ActionType, Field from eliot.twisted import DeferredContext +from pyrsistent import PClass, field from treq import json_content, content import treq from twisted.web.http import ( @@ -17,9 +18,6 @@ from ._persistence import IConfigurationStore -BASE_URL = b"http://localhost:8500/v1/kv" -CONFIG_PATH = b"/com.clusterhq/flocker/current_configuration" - _LOG_HTTP_REQUEST = 
ActionType( "flocker:control:consul", [Field.forTypes("url", [bytes, unicode], "Request URL."), @@ -46,10 +44,18 @@ class NotFound(Exception): """ +class NotReady(Exception): + """ + """ + +CONFIG_PATH = b"/v1/kv/com.clusterhq/flocker/current_configuration" + + @implementer(IConfigurationStore) -class ConsulConfigurationStore(object): - _base_url = BASE_URL +class ConsulConfigurationStore(PClass): _treq = treq + api_address = field(mandatory=True, initial=b"127.0.0.1") + api_port = field(mandatory=True, initial=8500) def _request_with_headers( self, method, path, body, success_codes, error_codes=None): @@ -67,7 +73,9 @@ def _request_with_headers( :return: ``Deferred`` firing a tuple of (decoded JSON, response headers). """ - url = self._base_url + path + url = b"http://{}:{}{}".format( + self.api_address, self.api_port, path + ) action = _LOG_HTTP_REQUEST(url=url, method=method, request_body=body) if error_codes is None: @@ -121,8 +129,6 @@ def _request(self, *args, **kwargs): lambda t: t[0]) def initialize(self): - """ - """ d = self.get_content() def set_if_missing(failure): @@ -157,3 +163,17 @@ def set_content(self, content_bytes): content_bytes, success_codes={OK}, ) + + def ready(self): + d = self._request( + b"GET", + b"/v1/status/leader", + None, + {OK}, + ) + + def check_leader(result): + if not result: + raise NotReady("The consul cluster has no leader.") + d.addCallback(check_leader) + return d diff --git a/flocker/control/test/test_consul.py b/flocker/control/test/test_consul.py index ebf9437917..6b3b6a9ecc 100644 --- a/flocker/control/test/test_consul.py +++ b/flocker/control/test/test_consul.py @@ -3,14 +3,17 @@ Tests for ``flocker.control._consul``. 
""" from subprocess import check_output -import time -from ...testtools import AsyncTestCase, random_name +from twisted.internet import reactor +from twisted.internet.error import ConnectionRefusedError -from .._consul import ConsulConfigurationStore, NotFound +from ...testtools import AsyncTestCase, random_name, find_free_port +from ...common import retry_failure +from .._consul import ConsulConfigurationStore, NotFound, NotReady def consul_server_for_test(test_case): + api_address, api_port = find_free_port() container_name = random_name(test_case) container_id = check_output([ 'docker', 'run', @@ -20,6 +23,7 @@ def consul_server_for_test(test_case): 'consul', 'agent', '-advertise', '127.0.0.1', + '-http-port', str(api_port), '-dev' ]).rstrip() @@ -27,22 +31,29 @@ def consul_server_for_test(test_case): check_output, ['docker', 'rm', '--force', container_id] ) - # XXX Wait for consul port to be listening. - time.sleep(2) + return api_port class ConsulTests(AsyncTestCase): def setUp(self): super(ConsulTests, self).setUp() - consul_server_for_test(self) + api_port = consul_server_for_test(self) + self.store = ConsulConfigurationStore( + api_port=api_port + ) + return retry_failure( + reactor, + self.store.ready, + {ConnectionRefusedError, NotReady}, + [0.1] * 50 + ) def test_uninitialized(self): """ ``get_content`` raises ``NotFound`` if the configuration store key does not exist. """ - store = ConsulConfigurationStore() - d = store.get_content() + d = self.store.get_content() d = self.assertFailure(d, NotFound) return d @@ -50,9 +61,8 @@ def test_initialize_empty(self): """ ``initialize`` creates the key with an empty value. """ - store = ConsulConfigurationStore() - d = store.initialize() - d.addCallback(lambda ignored: store.get_content()) + d = self.store.initialize() + d.addCallback(lambda ignored: self.store.get_content()) d.addCallback(self.assertEqual, b"") return d @@ -62,10 +72,9 @@ def test_set_and_get(self): ``get_content``. 
""" expected_value = random_name(self).encode('utf8') - store = ConsulConfigurationStore() - d = store.initialize() - d.addCallback(lambda ignored: store.set_content(expected_value)) - d.addCallback(lambda ignored: store.get_content()) + d = self.store.initialize() + d.addCallback(lambda ignored: self.store.set_content(expected_value)) + d.addCallback(lambda ignored: self.store.get_content()) d.addCallback(self.assertEqual, expected_value) return d @@ -74,10 +83,10 @@ def test_initialize_non_empty(self): ``initialize`` does not overwrite an existing value. """ expected_value = random_name(self).encode('utf8') - store = ConsulConfigurationStore() - d = store.initialize() - d.addCallback(lambda ignored: store.set_content(expected_value)) - d.addCallback(lambda ignored: store.initialize()) - d.addCallback(lambda ignored: store.get_content()) + d = self.store.initialize() + d.addCallback(lambda ignored: self.store.set_content(expected_value)) + # Second initialize does not overwrite the expected_value above. 
+ d.addCallback(lambda ignored: self.store.initialize()) + d.addCallback(lambda ignored: self.store.get_content()) d.addCallback(self.assertEqual, expected_value) return d From ee22315b4202bf89be249e30db57a18f2025f65a Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Tue, 2 Aug 2016 11:11:42 +0100 Subject: [PATCH 07/15] A way to load configuration store plugins --- flocker/control/script.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/flocker/control/script.py b/flocker/control/script.py index ee53dd2edb..6e3f876a82 100644 --- a/flocker/control/script.py +++ b/flocker/control/script.py @@ -10,6 +10,8 @@ from functools import partial from time import clock +from pyrsistent import PClass, field + from twisted.python.usage import Options from twisted.internet.endpoints import serverFromString from twisted.internet.defer import maybeDeferred @@ -22,6 +24,7 @@ ConfigurationPersistenceService, FilePathConfigurationStore, ) +from ._consul import ConsulConfigurationStore from ._clusterstate import ClusterStateService from ..common.script import ( flocker_standard_options, FlockerScriptRunner, main_for_service, @@ -34,6 +37,32 @@ DEFAULT_CERTIFICATE_PATH = b"/etc/flocker" +class ConfigurationStorePlugin(PClass): + name = field(mandatory=True, type={unicode}) + factory = field(mandatory=True) + +CONFIGURATION_STORE_PLUGINS = [ + ConfigurationStorePlugin( + name=u"filepath", + factory=FilePathConfigurationStore.from_directory + ), + ConfigurationStorePlugin( + name=u"consul", + factory=ConsulConfigurationStore + ), +] + +CONFIGURATION_STORE_PLUGINS_BY_NAME = { + p.name: p for p in CONFIGURATION_STORE_PLUGINS +} + +CONFIGURATION_STORE_PLUGIN_NAMES = [ + p.name for p in CONFIGURATION_STORE_PLUGINS +] + +CONFIGURATION_STORE_PLUGIN_DEFAULT = CONFIGURATION_STORE_PLUGIN_NAMES[0] + + @flocker_standard_options class ControlOptions(Options): """ @@ -42,6 +71,12 @@ class ControlOptions(Options): optParameters = [ ["data-path", "d", 
FilePath(b"/var/lib/flocker"), "The directory where data will be persisted.", FilePath], + ["configuration-store-plugin", None, + CONFIGURATION_STORE_PLUGIN_DEFAULT, + "The plugin to use for storing Flocker configuration. " + "One of '{}'.".format( + "', '".join(CONFIGURATION_STORE_PLUGIN_NAMES) + )], ["port", "p", 'tcp:%d' % (REST_API_PORT,), "The external API port to listen on."], ["agent-port", "a", 'tcp:4524', From 79f3c4042dab6d0619d17fc1f3ccbbd5f36dd0e7 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 10:22:28 +0100 Subject: [PATCH 08/15] Move ConfigurationStore implementations to a separate package --- flocker/control/_persistence.py | 78 ++----------------- .../control/configuration_storage/__init__.py | 0 .../consul.py} | 5 +- .../configuration_storage/directory.py | 64 +++++++++++++++ .../configuration_storage/interface.py | 23 ++++++ .../configuration_storage/test/__init__.py | 0 .../configuration_storage/test/test_consul.py | 35 +++++++++ .../test/test_directory.py | 17 ++++ .../testtools.py} | 39 ++++------ flocker/control/script.py | 23 +++--- flocker/control/test/test_httpapi.py | 8 +- flocker/control/test/test_persistence.py | 21 ++++- flocker/control/test/test_protocol.py | 19 +---- flocker/control/testtools.py | 22 +++++- 14 files changed, 212 insertions(+), 142 deletions(-) create mode 100644 flocker/control/configuration_storage/__init__.py rename flocker/control/{_consul.py => configuration_storage/consul.py} (97%) create mode 100644 flocker/control/configuration_storage/directory.py create mode 100644 flocker/control/configuration_storage/interface.py create mode 100644 flocker/control/configuration_storage/test/__init__.py create mode 100644 flocker/control/configuration_storage/test/test_consul.py create mode 100644 flocker/control/configuration_storage/test/test_directory.py rename flocker/control/{test/test_consul.py => configuration_storage/testtools.py} (68%) diff --git a/flocker/control/_persistence.py 
b/flocker/control/_persistence.py index 539a541006..4bdcc866d4 100644 --- a/flocker/control/_persistence.py +++ b/flocker/control/_persistence.py @@ -15,7 +15,7 @@ from eliot import Logger, write_traceback, MessageType, Field, ActionType from eliot.twisted import DeferredContext -from pyrsistent import PRecord, PVector, PMap, PSet, pmap, PClass, field +from pyrsistent import PRecord, PVector, PMap, PSet, pmap, PClass from pytz import UTC @@ -24,13 +24,12 @@ from twisted.internet.defer import succeed, maybeDeferred from twisted.internet.task import LoopingCall -from zope.interface import Interface, implementer - from weakref import WeakKeyDictionary from ._model import ( SERIALIZABLE_CLASSES, Deployment, Configuration, GenerationHash ) +from .configuration_storage.directory import DirectoryConfigurationStore # The class at the root of the configuration tree. ROOT_CLASS = Deployment @@ -588,75 +587,9 @@ def update_leases(transform, persistence_service): return succeed(new_leases) -class IConfigurationStore(Interface): - """ - """ - def get_content(): - """ - """ - - def set_content(): - """ - """ - - -def _process_v1_config(directory, config_path): - """ - Check if a v1 configuration file exists and upgrade it if necessary. - After upgrade, the v1 configuration file is retained with an archived - file name, which ensures the data is not lost but we do not override - a newer configuration version next time the service starts. - """ - v1_config_path = directory.child(b"current_configuration.v1.json") - v1_archived_path = directory.child(b"current_configuration.v1.old.json") - # Check for a v1 config and upgrade to latest if found. 
- if v1_config_path.exists(): - v1_json = v1_config_path.getContent() - with _LOG_UPGRADE(configuration=v1_json, - source_version=1, - target_version=_CONFIG_VERSION): - updated_json = migrate_configuration( - 1, _CONFIG_VERSION, v1_json, - ConfigurationMigration - ) - config_path.setContent(updated_json) - v1_config_path.moveTo(v1_archived_path) - - -@implementer(IConfigurationStore) -class FilePathConfigurationStore(PClass): - path = field(mandatory=True) - - @classmethod - def from_directory(cls, directory): - if not directory.exists(): - directory.makedirs() - path = directory.child("current_configuration.json") - if not path.exists(): - path.touch() - # Version 1 configurations are a special case. They do not store - # any version information in the configuration data itself, rather they - # can only be identified by the use of the file name - # current_configuration.v1.json - # Therefore we check for a version 1 configuration file and if it is - # found, the config is upgraded, written to current_configuration.json - # and the old file archived as current_configuration.v1.old.json - _process_v1_config(directory, path) - return cls(path=path) - - def get_content_sync(self): - return self.path.getContent() - - def get_content(self): - return succeed(self.get_content_sync()) - - def set_content(self, content): - return succeed(self.path.setContent(content)) - - def load_and_upgrade(config_json): config_dict = loads(config_json) - config_version = config_dict['version'] + config_version = config_dict.get('version', 1) if config_version < _CONFIG_VERSION: with _LOG_UPGRADE(configuration=config_json, source_version=config_version, @@ -744,9 +677,10 @@ def load(json_bytes): @classmethod def from_directory(cls, reactor, directory): - configuration_store = FilePathConfigurationStore.from_directory( - directory + configuration_store = DirectoryConfigurationStore( + directory=directory ) + configuration_store.initialize_sync() return cls.from_json_bytes( 
reactor=reactor, json_bytes=configuration_store.get_content_sync(), diff --git a/flocker/control/configuration_storage/__init__.py b/flocker/control/configuration_storage/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/flocker/control/_consul.py b/flocker/control/configuration_storage/consul.py similarity index 97% rename from flocker/control/_consul.py rename to flocker/control/configuration_storage/consul.py index ec7760f0b9..d8a3b00a7f 100644 --- a/flocker/control/_consul.py +++ b/flocker/control/configuration_storage/consul.py @@ -15,7 +15,7 @@ ) from zope.interface import implementer -from ._persistence import IConfigurationStore +from .interface import IConfigurationStore _LOG_HTTP_REQUEST = ActionType( @@ -135,6 +135,7 @@ def set_if_missing(failure): failure.trap(NotFound) return self.set_content(b"") d.addErrback(set_if_missing) + d.addCallback(lambda ignored: None) return d def get_content(self): @@ -164,7 +165,7 @@ def set_content(self, content_bytes): success_codes={OK}, ) - def ready(self): + def _ready(self): d = self._request( b"GET", b"/v1/status/leader", diff --git a/flocker/control/configuration_storage/directory.py b/flocker/control/configuration_storage/directory.py new file mode 100644 index 0000000000..2746c457ca --- /dev/null +++ b/flocker/control/configuration_storage/directory.py @@ -0,0 +1,64 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. + +""" +Persistence of cluster configuration to local file. +""" + +from pyrsistent import PClass, field +from twisted.internet.defer import succeed +from twisted.python.filepath import FilePath +from zope.interface import implementer + +from .interface import IConfigurationStore + + +def _process_v1_config(directory, config_path): + """ + Check if a v1 configuration file exists and move it if necessary. 
+ After upgrade, the v1 configuration file is retained with an archived + file name, which ensures the data is not lost but we do not override + a newer configuration version next time the service starts. + """ + v1_config_path = directory.child(b"current_configuration.v1.json") + v1_archived_path = directory.child(b"current_configuration.v1.old.json") + # Check for a v1 config and move to standard file location + if v1_config_path.exists(): + v1_json = v1_config_path.getContent() + config_path.setContent(v1_json) + v1_config_path.moveTo(v1_archived_path) + + +@implementer(IConfigurationStore) +class DirectoryConfigurationStore(PClass): + directory = field(mandatory=True, type={FilePath}) + + @property + def path(self): + return self.directory.child("current_configuration.json") + + def initialize_sync(self): + if not self.directory.exists(): + self.directory.makedirs() + if not self.path.exists(): + self.path.touch() + # Version 1 configurations are a special case. They do not store + # any version information in the configuration data itself, rather they + # can only be identified by the use of the file name + # current_configuration.v1.json + # Therefore we check for a version 1 configuration file and if it is + # found, the config is upgraded, written to current_configuration.json + # and the old file archived as current_configuration.v1.old.json + _process_v1_config(self.directory, self.path) + + def initialize(self): + self.initialize_sync() + return succeed(None) + + def get_content_sync(self): + return self.path.getContent() + + def get_content(self): + return succeed(self.get_content_sync()) + + def set_content(self, content): + return succeed(self.path.setContent(content)) diff --git a/flocker/control/configuration_storage/interface.py b/flocker/control/configuration_storage/interface.py new file mode 100644 index 0000000000..ba1551fe63 --- /dev/null +++ b/flocker/control/configuration_storage/interface.py @@ -0,0 +1,23 @@ +# Copyright ClusterHQ Inc. 
See LICENSE file for details. + +""" +Interface for cluster configuration storage plugin. +""" + +from zope.interface import Interface + + +class IConfigurationStore(Interface): + """ + """ + def initialize(): + """ + """ + + def get_content(): + """ + """ + + def set_content(content): + """ + """ diff --git a/flocker/control/configuration_storage/test/__init__.py b/flocker/control/configuration_storage/test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/flocker/control/configuration_storage/test/test_consul.py b/flocker/control/configuration_storage/test/test_consul.py new file mode 100644 index 0000000000..9b7ef0b6be --- /dev/null +++ b/flocker/control/configuration_storage/test/test_consul.py @@ -0,0 +1,35 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. +""" +Tests for ``flocker.control.configuration_storage.consul``. +""" +from twisted.internet import reactor +from twisted.internet.error import ConnectionRefusedError + +from ....testtools import AsyncTestCase +from ....common import retry_failure +from ..testtools import consul_server_for_test, IConfigurationStoreTestsMixin +from ..consul import ConsulConfigurationStore, NotFound, NotReady + + +class ConsulTests(IConfigurationStoreTestsMixin, AsyncTestCase): + def setUp(self): + super(ConsulTests, self).setUp() + api_port = consul_server_for_test(self) + self.store = ConsulConfigurationStore( + api_port=api_port + ) + return retry_failure( + reactor, + self.store._ready, + {ConnectionRefusedError, NotReady}, + [0.1] * 50 + ) + + def test_uninitialized(self): + """ + ``get_content`` raises ``NotFound`` if the configuration store key does + not exist. 
+ """ + d = self.store.get_content() + d = self.assertFailure(d, NotFound) + return d diff --git a/flocker/control/configuration_storage/test/test_directory.py b/flocker/control/configuration_storage/test/test_directory.py new file mode 100644 index 0000000000..0035c96173 --- /dev/null +++ b/flocker/control/configuration_storage/test/test_directory.py @@ -0,0 +1,17 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. +""" +Tests for ``flocker.control.configuration_storage.directory``. +""" +from ....testtools import AsyncTestCase + +from ..directory import DirectoryConfigurationStore +from ..testtools import IConfigurationStoreTestsMixin + + +class DirectoryConfigurationStoreInterfaceTests(IConfigurationStoreTestsMixin, + AsyncTestCase): + def setUp(self): + super(DirectoryConfigurationStoreInterfaceTests, self).setUp() + self.store = DirectoryConfigurationStore( + directory=self.make_temporary_directory() + ) diff --git a/flocker/control/test/test_consul.py b/flocker/control/configuration_storage/testtools.py similarity index 68% rename from flocker/control/test/test_consul.py rename to flocker/control/configuration_storage/testtools.py index 6b3b6a9ecc..351bc1804a 100644 --- a/flocker/control/test/test_consul.py +++ b/flocker/control/configuration_storage/testtools.py @@ -1,15 +1,14 @@ # Copyright ClusterHQ Inc. See LICENSE file for details. """ -Tests for ``flocker.control._consul``. +Tests for ``flocker.control.configuration_storage``. 
""" from subprocess import check_output -from twisted.internet import reactor -from twisted.internet.error import ConnectionRefusedError +from zope.interface.verify import verifyObject -from ...testtools import AsyncTestCase, random_name, find_free_port -from ...common import retry_failure -from .._consul import ConsulConfigurationStore, NotFound, NotReady +from ...testtools import random_name, find_free_port + +from .interface import IConfigurationStore def consul_server_for_test(test_case): @@ -34,27 +33,19 @@ def consul_server_for_test(test_case): return api_port -class ConsulTests(AsyncTestCase): - def setUp(self): - super(ConsulTests, self).setUp() - api_port = consul_server_for_test(self) - self.store = ConsulConfigurationStore( - api_port=api_port - ) - return retry_failure( - reactor, - self.store.ready, - {ConnectionRefusedError, NotReady}, - [0.1] * 50 - ) +class IConfigurationStoreTestsMixin(object): + def test_interface(self): + """ + ``self.store`` provides ``IConfigurationStore``. + """ + self.assertTrue(verifyObject(IConfigurationStore, self.store)) - def test_uninitialized(self): + def test_initialize_returns_none(self): """ - ``get_content`` raises ``NotFound`` if the configuration store key does - not exist. + ``initialize`` returns ``None``. 
""" - d = self.store.get_content() - d = self.assertFailure(d, NotFound) + d = self.store.initialize() + d.addCallback(self.assertIs, None) return d def test_initialize_empty(self): diff --git a/flocker/control/script.py b/flocker/control/script.py index 6e3f876a82..d6f7bfa609 100644 --- a/flocker/control/script.py +++ b/flocker/control/script.py @@ -14,7 +14,6 @@ from twisted.python.usage import Options from twisted.internet.endpoints import serverFromString -from twisted.internet.defer import maybeDeferred from twisted.python.filepath import FilePath from twisted.application.service import MultiService from twisted.internet.ssl import Certificate @@ -22,9 +21,10 @@ from .httpapi import create_api_service, REST_API_PORT from ._persistence import ( ConfigurationPersistenceService, - FilePathConfigurationStore, ) -from ._consul import ConsulConfigurationStore +from .configuration_storage.consul import ConsulConfigurationStore +from .configuration_storage.directory import DirectoryConfigurationStore + from ._clusterstate import ClusterStateService from ..common.script import ( flocker_standard_options, FlockerScriptRunner, main_for_service, @@ -43,12 +43,12 @@ class ConfigurationStorePlugin(PClass): CONFIGURATION_STORE_PLUGINS = [ ConfigurationStorePlugin( - name=u"filepath", - factory=FilePathConfigurationStore.from_directory + name=u"directory", + factory=DirectoryConfigurationStore, ), ConfigurationStorePlugin( name=u"consul", - factory=ConsulConfigurationStore + factory=ConsulConfigurationStore, ), ] @@ -101,16 +101,15 @@ def main(self, reactor, options): # flexible. 
https://clusterhq.atlassian.net/browse/FLOC-1865 control_credential = ControlCredential.from_path( certificates_path, b"service") - - d = maybeDeferred( - FilePathConfigurationStore.from_directory, - options["data-path"], + store = DirectoryConfigurationStore( + directory=options["data-path"] ) + d = store.initialize() - def make_persistence_service(configuration_store): + def make_persistence_service(ignored): return ConfigurationPersistenceService.from_configuration_store( reactor, - configuration_store + store ) d.addCallback(make_persistence_service) diff --git a/flocker/control/test/test_httpapi.py b/flocker/control/test/test_httpapi.py index 96d08bae5c..090e8a60c7 100644 --- a/flocker/control/test/test_httpapi.py +++ b/flocker/control/test/test_httpapi.py @@ -76,9 +76,7 @@ def initialize(self): """ Create initial objects for the ``ConfigurationAPIUserV1``. """ - service = ConfigurationPersistenceService.from_directory( - reactor, FilePath(self.mktemp()) - ) + service = ConfigurationPersistenceService(reactor) self.persistence_service = service self.persistence_service.startService() self.cluster_state_service = ClusterStateService(Clock()) @@ -2602,9 +2600,7 @@ def test_returns_service(self): reactor = MemoryReactor() endpoint = TCP4ServerEndpoint(reactor, 6789) verifyObject(IService, create_api_service( - ConfigurationPersistenceService.from_directory( - reactor, FilePath(self.mktemp()) - ), + ConfigurationPersistenceService(reactor), ClusterStateService(reactor), endpoint, ClientContextFactory())) diff --git a/flocker/control/test/test_persistence.py b/flocker/control/test/test_persistence.py index 35e207f66e..2c6adfe5fb 100644 --- a/flocker/control/test/test_persistence.py +++ b/flocker/control/test/test_persistence.py @@ -29,7 +29,7 @@ from testtools.matchers import Is, Equals, Not -from ..testtools import deployment_strategy +from ..testtools import deployment_strategy, arbitrary_transformation from ...testtools import AsyncTestCase, TestCase from 
.._persistence import ( @@ -294,15 +294,28 @@ def test_old_configuration_is_upgraded(self): The persistence service will detect if an existing configuration saved in a file is a previous version and perform a migration to the latest version. + + XXX: The upgraded configuration is not persisted immediately. Only + when the deployment is first saved and only if the deployment has + changed. I'm not sure I like this -RichardW. """ path = FilePath(self.mktemp()) path.makedirs() v1_config_file = path.child(b"current_configuration.v1.json") v1_config_file.setContent(V1_TEST_DEPLOYMENT_JSON) config_path = path.child(b"current_configuration.json") - self.service(path) - configuration = wire_decode(config_path.getContent()) - self.assertEqual(configuration.version, _CONFIG_VERSION) + service = self.service(path) + upgraded_deployment = service.get() + changed_upgraded_deployment = arbitrary_transformation( + upgraded_deployment + ) + d = service.save(changed_upgraded_deployment) + + def check_file(ignored): + configuration = wire_decode(config_path.getContent()) + self.assertEqual(configuration.version, _CONFIG_VERSION) + d.addCallback(check_file) + return d def test_current_configuration_unchanged(self): """ diff --git a/flocker/control/test/test_protocol.py b/flocker/control/test/test_protocol.py index 194388106d..325b73450b 100644 --- a/flocker/control/test/test_protocol.py +++ b/flocker/control/test/test_protocol.py @@ -31,7 +31,7 @@ from testtools.matchers import Equals -from ..testtools import build_control_amp_service +from ..testtools import build_control_amp_service, arbitrary_transformation from ...testtools import TestCase from ...testtools.amp import ( DelayedAMPClient, connected_amp_protocol, @@ -55,23 +55,6 @@ from .clusterstatetools import advance_some, advance_rest -def arbitrary_transformation(deployment): - """ - Make some change to a deployment configuration. Any change. 
- - The exact change made is unspecified but the resulting ``Deployment`` will - be different from the given ``Deployment``. - - :param Deployment deployment: A configuration to change. - - :return: A ``Deployment`` similar but not exactly equal to the given. - """ - uuid = uuid4() - return deployment.transform( - ["nodes", uuid], Node(uuid=uuid) - ) - - def arbitrary_state_transformation(deployment_state): """ Make some change to a deployment state. Any change. diff --git a/flocker/control/testtools.py b/flocker/control/testtools.py index 0fe5e7a824..209bacc79c 100644 --- a/flocker/control/testtools.py +++ b/flocker/control/testtools.py @@ -16,10 +16,7 @@ from ..testtools import TestCase from ._clusterstate import ClusterStateService -from ._persistence import ( - ConfigurationPersistenceService, - FilePathConfigurationStore -) +from ._persistence import ConfigurationPersistenceService from ._protocol import ( ControlAMPService, ControlAMP, ) @@ -55,6 +52,23 @@ ] +def arbitrary_transformation(deployment): + """ + Make some change to a deployment configuration. Any change. + + The exact change made is unspecified but the resulting ``Deployment`` will + be different from the given ``Deployment``. + + :param Deployment deployment: A configuration to change. + + :return: A ``Deployment`` similar but not exactly equal to the given. + """ + uuid = uuid4() + return deployment.transform( + ["nodes", uuid], Node(uuid=uuid) + ) + + def make_istatepersister_tests(fixture): """ Create a TestCase for ``IStatePersister``. From 8bec56a0d1e1b9e280d321b3a410b84307358e78 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 12:23:37 +0100 Subject: [PATCH 09/15] Add retry to the consul store. 
--- flocker/common/__init__.py | 4 +- flocker/control/_persistence.py | 3 +- .../control/configuration_storage/consul.py | 48 ++++++++++- .../configuration_storage/test/test_consul.py | 12 +-- flocker/control/script.py | 79 ++++++++++++++----- 5 files changed, 109 insertions(+), 37 deletions(-) diff --git a/flocker/common/__init__.py b/flocker/common/__init__.py index dad0b2ca54..a1f0628a11 100644 --- a/flocker/common/__init__.py +++ b/flocker/common/__init__.py @@ -23,7 +23,7 @@ from ._retry import ( loop_until, timeout, poll_until, retry_failure, retry_effect_with_timeout, get_default_retry_steps, - retry_if, decorate_methods, with_retry, + retry_if, decorate_methods, with_retry, backoff ) from .version import parse_version, UnparseableVersion @@ -36,7 +36,7 @@ 'poll_until', 'retry_effect_with_timeout', 'decorate_methods', - 'get_default_retry_steps', 'retry_if', 'with_retry', + 'get_default_retry_steps', 'retry_if', 'with_retry', 'backoff', 'parse_version', 'UnparseableVersion', 'RACKSPACE_MINIMUM_VOLUME_SIZE', diff --git a/flocker/control/_persistence.py b/flocker/control/_persistence.py index 4bdcc866d4..6623261237 100644 --- a/flocker/control/_persistence.py +++ b/flocker/control/_persistence.py @@ -664,7 +664,8 @@ def from_configuration_store(cls, reactor, configuration_store): Load the persisted configuration, upgrading the configuration format if an older version is detected. 
""" - d = configuration_store.get_content() + d = configuration_store.initialize() + d.addCallback(lambda ignored: configuration_store.get_content()) def load(json_bytes): return cls.from_json_bytes( diff --git a/flocker/control/configuration_storage/consul.py b/flocker/control/configuration_storage/consul.py index d8a3b00a7f..0e196e5176 100644 --- a/flocker/control/configuration_storage/consul.py +++ b/flocker/control/configuration_storage/consul.py @@ -10,12 +10,17 @@ from pyrsistent import PClass, field from treq import json_content, content import treq + +from twisted.internet import reactor +from twisted.internet.error import ConnectionRefusedError from twisted.web.http import ( - OK, NOT_FOUND + OK, NOT_FOUND, INTERNAL_SERVER_ERROR ) + from zope.interface import implementer from .interface import IConfigurationStore +from ...common import retry_failure, backoff _LOG_HTTP_REQUEST = ActionType( @@ -44,10 +49,22 @@ class NotFound(Exception): """ +class InternalServerError(Exception): + """ + 500 from server. 
+ """ + + class NotReady(Exception): """ """ + +class NoLeader(Exception): + """ + """ + + CONFIG_PATH = b"/v1/kv/com.clusterhq/flocker/current_configuration" @@ -128,6 +145,26 @@ def _request(self, *args, **kwargs): return self._request_with_headers(*args, **kwargs).addCallback( lambda t: t[0]) + def _request_retry(self, *args, **kwargs): + def handle_no_cluster_leader(failure): + failure.trap(InternalServerError) + if failure.value.message == "No cluster leader": + raise NoLeader() + return failure + + def request(): + d = self._request(*args, **kwargs) + d.addErrback(handle_no_cluster_leader) + return d + + d = retry_failure( + reactor, + request, + {ConnectionRefusedError, NoLeader}, + backoff(step=0.5, maximum_step=5.0, timeout=60.0), + ) + return d + def initialize(self): d = self.get_content() @@ -139,12 +176,15 @@ def set_if_missing(failure): return d def get_content(self): - d = self._request( + d = self._request_retry( b"GET", CONFIG_PATH, None, {OK}, - error_codes={NOT_FOUND: NotFound} + error_codes={ + NOT_FOUND: NotFound, + INTERNAL_SERVER_ERROR: InternalServerError, + } ) def decode(result): @@ -158,7 +198,7 @@ def decode(result): return d def set_content(self, content_bytes): - return self._request( + return self._request_retry( b"PUT", CONFIG_PATH, content_bytes, diff --git a/flocker/control/configuration_storage/test/test_consul.py b/flocker/control/configuration_storage/test/test_consul.py index 9b7ef0b6be..03f4eb8153 100644 --- a/flocker/control/configuration_storage/test/test_consul.py +++ b/flocker/control/configuration_storage/test/test_consul.py @@ -2,13 +2,9 @@ """ Tests for ``flocker.control.configuration_storage.consul``. 
""" -from twisted.internet import reactor -from twisted.internet.error import ConnectionRefusedError - from ....testtools import AsyncTestCase -from ....common import retry_failure from ..testtools import consul_server_for_test, IConfigurationStoreTestsMixin -from ..consul import ConsulConfigurationStore, NotFound, NotReady +from ..consul import ConsulConfigurationStore, NotFound class ConsulTests(IConfigurationStoreTestsMixin, AsyncTestCase): @@ -18,12 +14,6 @@ def setUp(self): self.store = ConsulConfigurationStore( api_port=api_port ) - return retry_failure( - reactor, - self.store._ready, - {ConnectionRefusedError, NotReady}, - [0.1] * 50 - ) def test_uninitialized(self): """ diff --git a/flocker/control/script.py b/flocker/control/script.py index d6f7bfa609..76f12644e3 100644 --- a/flocker/control/script.py +++ b/flocker/control/script.py @@ -12,7 +12,7 @@ from pyrsistent import PClass, field -from twisted.python.usage import Options +from twisted.python.usage import Options, UsageError from twisted.internet.endpoints import serverFromString from twisted.python.filepath import FilePath from twisted.application.service import MultiService @@ -40,15 +40,59 @@ class ConfigurationStorePlugin(PClass): name = field(mandatory=True, type={unicode}) factory = field(mandatory=True) + options = field(mandatory=True) + + def __unicode__(self): + return self.name + + +def directory_store_from_options(options): + return DirectoryConfigurationStore( + directory=options["data-path"] + ) + + +def consul_store_from_options(options): + return ConsulConfigurationStore( + api_address=options["consul-api-address"], + api_port=options["consul-api-port"], + ) + + +def validate_plugin_options(plugin, options): + required_options = {option[0] for option in plugin.options} + missing_options = required_options.difference(options.keys()) + if missing_options: + raise UsageError( + u"Missing option(s): '--{}'. 
" + u"--configuration-store-plugin {} " + u"requires '--{}'.".format( + "', '--".join(missing_options), + plugin.name, + "', '--".join(required_options), + ) + ) + CONFIGURATION_STORE_PLUGINS = [ ConfigurationStorePlugin( name=u"directory", - factory=DirectoryConfigurationStore, + factory=directory_store_from_options, + options=[[ + "data-path", "d", FilePath(b"/var/lib/flocker"), + "The directory where data will be persisted.", FilePath + ]], + ), ConfigurationStorePlugin( name=u"consul", - factory=ConsulConfigurationStore, + factory=consul_store_from_options, + options=[ + ["consul-api-address", None, u"127.0.0.1", + "The IP address or hostname of the consul server"], + ["consul-api-port", None, 8500, + "The TCP port number of the consul server", int], + ], ), ] @@ -60,7 +104,7 @@ class ConfigurationStorePlugin(PClass): p.name for p in CONFIGURATION_STORE_PLUGINS ] -CONFIGURATION_STORE_PLUGIN_DEFAULT = CONFIGURATION_STORE_PLUGIN_NAMES[0] +CONFIGURATION_STORE_PLUGIN_DEFAULT = CONFIGURATION_STORE_PLUGINS[0] @flocker_standard_options @@ -69,14 +113,12 @@ class ControlOptions(Options): Command line options for ``flocker-control`` cluster management process. """ optParameters = [ - ["data-path", "d", FilePath(b"/var/lib/flocker"), - "The directory where data will be persisted.", FilePath], ["configuration-store-plugin", None, CONFIGURATION_STORE_PLUGIN_DEFAULT, - "The plugin to use for storing Flocker configuration. " - "One of '{}'.".format( + u"The plugin to use for storing Flocker configuration. 
" + u"One of '{}'.".format( "', '".join(CONFIGURATION_STORE_PLUGIN_NAMES) - )], + ), CONFIGURATION_STORE_PLUGINS_BY_NAME.get], ["port", "p", 'tcp:%d' % (REST_API_PORT,), "The external API port to listen on."], ["agent-port", "a", 'tcp:4524', @@ -86,6 +128,8 @@ class ControlOptions(Options): "root certificate (cluster.crt) and control service certificate " "and private key (control-service.crt and control-service.key).")], ] + for plugin in CONFIGURATION_STORE_PLUGINS: + optParameters.extend(plugin.options) class ControlScript(object): @@ -101,17 +145,14 @@ def main(self, reactor, options): # flexible. https://clusterhq.atlassian.net/browse/FLOC-1865 control_credential = ControlCredential.from_path( certificates_path, b"service") - store = DirectoryConfigurationStore( - directory=options["data-path"] - ) - d = store.initialize() + configuration_store_plugin = options['configuration-store-plugin'] + validate_plugin_options(configuration_store_plugin, options) + store = configuration_store_plugin.factory(options) - def make_persistence_service(ignored): - return ConfigurationPersistenceService.from_configuration_store( - reactor, - store - ) - d.addCallback(make_persistence_service) + d = ConfigurationPersistenceService.from_configuration_store( + reactor, + store + ) def start_services(persistence_service): top_service = MultiService() From 3c3a55b0ffa9126d7ef3d7919ad055436ebc7dbc Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 12:34:00 +0100 Subject: [PATCH 10/15] Let's try this! 
--- admin/package-files/systemd/flocker-control.service | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/admin/package-files/systemd/flocker-control.service b/admin/package-files/systemd/flocker-control.service index bcfe235fc8..dabfad46e4 100644 --- a/admin/package-files/systemd/flocker-control.service +++ b/admin/package-files/systemd/flocker-control.service @@ -2,7 +2,7 @@ Description=Flocker Control Service [Service] -ExecStart=/usr/sbin/flocker-control --port tcp:4523 --agent-port tcp:4524 --journald +ExecStart=/usr/sbin/flocker-control --port tcp:4523 --agent-port tcp:4524 --journald --configuration-store-plugin consul Restart=always PrivateTmp=true From 9c2de81497e559bb5a2435fd837305ce12035b78 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 15:21:21 +0100 Subject: [PATCH 11/15] Update our TLS dropin configuration for Docker 1.12.0 --- flocker/provision/_install.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocker/provision/_install.py b/flocker/provision/_install.py index c7efdf7d96..190a097c82 100644 --- a/flocker/provision/_install.py +++ b/flocker/provision/_install.py @@ -862,7 +862,7 @@ def task_enable_docker(distribution): """\ [Service] ExecStart= - ExecStart=/usr/bin/docker daemon -H fd:// {} + ExecStart=/usr/bin/dockerd {} """.format(docker_tls_options))), run_from_args(["systemctl", "enable", "docker.service"]), ]) From ec2b0f3a304da867c2a7e3675deae1b24f32110a Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 15:30:19 +0100 Subject: [PATCH 12/15] Docker on Centos-7 is no longer socket activated...so hard code the unix socket. 
--- flocker/provision/_install.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flocker/provision/_install.py b/flocker/provision/_install.py index 190a097c82..a100025ba9 100644 --- a/flocker/provision/_install.py +++ b/flocker/provision/_install.py @@ -862,7 +862,9 @@ def task_enable_docker(distribution): """\ [Service] ExecStart= - ExecStart=/usr/bin/dockerd {} + ExecStart=/usr/bin/dockerd \ + -H unix:///var/run/docker.sock \ + {} """.format(docker_tls_options))), run_from_args(["systemctl", "enable", "docker.service"]), ]) From 6f1ca8e2070e041554a27142e6bce44541d0728b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Wed, 3 Aug 2016 18:24:14 +0100 Subject: [PATCH 13/15] A hack to prevent the acceptance test script cleaning up my consul containers --- flocker/acceptance/endtoend/test_dockerplugin.py | 5 +++++ flocker/acceptance/testtools.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/flocker/acceptance/endtoend/test_dockerplugin.py b/flocker/acceptance/endtoend/test_dockerplugin.py index 86405afe59..80c8f85a5d 100644 --- a/flocker/acceptance/endtoend/test_dockerplugin.py +++ b/flocker/acceptance/endtoend/test_dockerplugin.py @@ -122,6 +122,11 @@ def run_python_container(self, cluster, address, docker_arguments, script, for container in client.containers(): client.remove_container(container["Id"], force=True) + # Give the test containers a meaningful name so that we can identify + # which test produced any containers that don't get garbage collected. 
+ if "name" not in docker_arguments: + docker_arguments["name"] = random_name(self) + container = client.create_container( "python:2.7-slim", ["python2.7", "-c", script.getContent()] + list(script_arguments), diff --git a/flocker/acceptance/testtools.py b/flocker/acceptance/testtools.py index b353ad224b..f0b121fa2c 100644 --- a/flocker/acceptance/testtools.py +++ b/flocker/acceptance/testtools.py @@ -873,8 +873,10 @@ def cleanup_all_containers(_): # Remove all existing containers on the node, in case # they're left over from previous test; they might e.g. # have a volume bind-mounted, preventing its destruction. + # XXX But don't kill the consul containers for container in client.containers(): - client.remove_container(container["Id"], force=True) + if "acceptance" in [name for name in container["Names"]]: + client.remove_container(container["Id"], force=True) def cleanup_flocker_containers(_): cleaning_containers = api_clean_state( From b5572c3badeceb7564270d0c8f74df5785c3273b Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Thu, 4 Aug 2016 12:08:26 +0100 Subject: [PATCH 14/15] Fix broken apiclient functional tests --- flocker/apiclient/test/test_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flocker/apiclient/test/test_client.py b/flocker/apiclient/test/test_client.py index c3fbb07b8b..7288b72a00 100644 --- a/flocker/apiclient/test/test_client.py +++ b/flocker/apiclient/test/test_client.py @@ -717,7 +717,8 @@ def create_client(self): clock = Clock() _, self.port = find_free_port() self.persistence_service = ConfigurationPersistenceService( - clock, FilePath(self.mktemp())) + reactor=clock + ) self.persistence_service.startService() self.cluster_state_service = ClusterStateService(reactor) self.cluster_state_service.startService() From d70fad830a26110dc90834fce5e4440082297352 Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Tue, 16 Aug 2016 12:59:48 +0100 Subject: [PATCH 15/15] Add an SQL configuration store and functional tests 
against MariaDB. --- flocker/control/configuration_storage/sql.py | 68 +++++++++++++++++++ .../configuration_storage/test/test_sql.py | 25 +++++++ .../configuration_storage/testtools.py | 36 +++++++++- flocker/control/script.py | 15 ++++ requirements/flocker.txt.in | 2 + 5 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 flocker/control/configuration_storage/sql.py create mode 100644 flocker/control/configuration_storage/test/test_sql.py diff --git a/flocker/control/configuration_storage/sql.py b/flocker/control/configuration_storage/sql.py new file mode 100644 index 0000000000..f7f58cf497 --- /dev/null +++ b/flocker/control/configuration_storage/sql.py @@ -0,0 +1,68 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. + +""" +Persistence of cluster configuration to a SQL database. +""" +from time import sleep +from pyrsistent import PClass, field +from sqlalchemy import ( + Table, Column, LargeBinary, MetaData, create_engine, select +) +from sqlalchemy.exc import OperationalError +from twisted.internet.defer import succeed +from zope.interface import implementer + +from .interface import IConfigurationStore + + +class NotFound(Exception): + pass + + +METADATA = MetaData() +CONFIGURATION_TABLE = Table( + "configuration", + METADATA, + Column("content", LargeBinary), +) + + +@implementer(IConfigurationStore) +class SQLConfigurationStore(PClass): + connection_string = field(mandatory=True) + + def _connect(self): + engine = create_engine(self.connection_string) + while True: + try: + connection = engine.connect() + except OperationalError: + sleep(1) + else: + break + + return connection + + def initialize(self): + connection = self._connect() + table_names = set(connection.engine.table_names()) + if not set(METADATA.tables.keys()).issubset(table_names): + METADATA.create_all(connection) + connection.execute( + CONFIGURATION_TABLE.insert().values(content=b"") + ) + return succeed(None) + + def get_content(self): + connection = 
self._connect() + [result] = connection.execute( + select([CONFIGURATION_TABLE.c.content]) + ).fetchall() + return succeed(result['content']) + + def set_content(self, content_bytes): + connection = self._connect() + connection.execute( + CONFIGURATION_TABLE.update().values(content=content_bytes) + ) + return succeed(None) diff --git a/flocker/control/configuration_storage/test/test_sql.py b/flocker/control/configuration_storage/test/test_sql.py new file mode 100644 index 0000000000..934604e377 --- /dev/null +++ b/flocker/control/configuration_storage/test/test_sql.py @@ -0,0 +1,25 @@ +# Copyright ClusterHQ Inc. See LICENSE file for details. +""" +Tests for ``flocker.control.configuration_storage.sql``. +""" +from ....testtools import AsyncTestCase +from ..testtools import mariadb_server_for_test, IConfigurationStoreTestsMixin +from ..sql import SQLConfigurationStore, NotFound + + +class SQLTests(IConfigurationStoreTestsMixin, AsyncTestCase): + def setUp(self): + super(SQLTests, self).setUp() + connection_url = mariadb_server_for_test(self) + self.store = SQLConfigurationStore( + connection_string=unicode(connection_url) + ) + + def test_uninitialized(self): + """ + ``get_content`` raises ``NotFound`` if the configuration store key does + not exist. + """ + d = self.store.get_content() + d = self.assertFailure(d, NotFound) + return d diff --git a/flocker/control/configuration_storage/testtools.py b/flocker/control/configuration_storage/testtools.py index 351bc1804a..f603a0fa44 100644 --- a/flocker/control/configuration_storage/testtools.py +++ b/flocker/control/configuration_storage/testtools.py @@ -2,12 +2,13 @@ """ Tests for ``flocker.control.configuration_storage``. 
""" +from os import urandom from subprocess import check_output +from sqlalchemy.engine.url import URL as sqlalchemy_url from zope.interface.verify import verifyObject from ...testtools import random_name, find_free_port - from .interface import IConfigurationStore @@ -33,6 +34,39 @@ def consul_server_for_test(test_case): return api_port +def mariadb_server_for_test(test_case): + address, port = find_free_port() + container_name = random_name(test_case) + password = urandom(32).encode('hex') + + url = sqlalchemy_url( + drivername=u"mysql+pymysql", + host=address, + port=port, + username=u"test", + password=password, + database=u"flocker", + ) + + container_id = check_output([ + 'docker', 'run', + '--detach', + '--publish', u'{}:3306'.format(url.port), + '--name', container_name, + '--env', u'MYSQL_ROOT_PASSWORD={}'.format(url.password), + '--env', u'MYSQL_USER={}'.format(url.username), + '--env', u'MYSQL_PASSWORD={}'.format(url.password), + '--env', u'MYSQL_DATABASE={}'.format(url.database), + 'mariadb:10.1' + ]).rstrip() + + test_case.addCleanup( + check_output, + ['docker', 'rm', '--force', container_id] + ) + return url + + class IConfigurationStoreTestsMixin(object): def test_interface(self): """ diff --git a/flocker/control/script.py b/flocker/control/script.py index 76f12644e3..acdee6a58c 100644 --- a/flocker/control/script.py +++ b/flocker/control/script.py @@ -24,6 +24,7 @@ ) from .configuration_storage.consul import ConsulConfigurationStore from .configuration_storage.directory import DirectoryConfigurationStore +from .configuration_storage.sql import SQLConfigurationStore from ._clusterstate import ClusterStateService from ..common.script import ( @@ -59,6 +60,12 @@ def consul_store_from_options(options): ) +def sql_store_from_options(options): + return SQLConfigurationStore( + connection_string=options["connection-string"], + ) + + def validate_plugin_options(plugin, options): required_options = {option[0] for option in plugin.options} missing_options = 
required_options.difference(options.keys()) @@ -94,6 +101,14 @@ def validate_plugin_options(plugin, options): "The TCP port number of the consul server", int], ], ), + ConfigurationStorePlugin( + name=u"sql", + factory=sql_store_from_options, + options=[ + ["sql_connection_string", None, u"mysql+pymysql://", + "The SQLAlchemy connection string for your database."], + ], + ), ] CONFIGURATION_STORE_PLUGINS_BY_NAME = { diff --git a/requirements/flocker.txt.in b/requirements/flocker.txt.in index da60df8e0c..3cb317d41b 100644 --- a/requirements/flocker.txt.in +++ b/requirements/flocker.txt.in @@ -35,6 +35,7 @@ oauth2client < 2.0.0 # setuptools capable of parsing pkg_resources environment markers # e.g. nomenclature; sys_platform==linux2 (FLOC-4429) pip >= 8.1.2 +pymysql pyOpenSSL psutil --find-links git+https://github.com/ClusterHQ/pyrsistent@v0.11.13+chq5#egg=pyrsistent-0.11.13+chq5 @@ -49,6 +50,7 @@ setuptools >= 22.0.5 six Sphinx sphinxcontrib-httpdomain +SQLAlchemy testtools treq Twisted