Some API calls need to perform work that can take a long time. In such cases it is recommended to store a task (a kind of message) in the task queue and return a quick response to the user. The API function should store in the task all the information required to finish it.
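The pattern described above can be sketched as follows. This is only an illustration: queue.Queue is a stand-in for the CoreCluster task queue, and the function name start_vm_api_call and the vm_id field are hypothetical.

```python
import queue
import uuid

# Stand-in for the CoreCluster task queue (assumption: the real queue
# is not a queue.Queue; this only illustrates the pattern).
task_queue = queue.Queue()


def start_vm_api_call(vm_id):
    """Hypothetical API function: enqueue the slow work and return at once."""
    task = {
        "id": str(uuid.uuid4()),
        "type": "vm",          # selects the agent and the queue
        "action": "start_vm",  # selects the agent method
        "state": "init",
        # Everything the agent will need later to finish the job.
        "data": {"vm_id": vm_id},
    }
    task_queue.put(task)
    # The caller gets a quick answer; an agent does the work later.
    return {"status": "queued", "task_id": task["id"]}
```

The response carries only the task id, so the caller is never blocked by the long-running work.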
All tasks are fetched and executed by special processes called agents. Each agent is prepared to execute exactly one kind of task, with several actions. For example, the Node agent can mount storages on nodes and retrieve base virtual machine images to nodes. The VM agent is designed to manage existing and new virtual machines. The type of each task defines which agent should handle it, and with which action.
All tasks are assigned to the proper queues, so all node tasks are in one queue and all vm tasks are in another. However, there is a mechanism that blocks tasks in one queue until tasks from another queue have finished. For example, when the Node agent has finished copying an image, it unblocks the tasks for the VM agent, which can then attach external disks (vm:attach), start the virtual machine (vm:start_vm) and optionally reset it (vm:reset). There are many more agent types in CoreCluster as a whole and in related modules. Each task executed by an agent has its own type, which determines the agent type that handles it, and an action, which corresponds to one of the agent's functions.
The task structure described above is easier to understand from the Task and BaseAgent class definitions:
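The blocking between queues can be pictured with a small sketch. How CoreCluster actually stores and resolves these dependencies is not shown in this article, so the blockers list, the is_runnable method, the use of the 'waiting' and 'ok' states and the node action name load_image are all assumptions made for illustration.

```python
class BlockedTaskSketch:
    """Hypothetical task that may run only when all its blockers are 'ok'."""

    def __init__(self, type_, action, blockers=()):
        self.type = type_
        self.action = action
        self.state = "waiting"
        self.blockers = list(blockers)

    def is_runnable(self):
        # A finished or blocked task is never handed to an agent.
        return self.state == "waiting" and all(
            blocker.state == "ok" for blocker in self.blockers
        )


def runnable(tasks, type_):
    """Tasks of one queue (one type) that an agent could pick up now."""
    return [t for t in tasks if t.type == type_ and t.is_runnable()]


# 'load_image' is a made-up action name for the Node agent.
copy_image = BlockedTaskSketch("node", "load_image")
attach = BlockedTaskSketch("vm", "attach", blockers=[copy_image])
start = BlockedTaskSketch("vm", "start_vm", blockers=[attach])
```

In this sketch the VM agent sees no runnable tasks until copy_image reaches the 'ok' state, and start_vm stays blocked until attach has finished as well.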
    class BaseAgent(threading.Thread):
        task_type = ""
        supported_actions =  ...
The task_type field defines which kind of tasks the agent processes. The supported_actions field defines the actions the agent can perform. Each action should have an equivalent method in the particular agent's class. All tasks use a common class, the Task model, found in corecluster/cache/task.py:
    class Task(Model):
        states = [
            'init',
            'not active',
            'waiting',
            'start trigger',
            'in progress',
            'finish trigger',
            'ok',
            'failed',
            'canceled',
        ]
        serializable = ['id', 'data', 'type', 'action', 'version', 'ignore_errors', 'state', ... ]
        container = 'tasks'
        state = 'init'
        # type = ''  # inherited from cache/model.py
        action = ''
        ignore_errors = False
        ...
Note three of these fields: type (inherited from the Model class), which defines the agent type and the queue; action, which defines what action the agent should perform; and state. The serializable list defines which fields of the task should be stored in the task queue by CoreCluster. Remember that since version 16.01 the task queue is not handled by the database.
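The role of the serializable list can be illustrated with a short sketch. This is not the real Model implementation: the class name SerializableTaskSketch and the to_dict method are hypothetical, and the field list is shortened.

```python
import json


class SerializableTaskSketch:
    """Illustration only; not corecluster's Model or Task class."""

    serializable = ["id", "data", "type", "action", "ignore_errors", "state"]
    type = ""
    action = ""
    state = "init"
    ignore_errors = False

    def __init__(self, **fields):
        self.id = fields.pop("id", None)
        self.data = fields.pop("data", {})
        for name, value in fields.items():
            setattr(self, name, value)

    def to_dict(self):
        # Only the fields named in `serializable` would reach the queue.
        return {name: getattr(self, name) for name in self.serializable}


task = SerializableTaskSketch(
    id="42", type="vm", action="start_vm", data={"vm_id": 7}, scratch="tmp"
)
payload = json.dumps(task.to_dict())
```

Attributes that are not listed, like scratch here, stay in memory and are not written to the queue.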
All agents are placed in the corecluster/agents/ directory. To add a new one, create a new file in this directory and add the new agent definition to the app.py file, which describes all agents, models etc. provided by the module. You can also create a new agent in your own extension in the same way.
The most basic agent should inherit from the BaseAgent class in the corecluster.agents.base_agent module and implement the methods and fields described below. The first is the task type (the task_type field): set it to the type of tasks executed by your agent. If you want to override an existing agent, remember to cover all of its actions in your agent. The second important field is supported_actions, which defines which methods can be executed by your agent. There are some forbidden names for actions:
The last thing is to implement all the methods listed in supported_actions. Each method should accept a Task class instance as its parameter (and self, of course).
The class that implements the agent functionality must be called AgentThread. It can use additional helper classes, such as mixins, but this name for your agent class is mandatory. With this configuration, your agent should work as a very simple, basic agent thread in the CoreCluster system.
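Put together, a minimal agent might look like the sketch below. So that the example runs on its own, BaseAgent is replaced by a small stand-in instead of being imported from corecluster.agents.base_agent. The task type backup, its two actions, the call_action helper and the use of SimpleNamespace in place of a Task instance are all hypothetical.

```python
import threading
from types import SimpleNamespace


class BaseAgent(threading.Thread):
    """Stand-in for corecluster.agents.base_agent.BaseAgent (not the real class)."""
    task_type = ""
    supported_actions = []


class AgentThread(BaseAgent):  # this class name is mandatory
    task_type = "backup"                      # hypothetical task type
    supported_actions = ["create", "remove"]  # hypothetical actions

    # One method per supported action; each takes the task as its parameter.
    def create(self, task):
        task.data["result"] = "backup created for %s" % task.data["vm_id"]

    def remove(self, task):
        task.data["result"] = "backup removed for %s" % task.data["vm_id"]


def call_action(agent, task):
    """Assumed dispatch: look up the method named by task.action."""
    if task.action not in agent.supported_actions:
        raise ValueError("unsupported action: %s" % task.action)
    getattr(agent, task.action)(task)


# SimpleNamespace stands in for a Task instance here.
task = SimpleNamespace(type="backup", action="create", data={"vm_id": "vm-1"})
call_action(AgentThread(), task)
```

In a real module the stand-in class would be dropped and BaseAgent imported from corecluster.agents.base_agent, as described above.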
The BaseAgent class is able to catch any exception thrown by an action method in your class. However, there are three additional exception classes that can be used to control the behavior of the task queue in case of a failure:
All of the above exceptions are defined in the corecluster/exceptions/agent.py file.
Do not override the methods described in this section unless it is necessary and you know what you are doing. Instead, use the hook subsystem. Each time a task is processed, the following methods from BaseAgent are called:
The method responsible for fetching tasks from the queue and executing them is run, from the BaseAgent class. It performs basic actions to signal that the agent is alive and then starts processing tasks in the main thread loop:
    def run(self):
        self.init()
        self.agent.set_state('running')
        self.agent.task_fetch_timeout = config.get('agent', 'TASK_FETCH_INTERVAL', 20)
        self.agent.save()
        log(msg="Agent %s is running" % self.task_type, tags=('agent', self.task_type, 'info'))
        while self.i_am_running:
            ...
It is not recommended to override Python's standard __init__ method or the run method of the BaseAgent class. The best place for your agent's initialization code is the init method, which can be overridden in any agent. To add code that cleans up when the agent is closed, override the cleanup method. Remember to call the parent's method from your code.
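A sketch of overriding init and cleanup while still calling the parent's methods. The BaseAgent below is a stand-in that exposes only these two hooks, and the temporary working directory is a hypothetical resource chosen for the example.

```python
import os
import shutil
import tempfile


class BaseAgent:
    """Stand-in exposing only the init and cleanup hooks named in the text."""

    def init(self):
        self.initialized = True

    def cleanup(self):
        self.initialized = False


class AgentThread(BaseAgent):
    def init(self):
        super().init()                     # parent's setup first
        self.workdir = tempfile.mkdtemp()  # hypothetical per-agent resource

    def cleanup(self):
        shutil.rmtree(self.workdir, ignore_errors=True)
        super().cleanup()                  # then the parent's cleanup
```

Calling the parent first in init and last in cleanup is a common convention, not something this article prescribes.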
If for some reason the above criteria are not valid for some kind of agent, it is possible to override the get_tasks method in your agent; however, this is not recommended. The overridden method should return the list of tasks that will be processed by your agent.
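A minimal sketch of a custom get_tasks, driven by a simplified polling loop. Only get_tasks and i_am_running are names taken from this article. The class, the urgent flag, the list used as a queue, the run_cycles helper and the sleep interval are assumptions, and the real run method does more than this.

```python
import time


class PollingAgentSketch:
    """Stand-in agent; not derived from the real BaseAgent."""

    def __init__(self, source, fetch_interval=0.01):
        self.source = source  # hypothetical list acting as the queue
        self.fetch_interval = fetch_interval
        self.i_am_running = True
        self.processed = []

    def get_tasks(self):
        # Custom criteria (an assumption): take only tasks flagged as urgent
        # and leave the rest in the queue.
        urgent = [t for t in self.source if t["urgent"]]
        self.source[:] = [t for t in self.source if not t["urgent"]]
        return urgent

    def run_cycles(self, cycles):
        """Bounded version of the main loop, so the example terminates."""
        for _ in range(cycles):
            if not self.i_am_running:
                break
            for task in self.get_tasks():
                self.processed.append(task["id"])
            time.sleep(self.fetch_interval)  # wait before polling again


source = [
    {"id": 1, "urgent": True},
    {"id": 2, "urgent": False},
    {"id": 3, "urgent": True},
]
agent = PollingAgentSketch(source)
agent.run_cycles(2)
```

Whatever criteria the override applies, the loop only relies on get_tasks returning a list.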
When the agent finishes executing the tasks returned by the get_tasks method, the agent's main loop sleeps for a certain amount of time and calls get_tasks again to check for new tasks.

Author: Maciej Nabozny
Published: Feb. 5, 2017, 11:50 a.m.