Implement a stateful AI agent using the `@mariozechner/pi-agent` npm package. Built on `@mariozechner/pi-ai`, it provides tool execution, event streaming, transport abstraction, state management, and attachment support for general-purpose agent workflows.
First, install the required package:
```bash
npm install @mariozechner/pi-agent
```
Basic usage:
```typescript
import { Agent } from "@mariozechner/pi-agent";
import { getModel } from "@mariozechner/pi-ai";
const agent = new Agent({
  initialState: {
    systemPrompt: "You are a helpful assistant.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
  },
});

// Subscribe to events
agent.subscribe((event) => {
  if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
    process.stdout.write(event.assistantMessageEvent.delta);
  }
});
// Send prompt
await agent.prompt("Hello!");
```
The agent uses `AgentMessage` types that are transformed into LLM-compatible messages:
1. **AgentMessage[]** → `transformContext()` → filtered/pruned messages
2. **AgentMessage[]** → `convertToLlm()` → standard LLM messages
3. **Message[]** → sent to LLM
This allows custom message types (notifications, UI elements) alongside standard LLM messages.
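As a sketch of where `transformContext` fits, a pruning hook could cap the context size before each LLM call. The hook name matches the pipeline above, but the exact signature and the simplified `Msg` shape here are assumptions:

```typescript
// Simplified message shape for illustration; the real AgentMessage type is richer.
type Msg = { role: string; content: unknown; timestamp: number };

// Keep only the most recent `max` messages; older turns are dropped.
const keepRecent =
  (max: number) =>
  (messages: Msg[]): Msg[] =>
    messages.length <= max ? messages : messages.slice(messages.length - max);

// Hypothetical wiring (assumes transformContext is accepted as a constructor option):
// const agent = new Agent({ transformContext: keepRecent(50) });
```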
```
prompt("Hello")
├─ agent_start
├─ turn_start
├─ message_start (user message)
├─ message_end
├─ message_start (assistant response)
├─ message_update (streaming chunks)
├─ message_end
├─ turn_end
└─ agent_end
```
With tool calls:
```
prompt("Read config.json")
├─ agent_start
├─ turn_start
├─ message_start/end (user)
├─ message_start (assistant with tool call)
├─ message_update...
├─ message_end
├─ tool_execution_start
├─ tool_execution_update (if streaming)
├─ tool_execution_end
├─ message_start/end (tool result)
├─ turn_end
├─ turn_start (next turn)
├─ message_start (assistant response to tool)
└─ agent_end
```
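The lifecycles above can be turned into readable log lines with a small dispatcher. Only the `type` field from the events named in the diagrams is relied on; anything else about the event shape is an assumption:

```typescript
// Minimal event describer for the lifecycles shown above.
type AgentEvent = { type: string; [key: string]: unknown };

function describeEvent(event: AgentEvent): string {
  switch (event.type) {
    case "tool_execution_start":
      return "tool started";
    case "tool_execution_update":
      return "tool progress";
    case "tool_execution_end":
      return "tool finished";
    case "agent_start":
      return "run started";
    case "agent_end":
      return "run finished";
    default:
      return event.type; // turn_start, message_update, etc. pass through
  }
}

// Wiring sketch: agent.subscribe((event) => console.log(describeEvent(event)));
```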
Define tools using the `AgentTool` interface with TypeBox schemas:
```typescript
import { Type } from "@sinclair/typebox";
import { AgentTool } from "@mariozechner/pi-agent";
import fs from "fs/promises";

const readFileTool: AgentTool = {
  name: "read_file",
  label: "Read File",
  description: "Read a file's contents",
  parameters: Type.Object({
    path: Type.String({ description: "File path" }),
  }),
  execute: async (toolCallId, params, signal, onUpdate) => {
    // Stream progress (optional)
    onUpdate?.({
      content: [{ type: "text", text: "Reading..." }],
      details: {},
    });
    const content = await fs.readFile(params.path, "utf-8");
    return {
      content: [{ type: "text", text: content }],
      details: { path: params.path, size: content.length },
    };
  },
};

agent.setTools([readFileTool]);
```
**IMPORTANT**: Throw errors on failure; do not return error messages as content:
```typescript
execute: async (toolCallId, params, signal, onUpdate) => {
  if (!fs.existsSync(params.path)) {
    throw new Error(`File not found: ${params.path}`);
  }
  return { content: [{ type: "text", text: "..." }] };
}
```
State and message management:
```typescript
// Update configuration
agent.setSystemPrompt("New instructions");
agent.setModel(getModel("openai", "gpt-4o"));
agent.setThinkingLevel("medium");
agent.setTools([tool1, tool2]);
// Manage messages
agent.appendMessage(message);
agent.replaceMessages(newMessages);
agent.clearMessages();
agent.reset(); // Clear everything
// Access current state
console.log(agent.state.messages);
console.log(agent.state.isStreaming);
console.log(agent.state.streamMessage); // Partial message during streaming
```
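The state fields above are enough to drive a simple status display. This helper is a sketch that assumes only the fields listed; the real `AgentState` likely carries more:

```typescript
// Subset of agent.state used here; the full state type is assumed richer.
type StateView = { messages: unknown[]; isStreaming: boolean };

function statusLine(state: StateView): string {
  const n = state.messages.length;
  return state.isStreaming ? `streaming (${n} messages so far)` : `idle (${n} messages)`;
}

// Usage sketch: console.log(statusLine(agent.state));
```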
Prompting supports several input forms:
```typescript
// Text only
await agent.prompt("Hello");
// With images
await agent.prompt("What's in this image?", [
  { type: "image", data: base64Data, mimeType: "image/jpeg" },
]);

// Direct AgentMessage
await agent.prompt({
  role: "user",
  content: "Hello",
  timestamp: Date.now(),
});
// Continue from current context
await agent.continue();
```
Steering interrupts ongoing work; follow-up queues messages after completion:
```typescript
agent.setSteeringMode("one-at-a-time");
agent.setFollowUpMode("one-at-a-time");
// Interrupt during tool execution
agent.steer({
  role: "user",
  content: "Stop! Do this instead.",
  timestamp: Date.now(),
});

// Queue work after completion
agent.followUp({
  role: "user",
  content: "Also summarize the result.",
  timestamp: Date.now(),
});
// Clear queues
agent.clearSteeringQueue();
agent.clearFollowUpQueue();
agent.clearAllQueues();
```
Extend `AgentMessage` via declaration merging:
```typescript
declare module "@mariozechner/pi-agent" {
  interface CustomAgentMessages {
    notification: {
      role: "notification";
      text: string;
      timestamp: number;
    };
  }
}

// Handle in convertToLlm
const agent = new Agent({
  convertToLlm: (messages) => messages.filter((m) => m.role !== "notification"),
});
```
For browser apps with backend proxy:
```typescript
import { Agent, streamProxy } from "@mariozechner/pi-agent";
const agent = new Agent({
  streamFn: (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: "your-token",
      proxyUrl: "https://your-backend.com",
    }),
});
```
Additional constructor options:
```typescript
const agent = new Agent({
  sessionId: "session-123", // For provider caching
  thinkingBudgets: {
    minimal: 128,
    low: 512,
    medium: 1024,
    high: 2048,
  },
  // Dynamic API key resolution
  getApiKey: async (provider) => {
    return await refreshOAuthToken(provider);
  },
});
```
For direct control without the Agent class:
```typescript
import { agentLoop, agentLoopContinue } from "@mariozechner/pi-agent";
const context: AgentContext = {
  systemPrompt: "You are helpful.",
  messages: [],
  tools: [],
};

const config: AgentLoopConfig = {
  model: getModel("openai", "gpt-4o"),
  convertToLlm: (msgs) =>
    msgs.filter((m) => ["user", "assistant", "toolResult"].includes(m.role)),
};

const userMessage = {
  role: "user",
  content: "Hello",
  timestamp: Date.now(),
};

for await (const event of agentLoop([userMessage], context, config)) {
  console.log(event.type);
}

// Continue from existing context
for await (const event of agentLoopContinue(context, config)) {
  console.log(event.type);
}
```
1. **Event Handling**: Subscribe to events early to capture all lifecycle events
2. **Error Handling**: Throw errors in tool execution, don't return error content
3. **State Access**: Use `agent.state.streamMessage` for partial messages during streaming
4. **Context Management**: Use `transformContext` for message pruning/compaction before LLM calls
5. **Control Flow**: Use `agent.abort()` to cancel operations, `agent.waitForIdle()` to wait for completion
6. **Tool Streaming**: Use `onUpdate` callback for progressive tool results
7. **Message Filtering**: Implement `convertToLlm` to handle custom message types
8. **Session Persistence**: Set `sessionId` for provider-level caching benefits
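For point 5, a generic timeout wrapper can combine the two calls: wait for idle, and abort if it takes too long. `abort()` and `waitForIdle()` are taken from the list above; the helper itself is illustrative:

```typescript
// Race a promise against a timer; run onTimeout (e.g. agent.abort()) if the timer wins.
function withTimeout<T>(promise: Promise<T>, ms: number, onTimeout: () => void): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      onTimeout();
      reject(new Error(`Timed out after ${ms}ms`));
    }, ms);
    promise.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (err) => { clearTimeout(timer); reject(err); },
    );
  });
}

// Usage sketch: await withTimeout(agent.waitForIdle(), 60_000, () => agent.abort());
```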
Complete example:
```typescript
import { Agent, AgentTool } from "@mariozechner/pi-agent";
import { getModel } from "@mariozechner/pi-ai";
import { Type } from "@sinclair/typebox";
import fs from "fs/promises";
const readFile: AgentTool = {
  name: "read_file",
  label: "Read File",
  description: "Read file contents",
  parameters: Type.Object({
    path: Type.String(),
  }),
  execute: async (id, { path }, signal, onUpdate) => {
    const content = await fs.readFile(path, "utf-8");
    return { content: [{ type: "text", text: content }] };
  },
};

const agent = new Agent({
  initialState: {
    systemPrompt: "You help manage files.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
    tools: [readFile],
  },
});

agent.subscribe((event) => {
  if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
    process.stdout.write(event.assistantMessageEvent.delta);
  }
});

await agent.prompt("Read package.json");
await agent.prompt("Read package.json");
```