task_protocol

class baf.platforms.a2a.task_protocol.Task(method, params)

Bases: object

Represents a single task submitted to the agent and added to the queue for execution.

async notify_subscribers(message)

Push a message to every subscriber queue. Each put is awaited, so delivery does not block the event loop.

subscribe(q=None)

Return an asyncio.Queue that will receive updates for this task. The caller should read from it until cancelled or closed.

unsubscribe(q)

Remove the queue q from this task's subscribers once its updates no longer need to be monitored.

class baf.platforms.a2a.task_protocol.TaskStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: str, Enum

Constants for task status

DONE = 'DONE'
ERROR = 'ERROR'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
_generate_next_value_(start, count, last_values)

Generate the next value when not given.

name: the name of the member
start: the initial start value or None
count: the number of existing members
last_values: the list of values assigned

_member_map_ = {'DONE': TaskStatus.DONE, 'ERROR': TaskStatus.ERROR, 'PENDING': TaskStatus.PENDING, 'RUNNING': TaskStatus.RUNNING}
_member_names_ = ['PENDING', 'RUNNING', 'DONE', 'ERROR']
_member_type_

alias of str

_new_member_(**kwargs)

Create and return a new object. See help(type) for accurate signature.

_unhashable_values_ = []
_use_args_ = True
_value2member_map_ = {'DONE': TaskStatus.DONE, 'ERROR': TaskStatus.ERROR, 'PENDING': TaskStatus.PENDING, 'RUNNING': TaskStatus.RUNNING}
_value_repr_()

Return repr(self).
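Because TaskStatus derives from both str and Enum, its members behave like ordinary strings in comparisons and JSON serialisation. The sketch below uses a local replica named SketchStatus (a hypothetical name, declared here only so the example runs without baf installed) with the same four members in the documented order.

```python
import json
from enum import Enum


class SketchStatus(str, Enum):
    """Local replica of the four documented members, for illustration only."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"
    ERROR = "ERROR"


# Members compare equal to their plain string values.
print(SketchStatus.DONE == "DONE")  # True

# Looking a member up by value returns the existing member object.
print(SketchStatus("RUNNING") is SketchStatus.RUNNING)  # True

# The str mixin lets the json module serialise members directly.
print(json.dumps({"status": SketchStatus.PENDING}))  # {"status": "PENDING"}

# Iteration follows definition order.
print([member.name for member in SketchStatus])
```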

baf.platforms.a2a.task_protocol.create_task(method, params, task_storage=None)

Internal function. Creates a new task and adds it to the tasks dictionary.

async baf.platforms.a2a.task_protocol.execute_task(task_id, router, task_storage=None, coroutine_func=None, params=None)

Internal function. Executes the task identified by task_id. For orchestration tasks, a coroutine function can be supplied; it is awaited with the task parameters instead of the default method handler.
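The choice between the default method handler and a supplied coroutine function can be sketched as follows. Everything here is an assumption made for illustration: the dictionary-based storage and router, the helper names create_task_sketch and execute_task_sketch, the status transitions, and the convention of passing the parameters as a single argument are not taken from the baf source.

```python
import asyncio
import uuid


async def echo_handler(params):
    # Hypothetical default handler registered under the method name "echo".
    return {"echo": params}


def create_task_sketch(method, params, storage):
    # Assumed record layout: a plain dict keyed by a generated task id.
    task_id = str(uuid.uuid4())
    storage[task_id] = {
        "method": method,
        "params": params,
        "status": "PENDING",
        "result": None,
    }
    return task_id


async def execute_task_sketch(task_id, router, storage, coroutine_func=None):
    task = storage[task_id]
    task["status"] = "RUNNING"
    try:
        if coroutine_func is not None:
            # Orchestration-style path: await the supplied coroutine function.
            task["result"] = await coroutine_func(task["params"])
        else:
            # Default path: look up the handler registered for the method.
            task["result"] = await router[task["method"]](task["params"])
        task["status"] = "DONE"
    except Exception as exc:
        # Any failure, including an unknown method, marks the task as ERROR.
        task["result"] = str(exc)
        task["status"] = "ERROR"
    return task


async def orchestrate(params):
    # Hypothetical orchestration coroutine: counts the agents it was given.
    return {"steps": len(params["agents"])}


async def demo():
    storage = {}
    router = {"echo": echo_handler}  # assumed: method name -> async handler
    a = create_task_sketch("echo", {"text": "hi"}, storage)
    b = create_task_sketch("orchestrate", {"agents": ["x", "y"]}, storage)
    first = await execute_task_sketch(a, router, storage)
    second = await execute_task_sketch(b, router, storage, coroutine_func=orchestrate)
    return first, second


print(asyncio.run(demo()))
```

In this sketch the second task never touches the router, because the supplied coroutine function takes precedence over the handler lookup.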

baf.platforms.a2a.task_protocol.get_status(task_id, task_storage=None)

Internal function. Returns the status of the task identified by task_id.

baf.platforms.a2a.task_protocol.list_all_tasks(task_storage=None)

Return status info for all tasks.