|
1 | | -import logging |
2 | | - |
3 | | -import weaviate |
4 | | - |
5 | | -from llmstack.data.datasource_processor import DataSourceProcessor |
6 | | -from llmstack.data.models import DataSource, DataSourceEntry, DataSourceEntryStatus |
7 | | -from llmstack.data.types import DataSourceTypeFactory |
8 | | - |
9 | | -logger = logging.getLogger(__name__) |
10 | | - |
11 | | - |
12 | | -def delete_data_entry_task( |
13 | | - datasource: DataSource, |
14 | | - entry_data: DataSourceEntry, |
15 | | -): |
16 | | - logger.error("Deleting data_source_entry: %s" % str(entry_data.uuid)) |
17 | | - entry_data.status = DataSourceEntryStatus.MARKED_FOR_DELETION |
18 | | - entry_data.save() |
19 | | - |
20 | | - datasource.size -= entry_data.size |
21 | | - datasource_entry_handler_cls = DataSourceTypeFactory.get_datasource_type_handler( |
22 | | - datasource.type, |
23 | | - ) |
24 | | - datasource_entry_handler = datasource_entry_handler_cls(datasource) |
25 | | - try: |
26 | | - datasource_entry_items = datasource_entry_handler.delete_entry( |
27 | | - entry_data.config, |
28 | | - ) |
29 | | - if datasource_entry_items: |
30 | | - logger.debug( |
31 | | - f"Deleted {len(datasource_entry_items)} items from weaviate for data_source_entry: {str(entry_data.uuid)}", |
32 | | - ) |
33 | | - entry_data.delete() |
34 | | - except weaviate.exceptions.UnexpectedStatusCodeException: |
35 | | - logger.exception("Error deleting data source entry from weaviate") |
36 | | - entry_data.delete() |
37 | | - except Exception: |
38 | | - logger.exception( |
39 | | - "Error deleting data_source_entry: %s" % str(entry_data.name), |
40 | | - ) |
41 | | - entry_data.status = DataSourceEntryStatus.FAILED |
42 | | - entry_data.config = { |
43 | | - "errors": { |
44 | | - "message": "Error in deleting data source entry", |
45 | | - }, |
46 | | - } |
47 | | - entry_data.save() |
48 | | - |
49 | | - datasource.save() |
50 | | - return |
51 | | - |
52 | | - |
53 | | -def resync_data_entry_task( |
54 | | - datasource: DataSource, |
55 | | - entry_data: DataSourceEntry, |
56 | | -): |
57 | | - logger.info("Resyncing task for data_source_entry: %s" % str(entry_data)) |
58 | | - |
59 | | - datasource_entry_handler_cls = DataSourceTypeFactory.get_datasource_type_handler( |
60 | | - datasource.type, |
61 | | - ) |
62 | | - datasource_entry_handler: DataSourceProcessor = datasource_entry_handler_cls( |
63 | | - datasource, |
64 | | - ) |
65 | | - entry_data.status = DataSourceEntryStatus.PROCESSING |
66 | | - entry_data.save() |
67 | | - old_size = entry_data.size |
68 | | - |
69 | | - result = datasource_entry_handler.resync_entry(entry_data.config) |
70 | | - entry_data.size = result.size |
71 | | - config_entry = result.config |
72 | | - config_entry["input"] = entry_data.config["input"] |
73 | | - entry_data.config = config_entry |
74 | | - entry_data.status = DataSourceEntryStatus.READY |
75 | | - entry_data.save() |
76 | | - |
77 | | - datasource.size = datasource.size - old_size + result.size |
78 | | - datasource.save() |
79 | | - |
80 | | - |
81 | | -def delete_data_source_task(datasource): |
82 | | - datasource_type = datasource.type |
83 | | - if datasource_type.is_external_datasource: |
84 | | - return |
85 | | - datasource_entry_handler_cls = DataSourceTypeFactory.get_datasource_type_handler( |
86 | | - datasource_type, |
87 | | - ) |
88 | | - datasource_entry_handler = datasource_entry_handler_cls(datasource) |
89 | | - datasource_entry_handler.delete_all_entries() |
0 commit comments