DESIGN PROPOSALawaiting scope approval

Event-Driven Triggers + Reactive Agents

A generic substrate for building reactive applications (e.g. live sports betting) on top of the existing hub. Part A = event-driven message triggers. Part B = non-conversational, data-prepared agents that run to completion.

1. Summary

The goal splits into two reusable layers plus a thin demo app:

Key finding: roughly 80% of the required machinery already exists in the codebase. The plan reuses it rather than rebuilding.

2. What already exists (reuse)

CapabilityWhat it gives us
EXISTSDispatchBackend HTTP POST /api/hub-control/{hub_id}/inject (message to existing instance) and /inject-and-create (create instance + message). routes.py:411 / :444
EXISTSSchedulerSeparate APScheduler process + CHMP handler (EntityType.SCHEDULER) where agents register cron/interval/once jobs that drive agents via the dispatch path. The time-based twin of Part A - we mirror its architecture.
EXISTSTool loopAgents already run a sequence of tool calls to completion: _run_single_turn / _run_llm_turn / _resume_after_capability in agent/base.py. 'Place bet -> check data -> close bet' needs no new agent class.
EXISTSseed_parts / resolveAlready does 'prepare a list of data (memory reads, shell, capability calls, literals) and deliver as ONE user turn'. This is Part B's prepared-data list.
EXISTSData layersports_data (live_game_state, live_odds, scores, news, game_summary), fifa_worldcup, kalshi. The sports_betting memory env already exists.

3. Part A - Generic trigger system NEW generalize the scheduler

Time triggers (cron/interval/once) and event triggers are the same idea: a condition that fires a message to an agent. So Part A is not a new subsystem - it generalizes the existing scheduler. One table, one handler, one dispatch path; the only thing that differs is the trigger source - a clock vs a push.

1. Extend scheduler.jobs REUSE

No new table - add an event trigger type to the existing one:

schedule_type       = 'event'   // new, alongside cron/interval/once
schedule_expression = 'sports_feed:goal'  // the event key (source:type)

// two new nullable columns:
match_filter     jsonb   // optional field comparisons
extra_seed_parts jsonb   // optional per-trigger seeds

All other columns (agent_id, agent_instance_id, supervisor_message, hub_id, provider, model, enabled, fallback_to_new_hub) are reused as-is.

2. Reuse the scheduler handler REUSE

Same EntityType.SCHEDULER CHMP handler, same ops: create / list / get / update / delete / enable / disable. The only change is adding "event" to VALID_SCHEDULE_TYPES + validation. No new entity type, no new handler - agents register event triggers with the same create op they already use for cron jobs.

3. Event arrival - the one new path NEW

POST /api/events/ingest { source, type, payload }

The only genuinely new piece. It queries enabled schedule_type='event' rows matching the key (+ match_filter) and dispatches via the same run_heartbeat_job path (inject / inject-and-create). The scheduler process just skips event rows in its clock loops - exactly how it already skips once.

scheduler/utils.py's docstring already says it is 'shared between the scheduler service and the backend trigger endpoint' - so this goes with the grain.

4. Pluggable producers

Small workers that watch a data source and emit events to /ingest. Comparisons + edge-detection live here (see section 7). Generic - any app adds its own producer without touching the core. A producer can even be a normal scheduled interval job, so no new process type is required.

4. Part B - Reactive agent pattern NEW

Not a new LLM class - a prompt capability + a lifecycle convention over existing machinery.

reactive_agent prompt capability

Defines the contract:

  1. You are triggered by an event or supervisor message - not a conversation.
  2. The trigger payload + seed_parts give you a prepared data list.
  3. Gather any additional data via tools.
  4. Act to completion (place bet, fetch more, close bet).
  5. Optionally register follow-up triggers via the scheduler capability (schedule_type=event).
  6. End the turn.

Lifecycle (recommended default)

Ephemeral instance per trigger via inject-and-create with extra_seed_parts = [event payload + data-prep resolve parts]. This maps exactly to 'a prepared list of data -> a response -> run to completion'. (Alternative: a long-lived instance that calls clear_history between events.)

5. End-to-end flow (sports betting)

Each step flows into the next. The coloured tag shows which part of the system is acting.

  1. Feed pollerDetects a goalPolls sports_data.live_game_state, diffs state, and sees a new goal.
  2. Feed pollerFires the eventPOST /api/events/ingest with { source: sports_feed, type: goal, payload: { game, team, odds } }
  3. Event systemMatches a subscriptionFinds the enabled trigger for { soccer, goal } and dispatches it via inject-and-create.
  4. Agent bootsports_bettor wakes with prepared dataFirst turn = strategy rules + ledger (sports_betting env) + live odds + the event payload, delivered as ONE turn.
  5. Agent turn 1Evaluate the edge, place a betTool call: place a mock bet (memory write to the ledger).
  6. Agent turn 2Register a follow-up, then stopTool call: scheduler.create { schedule_type: event, type: final_whistle }, then end the turn.
  7. LaterFinal whistle event firesThe follow-up trigger matches and re-injects sports_bettor.
  8. AgentSettle the betCloses the position and updates running P&L.

6. How an agent builds this itself

Everything below is done by an agent sending CHMP messages - no human code, no deploy. That is what makes the loop self-service: an agent can author a reactive agent, wire a trigger to it, and let that agent register further triggers at runtime.

Step 1 - Author a reactive agent spec → agent_builder

Define the agent once: its capabilities, its contract prompt, and the data it should auto-prepare on every trigger (seed_parts, resolved live into one turn).

{
  "header": {"message_title": "agent_builder_create_agent_request",
             "destination": {"id": "agent_builder", "type": "AGENT_BUILDER"}},
  "body": {
    "agent_id": "sports_bettor",
    "capabilities": [
      {"name": "central_hub",    "version": 1},
      {"name": "reactive_agent", "version": 1},
      {"name": "sports_data",    "version": 1},
      {"name": "event_trigger",  "version": 1},
      {"name": "memory",         "version": 1}
    ],
    "customized_prompt": "You are a live sports bettor. On each trigger, ...",
    "seed_parts": [
      {"literal": "## Strategy rules + open ledger"},
      {"request": {"header": {"message_title": "memory_read_request",
                   "destination": {"id": "memory", "type": "MEMORY"}},
                   "body": {"operation": "read", "environment": "sports_betting",
                            "key": "config/strategy/rules"}}}
    ]
  }
}

Step 2 - Register an event trigger → scheduler

The same scheduler handler used for cron jobs - just with schedule_type: "event". {date}/{run_id} in agent_instance_id means 'spawn a fresh instance per event'; a static id means 'deliver to the existing instance'.

{
  "header": {"message_title": "scheduler_create_job_request",
             "destination": {"id": "scheduler", "type": "SCHEDULER"}},
  "body": {
    "operation": "create",
    "name": "soccer_goal_to_bettor",
    "schedule_type": "event",
    "schedule_expression": "sports_feed:goal",   // event key = source:type
    "match_filter": {"league": "soccer"},          // optional comparison
    "agent_id": "sports_bettor",
    "agent_instance_id": "sports_bettor_{date}_{run_id}",  // fresh instance per event
    "supervisor_message": "A goal was scored. Evaluate the edge and act.",
    "owner_email": "owner@example.com",
    "enabled": true
  }
}

Identical shape to a cron job - only schedule_type plus the new match_filter / extra_seed_parts fields differ. One handler, one registry, one dispatch.

Step 3 - The loop closes itself

While the reactive agent is running, it issues the exact same register call to set up its OWN follow-up trigger (e.g. a final_whistle trigger to settle the bet). New triggers are built at runtime by the agent - no human in the loop.

7. Conditions & comparisons

A trigger condition (e.g. 'price first exceeds 100') can be a comparison. Recommended: the comparison lives in the producer. The producer turns a continuous signal into a discrete fact and emits only when the condition flips - the event bus then just routes facts.

PatternWhere the comparison runsGood forStateful?
A. Producer recommendedProducer holds threshold + prior state; emits price_cross_above only on the crossing'first exceeds / crosses / changed'Yes - state lives where it belongs
B. match_filterSubscription compares payload fields at routing timefan-out: one stream, many thresholdsNo - fires on every match; pair with one_shot
C. AgentAgent decides after wakingcomplex / changing logicYes, but highest wake-cost

Producer-side (Pattern A) - the recommended shape

// producer config, per watch
{ "symbol": "AAPL", "op": "gt", "threshold": 100 }

// emitted ONLY when price crosses 100 from below
POST /api/events/ingest
{ "source": "stocks", "type": "price_cross_above",
  "payload": { "symbol": "AAPL", "threshold": 100, "price": 100.3 } }

Clean refinement: declare the threshold in the subscription, and have the producer read enabled subscriptions to learn which thresholds to watch and do the edge-detection. The condition stays declarative; the statefulness stays in the producer.

match_filter (Pattern B) - stateless field comparison

"match_filter": { "symbol": "AAPL", "price": { "gte": 100 } }

Operators: eq ne gt gte lt lte in. Fires on every matching event (not just the first) - use one_shot: true for 'fire once, then disable'.

8. Proposed phasing

1
Part A core. Extend scheduler.jobs (schedule_type=event + match_filter/extra_seed_parts), add "event" to the SCHEDULER handler, and add /api/events/ingest reusing the existing dispatch. No producers yet - test by curling /ingest.
2
Reactive agents. reactive_agent prompt capability + one demo reactive spec wired to Phase 1.
3
Sports demo. Feed poller producer + concrete sports_bettor spec + subscriptions, demoed end-to-end.

9. Open questions

DECIDED: generalize the scheduler - one module (shared table + SCHEDULER handler + dispatch), events arrive via a push endpoint. No parallel subsystem.
2. Build Phase 1 only first, or all three phases and demo end-to-end?
3. Ephemeral-instance-per-trigger as the default reactive lifecycle - agree?
4. Confirm mock bets / paper trading only - real money never in scope.