Build automation agents and data workflows using OpenIAP's MongoDB-backed platform. This skill covers database operations, work item queues, event streaming, and messaging patterns with proper connection lifecycle management.
1. **Always use double quotes** for strings in JavaScript
2. **All data must be stored** in OpenIAP's MongoDB using the official client
3. **Never use** `console.log()` or `console.debug()` — use client logging methods instead
4. **Handle reconnections** properly using the `onConnected` pattern for stateful operations
Always start by importing and connecting the OpenIAP client:
```js
const { Client } = require("openiap");
const client = new Client();
await client.connect();
```
For code that needs to reinitialize state on reconnect (queue registration, watches, etc.), use the `onConnected` pattern:
```js
await client.connect();

client.on_client_event((event) => {
  if (event && event.event === "SignedIn") {
    onConnected().catch((error) => {
      client.error(error);
    });
  }
});

async function onConnected() {
  // Register queues, watches, or other stateful resources here
  const queuename = client.register_queue({ queuename: "myqueue" }, (event) => {
    client.info("Received message:", event);
  });
}
```
**Important:** Call `on_client_event` **after** `connect()`. You will still receive the initial `Connected` and `SignedIn` events.
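The handler above can be factored into a small reusable wrapper, which also makes the reconnect logic testable without a live server. This is a minimal sketch: `createSignedInHandler` is a hypothetical helper name (not part of the OpenIAP client), and it assumes events carry the `event` field shown above.

```js
// Hypothetical helper (not part of the OpenIAP client): builds an event
// handler that runs `onConnected` each time a "SignedIn" event arrives.
function createSignedInHandler(onConnected, reportError) {
  return (event) => {
    if (event && event.event === "SignedIn") {
      // Promise.resolve().then(...) also captures synchronous throws.
      return Promise.resolve()
        .then(() => onConnected())
        .catch((error) => reportError(error));
    }
    // Ignore every other event type, and null or undefined events.
    return Promise.resolve();
  };
}
```

Usage would look like `client.on_client_event(createSignedInHandler(onConnected, (error) => client.error(error)));`, called after `connect()` as described above.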
Use these methods for CRUD operations:
```js
// Query documents
const results = await client.query({
  collectionname: "users",
  query: '{"active": true}',
  projection: "name email",
  orderby: "createdAt",
  skip: 0,
  top: 100
});

// Aggregation pipeline
const aggregated = await client.aggregate({
  collectionname: "orders",
  aggregates: '[{"$group": {"_id": "$status", "count": {"$sum": 1}}}]'
});

// Insert one document
const inserted = await client.insert_one({
  collectionname: "logs",
  item: { timestamp: new Date(), message: "System started" }
});

// Insert or update (upsert)
const upserted = await client.insert_or_update_one({
  collectionname: "settings",
  item: { key: "theme", value: "dark" },
  uniqeness: "key"
});

// Delete documents
await client.delete_many({
  collectionname: "logs",
  query: '{"timestamp": {"$lt": "2024-01-01"}}'
});
```
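Hand-written JSON strings like the `query` values above are easy to get wrong (unbalanced quotes, stray commas). One option is to build them from plain objects with `JSON.stringify`. The sketch below assumes, as the examples above suggest, that `query` accepts any valid JSON string; `buildQueryRequest` is a hypothetical helper, not a client method.

```js
// Hypothetical helper: turns a plain filter object into the request shape
// used by the query examples above, serializing the filter with JSON.stringify.
function buildQueryRequest(collectionname, filter, options = {}) {
  return {
    ...options,
    collectionname,
    query: JSON.stringify(filter),
  };
}

// A request similar to the "Query documents" call above.
const request = buildQueryRequest("users", { active: true }, {
  projection: "name email",
  top: 100,
});
// It would then be passed to the client: await client.query(request);
```

Building the filter as an object also lets you compose conditions programmatically before serializing them once.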
Work items represent units of work to be processed. Always update state after processing.
```js
// Push a work item
await client.push_workitem({
  wiq: "processing-queue",
  name: "Process Invoice",
  payload: '{"invoiceId": 12345}',
  priority: 2
});

// Pop and process a work item
const workitem = await client.pop_workitem({
  wiq: "processing-queue",
  downloadfolder: "./downloads"
});

// Guard against an empty queue, where no work item is returned
if (workitem != null) {
  try {
    // Process the work item
    const payload = JSON.parse(workitem.payload);
    // ... do work ...

    // Mark as successful
    workitem.state = "successful";
    await client.update_workitem({ workitem });
  } catch (error) {
    client.error("Work item failed:", error);
    // Mark for retry
    workitem.state = "retry";
    await client.update_workitem({ workitem });
  }
}
```
**Critical:** Always update work item state to `"successful"` or `"retry"`. Items start as `"new"`, become `"processing"` when popped, and must be explicitly completed.
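The pop, process, and update cycle can be wrapped in one function so that every code path ends with an explicit state update. This is a minimal sketch using only the client methods shown above; `processNextWorkitem` is a hypothetical helper, and it assumes `pop_workitem` resolves to `null` or `undefined` when the queue is empty.

```js
// Hypothetical helper: pops one work item, runs `handler` on its parsed
// payload, and always records the outcome as "successful" or "retry".
async function processNextWorkitem(client, wiq, handler) {
  const workitem = await client.pop_workitem({ wiq });
  if (workitem == null) {
    // Assumption: an empty queue yields no work item.
    return null;
  }
  try {
    // Parsing happens inside the try so a malformed payload is also retried.
    await handler(JSON.parse(workitem.payload));
    workitem.state = "successful";
  } catch (error) {
    client.error("Work item failed:", error);
    workitem.state = "retry";
  }
  await client.update_workitem({ workitem });
  return workitem.state;
}
```

A caller could loop on `processNextWorkitem(client, "processing-queue", handleInvoice)` until it returns `null`, where `handleInvoice` stands in for your own processing function.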
#### Watch (Change Streams)
Monitor collection changes in real-time:
```js
const watchid = await client.watch({
  collectionname: "inventory",
  paths: ["quantity"]
}, (change, event_counter) => {
  client.info("Document changed:", change.id, change.operation);
});

// Unwatch later
await client.unwatch(watchid);
```
#### Queue Messaging
```js
// Register a queue to receive messages
const queuename = client.register_queue({ queuename: "notifications" }, (event) => {
  client.info("Received:", event.data);
  // Process message
});

// Send a message to a queue
await client.queue_message({
  queuename: "notifications",
  data: { type: "email", to: "user@example.com" } // placeholder address
});

// RPC-style (send and wait for reply)
const response = await client.rpc({
  queuename: "calculator",
  data: { operation: "add", values: [5, 3] }
});
```
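The examples in this document read `event.data` both directly and through `JSON.parse`, so a handler may need to cope with either shape. The sketch below is one defensive way to do that; `parseMessageData` is a hypothetical helper, and whether `data` arrives as a string or an object in your setup is an assumption to verify against the client you are running.

```js
// Hypothetical helper: returns a parsed object when `data` is a JSON string,
// and returns the value unchanged when it is already an object or not JSON.
function parseMessageData(data) {
  if (typeof data !== "string") {
    return data;
  }
  try {
    return JSON.parse(data);
  } catch (error) {
    // Not valid JSON: hand back the raw string rather than failing.
    return data;
  }
}
```

Inside a queue callback this would be used as `const payload = parseMessageData(event.data);`.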
#### Exchange Messaging
```js
// Register an exchange consumer
const queuename = client.register_exchange({
  exchangename: "events",
  algorithm: "fanout",
  routingkey: "user.created"
}, (event) => {
  client.info("Event received:", event.routingkey, event.data);
});
```
Upload and download files:
```js
// Upload file
const uploaded = await client.upload({
  filepath: "./report.pdf",
  filename: "monthly-report.pdf",
  mimetype: "application/pdf",
  metadata: { department: "sales" },
  collectionname: "documents"
});

// Download file
await client.download({
  collectionname: "documents",
  id: uploaded._id,
  folder: "./downloads",
  filename: "report.pdf"
});
```
Always use client logging methods:
```js
client.info("Application started");
client.verbose("Detailed debug info");
client.error("Something went wrong", error);
client.trace("Fine-grained trace data");
```
Enable tracing early in your application:
```js
client.enable_tracing("openiap=info"); // Options: info, error, debug, trace
```
Sign in with explicit credentials:
```js
await client.signin({
  username: "admin",
  password: "secret",
  longtoken: false
});
```
Create custom metrics for Grafana:
```js
client.set_u64_observable_gauge("items_processed", 1523, "Total items processed");
client.set_f64_observable_gauge("success_rate", 0.987, "Processing success rate");
```
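Each call above passes the value to publish, so a count that grows over time has to be tracked by the caller. A minimal sketch of that bookkeeping follows; `createGaugeCounter` is a hypothetical helper, and it assumes the gauge reports the most recently set value, which is how the calls above read.

```js
// Hypothetical helper: keeps a running total in memory and publishes it
// through set_u64_observable_gauge each time the counter is incremented.
function createGaugeCounter(client, name, description) {
  let total = 0;
  return (amount = 1) => {
    total += amount;
    client.set_u64_observable_gauge(name, total, description);
    return total;
  };
}
```

A worker could create the counter once with `const countJob = createGaugeCounter(client, "jobs_processed", "Jobs processed");` and call `countJob()` after each completed job. The total lives only in process memory, so it restarts from zero when the process restarts.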
Example: a queue worker that re-registers its queue on every reconnect:
```js
const { Client } = require("openiap");

const client = new Client();
client.enable_tracing("openiap=info");

let queuename = "";
let jobsProcessed = 0;

async function onConnected() {
  queuename = client.register_queue({ queuename: "work-queue" }, async (event) => {
    try {
      const payload = JSON.parse(event.data);
      client.info("Processing job:", payload.jobId);
      // Do work here
      jobsProcessed += 1;
      // Publish the running total rather than a constant 1
      client.set_u64_observable_gauge("jobs_processed", jobsProcessed, "Jobs processed");
    } catch (error) {
      client.error("Job failed:", error);
    }
  });
  client.info("Worker ready on queue:", queuename);
}

async function main() {
  // CommonJS modules have no top-level await, so connect inside an async function
  await client.connect();
  client.on_client_event((event) => {
    if (event && event.event === "SignedIn") {
      onConnected().catch((error) => client.error(error));
    }
  });
}

main().catch((error) => client.error(error));
```
Example: syncing pending documents to an external system:
```js
async function syncData() {
  const items = await client.query({
    collectionname: "pending_sync",
    query: '{"synced": false}',
    top: 100
  });

  for (const item of items) {
    try {
      // Sync to external system (externalAPI stands in for your own integration)
      await externalAPI.push(item);

      // Mark as synced
      item.synced = true;
      await client.update_one({
        collectionname: "pending_sync",
        item: item
      });
    } catch (error) {
      // Log and continue so one failed item does not block the rest of the batch
      client.error("Sync failed for item:", item._id, error);
    }
  }
}
```