task_protocol
- class baf.platforms.a2a.task_protocol.Task(method, params)
Bases: object
Task initialises each task that is submitted to the agent and added to the queue for execution.
- async notify_subscribers(message)
Push a message to all subscriber queues without blocking (each put is awaited).
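The fan-out described above can be sketched roughly as follows. This is an illustration only, not the baf implementation: `SketchTask`, its `subscribers` list, and the `subscribe` helper are hypothetical names, and the sketch assumes each subscriber owns an unbounded `asyncio.Queue`.

```python
import asyncio


class SketchTask:
    """Hypothetical stand-in for Task; not the baf implementation."""

    def __init__(self, method, params):
        self.method = method
        self.params = params
        self.subscribers = []  # assumption: one asyncio.Queue per subscriber

    def subscribe(self):
        # Hypothetical helper: register and return a new subscriber queue.
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def notify_subscribers(self, message):
        # put() on an unbounded queue never waits for free space,
        # so awaiting it does not stall the caller.
        for queue in self.subscribers:
            await queue.put(message)


async def demo():
    task = SketchTask("echo", {"text": "hi"})
    first, second = task.subscribe(), task.subscribe()
    await task.notify_subscribers({"status": "RUNNING"})
    # Every subscriber receives its own copy of the reference.
    return await first.get(), await second.get()


received = asyncio.run(demo())
```

With bounded queues, `await queue.put(...)` could suspend until a slow subscriber drains its queue, which is one reason a sketch like this keeps the queues unbounded.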
- class baf.platforms.a2a.task_protocol.TaskStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Constants for task status.
- DONE = 'DONE'
- ERROR = 'ERROR'
- PENDING = 'PENDING'
- RUNNING = 'RUNNING'
- _generate_next_value_(start, count, last_values)
Generate the next value when not given.
name: the name of the member
start: the initial start value or None
count: the number of existing members
last_values: the list of values assigned
- _member_map_ = {'DONE': TaskStatus.DONE, 'ERROR': TaskStatus.ERROR, 'PENDING': TaskStatus.PENDING, 'RUNNING': TaskStatus.RUNNING}
- _member_names_ = ['PENDING', 'RUNNING', 'DONE', 'ERROR']
- _new_member_(**kwargs)
Create and return a new object. See help(type) for accurate signature.
- _unhashable_values_ = []
- _use_args_ = True
- _value2member_map_ = {'DONE': TaskStatus.DONE, 'ERROR': TaskStatus.ERROR, 'PENDING': TaskStatus.PENDING, 'RUNNING': TaskStatus.RUNNING}
- _value_repr_()
Return repr(self).
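As a rough illustration of how status constants like these behave, the sketch below defines a comparable enumeration with the standard `enum` module. `SketchTaskStatus` is a hypothetical stand-in; whether the real `TaskStatus` mixes in `str` or another base type is not stated here, so a plain `Enum` is assumed.

```python
from enum import Enum


class SketchTaskStatus(Enum):
    """Hypothetical mirror of the four documented status values."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"
    ERROR = "ERROR"


# Look up a member from its string value, e.g. when reading a stored status.
status = SketchTaskStatus("DONE")

# Iteration follows definition order, matching the documented member names.
names = [member.name for member in SketchTaskStatus]

# A task is finished once it is either DONE or ERROR.
is_finished = status in (SketchTaskStatus.DONE, SketchTaskStatus.ERROR)
```

Comparing against members rather than raw strings lets a typo surface as an `AttributeError` instead of a silently false comparison.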
- baf.platforms.a2a.task_protocol.create_task(method, params, task_storage=None)
Internal function. Creates a new task and adds it to the tasks dictionary.
- async baf.platforms.a2a.task_protocol.execute_task(task_id, router, task_storage=None, coroutine_func=None, params=None)
Internal function. Executes the task identified by task_id. For orchestration tasks, a coroutine function can be provided; it is awaited with the task parameters instead of the default method handler.
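The create-then-execute flow, including the coroutine override, might look something like the sketch below. Everything in it is an assumption made for illustration: the `_sketch` function names, the dict-based task records, the UUID task ids, the `handlers` mapping standing in for `router`, and the choice to pass parameters as a single dict. The status transitions in the real module may differ.

```python
import asyncio
import uuid


def create_task_sketch(method, params, task_storage):
    # Assumption: tasks live in a plain dict keyed by a generated id.
    task_id = str(uuid.uuid4())
    task_storage[task_id] = {
        "method": method, "params": params,
        "status": "PENDING", "result": None, "error": None,
    }
    return task_id


async def execute_task_sketch(task_id, handlers, task_storage,
                              coroutine_func=None, params=None):
    task = task_storage[task_id]
    task["status"] = "RUNNING"
    try:
        if coroutine_func is not None:
            # Override path: await the supplied coroutine function.
            args = params if params is not None else task["params"]
            task["result"] = await coroutine_func(args)
        else:
            # Default path: dispatch on the task's method name.
            task["result"] = await handlers[task["method"]](task["params"])
        task["status"] = "DONE"
    except Exception as exc:
        task["status"] = "ERROR"
        task["error"] = str(exc)
    return task


async def echo(params):
    return params["text"]


async def shout(params):
    return params["text"].upper()


async def demo():
    storage, handlers = {}, {"echo": echo}
    first = create_task_sketch("echo", {"text": "hi"}, storage)
    second = create_task_sketch("echo", {"text": "hi"}, storage)
    await execute_task_sketch(first, handlers, storage)
    await execute_task_sketch(second, handlers, storage, coroutine_func=shout)
    return storage[first], storage[second]


default_run, override_run = asyncio.run(demo())
```

Here the second task never reaches the `echo` handler, because the supplied coroutine function takes its place.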