Builtins
Barnum provides typed combinators for composing workflows. TypeScript tracks input and output types through the entire pipeline via phantom types.
Every combinator is either a standalone function (imported from barnum) or a postfix method on TypedAction (chained via .method()), or both. The tables below note availability.
Control flowβ
pipe(...actions)β
Sequential composition. The output of each action becomes the input of the next.
function pipe<T1, T2, T3>(
a: Pipeable<T1, T2>,
b: Pipeable<T2, T3>,
): TypedAction<T1, T3>
Up to 10 actions. Zero arguments returns identity; one argument wraps the action.
Postfix: .then(next) chains a single action.
listFiles.then(forEach(processFile)).then(commit)
// equivalent to pipe(listFiles, forEach(processFile), commit)
all(...actions)β
Run multiple actions concurrently on the same input. Collects outputs as a tuple.
function all<T1, A, B, C>(
a: Pipeable<T1, A>,
b: Pipeable<T1, B>,
c: Pipeable<T1, C>,
): TypedAction<T1, [A, B, C]>
Up to 10 actions. Zero arguments returns TypedAction<any, []>.
Postfix: No.
all(analyzeStyle, analyzeLogic, analyzeSecurity)
// Input: string β Output: [StyleReport, LogicReport, SecurityReport]
chain(first, rest)β
Binary chain. Equivalent to pipe(first, rest).
function chain<T1, T2, T3>(
first: Pipeable<T1, T2>,
rest: Pipeable<T2, T3>,
): TypedAction<T1, T3>
Postfix: .then(rest) is the postfix equivalent.
forEach(action)β
Apply an action to each element of an array, concurrently.
function forEach<TIn, TOut>(
action: Pipeable<TIn, TOut>,
): TypedAction<TIn[], TOut[]>
Postfix: Yes β .forEach(action) on an action that outputs an array.
listFiles.forEach(processFile)
// Input: string[] β Output: ProcessResult[]
branch(cases)β
Dispatch on a tagged union's kind field. Each variant maps to a handler that receives the unwrapped value.
function branch<TCases extends Record<string, Action>>(
cases: TCases,
): TypedAction<BranchInput<TCases>, ExtractOutput<TCases[keyof TCases]>>
All case handlers must produce the same output type.
Postfix: Yes β .branch(cases) on an action that outputs a tagged union.
classify.branch({
NeedsRefactor: refactor,
Clean: drop,
})
loop(bodyFn)β
Iterative loop. The body receives recur (restart with new input) and done (break with value). The body must never complete normally β it always calls recur or done.
function loop<TBreak, TIn>(
bodyFn: (
recur: TypedAction<TIn, never>,
done: TypedAction<TBreak, never>,
) => Pipeable<TIn, never>,
): TypedAction<TIn, TBreak>
Postfix: No.
loop((recur, done) =>
pipe(typeCheck, classifyErrors).branch({
HasErrors: pipe(fix, recur),
Clean: done,
})
)
tryCatch(body, recovery)β
Type-level error handling. The body receives a throwError token; firing it routes to the recovery arm. Both arms must return the same type.
function tryCatch<TIn, TOut, TError>(
body: (throwError: TypedAction<TError, never>) => Pipeable<TIn, TOut>,
recovery: Pipeable<TError, TOut>,
): TypedAction<TIn, TOut>
Handles type-level errors only (not exceptions/panics).
Postfix: No.
tryCatch(
(throwError) => pipe(riskyStep, Result.unwrapOr(throwError)),
fallbackStep,
)
race(...actions)β
Run actions concurrently. First to complete wins; others are cancelled.
function race<TIn, TOut>(
...actions: Pipeable<TIn, TOut>[]
): TypedAction<TIn, TOut>
All actions must have identical input and output types.
Postfix: No.
withTimeout(ms, body)β
Race an action against a timer. Returns Result<TOut, void>.
function withTimeout<TIn, TOut>(
ms: Pipeable<TIn, number>,
body: Pipeable<TIn, TOut>,
): TypedAction<TIn, Result<TOut, void>>
Ok(value) if body completes, Err(void) on timeout.
Postfix: No.
withTimeout(constant(30_000), slowStep)
earlyReturn(bodyFn)β
Create a scope with an early exit. The body receives an earlyReturn token. Output type is the union of normal completion and early return.
function earlyReturn<TEarlyReturn, TIn, TOut>(
bodyFn: (
earlyReturn: TypedAction<TEarlyReturn, never>,
) => Pipeable<TIn, TOut>,
): TypedAction<TIn, TEarlyReturn | TOut>
Postfix: No.
recur(bodyFn)β
Restartable scope. The body receives a restart token that re-executes the body from the beginning with new input.
function recur<TIn, TOut>(
bodyFn: (
restart: TypedAction<TIn, never>,
) => Pipeable<TIn, TOut>,
): TypedAction<TIn, TOut>
Postfix: No.
sleep()β
Delay for the number of milliseconds specified by the input. Cancellable during race teardown.
function sleep(): TypedAction<number, void>
Postfix: No.
bind(bindings, body)β
Bind concurrent values as typed references (VarRef). All bindings are evaluated concurrently; the body receives an array of typed references that can be dereferenced anywhere in the pipeline.
function bind<TBindings extends Action[], TOut>(
bindings: [...TBindings],
body: (vars: InferVarRefs<TBindings>) => BodyResult<TOut>,
): TypedAction<ExtractInput<TBindings[number]>, TOut>
Postfix: No.
bind([getConfig, getUser], ([configRef, userRef]) =>
pipe(processWithConfig(configRef), notifyUser(userRef))
)
bindInput(body)β
Capture the pipeline input as a VarRef for later reference deeper in the pipeline.
function bindInput<TIn, TOut>(
body: (input: VarRef<TIn>) => BodyResult<TOut>,
): TypedAction<TIn, TOut>
Sugar for bind([identity], ([input]) => pipe(drop, body(input))).
Postfix: No.
withResource({ create, action, dispose })β
RAII-style resource management. Creates a resource, merges it with the input, runs an action, then disposes.
function withResource<
TIn extends Record<string, unknown>,
TResource extends Record<string, unknown>,
TOut,
>(args: {
create: Pipeable<TIn, TResource>,
action: Pipeable<TResource & TIn, TOut>,
dispose: Pipeable<TResource, unknown>,
}): TypedAction<TIn, TOut>
Postfix: No.
withResource({
create: createWorktree,
action: doWork,
dispose: cleanupWorktree,
})
dropResult(action)β
Run an action for side effects, discard its output. Returns never (terminates the pipeline β typically used before drop or another action).
function dropResult<TInput, TOutput>(
action: Pipeable<TInput, TOutput>,
): TypedAction<TInput, never>
Postfix: No.
Data manipulationβ
constant(value)β
Produce a fixed value, ignoring the pipeline input.
function constant<TValue>(value: TValue): TypedAction<any, TValue>
Postfix: No.
constant("hello")
// Input: anything β Output: "hello"
identityβ
Pass input through unchanged. A value, not a function.
const identity: TypedAction<any, any>
Postfix: No.
dropβ
Discard the pipeline value. Produces never. A value, not a function.
const drop: TypedAction<any, never>
Postfix: Yes β .drop().
sideEffect.drop()
// equivalent to pipe(sideEffect, drop)
tag(kind)β
Wrap the input as a tagged union member: { kind, value: input }.
function tag<
TDef extends Record<string, unknown>,
TKind extends keyof TDef & string,
>(kind: TKind): TypedAction<TDef[TKind], TaggedUnion<TDef>>
Postfix: Yes β .tag(kind).
tag<{ NeedsRefactor: FileInfo; Clean: FileInfo }, "NeedsRefactor">("NeedsRefactor")
// Input: FileInfo β Output: TaggedUnion<{ NeedsRefactor: FileInfo; Clean: FileInfo }>
merge()β
Merge a tuple of objects into a single object via intersection.
function merge<
TObjects extends Record<string, unknown>[],
>(): TypedAction<TObjects, UnionToIntersection<TObjects[number]>>
Postfix: Yes β .merge().
all(getUser, getSettings).merge()
// Output: User & Settings
flatten()β
Flatten a nested array one level.
function flatten<TElement>(): TypedAction<TElement[][], TElement[]>
Postfix: Yes β .flatten().
extractField(field) / .get(field)β
Extract a single field from an object.
function extractField<
TObj extends Record<string, unknown>,
TField extends keyof TObj & string,
>(field: TField): TypedAction<TObj, TObj[TField]>
Postfix: Yes β .get(field).
getUserProfile.get("email")
// equivalent to pipe(getUserProfile, extractField("email"))
extractIndex(index)β
Extract a single element from a tuple by index.
function extractIndex<TTuple extends unknown[], TIndex extends number>(
index: TIndex,
): TypedAction<TTuple, TTuple[TIndex]>
Postfix: No.
pick(...keys)β
Select named fields from an object.
function pick<
TObj extends Record<string, unknown>,
TKeys extends (keyof TObj & string)[],
>(...keys: TKeys): TypedAction<TObj, Pick<TObj, TKeys[number]>>
Postfix: Yes β .pick(...keys).
getUserProfile.pick("name", "email")
// equivalent to pipe(getUserProfile, pick("name", "email"))
range(start, end)β
Produce an integer array [start, start+1, ..., end-1]. Computed at AST build time (emits a constant node).
function range(start: number, end: number): TypedAction<any, number[]>
Postfix: No.
augment(action)β
Run an action, then merge its output back into the original input.
function augment<
TInput extends Record<string, unknown>,
TOutput extends Record<string, unknown>,
>(action: Pipeable<TInput, TOutput>): TypedAction<TInput, TInput & TOutput>
Postfix: Yes β .augment() (no arguments; wraps the preceding action).
augment(computeHash)
// Input: { file: string } β Output: { file: string, hash: string }
tap(action)β
Run an action for side effects, then pass the original input through unchanged.
function tap<TInput extends Record<string, unknown>>(
action: Pipeable<TInput, any>,
): TypedAction<TInput, TInput>
Postfix: No.
tap(logToFile)
// Input: T β Output: T (logToFile runs but output is discarded)
Option<T>β
Option<T> is a tagged union: TaggedUnion<{ Some: T; None: void }>.
All combinators desugar to branch + builtins at the AST level.
| Combinator | Type | Description |
|---|---|---|
Option.some() | T β Option<T> | Wrap as Some |
Option.none() | void β Option<T> | Produce None |
Option.map(action) | Option<T> β Option<U> | Transform Some value |
Option.andThen(action) | Option<T> β Option<U> | Monadic bind (flatMap) |
Option.unwrapOr(default) | Option<T> β T | Extract Some or compute default |
Option.flatten() | Option<Option<T>> β Option<T> | Unwrap nested Option |
Option.filter(predicate) | Option<T> β Option<T> | Keep if predicate returns Some |
Option.collect() | Option<T>[] β T[] | Collect Some values, discard Nones |
Option.isSome() | Option<T> β boolean | Test for Some |
Option.isNone() | Option<T> β boolean | Test for None |
Postfix: .mapOption(action) transforms the Some value of an Option output.
Result<TValue, TError>β
Result<TValue, TError> is a tagged union: TaggedUnion<{ Ok: TValue; Err: TError }>.
All combinators desugar to branch + builtins at the AST level.
| Combinator | Type | Description |
|---|---|---|
Result.ok() | TValue β Result<TValue, TError> | Wrap as Ok |
Result.err() | TError β Result<TValue, TError> | Wrap as Err |
Result.map(action) | Result<V, E> β Result<U, E> | Transform Ok value |
Result.mapErr(action) | Result<V, E> β Result<V, E2> | Transform Err value |
Result.andThen(action) | Result<V, E> β Result<U, E> | Monadic bind on Ok |
Result.or(fallback) | Result<V, E> β Result<V, E2> | Fallback on Err |
Result.and(other) | Result<V, E> β Result<U, E> | Replace Ok with other |
Result.unwrapOr(default) | Result<V, E> β V | Extract Ok or compute default |
Result.flatten() | Result<Result<V, E>, E> β Result<V, E> | Unwrap nested Result |
Result.toOption() | Result<V, E> β Option<V> | OkβSome, ErrβNone |
Result.toOptionErr() | Result<V, E> β Option<E> | ErrβSome, OkβNone |
Result.transpose() | Result<Option<V>, E> β Option<Result<V, E>> | Swap Result/Option nesting |
Result.isOk() | Result<V, E> β boolean | Test for Ok |
Result.isErr() | Result<V, E> β boolean | Test for Err |
Postfix: .mapErr(action) transforms the Err value; .unwrapOr(default) extracts Ok or applies default to Err.
Handler definitionβ
createHandler(definition, exportName?)β
Create a typed handler from an async function with optional Zod validators.
function createHandler<TValue, TOutput>(
definition: {
inputValidator?: z.ZodType<TValue>;
outputValidator?: z.ZodType<TOutput>;
handle: (context: { value: TValue }) => Promise<TOutput>;
},
exportName?: string,
): TypedAction<TValue, TOutput>
The returned action serializes to an Invoke node. At runtime, the Rust scheduler spawns a TypeScript worker subprocess that calls handle.
export const processFile = createHandler({
inputValidator: z.string(),
outputValidator: z.object({ status: z.string() }),
handle: async ({ value: filePath }) => {
// ...
return { status: "done" };
},
}, "processFile");
createHandlerWithConfig(definition, exportName?)β
Like createHandler, but also accepts step-level configuration.
function createHandlerWithConfig<TValue, TOutput, TStepConfig>(
definition: {
inputValidator?: z.ZodType<TValue>;
outputValidator?: z.ZodType<TOutput>;
stepConfigValidator?: z.ZodType<TStepConfig>;
handle: (context: { value: TValue; stepConfig: TStepConfig }) => Promise<TOutput>;
},
exportName?: string,
): TypedAction<TValue, TOutput>
Workflow executionβ
runPipeline(pipeline, input?)β
Run a pipeline to completion. Optionally provide an input value.
async function runPipeline(
pipeline: Action,
input?: unknown,
): Promise<void>
This is the main entry point. It serializes the pipeline AST to JSON, resolves the Rust binary, and spawns barnum run --config <json>.
await runPipeline(
pipe(listFiles, forEach(processFile), commit),
);
Postfix method summaryβ
These methods are available on any TypedAction via dot-chaining:
| Method | Standalone equivalent | Notes |
|---|---|---|
.then(next) | chain(a, next) | |
.forEach(action) | chain(a, forEach(action)) | Requires array output |
.branch(cases) | chain(a, branch(cases)) | Requires tagged union output |
.drop() | chain(a, drop) | |
.tag(kind) | chain(a, tag(kind)) | |
.merge() | chain(a, merge()) | Requires tuple-of-objects output |
.flatten() | chain(a, flatten()) | Requires nested array output |
.get(field) | chain(a, extractField(field)) | |
.pick(...keys) | chain(a, pick(...keys)) | |
.augment() | augment(a) | Merges output back into input |
.mapOption(action) | chain(a, Option.map(action)) | Requires Option output |
.mapErr(action) | chain(a, Result.mapErr(action)) | Requires Result output |
.unwrapOr(default) | chain(a, Result.unwrapOr(default)) | Requires Result output |