1# This script has multiple examples.
# Run this script once in a terminal; the server keeps running, so there is no need to restart it for each example.
3# You can enter the curl commands given under each example to see what A2A platform can do.
# There are two ways to monitor the status of the tasks. First, use the curl commands given under each example, or the same curl command template with the correct task_id and agent_id. Second, open the browser endpoint of the form agents/<agent_id>/tasks, or use the browser endpoints given under each example.
5# task_id can be found in the response message given by the server after submitting the jobs.
6# Parts of code other than examples are generally used for every agent or platform.
7
8# import sys
9# sys.path.append("your/path/to/BESSER-Agentic-Framework") # If you clone this repository, then add the location to BESSER-Agentic-Framework here.
10
11import asyncio
12from aiohttp import web
13
14from baf.core.agent import Agent
15from baf.exceptions.logger import logger
16from baf.platforms.a2a.agent_registry import AgentRegistry
17from baf.platforms.a2a.server import create_app
18
# Registry that collects every agent platform served by this script
registry = AgentRegistry()

# One Agent object per role used in the examples
agent1, agent2, agent3, agent4 = (
    Agent(agent_name)
    for agent_name in ('TestAgent1', 'TestAgent2', 'TestAgent3', 'TestAgent4')
)

# Attach an A2A platform to each agent (created in the same order as the agents)
a2a_platform1, a2a_platform2, a2a_platform3, a2a_platform4 = (
    agent.use_a2a_platform()
    for agent in (agent1, agent2, agent3, agent4)
)

# Register each platform under an ID (also called the agent id); the curl examples address agents by this ID
registry.register('EchoAgent', a2a_platform1)
registry.register('SummationAgent', a2a_platform2)
registry.register('OrchAgent', a2a_platform3)
registry.register('FinalSumAgent', a2a_platform4)

# The following prints show how to get basic agent-related details.
# print(f"Total registered agents: {registry.count()}")
# print(registry.get("EchoAgent")._agent.name)
43
# User-defined methods. Delays are added to mimic an LLM's response time and to make the different statuses (PENDING, RUNNING, DONE and ERROR) observable. Increase the delay if you want to go through the examples more slowly and observe what is happening.
45# ---------------------------------------------------------------
async def echo(msg: str):
    '''
    Echo the given message back after a 30-second pause.

    The pause leaves time to observe the task while it is still in progress.
    Returns the text "message is: <msg>".
    Raises ValueError when msg is not a string.
    '''
    if isinstance(msg, str):
        await asyncio.sleep(30)
        return f"message is: {msg}"

    raise ValueError("msg must be a string")
55
async def do_summation(num1: int, num2: int):
    '''
    Add two integers and return the sum after a 30-second pause.

    num1, num2: the integers to add. Booleans are rejected even though bool is a
        subclass of int, so a JSON true/false is not silently summed as 1/0.
    Returns the sum formatted as a string.
    Raises ValueError when either argument is not a genuine integer.
    '''
    for value in (num1, num2):
        # isinstance(True, int) is True, so bool has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("Please enter integers only")

    await asyncio.sleep(30)
    return f"{num1+num2}"
65
async def final_summation(mysum: int, num1: int):
    '''
    Add an earlier sum to one more integer and return the total after a 20-second pause.

    mysum: the previously computed sum.
    num1: the integer to add to it.
    Booleans are rejected even though bool is a subclass of int, so a JSON
    true/false is not silently summed as 1/0.
    Returns the total formatted as a string.
    Raises ValueError when either argument is not a genuine integer.
    '''
    for value in (mysum, num1):
        # isinstance(True, int) is True, so bool has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("numbers must be an integer")

    await asyncio.sleep(20)
    return f"{mysum+num1}"
75#------------------------------------------------------------------
76
async def await_subtask_result(orchestration_task, subtask, poll_interval=0.1, timeout=None):
    '''
    Internal helper: wait until a subtask of an orchestration task has finished and return its result.

    orchestration_task: object whose `result` attribute is a dict with a "subtasks" list;
        each entry is a dict with at least "task_id" and "status".
    subtask: dict with the "task_id" of the subtask to wait for.
    poll_interval: seconds to sleep between two looks at the subtask list.
    timeout: optional upper bound in seconds for the whole wait. The default (None)
        waits indefinitely.

    Returns the subtask's "result" entry (None if absent) once its status is "DONE" or "ERROR".
    Raises asyncio.TimeoutError when `timeout` is given and elapses first.
    '''
    wanted_id = subtask["task_id"]
    finished_statuses = ("DONE", "ERROR")

    async def _poll():
        while True:
            # `result` may not be populated yet; treat that like an empty subtask list.
            subtasks = (orchestration_task.result or {}).get("subtasks", [])
            for st in subtasks:
                if st["task_id"] == wanted_id:
                    if st["status"] in finished_statuses:
                        return st.get("result")
                    # Found but still in progress: stop scanning and poll again later.
                    break
            await asyncio.sleep(poll_interval)

    if timeout is None:
        return await _poll()
    return await asyncio.wait_for(_poll(), timeout)
88
89# Give an ID for each user-defined method and register the methods on whichever platform that needs to access those methods.
a2a_platform1.router.register("echo_message", echo)
a2a_platform2.router.register("do_summation", do_summation)
a2a_platform4.router.register("do_summation", do_summation)
a2a_platform4.router.register("final_summation", final_summation)

# Add capabilities, descriptions and examples for each platform.
# NOTE(review): the example strings below are regular (non-raw) literals, so every \" is stored
# as a plain double quote. Confirm the commands are still copy-pasteable wherever they are displayed.
a2a_platform1.add_capabilities('Prints the entered message')
a2a_platform1.add_descriptions(['Waits for 30 seconds and then provides the entered message'])

a2a_platform1.populate_methods_from_router()
a2a_platform1.add_examples([{
    'To execute "echo_message" method': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"echo_message\",\"params\":{\"msg\":\"Hello\"}},\"id\":1}"',
    'To get status of the task with task_id': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"',
    'To view the status of tasks in a browser': 'http://localhost:8000/agents/EchoAgent/tasks',
}])

a2a_platform2.add_capabilities('Prints summation of two numbers')
a2a_platform2.add_descriptions(['Waits for 30 seconds and then provides the summation of two entered numbers'])
a2a_platform2.populate_methods_from_router()
# The request parameters must be named num1/num2 to match do_summation's signature.
a2a_platform2.add_examples([{
    'To execute "do_summation" method': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"do_summation\",\"params\":{\"num1\":2, \"num2\":4}},\"id\":1}"',
    'To get status of the task with task_id using curl': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"',
    'To view the status of tasks in a browser': 'http://localhost:8000/agents/SummationAgent/tasks',
}])

a2a_platform4.add_capabilities('Displays the summation result')
a2a_platform4.add_descriptions(['Gets two numbers, waits for 20 seconds, adds two numbers, and prints the summation'])
# Methods are listed by hand here instead of being populated from the router.
a2a_platform4.add_methods([{"name": "do_summation", "description": "Waits for 30 seconds and provides the summation of two numbers provided as input."},
                           {"name": "final_summation", "description": "Waits for 20 seconds and provides the summation of two numbers provided as input."}])
a2a_platform4.add_examples([{
    'To get results from SummationAgent (with do_summation) and add it to another number within "final_summation" method': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked_seq\",\"params\":{\"msg\":\"Hello from orchestration\",\"num1\":3,\"num2\":12,\"num3\":12}}"',
    'To get status of the task with task_id': 'curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"',
    'To view the status of tasks in a browser': 'http://localhost:8000/agents/OrchAgent/tasks',
}])
112
113
114# Example 1: Independent
115#***********
116'''Just pass the curl command for a single agent in the terminal and follow its task status using curl command or in a browser as given in example of a2a_platform1 or a2a_platform2. Multiple agents can be executed in parallel (at the same time) as everything is asynchronous.
117Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\", \"method\":\"create_task_and_run\",\"params\":{\"method\":\"echo_message\",\"params\":{\"msg\":\"Hello\"}},\"id\":1}"
118
Get status (replace task_id): curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
120
121Browser: http://localhost:8000/agents/EchoAgent/tasks
122'''
123
124#------------------------------------------------------------------------------------------------------------------
125
126# Example 2: De-centralised
127#***********
128# Agent A invoking Agent B (A => B)
129# For agent-agent orchestration (agent calling another agent), register the orchestration methods in each agent's platform router.
130# This enables an agent (e.g., EchoAgent) to call another agent (e.g., SummationAgent).
# Only the platform objects are needed, so iterate over the values and skip the unused agent IDs.
# NOTE(review): this reads the registry's private `_agents` mapping; switch to a public accessor if AgentRegistry offers one.
for platform in registry._agents.values():
    # Platforms without a router cannot expose orchestration methods and are skipped.
    if hasattr(platform, "router"):
        platform.router.register_orchestration_methods(platform, registry)
134
135'''
136Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"EchoAgent\",\"method\":\"call_agent\",\"params\":{\"target_agent_id\":\"SummationAgent\",\"method\":\"do_summation\",\"params\":{\"num1\":3,\"num2\":4}}}"
137
138Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
139
140Browser: http://localhost:8000/agents/SummationAgent/tasks
141'''
142#------------------------------------------------------------------------------------------------------------------
143
144# Example 3: Parallel without task_id for OrchAgent
145#***********
146# Agent A and B are executed in parallel (A || B) by a third agent
147# Separate agent for orchestration (only orchestration, no task registration). Task status can be monitored in respective agent's endpoint
async def orchestrate_echo_and_sum(platform, params, registry):
    '''
    Calls EchoAgent and then SummationAgent through the given platform.

    params: dict containing {'msg': str, 'num1': int, 'num2': int}
    Returns a dict holding what each rpc_call_agent call returned, under 'echo_task' and 'sum_task'.
    '''
    echo_payload = {"msg": params["msg"]}
    echo_task = await platform.rpc_call_agent("EchoAgent", "echo_message", echo_payload, registry)

    # The summation payload is built only after the echo call has been issued.
    sum_payload = {"num1": params["num1"], "num2": params["num2"]}
    sum_task = await platform.rpc_call_agent("SummationAgent", "do_summation", sum_payload, registry)

    return dict(echo_task=echo_task, sum_task=sum_task)
166
# Register orchestrate_echo_and_sum on a2a_platform3 (registered above as 'OrchAgent') under the
# method name "orchestrate_tasks", which is the name the curl example below calls.
a2a_platform3.register_orchestration_task_on_resp_agent("orchestrate_tasks", orchestrate_echo_and_sum, registry)
168
169'''
170Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks\",\"params\":{\"msg\":\"Hello\",\"num1\":3,\"num2\":5}}"
171
172Get status: For echo task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"EchoAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
173 For sum task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"SummationAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":3}"
174
175Browser: http://localhost:8000/agents/EchoAgent/tasks
176 http://localhost:8000/agents/SummationAgent/tasks
177'''
178
179#------------------------------------------------------------------------------------------------------------------
180
181# Example 4: Parallel with task_id for OrchAgent
182#***********
183# Agent A and B are executed in parallel (A || B)
184# Separate agent for orchestration (also has its own registered tasks)
async def orchestrate_echo_and_sum_tracked(platform, params, registry, tracked_call, orchestration_task):
    '''
    Issues one EchoAgent call and one SummationAgent call through tracked_call.

    params: dict containing {'msg': str, 'num1': int, 'num2': int}
    platform and orchestration_task are accepted but not used by the active code.
    Returns an empty dict.
    '''
    echo_payload = {"msg": params["msg"]}
    await tracked_call("EchoAgent", "echo_message", echo_payload, registry)

    # The summation payload is built only after the echo call has been issued.
    sum_payload = {"num1": params["num1"], "num2": params["num2"]}
    await tracked_call("SummationAgent", "do_summation", sum_payload, registry)

    # Enable the following lines if the following behaviour is wanted:
    # Under the orchestration result, displays each agent's results (mostly duplicate of what is found in subtasks).
    # orchestration_result = {}
    # for st in orchestration_task.result.get("subtasks", []):
    #     if st["agent_id"] == "EchoAgent":
    #         orchestration_result["echo_task"] = st
    #     elif st["agent_id"] == "SummationAgent":
    #         orchestration_result["sum_task"] = st
    # orchestration_result["pipeline"] = "Echo and Summation in parallel."

    # return orchestration_result
    return dict()
215
# Register orchestrate_echo_and_sum_tracked on a2a_platform3 (registered above as 'OrchAgent') under
# the method name "orchestrate_tasks_tracked", which is the name the curl example below calls.
a2a_platform3.register_orchestration_as_task("orchestrate_tasks_tracked", orchestrate_echo_and_sum_tracked, registry)
217
218'''
219Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked\",\"params\":{\"msg\":\"Hello\",\"num1\":3,\"num2\":5}}"
220
221Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
222
223Browser: http://localhost:8000/agents/OrchAgent/tasks
224'''
225
226#------------------------------------------------------------------------------------------------------------------
227
228# Example 5: Hybrid
229# Agent A || B -> C
230# Separate agent for orchestration (also has its own registered tasks)
async def orchestrate_echo_sum_display_seq_tracked(platform, params, registry, tracked_call, orchestration_task):
    '''
    Calls EchoAgent and SummationAgent, then feeds the summation result into FinalSumAgent.

    params: dict containing {'msg': str, 'num1': int, 'num2': int, 'num3': int}
        'num1' and 'num2' are sent to SummationAgent; 'num3' is sent to FinalSumAgent
        together with the summation result.
    platform is accepted but not used by the active code.
    Returns an empty dict.
    Raises ValueError if the summation subtask did not produce a value that can be
    converted to an integer (for example because it finished with status ERROR).
    '''
    await tracked_call(
        "EchoAgent",
        "echo_message",
        {"msg": params["msg"]},
        registry
    )
    sum_task = await tracked_call(
        "SummationAgent",
        "do_summation",
        {"num1": params["num1"], "num2": params["num2"]},
        registry
    )
    # The final step needs the summation value, so wait for that subtask to finish first.
    sum_result = await await_subtask_result(orchestration_task, sum_task, poll_interval=0.2)

    # A failed subtask may leave no usable result; report that clearly instead of an opaque int() failure.
    try:
        partial_sum = int(sum_result)
    except (TypeError, ValueError) as err:
        raise ValueError(
            f"SummationAgent did not return an integer result: {sum_result!r}"
        ) from err

    await tracked_call(
        "FinalSumAgent",
        "final_summation",
        {"mysum": partial_sum, "num1": params["num3"]},
        registry
    )

    # Enable the following lines to also place each agent's subtask entry under the orchestration result.
    # orchestration_result = {}
    # for st in orchestration_task.result.get("subtasks", []):
    #     if st["agent_id"] == "EchoAgent":
    #         orchestration_result["echo_task"] = st
    #     elif st["agent_id"] == "SummationAgent":
    #         orchestration_result["sum_task"] = st
    # orchestration_result["pipeline"] = "Echo and Summation in parallel."

    # return orchestration_result
    return {}
266
# Register orchestrate_echo_sum_display_seq_tracked on a2a_platform3 (registered above as 'OrchAgent') under
# the method name "orchestrate_tasks_tracked_seq", which is the name the curl example below calls.
a2a_platform3.register_orchestration_as_task("orchestrate_tasks_tracked_seq", orchestrate_echo_sum_display_seq_tracked, registry)
268
269'''
270Give task: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"id\":1,\"agent_id\":\"OrchAgent\",\"method\":\"orchestrate_tasks_tracked_seq\",\"params\":{\"msg\":\"Hello from orchestration\",\"num1\":3,\"num2\":12,\"num3\":10}}"
271
272Get status: curl -X POST http://localhost:8000/a2a -H "Content-Type: application/json" -d "{\"jsonrpc\":\"2.0\",\"agent_id\":\"OrchAgent\",\"method\":\"task_status\",\"params\":{\"task_id\":\"<task_id>\"},\"id\":2}"
273
274Browser: http://localhost:8000/agents/OrchAgent/tasks
275'''
276
277#------------------------------------------------------------------------------------------------------------------
278
279# Run the platform with registry containing registered agents.
280# app = create_app(registry=registry)
281# web.run_app(app, port=8000)
282
async def _shutdown(app: web.Application):
    '''
    Cancel every other running asyncio task, wait for them to finish, then shut the app down.

    app: the application whose shutdown() and cleanup() coroutines are awaited, in that order.
    '''
    this_task = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not this_task]
    for task in pending:
        task.cancel()

    # cancel() only requests cancellation. Wait until the tasks have actually unwound so the
    # app is not cleaned up underneath them; return_exceptions=True keeps their CancelledError
    # (or any other error) from aborting the shutdown.
    await asyncio.gather(*pending, return_exceptions=True)

    await app.shutdown()
    await app.cleanup()
289
async def _main():
    '''
    Build the app from the module-level registry, serve it on 0.0.0.0:8000 and block until interrupted.

    On KeyboardInterrupt or task cancellation the shutdown is logged and _shutdown(app) is awaited;
    runner.cleanup() is awaited in every case.
    '''
    app = create_app(registry=registry)
    runner = web.AppRunner(app)
    await runner.setup()

    tcp_site = web.TCPSite(runner, "0.0.0.0", 8000)
    await tcp_site.start()

    # This event is never set, so waiting on it keeps the coroutine alive until it is cancelled.
    never_set = asyncio.Event()
    try:
        await never_set.wait()
    except (asyncio.CancelledError, KeyboardInterrupt):
        logger.info("Server shutdown requested (Ctrl+C pressed).")
        await _shutdown(app)
    finally:
        await runner.cleanup()
304
if __name__ == "__main__":
    logger.info("Press (Ctrl+C) to stop the server.")
    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        # Depending on the Python version, Ctrl+C can still surface here after _main has
        # finished its own cleanup. Swallow it so a normal stop does not end with a traceback.
        logger.info("Server stopped.")