Skip to content

Commit 0de988e

Browse files
authored
[CHORE]: Rename tasks/operators to attached functions and functions (#5733)
## Description of changes

As titled. The following names are renamed:

- Operators -> Functions
- Task -> Attached Functions
- TaskRuns -> Invocations
- CreateTask -> AttachFunction
- RemoveTask -> DetachFunction

Files will be renamed in a follow-up diff.

- Improvements & Bug fixes
  - ...
- New functionality
  - ...

## Test plan

_How are these changes tested?_

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 9732ae1 commit 0de988e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+4009
-3580
lines changed

chromadb/api/__init__.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
from chromadb.config import Component, Settings
7373
from chromadb.types import Database, Tenant, Collection as CollectionModel
7474
from chromadb.api.models.Collection import Collection
75+
from chromadb.api.models.AttachedFunction import AttachedFunction
7576

7677
# Re-export the async version
7778
from chromadb.api.async_api import ( # noqa: F401
@@ -814,46 +815,44 @@ def _delete(
814815
pass
815816

816817
@abstractmethod
817-
def create_task(
818+
def attach_function(
818819
self,
819-
task_name: str,
820-
operator_name: str,
820+
function_id: str,
821+
name: str,
821822
input_collection_id: UUID,
822-
output_collection_name: str,
823+
output_collection: str,
823824
params: Optional[Dict[str, Any]] = None,
824825
tenant: str = DEFAULT_TENANT,
825826
database: str = DEFAULT_DATABASE,
826-
) -> tuple[bool, str]:
827-
"""Create a recurring task on a collection.
827+
) -> "AttachedFunction":
828+
"""Attach a function to a collection.
828829
829830
Args:
830-
task_name: Unique name for this task instance
831-
operator_name: Built-in operator name (e.g., 'record_counter')
832-
input_collection_id: Source collection that triggers the task
833-
output_collection_name: Target collection where task output is stored
834-
params: Optional dictionary with operator-specific parameters
831+
function_id: Built-in function identifier
832+
name: Unique name for this attached function
833+
input_collection_id: Source collection that triggers the function
834+
output_collection: Target collection where function output is stored
835+
params: Optional dictionary with function-specific parameters
835836
tenant: The tenant name
836837
database: The database name
837838
838839
Returns:
839-
tuple: (success: bool, task_id: str)
840+
AttachedFunction: Object representing the attached function
840841
"""
841842
pass
842843

843844
@abstractmethod
844-
def remove_task(
845+
def detach_function(
845846
self,
846-
task_name: str,
847-
input_collection_id: UUID,
847+
attached_function_id: UUID,
848848
delete_output: bool = False,
849849
tenant: str = DEFAULT_TENANT,
850850
database: str = DEFAULT_DATABASE,
851851
) -> bool:
852-
"""Delete a task and prevent any further runs.
852+
"""Detach a function and prevent any further runs.
853853
854854
Args:
855-
task_name: Name of the task to remove
856-
input_collection_id: Id of the input collection the task is registered on
855+
attached_function_id: ID of the attached function to remove
857856
delete_output: Whether to also delete the output collection
858857
tenant: The tenant name
859858
database: The database name

chromadb/api/fastapi.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import urllib.parse
88
from overrides import override
99

10+
from chromadb.api.models.AttachedFunction import AttachedFunction
11+
1012
from chromadb.api.collection_configuration import (
1113
CreateCollectionConfiguration,
1214
UpdateCollectionConfiguration,
@@ -748,47 +750,56 @@ def get_max_batch_size(self) -> int:
748750
max_batch_size = cast(int, pre_flight_checks.get("max_batch_size", -1))
749751
return max_batch_size
750752

751-
@trace_method("FastAPI.create_task", OpenTelemetryGranularity.ALL)
753+
@trace_method("FastAPI.attach_function", OpenTelemetryGranularity.ALL)
752754
@override
753-
def create_task(
755+
def attach_function(
754756
self,
755-
task_name: str,
756-
operator_name: str,
757+
function_id: str,
758+
name: str,
757759
input_collection_id: UUID,
758-
output_collection_name: str,
760+
output_collection: str,
759761
params: Optional[Dict[str, Any]] = None,
760762
tenant: str = DEFAULT_TENANT,
761763
database: str = DEFAULT_DATABASE,
762-
) -> tuple[bool, str]:
763-
"""Register a recurring task on a collection."""
764+
) -> "AttachedFunction":
765+
"""Attach a function to a collection."""
764766
resp_json = self._make_request(
765767
"post",
766-
f"/tenants/{tenant}/databases/{database}/collections/{input_collection_id}/tasks/create",
768+
f"/tenants/{tenant}/databases/{database}/collections/{input_collection_id}/functions/attach",
767769
json={
768-
"task_name": task_name,
769-
"operator_name": operator_name,
770-
"output_collection_name": output_collection_name,
770+
"name": name,
771+
"function_id": function_id,
772+
"output_collection": output_collection,
771773
"params": params,
772774
},
773775
)
774-
return cast(bool, resp_json["success"]), cast(str, resp_json["task_id"])
775776

776-
@trace_method("FastAPI.remove_task", OpenTelemetryGranularity.ALL)
777+
return AttachedFunction(
778+
client=self,
779+
id=UUID(resp_json["attached_function"]["id"]),
780+
name=resp_json["attached_function"]["name"],
781+
function_id=resp_json["attached_function"]["function_id"],
782+
input_collection_id=input_collection_id,
783+
output_collection=output_collection,
784+
params=params,
785+
tenant=tenant,
786+
database=database,
787+
)
788+
789+
@trace_method("FastAPI.detach_function", OpenTelemetryGranularity.ALL)
777790
@override
778-
def remove_task(
791+
def detach_function(
779792
self,
780-
task_name: str,
781-
input_collection_id: UUID,
793+
attached_function_id: UUID,
782794
delete_output: bool = False,
783795
tenant: str = DEFAULT_TENANT,
784796
database: str = DEFAULT_DATABASE,
785797
) -> bool:
786-
"""Delete a task and prevent any further runs."""
798+
"""Detach a function and prevent any further runs."""
787799
resp_json = self._make_request(
788800
"post",
789-
f"/tenants/{tenant}/databases/{database}/collections/{input_collection_id}/tasks/delete",
801+
f"/tenants/{tenant}/databases/{database}/attached_functions/{attached_function_id}/detach",
790802
json={
791-
"task_name": task_name,
792803
"delete_output": delete_output,
793804
},
794805
)
chromadb/api/models/AttachedFunction.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from typing import TYPE_CHECKING, Optional, Dict, Any
2+
from uuid import UUID
3+
4+
if TYPE_CHECKING:
5+
from chromadb.api import ServerAPI # noqa: F401
6+
7+
8+
class AttachedFunction:
    """Represents a function attached to a collection.

    Instances are plain value holders plus a reference to the API client
    that created them; the client is only used by :meth:`detach`.
    """

    def __init__(
        self,
        client: "ServerAPI",
        id: UUID,
        name: str,
        function_id: str,
        input_collection_id: UUID,
        output_collection: str,
        params: Optional[Dict[str, Any]],
        tenant: str,
        database: str,
    ):
        """Initialize an AttachedFunction.

        Args:
            client: The API client
            id: Unique identifier for this attached function
            name: Name of this attached function instance
            function_id: The function identifier (e.g., "record_counter")
            input_collection_id: ID of the input collection
            output_collection: Name of the output collection
            params: Function-specific parameters
            tenant: The tenant name
            database: The database name
        """
        self._client = client
        self._id = id
        self._name = name
        self._function_id = function_id
        self._input_collection_id = input_collection_id
        self._output_collection = output_collection
        self._params = params
        self._tenant = tenant
        self._database = database

    @property
    def id(self) -> UUID:
        """The unique identifier of this attached function."""
        return self._id

    @property
    def name(self) -> str:
        """The name of this attached function instance."""
        return self._name

    @property
    def function_id(self) -> str:
        """The function identifier."""
        return self._function_id

    @property
    def input_collection_id(self) -> UUID:
        """The ID of the input collection."""
        return self._input_collection_id

    @property
    def output_collection(self) -> str:
        """The name of the output collection."""
        return self._output_collection

    @property
    def params(self) -> Optional[Dict[str, Any]]:
        """The function parameters."""
        return self._params

    def detach(self, delete_output_collection: bool = False) -> bool:
        """Detach this function and prevent any further runs.

        Forwards to the client's ``detach_function`` using the id, tenant
        and database this object was constructed with.

        Args:
            delete_output_collection: Whether to also delete the output
                collection. Defaults to False.

        Returns:
            bool: Whatever the client's ``detach_function`` returns
            (True if successful).

        Example:
            >>> success = attached_fn.detach(delete_output_collection=True)
        """
        return self._client.detach_function(
            attached_function_id=self._id,
            delete_output=delete_output_collection,
            tenant=self._tenant,
            database=self._database,
        )

    def __repr__(self) -> str:
        # ``!r`` quotes the string fields; for ordinary strings this matches
        # hand-written single quotes, but it stays unambiguous when a value
        # itself contains a quote character.
        return (
            f"AttachedFunction(id={self._id}, name={self._name!r}, "
            f"function_id={self._function_id!r}, "
            f"input_collection_id={self._input_collection_id}, "
            f"output_collection={self._output_collection!r})"
        )

chromadb/api/models/Collection.py

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
import logging
2727

28+
if TYPE_CHECKING:
29+
from chromadb.api.models.AttachedFunction import AttachedFunction
30+
2831
logger = logging.getLogger(__name__)
2932

3033
if TYPE_CHECKING:
@@ -368,7 +371,7 @@ def search(
368371

369372
return self._client._search(
370373
collection_id=self.id,
371-
searches=cast(List[Search], embedded_searches),
374+
searches=searches_list,
372375
tenant=self.tenant,
373376
database=self.database,
374377
)
@@ -495,63 +498,38 @@ def delete(
495498
database=self.database,
496499
)
497500

498-
def create_task(
501+
def attach_function(
499502
self,
500-
task_name: str,
501-
operator_name: str,
502-
output_collection_name: str,
503+
function_id: str,
504+
name: str,
505+
output_collection: str,
503506
params: Optional[Dict[str, Any]] = None,
504-
) -> tuple[bool, str]:
505-
"""Create a recurring task that processes this collection.
507+
) -> "AttachedFunction":
508+
"""Attach a function to this collection.
506509
507510
Args:
508-
task_name: Unique name for this task instance
509-
operator_name: Built-in operator name (e.g., "record_counter")
510-
output_collection_name: Name of the collection where task output will be stored
511-
params: Optional dictionary with operator-specific parameters
511+
function_id: Built-in function identifier (e.g., "record_counter")
512+
name: Unique name for this attached function
513+
output_collection: Name of the collection where function output will be stored
514+
params: Optional dictionary with function-specific parameters
512515
513516
Returns:
514-
tuple: (success: bool, task_id: str)
517+
AttachedFunction: Object representing the attached function
515518
516519
Example:
517-
>>> success, task_id = collection.create_task(
518-
... task_name="count_docs",
519-
... operator_name="record_counter",
520-
... output_collection_name="doc_counts",
520+
>>> attached_fn = collection.attach_function(
521+
... function_id="record_counter",
522+
... name="mycoll_stats_fn",
523+
... output_collection="mycoll_stats",
521524
... params={"threshold": 100}
522525
... )
523526
"""
524-
return self._client.create_task(
525-
task_name=task_name,
526-
operator_name=operator_name,
527+
return self._client.attach_function(
528+
function_id=function_id,
529+
name=name,
527530
input_collection_id=self.id,
528-
output_collection_name=output_collection_name,
531+
output_collection=output_collection,
529532
params=params,
530533
tenant=self.tenant,
531534
database=self.database,
532535
)
533-
534-
def remove_task(
535-
self,
536-
task_name: str,
537-
delete_output: bool = False,
538-
) -> bool:
539-
"""Delete a task and prevent any further runs.
540-
541-
Args:
542-
task_name: Name of the task to remove
543-
delete_output: Whether to also delete the output collection. Defaults to False.
544-
545-
Returns:
546-
bool: True if successful
547-
548-
Example:
549-
>>> success = collection.remove_task("count_docs", delete_output=True)
550-
"""
551-
return self._client.remove_task(
552-
task_name=task_name,
553-
input_collection_id=self.id,
554-
delete_output=delete_output,
555-
tenant=self.tenant,
556-
database=self.database,
557-
)

0 commit comments

Comments
 (0)