openclaw/src/discord/monitor/message-handler.process.ts

416 lines
14 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
import {
removeAckReactionAfterReply,
shouldAckReaction as shouldAckReactionGate,
} from "../../channels/ack-reactions.js";
import { logTypingFailure, logAckFailure } from "../../channels/logging.js";
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import {
formatInboundEnvelope,
formatThreadStarterEnvelope,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { recordInboundSession } from "../../channels/session.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
import { truncateUtf16Safe } from "../../utils.js";
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
import { normalizeDiscordSlug } from "./allow-list.js";
import { formatDiscordUserTag, resolveTimestampMs } from "./format.js";
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
import {
buildDiscordMediaPayload,
resolveDiscordMessageText,
resolveMediaList,
} from "./message-utils.js";
import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js";
import { deliverDiscordReply } from "./reply-delivery.js";
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
import { sendTyping } from "./typing.js";
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
const {
cfg,
discordConfig,
accountId,
token,
runtime,
guildHistories,
historyLimit,
mediaMaxBytes,
textLimit,
replyToMode,
ackReactionScope,
message,
author,
data,
client,
channelInfo,
channelName,
isGuildMessage,
isDirectMessage,
isGroupDm,
baseText,
messageText,
shouldRequireMention,
canDetectMention,
effectiveWasMentioned,
shouldBypassMention,
threadChannel,
threadParentId,
threadParentName,
threadParentType,
threadName,
displayChannelSlug,
guildInfo,
guildSlug,
channelConfig,
baseSessionKey,
route,
commandAuthorized,
} = ctx;
const mediaList = await resolveMediaList(message, mediaMaxBytes);
const text = messageText;
if (!text) {
logVerbose(`discord: drop message ${message.id} (empty content)`);
return;
}
const ackReaction = resolveAckReaction(cfg, route.agentId);
const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
const shouldAckReaction = () =>
Boolean(
ackReaction &&
shouldAckReactionGate({
scope: ackReactionScope,
isDirect: isDirectMessage,
isGroup: isGuildMessage || isGroupDm,
isMentionableGroup: isGuildMessage,
requireMention: Boolean(shouldRequireMention),
canDetectMention,
effectiveWasMentioned,
shouldBypassMention,
}),
);
const ackReactionPromise = shouldAckReaction()
? reactMessageDiscord(message.channelId, message.id, ackReaction, {
rest: client.rest,
}).then(
() => true,
(err) => {
logVerbose(`discord react failed for channel ${message.channelId}: ${String(err)}`);
return false;
},
)
: null;
const fromLabel = isDirectMessage
? buildDirectLabel(author)
: buildGuildLabel({
guild: data.guild ?? undefined,
channelName: channelName ?? message.channelId,
channelId: message.channelId,
});
const senderTag = formatDiscordUserTag(author);
const senderDisplay = data.member?.nickname ?? author.globalName ?? author.username;
const senderLabel =
senderDisplay && senderTag && senderDisplay !== senderTag
? `${senderDisplay} (${senderTag})`
: (senderDisplay ?? senderTag ?? author.id);
const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
const groupSubject = isDirectMessage ? undefined : groupChannel;
const channelDescription = channelInfo?.topic?.trim();
const systemPromptParts = [
channelDescription ? `Channel topic: ${channelDescription}` : null,
channelConfig?.systemPrompt?.trim() || null,
].filter((entry): entry is string => Boolean(entry));
const groupSystemPrompt =
systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;
const storePath = resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
});
let combinedBody = formatInboundEnvelope({
channel: "Discord",
from: fromLabel,
timestamp: resolveTimestampMs(message.timestamp),
body: text,
chatType: isDirectMessage ? "direct" : "channel",
senderLabel,
previousTimestamp,
envelope: envelopeOptions,
});
const shouldIncludeChannelHistory =
!isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
if (shouldIncludeChannelHistory) {
combinedBody = buildPendingHistoryContextFromMap({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
currentMessage: combinedBody,
formatEntry: (entry) =>
formatInboundEnvelope({
channel: "Discord",
from: fromLabel,
timestamp: entry.timestamp,
body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${message.channelId}]`,
chatType: "channel",
senderLabel: entry.sender,
envelope: envelopeOptions,
}),
});
}
const replyContext = resolveReplyContext(message, resolveDiscordMessageText, {
envelope: envelopeOptions,
});
if (replyContext) {
combinedBody = `[Replied message - for context]\n${replyContext}\n\n${combinedBody}`;
}
let threadStarterBody: string | undefined;
let threadLabel: string | undefined;
let parentSessionKey: string | undefined;
if (threadChannel) {
const starter = await resolveDiscordThreadStarter({
channel: threadChannel,
client,
parentId: threadParentId,
parentType: threadParentType,
resolveTimestampMs,
});
if (starter?.text) {
const starterEnvelope = formatThreadStarterEnvelope({
channel: "Discord",
author: starter.author,
timestamp: starter.timestamp,
body: starter.text,
envelope: envelopeOptions,
});
threadStarterBody = starterEnvelope;
}
const parentName = threadParentName ?? "parent";
threadLabel = threadName
? `Discord thread #${normalizeDiscordSlug(parentName)} ${threadName}`
: `Discord thread #${normalizeDiscordSlug(parentName)}`;
if (threadParentId) {
parentSessionKey = buildAgentSessionKey({
agentId: route.agentId,
channel: route.channel,
peer: { kind: "channel", id: threadParentId },
});
}
}
const mediaPayload = buildDiscordMediaPayload(mediaList);
const threadKeys = resolveThreadSessionKeys({
baseSessionKey,
threadId: threadChannel ? message.channelId : undefined,
parentSessionKey,
useSuffix: false,
});
const replyPlan = await resolveDiscordAutoThreadReplyPlan({
client,
message,
isGuildMessage,
channelConfig,
threadChannel,
baseText: baseText ?? "",
combinedBody,
replyToMode,
agentId: route.agentId,
channel: route.channel,
});
const deliverTarget = replyPlan.deliverTarget;
const replyTarget = replyPlan.replyTarget;
const replyReference = replyPlan.replyReference;
const autoThreadContext = replyPlan.autoThreadContext;
const effectiveFrom = isDirectMessage
? `discord:${author.id}`
: (autoThreadContext?.From ?? `discord:channel:${message.channelId}`);
const effectiveTo = autoThreadContext?.To ?? replyTarget;
if (!effectiveTo) {
runtime.error?.(danger("discord: missing reply target"));
return;
}
const ctxPayload = finalizeInboundContext({
Body: combinedBody,
RawBody: baseText,
CommandBody: baseText,
From: effectiveFrom,
To: effectiveTo,
SessionKey: autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
AccountId: route.accountId,
ChatType: isDirectMessage ? "direct" : "channel",
ConversationLabel: fromLabel,
SenderName: data.member?.nickname ?? author.globalName ?? author.username,
SenderId: author.id,
SenderUsername: author.username,
SenderTag: formatDiscordUserTag(author),
GroupSubject: groupSubject,
GroupChannel: groupChannel,
GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
Provider: "discord" as const,
Surface: "discord" as const,
WasMentioned: effectiveWasMentioned,
MessageSid: message.id,
ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
ThreadStarterBody: threadStarterBody,
ThreadLabel: threadLabel,
Timestamp: resolveTimestampMs(message.timestamp),
...mediaPayload,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
// Originating channel for reply routing.
OriginatingChannel: "discord" as const,
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
});
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "discord",
to: `user:${author.id}`,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerbose(`discord: failed updating session meta: ${String(err)}`);
},
});
if (shouldLogVerbose()) {
const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
logVerbose(
`discord inbound: channel=${message.channelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
);
}
const typingChannelId = deliverTarget.startsWith("channel:")
? deliverTarget.slice("channel:".length)
: message.channelId;
const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });
const tableMode = resolveMarkdownTableMode({
cfg,
channel: "discord",
accountId,
});
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
responsePrefix: prefixContext.responsePrefix,
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
const replyToId = replyReference.use();
await deliverDiscordReply({
replies: [payload],
target: deliverTarget,
token,
accountId,
rest: client.rest,
runtime,
replyToId,
textLimit,
maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
tableMode,
});
replyReference.markSent();
},
onError: (err, info) => {
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
start: () => sendTyping({ client, channelId: typingChannelId }),
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "discord",
target: typingChannelId,
error: err,
});
},
}).onReplyStart,
});
const { queuedFinal, counts } = await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: channelConfig?.skills,
disableBlockStreaming:
typeof discordConfig?.blockStreaming === "boolean"
? !discordConfig.blockStreaming
: undefined,
onModelSelected: (ctx) => {
prefixContext.onModelSelected(ctx);
},
},
});
markDispatchIdle();
if (!queuedFinal) {
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
});
}
return;
}
if (shouldLogVerbose()) {
const finalCount = counts.final;
logVerbose(
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
}
removeAckReactionAfterReply({
removeAfterReply: removeAckAfterReply,
ackReactionPromise,
ackReactionValue: ackReaction,
remove: async () => {
await removeReactionDiscord(message.channelId, message.id, ackReaction, {
rest: client.rest,
});
},
onError: (err) => {
logAckFailure({
log: logVerbose,
channel: "discord",
target: `${message.channelId}/${message.id}`,
error: err,
});
},
});
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: message.channelId,
limit: historyLimit,
});
}
}