Internals of ShannonBase

Shannon Data AI Lab edited this page Oct 9, 2025 · 77 revisions

1 Overview

2 Architecture

2.1 Before the start

ShannonBase is an AI/ML-empowered open source MySQL HTAP database, which uses AI/ML to enhance its own capabilities.

A hybrid transaction/analytical processing (HTAP) architecture is best enabled by in-memory computing (IMC) techniques and technologies to enable analytical processing on the same (in memory) data store that is used to perform transaction processing. By removing the latency associated with moving data from operational databases to data warehouses and data marts for analytical processing, this architecture enables real-time analytics and situation awareness on live transaction data as opposed to after-the-fact analysis on stale data. IMC technologies support a single, low-latency-access, in-memory data store capable of processing high volumes of transactions. From Gartner, HTAP-enabling In-memory Computing Technologies

With these features, ShannonBase will have autonomous tuning, workload prediction, automatic index selection, index recommendation, selectivity estimation, etc. (ref: a PhD dissertation from UC Berkeley by Zongheng Yang, and MySQL Heatwave).

To support analytical processing in MySQL, ShannonBase incorporates a secondary engine: an in-memory column engine that processes analytical workloads. The secondary engine is a new feature introduced in MySQL 8.x; it provides an interface to support multi-model, heterogeneous databases. Data is synchronized from the primary engine (InnoDB) to the secondary engine (Rapid).

Our in-memory column store is called Rapid.

It will be based on MySQL 8.1. It aims to achieve at least x times higher query performance on TPC-H than that of xxx, with a more concise architecture and a query optimizer that can intelligently offload query workloads to corresponding storage engines.

The design philosophy of ShannonBase Rapid is modularity and performance-cost balance. The following outlines the new features that will be implemented in ShannonBase. To learn details about each feature, see the relevant chapter.

ShannonBase Rapid will remain an open source project, a counterpart of the closed-source service MySQL Heatwave.

First, an in-memory column store (IMCS) will be used. Second, a cost-based query engine will be developed to automatically offload transactional and analytical workloads. Third, ShannonBase Rapid will provide a vectorized execution engine and support massively parallel processing. In this way, the execution performance of ShannonBase Rapid will be at least xxx times that of xxx.

ShannonBase will load the data into memory from InnoDB to Rapid, just the same as MySQL Heatwave does.

"MySQL Analytics is an in-memory processing engine, data is only persisted in MySQL InnoDB storage engine."

This sentence serves as the basic rule and guideline for us when implementing ShannonBase Rapid. This design document introduces the main changes that will be made and gives you an overview of the architecture of ShannonBase.

The main design goals of ShannonBase will include:

  • Large Scale.
  • Real Time.
  • Highly Fresh Data Changes.
  • Strong Data Consistency.
  • Query Performance Capability.
  • Single System Interface.
  • Workload Isolation.

2.2 Overview of ShannonBase Rapid

ShannonBase is an integrated HTAP database that adopts hybrid row-column store and in-memory computing. It is fully compatible with MySQL version 8.1.

The architecture overview of ShannonBase image

MySQL 8.0 provides the secondary engine, which can intelligently route TP workloads to the primary engine (InnoDB) and AP workloads to the secondary engine (Rapid), based on the workload type. image

2.3 Query Engine

After all new SQL syntaxes are enabled, the server will understand all the SQL statements. When the server receives an SQL statement, the SQL string will create some SQL classes, such as PT_create_table_stmt after lexical processing and grammatical processing. We will not discuss how distributed query plans in MPP are generated in this document. Instead, we focus on ONE NODE and try to explain what happens in ONE node when processing an SQL statement.

In MySQL 8.0, when the cost of a query plan on the primary engine is greater than the threshold defined by the new system variable (secondary_engine_cost_threshold), the query optimization engine will offload this workload to the secondary engine, ensuring optimal processing efficiency.

In the last phase of query optimization, the query engine calls optimize_secondary_engine to determine which engine the workload is routed to for execution, by performing the following three steps:

Use the original processing way: unit->optimize().

Estimate the cost spent by each engine to process the query: current_query_cost and accumulate_current_query_cost.

If current_query_cost is greater than secondary_engine_cost_threshold, forward the workload to optimize_secondary_engine.

if (current_query_cost < variables.secondary_engine_cost_threshold) 
    return false; 
optimize_secondary_engine;
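The routing rule above can be sketched as a small standalone function. This is only an illustration, not the server code: `Engine` and `choose_engine` are hypothetical names, and the second parameter stands in for the `secondary_engine_cost_threshold` system variable.

```cpp
#include <cassert>

// Hypothetical names for illustration; not part of the MySQL or ShannonBase
// source tree.
enum class Engine { Primary, Secondary };

// Mirrors the check above: a plan whose estimated cost is below the threshold
// stays on the primary engine; every other plan is offloaded.
inline Engine choose_engine(double current_query_cost,
                            double secondary_engine_cost_threshold) {
  assert(current_query_cost >= 0.0);  // plan costs are never negative
  if (current_query_cost < secondary_engine_cost_threshold)
    return Engine::Primary;
  return Engine::Secondary;
}
```

Note that, as in the snippet above, a cost exactly equal to the threshold is offloaded, because only strictly cheaper plans return early.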

In future, after ShannonBase achieves MPP, the way that ShannonBase processes SQL statements will be different from centralized systems. A distributed query plan will be generated after query optimization is completed.

2.4 Execution Engine

As for the execution engine, a vectorized execution engine will be incorporated in ShannonBase. ShannonBase will support parallel query and vectorized execution. A column-based AP system is a natural fit for a vectorized execution engine, and vectorized execution is regarded as a standard feature for improving the performance of AP workloads. RDBMS systems such as ClickHouse also use vectorized execution engines.

Vectorized execution can be achieved in two ways:

  • Use SIMD (single instruction, multiple data) to rewrite execution plans. Multiple tuples are fetched in each iteration, rather than one tuple per iteration.
  • Use GCC to generate vectorized code.

Some aggregation functions, such as count(), sum(), and avg(), can be executed in parallel. After a query plan is dispatched to a data node through the management node, the execution engine executes the plan in parallel: the job is divided into sub-jobs that are executed simultaneously by threads. The framework of parallel execution is discussed in issue #xxxx. You can refer to MySQL NDB Cluster.

MySQL Cluster has a unique parallel query engine. It gives a consistent consolidated transactional view of the entire distributed partitioned dataset. This makes designing and programming scaleable distributed applications straightforward and extremely simple. The developer can completely focus on the application logic and doesn’t need to think about data distribution.
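As a rough illustration of batch-at-a-time aggregation in this section, the sketch below computes count(), sum(), and avg() over a column in fixed-size batches. All names (`AggState`, `consume_batch`, `merge`, `aggregate_column`) are hypothetical and do not come from the ShannonBase source. The tight loop over contiguous values is the kind of code a compiler can auto-vectorize (for example GCC at -O3), and `merge` shows why such aggregates can be split into sub-jobs: partial states combine into the same result.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical partial aggregate state for count(), sum() and avg().
struct AggState {
  uint64_t count = 0;
  int64_t sum = 0;
  double avg() const {
    return count == 0 ? 0.0 : static_cast<double>(sum) / count;
  }
};

// Consumes one batch of column values with a tight loop over contiguous
// memory instead of one call per tuple.
inline void consume_batch(AggState &state, const int64_t *values,
                          std::size_t n) {
  assert(values != nullptr || n == 0);
  int64_t batch_sum = 0;
  for (std::size_t i = 0; i < n; ++i) batch_sum += values[i];
  state.sum += batch_sum;
  state.count += n;
}

// Combines two partial states, e.g. the results of two sub-jobs.
inline AggState merge(const AggState &a, const AggState &b) {
  AggState out;
  out.count = a.count + b.count;
  out.sum = a.sum + b.sum;
  return out;
}

// Scans a whole column batch by batch.
inline AggState aggregate_column(const std::vector<int64_t> &column,
                                 std::size_t batch_size) {
  AggState state;
  if (batch_size == 0) batch_size = 1;
  for (std::size_t pos = 0; pos < column.size(); pos += batch_size) {
    const std::size_t n = std::min(batch_size, column.size() - pos);
    consume_batch(state, column.data() + pos, n);
  }
  return state;
}
```

In a threaded design, each worker would call `aggregate_column` on its own slice of the column and the coordinator would call `merge` on the partial results.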

2.5 Rapid, an In-Memory Column Store Secondary Engine

3 Internals

1 Getting TRX_ID from InnoDB

In ShannonBase, we have introduced a hidden column, db_trx_id, designed to store the trx id from InnoDB records. When the secondary_load command is executed, a full table scan is performed on the specified table, and all visible records in the table are loaded into the ShannonBase Rapid engine. The trx id of each record is therefore also loaded into the Rapid engine in columnar format. The following picture illustrates the internal layout of a row in the InnoDB engine. image

When a row is loaded into the ShannonBase Rapid engine, it is divided into parts, and each field is stored as an independent file. image

To determine the visibility of a column value, a TRX_ID taken from its row information is attached to it. An invisible field, Field_sys_trx_id, is used in ShannonBase to store the TRX_ID when a full table scan is performed during the execution of alter table xxx secondary_load.
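The idea of splitting a row into per-column parts, each tagged with the row's transaction id, can be sketched as follows. This is a simplified model under stated assumptions: `RowFromInnoDB`, `ColumnCell`, `ColumnStore`, and `load_row` are hypothetical names, and values are kept as strings for brevity rather than in the engine's real storage format.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// One value in a column, tagged with the id of the transaction that wrote
// the row it came from.
struct ColumnCell {
  std::string value;
  uint64_t trx_id;
};

// Simplified row as delivered by a table scan: the row's trx id plus
// (column name, value) pairs.
struct RowFromInnoDB {
  uint64_t trx_id;
  std::vector<std::pair<std::string, std::string>> fields;
};

// Column name -> all cells of that column, in load order.
using ColumnStore = std::unordered_map<std::string, std::vector<ColumnCell>>;

// Splits one row into its fields and appends each field to its own column,
// copying the row's trx id into every cell.
inline void load_row(ColumnStore &store, const RowFromInnoDB &row) {
  assert(!row.fields.empty());  // a loaded row has at least one field
  for (const auto &field : row.fields)
    store[field.first].push_back(ColumnCell{field.second, row.trx_id});
}
```

Because every cell carries the trx id, a later scan of a single column can decide visibility without looking at the other columns of the row.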

1.1 The ghost column, Field_sys_trx_id

/**
 Field_sys_trx_id represented as an system column DB_TRX_ID  used for getting
 trx_id value from innodb to SQL.
 * */
class Field_sys_trx_id : public Field_longlong {
 public:
  using Field_longlong::store;
  static const int PACK_LENGTH_TRX_ID = MAX_DB_TRX_ID_WIDTH;

  Field_sys_trx_id(uchar *ptr_arg, uint32 len_arg)
      : Field_longlong(ptr_arg, len_arg, nullptr, 0, 0, "DB_TRX_ID", 0, false)
  {
    stored_in_db = false;
    set_hidden(dd::Column::enum_hidden_type::HT_HIDDEN_SE);
    set_column_format(COLUMN_FORMAT_TYPE_DEFAULT);
    set_flag(NO_DEFAULT_VALUE_FLAG);
    stored_in_db = true;
  }
  Field_sys_trx_id(uint32 len_arg, bool is_nullable_arg,
                   const char *field_name_arg, bool unsigned_arg)
      : Field_longlong(nullptr, len_arg, is_nullable_arg ? &dummy_null_buffer : nullptr,
		      0, 0, field_name_arg, 0, unsigned_arg)
  {
    stored_in_db = false;
    set_hidden(dd::Column::enum_hidden_type::HT_HIDDEN_SE);
    set_column_format(COLUMN_FORMAT_TYPE_DEFAULT);
    set_flag(NO_DEFAULT_VALUE_FLAG);
    stored_in_db = true;
  }
  type_conversion_status store(longlong nr, bool unsigned_val) final;
  enum_field_types type() const final { return MYSQL_TYPE_DB_TRX_ID; }
  uint32 pack_length() const final { return PACK_LENGTH_TRX_ID; }
  void sql_type(String &str) const final;
  Field_sys_trx_id *clone(MEM_ROOT *mem_root) const final {
    assert(type() == MYSQL_TYPE_DB_TRX_ID);
    return new (mem_root) Field_sys_trx_id(*this);
  }
  longlong val_int() const final;
};

1.2 Adding and Filling the ghost column

In the open table stage, MySQL fills the TABLE_SHARE from the data dictionary (dd). Therefore, in order to keep the TRX_ID, we need to add a ghost column, 'Field_sys_trx_id', after all user-defined fields. In open_table_from_share, we allocate an extra Field to hold Field_sys_trx_id.

int  open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias,
                          uint db_stat, uint prgflag, uint ha_open_flags,
                          TABLE *outparam, bool is_create_table,
                          const dd::Table *table_def_param) {
   ...
  //Here we need an extra space to store 'ghost' column from table_share.
  if (!(field_ptr = root->ArrayAlloc<Field *>(share->fields + 1 + 1)))
    goto err; /* purecov: inspected */
  ...

After that, MySQL will fill all the allocated Fields information by fill_columns_from_dd. When all user defined columns are filled, we will add and fill the ghost column.

  bool is_in_upgrade = dd::upgrade_57::in_progress();
  bool is_system_objs = is_system_object(share->db.str, share->table_name.str);
  /*we dont add the extra file for system table or in upgrading phase.*/
  if (!is_in_upgrade && !is_system_objs){
    Create_field db_trx_id_field;
    db_trx_id_field.sql_type = MYSQL_TYPE_DB_TRX_ID;
    db_trx_id_field.is_nullable = db_trx_id_field.is_zerofill = false;
    db_trx_id_field.is_unsigned = true;
    Field *sys_trx_id_field = make_field(db_trx_id_field, share, "MYSQL_TYPE_DB_TRX_ID",
                                                    MAX_DB_TRX_ID_WIDTH, rec_pos, null_pos, 0);
    sys_trx_id_field->set_field_index(field_nr);
    share->field[field_nr] = sys_trx_id_field;
    assert (sys_trx_id_field->pack_length_in_rec() == MAX_DB_TRX_ID_WIDTH);
    //rec_pos += share->field[field_nr]->pack_length_in_rec();
    field_nr++;
    assert(share->fields + 1 == field_nr);
  }

open_table_from_share must also reserve an extra slot for the ghost column, and field_ptr should be set here as well.

int  open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias,
                          uint db_stat, uint prgflag, uint ha_open_flags,
                          TABLE *outparam, bool is_create_table,
                          const dd::Table *table_def_param) {
  ...
  //Here we need an extra space to store 'ghost' column from table_share.
  if (!(field_ptr = root->ArrayAlloc<Field *>(share->fields + 1 + 1)))
    goto err; /* purecov: inspected */
  ...

Extra space is also needed in the row's record[0], which is used to store the row data loaded from InnoDB.

int  open_table_from_share(THD *thd, TABLE_SHARE *share, const char *alias,
                          uint db_stat, uint prgflag, uint ha_open_flags,
                          TABLE *outparam, bool is_create_table,
                          const dd::Table *table_def_param) {
  ...
  //in find_record_length(), MAX_DB_TRX_ID_WIDTH is already added.
  record = root->ArrayAlloc<uchar>(share->rec_buff_length * records +
                                   share->null_bytes);
  ...
static bool find_record_length(const dd::Table &table, size_t min_length,
                               TABLE_SHARE *share) {
  ...
    // Loop over columns, count nullable and bit fields and find record length.
  for (const dd::Column *col_obj : table.columns()) {
    // Skip hidden columns
    if (col_obj->is_se_hidden()) continue;

    // Check if the field may be NULL.
    if (col_obj->is_nullable()) share->null_fields++;

    // Check if this is a BIT field with leftover bits in the preamble, and
    // adjust record length accordingly.
    if (col_obj->type() == dd::enum_column_types::BIT) {
      bool treat_bit_as_char;
      if (col_obj->options().get("treat_bit_as_char", &treat_bit_as_char))
        return true;

      if (!treat_bit_as_char && (col_obj->char_length() & 7))
        leftover_bits += col_obj->char_length() & 7;
    }

    // Increment record length.
    share->reclength += column_pack_length(*col_obj);
    share->fields++;
  }

  // Find preamble length and add it to the total record length.
  share->null_bytes = (share->null_fields + leftover_bits + 7) / 8;
  share->last_null_bit_pos = (share->null_fields + leftover_bits) & 7;
  share->reclength += share->null_bytes;

  // Hack to avoid bugs with small static rows in MySQL.
  share->reclength = std::max<size_t>(min_length, share->reclength);
  //Here, due to we need an extra space to store ghost column, db_trx_id length
  //therefore, 'MAX_DB_TRX_ID_WIDTH' is added.
  
  share->reclength += calc_pack_length(MYSQL_TYPE_DB_TRX_ID, 0);
  share->stored_rec_length = share->reclength;
  ...
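The arithmetic in find_record_length can be followed with a small standalone model. This is a sketch, not the server function: `ColumnDesc` and `record_length` are hypothetical names, and the width of the ghost column is passed in as a parameter because the value of MAX_DB_TRX_ID_WIDTH is not shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical per-column description used only for this illustration.
struct ColumnDesc {
  std::size_t pack_length;     // bytes the column occupies in the record
  bool nullable;               // needs one bit in the NULL preamble
  std::size_t leftover_bits;   // BIT columns: char_length & 7, else 0
};

// Follows the same steps as the listing above: sum the column lengths, add
// the NULL/leftover-bit preamble, apply the minimum length, then add the
// space reserved for the ghost trx id column.
inline std::size_t record_length(const std::vector<ColumnDesc> &columns,
                                 std::size_t min_length,
                                 std::size_t trx_id_width) {
  assert(trx_id_width > 0);
  std::size_t reclength = 0, null_fields = 0, leftover_bits = 0;
  for (const auto &col : columns) {
    if (col.nullable) ++null_fields;
    leftover_bits += col.leftover_bits;
    reclength += col.pack_length;
  }
  const std::size_t null_bytes = (null_fields + leftover_bits + 7) / 8;
  reclength += null_bytes;
  reclength = std::max(min_length, reclength);
  return reclength + trx_id_width;
}
```

For example, with columns of 4, 12, and 8 bytes where two are nullable, the preamble is (2 + 7) / 8 = 1 byte, so the user part is 25 bytes; an assumed 8-byte ghost column brings the record to 33 bytes.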

By now, all the tables should have an extra invisible column, Field_sys_trx_id.

1.3 Building the template for InnoDB

The template is used for fast retrieval of just those column values MySQL needs in its processing. It is used by m_prebuilt when we start to scan data in InnoDB.

void ha_innobase::build_template(bool whole_row) {
  ...
  /**there're two places using this template for accelerating, one: select, another place is for DML
  in 'row_mysql_convert_row_to_innobase', it uses for build up innobase row format by using template.
  This field is need in any queries, so that we dont use 'build_template_needs_field()' to check it.
  only one we should know that the difference between index (secondary index)and primary key(cluster
  index). append the ghost field template at the end.
  */
  Field* db_trx_id_field = table->field[n_fields];
  if (db_trx_id_field) {
    assert(db_trx_id_field->type() == MYSQL_TYPE_DB_TRX_ID);
    mysql_row_templ_t *templ [[maybe_unused]] = build_template_field(
      m_prebuilt, clust_index, index, table, db_trx_id_field, 1, 0);
  }
  ...

1.4 Filling TRX_ID in InnoDB

In InnoDB, the row_sel_field_store_in_mysql_format function transforms a row from InnoDB format to MySQL format.

/** Convert a non-SQL-NULL field from Innobase format to MySQL format. */
static inline void row_sel_field_store_in_mysql_format(
    byte *dest, const mysql_row_templ_t *templ, const dict_index_t *idx,
    ulint field, const byte *src, ulint len, ulint sec) {
  row_sel_field_store_in_mysql_format_func(
      dest, templ, idx, field,  src, len , sec);
}
void row_sel_field_store_in_mysql_format_func(
    byte *dest, const mysql_row_templ_t *templ, const dict_index_t *index,
    ulint field_no,  const byte *data,
    ulint len , ulint sec_field) {
   ...
    case DATA_SYS_CHILD:
    case DATA_SYS:
      /* These column types should never be shipped to MySQL. But, in Shannon,
         we will retrieve trx id to MySQL. */
      switch (prtype & DATA_SYS_PRTYPE_MASK) {
         case DATA_TRX_ID:
             id = mach_read_from_6(data);
             memcpy(dest, &id, sizeof(ib_id_t));
             break;
         case DATA_ROW_ID:
         case DATA_ROLL_PTR:
           assert(0);
           break;
      }
      break;
   ...
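InnoDB stores DB_TRX_ID as a 6-byte value with the most significant byte first, which is what mach_read_from_6 decodes in the DATA_TRX_ID branch above. The helpers below are hypothetical stand-ins written for illustration (`read_be6` and `write_be6` are not InnoDB functions); they show the byte-order conversion on its own.

```cpp
#include <cassert>
#include <cstdint>

// Decodes a 6-byte big-endian integer (most significant byte first) into a
// host integer. Stand-in for the decoding step, for illustration only.
inline uint64_t read_be6(const unsigned char *src) {
  assert(src != nullptr);
  uint64_t value = 0;
  for (int i = 0; i < 6; ++i) value = (value << 8) | src[i];
  return value;
}

// Encodes the low 48 bits of value as 6 big-endian bytes.
inline void write_be6(unsigned char *dst, uint64_t value) {
  assert(dst != nullptr);
  for (int i = 5; i >= 0; --i) {
    dst[i] = static_cast<unsigned char>(value & 0xFF);
    value >>= 8;
  }
}
```

After decoding, the listing above copies the id into the MySQL row buffer with memcpy, so the value in the ghost column is in host byte order.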

1.5 SQL Layer

The ghost column is invisible to users; therefore, it is not enabled in the SQL layer.

1.6 Miscellaneous

The ghost column is used only with InnoDB, not with MyISAM.

2 ShannonBase In-memory Column Store -Rapid

2.1 Overview

Taking the Oracle IM column store as an example, we will implement our IM column store as Oracle does.

Oracle IM

In order to provide HTAP services, just like MySQL Heatwave, an in-memory column store is incorporated to handle the analytical workloads. Any AP workloads will be offloaded to the Rapid engine (the in-memory column store).

The architecture of MySQL Heatwave is listed below. (copyright belongs to MySQL)

image image

2.2 Secondary engine

2.2.1 SQL syntaxes

Firstly, the SQL syntaxes must be defined. All these syntaxes are the basis of all future works. This chapter introduces syntaxes of the following operations:

Create a table with secondary_engine=rapid.

Load data.

Process a query.

Monitor the system status.

To determine which SQL syntaxes to define, we must first figure out why we want to use ShannonBase. First, we want users to be able to port their workloads seamlessly from MySQL Heatwave to ShannonBase. Therefore, we try to adopt all the SQL syntax that MySQL Heatwave uses (which is also used in MySQL version 8.1).

In addition, relevant changes will be implemented in the MySQL server layer. Following are examples showing some SQL syntaxes supported by ShannonBase.

Certain SQL grammars must be added in sql/sql_yacc.yy. The following uses the SELECT statement as an example:

select_stmt:
          query_expression
          {
            $$ = NEW_PTN PT_select_stmt($1);
          }
        | query_expression locking_clause_list
          {
            $$ = NEW_PTN PT_select_stmt(NEW_PTN PT_locking($1, $2),
                                        nullptr, true);
          }
        | query_expression_parens
          {
            $$ = NEW_PTN PT_select_stmt($1);
          }
        | select_stmt_with_into
        ;

After SQL syntaxes are added, new SQL items are created in yacc. These items will be processed in the MySQL server layer during query optimization.

Create a table with secondary_engine=rapid:

CREATE TABLE orders (id INT) SECONDARY_ENGINE = rapid;
ALTER TABLE orders SECONDARY_ENGINE = rapid;

Compared with the syntax for creating tables used in StoneDB V1.0, StoneDB V2.0 will support a new keyword, SECONDARY_ENGINE, that was adopted in MySQL 8.0. The original CREATE statement syntax used in MySQL is:

create_table_stmt:
          CREATE opt_temporary TABLE_SYM opt_if_not_exists table_ident
          '(' table_element_list ')' opt_create_table_options_etc
          {
            $$= NEW_PTN PT_create_table_stmt(YYMEM_ROOT, $2, $4, $5,
                                             $7,
                                             $9.opt_create_table_options,
                                             $9.opt_partitioning,
                                             $9.on_duplicate,
                                             $9.opt_query_expression);
          }

opt_create_table_options_etc:
          create_table_options
          opt_create_partitioning_etc
          {
            $$= $2;
            $$.opt_create_table_options= $1;
          }
        | opt_create_partitioning_etc
        ;

create_table_option:
          ENGINE_SYM opt_equal ident_or_text
          {
            $$= NEW_PTN PT_create_table_engine_option(to_lex_cstring($3));
          }
        | SECONDARY_ENGINE_SYM opt_equal NULL_SYM
          {
            $$= NEW_PTN PT_create_table_secondary_engine_option();
          }
        | SECONDARY_ENGINE_SYM opt_equal ident_or_text
          {
            $$= NEW_PTN PT_create_table_secondary_engine_option(to_lex_cstring($3));
          }

From the definition above, SECONDARY_ENGINE_SYM is already defined in create_table_option and should also be handled in class PT_create_table_stmt. For more information about how SQL syntax support will be designed.

2.2.2 Load Data/Unload Data

This part focuses on how data is loaded from InnoDB into the in-memory column store and covers the details of that process.

After a table with a secondary engine is created, the next step is to load data into the secondary engine. Once all the required data has been loaded, queries can be processed. The load operation is performed with an ALTER TABLE statement that uses the SECONDARY_LOAD option.

ALTER TABLE tb_name SECONDARY_LOAD;

/**
  Represents ALTER TABLE SECONDARY_LOAD/SECONDARY_UNLOAD statements.
*/
class Sql_cmd_secondary_load_unload final : public Sql_cmd_common_alter_table {
};

/**
 * Loads a table from its primary engine into its secondary engine.
 *
 * This call assumes that MDL_SHARED_NO_WRITE/SECLOAD_SCAN_START_MDL lock
 * on the table have been acquired by caller. During its execution it may
 * downgrade this lock to MDL_SHARED_UPGRADEABLE/SECLOAD_PAR_SCAN_MDL.
 *
 * @param thd              Thread handler.
 * @param table            Table in primary storage engine.
 *
 * @return True if error, false otherwise.
 */
static bool secondary_engine_load_table(THD *thd, const TABLE &table) {
};

class ha_tianmu_secondary : public handler {
 public:
  ha_tianmu_secondary(handlerton *hton, TABLE_SHARE *table_share);

 private:
  int create(const char *, TABLE *, HA_CREATE_INFO *, dd::Table *) override;

  int open(const char *name, int mode, unsigned int test_if_locked,
           const dd::Table *table_def) override;

  int close() override { return 0; }

  int rnd_init(bool) override { return 0; }

  int rnd_next(unsigned char *) override { return HA_ERR_END_OF_FILE; }

  int rnd_pos(unsigned char *, unsigned char *) override {
    return HA_ERR_WRONG_COMMAND;
  }

  int info(unsigned int) override;

  ha_rows records_in_range(unsigned int index, key_range *min_key,
                           key_range *max_key) override;

  void position(const unsigned char *) override {}

  unsigned long index_flags(unsigned int, unsigned int, bool) const override;

  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             thr_lock_type lock_type) override;

  Table_flags table_flags() const override;

  const char *table_type() const override { return "TIANMU_RAPID"; }

  int load_table(const TABLE &table) override;

  int unload_table(const char *db_name, const char *table_name,
                   bool error_if_not_loaded) override;

  THR_LOCK_DATA m_lock;
};

In sql_table.cc, the function Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload is used to load data into, or unload data from, the secondary engine. When the operation is done, the meta information of the loaded table is stored in the performance_schema.rpd_xxx tables. These tables are used to monitor the status of the secondary engine.

static bool secondary_engine_load_table(THD *thd, const TABLE &table) {
  ...
  // Load table from primary into secondary engine and add to change
  // propagation if that is enabled.
  if (handler->ha_load_table(table)){
    my_error(ER_SECONDARY_ENGINE, MYF(0),
             "secondary storage engine load table failed");
    return true;
  }

  // add the meta info into the 'rpd_column_id' and 'rpd_columns' tables, etc.
  // to check whether it has been loaded or not. Here, we don't use field_ptr != nullptr
  // because of the ghost column.
  uint32 field_count = table.s->fields;
  Field *field_ptr = nullptr;
  for (uint32 index = 0; index < field_count; index++) {
    field_ptr = *(table.field + index);
    // Skip columns marked as NOT SECONDARY.
    if ((field_ptr)->is_flag_set(NOT_SECONDARY_FLAG)) continue;

    ShannonBase::rpd_columns_info row_rpd_columns;
    strncpy(row_rpd_columns.schema_name, table.s->db.str, table.s->db.length);
    row_rpd_columns.table_id = static_cast<uint>(table.s->table_map_id.id());
    row_rpd_columns.column_id = field_ptr->field_index();
    strncpy(row_rpd_columns.column_name, field_ptr->field_name,
            strlen(field_ptr->field_name));
    strncpy(row_rpd_columns.table_name, table.s->table_name.str,
            strlen(table.s->table_name.str));
    std::string key_name (table.s->db.str);
    key_name += table.s->table_name.str;
    key_name += field_ptr->field_name;
    ShannonBase::Compress::Dictionary* dict =
      ShannonBase::Imcs::Imcs::get_instance()->get_cu(key_name)->get_header()->m_local_dict.get();
    if (dict)
      row_rpd_columns.data_dict_bytes = dict->content_size();
    row_rpd_columns.data_placement_index = 0;
  ...

2.3 Rapid, In-memory Column Store.

2.3.1 Overview

The first step in processing AP workloads is to load the full base data into the Rapid engine; Rapid then starts change propagation automatically. When a table is loaded from InnoDB into the Rapid engine, some meta information is also loaded into catalog tables, such as performance_schema.rpd_column and performance_schema.rpd_column_id. A background thread is launched when the system starts and monitors the redo log. When a new DML operation completes, this background thread parses the incoming redo log and applies the changes to the IMCS.
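The apply step of change propagation can be pictured with the simplified model below. It is a sketch under strong assumptions: real redo log records are low-level and must be parsed first, whereas this example starts from already-parsed logical change records. `ChangeRecord`, `InMemoryColumn`, and `apply_changes` are hypothetical names, and the background thread itself is left out so that the logic stays deterministic.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, already-parsed change record for a single column.
enum class ChangeType { Insert, Update, Delete };

struct ChangeRecord {
  ChangeType type;
  uint64_t row_id;
  std::string value;  // empty for Delete in this sketch
};

// Simplified in-memory column: row id -> value.
using InMemoryColumn = std::map<uint64_t, std::string>;

// Applies the records in log order and returns how many changed the column.
inline std::size_t apply_changes(InMemoryColumn &column,
                                 const std::vector<ChangeRecord> &log) {
  std::size_t applied = 0;
  for (const auto &rec : log) {
    switch (rec.type) {
      case ChangeType::Insert:
      case ChangeType::Update:
        column[rec.row_id] = rec.value;
        ++applied;
        break;
      case ChangeType::Delete:
        assert(rec.value.empty());  // delete records carry no payload here
        applied += column.erase(rec.row_id);  // 0 if the row is not loaded
        break;
    }
  }
  return applied;
}
```

Applying records strictly in log order is what keeps the column consistent with the primary engine in this model; a delete for a row that was never loaded is simply ignored.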

When the load statement is executed, it performs the load operation. Overall, just like an insert into xxx select xxx statement, the system first scans the table, either via an index or with a full table scan.

1: It scans the target table, which is usually an InnoDB table. One question must be clarified first: which data is visible to the operation, and which is not. We define that only committed data is visible to the scan operation. In other words, the table scan uses an auto-committed transaction at the READ COMMITTED isolation level.

Rows inserted while the table scan is running would not be seen by the load operation, but this situation cannot occur: an exclusive MDL lock prevents new rows from being inserted into the table while the load operation is running.

2: Besides the core functions, some system parameters are needed to monitor load operations, for example, how much data has been loaded and how much remains. Parameters related to parallelism, such as DOP (degree of parallelism), will also be introduced.

2.3.2 Column Data Format

Each column is organized as a file when it is flushed to disk. The in-memory format of columns is called IMCU (In-Memory Column Unit). An IMCU consists of CUs (Column Units). A CU has two parts: (1) a header with meta information; (2) data. The data is divided into a number of chunks. Each chunk holds 122880 rows, which is the same as DuckDB's. The DB_TRX_ID column is used to track which transaction modified the row data (CU data), in order to implement multi-versioning. image image

All chunks are linked. The address of the first chunk can be found in the CU's header, and each chunk also contains the address of the next chunk. A chunk consists of a header and data: the header contains the meta information of the chunk, and the data part is where the actual data is located. To reach the first CU from the IMCS, note that an IMCS instance has a header, which holds a pointer to the address of the IMCU.
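With a fixed number of rows per chunk, locating a row is simple arithmetic. The sketch below assumes zero-based row ids that are assigned densely in load order; that assumption, and the names `ChunkPosition`, `locate_row`, `chunks_needed`, and `rows_in_chunk`, are made up for this illustration.

```cpp
#include <cassert>
#include <cstdint>

// Rows per chunk, as stated in the text above.
constexpr uint64_t kRowsPerChunk = 122880;

struct ChunkPosition {
  uint64_t chunk_id;  // which chunk of the CU holds the row
  uint64_t offset;    // position of the row inside that chunk
};

// Maps a zero-based row id to its chunk and its offset within the chunk.
constexpr ChunkPosition locate_row(uint64_t row_id) {
  return {row_id / kRowsPerChunk, row_id % kRowsPerChunk};
}

// Number of chunks needed to hold the given number of rows.
constexpr uint64_t chunks_needed(uint64_t total_rows) {
  return (total_rows + kRowsPerChunk - 1) / kRowsPerChunk;
}

// Number of rows stored in a given chunk; only the last chunk may be
// partially filled.
inline uint64_t rows_in_chunk(uint64_t total_rows, uint64_t chunk_id) {
  assert(chunk_id < chunks_needed(total_rows));
  const bool is_last = (chunk_id + 1 == chunks_needed(total_rows));
  return is_last ? total_rows - chunk_id * kRowsPerChunk : kRowsPerChunk;
}

static_assert(locate_row(122880).chunk_id == 1, "first row of second chunk");
```

For a table of 300000 rows this gives three chunks, with the last one holding 54240 rows.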

New data is stored in order as it arrives. Insertion sort can be used to keep it ordered, and binary search is used to find data. If the data is in compressed format, however, a new algorithm is needed to find data within the compressed data.
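For uncompressed data, keeping values ordered on insert and searching them with binary search can be sketched with the standard library. `insert_sorted` and `contains` are hypothetical helper names, and a `std::vector` stands in for the chunk's data area.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Inserts value at the position that keeps data sorted; this is one step of
// an insertion sort. The position is found by binary search, and the
// insertion itself shifts the tail of the vector.
inline void insert_sorted(std::vector<int64_t> &data, int64_t value) {
  assert(std::is_sorted(data.begin(), data.end()));
  data.insert(std::upper_bound(data.begin(), data.end(), value), value);
}

// Looks a value up with binary search; requires data to be sorted.
inline bool contains(const std::vector<int64_t> &data, int64_t value) {
  return std::binary_search(data.begin(), data.end(), value);
}
```

This only covers the uncompressed case; searching inside compressed data needs a different approach, as noted above.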

Now we go deeper and give more specific details of the data. Note that every datum written into a CU should have a transaction id attached to it, to mark which transaction it belongs to. image
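One way an attached transaction id can be used is a snapshot-style visibility check. The sketch below is loosely modeled on the read-view idea behind InnoDB's MVCC and is an assumption made for illustration; the exact rule used by Rapid is not described here, and `SnapshotView` is a hypothetical type.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical snapshot of the transaction system at the time a scan starts.
struct SnapshotView {
  uint64_t min_active_trx_id;    // every id below this had already committed
  uint64_t next_trx_id;          // ids at or above this started later
  std::vector<uint64_t> active;  // sorted ids still running at snapshot time

  // Returns true if a value written by trx_id is visible to this snapshot.
  bool is_visible(uint64_t trx_id) const {
    assert(std::is_sorted(active.begin(), active.end()));
    if (trx_id < min_active_trx_id) return true;
    if (trx_id >= next_trx_id) return false;
    // In between: visible only if the writer was not running at snapshot time.
    return !std::binary_search(active.begin(), active.end(), trx_id);
  }
};
```

With such a check, a scan over a CU can skip values whose writer had not committed when the scan began, without consulting the primary engine.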

All variable-length data, such as text and strings, is encoded as a double type in its local dictionary. Each value has a double-typed id when it is loaded into Rapid.
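A minimal sketch of such a local dictionary is shown below, assuming ids are handed out in first-seen order starting from 0. `LocalDictionary` is a hypothetical class written for this example; it is not the `Compress::Dictionary` class referenced elsewhere in this document.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical dictionary that maps each distinct string to a double-typed id.
class LocalDictionary {
 public:
  // Returns the id of text, assigning the next free id on first sight.
  double encode(const std::string &text) {
    auto it = m_ids.find(text);
    if (it != m_ids.end()) return it->second;
    const double id = static_cast<double>(m_values.size());
    m_ids.emplace(text, id);
    m_values.push_back(text);
    assert(m_ids.size() == m_values.size());
    return id;
  }

  // Returns the string for id, or nullptr if the id is unknown.
  const std::string *decode(double id) const {
    if (id < 0 || id >= static_cast<double>(m_values.size())) return nullptr;
    return &m_values[static_cast<std::size_t>(id)];
  }

  std::size_t size() const { return m_values.size(); }

 private:
  std::unordered_map<std::string, double> m_ids;
  std::vector<std::string> m_values;
};
```

Storing the fixed-width id in the column instead of the string keeps every cell the same size, which is what makes offset-based access inside a chunk possible.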

  • Imcs

Singleton pattern: there is only ONE instance in the Rapid engine. It represents an in-memory store instance. We can use it to perform a full table scan or an index scan to get data from it. It has several CUs; a CU represents a field of a loaded table, and a CU consists of several chunks.

class Imcs : public MemoryObject {
 public:
  using Cu_map_t = std::unordered_map<std::string, std::unique_ptr<Cu>>;
  using Imcu_map_t = std::multimap<std::string, std::unique_ptr<Imcu>>;
  inline static Imcs *get_instance() {
    std::call_once(one, [&] { m_instance = new Imcs(); });
    return m_instance;
  }
  // initialize the imcs.
  uint initialize();
  // deinitialize the imcs.
  uint deinitialize();
  // gets initialized flag.
  inline bool initialized() {
    return (m_inited == handler::NONE) ? false : true;
  }
  // scan oper initialization.
  uint rnd_init(bool scan);
  // end of scanning
  uint rnd_end();
  // writes a row of a column in.
  uint write_direct(ShannonBase::RapidContext *context, Field *fields);
  // reads the data by a rowid into a field.
  uint read_direct(ShannonBase::RapidContext *context, Field *field);
  // reads the data by a rowid into buffer.
  uint read_direct(ShannonBase::RapidContext *context, uchar *buffer);
  uint read_batch_direct(ShannonBase::RapidContext *context, uchar *buffer);
  // deletes the data by a rowid
  uint delete_direct(ShannonBase::RapidContext *context, Field *field);
  // deletes all the data.
  uint delete_all_direct(ShannonBase::RapidContext *context);
  Cu *get_cu(std::string &key);
  void add_cu(std::string key, std::unique_ptr<Cu> &cu);
  ha_rows get_rows(TABLE *source_table);

 private:
  // make ctor and dctor private.
  Imcs();
  virtual ~Imcs();

  Imcs(Imcs &&) = delete;
  Imcs(Imcs &) = delete;
  Imcs &operator=(const Imcs &) = delete;
  Imcs &operator=(const Imcs &&) = delete;

 private:
  // imcs instance
  static Imcs *m_instance;
  // initialization flag, only once.
  static std::once_flag one;
  // cus in this imcs. <db+table+col, cu*>
  Cu_map_t m_cus;
  // imcu in this imcs. <db name + table name, imcu*>
  Imcu_map_t m_imcus;
  // used to keep all allocated imcus. key string: db_name + table_name.
  // initialization flag.
  std::atomic<uint8> m_inited{handler::NONE};
};

The class member m_cus is a map, defined as follows:

std::unordered_map<std::string, std::unique_ptr<Cu>>

The key is constructed from "db_name", "table_name", and the column name. When a row is written into Rapid, each field writes its own data independently.
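The lookup of a CU by name can be sketched as below. The key construction follows secondary_engine_load_table, which concatenates the database name, table name, and field name; everything else (`ColumnUnit`, `make_cu_key`, `CuRegistry`) is a hypothetical stand-in for illustration rather than the real `Cu` and `Imcs` classes.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a column unit.
struct ColumnUnit {
  std::string column_name;
};

// Builds the lookup key by plain concatenation, as the load path does.
inline std::string make_cu_key(const std::string &db, const std::string &table,
                               const std::string &column) {
  return db + table + column;
}

// Hypothetical registry that owns the column units and finds them by key.
class CuRegistry {
 public:
  void add(const std::string &key, std::unique_ptr<ColumnUnit> cu) {
    assert(cu != nullptr);
    m_cus[key] = std::move(cu);
  }

  // Returns the column unit for key, or nullptr if it is not loaded.
  ColumnUnit *get(const std::string &key) const {
    auto it = m_cus.find(key);
    return it == m_cus.end() ? nullptr : it->second.get();
  }

 private:
  std::unordered_map<std::string, std::unique_ptr<ColumnUnit>> m_cus;
};
```

One design point worth noting: plain concatenation can map different name triples to the same key (for example "ab" + "c" and "a" + "bc"), so a separator between the parts may be worth considering.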

  • Cu

Column Unit. It represents a field of a loaded table. All data of a field is stored in its CU.

class Cu : public MemoryObject {
 public:
  using Cu_header = struct alignas(CACHE_LINE_SIZE) Cu_header_t {
   public:
    // whether the column is nullable.
    bool m_nullable{false};
    // encoding type. pls ref to:
    // https://dev.mysql.com/doc/heatwave/en/mys-hw-varlen-encoding.html
    // https://dev.mysql.com/doc/heatwave/en/mys-hw-dictionary-encoding.html
    Compress::Encoding_type m_encoding_type{Compress::Encoding_type::NONE};
    // the index of field.
    uint16 m_field_no{0};
    // field type of this cu.
    enum_field_types m_cu_type{MYSQL_TYPE_TINY};
    // local dictionary.
    std::unique_ptr<Compress::Dictionary> m_local_dict;
    // statistics info.
    std::atomic<double> m_max{0}, m_min{0}, m_middle{0}, m_median{0}, m_avg{0},
        m_sum{0};
    std::atomic<uint64> m_rows{0};
  };

  explicit Cu(Field *field);
  virtual ~Cu();
  Cu(Cu &&) = delete;
  Cu &operator=(Cu &&) = delete;

  // initialization. these're for internal.
  uint rnd_init(bool scan);
  // End of Rnd scan
  uint rnd_end();
  // writes the data into this chunk. length unspecify means calc by chunk.
  uchar *write_data_direct(ShannonBase::RapidContext *context, uchar *data,
                           uint length = 0);
  // reads the data by from address.
  uchar *read_data_direct(ShannonBase::RapidContext *context, uchar *buffer);
  // reads the data by rowid to buffer.
  uchar *read_data_direct(ShannonBase::RapidContext *context, uchar *rowid,
                          uchar *buffer);
  // deletes the data by rowid
  uchar *delete_data_direct(ShannonBase::RapidContext *context, uchar *rowid);
  // deletes all
  uchar *delete_all_direct();
  // updates the data with rowid with the new data.
  uchar *update_data_direct(ShannonBase::RapidContext *context, uchar *rowid,
                            uchar *data, uint length = 0);
  // flush the data to disk. by now, we cannot impl this part.
  uint flush_direct(ShannonBase::RapidContext *context, uchar *from = nullptr,
                    uchar *to = nullptr);
  inline Compress::Dictionary *local_dictionary() const {
    return m_header->m_local_dict.get();
  }
  Cu_header *get_header() { return m_header.get(); }
  // gets the base address of chunks.
  uchar *get_base();
  void add_chunk(std::unique_ptr<Chunk> &chunk);
  inline Chunk *get_chunk(uint chunkid) {
    return (chunkid < m_chunks.size()) ? m_chunks[chunkid].get() : nullptr;
  }
  inline Chunk *get_first_chunk() { return get_chunk(0); }
  inline Chunk *get_last_chunk() { return get_chunk(m_chunks.size() - 1); }
  inline size_t get_chunk_nums() { return m_chunks.size(); }

  uchar *seek(size_t offset);
  inline Index *get_index() { return m_index.get(); }

 private:
  uint m_magic{SHANNON_MAGIC_CU};
  // proctect header.
  std::mutex m_header_mutex;
  // header info of this Cu.
  std::unique_ptr<Cu_header> m_header{nullptr};
  // chunks in this cu.
  std::vector<std::unique_ptr<Chunk>> m_chunks;
  // current chunk read.
  std::atomic<uint32> m_current_chunk_id{0};
  // index of Cu
  std::unique_ptr<Index> m_index{nullptr};
};
  • Chunk

A Chunk is the basic unit of data storage; each Cu keeps its data in a list of chunks.

class Chunk : public MemoryObject {
 public:
  using Chunk_header = struct alignas(CACHE_LINE_SIZE) Chunk_header_t {
   public:
    // is null or not.
    bool m_null{false};
    // whether it is var type or not
    bool m_varlen{false};
    // data type in mysql.
    enum_field_types m_chunk_type{MYSQL_TYPE_TINY};
    // field no.
    uint16 m_field_no{0};
    // pointer to the next or prev.
    Chunk *m_next_chunk{nullptr}, *m_prev_chunk{nullptr};
    // statistics data.
    std::atomic<double> m_max{0}, m_min{0}, m_median{0}, m_middle{0}, m_avg{0},
        m_sum{0};
    std::atomic<uint64> m_rows{0};
  };
  explicit Chunk(Field *field);
  virtual ~Chunk();
  Chunk(Chunk &&) = delete;
  Chunk &operator=(Chunk &&) = delete;

  Chunk_header *get_header() {
    std::scoped_lock lk(m_header_mutex);
    return m_header.get();
  }
  // initial the read opers.
  uint rnd_init(bool scan);
  // End of Rnd scan.
  uint rnd_end();
  // writes the data into this chunk. length unspecify means calc by chunk.
  uchar *write_data_direct(ShannonBase::RapidContext *context, uchar *data,
                           uint length = 0);
  // reads the data by from address .
  uchar *read_data_direct(ShannonBase::RapidContext *context, uchar *buffer);
  // reads the data by rowid.
  uchar *read_data_direct(ShannonBase::RapidContext *context, uchar *rowid,
                          uchar *buffer);
  // deletes the data by rowid.
  uchar *delete_data_direct(ShannonBase::RapidContext *context, uchar *rowid);
  // deletes all.
  uchar *delete_all_direct();
  // updates the data.
  uchar *update_date_direct(ShannonBase::RapidContext *context, uchar *rowid,
                            uchar *data, uint length = 0);
  // flush the data to disk. by now, we cannot impl this part.
  uint flush_direct(RapidContext *context, uchar *from = nullptr,
                    uchar *to = nullptr);
  // the start loc of chunk. where the data wrtes from.
  inline uchar *get_base() const { return m_data_base; }
  // the end loc of chunk. is base + chunk_size
  inline uchar *get_end() const { return m_data_end; }
  // gets the max valid loc of current the data has written to.
  inline uchar *get_data() const { return m_data; }
  bool is_full() { return (m_data == m_data_end) ? true : false; }
  ha_rows records_in_range(ShannonBase::RapidContext *context, double &min_key,
                           double &max_key);

  uchar *where(uint offset);
  uchar *seek(uint offset);

 private:
  std::mutex m_header_mutex;
  std::unique_ptr<Chunk_header> m_header{nullptr};
  // started or not
  std::atomic<uint8> m_inited;
  std::mutex m_data_mutex;
  /** the base pointer of chunk, and the current pos of data. whether data
   * should be in order or not */
  uchar *m_data_base{nullptr};
  // current pointer, where the data is. use write.
  std::atomic<uchar *> m_data{nullptr};
  // pointer of cursor, which used for reading.
  std::atomic<uchar *> m_data_cursor{nullptr};
  // end address of memory, to determine whether the memory is full or not.
  uchar *m_data_end{nullptr};
  // the check sum of this chunk. it used to do check when the data flush to
  // disk.
  uint m_check_sum{0};
  // maigic num of chunk.
  uint m_magic = SHANNON_MAGIC_CHUNK;
};

Its constructor allocates the chunk memory and sets up the data pointers.

Chunk::Chunk(Field *field) {
  ut_ad(field);
  ut_ad(ShannonBase::SHANNON_CHUNK_SIZE < rapid_memory_size);
  m_inited = handler::NONE;

  m_header = std::make_unique<Chunk_header> ();
  if (!m_header.get()) {
    assert(false);
    return ;
  }

  /**m_data_base: here, we use the same psi key as the buffer pool, which is
   * used in innodb page allocation. Here, we use ut::xxx to manage memory
   * allocation and free as innobase does. In the SQL layer, we will use
   * MEM_ROOT for memory management. In IMCS, all modules use ut:: to manage
   * memory operations; it's an efficient memory util. It has been initialized
   * in ha_innodb.cc: ut_new_boot(); */
  if (likely(rapid_allocated_mem_size + ShannonBase::SHANNON_CHUNK_SIZE <=
      rapid_memory_size)) {
    m_data_base = static_cast<uchar *>(ut::aligned_alloc(ShannonBase::SHANNON_CHUNK_SIZE,
        ALIGN_WORD(ShannonBase::SHANNON_CHUNK_SIZE, SHANNON_ROW_TOTAL_LEN)));

    if (unlikely(!m_data_base)) {
      my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0), "Chunk allocation failed");
      return;
    }
    m_data = m_data_base;
    m_data_cursor = m_data_base;
    m_data_end =
        m_data_base + static_cast<ptrdiff_t>(ShannonBase::SHANNON_CHUNK_SIZE);
    rapid_allocated_mem_size += ShannonBase::SHANNON_CHUNK_SIZE;

    m_header->m_avg = 0;
    m_header->m_sum = 0;
    m_header->m_rows = 0;

    m_header->m_max = std::numeric_limits<long long>::lowest();
    m_header->m_min = std::numeric_limits<long long>::max();
    m_header->m_median = std::numeric_limits<long long>::lowest();
    m_header->m_middle = std::numeric_limits<long long>::lowest();

    m_header->m_field_no = field->field_index();
    m_header->m_chunk_type = field->type();
    m_header->m_null = field->is_nullable();
    switch (m_header->m_chunk_type) {
      case MYSQL_TYPE_VARCHAR:
      case MYSQL_TYPE_BIT:
      case MYSQL_TYPE_JSON:
      case MYSQL_TYPE_TINY_BLOB:
      case MYSQL_TYPE_BLOB:
      case MYSQL_TYPE_MEDIUM_BLOB:
      case MYSQL_TYPE_VAR_STRING:
      case MYSQL_TYPE_STRING:
      case MYSQL_TYPE_GEOMETRY:
        m_header->m_varlen = true;
        break;
      default:
        m_header->m_varlen = false;
        break;
    }
  } else {
    my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0),
             "Rapid allocated memory exceeds over the maximum");
    return;
  }
}
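The pointer layout set up by the constructor (base, current write position, end) can be sketched as below. ChunkSketch is a hypothetical, simplified class: it uses plain new[] instead of ut::aligned_alloc, and it omits the header, the statistics and the global memory budget.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>

// Hypothetical fixed-size chunk: rows are appended at data_ until end_.
// row_len must be greater than zero.
class ChunkSketch {
 public:
  ChunkSketch(std::size_t capacity_bytes, std::size_t row_len)
      : storage_(new unsigned char[capacity_bytes]),
        base_(storage_.get()),
        data_(base_),
        end_(base_ + capacity_bytes),
        row_len_(row_len) {}

  // Appends one row; returns where it was written, or nullptr when full.
  unsigned char *write(const unsigned char *row) {
    if (is_full()) return nullptr;
    unsigned char *pos = data_;
    std::memcpy(pos, row, row_len_);
    data_ += row_len_;
    return pos;
  }

  // Full means there is no room left for one more whole row.
  bool is_full() const {
    return static_cast<std::size_t>(end_ - data_) < row_len_;
  }

  std::size_t rows() const {
    return static_cast<std::size_t>(data_ - base_) / row_len_;
  }

 private:
  std::unique_ptr<unsigned char[]> storage_;
  unsigned char *base_;  // start of the chunk
  unsigned char *data_;  // next write position
  unsigned char *end_;   // base_ + capacity
  std::size_t row_len_;
};
```

Unlike the is_full() shown earlier, which compares m_data with m_data_end for equality, this sketch treats any remainder smaller than one row as full.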

2.3.3 ART index

The Adaptive Radix Tree (ART) implementation from https://github.com/armon/libart is introduced into Rapid. For more information about ART, please refer to the corresponding papers.

2.3.4 Full Table Scan

The basic table operation of the Rapid engine is the full table scan, which is implemented through the following handler methods.

class ha_rapid : public handler {
  ...
  int rnd_init(bool) override;
  int rnd_next(unsigned char *) override;
  int rnd_end() override;
  ..

To perform the table scan on every Cu, ImcsReader and CuView are employed in Rapid.

A CuView is a view of a Cu. It provides index scan and full table scan operations that walk over the Cu, check the data, and return the rows that qualify. Take a sequential full table scan as an example: it checks the rows of every chunk against the visibility condition and returns a row if the condition is met.

int CuView::read(ShannonBaseContext *context, uchar *buffer, size_t length) {
  DBUG_TRACE;
  ut_a(context && buffer);
  RapidContext *rpd_context = dynamic_cast<RapidContext *>(context);
  if (!m_source_cu) return HA_ERR_END_OF_FILE;

  // gets the chunks belongs to this cu.
  auto chunk = m_source_cu->get_chunk(m_rnd_chunk_rid);
  while (chunk) {
    ptrdiff_t diff = m_rnd_rpos - chunk->get_data();
    if (unlikely(diff >= 0)) {  // to the next
      m_rnd_chunk_rid.fetch_add(1,
          std::memory_order::memory_order_acq_rel);
      chunk = m_source_cu->get_chunk(m_rnd_chunk_rid);
      if (!chunk) return HA_ERR_END_OF_FILE;
      // store() accepts relaxed, release or seq_cst only, not acq_rel.
      m_rnd_rpos.store(chunk->get_base(), std::memory_order_release);
      continue;
    }

    uint8 info =
        *((uint8 *)(m_rnd_rpos + SHANNON_INFO_BYTE_OFFSET));  // info byte
    uint64 trxid =
        *((uint64 *)(m_rnd_rpos + SHANNON_TRX_ID_BYTE_OFFSET));  // trxid bytes
    // visibility check at first.
    table_name_t name{const_cast<char *>(m_source_table->s->table_name.str)};
    ReadView *read_view = trx_get_read_view(rpd_context->m_trx);
    ut_ad(read_view);
    if (!read_view->changes_visible(trxid, name) ||
        (info & DATA_DELETE_FLAG_MASK)) {  // invisible or deleted
      // TODO: travel the change link to get the visible version data.
      m_rnd_rpos.fetch_add(SHANNON_ROW_TOTAL_LEN,
                           std::memory_order_acq_rel);  // to the next value.
      diff = m_rnd_rpos - chunk->get_data();
      if (diff >= 0) {
        m_rnd_chunk_rid.fetch_add(1, std::memory_order::memory_order_seq_cst);
        chunk = m_source_cu->get_chunk(m_rnd_chunk_rid);
        if (!chunk) return HA_ERR_END_OF_FILE;
        // store() accepts relaxed, release or seq_cst only, not acq_rel.
        m_rnd_rpos.store(chunk->get_base(), std::memory_order_release);
      }  // no data here to the next.
      // re-run the checks on the row that is now under the cursor.
      continue;
    }

    memcpy(buffer, m_rnd_rpos, SHANNON_ROW_TOTAL_LEN);
    m_rnd_rpos.fetch_add(SHANNON_ROW_TOTAL_LEN,
                         std::memory_order_acq_rel);  // go to the next.
    return 0;
  }
  return HA_ERR_END_OF_FILE;
}
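The control flow of the scan above (move to the next chunk when the current one is exhausted, skip rows that are invisible or deleted, return the first qualifying row) can be sketched in isolation. This is a simplified model under stated assumptions: RowSketch, ScanSketch and kDeleteFlag are hypothetical, and the visibility test is reduced to a single transaction id comparison instead of ReadView::changes_visible.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

constexpr std::uint8_t kDeleteFlag = 0x80;  // assumed bit; the real mask may differ

struct RowSketch {
  std::uint8_t info;     // flag byte
  std::uint64_t trx_id;  // id of the transaction that wrote the row
  double value;
};

class ScanSketch {
 public:
  // chunks must outlive the scan object.
  ScanSketch(const std::vector<std::vector<RowSketch>> &chunks,
             std::uint64_t max_visible_trx)
      : chunks_(chunks), max_visible_trx_(max_visible_trx) {}

  // Returns the next visible, non-deleted value, or nullopt at end of data.
  std::optional<double> next() {
    while (chunk_id_ < chunks_.size()) {
      const auto &chunk = chunks_[chunk_id_];
      if (pos_ >= chunk.size()) {  // chunk exhausted: go to the next one
        ++chunk_id_;
        pos_ = 0;
        continue;
      }
      const RowSketch &row = chunk[pos_++];
      const bool visible = row.trx_id <= max_visible_trx_;  // simplified rule
      const bool deleted = (row.info & kDeleteFlag) != 0;
      if (!visible || deleted) continue;  // skip, then check the next row
      return row.value;
    }
    return std::nullopt;
  }

 private:
  const std::vector<std::vector<RowSketch>> &chunks_;
  std::uint64_t max_visible_trx_;
  std::size_t chunk_id_{0};
  std::size_t pos_{0};
};
```

Every row goes through the same check before it is returned, including the row that follows a skipped one.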

2.3.5 Index Table Scan

As with full table scan, Rapid also supports index scan by implementing the following interfaces.

class ha_rapid : public handler {
 public:
  int index_init(uint keynr, bool sorted) override;

  int index_end() override;

  int index_read(uchar *buf, const uchar *key, uint key_len,
                 ha_rkey_function find_flag) override;

  int index_read_last(uchar *buf, const uchar *key, uint key_len) override;

  int index_next(uchar *buf) override;

  int index_next_same(uchar *buf, const uchar *key, uint keylen) override;

  int index_prev(uchar *buf) override;

  int index_first(uchar *buf) override;

  int index_last(uchar *buf) override;

For details, please refer to the corresponding code.

2.3.6 Index Condition pushdown to Rapid

ShannonBase Rapid supports index condition pushdown (ICP) because indexes are enabled in Rapid.

handler::Table_flags ha_rapid::table_flags() const {
  ulong flags = HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_READ_RANGE |
                HA_KEYREAD_ONLY | HA_DO_INDEX_COND_PUSHDOWN;
  return flags;
}

int ha_rapid::key_cmp(KEY_PART_INFO *key_part, const uchar *key, uint key_length) {
  uint store_length;

  for (const uchar *end = key + key_length; key < end;
       key += store_length, key_part++) {
    int cmp;
    const int res = (key_part->key_part_flag & HA_REVERSE_SORT) ? -1 : 1;
    store_length = key_part->store_length;
    if (key_part->null_bit) {
      /* This key part allows null values; NULL is lower than everything */
      const bool field_is_null = key_part->field->is_null();
      if (*key)  // If range key is null
      {
        /* the range is expecting a null value */
        if (!field_is_null) return res;  // Found key is > range
        /* null -- exact match, go to next key part */
        continue;
      } else if (field_is_null)
        return -res;  // NULL is less than any value
      key++;          // Skip null byte
      store_length--;
    }
    if ((cmp = key_part->field->key_cmp(key, key_part->length)) < 0)
      return -res;
    if (cmp > 0) return res;
  }
  return 0;  // Keys are equal
}

int ha_rapid::compare_key_icp(const key_range *range) {
  int cmp;
  if (!range) return 0;  // no max range
  cmp = key_cmp(range_key_part, range->key, range->length);
  if (!cmp) cmp = get_key_comp_result();
  if (get_range_scan_direction() == RANGE_SCAN_DESC) cmp = -cmp;
  return cmp;
}

unsigned long ha_rapid::index_flags(unsigned int idx, unsigned int part,
                                   bool all_parts) const {
  //here, we support the same index flag as primary engine.
  const handler *primary = ha_get_primary_handler();
  const unsigned long primary_flags =
      primary == nullptr ? 0 : primary->index_flags(idx, part, all_parts);

  if(pushed_idx_cond) {}
  // Inherit the following index flags from the primary handler, if they are
  // set:
  //
  // HA_READ_RANGE - to signal that ranges can be read from the index, so that
  // the optimizer can use the index to estimate the number of rows in a range.
  //
  // HA_KEY_SCAN_NOT_ROR - to signal if the index returns records in rowid
  // order. Used to disable use of the index in the range optimizer if it is not
  // in rowid order.

  return ((HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER |
           HA_KEYREAD_ONLY | HA_DO_INDEX_COND_PUSHDOWN |
           HA_READ_RANGE | HA_KEY_SCAN_NOT_ROR) & primary_flags);
}

Item *ha_rapid::idx_cond_push(uint keyno, Item *idx_cond)
{
  DBUG_TRACE;
  ut_ad(keyno != MAX_KEY);
  ut_ad(idx_cond != nullptr);

  pushed_idx_cond = idx_cond;
  pushed_idx_cond_keyno = keyno;
  in_range_check_pushed_down = true;

  /* We will evaluate the condition entirely */
  return nullptr;
}

/**
Index Condition Pushdown interface implementation */

/** Shannon Rapid index push-down condition check
 @return ICP_NO_MATCH, ICP_MATCH, or ICP_OUT_OF_RANGE */
ICP_RESULT
shannon_rapid_index_cond(ha_rapid *h) /*!< in/out: pointer to ha_rapid */
{
  DBUG_TRACE;

  assert(h->pushed_idx_cond);
  assert(h->pushed_idx_cond_keyno != MAX_KEY);

  if (h->end_range && h->compare_key_icp(h->end_range) > 0) {
    /* caller should return HA_ERR_END_OF_FILE already */
    return ICP_OUT_OF_RANGE;
  }

  return h->pushed_idx_cond->val_int() ? ICP_MATCH : ICP_NO_MATCH;
}
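The decision made by shannon_rapid_index_cond (first the end-of-range test, then the pushed condition) can be modelled with plain integers. The sketch below is illustrative only: IcpResultSketch, check_index_cond and scan_with_icp are hypothetical names, keys are plain ints, and the pushed condition is a std::function instead of an Item tree.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <vector>

enum class IcpResultSketch { NoMatch, Match, OutOfRange };

// Range test first, pushed condition second, as in the function above.
IcpResultSketch check_index_cond(int key, const std::optional<int> &end_range,
                                 const std::function<bool(int)> &pushed_cond) {
  if (end_range && key > *end_range) return IcpResultSketch::OutOfRange;
  return pushed_cond(key) ? IcpResultSketch::Match : IcpResultSketch::NoMatch;
}

// Walks ascending index keys, stops at the range end, keeps the matches.
std::vector<int> scan_with_icp(const std::vector<int> &sorted_keys,
                               const std::optional<int> &end_range,
                               const std::function<bool(int)> &pushed_cond) {
  std::vector<int> out;
  for (int key : sorted_keys) {
    const IcpResultSketch r = check_index_cond(key, end_range, pushed_cond);
    if (r == IcpResultSketch::OutOfRange) break;  // nothing further can match
    if (r == IcpResultSketch::Match) out.push_back(key);
  }
  return out;
}
```

Rows rejected by the pushed condition are dropped inside the engine, so the SQL layer never sees them.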

2.4 Storage Interface Of Rapid

The Rapid engine handler provides the interface to the SQL layer. It implements full table scan (rnd_next) and index scan (index_read, index_next), and it also supports ICP.

class ha_rapid : public handler {
  ...
}

2.5 Load/Unload Table in Secondary Engine.

The first step in using Rapid is to load InnoDB tables into it by executing the alter table xxx secondary_load command. When this command is executed, it invokes the following code in ha_shannon_rapid.cc.

int ha_rapid::load_table(const TABLE &table_arg) {

ha_rapid::load_table first checks whether the table has already been loaded. If it has, the function reports an error and returns; otherwise, it proceeds with loading.

  if (shannon_loaded_tables->get(table_arg.s->db.str, table_arg.s->table_name.str) != nullptr) {
    std::ostringstream err;
    err << table_arg.s->db.str << "." <<table_arg.s->table_name.str << " already loaded";
    my_error(ER_SECONDARY_ENGINE_LOAD, MYF(0), err.str().c_str());
    return HA_ERR_GENERIC;
  }

After that, it checks whether the type of every field is supported.

  for (uint idx =0; idx < table_arg.s->fields; idx ++) {
    Field* key_field = *(table_arg.field + idx);
    if (!Utils::Util::is_support_type(key_field->type())) {
      std::ostringstream err;
      err << key_field->field_name << " type not allowed";
      my_error(ER_SECONDARY_ENGINE_LOAD, MYF(0), err.str().c_str());
      return HA_ERR_GENERIC;
    }
  }

In the third step, it builds the primary key, which is used as the key of the ART index. Before that, it checks whether the table being loaded has a primary key, because Rapid requires one.

The check covers two things: the primary key must exist, and none of its columns may carry the NOT SECONDARY flag.

  context.m_extra_info.m_keynr = 0;
  auto key = (table_arg.key_info + 0);
  for (uint keyid =0; keyid < key->user_defined_key_parts; keyid++) {
    if (key->key_part[keyid].field->is_flag_set(NOT_SECONDARY_FLAG)) {
      my_error(ER_RAPID_DA_PRIMARY_KEY_CAN_NOT_HAVE_NOT_SECONDARY_FLAG, MYF(0),
               table_arg.s->db.str, table_arg.s->table_name.str);
      return HA_ERR_GENERIC;
    }
  }

In the final step, it scans the InnoDB table to fetch each row and writes it into the corresponding Cus.

  while ((tmp = table_arg.file->ha_rnd_next(table_arg.record[0])) != HA_ERR_END_OF_FILE) {
   /*** ha_rnd_next can return RECORD_DELETED for MyISAM when one thread is reading and another deleting
    without locks. Now, do full scan, but multi-thread scan will impl in future. */
    if (tmp == HA_ERR_KEY_NOT_FOUND) break;

    auto offset {0};
    memset(context.m_extra_info.m_key_buff.get(), 0x0, key->key_length);
    for (uint key_partid = 0; key_partid < key->user_defined_key_parts; key_partid++) {
      memcpy(context.m_extra_info.m_key_buff.get() + offset,
             key->key_part[key_partid].field->field_ptr(),
             key->key_part[key_partid].store_length);
      offset += key->key_part[key_partid].store_length;
    }
    context.m_extra_info.m_key_len = offset;

    uint32 field_count = table_arg.s->fields;
    Field *field_ptr = nullptr;
    uint32 primary_key_idx [[maybe_unused]] = field_count;

    context.m_trx = thd_to_trx(m_rpd_thd);
    field_ptr = *(table_arg.field + field_count); //ghost field.
    if (field_ptr && field_ptr->type() == MYSQL_TYPE_DB_TRX_ID) {
      context.m_extra_info.m_trxid = field_ptr->val_int();
    }

    if (context.m_trx->state == TRX_STATE_NOT_STARTED) {
      assert (false);
    }
    //will used rowid as rapid pk.
    //if (imcs_instance->write_direct(&context, field_ptr)) {
    if (imcs_reader->write(&context, const_cast<TABLE*>(&table_arg)->record[0])) {
      table_arg.file->ha_rnd_end();
      imcs_instance->delete_all_direct(&context);
      my_error(ER_SECONDARY_ENGINE_LOAD, MYF(0), table_arg.s->db.str,
               table_arg.s->table_name.str);
      return HA_ERR_GENERIC;
    }
    ha_statistic_increment(&System_status_var::ha_read_rnd_count);
    m_rpd_thd->inc_sent_row_count(1);
    if (tmp == HA_ERR_RECORD_DELETED && !thd->killed) continue;
  }
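The key-building part of the loop above (zero the buffer, then copy each key part at a running offset) can be sketched on its own. KeyPartSketch and build_key are hypothetical names, and a std::vector replaces the preallocated m_key_buff.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

struct KeyPartSketch {
  const unsigned char *ptr;  // where the field value lives in the row
  std::size_t store_length;  // bytes this part occupies in the key
};

// Concatenates the key parts into key_buff and returns the key length.
std::size_t build_key(const std::vector<KeyPartSketch> &parts,
                      std::vector<unsigned char> &key_buff) {
  std::size_t total = 0;
  for (const auto &p : parts) total += p.store_length;
  key_buff.assign(total, 0);  // zero-fill first, like the memset in the loop

  std::size_t offset = 0;
  for (const auto &p : parts) {
    std::memcpy(key_buff.data() + offset, p.ptr, p.store_length);
    offset += p.store_length;
  }
  return offset;
}
```

The returned length plays the role of m_key_len: it is the number of bytes actually used for the ART key.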

When alter table xxx secondary_unload is executed, all the loaded data of that table is erased from the Rapid engine.

int ha_rapid::unload_table(const char *db_name, const char *table_name,
                          bool error_if_not_loaded) {
  DBUG_TRACE;
  if (error_if_not_loaded &&
      shannon_loaded_tables->get(db_name, table_name) == nullptr) {
    my_error(ER_SECONDARY_ENGINE_PLUGIN, MYF(0),
             "Table is not loaded on a secondary engine");
    return HA_ERR_GENERIC;
  }
  
  ShannonBase::Imcs::Imcs* imcs_instance = ShannonBase::Imcs::Imcs::get_instance();
  assert(imcs_instance);
  RapidContext context;
  context.m_current_db = std::string(db_name);
  context.m_current_table = std::string(table_name);

  if (auto ret = imcs_instance->delete_all_direct(&context)) {
    return ret;
  }
  shannon_loaded_tables->erase(db_name, table_name);
  return 0;
}

2.6 Local dictionary

Each Cu has a local dictionary, which keeps a dictionary of compressed strings. Rapid does not store string or text values natively; it stores an id of type double instead.

During table loading, a text or string value of a field is passed to the local dictionary instance via Dictionary::store, which returns a double value as the id of the string. This id is what Rapid stores.

During table reads, the string text is looked up by its id in the local dictionary. Before the local dictionary processes them, all string texts are compressed with zstd, lz4, etc., according to the encoding type of the field.

  std::string comment(field->comment.str);
  std::transform(comment.begin(), comment.end(), comment.begin(), ::toupper);
  if (std::regex_search(comment.c_str(), column_encoding_patt)) {
    if (comment.find("SORTED") != std::string::npos)
      m_header->m_encoding_type = Compress::Encoding_type::SORTED;
    else if (comment.find("VARLEN") != std::string::npos)
      m_header->m_encoding_type = Compress::Encoding_type::VARLEN;
  } else
    m_header->m_encoding_type = Compress::Encoding_type::NONE;
  m_header->m_local_dict =
      std::make_unique<Compress::Dictionary>(m_header->m_encoding_type);
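The store and lookup round trip of a local dictionary can be sketched as follows. DictionarySketch is a hypothetical class, not Compress::Dictionary: it uses an integer id where Rapid uses a double, and it skips the zstd/lz4 compression step.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

class DictionarySketch {
 public:
  // Returns the id of text, assigning the next free id on first sight.
  std::uint64_t store(const std::string &text) {
    auto it = ids_.find(text);
    if (it != ids_.end()) return it->second;
    const std::uint64_t id = strings_.size();
    strings_.push_back(text);
    ids_.emplace(text, id);
    return id;
  }

  // Returns the text for id, or nullptr if the id was never assigned.
  // The pointer is only valid until the next call to store().
  const std::string *get(std::uint64_t id) const {
    return id < strings_.size() ? &strings_[id] : nullptr;
  }

 private:
  std::unordered_map<std::string, std::uint64_t> ids_;  // text -> id
  std::vector<std::string> strings_;                    // id -> text
};
```

Equal strings share one id, so a column with few distinct values stores each string only once.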

4 AutoML

4.1 System procedures, sys.ml_xxx

ShannonBase supports several ML routines. In the ${source_code_dir}/scripts/sys_schema/procedures folder we added some system procedures, which are installed as sys.xxx after MySQL initialization.

Taking sys.ML_train as an example:

DROP PROCEDURE IF EXISTS sys.ml_train;
DELIMITER $$

CREATE PROCEDURE sys.ml_train (
        IN in_table_name VARCHAR(64), IN in_target_name VARCHAR(64), IN in_option JSON, IN in_model_handle VARCHAR(64)
    )
    COMMENT '
Description
-----------

Run the ML_TRAIN routine on a labeled training dataset to produce a trained machine learning model.

Parameters
-----------
in_table_name (VARCHAR(64)):
  fully qualified name of the table containing the training dataset.
in_target_name (VARCHAR(64)):
  name of the column in \'table_name\' representing the target, i.e. ground truth values (required for some tasks)
in_option (JSON)
  optional training parameters as key-value pairs in JSON format.
    1: The most important parameter is \'task\', which specifies the ML task to be performed (if not specified, \'classification\' is assumed)
    2: Other parameters allow finer-grained control on the training task
in_model_handle (VARCHAR(64))
   user-defined session variable storing the ML model handle for the duration of the connection
Example
-----------
mysql> SET @iris_model = \'iris_manual\';
mysql> CALL sys.ML_TRAIN(\'ml_data.iris_train\', \'class\', 
          JSON_OBJECT(\'task\', \'classification\'), 
          @iris_model);
...    
'
    SQL SECURITY INVOKER
    NOT DETERMINISTIC
    CONTAINS SQL
BEGIN
    DECLARE v_error BOOLEAN DEFAULT FALSE;
    DECLARE v_user_name VARCHAR(64);
    DECLARE v_db_name_check VARCHAR(64);
    DECLARE v_sys_schema_name VARCHAR(64);
    DECLARE v_db_err_msg TEXT;

    DECLARE v_train_obj_check INT;
    DECLARE v_train_schema_name VARCHAR(64);
    DECLARE v_train_table_name VARCHAR(64);

    SELECT SUBSTRING_INDEX(CURRENT_USER(), '@', 1) INTO v_user_name;  
    SET v_sys_schema_name = CONCAT('ML_SCHEMA_', v_user_name);
  
    SELECT SCHEMA_NAME INTO v_db_name_check
      FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE SCHEMA_NAME = v_sys_schema_name;

    IF v_db_name_check IS NULL THEN
        SET @create_db_stmt = CONCAT('CREATE DATABASE ', v_sys_schema_name, ';');
        PREPARE create_db_stmt FROM @create_db_stmt;
        EXECUTE create_db_stmt;
        DEALLOCATE PREPARE create_db_stmt;

        SET @create_tb_stmt = CONCAT(' CREATE TABLE ', v_sys_schema_name, '.MODEL_CATALOG(
                                        MODEL_ID INT NOT NULL AUTO_INCREMENT,
                                        MODEL_HANDLE VARCHAR(255),
                                        MODEL_OBJECT JSON,
                                        MODEL_OWNER VARCHAR(64),
                                        BUILD_TIMESTAMP TIMESTAMP,
                                        TARGET_COLUMN_NAME VARCHAR(64),
                                        TRAIN_TABLE_NAME VARCHAR(255),
                                        MODEL_OBJECT_SIZE INT,
                                        MODEL_TYPE  VARCHAR(64),
                                        TASK  VARCHAR(64),
                                        COLUMN_NAMES VARCHAR(1024),
                                        MODEL_EXPLANATION NUMERIC,
                                        LAST_ACCESSED TIMESTAMP,
                                        MODEL_METADATA JSON,
                                        NOTES VARCHAR(1024),
                                        PRIMARY KEY (MODEL_ID));');
        PREPARE create_tb_stmt FROM @create_tb_stmt;
        EXECUTE create_tb_stmt;
        DEALLOCATE PREPARE create_tb_stmt;
    END IF;

    SELECT SUBSTRING_INDEX(in_table_name, '.', 1) INTO v_train_schema_name;
    SELECT SUBSTRING_INDEX(in_table_name, '.', -1) INTO v_train_table_name;
    
    SELECT COUNT(*) INTO v_train_obj_check
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = v_train_schema_name AND TABLE_NAME = v_train_table_name;
    IF v_train_obj_check = 0 THEN
        SET v_db_err_msg = CONCAT(in_table_name, ' does not exists.');
        SIGNAL SQLSTATE 'HY000'
            SET MESSAGE_TEXT = v_db_err_msg;
    END IF;
  
    SELECT COUNT(COLUMN_NAME) INTO v_train_obj_check
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = v_train_schema_name AND TABLE_NAME = v_train_table_name AND COLUMN_NAME = in_target_name;
    IF v_train_obj_check = 0 THEN
        SET v_db_err_msg = CONCAT(in_target_name, ' does not exists.');
        SIGNAL SQLSTATE 'HY000'
            SET MESSAGE_TEXT = v_db_err_msg;
    END IF;
    SELECT ml_train(in_table_name, in_target_name, in_option, in_model_handle) INTO v_train_obj_check;
    IF v_train_obj_check != 0 THEN
        SET v_db_err_msg = CONCAT('ML_TRAIN failed.');
        SIGNAL SQLSTATE 'HY000'
            SET MESSAGE_TEXT = v_db_err_msg;
    END IF; 
END$$
DELIMITER ;
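The procedure splits in_table_name with SUBSTRING_INDEX(..., '.', 1) and SUBSTRING_INDEX(..., '.', -1). The sketch below mirrors those two calls in C++ to show what they yield; split_qualified_name is a hypothetical helper written for illustration and is not part of ShannonBase.

```cpp
#include <cassert>
#include <string>
#include <utility>

// first  = text before the first '.', like SUBSTRING_INDEX(name, '.', 1)
// second = text after the last '.',  like SUBSTRING_INDEX(name, '.', -1)
// When there is no '.', both calls return the whole string.
std::pair<std::string, std::string> split_qualified_name(
    const std::string &name) {
  const auto first_dot = name.find('.');
  const auto last_dot = name.rfind('.');
  std::string schema =
      (first_dot == std::string::npos) ? name : name.substr(0, first_dot);
  std::string table =
      (last_dot == std::string::npos) ? name : name.substr(last_dot + 1);
  return {schema, table};
}
```

An unqualified name such as iris_train therefore yields the same string for schema and table, so the INFORMATION_SCHEMA lookup most likely fails; callers should pass a fully qualified name, as the procedure comment states.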

Besides the system procedures, some system SQL functions are also added; the procedures invoke them to perform the ML tasks.

4.2 System SQL functions

ShannonBase adds some internal system functions, such as ml_train. In item_func.cc, the following functions are added.

static const std::pair<const char *, Create_func *> func_array[] = {
    {"ABS", SQL_FN(Item_func_abs, 1)},
    {"ACOS", SQL_FN(Item_func_acos, 1)},
    ...
    {"MD5", SQL_FN(Item_func_md5, 1)},
    {"ML_TRAIN", SQL_FN_V_LIST(Item_func_ml_train, 3, 4)},
    {"ML_MODEL_LOAD", SQL_FN_LIST(Item_func_ml_model_load, 3)},
    {"ML_MODEL_UNLOAD", SQL_FN_LIST(Item_func_ml_model_unload, 2)},
    {"ML_MODEL_IMPORT", SQL_FN_LIST(Item_func_ml_model_import, 4)},
    {"ML_SCORE", SQL_FN_V_LIST(Item_func_ml_score, 5, 6)},
    {"ML_PREDICT_ROW", SQL_FN_LIST(Item_func_ml_predicte_row, 2)},
    {"ML_PREDICT_TABLE", SQL_FN_V_LIST(Item_func_ml_predicte_table, 3, 4)},
    {"ML_EXPLAIN", SQL_FN_V_LIST(Item_func_ml_explain, 3, 4)},
    {"ML_EXPLAIN_ROW", SQL_FN_V_LIST(Item_func_ml_explain_row, 2, 3)},
    {"ML_EXPLAIN_TABLE", SQL_FN_V_LIST(Item_func_ml_explain_table, 3, 4)},
    {"MONTHNAME", SQL_FN(Item_func_monthname, 1)},
    {"NAME_CONST", SQL_FN(Item_name_const, 2)},
    ...

The implementations are in the two item_func.xx files.
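The registration table above pairs each function name with the argument counts it accepts. Assuming SQL_FN_LIST(F, N) means exactly N arguments and SQL_FN_V_LIST(F, MIN, MAX) means between MIN and MAX, the check can be sketched with a small lookup table. ArityRange, ml_registry and accepts are hypothetical names, and only three entries are reproduced.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

struct ArityRange {
  std::size_t min_args;
  std::size_t max_args;
};

// Hypothetical registry keyed by upper-case function name.
inline const std::unordered_map<std::string, ArityRange> &ml_registry() {
  static const std::unordered_map<std::string, ArityRange> reg = {
      {"ML_TRAIN", {3, 4}},       // SQL_FN_V_LIST(..., 3, 4)
      {"ML_MODEL_LOAD", {3, 3}},  // SQL_FN_LIST(..., 3)
      {"ML_SCORE", {5, 6}},       // SQL_FN_V_LIST(..., 5, 6)
  };
  return reg;
}

// True when name is registered and argc lies within its allowed range.
inline bool accepts(const std::string &name, std::size_t argc) {
  const auto &reg = ml_registry();
  const auto it = reg.find(name);
  return it != reg.end() && argc >= it->second.min_args &&
         argc <= it->second.max_args;
}
```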

5 Populating the changes from InnoDB to Rapid

As an HTAP database, ShannonBase can propagate changes from InnoDB to Rapid in real time, which lets it run real-time analytical workloads.

ShannonBase uses the redo log for this propagation. When a transaction commits, it writes redo log records. We keep a copy of these records and parse them to apply the changes to the Rapid engine.

Based on this idea, we use a circular ring buffer to store these redo log records. Only the redo log records of insert, delete, and update operations are stored in the ring buffer.

A background thread is launched after an alter table xxx secondary_load statement is executed, and it is stopped when all loaded tables have been unloaded from the Rapid engine.

To see the status of the Rapid engine, use show engine innodb status, which lists all status information including that of the Rapid engine. Rapid status information is not separated from InnoDB's because Rapid is a sub-engine of InnoDB: InnoDB is the primary engine and Rapid is the secondary one.

5.1 Change Propagation Buffer.

We use an unordered_map to store the incoming InnoDB redo log records, keyed by their start LSN.

std::unordered_map<uint64_t, mtr_log_rec> sys_pop_buff;
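A buffer of this shape, together with the byte counter that the background thread compares against its threshold, can be sketched as below. PopBufferSketch is a hypothetical class: it stores raw byte vectors instead of mtr_log_rec and guards the map with a std::mutex instead of the InnoDB mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for sys_pop_buff plus its size counter.
class PopBufferSketch {
 public:
  // Stores the record under its start LSN; returns false if the LSN exists.
  bool push(std::uint64_t start_lsn, std::vector<unsigned char> rec) {
    std::lock_guard<std::mutex> lk(mutex_);
    const std::size_t sz = rec.size();
    if (!records_.emplace(start_lsn, std::move(rec)).second) return false;
    bytes_ += sz;
    return true;
  }

  // Drops an applied record and gives its bytes back to the counter.
  bool erase(std::uint64_t start_lsn) {
    std::lock_guard<std::mutex> lk(mutex_);
    const auto it = records_.find(start_lsn);
    if (it == records_.end()) return false;
    bytes_ -= it->second.size();
    records_.erase(it);
    return true;
  }

  // Total bytes currently buffered.
  std::size_t bytes() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return bytes_;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::uint64_t, std::vector<unsigned char>> records_;
  std::size_t bytes_{0};
};
```

Note that an unordered_map does not iterate in LSN order, so a consumer that needs ordered replay has to sort the keys itself.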

5.2 Background Thread

A new background thread monitors the change propagation buffer. When a new redo log record arrives, the thread wakes up, launches worker threads to process the buffered records, and waits for their results. After that it sleeps for 200ms or until the next event arrives.

The thread is implemented in populate.cpp; parse_log_func_main is the thread function.

static void parse_log_func_main(log_t *log_ptr) {
  // here we have a notifiyer, start pop. ref: https://dev.mysql.com/doc/heatwave/en/mys-hw-change-propagation.html
  while (srv_shutdown_state.load(std::memory_order_acquire) == SRV_SHUTDOWN_NONE &&
         sys_pop_started.load(std::memory_order_acquire)) {
    auto stop_condition = [&](bool wait) {
      if (sys_pop_data_sz.load(std::memory_order_acquire) >= SHANNON_POPULATION_HRESHOLD_SIZE) {
        return true;
      }

      if (unlikely(wait)) {
        os_event_set(log_sys->rapid_events[0]);
        return true;
      }
      return false;
    };

    // waiting until the 200ms reached or the incomming data in buffer more than 64MB.
    os_event_wait_for(log_ptr->rapid_events[0], MAX_LOG_POP_SPINS, std::chrono::microseconds{MAX_WAIT_TIMEOUT},
                      stop_condition);
    os_event_reset(log_sys->rapid_events[0]);

    // thread stopped.
    if (unlikely(!sys_pop_started.load(std::memory_order_acquire))) break;

    // pop buffer is empty, then re-check the condtion.
    if (likely(sys_pop_buff.empty())) continue;

    // we only use a half of threads to do propagation.
    std::vector<std::future<uint64_t>> results;
    size_t thread_num = std::thread::hardware_concurrency() / 2;
    thread_num = (thread_num > sys_pop_buff.size()) ? sys_pop_buff.size() : thread_num;
    auto curr_iter = sys_pop_buff.begin();
    for (size_t counter = 0; counter < thread_num; counter++) {
      mutex_enter(&log_sys->rapid_populator_mutex);
      byte *from_ptr = curr_iter->second.data.get();
      auto size = curr_iter->second.size;

      // using std thread, not IB_thread, ib_thread has not interface to thread func ret.
      results.emplace_back(
          std::async(std::launch::async, parse_mtr_log_worker, curr_iter->first, from_ptr, from_ptr + size, size));
      curr_iter++;
      mutex_exit(&log_sys->rapid_populator_mutex);
    }

    for (auto &res : results) {  // gets the result from worker thread.
      auto ret_lsn = res.get();
      if (!ret_lsn) {  // propagation failure occure.
        std::stringstream error_message;
        error_message << "Propagation thread occur errors, it makes loaded data to be stale"
                      << ". Please unload loaded tables, and then load tables again manually.";
        push_warning(current_thd, Sql_condition::SL_WARNING, errno, error_message.str().c_str());
      }

      mutex_enter(&log_sys->rapid_populator_mutex);
      auto iter = sys_pop_buff.find(ret_lsn);
      ut_a(iter != sys_pop_buff.end());
      sys_pop_data_sz.fetch_sub(iter->second.size);
      sys_pop_buff.erase(ret_lsn);
      mutex_exit(&log_sys->rapid_populator_mutex);
    }

    sys_rapid_loop_count++;
  }

  sys_pop_started.store(false, std::memory_order_seq_cst);
}

static uint64_t parse_mtr_log_worker(uint64_t start_lsn, const byte *start, const byte *end, size_t sz) {
  THD *log_pop_thread_thd{nullptr};
  if (current_thd == nullptr) {
    my_thread_init();
    log_pop_thread_thd = create_internal_thd();
    if (!log_pop_thread_thd) {
      my_thread_end();
      return start_lsn;
    }
  }

#if !defined(_WIN32)  // here we
  pthread_setname_np(pthread_self(), "rapid_log_wkr");
#else
  SetThreadDescription(GetCurrentThread(), L"rapid_log_wkr");
#endif
  LogParser parse_log;

  Rapid_load_context context;
  auto parsed_bytes = parse_log.parse_redo(&context, const_cast<byte *>(start), const_cast<byte *>(end));
  ut_a(parsed_bytes == sz);

  if (log_pop_thread_thd) {
    my_thread_end();
    destroy_internal_thd(log_pop_thread_thd);
    log_pop_thread_thd = nullptr;
  }

  return (parsed_bytes == sz) ? start_lsn : 0;
}
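
The fan-out and collect pattern used by the propagation loop above can be sketched in isolation as follows. This is a minimal illustration, not the ShannonBase implementation: `Chunk`, `parse_chunk`, and `propagate_once` are hypothetical names, the worker does no real parsing, and a failed chunk is simply left in the buffer. Unlike the excerpt, the sketch also keeps at least one worker when `hardware_concurrency()` reports 0 or 1.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <future>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for one buffered chunk of redo log, keyed by its start LSN.
using Chunk = std::vector<std::uint8_t>;

// Hypothetical worker: returns the chunk's start LSN on success and 0 on failure,
// mirroring the return convention of parse_mtr_log_worker.
inline std::uint64_t parse_chunk(std::uint64_t start_lsn, const Chunk *chunk) {
  return chunk->empty() ? 0 : start_lsn;
}

// Launches one worker per chunk, using at most half of the hardware threads,
// then erases every chunk whose worker reported success.
// Returns the number of chunks erased in this round.
inline std::size_t propagate_once(std::map<std::uint64_t, Chunk> &buffer, std::mutex &mtx) {
  std::vector<std::future<std::uint64_t>> results;
  {
    std::lock_guard<std::mutex> guard(mtx);
    std::size_t workers = std::max<std::size_t>(1, std::thread::hardware_concurrency() / 2);
    workers = std::min(workers, buffer.size());
    auto it = buffer.begin();
    for (std::size_t i = 0; i < workers; ++i, ++it) {
      results.emplace_back(std::async(std::launch::async, parse_chunk, it->first, &it->second));
    }
  }

  std::size_t erased = 0;
  for (auto &res : results) {
    const std::uint64_t lsn = res.get();  // blocks until the worker finishes
    if (lsn == 0) continue;               // failed chunk stays buffered in this sketch
    std::lock_guard<std::mutex> guard(mtx);
    erased += buffer.erase(lsn);
  }
  return erased;
}
```

Calling `propagate_once` repeatedly drains the buffer; the LSN returned by each future is what ties a finished worker back to the map entry it processed.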

The background thread is started in Populator::start_change_populate_threads:

void Populator::start_change_populate_threads() {
  if (!Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
    srv_threads.m_change_pop = os_thread_create(rapid_populate_thread_key, 0, parse_log_func_main, log_sys);
    ShannonBase::Populate::sys_pop_started = true;
    srv_threads.m_change_pop.start();
    assert(log_pop_thread_is_active());
  }
}

Use end_change_populate_threads to stop this background thread:

void Populator::end_change_populate_threads() {
  if (Populator::log_pop_thread_is_active() && shannon_loaded_tables->size()) {
    sys_pop_started.store(false, std::memory_order_seq_cst);
    os_event_set(log_sys->rapid_events[0]);
    srv_threads.m_change_pop.join();
    sys_rapid_loop_count = 0;
    ShannonBase::Populate::sys_pop_started = false;
    assert(log_pop_thread_is_active() == false);
  }
}

Changes in the change propagation buffer are propagated under any of the following conditions:

  • Every 200ms.

  • When the change propagation buffer reaches its 64MB capacity.

  • When data updated by DML operations on the MySQL DB System are read by a subsequent HeatWave query.
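
The three trigger conditions above can be expressed as a single predicate. The sketch below is only an illustration: `PropagationState`, `should_propagate`, and the two constants are hypothetical names, and treating an empty buffer as "nothing to do" is an assumption of this example rather than something the rules state.

```cpp
#include <chrono>
#include <cstddef>

// Hypothetical constants taken from the rules above.
constexpr std::chrono::milliseconds kPropagationInterval{200};
constexpr std::size_t kBufferCapacityBytes = 64ULL * 1024 * 1024;

// Hypothetical snapshot of the propagation buffer's state.
struct PropagationState {
  std::chrono::milliseconds since_last_run{0};  // time since the last propagation
  std::size_t buffered_bytes{0};                // bytes waiting in the buffer
  bool query_reads_pending_changes{false};      // a query touches not-yet-propagated data
};

inline bool should_propagate(const PropagationState &s) {
  if (s.buffered_bytes == 0) return false;                     // assumption: empty buffer never triggers
  if (s.since_last_run >= kPropagationInterval) return true;   // rule 1: every 200 ms
  if (s.buffered_bytes >= kBufferCapacityBytes) return true;   // rule 2: buffer is full
  return s.query_reads_pending_changes;                        // rule 3: a query needs fresh data
}
```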

You can check the change propagation status with the following statement:

mysql> SELECT VARIABLE_VALUE
          FROM performance_schema.global_status
          WHERE VARIABLE_NAME = 'rapid_change_propagation_status';
+----------------+
| VARIABLE_VALUE |
+----------------+
| ON             |
+----------------+

5.3 Adding the redo logs

As we know, redo log records are written when a transaction is committed. Therefore, if a redo log record is of type MLOG_REC_INSERT, MLOG_REC_DELETE, or MLOG_REC_UPDATE_IN_PLACE, we add it to the ring buffer and then start processing it.

In the log_buffer_write function, redo log records are first copied into the redo log buffer. At this point we add our own logic, which also copies the records into the population buffer.

lsn_t log_buffer_write(log_t &log, const byte *str, size_t str_len,
                       lsn_t start_lsn) {
  ut_ad(rw_lock_own(log.sn_lock_inst, RW_LOCK_S));
  ...  
    log_sync_point("log_buffer_write_before_memcpy");

    /* This is the critical memcpy operation, which copies data
    from internal mtr's buffer to the shared log buffer. */
    std::memcpy(ptr, str, len);
    auto type = mlog_id_t(*ptr & ~MLOG_SINGLE_REC_FLAG);
    if (ShannonBase::Populate::Populator::log_pop_thread_is_active() &&
        !recv_recovery_is_on()) {
        ShannonBase::Populate::sys_population_buffer->writeBuff(str, len);
    }
  ... 

Here, sys_population_buffer is the ring buffer we define. After the log records have been written, log_buffer_write_completed(...) sends a notification to the change population thread:

os_event_set(log.rapid_events[0]);
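
To make the copy step concrete, here is a minimal byte ring buffer with the wrap-around handling that a `writeBuff`-style call needs. It is a single-threaded sketch under stated assumptions: `ByteRing` and its methods are hypothetical names, and the real sys_population_buffer would additionally need synchronization between the writer and the population thread, which is omitted here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical fixed-capacity ring buffer of bytes (no locking in this sketch).
class ByteRing {
 public:
  explicit ByteRing(std::size_t capacity) : m_buf(capacity) { assert(capacity > 0); }

  // Copies len bytes in; refuses the write (returns false) when it does not fit.
  bool write(const std::uint8_t *src, std::size_t len) {
    if (len > m_buf.size() - m_used) return false;
    const std::size_t tail = (m_head + m_used) % m_buf.size();
    const std::size_t first = std::min(len, m_buf.size() - tail);
    std::memcpy(m_buf.data() + tail, src, first);
    std::memcpy(m_buf.data(), src + first, len - first);  // wrapped part, may be 0 bytes
    m_used += len;
    return true;
  }

  // Copies up to len bytes out and returns how many bytes were read.
  std::size_t read(std::uint8_t *dst, std::size_t len) {
    len = std::min(len, m_used);
    const std::size_t first = std::min(len, m_buf.size() - m_head);
    std::memcpy(dst, m_buf.data() + m_head, first);
    std::memcpy(dst + first, m_buf.data(), len - first);  // wrapped part, may be 0 bytes
    m_head = (m_head + len) % m_buf.size();
    m_used -= len;
    return len;
  }

  std::size_t size() const { return m_used; }

 private:
  std::vector<std::uint8_t> m_buf;
  std::size_t m_head{0};  // index of the oldest unread byte
  std::size_t m_used{0};  // number of unread bytes
};
```

Refusing a write that does not fit is one possible policy; blocking the writer or overwriting old data are alternatives, and the excerpt does not show which one the actual buffer uses.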

5.4 Parsing redo log

LogParser parses the redo log and applies the changes to Rapid. Its entry point is uint LogParser::parse_redo(byte* ptr, byte* end_ptr). The format of the redo log is described at redo log format.

uint LogParser::parse_multi_rec(byte *ptr, byte *end_ptr) {
  ut_a(end_ptr >= ptr);
  return (end_ptr - ptr);
}
// handle single mtr
uint LogParser::parse_redo(byte* ptr, byte* end_ptr) {
/**
 * After the secondary_load command has executed, all data has been read from the
 * data files. The last checkpoint LSN guarantees that every change with a smaller
 * LSN has already been written to the data files.
*/
    if (ptr == end_ptr) {
      return 0;
    }

    bool single_rec;
    switch (*ptr) {
#ifdef UNIV_LOG_LSN_DEBUG
      case MLOG_LSN:
#endif /* UNIV_LOG_LSN_DEBUG */
      case MLOG_DUMMY_RECORD:
        single_rec = true;
        break;
      default:
        single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
    }

    return (single_rec) ?  parse_single_rec(ptr, end_ptr) :
                           parse_multi_rec(ptr, end_ptr);
}

parse_cur_and_apply_insert_rec, parse_cur_and_apply_delete_rec, and parse_cur_update_in_place_and_apply process the insert, delete, and update redo log records, respectively.

For more information about these three functions, please refer to the source code: implementation
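
The header decoding and the three-way dispatch can be sketched as follows. The flag value 0x80 matches InnoDB's MLOG_SINGLE_REC_FLAG, but everything else is an assumption made for illustration: `RecKind` and its numeric values are placeholders rather than real mlog_id_t values, and `dispatch` only counts records instead of applying them to Rapid.

```cpp
#include <cstdint>

// Single-record flag: the top bit of the type byte.
constexpr std::uint8_t kSingleRecFlag = 0x80;

// Hypothetical record kinds; the numeric values are placeholders.
enum class RecKind : std::uint8_t { Other = 0, Insert = 1, Delete = 2, UpdateInPlace = 3 };

struct RecHeader {
  bool single_rec;    // true when the mini-transaction holds exactly one record
  std::uint8_t type;  // record type with the flag bit cleared
};

inline RecHeader decode_header(std::uint8_t first_byte) {
  return {(first_byte & kSingleRecFlag) != 0,
          static_cast<std::uint8_t>(first_byte & ~kSingleRecFlag)};
}

// Hypothetical counters standing in for the three apply functions.
struct ApplyStats {
  int inserts{0};
  int deletes{0};
  int updates{0};
  int skipped{0};
};

inline void dispatch(std::uint8_t first_byte, ApplyStats &stats) {
  switch (static_cast<RecKind>(decode_header(first_byte).type)) {
    case RecKind::Insert:
      ++stats.inserts;  // a real parser would apply the insert here
      break;
    case RecKind::Delete:
      ++stats.deletes;
      break;
    case RecKind::UpdateInPlace:
      ++stats.updates;
      break;
    default:
      ++stats.skipped;  // record types that are not propagated
  }
}
```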

6 Intelligent Optimization and Vectorized Execution

6.1 Architectural Overview

Rapid Engine is a high-performance secondary storage engine within the ShannonBase database system, specifically designed for modern hybrid transactional/analytical processing (HTAP) environments. Through innovative intelligent offloading decision mechanisms and deep vectorized execution technology, it delivers exceptional performance for analytical workloads while maintaining full MySQL compatibility.

6.2 Decision Mechanism: RapidOptimize

Intelligent Offloading Decision Flow

The RapidOptimize function serves as the core decision hub within MySQL's secondary engine framework, implementing a comprehensive three-tier decision process:

flowchart TD
    A[SQL Query Request] --> B[MySQL Optimizer Generates Execution Plan]
    B --> C{RapidOptimize Decision}
    
    subgraph C[Three-Tier Decision System]
        C1{Engine Enablement Check}
        C2{Data Freshness Check}
        C3{Capability Support Check}
        
        C1 -- Disabled --> D[Fallback to Primary Engine]
        C1 -- Enabled --> C2
        C2 -- High Latency --> D
        C2 -- Normal Latency --> C3
        C3 -- Not Supported --> D
        C3 -- Supported --> E[Execution Plan Rewrite]
    end

    E --> F[Vectorized Iterator Generation]
    F --> G[Rapid Engine Execution]
    G --> H[Result Return]
    
    D --> I[Primary Engine Execution]
    I --> H

6.2.1 Key Technical Implementation

// Decision Core Code
static bool RapidOptimize(OptimizeContext *context, THD *thd, LEX *lex) {
  // Tier 1: Engine Enablement Check
  if (thd->variables.use_secondary_engine == SECONDARY_ENGINE_OFF) {
    return true; // Fallback to primary engine
  }

  // Tier 2: Data Synchronization Check
  ulonglong too_much_pop_threshold = 
      static_cast<ulonglong>(SHANNON_TO_MUCH_POP_THRESHOLD_RATIO * rpd_pop_buff_sz_max);
  if (Populate::sys_pop_buff.size() > SHANNON_POP_BUFF_THRESHOLD_COUNT ||
      Populate::sys_pop_data_sz > too_much_pop_threshold) {
    return true; // Data not synchronized, fallback
  }

  // Tier 3: Execution Plan Rewrite
  auto root_access_path = lex->unit->root_access_path();
  JOIN *join = lex->unit->first_query_block()->join;
  
  // Deep traversal and execution plan rewrite
  WalkAndRewriteAccessPaths(root_access_path, join, WalkAccessPathPolicy::ENTIRE_TREE,
    [&](AccessPath *path, const JOIN *join) -> AccessPath * {
      return OptimizeAndRewriteAccessPath(context, path, join);
    });

  // Regenerate vectorized iterators
  lex->unit->release_root_iterator().reset();
  auto new_root_iter = PathGenerator::CreateIteratorFromAccessPath(
      thd, context, lex->unit->root_access_path(), join, true);

  lex->unit->set_root_iterator(new_root_iter);
  return false; // Use Rapid Engine
}
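
Stripped of the MySQL types, the tiered decision reduces to a small pure function. The sketch below is an illustration under assumptions: `OffloadInputs`, `Route`, and `choose_route` are hypothetical names, and the two threshold values are placeholders, since the excerpt does not show the actual values of SHANNON_POP_BUFF_THRESHOLD_COUNT and SHANNON_TO_MUCH_POP_THRESHOLD_RATIO.

```cpp
#include <cstddef>
#include <cstdint>

enum class Route { Primary, Rapid };

// Hypothetical inputs gathered before the decision is made.
struct OffloadInputs {
  bool secondary_engine_enabled;        // tier 1: session setting
  std::size_t pending_chunks;           // tier 2: chunks waiting for propagation
  std::uint64_t pending_bytes;          // tier 2: bytes waiting for propagation
  std::uint64_t buffer_capacity_bytes;  // tier 2: maximum size of the buffer
  bool plan_supported;                  // tier 3: every operator can run in Rapid
};

// Placeholder thresholds; the real values are not shown in the excerpt.
constexpr std::size_t kMaxPendingChunks = 1000;
constexpr double kMaxPendingRatio = 0.8;

inline Route choose_route(const OffloadInputs &in) {
  // Tier 1: the secondary engine must be enabled.
  if (!in.secondary_engine_enabled) return Route::Primary;

  // Tier 2: too much unpropagated data means Rapid would serve stale rows.
  const auto max_bytes = static_cast<std::uint64_t>(
      kMaxPendingRatio * static_cast<double>(in.buffer_capacity_bytes));
  if (in.pending_chunks > kMaxPendingChunks || in.pending_bytes > max_bytes) {
    return Route::Primary;
  }

  // Tier 3: offload only when the plan is supported.
  return in.plan_supported ? Route::Rapid : Route::Primary;
}
```

Keeping the checks ordered from cheapest to most expensive means a disabled engine or a lagging buffer is rejected before any plan inspection takes place.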

6.3 Vectorized Execution System

6.3.1 Columnar Storage Architecture

class ColumnChunk {
private:
  std::unique_ptr<uchar[]> m_cols_buffer;        // Column data buffer
  std::unique_ptr<bit_array_t> m_null_mask;      // Null value bitmap
  std::atomic<size_t> m_current_size;            // Current data volume
  size_t m_chunk_size = SHANNON_VECTOR_WIDTH;    // Vectorization width
  size_t m_field_width;                          // Field width
  alignas(CACHE_LINE_SIZE) char m_padding;       // Cache line alignment

public:
  // Vectorized processing interface
  bool add_batch(const std::vector<std::pair<const uchar *, size_t>> &data_batch,
                 const std::vector<bool> &null_flags);
  
};
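
The storage idea behind ColumnChunk, a contiguous fixed-width value buffer plus a null bitmap, can be shown with a much smaller class. This is a simplified sketch, not the class above: `FixedWidthChunk` is a hypothetical name, it is not thread-safe (no atomics), and it omits batch insertion, clearing, and alignment.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Hypothetical simplified column chunk: fixed-width slots plus one null bit per row.
class FixedWidthChunk {
 public:
  FixedWidthChunk(std::size_t field_width, std::size_t capacity)
      : m_width(field_width),
        m_capacity(capacity),
        m_data(new std::uint8_t[field_width * capacity]()),
        m_nulls((capacity + 7) / 8, 0) {}

  // Appends one value; returns false when the chunk is already full.
  bool add(const void *value, bool is_null) {
    if (m_size >= m_capacity) return false;
    if (is_null) {
      m_nulls[m_size / 8] |= static_cast<std::uint8_t>(1u << (m_size % 8));
    } else {
      std::memcpy(m_data.get() + m_size * m_width, value, m_width);
    }
    ++m_size;
    return true;
  }

  bool is_null(std::size_t i) const { return ((m_nulls[i / 8] >> (i % 8)) & 1u) != 0; }
  const std::uint8_t *data(std::size_t i) const { return m_data.get() + i * m_width; }
  std::size_t size() const { return m_size; }
  bool full() const { return m_size >= m_capacity; }

 private:
  std::size_t m_width;
  std::size_t m_capacity;
  std::size_t m_size{0};
  std::unique_ptr<std::uint8_t[]> m_data;  // row i lives at offset i * m_width
  std::vector<std::uint8_t> m_nulls;       // bit i set means row i is NULL
};
```

Because every value of a column sits at a fixed stride, an operator can scan the whole chunk with a simple indexed loop instead of one virtual call per row.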

6.3.2 Adaptive Vectorized Iterator

Modern data workloads are hybrid — they demand OLTP speed and OLAP-scale aggregation within the same SQL engine. Traditional MySQL aggregation is fundamentally row-based, optimized for correctness but limited in throughput.

ShannonBase introduces the Vectorized Aggregate Iterator (v1) — a fully compatible, high-performance aggregation engine that processes data in columnar batches using SIMD acceleration and adaptive batch scheduling.

This section describes the design, architecture, and performance results of the module, which is part of ShannonBase’s mission to make AI-native, HTAP-ready SQL open and accessible.

MySQL’s legacy aggregation flow:

while (source->Read() == 0)
  aggregate_items();

This per-row model suffers from:

  • Excessive function call overhead (Field::val_*)

  • Cache inefficiency due to scattered memory access

  • Lack of SIMD or parallel execution

  • Poor scaling for analytical queries

ShannonBase’s Vectorized Aggregate Iterator solves these issues by:

  • Converting row data into columnar batches (ColumnChunk)

  • Processing aggregations in vectorized form

  • Dynamically adapting batch size to CPU performance

  • Preserving 100% MySQL semantics (including rollup & null handling)
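
As a rough illustration of batch-wise aggregation, the sketch below folds one column batch into running SUM, COUNT, MIN, and MAX values while skipping NULLs. It is not the ShannonBase code: `BatchAggregate` and `aggregate_batch` are hypothetical names, the column is assumed to hold 64-bit integers, and no explicit SIMD intrinsics are used; the tight loop over contiguous values is simply the shape that compilers can auto-vectorize.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical running state for one group.
struct BatchAggregate {
  std::int64_t sum{0};
  std::size_t count{0};
  std::int64_t min_value{std::numeric_limits<std::int64_t>::max()};
  std::int64_t max_value{std::numeric_limits<std::int64_t>::lowest()};
};

// Folds one batch into the accumulator; NULL rows contribute nothing,
// which matches SQL semantics for SUM, COUNT(col), MIN, and MAX.
inline void aggregate_batch(const std::vector<std::int64_t> &values,
                            const std::vector<bool> &is_null, BatchAggregate &acc) {
  assert(values.size() == is_null.size());
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (is_null[i]) continue;
    acc.sum += values[i];
    ++acc.count;
    acc.min_value = std::min(acc.min_value, values[i]);
    acc.max_value = std::max(acc.max_value, values[i]);
  }
}
```

Compared with the per-row loop shown earlier, the function is called once per batch rather than once per row, so call overhead is amortized over the whole batch.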

RowIterator(Source)
   ↓
Batch Loader (ReadRowsIntoCurrentBatch)
   ↓
ColumnChunk[]
   ↓
SIMD Aggregator (Sum / Count / Min / Max)
   ↓
Adaptive Batch Scheduler
   ↓
Group Output (rollup aware)

| Component | Description |
| --- | --- |
| ColumnChunk | Columnar in-memory buffer with typed storage |
| VectorizedAggregateIterator | Main execution iterator handling vectorized aggregation |
| ColumnChunkOper | SIMD-ready aggregation primitives (Sum, Count, Min, Max) |
| Adaptive Batch Controller | Dynamically adjusts batch size based on observed latency |
| Fallback Path | Guarantees correctness for non-vectorizable aggregates (e.g., JSON, BLOB, UDFs) |

ColumnChunk: A lightweight in-memory column buffer representing a single field for a batch.

class ColumnChunk {
 public:
  // ctor
  ColumnChunk(Field *mysql_fld, size_t size);

  // Destructor (default is fine since we use smart pointers)
  virtual ~ColumnChunk() = default;

  // Copy constructor
  ColumnChunk(const ColumnChunk &other);

  // Copy assignment operator
  ColumnChunk &operator=(const ColumnChunk &other);

  // Move constructor
  ColumnChunk(ColumnChunk &&other) noexcept;

  // Move assignment operator
  ColumnChunk &operator=(ColumnChunk &&other) noexcept;

  // Mark the value at position `index` as NULL.
  inline void set_null(size_t index) {
    assert(m_null_mask.get());
    assert(index < m_chunk_size);
    ShannonBase::Utils::Util::bit_array_set(m_null_mask.get(), index);
  }

  // Return whether the value at position `index` is NULL.
  inline bool nullable(size_t index) const {
    assert(m_null_mask.get());
    assert(index < m_chunk_size);
    return ShannonBase::Utils::Util::bit_array_get(m_null_mask.get(), index);
  }

  bool add(uchar *data, size_t length, bool null);
  bool add_batch(const std::vector<std::pair<const uchar *, size_t>> &data_batch, const std::vector<bool> &null_flags);

  // remove the last row data.
  inline bool remove() {
    if (m_current_size.load(std::memory_order_relaxed) == 0) return true;
    m_current_size.fetch_sub(1, std::memory_order_acq_rel);
    return true;
  }

  inline const uchar *data(size_t index) const {
    assert(index < m_current_size.load(std::memory_order_relaxed));
    return m_cols_buffer.get() + (index * m_field_width);
  }

  inline uchar *mutable_data(size_t index) {
    assert(index < m_current_size.load(std::memory_order_relaxed));
    return m_cols_buffer.get() + (index * m_field_width);
  }

  inline bool empty() const { return m_current_size.load(std::memory_order_relaxed) == 0; }

  inline bool full() const { return m_current_size.load(std::memory_order_relaxed) >= m_chunk_size; }

  inline size_t size() const { return m_current_size.load(std::memory_order_relaxed); }

  inline size_t capacity() const { return m_chunk_size; }

  inline size_t width() const { return m_field_width; }

  inline size_t remaining() const { return m_chunk_size - m_current_size.load(std::memory_order_relaxed); }

  inline enum_field_types field_type() const { return m_type; }

  inline Field *source_field() const { return m_source_fld; }

  void clear() {
    m_current_size.store(0, std::memory_order_release);

    if (m_null_mask) {
      memset(m_null_mask.get()->data, 0x0, m_null_mask.get()->size);
    }

#ifndef NDEBUG
    if (m_cols_buffer) {
      std::memset((void *)m_cols_buffer.get(), 0, m_chunk_size * m_field_width);
    }
#endif
  }

  void resize(size_t new_size) {
    assert(new_size <= m_chunk_size);
    m_current_size.store(new_size, std::memory_order_release);
  }

  bool reserve(size_t additional_space) {
    size_t current = m_current_size.load(std::memory_order_relaxed);
    return (current + additional_space) <= m_chunk_size;
  }

  size_t compact();

  struct MemoryUsage {
    size_t data_buffer_bytes;
    size_t null_mask_bytes;
    size_t total_bytes;
    double utilization_ratio;
  };

  MemoryUsage get_memory_usage() const {
    MemoryUsage usage{};
    usage.data_buffer_bytes = m_chunk_size * m_field_width;
    usage.null_mask_bytes = (m_chunk_size + 7) / 8;
    usage.total_bytes = usage.data_buffer_bytes + usage.null_mask_bytes + sizeof(*this);

    size_t current = m_current_size.load(std::memory_order_relaxed);
    usage.utilization_ratio = m_chunk_size > 0 ? static_cast<double>(current) / m_chunk_size : 0.0;

    return usage;
  }

  inline ShannonBase::bit_array_t *get_null_mask() const { return m_null_mask.get(); }

 private:
  inline void initialize_buffers();

  void copy_from(const ColumnChunk &other);

  void swap(ColumnChunk &other);

 private:
  // source field.
  Field *m_source_fld;

  // data type in mysql.
  enum_field_types m_type{MYSQL_TYPE_NULL};

  size_t m_field_width{0};

  // current rows number of this chunk.
  std::atomic<size_t> m_current_size{0};

  // VECTOR_WIDTH
  size_t m_chunk_size{0};

  // to keep the every column data in VECTOR_WIDTH count. the order is same with field order.
  std::unique_ptr<uchar[]> m_cols_buffer{nullptr};

  // null bitmap of all data in this Column Chunk.
  std::unique_ptr<ShannonBase::bit_array_t> m_null_mask{nullptr};
};
class VectorizedTableScanIterator : public TableRowIterator {
private:
  // Performance monitoring metrics
  struct PerformanceMetrics {
    std::chrono::nanoseconds avg_batch_time;
    size_t total_rows;
    size_t total_batches;
    double cache_hit_rate;
  };

  std::vector<ColumnChunk> m_col_chunks;     // Column chunk collection
  size_t m_optimal_batch_size;               // Optimal batch size
  size_t m_current_batch_size;               // Current batch size
  PerformanceMetrics m_metrics;              // Performance metrics

public:
  int ReadNextBatch() {
    auto start_time = std::chrono::high_resolution_clock::now();
    
    // Batch data reading
    size_t read_cnt = 0;
    int result = m_data_table->next_batch(m_current_batch_size, m_col_chunks, read_cnt);
    
    // Performance monitoring and adaptive adjustment
    UpdatePerformanceMetrics(start_time);
    AdaptBatchSize();
    
    return result;
  }

  void AdaptBatchSize() {
    // Dynamic batch size adjustment based on historical performance:
    // halve the batch when batches are slow, double it when they are fast.
    if (m_metrics.avg_batch_time > 50ms) {
      m_current_batch_size = std::max(m_current_batch_size / 2, SHANNON_VECTOR_WIDTH);
    } else if (m_metrics.avg_batch_time < 10ms) {
      m_current_batch_size = std::min(m_current_batch_size * 2, m_optimal_batch_size * 4);
    }
  }
};
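
The batch size adaptation in AdaptBatchSize can be isolated as a pure function, which makes the policy easy to test. The sketch below assumes the same 50 ms and 10 ms thresholds as the excerpt; `BatchSizePolicy` and `next_batch_size` are hypothetical names, and the clamping bounds are passed in explicitly instead of being derived from SHANNON_VECTOR_WIDTH.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>

// Hypothetical policy parameters for the adaptive controller.
struct BatchSizePolicy {
  std::size_t min_size;            // never shrink below this
  std::size_t max_size;            // never grow beyond this
  std::chrono::milliseconds slow;  // above this average, the batch is too large
  std::chrono::milliseconds fast;  // below this average, the batch can grow
};

// Returns the batch size to use for the next read.
inline std::size_t next_batch_size(std::size_t current, std::chrono::nanoseconds avg_batch_time,
                                   const BatchSizePolicy &p) {
  if (avg_batch_time > p.slow) return std::max(current / 2, p.min_size);
  if (avg_batch_time < p.fast) return std::min(current * 2, p.max_size);
  return current;  // within the target band: keep the size stable
}
```

Halving and doubling gives fast convergence, and the dead band between the two thresholds avoids oscillating when the measured time sits near the target.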

class VectorizedAggregateIterator final : public RowIterator {
 public:
  VectorizedAggregateIterator(THD *thd, unique_ptr_destroy_only<RowIterator> source, JOIN *join,
                              pack_rows::TableCollection tables, bool rollup, double expected_rows = 0.0);

  ~VectorizedAggregateIterator() override = default;

  bool Init() override;
  int Read() override;
  void SetNullRowFlag(bool is_null_row) override;
  void StartPSIBatchMode() override;
  void EndPSIBatchModeIfStarted() override;
  void UnlockRow() override;

  // Performance monitoring
  struct VectorizationStats {
    size_t total_batches_processed{0};
    size_t total_rows_vectorized{0};
    size_t traditional_fallbacks{0};
    double avg_batch_processing_time_ms{0.0};
    double total_vectorized_time_ms{0.0};
    size_t cache_hits{0};
    size_t cache_misses{0};
  };

  const VectorizationStats &GetStats() const { return m_stats; }

 private:
  // Core members (identical to original AggregateIterator)
  unique_ptr_destroy_only<RowIterator> m_source;
  JOIN *m_join;
  const bool m_rollup;
  pack_rows::TableCollection m_tables;

  // Keep original state machine
  enum State {
    READING_FIRST_ROW,
    PROCESSING_CURRENT_GROUP,
    LAST_ROW_STARTED_NEW_GROUP,
    OUTPUTTING_ROLLUP_ROWS,
    DONE_OUTPUTTING_ROWS
  } m_state;

  bool m_seen_eof;
  table_map m_save_nullinfo;

  int m_last_unchanged_group_item_idx;
  int m_current_rollup_position;
  String m_first_row_this_group;
  String m_first_row_next_group;
  int m_output_slice;

  // Vectorization infrastructure
  struct VectorizedGroupProcessor {
    // Batch storage for current group
    struct RowBatch {
      std::vector<ColumnChunk> column_chunks;
      size_t row_count{0};
      size_t capacity{0};
      bool initialized{false};

      void clear() {
        for (auto &chunk : column_chunks) {
          chunk.clear();
        }
        row_count = 0;
      }

      bool full() const { return row_count >= capacity; }
    };

    RowBatch current_batch;

    // Aggregate function analysis
    struct AggregateInfo {
      Item_sum *item;
      Item_sum::Sumfunctype type;
      Field *source_field;  // Primary field for this aggregate
      bool vectorizable;
      size_t field_index;  // Index in column chunks
    };

    std::vector<AggregateInfo> aggregate_infos;
    bool can_vectorize_current_group{false};
    bool analysis_complete{false};

    // Performance state
    size_t optimal_batch_size{1024};
    double recent_processing_times[10]{0.0};
    size_t time_index{0};

    void reset() {
      current_batch.clear();
      can_vectorize_current_group = false;
    }
  };

  VectorizedGroupProcessor m_vectorizer;
  VectorizationStats m_stats;

  // Configuration
  size_t m_max_batch_size{4096};
  size_t m_min_batch_size{64};
  double m_target_batch_time_ms{10.0};
  bool m_vectorization_enabled{true};

  // Core processing methods
  void SetRollupLevel(int level);
  int ProcessCurrentGroupTraditional();
  int ProcessCurrentGroupVectorized();

  // Vectorization setup and analysis
  void InitializeVectorization();
  bool AnalyzeAggregatesForVectorization();
  void SetupColumnChunks();
  void UpdateBatchSizeFromPerformance(double processing_time_ms);

  // Batch processing
  int ReadRowsIntoCurrentBatch();
  int ProcessVectorizedAggregates();
  void RestoreFirstRowOfCurrentGroup();
  void RestoreRowFromBatch(size_t row_idx, size_t agg_idx);

  // Aggregate type handlers
  int ProcessCountAggregates(const std::vector<size_t> &count_indices);
  int ProcessSumAggregates(const std::vector<size_t> &sum_indices);
  int ProcessMinMaxAggregates(const std::vector<size_t> &minmax_indices);
  int ProcessAvgAggregates(const std::vector<size_t> &avg_indices);

  // Utility methods
  bool IsSimpleAggregate(Item_sum *item) const;
  Field *GetPrimaryFieldForAggregate(Item_sum *item) const;
  void LogPerformanceMetrics();
};
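
The role of AnalyzeAggregatesForVectorization and IsSimpleAggregate, deciding per group whether the vectorized path is safe, can be sketched as below. The rules are assumptions made for illustration, not the actual ShannonBase criteria: `AggKind`, `ColKind`, and `AggregateSpec` are hypothetical types, and excluding DISTINCT aggregates is a choice of this example. The only rule taken from the text is that JSON, BLOB, and user-defined aggregates fall back to the traditional path.

```cpp
#include <algorithm>
#include <vector>

enum class AggKind { Count, Sum, Min, Max, Avg, Other };  // Other covers UDFs etc.
enum class ColKind { Integer, Floating, Json, Blob };

// Hypothetical description of one aggregate in the select list.
struct AggregateSpec {
  AggKind kind;
  ColKind column;
  bool distinct;
};

// An aggregate is "simple" when it is a built-in kind over a fixed-width numeric column.
inline bool is_simple_aggregate(const AggregateSpec &a) {
  if (a.distinct || a.kind == AggKind::Other) return false;  // assumption of this sketch
  return a.column == ColKind::Integer || a.column == ColKind::Floating;
}

// The group is vectorized only if every aggregate qualifies;
// a single non-vectorizable aggregate sends the whole group to the fallback path.
inline bool can_vectorize_group(const std::vector<AggregateSpec> &aggs) {
  return !aggs.empty() && std::all_of(aggs.begin(), aggs.end(), is_simple_aggregate);
}
```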

6.4 Execution Plan Rewrite Technology

6.4.1 Conversion Logic

  switch (path->type) {
    case AccessPath::TABLE_SCAN: {
      TABLE *table = path->table_scan().table;
      bool can_vectorized = (table->file->stats.records >= SHANNON_VECTOR_WIDTH);
      context->can_vectorized = can_vectorized;
      
      // Create vectorized table scan path
      return AccessPathFactory::CreateTableScan(table, nullptr, 
                                              table->s->is_secondary_engine() && can_vectorized);
    }
    
    case AccessPath::HASH_JOIN: {
      if (context->can_vectorized) {
        // Create vectorized hash join
        return AccessPathFactory::CreateHashJoin(path->hash_join().outer,
                                               path->hash_join().inner,
                                               true);
      }
      return path; // Keep original
    }
    
    // Conversion logic for other operation types...
  }
}
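
A plan rewrite of this kind is a tree walk that marks nodes as vectorized. The sketch below shows the idea on a toy plan tree and makes several assumptions: `PlanNode`, `make_scan`, `make_join`, and `rewrite` are hypothetical names, `kVectorWidth` is a placeholder value, and a join is marked vectorized only when both of its children are, which is stricter than the single context flag used in the excerpt.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

enum class PathType { TableScan, HashJoin, Other };

constexpr std::size_t kVectorWidth = 1024;  // placeholder for SHANNON_VECTOR_WIDTH

// Hypothetical plan node standing in for AccessPath.
struct PlanNode {
  PathType type{PathType::Other};
  std::size_t table_rows{0};  // only meaningful for TableScan
  bool vectorized{false};
  std::unique_ptr<PlanNode> outer;
  std::unique_ptr<PlanNode> inner;
};

inline std::unique_ptr<PlanNode> make_scan(std::size_t rows) {
  auto node = std::make_unique<PlanNode>();
  node->type = PathType::TableScan;
  node->table_rows = rows;
  return node;
}

inline std::unique_ptr<PlanNode> make_join(std::unique_ptr<PlanNode> outer,
                                           std::unique_ptr<PlanNode> inner) {
  auto node = std::make_unique<PlanNode>();
  node->type = PathType::HashJoin;
  node->outer = std::move(outer);
  node->inner = std::move(inner);
  return node;
}

// Post-order walk: children are decided first, then the parent.
// Returns whether the subtree rooted at node runs vectorized.
inline bool rewrite(PlanNode *node) {
  assert(node != nullptr);
  switch (node->type) {
    case PathType::TableScan:
      // Small tables are not worth batching.
      node->vectorized = node->table_rows >= kVectorWidth;
      break;
    case PathType::HashJoin: {
      assert(node->outer && node->inner);
      const bool outer_ok = rewrite(node->outer.get());
      const bool inner_ok = rewrite(node->inner.get());
      node->vectorized = outer_ok && inner_ok;
      break;
    }
    case PathType::Other:
      node->vectorized = false;  // keep the original operator
      break;
  }
  return node->vectorized;
}
```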

Complete Execution Flow

1: Decision Phase

  • Query parsing and preliminary optimization

  • RapidOptimize three-tier decision making

  • Execution plan rewrite approval

2: Preparation Phase

  • Vectorized execution plan generation

  • Columnar memory allocation

  • Iterator initialization

3: Execution Phase

  • Batch data reading

  • Vectorized processing

  • Result assembly and return

7 Performance_schema

In ShannonBase, performance_schema tables are added to store the information that Rapid creates, such as information about loaded tables and fields. The tables rpd_column_id, rpd_columns, rpd_preload_stats, rpd_table_id, and rpd_tables are added.

Taking rpd_column_id as an example: in the storage/perfschema directory, a file named table_rpd_column_id.cc is added, and the corresponding CMake file is modified as well. After that, a table share is added to all_shares in pfs_engine_table.cc.

mysql> show tables like "%rpd%";
+--------------------------------------+
| Tables_in_performance_schema (%rpd%) |
+--------------------------------------+
| rpd_column_id                        |
| rpd_columns                          |
| rpd_preload_stats                    |
| rpd_table_id                         |
| rpd_tables                           |
+--------------------------------------+
5 rows in set (0.04 sec)


mysql> show create table rpd_column_id \G
*************************** 1. row ***************************
       Table: rpd_column_id
Create Table: CREATE TABLE `rpd_column_id` (
  `ID` bigint unsigned NOT NULL,
  `TABLE_ID` bigint unsigned NOT NULL,
  `COLUMN_NAME` char(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL
) ENGINE=PERFORMANCE_SCHEMA DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
1 row in set (0.02 sec)

static PFS_engine_table_share *all_shares[] = {
    &table_cond_instances::m_share,
    &table_error_log::m_share,
    &table_events_waits_current::m_share,
    &table_events_waits_history::m_share,
    &table_events_waits_history_long::m_share,
    ...
    &table_rpl_async_connection_failover_managed::m_share,
    &table_rpd_column_id::m_share,
    &table_rpd_columns::m_share,
    &table_rpd_table_id::m_share,
    &table_rpd_tables::m_share,
    &table_rpd_preload_stats::m_share,

    &table_log_status::m_share,

    &table_prepared_stmt_instances::m_share,
    ...

7.1 Table definition

First of all, the table definition is declared:

Plugin_table table_rpd_column_id::m_table_def(
    /* Schema name */
    "performance_schema",
    /* Name */
    "rpd_column_id",
    /* Definition */
    "  ID BIGINT unsigned not null,\n"
    "  TABLE_ID BIGINT unsigned not null,\n"
    "  COLUMN_NAME CHAR(128) collate utf8mb4_bin not null\n",
    /* Options */
    " ENGINE=PERFORMANCE_SCHEMA",
    /* Tablespace */
    nullptr);

The table share is defined as follows:

PFS_engine_table_share table_rpd_column_id::m_share = {
    &pfs_readonly_acl,
    &table_rpd_column_id::create,
    nullptr, /* write_row */
    nullptr, /* delete_all_rows */
    table_rpd_column_id::get_row_count,
    sizeof(pos_t), /* ref length */
    &m_table_lock,
    &m_table_def,
    true, /* perpetual */
    PFS_engine_table_proxy(),
    {0},
    false /* m_in_purgatory */
};

The definitions are executed at the bootstrap stage.

7.2 Implementing row population and reads

int table_rpd_column_id::make_row(uint index[[maybe_unused]]) {
  DBUG_TRACE;
  // Set default values.
  if (index >= ShannonBase::meta_rpd_columns_infos.size()) {
    return HA_ERR_END_OF_FILE;
  } else {
    m_row.column_id = ShannonBase::meta_rpd_columns_infos[index].column_id;
    m_row.table_id = ShannonBase::meta_rpd_columns_infos[index].table_id;

    strncpy(m_row.column_name, ShannonBase::meta_rpd_columns_infos[index].column_name,
            sizeof(m_row.column_name));
    m_row.column_name_length = strlen(ShannonBase::meta_rpd_columns_infos[index].column_name);
  }
  return 0;
}

int table_rpd_column_id::read_row_values(TABLE *table,
                                         unsigned char *buf,
                                         Field **fields,
                                         bool read_all) {
  Field *f;

  //assert(table->s->null_bytes == 0);
  buf[0] = 0;

  for (; (f = *fields); fields++) {
    if (read_all || bitmap_is_set(table->read_set, f->field_index())) {
      switch (f->field_index()) {
        case 0: /** column_id */
          set_field_ulonglong(f, m_row.column_id);
          break;
        case 1: /** table_id */
          set_field_ulonglong(f, m_row.table_id);
          break;
        case 2: /** column name */
          set_field_char_utf8mb4(f, m_row.column_name, m_row.column_name_length);
          break;
        default:
          assert(false);
      }
    }
  }
  return 0;
}
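
The make_row pattern, bounds check, copy of the scalar fields, and bounded copy of the name, can be sketched without the performance_schema types. Everything named here is hypothetical (`ColumnMeta`, `Row`, `ReadResult`), and the sketch clamps the stored name length to the buffer size, which is a defensive choice of this example rather than something shown in the excerpt.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical in-memory catalog entry, standing in for meta_rpd_columns_infos.
struct ColumnMeta {
  std::uint64_t column_id;
  std::uint64_t table_id;
  std::string column_name;
};

// Hypothetical row buffer, standing in for m_row.
struct Row {
  std::uint64_t column_id{0};
  std::uint64_t table_id{0};
  char column_name[128]{};
  std::size_t column_name_length{0};
};

enum class ReadResult { Ok, EndOfFile };

// Fills row from catalog[index]; reports EndOfFile once the scan runs past the end.
inline ReadResult make_row(const std::vector<ColumnMeta> &catalog, std::size_t index, Row &row) {
  if (index >= catalog.size()) return ReadResult::EndOfFile;

  const ColumnMeta &src = catalog[index];
  row.column_id = src.column_id;
  row.table_id = src.table_id;

  // Clamp the length so it can never exceed the destination buffer.
  row.column_name_length = std::min(src.column_name.size(), sizeof(row.column_name));
  std::memcpy(row.column_name, src.column_name.data(), row.column_name_length);
  return ReadResult::Ok;
}
```

Storing an explicit length alongside the bytes means the reader never depends on a terminating NUL, which matters when a name fills the buffer exactly.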

8 JavaScript Stored Procedure

8.1 Overview

JavaScript for ShannonBase MySQL is a newly available feature. It allows users to write JavaScript stored functions that are executed inside the server via JerryScript, a lightweight JavaScript engine. The JavaScript functions can manipulate existing MySQL data irrespective of the underlying storage engine (InnoDB or the Rapid engine); everything works transparently.

Users can now reorganize applications and move data-intensive complex operations closer to their data. This reduces cloud egress cost and the effort required to maintain data pipelines. In addition, it improves end-to-end application performance and security by eliminating the need for client-server data movement.

8.2 Challenges

Even with rich transaction processing, analytics, and machine learning inside the database, complex and rapidly evolving data-intensive applications still force the user to move large amounts of data to the client side of applications. This is done to access the rich procedural programming language ecosystem that is not available inside the database. Enabling the same capability in the database has some practical challenges:

  • Limitation of SQL: MySQL allows stored programs in SQL procedural-dialect “Compound Statements”. This allows users to deploy server-side programs but comes with limitations. SQL stored programs are interpreted and do not take advantage of compiler optimizations. Furthermore, the SQL dialects lack basic features compared to modern language runtime, such as user-defined types, containers (arrays, maps), and functional programming constructs. This makes it hard for the user to implement complex logic.

  • Ecosystem of Development: Only allowing procedural language runtime inside the database is not sufficient; the challenge is to give developers the freedom to use the development ecosystem that comes with the language runtime. The development ecosystem may include tools such as 3rd party package managers, debuggers, editors, testing frameworks, etc.

  • Data Access API: Accessing and manipulating database data is central to procedural language integration. Introducing a new data access API to the developer community introduces a steep learning curve and hinders adoption. Reusing interfaces such as existing database client-server connectors and ORMs does not directly map to the needs of server-centric data processing.

  • Security: Adding a new execution engine in the database server means expanding the attack surface for the database against new vulnerabilities. It also means preventing unnecessary access to valuable compute and memory resources by the procedural code that would have been otherwise used by the database.

[All of the above comes from Oracle, which holds the rights to it.]

8.3 Usage

The syntax of a JavaScript stored procedure is the same as that of an SQL one. You can use create function to create a new JavaScript stored procedure and show create function to display the content of this SP (stored procedure).

 CREATE FUNCTION js_pow(arg1 INT, arg2 INT) 
    RETURNS INT LANGUAGE JAVASCRIPT 
    AS 
    $$
     let x = Math.pow(arg1, arg2);
     return x;
    $$

To show the script of js_pow:

mysql> show create function js_pow \G
*************************** 1. row ***************************
            Function: js_pow
            sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
     Create Function: CREATE DEFINER=`root`@`localhost` FUNCTION `js_pow`(arg1 INT, arg2 INT) RETURNS int
    NO SQL
    LANGUAGE JAVASCRIPT
AS $$
     let x = Math.pow(arg1, arg2);
     return x;
    $$
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
  Database Collation: utf8mb4_general_ci
1 row in set (0.00 sec)

After the JavaScript stored procedure has been created successfully, we can call it with select js_pow.

mysql> select js_pow(4,5);
+-------------+
| js_pow(4,5) |
+-------------+
|        1024 |
+-------------+
1 row in set (0.04 sec)

8.4 Binlog

Create function is a DDL statement. If the binlog is turned on, the create function statement should be recorded in it and replayed on the replica. Now, let's check whether it was written to the binlog.

Running the show binlog events command produces the results listed below, from which we conclude that the create function statement generated binlog events.

mysql> show binlog events in 'binlog.000005' \G
*************************** 1. row ***************************
   Log_name: binlog.000005
        Pos: 4
 Event_type: Format_desc
  Server_id: 1
End_log_pos: 126
       Info: Server ver: 8.1.0-debug-asan, Binlog ver: 4
*************************** 2. row ***************************
   Log_name: binlog.000005
        Pos: 126
 Event_type: Previous_gtids
  Server_id: 1
End_log_pos: 157
       Info: 
*************************** 3. row ***************************
   Log_name: binlog.000005
        Pos: 157
 Event_type: Anonymous_Gtid
  Server_id: 1
End_log_pos: 236
       Info: SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
*************************** 4. row ***************************
   Log_name: binlog.000005
        Pos: 236
 Event_type: Query
  Server_id: 1
End_log_pos: 490
       Info: use `test`; CREATE DEFINER=`root`@`localhost` FUNCTION `myfunc`(x INT) RETURNS int
    NO SQL
    LANGUAGE JAVASCRIPT
AS $$
       var x;
       return 2*x;
     $$ /* xid=5 */
*************************** 5. row ***************************
   Log_name: binlog.000005
        Pos: 490
 Event_type: Anonymous_Gtid
  Server_id: 1
End_log_pos: 569
       Info: SET @@SESSION.GTID_NEXT= 'ANONYMOUS'
*************************** 6. row ***************************
   Log_name: binlog.000005
        Pos: 569
 Event_type: Query
  Server_id: 1
End_log_pos: 852
       Info: use `test`; CREATE DEFINER=`root`@`localhost` FUNCTION `js_pow`(arg1 INT, arg2 INT) RETURNS int
    NO SQL
    LANGUAGE JAVASCRIPT
AS $$
     let x = Math.pow(arg1, arg2);
     return x;
    $$ /* xid=15 */
6 rows in set (0.00 sec)

8.5 Internal implementation

In this part, we explain what has been added to support this feature. First of all, the syntax is supported: a new keyword, JAVASCRIPT_SYM, was added in sql_yacc.yy and lex.h.

Once the syntax is supported, the processing logic can be added. In db_load_routine in sp.cc, a defstr (definition string) of the stored procedure is built, and it is then compiled by sp_compile. One thing to note is that when dealing with a non-SQL stored procedure, we do not care about the body of the stored procedure at this point, because the body string of a non-SQL procedure is compiled by an external compiler. Therefore, we build the defstr according to the language type.

  switch (is_language_of(sp_chistics->language.str)) {
    case enum_sp_language::SQL: {
      if (!create_string(thd, &defstr, type, nullptr, 0, ssp_name, ssp_name_len,
                        params, strlen(params), returns, strlen(returns), body,
                        strlen(body), sp_chistics, user, host, sql_mode, false)) {
        ret = SP_INTERNAL_ERROR;
        goto end;
      }
    } break;
    case enum_sp_language::JAVASCRIPT: {
      /** Here we only need the declaration of the sp; the body does not matter.
       * We need the params and their values, the return value field, etc., so
       * the sp body is removed. With the body included, sp_compile() would fail. */
      if (!create_string(thd, &declare_str, type, nullptr, 0, ssp_name, ssp_name_len,
                        params, strlen(params), returns, strlen(returns), body,
                        strlen(body), sp_chistics, user, host, sql_mode, false,
                        false)) {
        ret = SP_INTERNAL_ERROR;
        goto end;
      }
    } break;
    default: 
      assert(false);
    break;
  }
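
The dispatch above can be pictured with a small standalone sketch. It is not the ShannonBase code: RoutineLanguage, language_of, and build_parser_input are hypothetical names, and treating an empty LANGUAGE clause as SQL is an assumption. The sketch only shows the idea that an SQL routine hands header and body to the parser, while an external-language routine hands over the header alone.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical stand-in for enum_sp_language; only the two cases shown above.
enum class RoutineLanguage { SQL, JAVASCRIPT, UNKNOWN };

// Case-insensitive mapping from the LANGUAGE clause text to the enum.
// An empty string is treated as SQL here, which is an assumption.
RoutineLanguage language_of(std::string name) {
  std::transform(name.begin(), name.end(), name.begin(), [](unsigned char c) {
    return static_cast<char>(std::toupper(c));
  });
  if (name.empty() || name == "SQL") return RoutineLanguage::SQL;
  if (name == "JAVASCRIPT") return RoutineLanguage::JAVASCRIPT;
  return RoutineLanguage::UNKNOWN;
}

// Builds the text handed to the SQL parser: header plus body for SQL
// routines, the header only for routines written in an external language.
std::string build_parser_input(RoutineLanguage lang, const std::string &header,
                               const std::string &body) {
  assert(lang != RoutineLanguage::UNKNOWN);
  return lang == RoutineLanguage::SQL ? header + " " + body : header;
}
```

Keeping the body out of the parser input is what lets the SQL grammar stay unaware of JavaScript syntax; the body is attached to the routine object afterwards.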

Next, the stored procedure is compiled:

    switch (is_language_of(sp_chistics->language.str)) {
      case enum_sp_language::SQL:
        *sphp = sp_compile(thd, &defstr, sql_mode, creation_ctx);
        break;
      case enum_sp_language::JAVASCRIPT: {
        *sphp = sp_compile(thd, &declare_str, sql_mode, creation_ctx);
        //reset the code body.
        (*sphp)->code = {body, strlen(body)};
        (*sphp)->m_body_utf8 = {body, strlen(body)};
        (*sphp)->m_body = {body, strlen(body)};
      }break;
      default: assert(false);
    }

After the stored procedure has been compiled successfully, control moves to the execution stage. The real work starts in sp_head::execute_function.

SQL stored procedures run through execute, non-SQL stored procedures through execute_compiled_sp, and external routines through execute_external_routine. execute_compiled_sp compiles the stored procedure and then executes it.

bool sp_head::execute_compiled_sp(THD* thd, Item **argp, uint argcount,
                                  Field *return_value_fld) {
  sp_extra_compiler* ext_compiler =
    sp_head::get_instance(thd, sp_compiler_type::LANG_JAVASCRIPT,
                          return_value_fld);
  if (!ext_compiler) return true;

  String code_str("", m_creation_ctx->get_client_cs());
  for (auto index = 0u; index < argcount; index++) {
    auto var = m_root_parsing_ctx->find_variable(index);
    create_string(code_str, var, *(argp + index));
  }
  code_str.append(m_body_utf8.str, strlen(m_body_utf8.str));
  String strstr = sp_extra_compiler::to_javascript(code_str);
  strstr.ltrim();
  strstr.rtrim();
  if (ext_compiler->compile(strstr.c_ptr(), strstr.length())) {
    my_error(ER_DEFINITION_CONTAINS_INVALID_STRING, MYF(0), "stored routine",
             m_db.str, m_name.str, system_charset_info->csname, "parsed failed");
    return true;
  }

  if (ext_compiler->execute()) {
    my_error(ER_DEFINITION_CONTAINS_INVALID_STRING, MYF(0), "stored routine",
             m_db.str, m_name.str, system_charset_info->csname, "execute failed");
    return true;    
  }
  return false;
}
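
The loop over the arguments followed by the append of m_body_utf8 suggests that the script passed to the external compiler is the argument bindings followed by the routine body. A minimal sketch of that assembly step is given below. BoundArg and assemble_script are hypothetical names, and the `var name = value;` form is an assumption made for illustration; the text actually emitted by create_string and to_javascript is not shown in this document.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// One bound argument: parameter name plus its value already rendered as a
// JavaScript literal (for example "3" or "\"abc\"").
using BoundArg = std::pair<std::string, std::string>;

// Prepends one declaration per argument to the routine body.
// The `var name = value;` form is an assumption made for illustration.
std::string assemble_script(const std::vector<BoundArg> &args,
                            const std::string &body) {
  std::string script;
  for (const BoundArg &arg : args) {
    assert(!arg.first.empty());  // every parameter needs a name
    script += "var " + arg.first + " = " + arg.second + ";\n";
  }
  script += body;
  return script;
}
```

For the js_pow example shown earlier, binding arg1 to 2 and arg2 to 10 would place two declarations in front of the unchanged body.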

Some new classes are introduced here to deal with specific languages, for example JavaScript, R, Ruby, etc.

enum class sp_compiler_type {
  LANG_NONE,
  LANG_JAVASCRIPT,
  LANG_PYTHON,
  LANG_R,
  LANG_RUBY
};

class sp_extra_compiler {
  public:
    sp_extra_compiler(sp_compiler_type type, Field* fld) : m_type(type),
      m_return_fld(fld) {}
    virtual ~sp_extra_compiler() = default;
    //compile the code.
    virtual bool compile(const char* code, size_t code_len) = 0;
    //execute the compiled code.
    virtual bool execute() = 0;
    //gets the result of execution.
    virtual void result() = 0;
    static String to_javascript(String& source);
    sp_compiler_type m_type;
    Field* m_return_fld;
};

class sp_extra_compiler_java : public sp_extra_compiler {
  public:
    sp_extra_compiler_java(Field* fld) :
       sp_extra_compiler(sp_compiler_type::LANG_JAVASCRIPT, fld),
       m_parsed_code{0}, m_ret_val{0}{}
    virtual ~sp_extra_compiler_java();
    bool compile(const char* code, size_t code_len) override;
    bool execute() override;
    void result() override;
  private:
    jerry_value_t m_parsed_code, m_ret_val;
};
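
To illustrate the compile-then-execute contract of these classes without depending on MySQL or JerryScript, here is a standalone sketch. ExtraCompiler, ToyCompiler, CompilerType, and make_compiler are hypothetical names that only mirror the shape of the classes above; the toy backend stores the source instead of parsing it. As in the listing, both steps return true on failure.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

enum class CompilerType { NONE, JAVASCRIPT };

// Mirrors the shape of sp_extra_compiler: both steps return true on failure.
class ExtraCompiler {
 public:
  virtual ~ExtraCompiler() = default;
  virtual bool compile(const char *code, std::size_t code_len) = 0;
  virtual bool execute() = 0;
};

// Toy backend: "compiling" stores the source, "executing" records its length.
class ToyCompiler : public ExtraCompiler {
 public:
  bool compile(const char *code, std::size_t code_len) override {
    if (code == nullptr || code_len == 0) return true;  // nothing to compile
    m_source.assign(code, code_len);
    return false;
  }
  bool execute() override {
    if (m_source.empty()) return true;  // execute() before a successful compile()
    m_result = m_source.size();
    return false;
  }
  std::size_t result() const {
    assert(!m_source.empty());  // only meaningful after compile() succeeded
    return m_result;
  }

 private:
  std::string m_source;
  std::size_t m_result = 0;
};

// Factory keyed by language, in the spirit of sp_head::get_instance.
std::unique_ptr<ExtraCompiler> make_compiler(CompilerType type) {
  if (type == CompilerType::JAVASCRIPT) return std::make_unique<ToyCompiler>();
  return nullptr;  // no backend registered for this language
}
```

With this split, supporting another language amounts to adding one more subclass and one more branch in the factory, while the caller keeps the same two-step flow.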

A typical call stack of calling a stored procedure.

#0  sp_rcontext::set_return_value (this=0x614000180070, thd=0x628000320100, return_value_item=0x613000ca1a70)
    at /home/workshop/ShannonBase/sql/sp_rcontext.cc:148
#1  0x0000560eafe9cad5 in sp_instr_freturn::exec_core (this=<optimized out>, thd=<optimized out>, nextp=<optimized out>)
    at /home/workshop/ShannonBase/sql/sp_instr.cc:1330
#2  0x0000560eafea1a14 in sp_lex_instr::execute_expression (this=this@entry=0x613000ca1970, thd=thd@entry=0x628000320100, 
    nextp=nextp@entry=0x7fdd6a844000) at /home/workshop/ShannonBase/sql/sp_instr.cc:375
#3  0x0000560eafea20cc in sp_lex_instr::reset_lex_and_exec_core (this=this@entry=0x613000ca1970, thd=thd@entry=0x628000320100, 
    nextp=nextp@entry=0x7fdd6a844000, open_tables=open_tables@entry=true) at /home/workshop/ShannonBase/sql/sp_instr.cc:457
#4  0x0000560eafea2637 in sp_lex_instr::validate_lex_and_execute_core (this=this@entry=0x613000ca1970, 
    thd=thd@entry=0x628000320100, nextp=nextp@entry=0x7fdd6a844000, open_tables=open_tables@entry=true)
    at /home/workshop/ShannonBase/sql/sp_instr.cc:746
#5  0x0000560eafeaa756 in sp_lex_instr::execute (this=0x613000ca1970, thd=0x628000320100, nextp=0x7fdd6a844000)
    at /home/workshop/ShannonBase/sql/sp_instr.h:333
#6  0x0000560eafe84706 in sp_head::execute (this=this@entry=0x61d0005a8cb0, thd=thd@entry=0x628000320100, 
    merge_da_on_success=merge_da_on_success@entry=true) at /home/workshop/ShannonBase/sql/sp_head.cc:2233
#7  0x0000560eafe8678b in sp_head::execute_function (this=<optimized out>, thd=thd@entry=0x628000320100, argp=<optimized out>, 
    argcount=<optimized out>, return_value_fld=<optimized out>) at /home/workshop/ShannonBase/sql/sp_head.cc:2748
#8  0x0000560eb097f612 in Item_func_sp::execute_impl (this=this@entry=0x613000ca2cb0, thd=thd@entry=0x628000320100)
    at /home/workshop/ShannonBase/sql/item_func.cc:8356
#9  0x0000560eb097f8af in Item_func_sp::execute (this=this@entry=0x613000ca2cb0)
    at /home/workshop/ShannonBase/sql/item_func.cc:8297
#10 0x0000560eb097fb75 in Item_func_sp::val_int (this=0x613000ca2cb0) at /home/workshop/ShannonBase/sql/item_func.cc:8211
#11 0x0000560eb0842b50 in Item::send (this=0x613000ca2cb0, protocol=0x60d0012e6810, buffer=<optimized out>)
    at /home/workshop/ShannonBase/sql/item.cc:7490
#12 0x0000560eaff0cb8a in THD::send_result_set_row (this=this@entry=0x628000320100, row_items=...)
    at /home/workshop/ShannonBase/sql/sql_class.cc:2883
#13 0x0000560eb0cf872d in Query_result_send::send_data (this=<optimized out>, thd=<optimized out>, items=...)
    at /home/workshop/ShannonBase/sql/query_result.cc:106
#14 0x0000560eb03078b5 in Query_expression::ExecuteIteratorQuery (this=this@entry=0x6120003109f0, thd=thd@entry=0x628000320100)
    at /home/workshop/ShannonBase/sql/sql_union.cc:1785
#15 0x0000560eb0307c7d in Query_expression::execute (this=this@entry=0x6120003109f0, thd=thd@entry=0x628000320100)
    at /home/workshop/ShannonBase/sql/sql_union.cc:1823
#16 0x0000560eb01a816f in Sql_cmd_dml::execute_inner (this=0x60b000182d50, thd=0x628000320100)
    at /home/workshop/ShannonBase/sql/sql_select.cc:1023
#17 0x0000560eb01c48a3 in Sql_cmd_dml::execute (this=0x60b000182d50, thd=0x628000320100)
    at /home/workshop/ShannonBase/sql/sql_select.cc:794
#18 0x0000560eb00b5495 in mysql_execute_command (thd=thd@entry=0x628000320100, first_level=first_level@entry=true)
    at /home/workshop/ShannonBase/sql/sql_parse.cc:4797
#19 0x0000560eb00b8b01 in dispatch_sql_command (thd=0x628000320100, parser_state=parser_state@entry=0x7fdd6a8497a0)
    at /home/workshop/ShannonBase/sql/sql_parse.cc:5447
#20 0x0000560eb00bb69b in dispatch_command (thd=thd@entry=0x628000320100, com_data=com_data@entry=0x7fdd6a84ada0, 
    command=COM_QUERY) at /home/workshop/ShannonBase/sql/sql_parse.cc:2112
#21 0x0000560eb00bee25 in do_command (thd=thd@entry=0x628000320100) at /home/workshop/ShannonBase/sql/sql_parse.cc:1459
#22 0x0000560eb04a018b in handle_connection (arg=arg@entry=0x603000131b00)
    at /home/workshop/ShannonBase/sql/conn_handler/connection_handler_per_thread.cc:303
#23 0x0000560eb3eba990 in pfs_spawn_thread (arg=0x614000145060)
    at /home/workshop/ShannonBase/storage/perfschema/pfs.cc:3043
#24 0x00007fdda7c94ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#25 0x00007fdda7d26850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

9 GenAI

9.1 Embedding

9.1.1 Embedding a Row

The ML_EMBED_ROW routine uses the specified embedding model to encode the specified text or query into a vector embedding. The routine returns a Vector that contains a numerical representation of the specified text.

  • syntax
mysql> SELECT sys.ML_EMBED_ROW('Text'[, options]);

options: JSON_OBJECT(keyvalue[, keyvalue] ...)
keyvalue: 
{
  'model_id', {'ModelID'}
  |'truncate', {true|false}
}

Following are ML_EMBED_ROW parameters:

Text: specifies the text to encode.

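options: specifies optional parameters as key-value pairs in JSON format. It can include the following parameters:

model_id: specifies the embedding model to use for encoding the text. Possible values are: all-MiniLM-L12-v2 (now available)

truncate: specifies whether to truncate inputs longer than the maximum token size.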

mysql> SELECT sys.ML_EMBED_ROW("What is artificial intelligence?", JSON_OBJECT("model_id", "all_minilm_l12_v2")) into @text_embedding;
  • implementation We create a new function named ML_EMBED_ROW, defined as follows:
DROP FUNCTION IF EXISTS ml_embed_row;

DELIMITER $$

CREATE DEFINER='mysql.sys'@'localhost' FUNCTION ml_embed_row (
        in_text TEXT,
        in_model_option JSON
    )
    RETURNS BLOB
    COMMENT '
Description
-----------

The ML_EMBED_ROW routine uses the specified embedding model to encode the specified text or query into a vector embedding. 

Parameters
-----------

in_text (TEXT):
  the text to encode into a vector embedding.
in_model_option (JSON):
  specifies optional parameters as key-value pairs in JSON format
Example
-----------
mysql> SELECT sys.ML_EMBED_ROW("What is artificial intelligence?", JSON_OBJECT("model_id", "all_minilm_l12_v2")) into @text_embedding;
'
    SQL SECURITY INVOKER
    DETERMINISTIC
    CONTAINS SQL
BEGIN

  DECLARE v_embed_row_res BLOB;

  SELECT ML_MODEL_EMBED_ROW(in_text, in_model_option) INTO v_embed_row_res;
  RETURN v_embed_row_res;
END$$

DELIMITER ;

A new native function, ML_MODEL_EMBED_ROW, is also created.

class Item_func_ml_embed_row : public Item_str_func {
 public:
  Item_func_ml_embed_row(const POS &pos, PT_item_list *list)
      : Item_str_func(pos, list) {}
  const char *func_name() const override { return "ML_EMBED_ROW"; }
  bool resolve_type(THD *) override;
  String *val_str(String *str) override;
  enum Item_result result_type() const override { return STRING_RESULT; }
 private:
  String buffer;
};
String *Item_func_ml_embed_row::val_str(String *str) {
  assert(fixed);

  String* res;
  if (!(res = args[0]->val_str(str))) {
    return error_str();
  }
  if (res->is_empty()) return error_str();
  std::string input_text(res->ptr(), res->length());

  std::string model_name;
  Json_wrapper options;
  if (arg_count == 2) {
    if (args[1]->val_json(&options)) return error_str();
  }

  auto embed_row_ptr = std::make_unique<ShannonBase::ML::ML_embedding_row>();
  auto embeded_res = embed_row_ptr->GenerateEmbedding(input_text, options);
  if (!embeded_res.size()) {
    std::string err("embedding text unsuccessfull, pleas check your model");
    my_error(ER_ML_FAIL, MYF(0), err.c_str());
    return error_str();
  }

  uint32 input_dims = embeded_res.size();
  if (input_dims == UINT32_MAX) {
    my_error(ER_TO_VECTOR_CONVERSION, MYF(0), embeded_res.size(), embeded_res.data());
    return error_str();
  }

  uint32 out_length = input_dims * Field_vector::precision;
  if (buffer.mem_realloc(out_length)) return error_str();

  memcpy(buffer.ptr(), embeded_res.data(), out_length);
  buffer.length(out_length);
  return &buffer;
}
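
The final memcpy in val_str copies the embedding into the result buffer as raw bytes, with out_length equal to the number of dimensions times Field_vector::precision. The sketch below shows that packing in isolation, assuming each dimension is a 4-byte float stored in host byte order; pack_embedding and unpack_embedding are hypothetical helper names, not part of ShannonBase.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Copies the floats into a byte string, 4 bytes per dimension, host byte order.
std::string pack_embedding(const std::vector<float> &values) {
  static_assert(sizeof(float) == 4, "sketch assumes 32-bit floats");
  std::string bytes(values.size() * sizeof(float), '\0');
  if (!values.empty()) std::memcpy(&bytes[0], values.data(), bytes.size());
  return bytes;
}

// Inverse operation; the byte length must be a whole number of floats.
std::vector<float> unpack_embedding(const std::string &bytes) {
  assert(bytes.size() % sizeof(float) == 0);
  std::vector<float> values(bytes.size() / sizeof(float));
  if (!values.empty()) std::memcpy(values.data(), bytes.data(), bytes.size());
  return values;
}
```

Under this assumption a three-dimensional embedding occupies 12 bytes, which is why the SQL wrapper can return the value as a BLOB.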

9.1.2 Embedding a Table

  • Syntax The ML_EMBED_TABLE routine runs multiple embedding generations in a batch, in parallel.
mysql> CALL sys.ML_EMBED_TABLE('InputTableColumn', 'OutputTableColumn'[, options]);
    
options: JSON_OBJECT(keyvalue[, keyvalue] ...)
keyvalue: 
{
  'model_id', {'ModelID'}
  |'truncate', {true|false}
  |'batch_size', BatchSize
  |'details_column', 'ErrorDetailsColumnName'
}

Following are ML_EMBED_TABLE parameters:

InputTableColumn: specifies the names of the input database, table, and column that contains the text to encode. The InputTableColumn is specified in the following format: DBName.TableName.ColumnName.

The specified input table can be an internal or external table.

The specified input table must already exist, must not be empty, and must have a primary key.

The input column must already exist and must contain text or varchar values.

The input column must not be a part of the primary key and must not have NULL values or empty strings.

There must be no backticks used in the DBName, TableName, or ColumnName and there must be no period used in the DBName or TableName.

OutputTableColumn: specifies the names of the database, table, and column where the generated embeddings are stored. The OutputTableColumn is specified in the following format: DBName.TableName.ColumnName.

The specified output table must be an internal table.

If the specified output table already exists, then it must be the same as the input table, and the specified output column must not already exist in the input table. A new VECTOR column is added to the table. External tables are read-only, so if the input table is an external table, it cannot be used to store the output.

If the specified output table doesn't exist, then a new table is created. The new output table has key columns which contain the same primary key values as the input table and a VECTOR column that stores the generated embeddings.

There must be no backticks used in the DBName, TableName, or ColumnName and there must be no period used in the DBName or TableName.

options: specifies optional parameters as key-value pairs in JSON format. It can include the following parameters:

model_id: specifies the embedding model to use for encoding the text. Possible values are: all-MiniLM-L12-v2 (now available)

truncate: specifies whether to truncate inputs longer than the maximum token size. Default value is true.

batch_size: specifies the batch size for the routine. This parameter is supported for internal tables only. Default value is 1000. Possible values are integer values between 1 and 1000.

details_column: specifies a name for the output table column that is created for adding details of errors encountered for rows that aren't processed successfully by the routine. Ensure that a column by the specified name does not already exist in the table. Default value is details.
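
The naming and range rules above can be checked before any SQL is issued. The following sketch is one possible validation of the DBName.TableName.ColumnName format and of batch_size; ColumnRef, parse_column_ref, and valid_batch_size are hypothetical names, and requiring exactly two periods follows the stored procedure shown later rather than a documented rule for the column name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Result of splitting "DBName.TableName.ColumnName".
struct ColumnRef {
  std::string db, table, column;
  bool valid = false;
};

// Accepts exactly three non-empty parts separated by periods, no backticks.
ColumnRef parse_column_ref(const std::string &qualified) {
  ColumnRef ref;
  if (qualified.find('`') != std::string::npos) return ref;
  const std::size_t first = qualified.find('.');
  if (first == std::string::npos) return ref;
  const std::size_t second = qualified.find('.', first + 1);
  if (second == std::string::npos) return ref;
  if (qualified.find('.', second + 1) != std::string::npos) return ref;
  assert(first < second);
  ref.db = qualified.substr(0, first);
  ref.table = qualified.substr(first + 1, second - first - 1);
  ref.column = qualified.substr(second + 1);
  ref.valid = !ref.db.empty() && !ref.table.empty() && !ref.column.empty();
  return ref;
}

// batch_size must be an integer between 1 and 1000.
bool valid_batch_size(long batch_size) {
  return batch_size >= 1 && batch_size <= 1000;
}
```

Rejecting malformed names early keeps later error messages specific, instead of surfacing as a generic failure from a dynamically built statement.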

  • Implementation A new procedure named ML_EMBED_TABLE is created.
DROP PROCEDURE IF EXISTS ml_embed_table;

DELIMITER $$

CREATE DEFINER='mysql.sys'@'localhost' PROCEDURE ml_embed_table (
    IN in_input_table_column VARCHAR(255),
    IN in_output_table_column VARCHAR(255),
    IN in_options JSON
)
COMMENT '
Description
-----------
The ML_EMBED_TABLE routine uses the specified embedding model to encode text data 
from an input table column into vector embeddings and stores them in an output table column.

Parameters
-----------
in_input_table_column (VARCHAR(255)):
  Specifies the input database, table, and column in format: DBName.TableName.ColumnName
  
in_output_table_column (VARCHAR(255)):
  Specifies the output database, table, and column in format: DBName.TableName.ColumnName

in_options (JSON):
  Specifies optional parameters as key-value pairs in JSON format:
  - model_id: The embedding model to use (required)
  - truncate: Whether to truncate inputs longer than maximum token size (true|false, default: true)
  - batch_size: Number of rows to process in each batch (1-1000, default: 1000)
  - details_column: Name of column to store error details (default: "details")

Example
-----------
mysql> CALL sys.ML_EMBED_TABLE(
    "demo_db.input_table.Input", 
    "demo_db.output_table.Output",
    JSON_OBJECT(
        "model_id", "all_minilm_l12_v2",
        "batch_size", 500,
        "truncate", true
    )
);
'
SQL SECURITY INVOKER
MODIFIES SQL DATA
BEGIN
    DECLARE v_input_db VARCHAR(64);
    DECLARE v_input_table VARCHAR(64);
    DECLARE v_input_column VARCHAR(64);
    DECLARE v_output_db VARCHAR(64);
    DECLARE v_output_table VARCHAR(64);
    DECLARE v_output_column VARCHAR(64);
    DECLARE v_model_id VARCHAR(255);
    DECLARE v_truncate BOOLEAN DEFAULT TRUE;
    DECLARE v_batch_size INT DEFAULT 1000;
    DECLARE v_details_column VARCHAR(64) DEFAULT 'details';
    DECLARE v_input_same_as_output BOOLEAN DEFAULT FALSE;
    DECLARE v_table_exists INT DEFAULT 0;
    DECLARE v_column_exists INT DEFAULT 0;
    DECLARE v_has_primary_key INT DEFAULT 0;
    DECLARE v_is_external_table INT DEFAULT 0;
    DECLARE v_sql TEXT;
    DECLARE v_error_msg TEXT DEFAULT '';
    DECLARE v_total_rows INT DEFAULT 0;
    DECLARE v_processed_rows INT DEFAULT 0;
    DECLARE v_batch_start INT DEFAULT 0;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        GET DIAGNOSTICS CONDITION 1
            v_error_msg = MESSAGE_TEXT;
        RESIGNAL SET MESSAGE_TEXT = v_error_msg;
    END;

    -- Parse input table column (DBName.TableName.ColumnName)
    IF CHAR_LENGTH(in_input_table_column) - CHAR_LENGTH(REPLACE(in_input_table_column, '.', '')) != 2 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid input table column format. Use DBName.TableName.ColumnName';
    END IF;

    SET v_input_db = SUBSTRING_INDEX(in_input_table_column, '.', 1);
    SET v_input_table = SUBSTRING_INDEX(SUBSTRING_INDEX(in_input_table_column, '.', 2), '.', -1);
    SET v_input_column = SUBSTRING_INDEX(in_input_table_column, '.', -1);

    -- Parse output table column (DBName.TableName.ColumnName)
    IF CHAR_LENGTH(in_output_table_column) - CHAR_LENGTH(REPLACE(in_output_table_column, '.', '')) != 2 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid output table column format. Use DBName.TableName.ColumnName';
    END IF;

    SET v_output_db = SUBSTRING_INDEX(in_output_table_column, '.', 1);
    SET v_output_table = SUBSTRING_INDEX(SUBSTRING_INDEX(in_output_table_column, '.', 2), '.', -1);
    SET v_output_column = SUBSTRING_INDEX(in_output_table_column, '.', -1);

    -- Validate no backticks or periods in names
    IF v_input_db REGEXP '`|\\..*\\.' OR v_input_table REGEXP '`|\\..*\\.' OR 
       v_output_db REGEXP '`|\\..*\\.' OR v_output_table REGEXP '`|\\..*\\.' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Database and table names cannot contain backticks or periods';
    END IF;

    -- Extract options from JSON
    IF in_options IS NOT NULL AND JSON_VALID(in_options) THEN
        SET v_model_id = JSON_UNQUOTE(JSON_EXTRACT(in_options, '$.model_id'));
        SET v_truncate = COALESCE(CAST(JSON_UNQUOTE(JSON_EXTRACT(in_options, '$.truncate')) AS UNSIGNED), TRUE);
        SET v_batch_size = COALESCE(CAST(JSON_UNQUOTE(JSON_EXTRACT(in_options, '$.batch_size')) AS UNSIGNED), 1000);
        SET v_details_column = COALESCE(JSON_UNQUOTE(JSON_EXTRACT(in_options, '$.details_column')), 'details');
    END IF;

    -- Validate required parameters
    IF v_model_id IS NULL OR v_model_id = '' THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'model_id is required in options';
    END IF;

    -- Validate batch_size range
    IF v_batch_size < 1 OR v_batch_size > 1000 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'batch_size must be between 1 and 1000';
    END IF;

    -- Check if input table exists
    SELECT COUNT(*) INTO v_table_exists
    FROM information_schema.tables 
    WHERE table_schema = v_input_db AND table_name = v_input_table;

    IF v_table_exists = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Input table does not exist';
    END IF;

    -- Check if input table has primary key
    SELECT COUNT(*) INTO v_has_primary_key
    FROM information_schema.table_constraints 
    WHERE table_schema = v_input_db 
    AND table_name = v_input_table 
    AND constraint_type = 'PRIMARY KEY';

    IF v_has_primary_key = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Input table must have a primary key';
    END IF;

    -- Check if input column exists and is valid
    SELECT COUNT(*) INTO v_column_exists
    FROM information_schema.columns 
    WHERE table_schema = v_input_db 
    AND table_name = v_input_table 
    AND column_name = v_input_column
    AND data_type IN ('text', 'varchar', 'char');

    IF v_column_exists = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Input column does not exist or is not a text/varchar type';
    END IF;

    -- Check if input column is part of primary key
    SELECT COUNT(*) INTO v_column_exists
    FROM information_schema.key_column_usage k
    JOIN information_schema.table_constraints t
      ON k.constraint_name = t.constraint_name
     AND k.table_schema = t.table_schema
     AND k.table_name = t.table_name
    WHERE k.table_schema = v_input_db 
    AND k.table_name = v_input_table 
    AND k.column_name = v_input_column
    AND t.constraint_type = 'PRIMARY KEY';
    
    IF v_column_exists > 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Input column cannot be part of the primary key';
    END IF;

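    -- Check if input table is empty
    SET @sql = CONCAT('SELECT COUNT(*) INTO @ml_embed_row_count FROM `', v_input_db, '`.`', v_input_table, '`');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
    SET v_total_rows = @ml_embed_row_count;

    IF v_total_rows = 0 THEN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Input table must not be empty';
    END IF;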

    -- Check if input and output tables are the same
    IF v_input_db = v_output_db AND v_input_table = v_output_table THEN
        SET v_input_same_as_output = TRUE;

        -- Check if output column already exists
        SELECT COUNT(*) INTO v_column_exists
        FROM information_schema.columns 
        WHERE table_schema = v_output_db 
        AND table_name = v_output_table 
        AND column_name = v_output_column;

        IF v_column_exists > 0 THEN
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Output column already exists in the table';
        END IF;

        -- Add vector column to existing table
        SET v_sql = CONCAT('ALTER TABLE `', v_output_db, '`.`', v_output_table, '` 
                           ADD COLUMN `', v_output_column, '` VECTOR');

        -- Add details column if specified and doesn't exist
        SELECT COUNT(*) INTO v_column_exists
        FROM information_schema.columns 
        WHERE table_schema = v_output_db 
        AND table_name = v_output_table 
        AND column_name = v_details_column;

        IF v_column_exists = 0 THEN
            SET v_sql = CONCAT(v_sql, ', ADD COLUMN `', v_details_column, '` TEXT');
        END IF;

        SET @sql = v_sql;
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

    ELSE
        -- Create new output table
        -- First check if output table already exists
        SELECT COUNT(*) INTO v_table_exists
        FROM information_schema.tables 
        WHERE table_schema = v_output_db AND table_name = v_output_table;

        IF v_table_exists > 0 THEN
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Output table already exists';
        END IF;

        -- Get primary key columns from input table
        SET v_sql = CONCAT('CREATE TABLE `', v_output_db, '`.`', v_output_table, '` AS 
                           SELECT ');

        -- Add primary key columns
        SELECT GROUP_CONCAT(CONCAT('`', column_name, '`') ORDER BY ordinal_position)
        INTO @pk_columns
        FROM information_schema.key_column_usage k
        JOIN information_schema.table_constraints t
          ON k.constraint_name = t.constraint_name
         AND k.table_schema = t.table_schema
         AND k.table_name = t.table_name
        WHERE k.table_schema = v_input_db 
        AND k.table_name = v_input_table 
        AND t.constraint_type = 'PRIMARY KEY';

        -- LIMIT 0 copies only the key column definitions; the rows are inserted later
        SET v_sql = CONCAT(v_sql, @pk_columns, ' FROM `', v_input_db, '`.`', v_input_table, '` LIMIT 0');

        SET @sql = v_sql;
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        -- Add vector column
        SET v_sql = CONCAT('ALTER TABLE `', v_output_db, '`.`', v_output_table, '` 
                           ADD COLUMN `', v_output_column, '` VECTOR,
                           ADD COLUMN `', v_details_column, '` TEXT');

        SET @sql = v_sql;
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        -- Create primary key on new table
        SET v_sql = CONCAT('ALTER TABLE `', v_output_db, '`.`', v_output_table, '` 
                           ADD PRIMARY KEY (', @pk_columns, ')');

        SET @sql = v_sql;
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END IF;

    -- Process embeddings in batches using ML_EMBED_ROW
    IF v_input_same_as_output THEN
        -- Update same table with embeddings
        SET v_sql = CONCAT('UPDATE `', v_output_db, '`.`', v_output_table, '` 
                           SET `', v_output_column, '` = sys.ML_EMBED_ROW(`', v_input_column, '`, 
                           JSON_OBJECT("model_id", "', v_model_id, '", "truncate", ', v_truncate, ')),
                           `', v_details_column, '` = 
                           CASE 
                               WHEN `', v_input_column, '` IS NULL OR TRIM(`', v_input_column, '`) = "" 
                               THEN "Empty or null input text"
                               ELSE NULL 
                           END
                           WHERE `', v_input_column, '` IS NOT NULL AND TRIM(`', v_input_column, '`) != ""');
    ELSE
        -- Insert data with embeddings into new table
        SELECT GROUP_CONCAT(CONCAT('s.`', column_name, '`') ORDER BY ordinal_position)
        INTO @pk_columns_prefixed
        FROM information_schema.key_column_usage k
        JOIN information_schema.table_constraints t
          ON k.constraint_name = t.constraint_name
         AND k.table_schema = t.table_schema
         AND k.table_name = t.table_name
        WHERE k.table_schema = v_input_db 
        AND k.table_name = v_input_table 
        AND t.constraint_type = 'PRIMARY KEY';

        SET v_sql = CONCAT('INSERT INTO `', v_output_db, '`.`', v_output_table, '` 
                           SELECT ', @pk_columns_prefixed, ',
                           sys.ML_EMBED_ROW(s.`', v_input_column, '`, 
                           JSON_OBJECT("model_id", "', v_model_id, '", "truncate", ', v_truncate, ')) as `', v_output_column, '`,
                           CASE 
                               WHEN s.`', v_input_column, '` IS NULL OR TRIM(s.`', v_input_column, '`) = "" 
                               THEN "Empty or null input text"
                               ELSE NULL 
                           END as `', v_details_column, '`
                           FROM `', v_input_db, '`.`', v_input_table, '` s
                           WHERE s.`', v_input_column, '` IS NOT NULL AND TRIM(s.`', v_input_column, '`) != ""');
    END IF;

    SET @sql = v_sql;
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

END$$

DELIMITER ;
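
In the listing above, batch_size is validated, but the final UPDATE or INSERT is issued as a single statement. How a batched variant could divide the work is sketched below: the rows are split into consecutive slices of at most batch_size rows, each of which could then drive one statement. batch_slices is a hypothetical helper, and slicing by offset and count is an assumption, not the ShannonBase implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Splits total_rows into consecutive (offset, count) slices of at most
// batch_size rows each.
std::vector<std::pair<std::size_t, std::size_t>> batch_slices(
    std::size_t total_rows, std::size_t batch_size) {
  assert(batch_size >= 1 && batch_size <= 1000);  // documented range
  std::vector<std::pair<std::size_t, std::size_t>> slices;
  for (std::size_t offset = 0; offset < total_rows; offset += batch_size) {
    const std::size_t remaining = total_rows - offset;
    slices.emplace_back(offset, remaining < batch_size ? remaining : batch_size);
  }
  return slices;
}
```

For 2500 rows and a batch size of 1000, this yields three slices, the last one holding the remaining 500 rows.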

10 Run MTRs

MySQL test cases (MTR) are used to make sure your features do not break the correctness of other features.

./mtr --suite=xxx --nowarnings --force --nocheck-testcases --retry=0 [--sanitize] --parallel=5