Async Programming
- Async programming is ensure we are not getting blocked.
- A function defined with the following syntax is referred as coroutine
async def .....():
.....
- A coroutine is function that can be paused or resumed
- Coroutines to execute require a scheduler. A scheduler runs on a single thread and manages tasks effeciently
- Await waits till coroutine is completed execution
- asyncio.gather function helps to run multiple coroutines
- Synchronous equivelent
import time
def get_data_from_a():
print("Getting data from a")
time.sleep(5)
print("Completed reciveing data from a")
def get_data_from_b():
print("Getting data from b")
time.sleep(3)
print("Completed reciveing data from b")
def main():
get_data_from_a()
get_data_from_b()
main()
- Asynchronous
import asyncio
async def get_data_from_a():
print("Getting data from a")
await asyncio.sleep(5)
print("Completed reciveing data from a")
async def get_data_from_b():
print("Getting data from b")
await asyncio.sleep(2)
print("Completed reciveing data from b")
async def main():
await asyncio.gather(get_data_from_a(), get_data_from_b())
# creates a scheduler (Event loop)
asyncio.run(main())
- Async supports tasks, tasks when created will start executing the coroutine. Refer Here
import asyncio
async def get_data_from_a():
print("Getting data from a")
await asyncio.sleep(5)
print("Completed reciveing data from a")
async def get_data_from_b():
print("Getting data from b")
await asyncio.sleep(2)
print("Completed reciveing data from b")
async def main():
task_a = asyncio.create_task(get_data_from_a())
task_b = asyncio.create_task(get_data_from_b())
await task_a
await task_b
asyncio.run(main())
-
For io operations we have external async libraries
-
Context Manager
import aiohttp
import asyncio
async def get_data_from_a():
print("Getting data from a")
await asyncio.sleep(5)
print("Completed reciveing data from a")
async def get_data_from_b():
print("Getting data from b")
await asyncio.sleep(2)
print("Completed reciveing data from b")
async def make_http_call():
print("executing make http call")
# with statement to get http connection object
async with aiohttp.ClientSession() as session:
async with session.get('http://python.org') as response:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:15], "...")
async def main():
task_http = asyncio.create_task(make_http_call())
task_a = asyncio.create_task(get_data_from_a())
task_b = asyncio.create_task(get_data_from_b())
await task_a
await task_b
await task_http
if __name__ == '__main__':
asyncio.run(main())
- Using the existing knowledge we should be able to understand mcp client
from mcp import ClientSession, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from langchain_mcp_adapters.tools import load_mcp_tools
import asyncio
from langchain_core.messages import HumanMessage
from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
model_name = "gemini-2.5-flash"
model_provider = "google_vertexai"
async def get_llm(name:str, provider:str) -> BaseChatModel:
"""get the llm
Args:
name (str): name of the mode
provider (str): provider
Returns:
BaseChatModel: llm
"""
return init_chat_model(model=name, model_provider=provider)
async def get_mcp_tools_n_execute():
async with streamablehttp_client("http://localhost:9000/mcp/") as (read, write, _):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
tools = await load_mcp_tools(session)
gemini_llm = await get_llm(model_name, model_provider)
gemini_llm_with_mcp_tools = gemini_llm.bind_tools(tools)
# Ask LLM
response = await gemini_llm_with_mcp_tools.ainvoke(
[HumanMessage(
content=
"I have 10 bikes i bought one, How many bikes do i have ?")]
)
print(response)
if __name__ == "__main__":
asyncio.run(get_mcp_tools_n_execute())
- Refer Here for awesome-async
