# kosong
Kosong is an LLM abstraction layer designed for modern AI agent applications. It unifies message structures, asynchronous tool orchestration, and pluggable chat providers so you can build agents with ease and avoid vendor lock-in.
Key features:
- `kosong.generate` creates a completion stream and merges streamed message parts (including content and tool calls) from any `ChatProvider` into a complete `Message` plus optional `TokenUsage`.
- `kosong.step` layers tool dispatch (`Tool`, `Toolset`, `SimpleToolset`) over `generate`, exposing `StepResult` with awaited tool outputs and streaming callbacks.
- Message structures and tool abstractions live under `kosong.message` and `kosong.tooling`.
Example:
```python
import asyncio

from pydantic import BaseModel

import kosong
from kosong import StepResult
from kosong.chat_provider.kimi import Kimi
from kosong.message import Message
from kosong.tooling import CallableTool2, ToolOk, ToolReturnType
from kosong.tooling.simple import SimpleToolset


class AddToolParams(BaseModel):
    a: int
    b: int


class AddTool(CallableTool2[AddToolParams]):
    name: str = "add"
    description: str = "Add two integers."
    params: type[AddToolParams] = AddToolParams

    async def __call__(self, params: AddToolParams) -> ToolReturnType:
        return ToolOk(output=str(params.a + params.b))


async def main() -> None:
    kimi = Kimi(
        base_url="https://api.moonshot.ai/v1",
        api_key="your_kimi_api_key_here",
        model="kimi-k2-turbo-preview",
    )

    toolset = SimpleToolset()
    toolset += AddTool()

    history = [
        Message(role="user", content="Please add 2 and 3 with the add tool."),
    ]

    result: StepResult = await kosong.step(
        chat_provider=kimi,
        system_prompt="You are a precise math tutor.",
        toolset=toolset,
        history=history,
    )
    print(result.message)
    print(await result.tool_results())


asyncio.run(main())
```
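If you only need a raw completion without tool dispatch, `kosong.generate` can be called directly. A minimal sketch, reusing the `Kimi` configuration from the example above and assuming a plain (non-async) function is accepted wherever a `Callback` is expected:

```python
import asyncio

import kosong
from kosong.chat_provider import StreamedMessagePart
from kosong.chat_provider.kimi import Kimi
from kosong.message import Message


async def stream_once() -> None:
    kimi = Kimi(
        base_url="https://api.moonshot.ai/v1",
        api_key="your_kimi_api_key_here",
        model="kimi-k2-turbo-preview",
    )

    def on_part(part: StreamedMessagePart) -> None:
        # Receives each raw streamed part before it is merged into the message.
        print(part)

    result = await kosong.generate(
        kimi,
        "You are a helpful assistant.",
        [],  # no tools for a plain completion
        [Message(role="user", content="Say hi.")],
        on_message_part=on_part,
    )
    print(result.message)  # the merged, complete Message
    print(result.usage)    # TokenUsage | None


asyncio.run(stream_once())
```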
1""" 2Kosong is an LLM abstraction layer designed for modern AI agent applications. 3It unifies message structures, asynchronous tool orchestration, and pluggable chat providers so you 4can build agents with ease and avoid vendor lock-in. 5 6Key features: 7 8- `kosong.generate` creates a completion stream and merges streamed message parts (including 9 content and tool calls) from any `ChatProvider` into a complete `Message` plus optional 10 `TokenUsage`. 11- `kosong.step` layers tool dispatch (`Tool`, `Toolset`, `SimpleToolset`) over `generate`, 12 exposing `StepResult` with awaited tool outputs and streaming callbacks. 13- Message structures and tool abstractions live under `kosong.message` and `kosong.tooling`. 14 15Example: 16 17```python 18import asyncio 19 20from pydantic import BaseModel 21 22import kosong 23from kosong import StepResult 24from kosong.chat_provider.kimi import Kimi 25from kosong.message import Message 26from kosong.tooling import CallableTool2, ToolOk, ToolReturnType 27from kosong.tooling.simple import SimpleToolset 28 29 30class AddToolParams(BaseModel): 31 a: int 32 b: int 33 34 35class AddTool(CallableTool2[AddToolParams]): 36 name: str = "add" 37 description: str = "Add two integers." 38 params: type[AddToolParams] = AddToolParams 39 40 async def __call__(self, params: AddToolParams) -> ToolReturnType: 41 return ToolOk(output=str(params.a + params.b)) 42 43 44async def main() -> None: 45 kimi = Kimi( 46 base_url="https://api.moonshot.ai/v1", 47 api_key="your_kimi_api_key_here", 48 model="kimi-k2-turbo-preview", 49 ) 50 51 toolset = SimpleToolset() 52 toolset += AddTool() 53 54 history = [ 55 Message(role="user", content="Please add 2 and 3 with the add tool."), 56 ] 57 58 result: StepResult = await kosong.step( 59 chat_provider=kimi, 60 system_prompt="You are a precise math tutor.", 61 toolset=toolset, 62 history=history, 63 ) 64 print(result.message) 65 print(await result.tool_results()) 66 67 68asyncio.run(main()) 69``` 70""" 71 72import asyncio 73from collections.abc import Callable, Sequence 74from dataclasses import dataclass 75 76from loguru import logger 77 78from kosong._generate import GenerateResult, generate 79from kosong.chat_provider import ChatProvider, ChatProviderError, StreamedMessagePart, TokenUsage 80from kosong.message import Message, ToolCall 81from kosong.tooling import ToolResult, ToolResultFuture, Toolset 82from kosong.utils.aio import Callback 83 84# Explicitly import submodules 85from . import chat_provider, contrib, message, tooling, utils 86 87logger.disable("kosong") 88 89__all__ = [ 90 # submodules 91 "chat_provider", 92 "tooling", 93 "message", 94 "utils", 95 "contrib", 96 # classes and functions 97 "generate", 98 "GenerateResult", 99 "step", 100 "StepResult", 101] 102 103 104async def step( 105 chat_provider: ChatProvider, 106 system_prompt: str, 107 toolset: Toolset, 108 history: Sequence[Message], 109 *, 110 on_message_part: Callback[[StreamedMessagePart], None] | None = None, 111 on_tool_result: Callable[[ToolResult], None] | None = None, 112) -> "StepResult": 113 """ 114 Run one agent "step". In one step, the function generates LLM response based on the given 115 context for exactly one time. All new message parts will be streamed to `on_message_part` in 116 real-time if provided. Tool calls will be handled by `toolset`. The generated message will be 117 returned in a `StepResult`. 
Depending on the toolset implementation, the tool calls may be 118 handled asynchronously and the results need to be fetched with `await result.tool_results()`. 119 120 The message history will NOT be modified in this function. 121 122 The token usage will be returned in the `StepResult` if available. 123 124 Raises: 125 APIConnectionError: If the API connection fails. 126 APITimeoutError: If the API request times out. 127 APIStatusError: If the API returns a status code of 4xx or 5xx. 128 APIEmptyResponseError: If the API returns an empty response. 129 ChatProviderError: If any other recognized chat provider error occurs. 130 asyncio.CancelledError: If the step is cancelled. 131 """ 132 133 tool_calls: list[ToolCall] = [] 134 tool_result_futures: dict[str, ToolResultFuture] = {} 135 136 def future_done_callback(future: ToolResultFuture): 137 if on_tool_result: 138 try: 139 result = future.result() 140 on_tool_result(result) 141 except asyncio.CancelledError: 142 return 143 144 async def on_tool_call(tool_call: ToolCall): 145 tool_calls.append(tool_call) 146 result = toolset.handle(tool_call) 147 148 if isinstance(result, ToolResult): 149 future = ToolResultFuture() 150 future.add_done_callback(future_done_callback) 151 future.set_result(result) 152 tool_result_futures[tool_call.id] = future 153 else: 154 result.add_done_callback(future_done_callback) 155 tool_result_futures[tool_call.id] = result 156 157 try: 158 result = await generate( 159 chat_provider, 160 system_prompt, 161 toolset.tools, 162 history, 163 on_message_part=on_message_part, 164 on_tool_call=on_tool_call, 165 ) 166 except (ChatProviderError, asyncio.CancelledError): 167 # cancel all the futures to avoid hanging tasks 168 for future in tool_result_futures.values(): 169 future.remove_done_callback(future_done_callback) 170 future.cancel() 171 await asyncio.gather(*tool_result_futures.values(), return_exceptions=True) 172 raise 173 174 return StepResult( 175 result.id, 176 result.message, 177 result.usage, 178 tool_calls, 179 tool_result_futures, 180 ) 181 182 183@dataclass(frozen=True, slots=True) 184class StepResult: 185 id: str | None 186 """The ID of the generated message.""" 187 188 message: Message 189 """The message generated in this step.""" 190 191 usage: TokenUsage | None 192 """The token usage in this step.""" 193 194 tool_calls: list[ToolCall] 195 """All the tool calls generated in this step.""" 196 197 _tool_result_futures: dict[str, ToolResultFuture] 198 """@private The futures of the results of the spawned tool calls.""" 199 200 async def tool_results(self) -> list[ToolResult]: 201 """All the tool results returned by corresponding tool calls.""" 202 if not self._tool_result_futures: 203 return [] 204 205 try: 206 results: list[ToolResult] = [] 207 for tool_call in self.tool_calls: 208 future = self._tool_result_futures.pop(tool_call.id) 209 result = await future 210 results.append(result) 211 return results 212 finally: 213 # one exception should cancel all the futures to avoid hanging tasks 214 for future in self._tool_result_futures.values(): 215 future.cancel() 216 await asyncio.gather(*self._tool_result_futures.values(), return_exceptions=True)
`kosong.generate`:

```python
async def generate(
    chat_provider: ChatProvider,
    system_prompt: str,
    tools: Sequence[Tool],
    history: Sequence[Message],
    *,
    on_message_part: Callback[[StreamedMessagePart], None] | None = None,
    on_tool_call: Callback[[ToolCall], None] | None = None,
) -> "GenerateResult":
    """Generate one message based on the given context."""
    message = Message(role="assistant", content=[])
    pending_part: StreamedMessagePart | None = None  # message part that is currently incomplete

    logger.trace("Generating with history: {history}", history=history)
    stream = await chat_provider.generate(system_prompt, tools, history)
    async for part in stream:
        logger.trace("Received part: {part}", part=part)
        if on_message_part:
            await callback(on_message_part, part.model_copy(deep=True))

        if pending_part is None:
            pending_part = part
        elif not pending_part.merge_in_place(part):  # try merge into the pending part
            # unmergeable part must push the pending part to the buffer
            _message_append(message, pending_part)
            if isinstance(pending_part, ToolCall) and on_tool_call:
                await callback(on_tool_call, pending_part)
            pending_part = part

    # end of message
    if pending_part is not None:
        _message_append(message, pending_part)
        if isinstance(pending_part, ToolCall) and on_tool_call:
            await callback(on_tool_call, pending_part)

    if not message.content and not message.tool_calls:
        raise APIEmptyResponseError("The API returned an empty response.")

    return GenerateResult(
        id=stream.id,
        message=message,
        usage=stream.usage,
    )
```
Generate one message based on the given context. Parts of the message will be streamed to the specified callbacks if provided.

Arguments:

- `chat_provider`: The chat provider to use for generation.
- `system_prompt`: The system prompt to use for generation.
- `tools`: The tools available for the model to call.
- `history`: The message history to use for generation.
- `on_message_part`: An optional callback called for each raw message part.
- `on_tool_call`: An optional callback called for each complete tool call.

Returns:

A `GenerateResult` carrying the generated message and the token usage (if available). All parts in the message are guaranteed to be complete and merged as much as possible.

Raises:

- `APIConnectionError`: If the API connection fails.
- `APITimeoutError`: If the API request times out.
- `APIStatusError`: If the API returns a status code of 4xx or 5xx.
- `APIEmptyResponseError`: If the API returns an empty response.
- `ChatProviderError`: If any other recognized chat provider error occurs.
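The pending-part accumulation in the source above is worth seeing in isolation. A self-contained toy sketch of the same loop; `TextPart` and its `merge_in_place` are stand-ins, not kosong's real part types:

```python
from dataclasses import dataclass


@dataclass
class TextPart:
    text: str

    def merge_in_place(self, other: "TextPart") -> bool:
        # In the real library only compatible parts merge; here everything is text.
        self.text += other.text
        return True


parts = [TextPart("Hel"), TextPart("lo"), TextPart(", world")]
merged: list[TextPart] = []
pending: TextPart | None = None  # the part currently being accumulated
for part in parts:
    if pending is None:
        pending = part
    elif not pending.merge_in_place(part):
        merged.append(pending)  # flush the completed part
        pending = part
if pending is not None:
    merged.append(pending)  # flush the final part at end of stream

print([p.text for p in merged])  # ['Hello, world']
```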
`kosong.GenerateResult`:

```python
@dataclass(frozen=True, slots=True)
class GenerateResult:
    """The result of a generation."""

    id: str | None
    """The ID of the generated message."""

    message: Message
    """The generated message."""

    usage: TokenUsage | None
    """The token usage of the generated message."""
```
The result of a generation.
`kosong.step`:

```python
async def step(
    chat_provider: ChatProvider,
    system_prompt: str,
    toolset: Toolset,
    history: Sequence[Message],
    *,
    on_message_part: Callback[[StreamedMessagePart], None] | None = None,
    on_tool_result: Callable[[ToolResult], None] | None = None,
) -> "StepResult":
    """Run one agent "step"."""
    tool_calls: list[ToolCall] = []
    tool_result_futures: dict[str, ToolResultFuture] = {}

    def future_done_callback(future: ToolResultFuture):
        if on_tool_result:
            try:
                result = future.result()
                on_tool_result(result)
            except asyncio.CancelledError:
                return

    async def on_tool_call(tool_call: ToolCall):
        tool_calls.append(tool_call)
        result = toolset.handle(tool_call)

        if isinstance(result, ToolResult):
            # synchronous result: wrap it in an already-resolved future
            future = ToolResultFuture()
            future.add_done_callback(future_done_callback)
            future.set_result(result)
            tool_result_futures[tool_call.id] = future
        else:
            # asynchronous result: the toolset returned a future
            result.add_done_callback(future_done_callback)
            tool_result_futures[tool_call.id] = result

    try:
        result = await generate(
            chat_provider,
            system_prompt,
            toolset.tools,
            history,
            on_message_part=on_message_part,
            on_tool_call=on_tool_call,
        )
    except (ChatProviderError, asyncio.CancelledError):
        # cancel all the futures to avoid hanging tasks
        for future in tool_result_futures.values():
            future.remove_done_callback(future_done_callback)
            future.cancel()
        await asyncio.gather(*tool_result_futures.values(), return_exceptions=True)
        raise

    return StepResult(
        result.id,
        result.message,
        result.usage,
        tool_calls,
        tool_result_futures,
    )
```
Run one agent "step". In one step, the function generates LLM response based on the given
context for exactly one time. All new message parts will be streamed to on_message_part in
real-time if provided. Tool calls will be handled by toolset. The generated message will be
returned in a StepResult. Depending on the toolset implementation, the tool calls may be
handled asynchronously and the results need to be fetched with await result.tool_results().
The message history will NOT be modified in this function.
The token usage will be returned in the StepResult if available.
Raises:
- APIConnectionError: If the API connection fails.
- APITimeoutError: If the API request times out.
- APIStatusError: If the API returns a status code of 4xx or 5xx.
- APIEmptyResponseError: If the API returns an empty response.
- ChatProviderError: If any other recognized chat provider error occurs.
- asyncio.CancelledError: If the step is cancelled.
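Because tool handling may complete asynchronously, `on_tool_result` is useful for observing results as they land instead of waiting for `tool_results()`. A minimal sketch, reusing `kimi`, `toolset`, and `history` from the module example (to be run inside an async function):

```python
from kosong.tooling import ToolResult


def log_tool_result(result: ToolResult) -> None:
    # Invoked from each future's done-callback as soon as the tool finishes.
    print("tool finished:", result)


result = await kosong.step(
    chat_provider=kimi,
    system_prompt="You are a precise math tutor.",
    toolset=toolset,
    history=history,
    on_tool_result=log_tool_result,
)
```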
`kosong.StepResult`:

```python
@dataclass(frozen=True, slots=True)
class StepResult:
    id: str | None
    """The ID of the generated message."""

    message: Message
    """The message generated in this step."""

    usage: TokenUsage | None
    """The token usage in this step."""

    tool_calls: list[ToolCall]
    """All the tool calls generated in this step."""

    _tool_result_futures: dict[str, ToolResultFuture]
    """@private The futures of the results of the spawned tool calls."""
```
`StepResult.tool_results`:

```python
    async def tool_results(self) -> list[ToolResult]:
        """All the tool results returned by corresponding tool calls."""
        if not self._tool_result_futures:
            return []

        try:
            results: list[ToolResult] = []
            for tool_call in self.tool_calls:
                # each future is popped as it is awaited, so results come back
                # in tool-call order and are consumed exactly once
                future = self._tool_result_futures.pop(tool_call.id)
                result = await future
                results.append(result)
            return results
        finally:
            # one exception should cancel all the remaining futures to avoid hanging tasks
            for future in self._tool_result_futures.values():
                future.cancel()
            await asyncio.gather(*self._tool_result_futures.values(), return_exceptions=True)
```
All the tool results returned by corresponding tool calls.
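Since `tool_results()` pops each stored future as it awaits it, call it once per step. A typical agent loop then feeds the assistant message and the tool results back into the history for the next step. How a `ToolResult` becomes a `Message` is not shown in this module, so `tool_result_to_message` below is a hypothetical stand-in, not a kosong API:

```python
# Reuses the imports from the module example above.

def tool_result_to_message(tool_result) -> Message:
    # Hypothetical conversion: kosong's real tool-result message shape may differ.
    return Message(role="tool", content=str(tool_result))


async def run_agent(chat_provider, toolset, history: list[Message]) -> Message:
    """Step until the model stops requesting tools, then return its final message."""
    while True:
        result = await kosong.step(
            chat_provider=chat_provider,
            system_prompt="You are a precise math tutor.",
            toolset=toolset,
            history=history,
        )
        history.append(result.message)
        if not result.tool_calls:
            # no tool calls were made, so the model produced its final answer
            return result.message
        for tool_result in await result.tool_results():
            history.append(tool_result_to_message(tool_result))
```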