Compare commits

...

10 commits

14 changed files with 410 additions and 118 deletions

View file

@ -4,14 +4,17 @@ AUTHORIZED_USERS=""
# PDS service URL (optional)
SERVICE="https://bsky.social"
DB_PATH="data/sqlite.db"
DB_PATH="sqlite.db"
GEMINI_MODEL="gemini-2.5-flash"
DID=""
HANDLE=""
# https://bsky.app/settings/app-passwords
BSKY_PASSWORD=""
APP_PASSWORD=""
# https://aistudio.google.com/apikey
GEMINI_API_KEY=""
DAILY_QUERY_LIMIT=15
USE_JETSTREAM=false

2
.gitignore vendored
View file

@ -34,4 +34,4 @@ report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json
.DS_Store
# Database
data
*.db

View file

@ -6,12 +6,15 @@ services:
environment:
- "AUTHORIZED_USERS=${AUTHORIZED_USERS}"
- "SERVICE=${SERVICE:?https://bsky.social}"
- "DB_PATH=data/sqlite.db"
- DB_PATH=/data/sqlite.db
- "GEMINI_MODEL=${GEMINI_MODEL:-gemini-2.5-flash}"
- "DID=${DID:?}"
- "HANDLE=${HANDLE:?}"
- "BSKY_PASSWORD=${BSKY_PASSWORD:?}"
- "APP_PASSWORD=${APP_PASSWORD:?}"
- "GEMINI_API_KEY=${GEMINI_API_KEY:?}"
- "USE_JETSTREAM=${USE_JETSTREAM:-false}"
volumes:
- .:/app
- aero_db:/app/data
- "aero_db:/data"
volumes:
aero_db:

View file

@ -1,16 +1,71 @@
import { GoogleGenAI } from "@google/genai";
import { Bot } from "@skyware/bot";
import { Bot, EventStrategy } from "@skyware/bot";
import { env } from "./env";
import type { BinaryType } from "bun";
// Websocket patch was written by Claude, hopefully it doesn't suck
const OriginalWebSocket = global.WebSocket;
const binaryTypeDescriptor = Object.getOwnPropertyDescriptor(
OriginalWebSocket.prototype,
"binaryType",
);
const originalSetter = binaryTypeDescriptor?.set;
if (OriginalWebSocket && originalSetter) {
global.WebSocket = new Proxy(OriginalWebSocket, {
construct(target, args) {
//@ts-ignore
const ws = new target(...args) as WebSocket & {
_binaryType?: BinaryType;
};
Object.defineProperty(ws, "binaryType", {
get(): BinaryType {
return ws._binaryType ||
(binaryTypeDescriptor.get
? binaryTypeDescriptor.get.call(ws)
: "arraybuffer");
},
set(value: BinaryType) {
//@ts-ignore
if (value === "blob") {
originalSetter.call(ws, "arraybuffer");
//@ts-ignore
ws._binaryType = "blob";
} else {
originalSetter.call(ws, value);
ws._binaryType = value;
}
},
configurable: true,
});
return ws;
},
}) as typeof WebSocket;
}
export const bot = new Bot({
service: env.SERVICE,
emitChatEvents: true,
eventEmitterOptions: {
strategy: env.USE_JETSTREAM
? EventStrategy.Jetstream
: EventStrategy.Polling,
},
});
export const ai = new GoogleGenAI({
apiKey: env.GEMINI_API_KEY,
});
export const QUOTA_EXCEEDED_MESSAGE =
"You have exceeded your daily message quota (15). Please wait 24 hours before trying again.";
export const ERROR_MESSAGE =
"Sorry, I ran into an issue analyzing that post. Please try again.";
export const UNAUTHORIZED_MESSAGE =
"I cant make sense of your noise just yet. Youll need to be whitelisted before I can help.";

View file

@ -1,10 +1,12 @@
import { drizzle } from "drizzle-orm/bun-sqlite";
import { migrate } from "drizzle-orm/bun-sqlite/migrator";
import { Database } from "bun:sqlite";
import * as schema from "./schema";
import { env } from "../env";
import { migrateDB } from "./migrate";
await migrateDB();
const sqlite = new Database(env.DB_PATH);
export default drizzle(sqlite, { schema });
const db = drizzle(sqlite, { schema });
migrate(db, { migrationsFolder: "./drizzle" });
export default db;

View file

@ -1,10 +0,0 @@
import { migrate } from "drizzle-orm/bun-sqlite/migrator";
import { drizzle } from "drizzle-orm/bun-sqlite";
import { Database } from "bun:sqlite";
import { env } from "../env";
export async function migrateDB() {
const sqlite = new Database(env.DB_PATH);
const db = drizzle(sqlite);
await migrate(db, { migrationsFolder: "./drizzle" });
}

View file

@ -11,11 +11,22 @@ const envSchema = z.object({
DB_PATH: z.string().default("sqlite.db"),
GEMINI_MODEL: z.string().default("gemini-2.5-flash"),
ADMIN_DID: z.string().optional(),
DID: z.string(),
HANDLE: z.string(),
BSKY_PASSWORD: z.string(),
APP_PASSWORD: z.string(),
GEMINI_API_KEY: z.string(),
DAILY_QUERY_LIMIT: z.preprocess(
(val) =>
(typeof val === "string" && val.trim() !== "") ? Number(val) : undefined,
z.number().int().positive().default(15),
),
USE_JETSTREAM: z.preprocess(
(val) => val === "true",
z.boolean().default(false),
),
});
export type Env = z.infer<typeof envSchema>;

View file

@ -1,9 +1,12 @@
import modelPrompt from "../model/prompt.txt";
import { ChatMessage, Conversation } from "@skyware/bot";
import { ChatMessage, Conversation, RichText } from "@skyware/bot";
import * as c from "../core";
import * as tools from "../tools";
import consola from "consola";
import { env } from "../env";
import db from "../db";
import { messages } from "../db/schema";
import { and, count, eq, gte, lt } from "drizzle-orm";
import {
exceedsGraphemes,
multipartResponse,
@ -15,7 +18,12 @@ const logger = consola.withTag("Message Handler");
type SupportedFunctionCall = typeof c.SUPPORTED_FUNCTION_CALLS[number];
async function generateAIResponse(parsedConversation: string) {
async function generateAIResponse(parsedContext: string, messages: {
role: string;
parts: {
text: string;
}[];
}[]) {
const config = {
model: env.GEMINI_MODEL,
config: {
@ -29,21 +37,19 @@ async function generateAIResponse(parsedConversation: string) {
parts: [
{
text: modelPrompt
.replace("{{ handle }}", env.HANDLE),
.replace("$handle", env.HANDLE),
},
],
},
{
role: "user" as const,
role: "model" as const,
parts: [
{
text:
`Below is the yaml for the current conversation. The last message is the one to respond to. The post is the current one you are meant to be analyzing.
${parsedConversation}`,
text: parsedContext,
},
],
},
...messages,
];
let inference = await c.ai.models.generateContent({
@ -96,17 +102,64 @@ ${parsedConversation}`,
return inference;
}
async function sendResponse(
conversation: Conversation,
text: string,
): Promise<void> {
if (exceedsGraphemes(text)) {
multipartResponse(conversation, text);
} else {
conversation.sendMessage({
text,
});
function addCitations(
inference: Awaited<ReturnType<typeof c.ai.models.generateContent>>,
) {
let originalText = inference.text ?? "";
if (!inference.candidates) {
return originalText;
}
const supports = inference.candidates[0]?.groundingMetadata
?.groundingSupports;
const chunks = inference.candidates[0]?.groundingMetadata?.groundingChunks;
const richText = new RichText();
if (!supports || !chunks || originalText === "") {
return richText.addText(originalText);
}
const sortedSupports = [...supports].sort(
(a, b) => (b.segment?.endIndex ?? 0) - (a.segment?.endIndex ?? 0),
);
let currentText = originalText;
for (const support of sortedSupports) {
const endIndex = support.segment?.endIndex;
if (endIndex === undefined || !support.groundingChunkIndices?.length) {
continue;
}
const citationLinks = support.groundingChunkIndices
.map((i) => {
const uri = chunks[i]?.web?.uri;
if (uri) {
return { index: i + 1, uri };
}
return null;
})
.filter(Boolean);
if (citationLinks.length > 0) {
richText.addText(currentText.slice(endIndex));
citationLinks.forEach((citation, idx) => {
if (citation) {
richText.addLink(`[${citation.index}]`, citation.uri);
if (idx < citationLinks.length - 1) {
richText.addText(", ");
}
}
});
currentText = currentText.slice(0, endIndex);
}
}
richText.addText(currentText);
return richText;
}
export async function handler(message: ChatMessage): Promise<void> {
@ -122,42 +175,78 @@ export async function handler(message: ChatMessage): Promise<void> {
: env.AUTHORIZED_USERS.includes(message.senderDid as any);
if (!authorized) {
conversation.sendMessage({
await conversation.sendMessage({
text: c.UNAUTHORIZED_MESSAGE,
});
return;
}
if (message.senderDid != env.ADMIN_DID) {
const todayStart = new Date();
todayStart.setHours(0, 0, 0, 0);
const dailyCount = await db
.select({ count: count(messages.id) })
.from(messages)
.where(
and(
eq(messages.did, message.senderDid),
gte(messages.created_at, todayStart),
),
);
if (dailyCount[0]!.count >= env.DAILY_QUERY_LIMIT) {
conversation.sendMessage({
text: c.QUOTA_EXCEEDED_MESSAGE,
});
return;
}
}
logger.success("Found conversation");
conversation.sendMessage({
text: "...",
});
const parsedConversation = await parseConversation(conversation);
logger.info("Parsed conversation: ", parsedConversation);
const parsedConversation = await parseConversation(conversation, message);
try {
const inference = await generateAIResponse(parsedConversation);
const inference = await generateAIResponse(
parsedConversation.context,
parsedConversation.messages,
);
if (!inference) {
throw new Error("Failed to generate text. Returned undefined.");
logger.error("Failed to generate text. Returned undefined.");
return;
}
logger.success("Generated text:", inference.text);
saveMessage(conversation, env.DID, inference.text!);
const responseText = inference.text;
if (responseText) {
await sendResponse(conversation, responseText);
}
} catch (error) {
logger.error("Error in post handler:", error);
const responseWithCitations = addCitations(inference);
await conversation.sendMessage({
text:
"Sorry, I ran into an issue analyzing that post. Please try again.",
if (responseWithCitations) {
logger.success("Generated text:", responseText);
saveMessage(conversation, env.DID, responseText!);
if (exceedsGraphemes(responseWithCitations)) {
multipartResponse(conversation, responseWithCitations);
} else {
conversation.sendMessage({
text: responseWithCitations,
});
}
}
} catch (error: any) {
logger.error("Error in post handler:", error);
let errorMsg = c.ERROR_MESSAGE;
if (error.error.code == 503) {
errorMsg =
"Sorry, the AI model is currently overloaded. Please try again later.";
}
await conversation.sendMessage({
text: errorMsg,
});
}
}

View file

@ -11,7 +11,7 @@ logger.info("Logging in..");
try {
await bot.login({
identifier: env.HANDLE,
password: env.BSKY_PASSWORD,
password: env.APP_PASSWORD,
});
logger.success(`Logged in as @${env.HANDLE} (${env.DID})`);
@ -19,7 +19,7 @@ try {
await bot.setChatPreference(IncomingChatPreference.All);
bot.on("message", messages.handler);
logger.success("Registered events (reply, mention, quote)");
logger.success("Registered events (message)");
} catch (e) {
logger.error("Failure to log-in: ", e);
process.exit(1);

View file

@ -1,7 +1,7 @@
You are Aero, a neutral and helpful assistant on Bluesky.
Your job is to give clear, factual, and concise explanations or context about posts users send you.
Handle: {{ handle }}
Handle: $handle
Guidelines:

12
src/types.ts Normal file
View file

@ -0,0 +1,12 @@
export type ParsedPost = {
thread?: {
ancestors: ParsedPost[];
};
author: string;
text: string;
images?: {
index: number;
alt: string;
}[];
quotePost?: ParsedPost;
};

42
src/utils/cache.ts Normal file
View file

@ -0,0 +1,42 @@
interface CacheEntry<T> {
value: T;
expiry: number;
}
class TimedCache<T> {
private cache = new Map<string, CacheEntry<T>>();
private ttl: number; // Time to live in milliseconds
constructor(ttl: number) {
this.ttl = ttl;
}
get(key: string): T | undefined {
const entry = this.cache.get(key);
if (!entry) {
return undefined;
}
if (Date.now() > entry.expiry) {
this.cache.delete(key); // Entry expired
return undefined;
}
return entry.value;
}
set(key: string, value: T): void {
const expiry = Date.now() + this.ttl;
this.cache.set(key, { value, expiry });
}
delete(key: string): void {
this.cache.delete(key);
}
clear(): void {
this.cache.clear();
}
}
export const postCache = new TimedCache<any>(2 * 60 * 1000); // 2 minutes cache

View file

@ -2,21 +2,20 @@ import {
type ChatMessage,
type Conversation,
graphemeLength,
RichText,
} from "@skyware/bot";
import * as yaml from "js-yaml";
import db from "../db";
import { conversations, messages } from "../db/schema";
import { and, eq } from "drizzle-orm";
import { env } from "../env";
import { bot, MAX_GRAPHEMES } from "../core";
import { parsePostImages, traverseThread } from "./post";
import { bot, ERROR_MESSAGE, MAX_GRAPHEMES } from "../core";
import { parsePost, parsePostImages, traverseThread } from "./post";
import { postCache } from "../utils/cache";
/*
Utilities
*/
const resolveDid = (convo: Conversation, did: string) =>
convo.members.find((actor) => actor.did == did)!;
const getUserDid = (convo: Conversation) =>
convo.members.find((actor) => actor.did != env.DID)!;
@ -29,22 +28,16 @@ function generateRevision(bytes = 8) {
/*
Conversations
*/
async function initConvo(convo: Conversation) {
async function initConvo(convo: Conversation, initialMessage: ChatMessage) {
const user = getUserDid(convo);
const initialMessage = (await convo.getMessages()).messages[0] as
| ChatMessage
| undefined;
if (!initialMessage) {
throw new Error("Failed to get initial message of conversation");
}
const postUri = await parseMessagePostUri(initialMessage);
if (!postUri) {
convo.sendMessage({
await convo.sendMessage({
text:
"Please send a post for me to make sense of the noise for you.",
});
throw new Error("No post reference in initial message.");
}
@ -70,7 +63,11 @@ async function initConvo(convo: Conversation) {
did: user.did,
postUri,
revision: _convo.revision,
text: initialMessage.text,
text:
!initialMessage.text ||
initialMessage.text.trim().length == 0
? "Explain this post."
: initialMessage.text,
});
return _convo!;
@ -87,14 +84,14 @@ async function getConvo(convoId: string) {
return convo;
}
export async function parseConversation(convo: Conversation) {
export async function parseConversation(
convo: Conversation,
latestMessage: ChatMessage,
) {
let row = await getConvo(convo.id);
if (!row) {
row = await initConvo(convo);
row = await initConvo(convo, latestMessage);
} else {
const latestMessage = (await convo.getMessages())
.messages[0] as ChatMessage;
const postUri = await parseMessagePostUri(latestMessage);
if (postUri) {
const [updatedRow] = await db
@ -119,44 +116,50 @@ export async function parseConversation(convo: Conversation) {
did: getUserDid(convo).did,
postUri: row.postUri,
revision: row.revision,
text: latestMessage!.text,
text: postUri &&
(!latestMessage.text ||
latestMessage.text.trim().length == 0)
? "Explain this post."
: latestMessage.text,
});
}
const post = await bot.getPost(row.postUri);
let post = postCache.get(row.postUri);
if (!post) {
post = await bot.getPost(row.postUri);
postCache.set(row.postUri, post);
}
const convoMessages = await getRelevantMessages(row!);
const thread = await traverseThread(post);
return yaml.dump({
post: {
thread: {
ancestors: thread.map((post) => ({
author: post.author.displayName
? `${post.author.displayName} (${post.author.handle})`
: `Handle: ${post.author.handle}`,
text: post.text,
})),
},
author: post.author.displayName
? `${post.author.displayName} (${post.author.handle})`
: `Handle: ${post.author.handle}`,
text: post.text,
images: parsePostImages(post),
likes: post.likeCount || 0,
replies: post.replyCount || 0,
},
let parseResult = null;
try {
const parsedPost = await parsePost(post, true, new Set());
parseResult = {
context: yaml.dump({
post: parsedPost || null,
}),
messages: convoMessages.map((message) => {
const profile = resolveDid(convo, message.did);
const role = message.did == env.DID ? "model" : "user";
return {
user: profile.displayName
? `${profile.displayName} (${profile.handle})`
: `Handle: ${profile.handle}`,
role,
parts: [
{
text: message.text,
},
],
};
}),
};
} catch (e) {
await convo.sendMessage({
text: ERROR_MESSAGE,
});
throw new Error("Failed to parse conversation");
}
return parseResult;
}
/*
@ -175,7 +178,8 @@ async function getRelevantMessages(convo: typeof conversations.$inferSelect) {
.where(
and(
eq(messages.conversationId, convo.id),
eq(messages.postUri, convo!.postUri),
eq(messages.postUri, convo.postUri),
eq(messages.revision, convo.revision),
),
)
.limit(15);
@ -198,7 +202,7 @@ export async function saveMessage(
.values({
conversationId: _convo.id,
postUri: _convo.postUri,
revision: _convo.postUri,
revision: _convo.revision,
did,
text,
});
@ -207,7 +211,10 @@ export async function saveMessage(
/*
Reponse Utilities
*/
export function exceedsGraphemes(content: string) {
export function exceedsGraphemes(content: string | RichText) {
if (content instanceof RichText) {
return graphemeLength(content.text) > MAX_GRAPHEMES;
}
return graphemeLength(content) > MAX_GRAPHEMES;
}
@ -235,8 +242,24 @@ export function splitResponse(text: string): string[] {
return chunks.map((chunk, i) => `(${i + 1}/${total}) ${chunk}`);
}
export async function multipartResponse(convo: Conversation, content: string) {
const parts = splitResponse(content).filter((p) => p.trim().length > 0);
export async function multipartResponse(
convo: Conversation,
content: string | RichText,
) {
let parts: (string | RichText)[];
if (content instanceof RichText) {
if (exceedsGraphemes(content)) {
// If RichText exceeds grapheme limit, convert to plain text for splitting
parts = splitResponse(content.text);
} else {
// Otherwise, send the RichText directly as a single part
parts = [content];
}
} else {
// If content is a string, behave as before
parts = splitResponse(content);
}
for (const segment of parts) {
await convo.sendMessage({

View file

@ -1,6 +1,66 @@
import { EmbedImage, Post } from "@skyware/bot";
import {
EmbedImage,
Post,
PostEmbed,
RecordEmbed,
RecordWithMediaEmbed,
} from "@skyware/bot";
import * as c from "../core";
import * as yaml from "js-yaml";
import type { ParsedPost } from "../types";
import { postCache } from "../utils/cache";
export async function parsePost(
post: Post,
includeThread: boolean,
seenUris: Set<string> = new Set(),
): Promise<ParsedPost | undefined> {
if (seenUris.has(post.uri)) {
return undefined;
}
seenUris.add(post.uri);
const [images, quotePost, ancestorPosts] = await Promise.all([
parsePostImages(post),
parseQuote(post, seenUris),
includeThread ? traverseThread(post) : Promise.resolve(null),
]);
return {
author: post.author.displayName
? `${post.author.displayName} (${post.author.handle})`
: `Handle: ${post.author.handle}`,
text: post.text,
...(images && { images }),
...(quotePost && { quotePost }),
...(ancestorPosts && {
thread: {
ancestors: (await Promise.all(
ancestorPosts.map((ancestor) => parsePost(ancestor, false, seenUris)),
)).filter((post): post is ParsedPost => post !== undefined),
},
}),
};
}
async function parseQuote(post: Post, seenUris: Set<string>) {
if (
!post.embed || (!post.embed.isRecord() && !post.embed.isRecordWithMedia())
) return undefined;
const record = (post.embed as RecordEmbed || RecordWithMediaEmbed).record;
if (seenUris.has(record.uri)) {
return undefined;
}
let embedPost = postCache.get(record.uri);
if (!embedPost) {
embedPost = await c.bot.getPost(record.uri);
postCache.set(record.uri, embedPost);
}
return await parsePost(embedPost, false, seenUris);
}
export function parsePostImages(post: Post) {
if (!post.embed) return [];
@ -16,7 +76,9 @@ export function parsePostImages(post: Post) {
}
}
return images.map((image, idx) => parseImage(image, idx + 1));
return images.map((image, idx) => parseImage(image, idx + 1)).filter((img) =>
img.alt.length > 0
);
}
function parseImage(image: EmbedImage, index: number) {