Expert in building stateful, distributed AI agents with AgentPG framework using PostgreSQL and Claude Batch API for event-driven async execution
Expert skill for building stateful, distributed AI agent systems using the AgentPG framework - an event-driven Go framework that uses PostgreSQL for state management and distribution, providing transaction-safe agent execution with Claude Batch API integration.
This skill helps you build and work with AgentPG, an event-driven AI agent framework that:
When working with AgentPG, always follow these principles:
1. **PostgreSQL as Single Source of Truth**: All state is stored in PostgreSQL. No in-memory state that could be lost.
2. **Event-Driven with Polling Fallback**: Uses LISTEN/NOTIFY for real-time events, with polling as fallback for reliability.
3. **Race-Safe Distribution**: Uses `SELECT FOR UPDATE SKIP LOCKED` for safe work claiming across workers.
4. **Transaction-First**: `RunTx()` accepts user transactions for atomic operations.
5. **Database-Driven Agents**: Agents are database entities with UUID primary keys, not per-client registrations. Tools are registered per-client.
6. **Multi-Level Agent Hierarchies**: Agents can be tools for other agents (PM → Lead → Worker pattern).
When setting up AgentPG:
```bash
go get github.com/youssefsiam38/agentpg
psql $DATABASE_URL -f storage/migrations/001_agentpg_migration.up.sql
```
When creating a client, use these configuration options:
```go
client, err := agentpg.NewClient(driver, &agentpg.ClientConfig{
APIKey: os.Getenv("ANTHROPIC_API_KEY"), // Required
Name: "worker-1", // Instance name
MaxConcurrentRuns: 10, // Default: 10
MaxConcurrentTools: 50, // Default: 50
BatchPollInterval: 30 * time.Second, // Claude Batch API polling
RunPollInterval: 1 * time.Second, // New runs polling
ToolPollInterval: 500 * time.Millisecond, // Tool execution polling
})
```
**Driver Options:**
**IMPORTANT**: Agents are database entities created AFTER client starts, identified by UUID.
```go
// Step 1: Start client first
if err := client.Start(ctx); err != nil {
log.Fatal(err)
}
// Step 2: Create agent (returns agent with UUID)
agent, err := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "assistant",
Model: "claude-sonnet-4-5-20250929",
SystemPrompt: "You are a helpful assistant.",
Tools: []string{"calculator", "weather"}, // Must be registered on client
})
// Step 3: Use agent.ID (uuid.UUID) when running
response, err := client.RunSync(ctx, sessionID, agent.ID, "What is 2+2?", nil)
```
**Agent Creation Methods:**
When implementing tools:
```go
type Tool interface {
Name() string
Description() string
InputSchema() ToolSchema
Execute(ctx context.Context, input json.RawMessage) (string, error)
}
// Example: Database-aware tool
type UserLookupTool struct {
db *pgxpool.Pool
}
func (t *UserLookupTool) Name() string {
return "lookup_user"
}
func (t *UserLookupTool) Description() string {
return "Look up user information by ID"
}
func (t *UserLookupTool) InputSchema() tool.ToolSchema {
return tool.ToolSchema{
Type: "object",
Properties: map[string]tool.PropertyDef{
"user_id": {Type: "string", Description: "The user's ID"},
},
Required: []string{"user_id"},
}
}
func (t *UserLookupTool) Execute(ctx context.Context, input json.RawMessage) (string, error) {
var params struct {
UserID string `json:"user_id"`
}
json.Unmarshal(input, ¶ms)
var name, email string
err := t.db.QueryRow(ctx, "SELECT name, email FROM users WHERE id = $1", params.UserID).Scan(&name, &email)
if err != nil {
return "", fmt.Errorf("user not found: %w", err)
}
return fmt.Sprintf("User: %s <%s>", name, email), nil
}
```
**CRITICAL ORDER**:
```go
// Step 1: Register tools on client (BEFORE Start)
client.RegisterTool(&CalculatorTool{})
client.RegisterTool(&UserLookupTool{db: pool})
client.RegisterTool(&WeatherTool{apiKey: key})
// Step 2: Start client
client.Start(ctx)
// Step 3: Create agents with allowed tools (AFTER Start)
agent, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "assistant",
Model: "claude-sonnet-4-5-20250929",
Tools: []string{"calculator", "lookup_user"}, // Only these tools available
})
```
**Rule**: An agent can only use tools that are:
1. Registered on the client instance
2. Listed in the agent's `Tools` array
For delegation between agents, use the `AgentIDs` field:
```go
// Create specialist agents first
frontendDev, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "frontend-dev",
Description: "Frontend developer",
Model: "claude-sonnet-4-5-20250929",
Tools: []string{"lint"},
})
backendDev, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "backend-dev",
Description: "Backend developer",
Model: "claude-sonnet-4-5-20250929",
Tools: []string{"test"},
})
// Create manager that delegates to specialists
techLead, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "tech-lead",
Model: "claude-sonnet-4-5-20250929",
AgentIDs: []uuid.UUID{frontendDev.ID, backendDev.ID}, // Delegation via AgentIDs
})
```
**Key Points**:
Pass per-run variables to tools:
```go
// Pass variables when creating a run
response, _ := client.RunSync(ctx, sessionID, agent.ID, "Continue the story", map[string]any{
"story_id": "story-123",
"user_id": "user-456",
"tenant_id": "tenant-1",
})
// Access in tools
func (t *StoryTool) Execute(ctx context.Context, input json.RawMessage) (string, error) {
// Get single variable with type safety
storyID, ok := tool.GetVariable[string](ctx, "story_id")
if !ok {
return "", errors.New("story_id not provided")
}
// Get with default value
maxChapters := tool.GetVariableOr[int](ctx, "max_chapters", 10)
// Get all variables
vars := tool.GetVariables(ctx)
// Get full run context
runCtx, ok := tool.GetRunContext(ctx)
if ok {
fmt.Printf("Run: %s, Session: %s\n", runCtx.RunID, runCtx.SessionID)
}
// Use in queries...
}
```
Sessions provide conversation context with flexible metadata:
```go
// Create session with metadata
sessionID, err := client.NewSession(ctx, nil, map[string]any{
"tenant_id": "tenant-1",
"user_id": "user-123",
"project": "alpha",
})
// Query sessions by metadata
sessions, _ := client.ListSessions(ctx, map[string]any{"tenant_id": "tenant-1"}, 100, 0)
// Session messages are automatically managed
messages, _ := client.GetSessionMessages(ctx, sessionID)
```
**Synchronous (blocking)**:
```go
response, err := client.RunSync(ctx, sessionID, agent.ID, "What is 2+2?", nil)
fmt.Println(response.Text)
```
**Asynchronous (non-blocking)**:
```go
runID, err := client.Run(ctx, sessionID, agent.ID, "Hello!", nil)
// Do other work...
response, err := client.WaitForRun(ctx, runID)
```
**With transaction safety**:
```go
tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
response, err := client.RunTx(ctx, tx, sessionID, agent.ID, "Create order", map[string]any{
"order_id": "ord-123",
})
if err == nil {
tx.Commit(ctx)
}
```
Choose models based on task complexity:
```go
// Check run status
run, _ := client.GetRun(ctx, runID)
fmt.Printf("Status: %s, Error: %v\n", run.Status, run.Error)
// Monitor iterations (Claude Batch API calls)
iterations, _ := client.GetRunIterations(ctx, runID)
for _, iter := range iterations {
fmt.Printf("Iteration %d: %s\n", iter.Index, iter.Status)
}
// Query tool executions
tools, _ := client.GetRunTools(ctx, runID)
```
When building with AgentPG:
1. **Always start client before creating agents** - Agents are database entities created after startup
2. **Use GetOrCreateAgent for idempotency** - Safe to call on every startup
3. **Register tools before Start()** - Tool registration must happen before client starts
4. **Use transactions for atomic operations** - Leverage RunTx() for data consistency
5. **Leverage metadata for multi-tenancy** - Use session/agent metadata for tenant isolation
6. **Use agent hierarchies for complex workflows** - Delegate to specialist agents via AgentIDs
7. **Pass context via variables, not prompts** - Use run variables for dynamic tool context
8. **Monitor stuck runs** - Configure StuckRunTimeout appropriately
9. **Use UUIDs for agent references** - Always use agent.ID, never agent names in Run calls
```go
// Session with tenant context
sessionID, _ := client.NewSession(ctx, nil, map[string]any{
"tenant_id": "tenant-1",
"user_id": "user-123",
})
// Agent with tenant metadata
agent, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "support-agent",
Model: "claude-sonnet-4-5-20250929",
Metadata: map[string]any{"tenant_id": "tenant-1"},
})
// Run with tenant-scoped variables
response, _ := client.RunSync(ctx, sessionID, agent.ID, "Get customer info", map[string]any{
"tenant_id": "tenant-1",
"customer_id": "cust-456",
})
```
```go
researcher, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "researcher",
Description: "Research specialist",
Model: "claude-sonnet-4-5-20250929",
Tools: []string{"web_search"},
})
writer, _ := client.CreateAgent(ctx, &agentpg.AgentDefinition{
Name: "content-writer",
Model: "claude-opus-4-5-20251101",
AgentIDs: []uuid.UUID{researcher.ID}, // Can delegate to researcher
})
// Writer can ask researcher for information
response, _ := client.RunSync(ctx, sessionID, writer.ID, "Write an article about AI agents", nil)
```
Leave a review
No reviews yet. Be the first to review this skill!
# Download SKILL.md from killerskills.ai/api/skills/agentpg-framework-expert/raw