Skip to content

Commit 0c75b8e

Browse files
committed
allow specifying deps for inline run step
1 parent 99674bd commit 0c75b8e

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

src/client/step.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export type StepRequest<DM extends GenericDataModel = GenericDataModel> = {
4848
| {
4949
kind: "inline";
5050
handler: (ctx: GenericMutationCtx<DM>) => Promise<unknown>;
51-
args: Record<string, never>;
51+
args: Record<string, unknown>;
5252
};
5353
retry: RetryBehavior | boolean | undefined;
5454
inline: boolean;

src/client/stepContext.test.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,4 +645,68 @@ describe("StepExecutor + WorkflowCtx integration", () => {
645645
/Cannot call step methods inside a step\.run\(\) handler/,
646646
);
647647
});
648+
649+
it("ctx.run() journals deps and replays when they match", async () => {
650+
const channel = new BaseChannel<StepRequest>(0);
651+
const ctx = createWorkflowCtx("wf-20" as any, channel);
652+
653+
const entry = journalEntry({
654+
name: "run",
655+
functionType: "mutation",
656+
handle: "inline",
657+
args: { userId: "u1", count: 3 },
658+
runResult: { kind: "success", returnValue: "done" },
659+
});
660+
661+
const [result] = await Promise.all([
662+
ctx.run(async () => "done", {
663+
deps: { userId: "u1", count: 3 },
664+
}),
665+
replayFromJournal(channel, [entry]),
666+
]);
667+
668+
expect(result).toBe("done");
669+
});
670+
671+
it("ctx.run() sends deps through the channel as args", async () => {
672+
const channel = new BaseChannel<StepRequest>(0);
673+
const ctx = createWorkflowCtx("wf-21" as any, channel);
674+
675+
const deps = { userId: "u1", count: 3 };
676+
677+
// Read the message from the channel and verify the args match deps.
678+
const inspectMessage = async () => {
679+
const message = await channel.get();
680+
expect(message.target.args).toEqual(deps);
681+
expect(message.target.kind).toBe("inline");
682+
message.resolve({ kind: "success", returnValue: "ok" });
683+
};
684+
685+
const [result] = await Promise.all([
686+
ctx.run(async () => "ok", { deps }),
687+
inspectMessage(),
688+
]);
689+
690+
expect(result).toBe("ok");
691+
});
692+
693+
it("ctx.run() without deps still journals empty args", async () => {
694+
const channel = new BaseChannel<StepRequest>(0);
695+
const ctx = createWorkflowCtx("wf-22" as any, channel);
696+
697+
const entry = journalEntry({
698+
name: "run",
699+
functionType: "mutation",
700+
handle: "inline",
701+
args: {},
702+
runResult: { kind: "success", returnValue: 42 },
703+
});
704+
705+
const [result] = await Promise.all([
706+
ctx.run(async () => 42),
707+
replayFromJournal(channel, [entry]),
708+
]);
709+
710+
expect(result).toBe(42);
711+
});
648712
});

src/client/workflowContext.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,21 @@ export type WorkflowCtx<DataModel extends GenericDataModel = GenericDataModel> =
131131
* ```
132132
*
133133
* @param handler - A function receiving the mutation context to run inline.
134-
* @param opts - Options for naming the step.
134+
* @param opts - Options for naming the step and declaring dependencies.
135135
*/
136136
run<T>(
137137
handler: (ctx: GenericMutationCtx<DataModel>) => T | Promise<T>,
138-
opts?: { name?: string },
138+
opts?: {
139+
name?: string;
140+
/**
141+
* Dependencies that are validated and journaled as part of this step.
142+
* On replay, the saved deps are compared against the current deps —
143+
* if they differ, the workflow detects a mismatch and re-executes.
144+
* Use this to capture values from the enclosing scope that the handler
145+
* depends on.
146+
*/
147+
deps?: Record<string, unknown>;
148+
},
139149
): Promise<T>;
140150

141151
/**
@@ -223,7 +233,7 @@ export function createWorkflowCtx<
223233
inlineDepth--;
224234
}
225235
},
226-
args: {} as Record<string, never>,
236+
args: opts?.deps ?? {},
227237
},
228238
retry: undefined,
229239
inline: true,

0 commit comments

Comments
 (0)