A generic substrate for building reactive applications (e.g. live sports betting) on top of the existing hub. Part A = event-driven message triggers. Part B = non-conversational, data-prepared agents that run to completion.
The goal splits into two reusable layers plus a thin demo app:
sports_bettor reactive agent reacts to each.
Key finding: roughly 80% of the required machinery already exists in the codebase. The plan reuses it rather than rebuilding.
| Capability | Status | What it gives us |
|---|---|---|
| Dispatch | EXISTS | Backend HTTP POST /api/hub-control/{hub_id}/inject (message to existing instance) and /inject-and-create (create instance + message). routes.py:411 / :444 |
| Scheduler | EXISTS | Separate APScheduler process + CHMP handler (EntityType.SCHEDULER) where agents register cron/interval/once jobs that drive agents via the dispatch path. The time-based twin of Part A - we mirror its architecture. |
| Tool loop | EXISTS | Agents already run a sequence of tool calls to completion: _run_single_turn / _run_llm_turn / _resume_after_capability in agent/base.py. 'Place bet -> check data -> close bet' needs no new agent class. |
| seed_parts / resolve | EXISTS | Already does 'prepare a list of data (memory reads, shell, capability calls, literals) and deliver as ONE user turn'. This is Part B's prepared-data list. |
| Data layer | EXISTS | sports_data (live_game_state, live_odds, scores, news, game_summary), fifa_worldcup, kalshi. The sports_betting memory env already exists. |
Time triggers (cron/interval/once) and event triggers are the same idea: a condition that fires a message to an agent. So Part A is not a new subsystem - it generalizes the existing scheduler. One table, one handler, one dispatch path; the only thing that differs is the trigger source - a clock vs a push.
No new table - add an event trigger type to the existing one:
schedule_type = 'event' // new, alongside cron/interval/once
schedule_expression = 'sports_feed:goal' // the event key (source:type)
// two new nullable columns:
match_filter jsonb // optional field comparisons
extra_seed_parts jsonb // optional per-trigger seeds
All other columns (agent_id, agent_instance_id, supervisor_message, hub_id, provider, model, enabled, fallback_to_new_hub) are reused as-is.
Same EntityType.SCHEDULER CHMP handler, same ops: create / list / get / update / delete / enable / disable. The only change is adding "event" to VALID_SCHEDULE_TYPES + validation. No new entity type, no new handler - agents register event triggers with the same create op they already use for cron jobs.
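The validation change can be sketched as follows. This is a minimal illustration, not the handler's actual code: `VALID_SCHEDULE_TYPES` is named in the plan, but its representation as a set and the helpers `validate_event_expression` / `validate_schedule` are assumptions made for the example.

```python
from typing import Tuple

# "event" is the new entry; the other three are the existing clock-driven types.
VALID_SCHEDULE_TYPES = {"cron", "interval", "once", "event"}


def validate_event_expression(expression: str) -> Tuple[str, str]:
    """Split an event key of the form 'source:type'; reject malformed keys."""
    source, sep, event_type = expression.partition(":")
    source, event_type = source.strip(), event_type.strip()
    if not sep or not source or not event_type:
        raise ValueError(
            f"event key must look like 'source:type', got {expression!r}"
        )
    return source, event_type


def validate_schedule(schedule_type: str, schedule_expression: str) -> None:
    """Hypothetical entry point the create/update ops could call."""
    if schedule_type not in VALID_SCHEDULE_TYPES:
        raise ValueError(f"unknown schedule_type {schedule_type!r}")
    if schedule_type == "event":
        validate_event_expression(schedule_expression)
    # Validation for cron / interval / once is assumed to exist already
    # and is deliberately left out of this sketch.
```

Rejecting malformed keys at registration time keeps the ingest path simple: it can assume every stored event row carries a well-formed `source:type` key.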
POST /api/events/ingest { source, type, payload }
The only genuinely new piece. It queries enabled schedule_type='event' rows matching the key (+ match_filter) and dispatches via the same run_heartbeat_job path (inject / inject-and-create). The scheduler process just skips event rows in its clock loops - exactly how it already skips once.
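The row selection inside the ingest endpoint might look like the sketch below. `TriggerRow` is a hypothetical in-memory stand-in for a scheduler job row, `select_event_triggers` is an illustrative name, and the filter check is equality-only for brevity; the real query would run against the jobs table and hand each selected row to the existing dispatch path.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class TriggerRow:
    """Hypothetical view of one scheduler job row (only the fields used here)."""
    name: str
    schedule_type: str
    schedule_expression: str
    enabled: bool = True
    match_filter: Optional[Dict[str, Any]] = None  # nullable column


def select_event_triggers(
    rows: List[TriggerRow], source: str, event_type: str, payload: Dict[str, Any]
) -> List[TriggerRow]:
    """Return the enabled event rows whose key and filter match this event."""
    key = f"{source}:{event_type}"
    selected = []
    for row in rows:
        if not row.enabled or row.schedule_type != "event":
            continue  # clock-driven and disabled rows are ignored by ingest
        if row.schedule_expression != key:
            continue
        wanted = row.match_filter or {}
        # Equality-only comparison; richer operators are out of scope here.
        if all(payload.get(k) == v for k, v in wanted.items()):
            selected.append(row)
    return selected
```

Each selected row would then be dispatched exactly as a clock-fired job is, which is what keeps the endpoint small.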
scheduler/utils.py's docstring already says it is 'shared between the scheduler service and the backend trigger endpoint' - so this goes with the grain.
Small workers that watch a data source and emit events to /ingest. Comparisons + edge-detection live here (see section 7). Generic - any app adds its own producer without touching the core. A producer can even be a normal scheduled interval job, so no new process type is required.
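As an illustration of what such a producer does, the sketch below diffs two consecutive polls of game state and builds one event per new goal. The state shape (`game`, `scores`) and the function name `diff_goals` are assumptions for the example; the real `sports_data.live_game_state` response may look different, and posting the events to /ingest is left out.

```python
from typing import Any, Dict, List


def diff_goals(previous: Dict[str, Any], current: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one 'goal' event per goal scored between two polls.

    Both arguments use an assumed shape: {"game": str, "scores": {team: int}}.
    """
    events = []
    for team, score in current["scores"].items():
        before = previous["scores"].get(team, 0)
        # range() of a non-positive number is empty, so corrections that
        # lower a score produce no events.
        for _ in range(score - before):
            events.append({
                "source": "sports_feed",
                "type": "goal",
                "payload": {"game": current["game"], "team": team},
            })
    return events
```

Because the producer keeps the previous state, an unchanged poll yields no events, so the core only ever sees discrete facts.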
Not a new LLM class - a prompt capability + a lifecycle convention over existing machinery.
Defines the contract:
seed_parts give you a prepared data list.
scheduler capability (schedule_type=event).
Ephemeral instance per trigger via inject-and-create with extra_seed_parts = [event payload + data-prep resolve parts]. This maps exactly to 'a prepared list of data -> a response -> run to completion'. (Alternative: a long-lived instance that calls clear_history between events.)
Each step flows into the next. The coloured tag shows which part of the system is acting.
sports_data.live_game_state, diffs state, and sees a new goal.
POST /api/events/ingest with { source: sports_feed, type: goal, payload: { game, team, odds } }
inject-and-create.
sports_bettor wakes with prepared data: first turn = strategy rules + ledger (sports_betting env) + live odds + the event payload, delivered as ONE turn.
scheduler.create { schedule_type: event, type: final_whistle }, then end the turn.
sports_bettor.
Everything below is done by an agent sending CHMP messages - no human code, no deploy. That is what makes the loop self-service: an agent can author a reactive agent, wire a trigger to it, and let that agent register further triggers at runtime.
agent_builder
Define the agent once: its capabilities, its contract prompt, and the data it should auto-prepare on every trigger (seed_parts, resolved live into one turn).
{
"header": {"message_title": "agent_builder_create_agent_request",
"destination": {"id": "agent_builder", "type": "AGENT_BUILDER"}},
"body": {
"agent_id": "sports_bettor",
"capabilities": [
{"name": "central_hub", "version": 1},
{"name": "reactive_agent", "version": 1},
{"name": "sports_data", "version": 1},
{"name": "event_trigger", "version": 1},
{"name": "memory", "version": 1}
],
"customized_prompt": "You are a live sports bettor. On each trigger, ...",
"seed_parts": [
{"literal": "## Strategy rules + open ledger"},
{"request": {"header": {"message_title": "memory_read_request",
"destination": {"id": "memory", "type": "MEMORY"}},
"body": {"operation": "read", "environment": "sports_betting",
"key": "config/strategy/rules"}}}
]
}
}
scheduler
The same scheduler handler used for cron jobs - just with schedule_type: "event". {date}/{run_id} in agent_instance_id means 'spawn a fresh instance per event'; a static id means 'deliver to the existing instance'.
{
"header": {"message_title": "scheduler_create_job_request",
"destination": {"id": "scheduler", "type": "SCHEDULER"}},
"body": {
"operation": "create",
"name": "soccer_goal_to_bettor",
"schedule_type": "event",
"schedule_expression": "sports_feed:goal", // event key = source:type
"match_filter": {"league": "soccer"}, // optional comparison
"agent_id": "sports_bettor",
"agent_instance_id": "sports_bettor_{date}_{run_id}", // fresh instance per event
"supervisor_message": "A goal was scored. Evaluate the edge and act.",
"owner_email": "owner@example.com",
"enabled": true
}
}
Identical shape to a cron job - only schedule_type plus the new match_filter / extra_seed_parts fields differ. One handler, one registry, one dispatch.
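The agent_instance_id convention ({date}/{run_id} for a fresh instance, a static id for an existing one) could be resolved roughly as below. This is a sketch under assumptions: the function name `resolve_instance`, the ISO date format, the short random run id, and the mapping of templated ids to inject-and-create versus static ids to inject are all illustrative and not confirmed by the existing dispatch code.

```python
import re
import uuid
from datetime import date
from typing import Optional, Tuple

# Only the two placeholders named in the plan are recognised.
_PLACEHOLDER = re.compile(r"\{(date|run_id)\}")


def resolve_instance(
    template: str, today: Optional[date] = None, run_id: Optional[str] = None
) -> Tuple[str, str]:
    """Return (instance_id, dispatch_route) for one fired trigger."""
    if not _PLACEHOLDER.search(template):
        # Static id: deliver the message to the instance that already exists.
        return template, "inject"
    values = {
        "date": (today or date.today()).isoformat(),  # assumed format
        "run_id": run_id or uuid.uuid4().hex[:8],     # assumed format
    }
    instance_id = _PLACEHOLDER.sub(lambda m: values[m.group(1)], template)
    # Templated id: every event gets its own freshly created instance.
    return instance_id, "inject-and-create"
```

Passing `today` and `run_id` explicitly keeps the function deterministic in tests, while production callers can rely on the defaults.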
While the reactive agent is running, it issues the exact same register call to set up its OWN follow-up trigger (e.g. a final_whistle trigger to settle the bet). New triggers are built at runtime by the agent - no human in the loop.
A trigger condition (e.g. 'price first exceeds 100') can be a comparison. Recommended: the comparison lives in the producer. The producer turns a continuous signal into a discrete fact and emits only when the condition flips - the event bus then just routes facts.
| Pattern | Where the comparison runs | Good for | Stateful? |
|---|---|---|---|
| A. Producer (recommended) | Producer holds threshold + prior state; emits price_cross_above only on the crossing | 'first exceeds / crosses / changed' | Yes - state lives where it belongs |
| B. match_filter | Subscription compares payload fields at routing time | fan-out: one stream, many thresholds | No - fires on every match; pair with one_shot |
| C. Agent | Agent decides after waking | complex / changing logic | Yes, but highest wake-cost |
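Pattern A amounts to edge detection inside the producer. The sketch below is one way to do it; the class name `CrossAboveWatch` is hypothetical, and it makes one explicit design choice: if the very first observed price is already above the threshold, nothing is emitted, because no crossing was actually observed.

```python
from typing import Any, Dict, Optional


class CrossAboveWatch:
    """Emit a price_cross_above event only when the price crosses from below."""

    def __init__(self, symbol: str, threshold: float) -> None:
        self.symbol = symbol
        self.threshold = threshold
        self._was_above: Optional[bool] = None  # unknown until the first poll

    def observe(self, price: float) -> Optional[Dict[str, Any]]:
        """Feed one price sample; return an event dict on a crossing, else None."""
        is_above = price > self.threshold
        # A crossing needs a previous sample that was at or below the threshold.
        crossed = self._was_above is False and is_above
        self._was_above = is_above
        if not crossed:
            return None
        return {
            "source": "stocks",
            "type": "price_cross_above",
            "payload": {
                "symbol": self.symbol,
                "threshold": self.threshold,
                "price": price,
            },
        }
```

A producer would call `observe` on every poll and POST only the non-None results, so the event bus never sees the continuous signal.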
// producer config, per watch
{ "symbol": "AAPL", "op": "gt", "threshold": 100 }
// emitted ONLY when price crosses 100 from below
POST /api/events/ingest
{ "source": "stocks", "type": "price_cross_above",
"payload": { "symbol": "AAPL", "threshold": 100, "price": 100.3 } }
Clean refinement: declare the threshold in the subscription, and have the producer read enabled subscriptions to learn which thresholds to watch and do the edge-detection. The condition stays declarative; the statefulness stays in the producer.
"match_filter": { "symbol": "AAPL", "price": { "gte": 100 } }
Operators: eq ne gt gte lt lte in. Fires on every matching event (not just the first) - use one_shot: true for 'fire once, then disable'.
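A routing-time evaluator for these operators can be small. The sketch below assumes a convention the plan does not spell out: a bare value means equality, a dict value means a map of operator to operand, all conditions must hold, and a field missing from the payload never matches. The name `matches` is illustrative.

```python
import operator
from typing import Any, Dict, Optional

_OPS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda value, allowed: value in allowed,
}


def matches(match_filter: Optional[Dict[str, Any]], payload: Dict[str, Any]) -> bool:
    """Return True when the payload satisfies every condition in the filter."""
    for field_name, condition in (match_filter or {}).items():
        if field_name not in payload:
            return False  # assumed: a missing field never matches
        value = payload[field_name]
        if isinstance(condition, dict):
            for op_name, operand in condition.items():
                if op_name not in _OPS:
                    raise ValueError(f"unsupported operator {op_name!r}")
                if not _OPS[op_name](value, operand):
                    return False
        elif value != condition:  # bare value is shorthand for eq
            return False
    return True
```

The evaluator is stateless by design, which is why it fires on every matching event; 'first time only' behaviour has to come from the producer or from one_shot.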
scheduler.jobs (schedule_type=event + match_filter/extra_seed_parts), add "event" to the SCHEDULER handler, and add /api/events/ingest reusing the existing dispatch. No producers yet - test by curling /ingest.
reactive_agent prompt capability + one demo reactive spec wired to Phase 1.
sports_bettor spec + subscriptions, demoed end-to-end.