Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions genkit-tools/common/src/types/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ export const ActionSchema = z

export type Action = z.infer<typeof ActionSchema>;

export const ActionMetadataSchema = z
.object({
actionType: z
.enum([
'custom',
'dynamic-action-provider',
'embedder',
'evaluator',
'executable-prompt',
'flow',
'indexer',
'model',
'background-model',
'check-operation',
'cancel-operation',
'prompt',
'reranker',
'retriever',
'tool',
'tool.v2',
'util',
'resource',
])
.optional(),
name: z.string(),
description: z.string().optional(),
inputSchema: z.unknown().optional(),
inputJsonSchema: JSONSchema7Schema.optional(),
outputSchema: z.unknown().optional(),
outputJsonSchema: JSONSchema7Schema.optional(),
streamSchema: z.unknown().optional(),
metadata: z.record(z.string(), CustomAnySchema).optional(),
})
.openapi('ActionMetadata');

export type ActionMetadata = z.infer<typeof ActionMetadataSchema>;

export const RunActionResponseSchema = z.object({
result: z.unknown().optional(),
telemetry: z
Expand Down
35 changes: 34 additions & 1 deletion js/core/src/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import type { JSONSchema7 } from 'json-schema';
import type * as z from 'zod';
import * as z from 'zod';
import { getAsyncContext } from './async-context.js';
import { lazy } from './async.js';
import { getContext, runWithContext, type ActionContext } from './context.js';
Expand Down Expand Up @@ -62,6 +62,39 @@ export interface ActionMetadata<
metadata?: Record<string, any>;
}

export const ActionMetadataSchema = z.object({
actionType: z
.enum([
'custom',
'dynamic-action-provider',
'embedder',
'evaluator',
'executable-prompt',
'flow',
'indexer',
'model',
'background-model',
'check-operation',
'cancel-operation',
'prompt',
'reranker',
'retriever',
'tool',
'tool.v2',
'util',
'resource',
])
.optional(),
name: z.string(),
description: z.string().optional(),
inputSchema: z.unknown().optional(),
inputJsonSchema: z.object({}).optional(),
outputSchema: z.unknown().optional(),
outputJsonSchema: z.object({}).optional(),
streamSchema: z.unknown().optional(),
metadata: z.record(z.string(), z.any()).optional(),
});

/**
* Results of an action run. Includes telemetry.
*/
Expand Down
90 changes: 42 additions & 48 deletions js/core/src/dynamic-action-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
* limitations under the License.
*/

import type * as z from 'zod';
import { Action, ActionMetadata, defineAction } from './action.js';
import * as z from 'zod';
import {
Action,
ActionMetadata,
ActionMetadataSchema,
defineAction,
} from './action.js';
import { GenkitError } from './error.js';
import { ActionMetadataRecord, ActionType, Registry } from './registry.js';

Expand All @@ -27,22 +32,26 @@ class SimpleCache {
private value: DapValue | undefined;
private expiresAt: number | undefined;
private ttlMillis: number;
private dap: DynamicActionProviderAction;
private dap: DynamicActionProviderAction | undefined;
private dapFn: DapFn;
private fetchPromise: Promise<DapValue> | null = null;

constructor(
dap: DynamicActionProviderAction,
config: DapConfig,
dapFn: DapFn
) {
this.dap = dap;
constructor(config: DapConfig, dapFn: DapFn) {
this.dapFn = dapFn;
this.ttlMillis = !config.cacheConfig?.ttlMillis
? 3 * 1000
: config.cacheConfig?.ttlMillis;
}

setDap(dap: DynamicActionProviderAction) {
this.dap = dap;
}

setValue(value: DapValue) {
this.value = value;
this.expiresAt = Date.now() + this.ttlMillis;
}

/**
* Gets or fetches the DAP data.
* @param skipTrace Don't run the action. i.e. don't create a trace log.
Expand All @@ -61,17 +70,13 @@ class SimpleCache {
if (!this.fetchPromise) {
this.fetchPromise = (async () => {
try {
// Get a new value
this.value = await this.dapFn(); // this returns the actual actions
this.expiresAt = Date.now() + this.ttlMillis;

if (!params?.skipTrace) {
// Also run the action
// This action actually does nothing, with the important side
// effect of logging its input and output (which are the same).
// It does not change what we return, it just makes
// the content of the DAP visible in the DevUI and logging trace.
await this.dap.run(transformDapValue(this.value));
if (this.dap && !params?.skipTrace) {
await this.dap.run(); // calls setValue
} else {
this.setValue(await this.dapFn());
}
if (!this.value) {
throw new Error('value is undefined');
}
return this.value;
} catch (error) {
Expand Down Expand Up @@ -107,8 +112,8 @@ export interface DynamicRegistry {
}

export type DynamicActionProviderAction = Action<
z.ZodTypeAny,
z.ZodTypeAny,
z.ZodVoid,
z.ZodArray<typeof ActionMetadataSchema>,
z.ZodTypeAny
> &
DynamicRegistry & {
Expand Down Expand Up @@ -142,14 +147,10 @@ export type DapMetadata = {
[K in ActionType]?: ActionMetadata[];
};

function transformDapValue(value: DapValue): DapMetadata {
const metadata: DapMetadata = {};
for (const key of Object.keys(value)) {
metadata[key] = value[key].map((a) => {
return a.__action;
});
}
return metadata;
function transformDapValue(value: DapValue): ActionMetadata[] {
return Object.values(value).flatMap(
(actions) => actions?.map((a) => a.__action) || []
);
}

export function defineDynamicActionProvider(
Expand All @@ -163,36 +164,29 @@ export function defineDynamicActionProvider(
} else {
cfg = { ...config };
}
const cache = new SimpleCache(cfg, fn);
const a = defineAction(
registry,
{
...cfg,
inputSchema: z.void(),
outputSchema: z.array(ActionMetadataSchema),
actionType: 'dynamic-action-provider',
metadata: { ...(cfg.metadata || {}), type: 'dynamic-action-provider' },
},
async (i, _options) => {
// The actions are retrieved, saved in a cache, formatted nicely and
// then passed in here so they can be automatically logged by the action
// call. This action is for logging only. We cannot run the actual
// 'getting the data from the DAP' here because the DAP data is required
// to resolve tools/resources etc. And there can be a LOT of tools etc.
// for a single generate. Which would log one DAP action per resolve,
// and unnecessarily overwhelm the Dev UI with DAP actions that all have
// the same information. So we only run this action (for the logging) when
// we go get new data from the DAP (so we can see what it returned).
return i;
async (_options) => {
const dapValue = await fn();
cache.setValue(dapValue);
return transformDapValue(dapValue);
}
);
implementDap(a as DynamicActionProviderAction, cfg, fn);
implementDap(a as DynamicActionProviderAction, cache);
return a as DynamicActionProviderAction;
}

function implementDap(
dap: DynamicActionProviderAction,
config: DapConfig,
dapFn: DapFn
) {
dap.__cache = new SimpleCache(dap, config, dapFn);
function implementDap(dap: DynamicActionProviderAction, cache: SimpleCache) {
cache.setDap(dap);
dap.__cache = cache;
dap.invalidateCache = () => {
dap.__cache.invalidate();
};
Expand Down
10 changes: 4 additions & 6 deletions js/core/tests/dynamic-action-provider_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,16 @@ describe('dynamic action provider', () => {
};
});

let runInput: any;
let runResult: any;
const originalRun = dap.run.bind(dap);
dap.run = async (input, options) => {
runInput = input;
return originalRun(input, options);
runResult = await originalRun(input, options);
return runResult;
};

await dap.__cache.getOrFetch();

assert.deepStrictEqual(runInput, {
tool: [tool1.__action, tool2.__action],
});
assert.deepStrictEqual(runResult.result, [tool1.__action, tool2.__action]);
});

it('skips trace when requested', async () => {
Expand Down
Loading