donna.memory.sources_chat

ChatSource — index conversation turns into the memory store.

Hooks into donna.tasks.database.Database.add_chat_message via the Option A constructor callback (see donna.memory.observers for the wiring rationale). The source maintains a per-session rolling buffer and flushes a turn document when:

  1. the role flips (user → assistant or vice versa);
  2. the buffer reaches ChatTurnChunker.max_tokens; or
  3. the session transitions out of active (close / expire).
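
As a rough illustration of the three flush triggers listed above, here is a minimal sketch. RollingBuffer, its word-count token estimate, and the max_tokens value are hypothetical stand-ins chosen for the example; the real _SessionBuffer and ChatTurnChunker may behave differently.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RollingBuffer:
    """Hypothetical stand-in for the per-session buffer (not the real _SessionBuffer)."""

    max_tokens: int = 8
    role: Optional[str] = None
    messages: list = field(default_factory=list)
    flushed: list = field(default_factory=list)

    def _tokens(self) -> int:
        # Assumption: whitespace-separated words approximate tokens.
        return sum(len(m["content"].split()) for m in self.messages)

    def flush(self) -> None:
        # Emit the buffered messages as one turn, then start fresh.
        if self.messages:
            self.flushed.append(self.messages)
        self.messages = []
        self.role = None

    def add(self, msg: dict) -> None:
        # Trigger 1: the role flipped, so the previous turn is complete.
        if self.role is not None and msg["role"] != self.role:
            self.flush()
        self.role = msg["role"]
        self.messages.append(msg)
        # Trigger 2: the buffer reached the token budget.
        if self._tokens() >= self.max_tokens:
            self.flush()

    def close(self) -> None:
        # Trigger 3: the session left the active state.
        self.flush()


buf = RollingBuffer(max_tokens=8)
buf.add({"id": "1", "role": "user", "content": "remind me to call"})
buf.add({"id": "2", "role": "user", "content": "the dentist"})
buf.add({"id": "3", "role": "assistant", "content": "sure, when?"})
buf.close()
turn_ids = [[m["id"] for m in turn] for turn in buf.flushed]
print(turn_ids)
```

The two user messages end up in one turn because the role did not change between them; the assistant reply starts a new turn, which is flushed when the session closes.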

source_id is "{session_id}:{first_msg_id}-{last_msg_id}" so an updated buffer upserts the same document (idempotent by UNIQUE(user_id, source_type, source_id)). The backfill path walks existing conversation_messages + conversation_sessions rows and regroups them through the same chunker; running backfill twice is a no-op.
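
To make the idempotency claim concrete, the sketch below builds a source_id in the format described above and upserts it twice into an in-memory SQLite table. The table name memory_documents, its content column, and the make_source_id and upsert helpers are assumptions for illustration only; the actual MemoryStore schema and API are not shown on this page.

```python
import sqlite3


def make_source_id(session_id: str, first_msg_id: str, last_msg_id: str) -> str:
    # Format taken from the text: "{session_id}:{first_msg_id}-{last_msg_id}".
    return f"{session_id}:{first_msg_id}-{last_msg_id}"


conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE memory_documents (  -- hypothetical table name
        user_id     TEXT NOT NULL,
        source_type TEXT NOT NULL,
        source_id   TEXT NOT NULL,
        content     TEXT NOT NULL,
        UNIQUE (user_id, source_type, source_id)
    )
    """
)


def upsert(user_id: str, source_id: str, content: str) -> None:
    # A conflict on the unique key updates the row instead of adding one.
    conn.execute(
        """
        INSERT INTO memory_documents (user_id, source_type, source_id, content)
        VALUES (?, 'chat', ?, ?)
        ON CONFLICT (user_id, source_type, source_id)
        DO UPDATE SET content = excluded.content
        """,
        (user_id, source_id, content),
    )


sid = make_source_id("sess-1", "m1", "m2")
upsert("nick", sid, "first draft of the turn")
upsert("nick", sid, "same turn, re-emitted")

row_count = conn.execute("SELECT COUNT(*) FROM memory_documents").fetchone()[0]
stored = conn.execute("SELECT content FROM memory_documents").fetchone()[0]
print(sid, row_count, stored)
```

Because the key is derived only from the session and the first and last message ids, re-emitting the same turn targets the same row.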

Why this source upserts directly instead of going through MemoryIngestQueue: the queue exists to amortise embedding cost when many events arrive in a burst (the vault backfill replays the entire corpus on boot). Chat turns arrive one at a time at human typing rate, so the batching window almost never fires with more than one event in it; we'd pay queue overhead for no batching win and lose the synchronous ordering that the tests and the in-process session buffer rely on. If a future workload bursts chat ingest (bulk import, transcript replay), revisit by passing a queue here.

logger module-attribute

logger = get_logger()

SOURCE_TYPE module-attribute

SOURCE_TYPE = 'chat'

ChatSource

ChatSource(*, store: MemoryStore, cfg: ChatSourceConfig, user_id_default: str = 'nick')

Observer + backfill for chat turns.

Stateless from the DB's perspective: the only side effect is MemoryStore.upsert, called directly (the constructor takes no ingest queue; see the module notes above). Failures are logged under memory_ingest_failed and swallowed, because the caller has already committed the chat row.

Source code in src/donna/memory/sources_chat.py
def __init__(
    self,
    *,
    store: MemoryStore,
    cfg: ChatSourceConfig,
    user_id_default: str = "nick",
) -> None:
    self._store = store
    self._cfg = cfg
    self._chunker = ChatTurnChunker(
        max_tokens=256,
        merge_consecutive_roles=cfg.merge_consecutive_same_role,
        min_chars=cfg.min_chars,
        task_verbs=list(cfg.task_verbs),
        include_roles=list(cfg.index_roles),
    )
    self._default_user_id = user_id_default
    # Per-session rolling buffer (role + message list). Flushed
    # whenever the chunker would close a turn.
    self._buffers: dict[str, _SessionBuffer] = {}

observe_message async

observe_message(event: dict[str, Any]) -> None

Handle one chat_message_added event.

Event shape: {"session_id", "user_id", "message": {...}}.
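
For orientation, the sketch below builds an event in this shape and reads it back with the same defensive defaults the handler applies. The helper names make_chat_event and normalise_event are hypothetical; the message keys id, role, and content are the ones observe_message reads, and "nick" is the constructor's default user id.

```python
from typing import Any


def make_chat_event(
    session_id: str, user_id: str, msg_id: str, role: str, content: str
) -> dict[str, Any]:
    # Hypothetical helper: assembles the documented event shape.
    return {
        "session_id": session_id,
        "user_id": user_id,
        "message": {"id": msg_id, "role": role, "content": content},
    }


def normalise_event(event: dict[str, Any], default_user: str = "nick") -> dict[str, str]:
    # Mirrors the handler's reads: a missing or empty user_id falls back
    # to the default, and missing role/content become empty strings.
    msg = event.get("message") or {}
    return {
        "session_id": str(event["session_id"]),
        "user_id": str(event.get("user_id") or default_user),
        "role": str(msg.get("role") or ""),
        "content": msg.get("content") or "",
    }


event = make_chat_event("sess-1", "", "m1", "user", "buy milk tomorrow")
normalised = normalise_event(event)
print(normalised)
```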

Source code in src/donna/memory/sources_chat.py
async def observe_message(self, event: dict[str, Any]) -> None:
    """Handle one ``chat_message_added`` event.

    Event shape: ``{"session_id", "user_id", "message": {...}}``.
    """
    if not self._cfg.enabled:
        return
    msg = event.get("message") or {}
    role = str(msg.get("role") or "")
    # Normalise the ids once so both branches key the buffer identically.
    session_id = str(event["session_id"])
    user_id = str(event.get("user_id") or self._default_user_id)
    if role not in self._cfg.index_roles:
        # Still flush: a non-indexed message (e.g. system) creates a turn boundary.
        await self._flush_session(session_id, user_id=user_id)
        return
    buf = self._buffers.setdefault(
        session_id, _SessionBuffer(user_id=user_id),
    )
    if buf.role is not None and role != buf.role:
        await self._flush_session(session_id, user_id=user_id)
        buf = self._buffers.setdefault(
            session_id, _SessionBuffer(user_id=user_id),
        )
    buf.role = role
    buf.messages.append(
        {"id": str(msg["id"]), "role": role, "content": msg.get("content") or ""}
    )
    # Re-chunk the buffer: if the chunker emits more than one turn
    # the earlier ones are complete and ready to flush.
    turns = self._chunker.chunk_messages(buf.messages)
    if len(turns) > 1:
        for turn in turns[:-1]:
            await self._emit_turn(session_id, user_id, turn)
        # Keep only the messages that contribute to the last turn
        # (they may still grow on the next incoming message).
        tail_ids = set(turns[-1].message_ids)
        buf.messages = [m for m in buf.messages if m["id"] in tail_ids]
    elif len(turns) == 1 and turns[0].token_count >= self._chunker.max_tokens:
        await self._emit_turn(session_id, user_id, turns[0])
        buf.reset()

observe_session_closed async

observe_session_closed(event: dict[str, Any]) -> None

Flush the session buffer on EXPIRED / CLOSED transitions.

Source code in src/donna/memory/sources_chat.py
async def observe_session_closed(self, event: dict[str, Any]) -> None:
    """Flush the session buffer on EXPIRED / CLOSED transitions."""
    if not self._cfg.enabled:
        return
    session_id = str(event["session_id"])
    user_id = str(event.get("user_id") or self._default_user_id)
    await self._flush_session(session_id, user_id=user_id)
    self._buffers.pop(session_id, None)

backfill async

backfill(user_id: str) -> int

Re-ingest every chat session for user_id.

Walks conversation_sessions + conversation_messages, regroups into turns via the chunker, and upserts each. The upsert is idempotent on (user_id, source_type, source_id) so re-running leaves row counts unchanged.
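
The walk described above can be sketched synchronously with the standard sqlite3 module. Everything here is an assumption for illustration: the table columns are limited to those the queries touch, a plain dict stands in for MemoryStore, and itertools.groupby on the role replaces the real ChatTurnChunker, which also applies token limits and filters.

```python
import sqlite3
from itertools import groupby

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE conversation_sessions (id TEXT, user_id TEXT, created_at INTEGER);
    CREATE TABLE conversation_messages (
        id TEXT, session_id TEXT, role TEXT, content TEXT, created_at INTEGER
    );
    INSERT INTO conversation_sessions VALUES ('s1', 'nick', 1);
    INSERT INTO conversation_messages VALUES
        ('m1', 's1', 'user', 'remind me to call', 1),
        ('m2', 's1', 'user', 'the dentist', 2),
        ('m3', 's1', 'assistant', 'sure, when?', 3);
    """
)


def backfill(conn: sqlite3.Connection, store: dict, user_id: str) -> int:
    """Regroup stored messages into turns and write each under a stable key."""
    n = 0
    sessions = conn.execute(
        "SELECT id FROM conversation_sessions WHERE user_id=? ORDER BY created_at",
        (user_id,),
    ).fetchall()
    for (session_id,) in sessions:
        rows = conn.execute(
            "SELECT id, role, content FROM conversation_messages "
            "WHERE session_id=? ORDER BY created_at ASC",
            (session_id,),
        ).fetchall()
        # Simplified chunking: consecutive messages with the same role form a turn.
        for _role, group in groupby(rows, key=lambda r: r[1]):
            msgs = list(group)
            source_id = f"{session_id}:{msgs[0][0]}-{msgs[-1][0]}"
            # Assigning to the same key overwrites, which mimics an upsert.
            store[(user_id, "chat", source_id)] = "\n".join(c or "" for _, _, c in msgs)
            n += 1
    return n


store: dict = {}
first = backfill(conn, store, "nick")
second = backfill(conn, store, "nick")
print(first, second, sorted(store))
```

Both passes report the same number of turns, and the second pass overwrites the existing keys rather than adding new ones, so the stored document count does not change.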

Source code in src/donna/memory/sources_chat.py
async def backfill(self, user_id: str) -> int:
    """Re-ingest every chat session for ``user_id``.

    Walks ``conversation_sessions`` + ``conversation_messages``,
    regroups into turns via the chunker, and upserts each. The
    upsert is idempotent on ``(user_id, source_type, source_id)``
    so re-running leaves row counts unchanged.
    """
    if not self._cfg.enabled:
        return 0
    conn = self._store._conn  # intentional access — same package
    async with conn.execute(
        "SELECT id FROM conversation_sessions WHERE user_id=? ORDER BY created_at",
        (user_id,),
    ) as cur:
        session_rows = await cur.fetchall()
    n = 0
    for (session_id,) in session_rows:
        async with conn.execute(
            "SELECT id, role, content FROM conversation_messages "
            "WHERE session_id=? ORDER BY created_at ASC",
            (session_id,),
        ) as mcur:
            msg_rows = await mcur.fetchall()
        messages = [
            {"id": mid, "role": role, "content": content or ""}
            for (mid, role, content) in msg_rows
        ]
        turns = self._chunker.chunk_messages(messages)
        for turn in turns:
            try:
                await self._upsert_turn(session_id, user_id, turn)
                n += 1
            except Exception as exc:
                logger.warning(
                    "memory_ingest_failed",
                    source_type=SOURCE_TYPE,
                    reason=str(exc),
                    session_id=session_id,
                )
    logger.info("memory_backfill_chat_done", count=n, user_id=user_id)
    return n