Filesystem Event Bus
When two concurrent branches need to communicate, a shared filesystem directory works as a message bus. The producer writes JSON files; the consumer reads and processes them. No engine extensions required — this uses all, loop, branch, and a reusable queue abstraction.
Both strategies below support multiple concurrent consumers via atomic renameSync claiming.
Strategy 1: Delete on dequeue
Consumer claims an item via rename, reads it, then deletes it. Simple, but if the consumer crashes after claiming, the item is lost.
Queue functions
export function enqueue<T>(dir: string, id: string, maxSize: number, item: T, schema: z.ZodType<T>): Option<null> {
schema.parse(item);
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir);
// Idempotent: skip if this ID already exists in any state
if (files.some(f => f.startsWith(`${id}.`))) {
return some(null);
}
const unclaimed = files.filter(f => f.endsWith(".unclaimed.json"));
if (unclaimed.length >= maxSize) {
return none();
}
writeFileSync(join(dir, `${id}.unclaimed.json`), JSON.stringify(item));
return some(null);
}
export function dequeue<T>(dir: string, schema: z.ZodType<T>): Option<T> {
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir).filter(f => f.endsWith(".unclaimed.json")).sort();
for (const file of files) {
const filepath = join(dir, file);
const claimedPath = filepath.replace(".unclaimed.json", ".claimed.json");
try {
renameSync(filepath, claimedPath);
} catch {
continue; // another consumer claimed it first
}
const raw = JSON.parse(readFileSync(claimedPath, "utf-8"));
unlinkSync(claimedPath);
return some(schema.parse(raw));
}
return none();
}
Pipeline
all(
// producer
loop<null, null>((recur, done) =>
pipe(generateEvent, enqueueEvent).branch({
Some: recur,
None: done,
}),
),
// consumer
loop<null, null>((recur, done) =>
dequeueEvent.branch({
Some: processEvent.then(recur),
None: isDone.then(asOption()).branch({
Some: done,
None: sleep(50).then(recur),
}),
}),
),
)
Use this when items are cheap to reproduce or losing an in-flight item is acceptable.
Strategy 2: Claim and complete
Three states tracked via filename suffix:
| Suffix | Meaning |
|---|---|
.unclaimed.json | Available for dequeue |
.pending.json | Claimed — being processed |
.done.json | Completed |
The consumer claims an item via renameSync (atomic on POSIX), processes it, then marks it complete with another rename. If the consumer crashes mid-processing, the .pending.json file survives and can be detected for recovery.
Queue functions
export function enqueue<T>(dir: string, id: string, maxSize: number, item: T, schema: z.ZodType<T>): Option<null> {
schema.parse(item);
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir);
// Idempotent: skip if this ID already exists in any state
if (files.some(f => f.startsWith(`${id}.`))) {
return some(null);
}
const active = files.filter(f => f.endsWith(".unclaimed.json") || f.endsWith(".pending.json"));
if (active.length >= maxSize) {
return none();
}
writeFileSync(join(dir, `${id}.unclaimed.json`), JSON.stringify(item));
return some(null);
}
export function dequeue<T>(dir: string, schema: z.ZodType<T>): Option<{ id: string; item: T }> {
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir).filter(f => f.endsWith(".unclaimed.json")).sort();
for (const file of files) {
const id = file.replace(".unclaimed.json", "");
const filepath = join(dir, file);
const pendingPath = join(dir, `${id}.pending.json`);
try {
renameSync(filepath, pendingPath);
} catch {
continue; // another consumer claimed it first
}
const raw = JSON.parse(readFileSync(pendingPath, "utf-8"));
return some({ id, item: schema.parse(raw) });
}
return none();
}
export function complete(dir: string, id: string): void {
const pendingPath = join(dir, `${id}.pending.json`);
const donePath = join(dir, `${id}.done.json`);
renameSync(pendingPath, donePath);
}
export function resetPending(dir: string): void {
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir).filter(f => f.endsWith(".pending.json"));
for (const file of files) {
const id = file.replace(".pending.json", "");
renameSync(join(dir, file), join(dir, `${id}.unclaimed.json`));
}
}
export function clearQueue(dir: string): void {
mkdirSync(dir, { recursive: true });
const files = readdirSync(dir).filter(f => f.endsWith(".json"));
for (const file of files) {
unlinkSync(join(dir, file));
}
}
Typed handlers
export const dequeueEvent = createHandler({
inputValidator: z.null(),
outputValidator: optionSchema(ClaimedEventSchema),
handle: async (): Promise<Option<{ id: string; item: Event }>> => {
return dequeue(QUEUE_DIR, EventSchema);
},
}, "dequeueEvent");
export const completeEvent = createHandler({
inputValidator: z.object({ id: z.string() }),
outputValidator: z.null(),
handle: async ({ value: { id } }) => {
complete(QUEUE_DIR, id);
return null;
},
}, "completeEvent");
Pipeline
The consumer uses bindInput to thread the claimed event's id through processing and into the completion step:
function makeConsumerLoop(): TypedAction<null, null> {
return loop<null, null>((recur, done) =>
dequeueEvent.branch({
Some: bindInput<ClaimedEvent, never>((claimed) =>
pipe(
claimed.getField("item"),
consumeEvent,
claimed.pick("id"),
completeEvent,
recur,
),
),
None: isDone.then(asOption()).branch({
Some: done,
None: sleep(50).then(recur),
}),
}),
);
}
all(
makeProducerLoop(0),
makeProducerLoop(1),
makeProducerLoop(2),
makeConsumerLoop(),
makeConsumerLoop(),
)
Use this when losing an in-flight item is unacceptable, or when you want an audit trail of completed items.
Properties
Both strategies share:
- Idempotent enqueue.
enqueuechecks whether the ID already exists in any state (startsWithmatch on filename prefix). Safe to retry after crash. - Backpressure.
enqueuereturnsNonewhen full. Strategy 1 counts unclaimed items; Strategy 2 counts active items (unclaimed + pending) since pending items still occupy logical capacity. - Atomic claiming.
renameSyncis atomic on POSIX. First consumer to rename wins; others get ENOENT and try the next file. Safe for multiple concurrent consumers. - FIFO. Lexicographic sort on filenames gives ordering.
- Validation. Both
enqueueanddequeuevalidate against the Zod schema. - Debuggable.
lsthe queue directory to see item states at a glance via suffixes.
Strategy 2 additionally provides:
- Crash recovery.
.pending.jsonfiles indicate items that were claimed but never completed.resetPendingmoves them back to.unclaimed.jsonfor reprocessing. - Audit trail.
.done.jsonfiles record what was processed.
Limitations
- Polling. The consumer sleeps when the queue is empty. Latency equals the sleep interval.
- Single machine. The directory must be locally accessible to both branches.
- Capacity race.
readdirSynccount is best-effort. Concurrent producers may overshootmaxSizeby up to (N-1). - Same-millisecond ordering. Items enqueued in the same millisecond have arbitrary relative order.