Skip to content

Commit 28835c6

Browse files
committed
feat: add observable ActorSystem with events, actors, and subscribe
1 parent dbf801b commit 28835c6

12 files changed

Lines changed: 829 additions & 42 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
"effect-machine": minor
3+
---
4+
5+
Add observable ActorSystem and wire up actor.children for persistent actors
6+
7+
**ActorSystem observation:**
8+
9+
- `system.subscribe(fn)` — sync callback for `ActorSpawned` / `ActorStopped` events, returns unsubscribe
10+
- `system.actors` — sync snapshot of all registered actors (`ReadonlyMap`)
11+
- `system.events` — async `Stream<SystemEvent>` via PubSub (each subscriber gets own queue)
12+
- Works with both explicit (`ActorSystemDefault`) and implicit (`Machine.spawn`) systems
13+
- No events emitted during system teardown
14+
- Double-stop prevention: `system.stop` + scope finalizer won't emit duplicate `ActorStopped`
15+
16+
**actor.children:**
17+
18+
- Wire up `childrenMap` in persistent actors so `actor.children` reflects `self.spawn` children
19+
- Children auto-removed from map on scope close (state-scoped cleanup)

AGENTS.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,36 @@ actor.sendAndWait(ev, State.X); // Send + wait for state
135135
actor.awaitFinal; // Wait for final state
136136
actor.subscribe(fn); // Sync callback, returns unsubscribe
137137
actor.system; // ActorSystem — access child actors via .get(id)
138+
actor.children; // ReadonlyMap<string, ActorRef> — child actors spawned via self.spawn
138139
```
139140

141+
## System Observation
142+
143+
Observe actors joining/leaving the system:
144+
145+
```ts
146+
const system = yield * ActorSystemService;
147+
148+
// Sync callback (like ActorRef.subscribe pattern)
149+
const unsub = system.subscribe((event) => {
150+
// event: { _tag: "ActorSpawned" | "ActorStopped", id: string, actor: ActorRef }
151+
});
152+
153+
// Sync snapshot of all registered actors
154+
const actors: ReadonlyMap<string, ActorRef> = system.actors;
155+
156+
// Async stream (each subscriber gets own queue — late subscribers miss prior events)
157+
yield *
158+
system.events.pipe(
159+
Stream.tap((e) => Effect.log(e._tag, e.id)),
160+
Stream.runDrain,
161+
);
162+
```
163+
164+
- `system.actors` returns a new Map on each access (snapshot, not live)
165+
- No events emitted during system teardown (PubSub is shutting down)
166+
- Works with both explicit (`ActorSystemDefault`) and implicit (`Machine.spawn`) systems
167+
140168
## spawn vs on
141169

142170
- `.on()` - transitions, guards/effects run inline

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,30 @@ const child = yield * parent.system.get("worker-1"); // Option<ActorRef>
206206

207207
Every actor always has a system — `Machine.spawn` creates an implicit one if no `ActorSystem` is in context.
208208

209+
### System Observation
210+
211+
React to actors joining and leaving the system:
212+
213+
```ts
214+
const system = yield * ActorSystemService;
215+
216+
// Sync callback — like ActorRef.subscribe
217+
const unsub = system.subscribe((event) => {
218+
// event._tag: "ActorSpawned" | "ActorStopped"
219+
console.log(`${event._tag}: ${event.id}`);
220+
});
221+
222+
// Sync snapshot of all registered actors
223+
const actors = system.actors; // ReadonlyMap<string, ActorRef>
224+
225+
// Async stream (each subscriber gets own queue)
226+
yield *
227+
system.events.pipe(
228+
Stream.tap((e) => Effect.log(e._tag, e.id)),
229+
Stream.runDrain,
230+
);
231+
```
232+
209233
### Testing
210234

211235
Test transitions without actors:
@@ -297,6 +321,18 @@ See the [primer](./primer/) for comprehensive documentation:
297321
| `actor.sendAndWait(ev, State.X)` | Send + wait for state |
298322
| `actor.subscribe(fn)` | Sync callback |
299323
| `actor.system` | Access the actor's `ActorSystem` |
324+
| `actor.children` | Child actors (`ReadonlyMap`) |
325+
326+
### ActorSystem
327+
328+
| Method / Property | Description |
329+
| ---------------------- | ------------------------------------------- |
330+
| `system.spawn(id, m)` | Spawn actor |
331+
| `system.get(id)` | Get actor by ID |
332+
| `system.stop(id)` | Stop actor by ID |
333+
| `system.actors` | Sync snapshot of all actors (`ReadonlyMap`) |
334+
| `system.subscribe(fn)` | Sync callback for spawn/stop events |
335+
| `system.events` | Async `Stream<SystemEvent>` for spawn/stop |
300336

301337
## License
302338

SKILL.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,21 @@ Effect.runPromise(Effect.scoped(program.pipe(Effect.provide(ActorSystemDefault))
124124
| `actor.snapshotSync()` | Get current state (sync) |
125125
| `actor.matches(tag)` | Check state tag |
126126
| `actor.subscribe(fn)` | Sync callback, returns unsubscribe |
127+
| `actor.system` | Access the actor's `ActorSystem` |
128+
| `actor.children` | Child actors (`ReadonlyMap`) |
129+
130+
## System Observation
131+
132+
```ts
133+
// Sync callback — ActorSpawned / ActorStopped events
134+
const unsub = system.subscribe((event) => console.log(event._tag, event.id));
135+
136+
// Sync snapshot of all registered actors
137+
const actors: ReadonlyMap<string, ActorRef> = system.actors;
138+
139+
// Async stream (late subscribers miss prior events)
140+
system.events.pipe(Stream.take(10), Stream.runCollect);
141+
```
127142

128143
## Testing
129144

primer/actors.md

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,25 @@ Effect.runPromise(Effect.scoped(program).pipe(Effect.provide(ActorSystemDefault)
2929

3030
## ActorRef Methods
3131

32-
| Method | Type | Description |
33-
| -------------------------- | ----------------- | ---------------------------------- |
34-
| `send(event)` | `Effect<void>` | Queue event for processing |
35-
| `sendSync(event)` | `void` | Fire-and-forget (sync, for UI) |
36-
| `snapshot` | `Effect<State>` | Get current state |
37-
| `snapshotSync()` | `State` | Get current state (sync) |
38-
| `matches(tag)` | `Effect<boolean>` | Check if in state |
39-
| `matchesSync(tag)` | `boolean` | Check if in state (sync) |
40-
| `can(event)` | `Effect<boolean>` | Can handle event in current state? |
41-
| `canSync(event)` | `boolean` | Can handle event? (sync) |
42-
| `changes` | `Stream<State>` | Stream of state changes |
43-
| `waitFor(State.X)` | `Effect<State>` | Wait for state (constructor or fn) |
44-
| `awaitFinal` | `Effect<State>` | Wait for final state |
45-
| `sendAndWait(ev, State.X)` | `Effect<State>` | Send + wait for state |
46-
| `sendAndWait(ev)` | `Effect<State>` | Send + wait for final state |
47-
| `subscribe(fn)` | `() => void` | Sync callback, returns unsubscribe |
48-
| `system` | `ActorSystem` | Access the actor's system |
49-
| `stop` | `Effect<void>` | Stop actor gracefully |
32+
| Method | Type | Description |
33+
| -------------------------- | ----------------- | ----------------------------------- |
34+
| `send(event)` | `Effect<void>` | Queue event for processing |
35+
| `sendSync(event)` | `void` | Fire-and-forget (sync, for UI) |
36+
| `snapshot` | `Effect<State>` | Get current state |
37+
| `snapshotSync()` | `State` | Get current state (sync) |
38+
| `matches(tag)` | `Effect<boolean>` | Check if in state |
39+
| `matchesSync(tag)` | `boolean` | Check if in state (sync) |
40+
| `can(event)` | `Effect<boolean>` | Can handle event in current state? |
41+
| `canSync(event)` | `boolean` | Can handle event? (sync) |
42+
| `changes` | `Stream<State>` | Stream of state changes |
43+
| `waitFor(State.X)` | `Effect<State>` | Wait for state (constructor or fn) |
44+
| `awaitFinal` | `Effect<State>` | Wait for final state |
45+
| `sendAndWait(ev, State.X)` | `Effect<State>` | Send + wait for state |
46+
| `sendAndWait(ev)` | `Effect<State>` | Send + wait for final state |
47+
| `subscribe(fn)` | `() => void` | Sync callback, returns unsubscribe |
48+
| `system` | `ActorSystem` | Access the actor's system |
49+
| `children` | `ReadonlyMap` | Child actors spawned via self.spawn |
50+
| `stop` | `Effect<void>` | Stop actor gracefully |
5051

5152
## Sending Events
5253

@@ -247,6 +248,57 @@ const stopped = yield * system.stop("order-1");
247248
// true if actor existed and was stopped
248249
```
249250

251+
## System Observation
252+
253+
Observe actors joining and leaving the system without polling:
254+
255+
### Sync Callback
256+
257+
```ts
258+
const system = yield * ActorSystemService;
259+
260+
const unsub = system.subscribe((event) => {
261+
switch (event._tag) {
262+
case "ActorSpawned":
263+
console.log(`Spawned: ${event.id}`);
264+
break;
265+
case "ActorStopped":
266+
console.log(`Stopped: ${event.id}`);
267+
// event.actor still readable — can check final state
268+
break;
269+
}
270+
});
271+
272+
// Later: unsub() to stop receiving events
273+
```
274+
275+
### Actors Snapshot
276+
277+
```ts
278+
// Sync snapshot — returns new Map each time (not live)
279+
const actors = system.actors; // ReadonlyMap<string, ActorRef>
280+
console.log(`${actors.size} actors running`);
281+
```
282+
283+
### Async Stream
284+
285+
```ts
286+
// Each subscriber gets own queue — late subscribers miss prior events
287+
yield *
288+
system.events.pipe(
289+
Stream.tap((e) => Effect.log(e._tag, e.id)),
290+
Stream.runDrain,
291+
);
292+
```
293+
294+
### Edge Cases
295+
296+
- **System teardown**: No events emitted — PubSub is shutting down
297+
- **Late stream subscribers**: Miss prior events (PubSub gives each subscriber their own queue)
298+
- **Listener errors**: Caught and ignored — won't crash the system
299+
- **Scope cleanup**: Actors stopped via scope close emit `ActorStopped`
300+
- **Double stop**: `system.stop(id)` + scope finalizer won't double-emit (guarded by map check)
301+
250302
## Duplicate Actor Prevention
251303

252304
Same ID cannot be spawned twice:

primer/index.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ Effect.runPromise(Effect.scoped(program.pipe(Effect.provide(ActorSystemDefault))
164164
| `actor.waitFor(State.X)` | No | Wait for state (constructor or fn) |
165165
| `actor.sendAndWait(ev, State.X)` | No | Send + wait for state |
166166
| `actor.subscribe(fn)` | Yes | Sync callback |
167+
| `actor.children` | Sync | Child actors (`ReadonlyMap`) |
168+
169+
### ActorSystem Observation
170+
171+
| Method / Property | Description |
172+
| ---------------------- | ------------------------------------------- |
173+
| `system.actors` | Sync snapshot of all actors (`ReadonlyMap`) |
174+
| `system.subscribe(fn)` | Sync callback for `SystemEvent` |
175+
| `system.events` | Async `Stream<SystemEvent>` |
176+
177+
`SystemEvent` = `{ _tag: "ActorSpawned" | "ActorStopped", id: string, actor: ActorRef }`
167178

168179
## See Also
169180

0 commit comments

Comments
 (0)