# A2A multiagent execution examples

# This script contains multiple examples.
# Run the script once in a terminal; the server does not need to be restarted between examples.
# Then enter the curl commands given under each example to see what the A2A platform can do.
# There are two ways to monitor the status of tasks. First, use the curl commands given under each example, or reuse the same curl command template with the correct task_id and agent_id. Second, watch the tasks in a browser at an endpoint of the form agents/<agent_id>/tasks, or use the browser endpoints given under each example.
# The task_id can be found in the response message returned by the server after a job is submitted.
# Code outside the examples is generally needed for every agent or platform.
  7
  8# import sys
  9# sys.path.append("your/path/to/BESSER-Agentic-Framework") # If you clone this repository, then add the location to BESSER-Agentic-Framework here.
 10
 11import asyncio
 12from aiohttp import web
 13
 14from baf.core.agent import Agent
 15from baf.exceptions.logger import logger
 16from baf.platforms.a2a.agent_registry import AgentRegistry
 17from baf.platforms.a2a.server import create_app
 18
 19# Create a registry of Agents
 20registry = AgentRegistry()
 21
 22# Define each agent
 23agent1 = Agent('TestAgent1')
 24agent2 = Agent('TestAgent2')
 25agent3 = Agent('TestAgent3')
 26agent4 = Agent('TestAgent4')
 27
 28# Assign platform for each agent
 29a2a_platform1 = agent1.use_a2a_platform()
 30a2a_platform2 = agent2.use_a2a_platform()
 31a2a_platform3 = agent3.use_a2a_platform()
 32a2a_platform4 = agent4.use_a2a_platform()
 33
 34# Provide an ID for each platform (also called as agent id)
 35registry.register('EchoAgent', a2a_platform1)
 36registry.register('SummationAgent', a2a_platform2)
 37registry.register('OrchAgent', a2a_platform3)
 38registry.register('FinalSumAgent', a2a_platform4)
 39
 40# Following prints show how to get basic agent related details.
 41# print(f"Total registered agents: {registry.count()}")
 42# print(registry.get("EchoAgent")._agent.name)
 43
# User-defined methods. Delays are added to mimic an LLM's response time and to make the different task statuses (PENDING, RUNNING, DONE and ERROR) observable. Increase the delay if you want to go more slowly and observe what is happening.
 45# ---------------------------------------------------------------
 46async def echo(msg: str):
 47    '''
 48    A simple echo method that waits for 30 seconds before returning the input message.
 49    '''
 50    if not isinstance(msg, str):
 51        raise ValueError("msg must be a string")
 52    
 53    await asyncio.sleep(30)
 54    return f"message is: {msg}"
 55
 56async def do_summation(num1: int, num2: int):
 57    '''
 58    A simple summation method that waits for 30 seconds before returning the sum of two numbers.
 59    '''
 60    if not isinstance(num1, int) or not isinstance(num2, int):
 61        raise ValueError("Please enter integers only")
 62    
 63    await asyncio.sleep(30)
 64    return f"{num1+num2}"
 65
 66async def final_summation(mysum: int, num1: int):
 67    '''
 68    A simple summation method that waits for 20 seconds before returning the sum of two numbers.
 69    '''
 70    if not isinstance(mysum, int) or not isinstance(num1, int):
 71        raise ValueError("numbers must be an integer")
 72    
 73    await asyncio.sleep(20)
 74    return f"{mysum+num1}"
 75#------------------------------------------------------------------
 76
 77async def await_subtask_result(orchestration_task, subtask, poll_interval=0.1):
 78    '''
 79    This is an internal and private helper function to await a subtask's result within an orchestration task.
 80    '''
 81    while True:
 82        for st in orchestration_task.result.get("subtasks", []):
 83            if st["task_id"] == subtask["task_id"]:
 84                if st["status"] in ["DONE", "ERROR"]:
 85                    return st.get("result")
 86                break
 87        await asyncio.sleep(poll_interval)
 88
# Give an ID to each user-defined method and register it on every platform that needs access to it.
# a2a_platform1 is the platform registered as 'EchoAgent'.
a2a_platform1.router.register("echo_message", echo)
# a2a_platform2 is the platform registered as 'SummationAgent'.
a2a_platform2.router.register("do_summation", do_summation)
# a2a_platform4 is the platform registered as 'FinalSumAgent'; it gets both summation methods.
a2a_platform4.router.register("do_summation", do_summation)
a2a_platform4.router.register("final_summation", final_summation)
 94
 95# add capabilities, descriptions and examples for each platform
 96a2a_platform1.add_capabilities('Prints the entered message')
 97a2a_platform1.add_descriptions(['Waits for 30 seconds and then provides the entered message'])
 98
 99a2a_platform1.populate_methods_from_router()
100a2a_platform1.add_examples([{'To execute "echo_message" method': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"echo_message\",\"params\":{\"msg\":\"Hello\"}},\"id\":1}"', 'To get status of the task with task_id': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"', 'To view the status of tasks in a browser': 'http://localhost:8000/agents/EchoAgent/tasks'}])
101
# Second platform (registered as 'SummationAgent'): capability, description, methods and usage examples.
a2a_platform2.add_capabilities('Prints summation of two numbers')
a2a_platform2.add_descriptions(['Waits for 30 seconds and then provides the summation of two entered numbers'])
a2a_platform2.populate_methods_from_router()
a2a_platform2.add_examples([
    {
        # The inner params are named num1/num2 so that they match the
        # parameters of do_summation (the example previously used int1/int2).
        'To execute "do_summation" method':
            'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"do_summation\",\"params\":{\"num1\":2, \"num2\":4}},\"id\":1}"',
        'To get status of the task with task_id using curl':
            'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"',
        'To view the status of tasks in a browser':
            'http://localhost:8000/agents/SummationAgent/tasks',
    },
])
106
# Fourth platform (registered as 'FinalSumAgent'): here the method list is given explicitly.
a2a_platform4.add_capabilities('Displays the summation result')
a2a_platform4.add_descriptions(['Gets two numbers, waits for 20 seconds, adds two numbers, and prints the summation'])
a2a_platform4.add_methods([
    {
        "name": "do_summation",
        "description": "Waits for 30 seconds and provides the summation of two numbers provided as input.",
    },
    {
        "name": "final_summation",
        "description": "Waits for 20 seconds and provides the summation of two numbers provided as input.",
    },
])
a2a_platform4.add_examples([
    {
        'To get results from SummationAgent (with do_summation) and add it to another number within "final_summation" method':
            'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked_seq\",\"params\":{\"msg\":\"Hello from orchestration\",\"num1\":3,\"num2\":12,\"num3\":12}}"',
        'To get status of the task with task_id':
            'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"',
        'To view the status of tasks in a browser':
            'http://localhost:8000/agents/OrchAgent/tasks',
    },
])
112
113
114# Example 1: Independent
115#***********
116'''Just pass the curl command for a single agent in the terminal and follow its task status using curl command or in a browser as given in example of a2a_platform1 or a2a_platform2. Multiple agents can be executed in parallel (at the same time) as everything is asynchronous.
117Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"echo_message\",\"params\":{\"msg\":\"Hello\"}},\"id\":1}"
118
Get status (replace task_id): curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
120
121Browser: http://localhost:8000/agents/EchoAgent/tasks
122'''
123
124#------------------------------------------------------------------------------------------------------------------
125
126# Example 2: De-centralised
127#***********
128# Agent A invoking Agent B (A => B)
129# For agent-agent orchestration (agent calling another agent), register the orchestration methods in each agent's platform router. 
130# This enables an agent (e.g., EchoAgent) to call another agent (e.g., SummationAgent).
# Walk over every registered platform; those without a router are skipped.
for agent_id, platform in registry._agents.items():
    if not hasattr(platform, "router"):
        continue
    platform.router.register_orchestration_methods(platform, registry)
134
135'''
136Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"EchoAgent\",\"method\":\"call_agent\",\"params\":{\"target_agent_id\":\"SummationAgent\",\"method\":\"do_summation\",\"params\":{\"num1\":3,\"num2\":4}}}"
137
138Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
139
140Browser: http://localhost:8000/agents/SummationAgent/tasks
141'''
142#------------------------------------------------------------------------------------------------------------------
143
144# Example 3: Parallel without task_id for OrchAgent
145#***********
146# Agent A and B are executed in parallel (A || B) by a third agent
147# Separate agent for orchestration (only orchestration, no task registration). Task status can be monitored in respective agent's endpoint
async def orchestrate_echo_and_sum(platform, params, registry):
    '''
    Ask EchoAgent to run "echo_message" and SummationAgent to run "do_summation".

    Both calls go through platform.rpc_call_agent; whatever each call returns is
    handed back to the caller unchanged.

    params: dict containing {'msg': str, 'num1': int, 'num2': int}
    Returns: dict with the keys "echo_task" and "sum_task".
    '''
    call_agent = platform.rpc_call_agent

    echo_payload = {"msg": params["msg"]}
    launched = {
        "echo_task": await call_agent("EchoAgent", "echo_message", echo_payload, registry),
    }

    # The summation payload is only built after the echo call has returned.
    sum_payload = {"num1": params["num1"], "num2": params["num2"]}
    launched["sum_task"] = await call_agent("SummationAgent", "do_summation", sum_payload, registry)

    return launched
166
# Register orchestrate_echo_and_sum on a2a_platform3 (the platform registered as 'OrchAgent') under the method name "orchestrate_tasks".
a2a_platform3.register_orchestration_task_on_resp_agent("orchestrate_tasks", orchestrate_echo_and_sum, registry)
168
169'''
170Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks\",\"params\":{\"msg\":\"Hello\",\"num1\":3,\"num2\":5}}"
171
172Get status: For echo task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}" 
173            For sum task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":3}"
174
175Browser: http://localhost:8000/agents/EchoAgent/tasks
176         http://localhost:8000/agents/SummationAgent/tasks
177'''
178
179#------------------------------------------------------------------------------------------------------------------
180
181# Example 4: Parallel with task_id for OrchAgent
182#***********
183# Agent A and B are executed in parallel (A || B)
184# Separate agent for orchestration (also has its own registered tasks)
async def orchestrate_echo_and_sum_tracked(platform, params, registry, tracked_call, orchestration_task):
    '''
    Start an EchoAgent task and a SummationAgent task through tracked_call.

    params: dict containing {'msg': str, 'num1': int, 'num2': int}
    Returns: an empty dict (see the optional snippet at the end for a richer result).
    '''
    echo_payload = {"msg": params["msg"]}
    await tracked_call("EchoAgent", "echo_message", echo_payload, registry)

    sum_payload = {"num1": params["num1"], "num2": params["num2"]}
    await tracked_call("SummationAgent", "do_summation", sum_payload, registry)

    # Optional: uncomment the lines below (and drop the final "return {}") to also
    # list each agent's subtask under the orchestration result. This mostly
    # duplicates what is already found under "subtasks".
    # orchestration_result = {}
    # for st in orchestration_task.result.get("subtasks", []):
    #     if st["agent_id"] == "EchoAgent":
    #         orchestration_result["echo_task"] = st
    #     elif st["agent_id"] == "SummationAgent":
    #         orchestration_result["sum_task"] = st
    # orchestration_result["pipeline"] = "Echo and Summation in parallel."
    #
    # return orchestration_result
    return {}
215
# Register orchestrate_echo_and_sum_tracked on a2a_platform3 (the platform registered as 'OrchAgent') under the method name "orchestrate_tasks_tracked".
a2a_platform3.register_orchestration_as_task("orchestrate_tasks_tracked", orchestrate_echo_and_sum_tracked, registry)
217
218'''
219Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked\",\"params\":{\"msg\":\"Hello\",\"num1\":3,\"num2\":5}}"
220
221Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
222
223Browser: http://localhost:8000/agents/OrchAgent/tasks
224'''
225
226#------------------------------------------------------------------------------------------------------------------
227
228# Example 5: Hybrid
229# Agent A || B -> C
230# Separate agent for orchestration (also has its own registered tasks)
async def orchestrate_echo_sum_display_seq_tracked(platform, params, registry, tracked_call, orchestration_task):
    '''
    Start EchoAgent and SummationAgent tasks, then feed the summation result to FinalSumAgent.

    The summation subtask is awaited with await_subtask_result; its result is
    converted with int() and passed, together with params["num3"], to
    FinalSumAgent's "final_summation" method.

    params: dict containing {'msg': str, 'num1': int, 'num2': int, 'num3': int}
    Returns: an empty dict (see the optional snippet at the end for a richer result).
    '''
    echo_payload = {"msg": params["msg"]}
    await tracked_call("EchoAgent", "echo_message", echo_payload, registry)

    sum_payload = {"num1": params["num1"], "num2": params["num2"]}
    sum_subtask = await tracked_call("SummationAgent", "do_summation", sum_payload, registry)

    # The final step needs the summation value, so wait for that subtask to finish.
    sum_result = await await_subtask_result(orchestration_task, sum_subtask, poll_interval=0.2)

    final_payload = {"mysum": int(sum_result), "num1": params["num3"]}
    await tracked_call("FinalSumAgent", "final_summation", final_payload, registry)

    # Optional: uncomment the lines below (and drop the final "return {}") to also
    # list the echo and summation subtasks under the orchestration result.
    # orchestration_result = {}
    # for st in orchestration_task.result.get("subtasks", []):
    #     if st["agent_id"] == "EchoAgent":
    #         orchestration_result["echo_task"] = st
    #     elif st["agent_id"] == "SummationAgent":
    #         orchestration_result["sum_task"] = st
    # orchestration_result["pipeline"] = "Echo and Summation in parallel."
    #
    # return orchestration_result
    return {}
266
# Register orchestrate_echo_sum_display_seq_tracked on a2a_platform3 (the platform registered as 'OrchAgent') under the method name "orchestrate_tasks_tracked_seq".
a2a_platform3.register_orchestration_as_task("orchestrate_tasks_tracked_seq", orchestrate_echo_sum_display_seq_tracked, registry)
268
269'''
270Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked_seq\",\"params\":{\"msg\":\"Hello from orchestration\",\"num1\":3,\"num2\":12,\"num3\":10}}"
271
272Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
273
274Browser: http://localhost:8000/agents/OrchAgent/tasks
275'''
276
277#------------------------------------------------------------------------------------------------------------------
278
279# Run the platform with registry containing registered agents.
280# app = create_app(registry=registry)
281# web.run_app(app, port=8000)
282
async def _shutdown(app: web.Application):
    '''
    Cancel every asyncio task except the calling one, then shut down and clean up the app.
    '''
    this_task = asyncio.current_task()
    for other in (t for t in asyncio.all_tasks() if t is not this_task):
        other.cancel()

    await app.shutdown()
    await app.cleanup()
289
async def _main():
    '''
    Build the A2A application from the registry, serve it on 0.0.0.0:8000 and block until stopped.

    On KeyboardInterrupt or task cancellation the shutdown is logged and
    _shutdown() is run. The runner is cleaned up in every case, including when
    binding the TCP site fails (for example because port 8000 is already in use).
    '''
    app = create_app(registry=registry)
    runner = web.AppRunner(app)
    await runner.setup()

    # Everything after setup() sits inside the try block so that the finally
    # clause releases the runner even if site.start() raises.
    try:
        site = web.TCPSite(runner, "0.0.0.0", 8000)
        await site.start()

        # This Event is never set: waiting on it parks the coroutine until it
        # is cancelled or interrupted.
        await asyncio.Event().wait()
    except (KeyboardInterrupt, asyncio.CancelledError):
        logger.info("Server shutdown requested (Ctrl+C pressed).")
        await _shutdown(app)
    finally:
        await runner.cleanup()
304
# Start the server only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    logger.info("Press (Ctrl+C) to stop the server.")
    asyncio.run(_main())