@@ -1736,6 +1736,119 @@ func (tc *Catalog) FlushCollectionCompaction(ctx context.Context, flushCollectio
17361736 return flushCollectionInfo , nil
17371737}
17381738
1739+ // FlushCollectionCompactionAndAttachedFunction atomically updates collection compaction data and attached function completion offset.
1740+ // TODO(tanujnay112): Deprecate this
1741+ func (tc * Catalog ) FlushCollectionCompactionAndAttachedFunction (
1742+ ctx context.Context ,
1743+ flushCollectionCompaction * model.FlushCollectionCompaction ,
1744+ attachedFunctionID uuid.UUID ,
1745+ completionOffset int64 ,
1746+ ) (* model.FlushCollectionInfo , error ) {
1747+ if ! tc .versionFileEnabled {
1748+ // Attached-function-based compactions are only supported with versioned collections
1749+ log .Error ("FlushCollectionCompactionAndAttachedFunction is only supported for versioned collections" )
1750+ return nil , errors .New ("attached-function-based compaction requires versioned collections" )
1751+ }
1752+
1753+ var flushCollectionInfo * model.FlushCollectionInfo
1754+
1755+ err := tc .txImpl .Transaction (ctx , func (txCtx context.Context ) error {
1756+ var err error
1757+ // Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1758+ tx := dbcore .GetDB (txCtx )
1759+ flushCollectionInfo , err = tc .FlushCollectionCompactionForVersionedCollection (txCtx , flushCollectionCompaction , tx )
1760+ if err != nil {
1761+ return err
1762+ }
1763+
1764+ // Update ONLY completion_offset
1765+ err = tc .metaDomain .AttachedFunctionDb (txCtx ).UpdateCompletionOffset (attachedFunctionID , completionOffset )
1766+ if err != nil {
1767+ return err
1768+ }
1769+
1770+ return nil
1771+ })
1772+
1773+ if err != nil {
1774+ return nil , err
1775+ }
1776+
1777+ // Populate attached function fields with authoritative values from database
1778+ flushCollectionInfo .AttachedFunctionCompletionOffset = & completionOffset
1779+
1780+ log .Info ("FlushCollectionCompactionAndAttachedFunction" ,
1781+ zap .String ("collection_id" , flushCollectionCompaction .ID .String ()),
1782+ zap .String ("attached_function_id" , attachedFunctionID .String ()),
1783+ zap .Int64 ("completion_offset" , completionOffset ))
1784+
1785+ return flushCollectionInfo , nil
1786+ }
1787+
1788+ // FlushCollectionCompactionAndAttachedFunctionExtended atomically updates multiple collection compaction data
1789+ // and attached function completion offset in a single transaction.
1790+ func (tc * Catalog ) FlushCollectionCompactionAndAttachedFunctionExtended (
1791+ ctx context.Context ,
1792+ collectionCompactions []* model.FlushCollectionCompaction ,
1793+ attachedFunctionID uuid.UUID ,
1794+ completionOffset int64 ,
1795+ ) (* model.ExtendedFlushCollectionInfo , error ) {
1796+ if ! tc .versionFileEnabled {
1797+ // Attached-function-based compactions are only supported with versioned collections
1798+ log .Error ("FlushCollectionCompactionAndAttachedFunctionExtended is only supported for versioned collections" )
1799+ return nil , errors .New ("attached-function-based compaction requires versioned collections" )
1800+ }
1801+
1802+ if len (collectionCompactions ) == 0 {
1803+ return nil , errors .New ("at least one collection compaction is required" )
1804+ }
1805+
1806+ flushInfos := make ([]* model.FlushCollectionInfo , 0 , len (collectionCompactions ))
1807+
1808+ err := tc .txImpl .Transaction (ctx , func (txCtx context.Context ) error {
1809+ var err error
1810+ // Get the transaction from context to pass to FlushCollectionCompactionForVersionedCollection
1811+ tx := dbcore .GetDB (txCtx )
1812+
1813+ // Handle all collection compactions
1814+ for _ , collectionCompaction := range collectionCompactions {
1815+ log .Info ("FlushCollectionCompactionAndAttachedFunctionExtended" , zap .String ("collection_id" , collectionCompaction .ID .String ()))
1816+ flushInfo , err := tc .FlushCollectionCompactionForVersionedCollection (txCtx , collectionCompaction , tx )
1817+ if err != nil {
1818+ return err
1819+ }
1820+ flushInfos = append (flushInfos , flushInfo )
1821+ }
1822+
1823+ err = tc .metaDomain .AttachedFunctionDb (txCtx ).UpdateCompletionOffset (attachedFunctionID , completionOffset )
1824+ if err != nil {
1825+ return err
1826+ }
1827+
1828+ return nil
1829+ })
1830+
1831+ if err != nil {
1832+ return nil , err
1833+ }
1834+
1835+ // Populate attached function fields with authoritative values from database
1836+ for _ , flushInfo := range flushInfos {
1837+ flushInfo .AttachedFunctionCompletionOffset = & completionOffset
1838+ }
1839+
1840+ // Log with first collection ID (typically the output collection)
1841+ log .Info ("FlushCollectionCompactionAndAttachedFunctionExtended" ,
1842+ zap .String ("first_collection_id" , collectionCompactions [0 ].ID .String ()),
1843+ zap .Int ("collection_count" , len (collectionCompactions )),
1844+ zap .String ("attached_function_id" , attachedFunctionID .String ()),
1845+ zap .Int64 ("completion_offset" , completionOffset ))
1846+
1847+ return & model.ExtendedFlushCollectionInfo {
1848+ Collections : flushInfos ,
1849+ }, nil
1850+ }
1851+
17391852func (tc * Catalog ) validateVersionFile (versionFile * coordinatorpb.CollectionVersionFile , collectionID string , version int64 ) error {
17401853 if versionFile .GetCollectionInfoImmutable ().GetCollectionId () != collectionID {
17411854 log .Error ("collection id mismatch" , zap .String ("collection_id" , collectionID ), zap .String ("version_file_collection_id" , versionFile .GetCollectionInfoImmutable ().GetCollectionId ()))
0 commit comments