kosong

Kosong is an LLM abstraction layer designed for modern AI agent applications. It unifies message structures, asynchronous tool orchestration, and pluggable chat providers so you can build agents with ease and avoid vendor lock-in.

Key features:

  • kosong.generate creates a completion stream and merges streamed message parts (including content and tool calls) from any ChatProvider into a complete Message plus optional TokenUsage.
  • kosong.step layers tool dispatch (Tool, Toolset, SimpleToolset) over generate, exposing StepResult with awaited tool outputs and streaming callbacks.
  • Message structures and tool abstractions live under kosong.message and kosong.tooling.

Example:

import asyncio

from pydantic import BaseModel

import kosong
from kosong import StepResult
from kosong.chat_provider.kimi import Kimi
from kosong.message import Message
from kosong.tooling import CallableTool2, ToolOk, ToolReturnType
from kosong.tooling.simple import SimpleToolset


class AddToolParams(BaseModel):
    a: int
    b: int


class AddTool(CallableTool2[AddToolParams]):
    name: str = "add"
    description: str = "Add two integers."
    params: type[AddToolParams] = AddToolParams

    async def __call__(self, params: AddToolParams) -> ToolReturnType:
        return ToolOk(output=str(params.a + params.b))


async def main() -> None:
    kimi = Kimi(
        base_url="https://api.moonshot.ai/v1",
        api_key="your_kimi_api_key_here",
        model="kimi-k2-turbo-preview",
    )

    toolset = SimpleToolset()
    toolset += AddTool()

    history = [
        Message(role="user", content="Please add 2 and 3 with the add tool."),
    ]

    result: StepResult = await kosong.step(
        chat_provider=kimi,
        system_prompt="You are a precise math tutor.",
        toolset=toolset,
        history=history,
    )
    print(result.message)
    print(await result.tool_results())


asyncio.run(main())
  1"""
  2Kosong is an LLM abstraction layer designed for modern AI agent applications.
  3It unifies message structures, asynchronous tool orchestration, and pluggable chat providers so you
  4can build agents with ease and avoid vendor lock-in.
  5
  6Key features:
  7
  8- `kosong.generate` creates a completion stream and merges streamed message parts (including
  9  content and tool calls) from any `ChatProvider` into a complete `Message` plus optional
 10  `TokenUsage`.
 11- `kosong.step` layers tool dispatch (`Tool`, `Toolset`, `SimpleToolset`) over `generate`,
 12  exposing `StepResult` with awaited tool outputs and streaming callbacks.
 13- Message structures and tool abstractions live under `kosong.message` and `kosong.tooling`.
 14
 15Example:
 16
 17```python
 18import asyncio
 19
 20from pydantic import BaseModel
 21
 22import kosong
 23from kosong import StepResult
 24from kosong.chat_provider.kimi import Kimi
 25from kosong.message import Message
 26from kosong.tooling import CallableTool2, ToolOk, ToolReturnType
 27from kosong.tooling.simple import SimpleToolset
 28
 29
 30class AddToolParams(BaseModel):
 31    a: int
 32    b: int
 33
 34
 35class AddTool(CallableTool2[AddToolParams]):
 36    name: str = "add"
 37    description: str = "Add two integers."
 38    params: type[AddToolParams] = AddToolParams
 39
 40    async def __call__(self, params: AddToolParams) -> ToolReturnType:
 41        return ToolOk(output=str(params.a + params.b))
 42
 43
 44async def main() -> None:
 45    kimi = Kimi(
 46        base_url="https://api.moonshot.ai/v1",
 47        api_key="your_kimi_api_key_here",
 48        model="kimi-k2-turbo-preview",
 49    )
 50
 51    toolset = SimpleToolset()
 52    toolset += AddTool()
 53
 54    history = [
 55        Message(role="user", content="Please add 2 and 3 with the add tool."),
 56    ]
 57
 58    result: StepResult = await kosong.step(
 59        chat_provider=kimi,
 60        system_prompt="You are a precise math tutor.",
 61        toolset=toolset,
 62        history=history,
 63    )
 64    print(result.message)
 65    print(await result.tool_results())
 66
 67
 68asyncio.run(main())
 69```
 70"""
 71
 72import asyncio
 73from collections.abc import Callable, Sequence
 74from dataclasses import dataclass
 75
 76from loguru import logger
 77
 78from kosong._generate import GenerateResult, generate
 79from kosong.chat_provider import ChatProvider, ChatProviderError, StreamedMessagePart, TokenUsage
 80from kosong.message import Message, ToolCall
 81from kosong.tooling import ToolResult, ToolResultFuture, Toolset
 82from kosong.utils.aio import Callback
 83
 84# Explicitly import submodules
 85from . import chat_provider, contrib, message, tooling, utils
 86
 87logger.disable("kosong")
 88
 89__all__ = [
 90    # submodules
 91    "chat_provider",
 92    "tooling",
 93    "message",
 94    "utils",
 95    "contrib",
 96    # classes and functions
 97    "generate",
 98    "GenerateResult",
 99    "step",
100    "StepResult",
101]
102
103
104async def step(
105    chat_provider: ChatProvider,
106    system_prompt: str,
107    toolset: Toolset,
108    history: Sequence[Message],
109    *,
110    on_message_part: Callback[[StreamedMessagePart], None] | None = None,
111    on_tool_result: Callable[[ToolResult], None] | None = None,
112) -> "StepResult":
113    """
114    Run one agent "step". In one step, the function generates LLM response based on the given
115    context for exactly one time. All new message parts will be streamed to `on_message_part` in
116    real-time if provided. Tool calls will be handled by `toolset`. The generated message will be
117    returned in a `StepResult`. Depending on the toolset implementation, the tool calls may be
118    handled asynchronously and the results need to be fetched with `await result.tool_results()`.
119
120    The message history will NOT be modified in this function.
121
122    The token usage will be returned in the `StepResult` if available.
123
124    Raises:
125        APIConnectionError: If the API connection fails.
126        APITimeoutError: If the API request times out.
127        APIStatusError: If the API returns a status code of 4xx or 5xx.
128        APIEmptyResponseError: If the API returns an empty response.
129        ChatProviderError: If any other recognized chat provider error occurs.
130        asyncio.CancelledError: If the step is cancelled.
131    """
132
133    tool_calls: list[ToolCall] = []
134    tool_result_futures: dict[str, ToolResultFuture] = {}
135
136    def future_done_callback(future: ToolResultFuture):
137        if on_tool_result:
138            try:
139                result = future.result()
140                on_tool_result(result)
141            except asyncio.CancelledError:
142                return
143
144    async def on_tool_call(tool_call: ToolCall):
145        tool_calls.append(tool_call)
146        result = toolset.handle(tool_call)
147
148        if isinstance(result, ToolResult):
149            future = ToolResultFuture()
150            future.add_done_callback(future_done_callback)
151            future.set_result(result)
152            tool_result_futures[tool_call.id] = future
153        else:
154            result.add_done_callback(future_done_callback)
155            tool_result_futures[tool_call.id] = result
156
157    try:
158        result = await generate(
159            chat_provider,
160            system_prompt,
161            toolset.tools,
162            history,
163            on_message_part=on_message_part,
164            on_tool_call=on_tool_call,
165        )
166    except (ChatProviderError, asyncio.CancelledError):
167        # cancel all the futures to avoid hanging tasks
168        for future in tool_result_futures.values():
169            future.remove_done_callback(future_done_callback)
170            future.cancel()
171        await asyncio.gather(*tool_result_futures.values(), return_exceptions=True)
172        raise
173
174    return StepResult(
175        result.id,
176        result.message,
177        result.usage,
178        tool_calls,
179        tool_result_futures,
180    )
181
182
183@dataclass(frozen=True, slots=True)
184class StepResult:
185    id: str | None
186    """The ID of the generated message."""
187
188    message: Message
189    """The message generated in this step."""
190
191    usage: TokenUsage | None
192    """The token usage in this step."""
193
194    tool_calls: list[ToolCall]
195    """All the tool calls generated in this step."""
196
197    _tool_result_futures: dict[str, ToolResultFuture]
198    """@private The futures of the results of the spawned tool calls."""
199
200    async def tool_results(self) -> list[ToolResult]:
201        """All the tool results returned by corresponding tool calls."""
202        if not self._tool_result_futures:
203            return []
204
205        try:
206            results: list[ToolResult] = []
207            for tool_call in self.tool_calls:
208                future = self._tool_result_futures.pop(tool_call.id)
209                result = await future
210                results.append(result)
211            return results
212        finally:
213            # one exception should cancel all the futures to avoid hanging tasks
214            for future in self._tool_result_futures.values():
215                future.cancel()
216            await asyncio.gather(*self._tool_result_futures.values(), return_exceptions=True)
async def generate( chat_provider: kosong.chat_provider.ChatProvider, system_prompt: str, tools: Sequence[kosong.tooling.Tool], history: Sequence[kosong.message.Message], *, on_message_part: Callback[[StreamedMessagePart], None] | None = None, on_tool_call: Callback[[kosong.message.ToolCall], None] | None = None) -> GenerateResult:
18async def generate(
19    chat_provider: ChatProvider,
20    system_prompt: str,
21    tools: Sequence[Tool],
22    history: Sequence[Message],
23    *,
24    on_message_part: Callback[[StreamedMessagePart], None] | None = None,
25    on_tool_call: Callback[[ToolCall], None] | None = None,
26) -> "GenerateResult":
27    """
28    Generate one message based on the given context.
29    Parts of the message will be streamed to the specified callbacks if provided.
30
31    Args:
32        chat_provider: The chat provider to use for generation.
33        system_prompt: The system prompt to use for generation.
34        tools: The tools available for the model to call.
35        history: The message history to use for generation.
36        on_message_part: An optional callback to be called for each raw message part.
37        on_tool_call: An optional callback to be called for each complete tool call.
38
39    Returns:
40        A tuple of the generated message and the token usage (if available).
41        All parts in the message are guaranteed to be complete and merged as much as possible.
42
43    Raises:
44        APIConnectionError: If the API connection fails.
45        APITimeoutError: If the API request times out.
46        APIStatusError: If the API returns a status code of 4xx or 5xx.
47        APIEmptyResponseError: If the API returns an empty response.
48        ChatProviderError: If any other recognized chat provider error occurs.
49    """
50    message = Message(role="assistant", content=[])
51    pending_part: StreamedMessagePart | None = None  # message part that is currently incomplete
52
53    logger.trace("Generating with history: {history}", history=history)
54    stream = await chat_provider.generate(system_prompt, tools, history)
55    async for part in stream:
56        logger.trace("Received part: {part}", part=part)
57        if on_message_part:
58            await callback(on_message_part, part.model_copy(deep=True))
59
60        if pending_part is None:
61            pending_part = part
62        elif not pending_part.merge_in_place(part):  # try merge into the pending part
63            # unmergeable part must push the pending part to the buffer
64            _message_append(message, pending_part)
65            if isinstance(pending_part, ToolCall) and on_tool_call:
66                await callback(on_tool_call, pending_part)
67            pending_part = part
68
69    # end of message
70    if pending_part is not None:
71        _message_append(message, pending_part)
72        if isinstance(pending_part, ToolCall) and on_tool_call:
73            await callback(on_tool_call, pending_part)
74
75    if not message.content and not message.tool_calls:
76        raise APIEmptyResponseError("The API returned an empty response.")
77
78    return GenerateResult(
79        id=stream.id,
80        message=message,
81        usage=stream.usage,
82    )

Generate one message based on the given context. Parts of the message will be streamed to the specified callbacks if provided.

Arguments:
  • chat_provider: The chat provider to use for generation.
  • system_prompt: The system prompt to use for generation.
  • tools: The tools available for the model to call.
  • history: The message history to use for generation.
  • on_message_part: An optional callback to be called for each raw message part.
  • on_tool_call: An optional callback to be called for each complete tool call.
Returns:

A tuple of the generated message and the token usage (if available). All parts in the message are guaranteed to be complete and merged as much as possible.

Raises:
  • APIConnectionError: If the API connection fails.
  • APITimeoutError: If the API request times out.
  • APIStatusError: If the API returns a status code of 4xx or 5xx.
  • APIEmptyResponseError: If the API returns an empty response.
  • ChatProviderError: If any other recognized chat provider error occurs.
@dataclass(frozen=True, slots=True)
class GenerateResult:
85@dataclass(frozen=True, slots=True)
86class GenerateResult:
87    """The result of a generation."""
88
89    id: str | None
90    """The ID of the generated message."""
91    message: Message
92    """The generated message."""
93    usage: TokenUsage | None
94    """The token usage of the generated message."""

The result of a generation.

GenerateResult( id: str | None, message: kosong.message.Message, usage: kosong.chat_provider.TokenUsage | None)
id: str | None

The ID of the generated message.

The generated message.

The token usage of the generated message.

async def step( chat_provider: kosong.chat_provider.ChatProvider, system_prompt: str, toolset: kosong.tooling.Toolset, history: Sequence[kosong.message.Message], *, on_message_part: Callback[[StreamedMessagePart], None] | None = None, on_tool_result: Callable[[kosong.tooling.ToolResult], None] | None = None) -> StepResult:
105async def step(
106    chat_provider: ChatProvider,
107    system_prompt: str,
108    toolset: Toolset,
109    history: Sequence[Message],
110    *,
111    on_message_part: Callback[[StreamedMessagePart], None] | None = None,
112    on_tool_result: Callable[[ToolResult], None] | None = None,
113) -> "StepResult":
114    """
115    Run one agent "step". In one step, the function generates LLM response based on the given
116    context for exactly one time. All new message parts will be streamed to `on_message_part` in
117    real-time if provided. Tool calls will be handled by `toolset`. The generated message will be
118    returned in a `StepResult`. Depending on the toolset implementation, the tool calls may be
119    handled asynchronously and the results need to be fetched with `await result.tool_results()`.
120
121    The message history will NOT be modified in this function.
122
123    The token usage will be returned in the `StepResult` if available.
124
125    Raises:
126        APIConnectionError: If the API connection fails.
127        APITimeoutError: If the API request times out.
128        APIStatusError: If the API returns a status code of 4xx or 5xx.
129        APIEmptyResponseError: If the API returns an empty response.
130        ChatProviderError: If any other recognized chat provider error occurs.
131        asyncio.CancelledError: If the step is cancelled.
132    """
133
134    tool_calls: list[ToolCall] = []
135    tool_result_futures: dict[str, ToolResultFuture] = {}
136
137    def future_done_callback(future: ToolResultFuture):
138        if on_tool_result:
139            try:
140                result = future.result()
141                on_tool_result(result)
142            except asyncio.CancelledError:
143                return
144
145    async def on_tool_call(tool_call: ToolCall):
146        tool_calls.append(tool_call)
147        result = toolset.handle(tool_call)
148
149        if isinstance(result, ToolResult):
150            future = ToolResultFuture()
151            future.add_done_callback(future_done_callback)
152            future.set_result(result)
153            tool_result_futures[tool_call.id] = future
154        else:
155            result.add_done_callback(future_done_callback)
156            tool_result_futures[tool_call.id] = result
157
158    try:
159        result = await generate(
160            chat_provider,
161            system_prompt,
162            toolset.tools,
163            history,
164            on_message_part=on_message_part,
165            on_tool_call=on_tool_call,
166        )
167    except (ChatProviderError, asyncio.CancelledError):
168        # cancel all the futures to avoid hanging tasks
169        for future in tool_result_futures.values():
170            future.remove_done_callback(future_done_callback)
171            future.cancel()
172        await asyncio.gather(*tool_result_futures.values(), return_exceptions=True)
173        raise
174
175    return StepResult(
176        result.id,
177        result.message,
178        result.usage,
179        tool_calls,
180        tool_result_futures,
181    )

Run one agent "step". In one step, the function generates LLM response based on the given context for exactly one time. All new message parts will be streamed to on_message_part in real-time if provided. Tool calls will be handled by toolset. The generated message will be returned in a StepResult. Depending on the toolset implementation, the tool calls may be handled asynchronously and the results need to be fetched with await result.tool_results().

The message history will NOT be modified in this function.

The token usage will be returned in the StepResult if available.

Raises:
  • APIConnectionError: If the API connection fails.
  • APITimeoutError: If the API request times out.
  • APIStatusError: If the API returns a status code of 4xx or 5xx.
  • APIEmptyResponseError: If the API returns an empty response.
  • ChatProviderError: If any other recognized chat provider error occurs.
  • asyncio.CancelledError: If the step is cancelled.
@dataclass(frozen=True, slots=True)
class StepResult:
184@dataclass(frozen=True, slots=True)
185class StepResult:
186    id: str | None
187    """The ID of the generated message."""
188
189    message: Message
190    """The message generated in this step."""
191
192    usage: TokenUsage | None
193    """The token usage in this step."""
194
195    tool_calls: list[ToolCall]
196    """All the tool calls generated in this step."""
197
198    _tool_result_futures: dict[str, ToolResultFuture]
199    """@private The futures of the results of the spawned tool calls."""
200
201    async def tool_results(self) -> list[ToolResult]:
202        """All the tool results returned by corresponding tool calls."""
203        if not self._tool_result_futures:
204            return []
205
206        try:
207            results: list[ToolResult] = []
208            for tool_call in self.tool_calls:
209                future = self._tool_result_futures.pop(tool_call.id)
210                result = await future
211                results.append(result)
212            return results
213        finally:
214            # one exception should cancel all the futures to avoid hanging tasks
215            for future in self._tool_result_futures.values():
216                future.cancel()
217            await asyncio.gather(*self._tool_result_futures.values(), return_exceptions=True)
StepResult( id: str | None, message: kosong.message.Message, usage: kosong.chat_provider.TokenUsage | None, tool_calls: list[kosong.message.ToolCall], _tool_result_futures: dict[str, _asyncio.Future[kosong.tooling.ToolResult]])
id: str | None

The ID of the generated message.

The message generated in this step.

The token usage in this step.

tool_calls: list[kosong.message.ToolCall]

All the tool calls generated in this step.

async def tool_results(self) -> list[kosong.tooling.ToolResult]:
201    async def tool_results(self) -> list[ToolResult]:
202        """All the tool results returned by corresponding tool calls."""
203        if not self._tool_result_futures:
204            return []
205
206        try:
207            results: list[ToolResult] = []
208            for tool_call in self.tool_calls:
209                future = self._tool_result_futures.pop(tool_call.id)
210                result = await future
211                results.append(result)
212            return results
213        finally:
214            # one exception should cancel all the futures to avoid hanging tasks
215            for future in self._tool_result_futures.values():
216                future.cancel()
217            await asyncio.gather(*self._tool_result_futures.values(), return_exceptions=True)

All the tool results returned by corresponding tool calls.