diff --git a/src/io/default_compact_strategy.cc b/src/io/default_compact_strategy.cc index 765eb2d6d..ca8aca042 100644 --- a/src/io/default_compact_strategy.cc +++ b/src/io/default_compact_strategy.cc @@ -13,15 +13,16 @@ namespace io { DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema) : m_schema(schema), m_raw_key_operator(GetRawKeyOperatorFromSchema(m_schema)), + m_last_type(leveldb::TKT_FORSEEK), m_cur_type(leveldb::TKT_FORSEEK), m_last_ts(-1), m_del_row_ts(-1), m_del_col_ts(-1), m_del_qual_ts(-1), m_cur_ts(-1), m_del_row_seq(0), m_del_col_seq(0), m_del_qual_seq(0), m_version_num(0), - m_snapshot(leveldb::kMaxSequenceNumber) { + m_snapshot(leveldb::kMaxSequenceNumber), + m_has_put(false), m_lock_ts(kMaxTimeStamp) { // build index for (int32_t i = 0; i < m_schema.column_families_size(); ++i) { const std::string name = m_schema.column_families(i).name(); m_cf_indexs[name] = i; } - m_has_put = false; VLOG(11) << "DefaultCompactStrategy construct"; } @@ -47,6 +48,10 @@ bool DefaultCompactStrategy::Drop(const Slice& tera_key, uint64_t n, return true; } + if (type == leveldb::TKT_LOCK) { + return !m_schema.enable_txn(); + } + m_cur_type = type; m_cur_ts = ts; int32_t cf_id = -1; @@ -254,16 +259,22 @@ bool DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) { return true; } + if (type == leveldb::TKT_LOCK && !m_schema.enable_txn()) { + return true; + } + m_cur_type = type; m_last_ts = m_cur_ts; m_cur_ts = ts; int32_t cf_id = -1; - if (type != leveldb::TKT_DEL && DropIllegalColumnFamily(col.ToString(), &cf_id)) { + if (type != leveldb::TKT_DEL && type != leveldb::TKT_LOCK && + DropIllegalColumnFamily(col.ToString(), &cf_id)) { // drop illegal column family return true; } - if (type >= leveldb::TKT_VALUE && DropByLifeTime(cf_id, ts)) { + if (type >= leveldb::TKT_VALUE && type != leveldb::TKT_LOCK && + DropByLifeTime(cf_id, ts)) { // drop out-of-life-time record return true; } @@ -277,6 +288,12 @@ bool 
DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) {
         m_version_num = 0;
         m_del_row_ts = m_del_col_ts = m_del_qual_ts = -1;
         m_has_put = false;
+        m_lock_ts = kMaxTimeStamp;
+
+        if (type == leveldb::TKT_LOCK) {
+            m_lock_ts = n;
+            return true;
+        }
 
         // no break in switch: need to set multiple variables
         switch (type) {
@@ -288,6 +305,12 @@ bool DefaultCompactStrategy::ScanDrop(const Slice& tera_key, uint64_t n) {
                 m_del_qual_ts = ts;
             default:;
         }
+    } else if (type == leveldb::TKT_LOCK) {
+        m_lock_ts = n;
+        return true;
+    } else if (ts >= m_lock_ts) {
+        VLOG(15) << "tera.DefaultCompactStrategy: drop locked data, lock ts: " << m_lock_ts;
+        return true;
     } else if (m_del_row_ts >= ts) {
         // skip deleted row and the same row_del mark
         return true;
diff --git a/src/io/default_compact_strategy.h b/src/io/default_compact_strategy.h
index 7b196661f..2e54837dc 100644
--- a/src/io/default_compact_strategy.h
+++ b/src/io/default_compact_strategy.h
@@ -60,9 +60,9 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
     std::string m_last_key;
     std::string m_last_col;
     std::string m_last_qual;
-    int64_t m_last_ts;
     leveldb::TeraKeyType m_last_type;
     leveldb::TeraKeyType m_cur_type;
+    int64_t m_last_ts;
     int64_t m_del_row_ts;
     int64_t m_del_col_ts;
     int64_t m_del_qual_ts;
@@ -73,6 +73,7 @@ class DefaultCompactStrategy : public leveldb::CompactStrategy {
     uint32_t m_version_num;
     uint64_t m_snapshot;
     bool m_has_put;
+    int64_t m_lock_ts;
 };
 
 class DefaultCompactStrategyFactory : public leveldb::CompactStrategyFactory {
diff --git a/src/io/lock.cc b/src/io/lock.cc
new file mode 100644
index 000000000..6f1556f17
--- /dev/null
+++ b/src/io/lock.cc
@@ -0,0 +1,140 @@
+// Copyright (c) 2016, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "io/lock.h"
+
+#include "io/io_utils.h"
+
+namespace tera {
+namespace io {
+
+LockManager::LockManager() {
+
+}
+
+LockManager::~LockManager() {
+
+}
+
+// Acquires the lock on `key` for owner `id` and records `annotation` with it.
+// Returns false when `key` is already locked: kLockDoubleLock if the holder is
+// `id` itself, kLockNotOwn otherwise.
+bool LockManager::Lock(const std::string& key, uint64_t id,
+                       const std::string& annotation, StatusCode* status) {
+    MutexLock l(&mutex_);
+
+    LockIterator it = lock_map_.find(key);
+    if (it == lock_map_.end()) {
+        LockNode& lock_node = lock_map_[key];
+        lock_node.id = id;
+        lock_node.annotation = annotation;
+        VLOG(10) << "lock key: " << key << " id: " << id
+                 << " annotation: " << annotation;
+        return true;
+    }
+
+    // Keep the code in a local: `status` defaults to NULL, so the log below
+    // must not dereference it.
+    const StatusCode code = (it->second.id == id) ? kLockDoubleLock : kLockNotOwn;
+    SetStatusCode(code, status);
+    VLOG(10) << "fail to lock key: " << key << " id: " << id
+             << " annotation: " << annotation
+             << " status: " << StatusCodeToString(code);
+    return false;
+}
+
+// Releases the lock on `key` if it is held by `id`; on success the stored
+// annotation is copied to `annotation` when that pointer is non-NULL.
+bool LockManager::Unlock(const std::string& key, uint64_t id,
+                         std::string* annotation, StatusCode* status) {
+    MutexLock l(&mutex_);
+
+    LockIterator it;
+    // GetLockNode() writes the failure code here; a local is used because
+    // `status` may be NULL and the log below needs the value.
+    StatusCode code = kLockNotExist;
+    if (!GetLockNode(key, id, &it, &code)) {
+        SetStatusCode(code, status);
+        VLOG(10) << "fail to unlock key: " << key << " id: " << id
+                 << " status: " << StatusCodeToString(code);
+        return false;
+    }
+
+    if (annotation != NULL) {
+        annotation->assign(it->second.annotation);
+    }
+    // Log before erasing: erase() invalidates `it`.
+    VLOG(10) << "unlock key: " << key << " id: " << id
+             << " annotation: " << it->second.annotation;
+    lock_map_.erase(it);
+    return true;
+}
+
+// Returns true when `key` is locked by `id`; optionally copies the annotation.
+bool LockManager::IsLocked(const std::string& key, uint64_t id,
+                           std::string* annotation, StatusCode* status) {
+    MutexLock l(&mutex_);
+
+    LockIterator it;
+    if (!GetLockNode(key, id, &it, status)) {
+        return false;
+    }
+
+    LockNode& ln = it->second;
+    if (annotation != NULL) {
+        annotation->assign(ln.annotation);
+    }
+    return true;
+}
+
+// Returns true when `key` is locked by anyone; optionally reports the holder's
+// id and annotation. Sets kLockNotExist when there is no lock.
+bool LockManager::IsLocked(const std::string& key, uint64_t* id,
+                           std::string* annotation, StatusCode* status) {
+    MutexLock l(&mutex_);
+
+    LockIterator it = lock_map_.find(key);
+    if (it == lock_map_.end()) {
+        SetStatusCode(kLockNotExist, status);
+        return false;
+    }
+
+    LockNode& ln = it->second;
+    if (id != NULL) {
+        *id = ln.id;
+    }
+    if (annotation != NULL) {
+        annotation->assign(ln.annotation);
+    }
+    return true;
+}
+
+// Looks up the lock on `key` and checks that `id` owns it. The caller must hold
+// mutex_. On success stores the map iterator in `ret_it`; otherwise sets
+// kLockNotExist or kLockNotOwn.
+bool LockManager::GetLockNode(const std::string& key, uint64_t id,
+                              LockIterator* ret_it, StatusCode* status) {
+    mutex_.AssertHeld();
+
+    LockIterator it = lock_map_.find(key);
+    if (it == lock_map_.end()) {
+        VLOG(10) << "lock not exist, key: " << key << " id: " << id;
+        SetStatusCode(kLockNotExist, status);
+        return false;
+    }
+
+    LockNode& ln = it->second;
+    if (ln.id != id) {
+        VLOG(10) << "lock not own, key: " << key << " id: " << id
+                 << ", real id: " << ln.id << ", annotation: " << ln.annotation;
+        SetStatusCode(kLockNotOwn, status);
+        return false;
+    }
+
+    *ret_it = it;
+    return true;
+}
+
+} // namespace io
+} // namespace tera
diff --git a/src/io/lock.h b/src/io/lock.h
new file mode 100644
index 000000000..e20b95b88
--- /dev/null
+++ b/src/io/lock.h
@@ -0,0 +1,54 @@
+// Copyright (c) 2016, Baidu.com, Inc. All Rights Reserved
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef TERA_IO_LOCK_MANAGER_H_
+#define TERA_IO_LOCK_MANAGER_H_
+
+#include <map>
+#include <string>
+
+#include "common/mutex.h"
+#include "proto/status_code.pb.h"
+
+namespace tera {
+namespace io {
+
+class LockManager {
+    struct LockNode {
+        uint64_t id;             // id of the lock holder
+        std::string annotation;  // hint stored together with the lock
+    };
+public:
+    LockManager();
+    ~LockManager();
+
+    // key is a unique name of the item you want to lock
+    // id is used to authenticate
+    // annotation is a hint for others to do cleanup work after the lock owner dies
+    bool Lock(const std::string& key, uint64_t id,
+              const std::string& annotation = "", StatusCode* status = NULL);
+    bool Unlock(const std::string& key, uint64_t id,
+                std::string* annotation = NULL, StatusCode* status = NULL);
+    // true only when key is locked by id
+    bool IsLocked(const std::string& key, uint64_t id,
+                  std::string* annotation = NULL, StatusCode* status = NULL);
+    // true when key is locked by anyone; the holder is reported through id
+    bool IsLocked(const std::string& key, uint64_t* id = NULL,
+                  std::string* annotation = NULL, StatusCode* status = NULL);
+
+private:
+    typedef std::map<std::string, LockNode> LockMap;
+    typedef LockMap::iterator LockIterator;
+    // looks up the lock on key owned by id; the caller must hold mutex_
+    bool GetLockNode(const std::string& key, uint64_t id, LockIterator* ret_it,
+                     StatusCode* status = NULL);
+
+    Mutex mutex_;
+    LockMap lock_map_;
+};
+
+} // namespace io
+} // namespace tera
+
+#endif // TERA_IO_LOCK_MANAGER_H_
diff --git a/src/io/tablet_io.cc b/src/io/tablet_io.cc
index 40027aab0..aebf73e9b 100644
--- a/src/io/tablet_io.cc
+++ b/src/io/tablet_io.cc
@@ -24,6 +24,7 @@
 #include "leveldb/env_flash.h"
 #include "leveldb/env_inmem.h"
 #include "leveldb/filter_policy.h"
+#include "leveldb/util/coding.h"
 #include "types.h"
 #include "utils/counter.h"
 #include "utils/scan_filter.h"
@@ -78,7 +79,8 @@ TabletIO::TabletIO(const std::string& key_start, const std::string& key_end)
       m_ref_count(1), m_db_ref_count(0), m_db(NULL),
       m_mem_store_activated(false),
       m_kv_only(false),
-      m_key_operator(NULL) {
+      m_key_operator(NULL),
+      m_meta_lg_id(-1) {
 }
 
 TabletIO::~TabletIO() {
@@ -182,6 +184,10 @@ bool TabletIO::Load(const TableSchema& schema,
         }
     }
 
+    if (m_kv_only && m_table_schema.enable_txn()) {
+        m_table_schema.set_enable_txn(false);
+    }
+
     m_key_operator = GetRawKeyOperatorFromSchema(m_table_schema);
     // [m_raw_start_key, m_raw_end_key)
     m_raw_start_key = m_start_key;
@@ -278,6 +284,20 @@ bool TabletIO::Load(const TableSchema& schema,
         return false;
     }
 
+    if (m_table_schema.enable_txn()) {
+        LOG(INFO) << "[Load] Start load locks " << m_tablet_path;
+        if (!LoadLock(status)) {
+            delete m_db;
+            m_db = NULL;
+            {
+                MutexLock lock(&m_mutex);
+                m_status = kNotInit;
+                m_db_ref_count--;
+            }
+            return false;
+        }
+    }
+
     m_async_writer = new TabletWriter(this);
     m_async_writer->Start();
 
@@ -532,11 +552,11 @@ bool TabletIO::AddInheritedLiveFiles(std::vector >* live) {
         m_db_ref_count++;
     }
     {
-        MutexLock lock(&m_schema_mutex);
+        uint32_t lg_num = m_ldb_options.exist_lg_list->size();
         if (live->size() == 0) {
-            live->resize(m_table_schema.locality_groups_size());
+            live->resize(lg_num);
         } else {
-            CHECK(live->size() == static_cast(m_table_schema.locality_groups_size()));
+            CHECK(live->size() == lg_num);
         }
     }
     m_db->AddInheritedLiveFiles(live);
@@ -884,13 +904,28 @@ inline bool TabletIO::LowLevelScan(const std::string& start_tera_key,
         const std::set& cf_set = scan_options.iter_cf_set;
         if (cf_set.size() > 0 &&
             cf_set.find(col.ToString()) == cf_set.end() &&
-            type != leveldb::TKT_DEL) {
+            type != leveldb::TKT_DEL &&
+            type != leveldb::TKT_LOCK) {
             // donot need this column, skip row deleting tag
             it->Next();
             continue;
         }
 
-        if (compact_strategy->ScanDrop(it->key(), 0)) {
+        uint64_t n = 0;
+        if (type == leveldb::TKT_LOCK) {
+            int64_t lock_ts = 0;
+            if (!DecodeLock(value.ToString(), NULL, &lock_ts, NULL)) {
+                LOG(WARNING) << "invalid lock format, key: " << DebugString(tera_key.ToString())
+                    << " value: " << DebugString(value.ToString());
+                it->Next();
+                continue;
+            }
+            n = lock_ts;
+            VLOG(10) << "lock key: " << DebugString(tera_key.ToString())
+                << " ts: " << lock_ts;
+        }
+
+        if (compact_strategy->ScanDrop(it->key(), n)) {
             // skip drop record
it->Next(); continue; @@ -1701,6 +1736,12 @@ void TabletIO::SetupOptionsForLG() { exist_lg_list->insert(lg_i); (*lg_info_list)[lg_i] = lg_info; } + if (m_table_schema.enable_txn()) { + // m_meta_lg_id = kMetaLgId; + m_meta_lg_id = m_table_schema.locality_groups_size(); + exist_lg_list->insert(m_meta_lg_id); + (*lg_info_list)[m_meta_lg_id] = new leveldb::LG_info(m_meta_lg_id, LeveldbMemEnv()); + } if (exist_lg_list->size() == 0) { delete exist_lg_list; @@ -1775,6 +1816,9 @@ void TabletIO::SetupIteratorOptions(const ScanOptions& scan_options, } if (target_lgs.size() > 0) { leveldb_opts->target_lgs = new std::set(target_lgs); + } else if (m_table_schema.enable_txn()) { + leveldb_opts->target_lgs = new std::set(*m_ldb_options.exist_lg_list); + leveldb_opts->target_lgs->erase(m_meta_lg_id); } } @@ -2002,5 +2046,105 @@ void TabletIO::ApplySchema(const TableSchema& schema) { m_ldb_options.compact_strategy_factory->SetArg(&schema); } +uint32_t TabletIO::GetLGNum() { + uint32_t lg_num = m_ldb_options.exist_lg_list->size(); + if (m_table_schema.enable_txn()) { + lg_num--; + } + return lg_num; +} + +bool TabletIO::LoadLock(StatusCode* status) { + // init iterator + leveldb::ReadOptions read_option(&m_ldb_options); + read_option.verify_checksums = FLAGS_tera_leveldb_verify_checksums; + read_option.fill_cache = false; + read_option.target_lgs = new std::set; + read_option.target_lgs->insert(m_meta_lg_id); + leveldb::Iterator* it = m_db->NewIterator(read_option); + delete read_option.target_lgs; + + // scan lock + for (it->SeekToFirst(); it->Valid(); it->Next()) { + leveldb::Slice tera_key = it->key(); + leveldb::Slice value = it->value(); + leveldb::Slice key, col, qual; + int64_t ts = 0; + leveldb::TeraKeyType type; + if (!m_key_operator->ExtractTeraKey(tera_key, &key, &col, &qual, &ts, &type)) { + LOG(WARNING) << "scan lock: invalid tera key: " + << DebugString(tera_key.ToString()); + continue; + } + VLOG(10) << "scan lock: " << "tablet=[" << m_tablet_path + << "] key=[" << 
DebugString(key.ToString()) + << "] column=[" << DebugString(col.ToString()) + << ":" << DebugString(qual.ToString()) + << "] ts=[" << ts << "] type=[" << type << "]"; + if (!col.empty() || !qual.empty() || ts != kMaxTimeStamp || + type != leveldb::TKT_LOCK) { + LOG(WARNING) << "scan lock: invalid key format: " + << DebugString(tera_key.ToString()); + continue; + } + int64_t lock_ts = 0; + uint64_t lock_id = 0; + std::string lock_annotation; + if (!DecodeLock(value.ToString(), &lock_id, &lock_ts, &lock_annotation)) { + LOG(WARNING) << "scan lock: invalid lock format, key: " + << DebugString(it->value().ToString()) + << " value: " << DebugString(it->value().ToString()); + continue; + } + VLOG(10) << "load lock: " << key.ToString() << ", id: " << lock_id + << ", ts: " << lock_ts << ", annotation: " << lock_annotation; + CHECK(m_lock_manager.Lock(key.ToString(), lock_id, lock_annotation)); + } + + leveldb::Status it_status = it->status(); + delete it; + + if (!it_status.ok()) { + SetStatusCode(it_status, status); + VLOG(10) << "scan lock fail: " << "tablet=[" << m_tablet_path << + "] status=[" << it_status.ToString(); + return false; + } + + return true; +} + +void TabletIO::EncodeLock(uint64_t lock_id, int64_t lock_ts, + const std::string& lock_annotation, + std::string* dst) { + dst->clear(); + leveldb::PutVarint64(dst, lock_id); + leveldb::PutVarint64(dst, lock_ts); + leveldb::PutLengthPrefixedSlice(dst, lock_annotation); +} + +bool TabletIO::DecodeLock(const std::string& src, uint64_t* lock_id, + int64_t* lock_ts, std::string* lock_annotation) { + leveldb::Slice input(src); + uint64_t id = 0; + uint64_t ts = 0; + leveldb::Slice annotation; + if (!leveldb::GetVarint64(&input, &id) || + !leveldb::GetVarint64(&input, &ts) || + !leveldb::GetLengthPrefixedSlice(&input, &annotation)) { + return false; + } + if (lock_id != NULL) { + *lock_id = id; + } + if (lock_ts != NULL) { + *lock_ts = ts; + } + if (lock_annotation != NULL) { + 
lock_annotation->assign(annotation.ToString()); + } + return true; +} + } // namespace io } // namespace tera diff --git a/src/io/tablet_io.h b/src/io/tablet_io.h index 1b0c416a1..3ba7a2385 100644 --- a/src/io/tablet_io.h +++ b/src/io/tablet_io.h @@ -16,6 +16,7 @@ #include "common/base/scoped_ptr.h" #include "common/mutex.h" +#include "io/lock.h" #include "io/tablet_scanner.h" #include "leveldb/db.h" #include "leveldb/options.h" @@ -33,6 +34,8 @@ namespace tera { namespace io { +//const uint32_t kMetaLgId = kMaxLocalityGroupNum; + class TabletWriter; struct ScanOptions; struct ScanContext; @@ -166,9 +169,6 @@ class TabletIO { private: friend class TabletWriter; friend class ScanConextManager; - bool WriteWithoutLock(const std::string& key, const std::string& value, - bool sync = false, StatusCode* status = NULL); -// int64_t GetDataSizeWithoutLock(StatusCode* status = NULL); void SetupOptionsForLG(); void TearDownOptionsForLG(); @@ -231,6 +231,13 @@ class TabletIO { leveldb::Iterator* it, KeyValuePair* next); void SetSchema(const TableSchema& schema); + uint32_t GetLGNum(); + bool LoadLock(StatusCode* status); + void EncodeLock(uint64_t lock_id, int64_t lock_ts, const std::string& lock_annotation, + std::string* dst); + bool DecodeLock(const std::string& src, uint64_t* lock_id = NULL, + int64_t* lock_ts = NULL, std::string* lock_annotation = NULL); + private: mutable Mutex m_mutex; TabletWriter* m_async_writer; @@ -260,6 +267,9 @@ class TabletIO { std::map m_lg_id_map; StatCounter m_counter; mutable Mutex m_schema_mutex; + + LockManager m_lock_manager; + int32_t m_meta_lg_id; }; } // namespace io diff --git a/src/io/tablet_writer.cc b/src/io/tablet_writer.cc index 28517529a..87fc91ab4 100644 --- a/src/io/tablet_writer.cc +++ b/src/io/tablet_writer.cc @@ -13,6 +13,7 @@ #include "io/io_utils.h" #include "io/tablet_io.h" #include "leveldb/lg_coding.h" +#include "lock.h" #include "proto/proto_helper.h" #include "utils/counter.h" #include "utils/timer.h" @@ -28,7 +29,9 
@@ namespace tera { namespace io { TabletWriter::TabletWriter(TabletIO* tablet_io) - : m_tablet(tablet_io), m_stopped(true), + : m_tablet(tablet_io), + m_lock_manager(&tablet_io->m_lock_manager), + m_stopped(true), m_sync_timestamp(0), m_active_buffer_instant(false), m_active_buffer_size(0), @@ -190,110 +193,221 @@ bool TabletWriter::SwapActiveBuffer(bool force) { return true; } -bool TabletWriter::BatchRequest(const std::vector& row_mutation_vec, - leveldb::WriteBatch* batch, bool kv_only) { - int64_t timestamp_old = 0; - +void TabletWriter::BatchRequest(WriteTask& task, leveldb::WriteBatch* batch) { + std::vector& row_mutation_vec = *task.row_mutation_vec; + std::vector& status_vec = *task.status_vec; for (uint32_t i = 0; i < row_mutation_vec.size(); ++i) { const RowMutationSequence& row_mu = *row_mutation_vec[i]; - const std::string& row_key = row_mu.row_key(); - int32_t mu_num = row_mu.mutation_sequence().size(); - if (kv_only) { - // only the last mutation take effect for kv - const Mutation& mu = row_mu.mutation_sequence().Get(mu_num - 1); - std::string tera_key; - if (m_tablet->GetSchema().raw_key() == TTLKv) { // TTL-KV - if (mu.ttl() == -1) { // never expires - m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", - kLatestTs, leveldb::TKT_FORSEEK, &tera_key); - } else { // no check of overflow risk ... 
- m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", - get_micros() / 1000000 + mu.ttl(), leveldb::TKT_FORSEEK, &tera_key); - } - } else { // Readable-KV - tera_key.assign(row_key); - } - if (mu.type() == kPut) { - batch->Put(tera_key, mu.value()); - } else { - batch->Delete(tera_key); - } + StatusCode* status = &status_vec[i]; + *status = kTableOk; + if (m_tablet->KvOnly()) { + BatchKvRequest(row_mu, batch); } else { - for (int32_t t = 0; t < mu_num; ++t) { - const Mutation& mu = row_mu.mutation_sequence().Get(t); - std::string tera_key; - leveldb::TeraKeyType type = leveldb::TKT_VALUE; - switch (mu.type()) { - case kDeleteRow: - type = leveldb::TKT_DEL; - break; - case kDeleteFamily: - type = leveldb::TKT_DEL_COLUMN; - break; - case kDeleteColumn: - type = leveldb::TKT_DEL_QUALIFIER; - break; - case kDeleteColumns: - type = leveldb::TKT_DEL_QUALIFIERS; - break; - case kAdd: - type = leveldb::TKT_ADD; - break; - case kAddInt64: - type = leveldb::TKT_ADDINT64; - break; - case kPutIfAbsent: - type = leveldb::TKT_PUT_IFABSENT; - break; - case kAppend: - type = leveldb::TKT_APPEND; - break; - default: - break; - } - int64_t timestamp = get_unique_micros(timestamp_old); - timestamp_old = timestamp; - if (mu.has_timestamp() && mu.timestamp() < timestamp) { - timestamp = mu.timestamp(); - } - m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, mu.family(), mu.qualifier(), - timestamp, type, &tera_key); - uint32_t lg_id = 0; - size_t lg_num = m_tablet->m_ldb_options.exist_lg_list->size(); - if (lg_num > 1) { - if (type != leveldb::TKT_DEL) { - lg_id = m_tablet->GetLGidByCFName(mu.family()); - leveldb::PutFixed32LGId(&tera_key, lg_id); - VLOG(10) << "Batch Request, key:" << row_key << ",family:" - << mu.family() << ",lg_id:" << lg_id; - batch->Put(tera_key, mu.value()); - } else { - // put row_del mark to all LGs - for (lg_id = 0; lg_id < lg_num; ++lg_id) { - std::string tera_key_tmp = tera_key; - leveldb::PutFixed32LGId(&tera_key_tmp, lg_id); - VLOG(10) << 
"Batch Request, key:" << row_key << ",family:" - << mu.family() << ",lg_id:" << lg_id; - batch->Put(tera_key_tmp, mu.value()); - } - } - } else { - VLOG(10) << "Batch Request, key:" << row_key << ",family:" - << mu.family() << ",lg_id:" << lg_id; - batch->Put(tera_key, mu.value()); - } + BatchTableRequest(row_mu, batch, status); + } + } +} + +void TabletWriter::BatchKvRequest(const RowMutationSequence& row_mu, + leveldb::WriteBatch* batch) { + const std::string& row_key = row_mu.row_key(); + int32_t mu_num = row_mu.mutation_sequence().size(); + // only the last mutation take effect for kv + const Mutation& mu = row_mu.mutation_sequence().Get(mu_num - 1); + std::string tera_key; + if (m_tablet->GetSchema().raw_key() == TTLKv) { // TTL-KV + if (mu.ttl() == -1) { // never expires + m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", + kLatestTs, leveldb::TKT_FORSEEK, &tera_key); + } else { // no check of overflow risk ... + m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", + get_micros() / 1000000 + mu.ttl(), leveldb::TKT_FORSEEK, &tera_key); + } + } else { // Readable-KV + tera_key.assign(row_key); + } + if (mu.type() == kPut) { + batch->Put(tera_key, mu.value()); + } else { + batch->Delete(tera_key); + } +} + +bool TabletWriter::BatchTableRequest(const RowMutationSequence& row_mu, + leveldb::WriteBatch* batch, + StatusCode* status) { + int64_t timestamp_old = 0; + const std::string& row_key = row_mu.row_key(); + if (m_tablet->GetSchema().enable_txn()) { + if (!HandleLockBeforeWrite(row_mu, batch, status)) { + return false; + } + } + + int32_t mu_num = row_mu.mutation_sequence().size(); + for (int32_t t = 0; t < mu_num; ++t) { + const Mutation& mu = row_mu.mutation_sequence().Get(t); + std::string tera_key; + leveldb::TeraKeyType type = leveldb::TKT_VALUE; + switch (mu.type()) { + case kDeleteRow: + type = leveldb::TKT_DEL; + break; + case kDeleteFamily: + type = leveldb::TKT_DEL_COLUMN; + break; + case kDeleteColumn: + type = 
leveldb::TKT_DEL_QUALIFIER; + break; + case kDeleteColumns: + type = leveldb::TKT_DEL_QUALIFIERS; + break; + case kAdd: + type = leveldb::TKT_ADD; + break; + case kAddInt64: + type = leveldb::TKT_ADDINT64; + break; + case kPutIfAbsent: + type = leveldb::TKT_PUT_IFABSENT; + break; + case kAppend: + type = leveldb::TKT_APPEND; + break; + default: + break; + } + int64_t timestamp = get_unique_micros(timestamp_old); + timestamp_old = timestamp; + if (mu.has_timestamp() && mu.timestamp() < timestamp) { + timestamp = mu.timestamp(); + } + m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, mu.family(), mu.qualifier(), + timestamp, type, &tera_key); + uint32_t lg_id = 0; + size_t lg_num = m_tablet->GetLGNum(); + if (type != leveldb::TKT_DEL) { + lg_id = m_tablet->GetLGidByCFName(mu.family()); + leveldb::PutFixed32LGId(&tera_key, lg_id); + VLOG(10) << "Batch Request, key:" << row_key << ",family:" + << mu.family() << ",lg_id:" << lg_id; + batch->Put(tera_key, mu.value()); + } else { + // put row_del mark to all LGs + for (lg_id = 0; lg_id < lg_num; ++lg_id) { + std::string tera_key_tmp = tera_key; + leveldb::PutFixed32LGId(&tera_key_tmp, lg_id); + VLOG(10) << "Batch Request, key:" << row_key << ",family:" + << mu.family() << ",lg_id:" << lg_id; + batch->Put(tera_key_tmp, mu.value()); } } } return true; } +bool TabletWriter::HandleLockBeforeWrite(const RowMutationSequence& mu_seq, + leveldb::WriteBatch* batch, + StatusCode* status) { + const std::string& row_key = mu_seq.row_key(); + + if (mu_seq.lock_before_write()) { + if (!m_lock_manager->Lock(row_key, mu_seq.lock_id(), + mu_seq.lock_annotation(), status)) { + return false; + } + BatchLock(row_key, get_micros(), mu_seq.lock_id(), + mu_seq.lock_annotation(), batch); + return true; + } + + if (mu_seq.unlock_after_write()) { + if (!m_lock_manager->IsLocked(row_key, mu_seq.lock_id(), NULL, status)) { + return false; + } + BatchUnlock(row_key, batch); + return true; + } + + StatusCode lock_status = kTableOk; + if 
(m_lock_manager->IsLocked(row_key, mu_seq.lock_id(), NULL, &lock_status)) { + return true; + } else if (lock_status == kLockNotOwn || mu_seq.insure_locked_already()) { + SetStatusCode(lock_status, status); + return false; + } else { + CHECK_EQ(lock_status, kLockNotExist); + return true; + } +} + +void TabletWriter::HandleLockAfterWrite(const RowMutationSequence& mu_seq, + const StatusCode& write_status) { + if (mu_seq.unlock_after_write()) { + if (write_status != kTableOk) { + LOG(WARNING) << "fail to write lock-delete mark: " + << StatusCodeToString(write_status) + << ", abort unload, key: " << mu_seq.row_key(); + } else { + CHECK(m_lock_manager->Unlock(mu_seq.row_key(), mu_seq.lock_id(), NULL, NULL)); + } + } +} + +void TabletWriter::BatchLock(const std::string& row_key, int64_t timestamp, + uint64_t lock_id, const std::string& annotation, + leveldb::WriteBatch* batch) { + std::string lock_key; + m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", kMaxTimeStamp, + leveldb::TKT_LOCK, &lock_key); + std::string lock_value; + m_tablet->EncodeLock(lock_id, timestamp, annotation, &lock_value); + + // put lock to all LGs + size_t lg_num = m_tablet->GetLGNum(); + for (size_t lg_id = 0; lg_id < lg_num; ++lg_id) { + std::string tera_key_tmp = lock_key; + leveldb::PutFixed32LGId(&tera_key_tmp, lg_id); + VLOG(10) << "Batch lock, key:" << row_key << ",lg_id:" << lg_id; + batch->Put(tera_key_tmp, lock_value); + } + std::string tera_key_tmp = lock_key; + leveldb::PutFixed32LGId(&tera_key_tmp, m_tablet->m_meta_lg_id); + VLOG(10) << "Batch lock, key:" << row_key << ",lg_id:" << m_tablet->m_meta_lg_id; + batch->Put(tera_key_tmp, lock_value); +} + +void TabletWriter::BatchUnlock(const std::string& row_key, leveldb::WriteBatch* batch) { + std::string lock_key; + m_tablet->GetRawKeyOperator()->EncodeTeraKey(row_key, "", "", kMaxTimeStamp, + leveldb::TKT_LOCK, &lock_key); + // put lock-delete mark to all LGs + size_t lg_num = m_tablet->GetLGNum(); + for (size_t lg_id = 0; 
lg_id < lg_num; ++lg_id) { + std::string tera_key_tmp = lock_key; + leveldb::PutFixed32LGId(&tera_key_tmp, lg_id); + VLOG(10) << "Batch unlock, key:" << row_key << ",lg_id:" << lg_id; + batch->Delete(tera_key_tmp); + } + std::string tera_key_tmp = lock_key; + leveldb::PutFixed32LGId(&tera_key_tmp, m_tablet->m_meta_lg_id); + VLOG(10) << "Batch unlock, key:" << row_key << ",lg_id:" << m_tablet->m_meta_lg_id; + batch->Delete(tera_key_tmp); +} + void TabletWriter::FinishTask(const WriteTask& task, StatusCode status) { int32_t row_num = task.row_mutation_vec->size(); m_tablet->GetCounter().write_rows.Add(row_num); for (int32_t i = 0; i < row_num; i++) { - m_tablet->GetCounter().write_kvs.Add((*task.row_mutation_vec)[i]->mutation_sequence_size()); - (*task.status_vec)[i] = status; + const RowMutationSequence& row_mu = *(*task.row_mutation_vec)[i]; + StatusCode& row_status = (*task.status_vec)[i]; + if (row_status == kTableOk) { + row_status = status; + } + if (!m_tablet->KvOnly() && m_tablet->GetSchema().enable_txn()) { + HandleLockAfterWrite(row_mu, row_status); + } + m_tablet->GetCounter().write_kvs.Add(row_mu.mutation_sequence_size()); } task.callback(task.row_mutation_vec, task.status_vec); } @@ -303,8 +417,7 @@ StatusCode TabletWriter::FlushToDiskBatch(WriteTaskBuffer* task_buffer) { leveldb::WriteBatch batch; for (size_t i = 0; i < task_num; ++i) { - const WriteTask& task = (*task_buffer)[i]; - BatchRequest(*task.row_mutation_vec, &batch, m_tablet->KvOnly()); + BatchRequest((*task_buffer)[i], &batch); } StatusCode status = kTableOk; diff --git a/src/io/tablet_writer.h b/src/io/tablet_writer.h index ed26ab04d..801357350 100644 --- a/src/io/tablet_writer.h +++ b/src/io/tablet_writer.h @@ -20,6 +20,7 @@ namespace tera { namespace io { class TabletIO; +class LockManager; class TabletWriter { public: @@ -43,24 +44,35 @@ class TabletWriter { /// 初略计算一个request的数据大小 static uint64_t CountRequestSize(std::vector& row_mutation_vec, bool kv_only); - /// 
把一个request打到一个leveldbbatch里去, request是原子的, batch也是, so .. - bool BatchRequest(const std::vector& row_mutation_vec, - leveldb::WriteBatch* batch, - bool kv_only); void Start(); void Stop(); private: void DoWork(); bool SwapActiveBuffer(bool force); + /// 把一个request打到一个leveldbbatch里去, request是原子的, batch也是, so .. + void BatchRequest(WriteTask& task, leveldb::WriteBatch* batch); + void BatchKvRequest(const RowMutationSequence& row_mu, leveldb::WriteBatch* batch); + bool BatchTableRequest(const RowMutationSequence& row_mu, leveldb::WriteBatch* batch, + StatusCode* status = NULL); /// 任务完成, 执行回调 void FinishTaskBatch(WriteTaskBuffer* task_buffer, StatusCode status); void FinishTask(const WriteTask& task, StatusCode status); /// 将buffer刷到磁盘(leveldb), 并sync StatusCode FlushToDiskBatch(WriteTaskBuffer* task_buffer); +; + bool HandleLockBeforeWrite(const RowMutationSequence& mu_seq, + leveldb::WriteBatch* batch, + StatusCode* status = NULL); + void HandleLockAfterWrite(const RowMutationSequence& mu_seq, + const StatusCode& write_status); + void BatchLock(const std::string& row_key, int64_t timestamp, uint64_t lock_id, + const std::string& annotation, leveldb::WriteBatch* batch); + void BatchUnlock(const std::string& row_key, leveldb::WriteBatch* batch); private: TabletIO* m_tablet; + LockManager* m_lock_manager; mutable Mutex m_task_mutex; mutable Mutex m_status_mutex; diff --git a/src/leveldb/db/db_table.cc b/src/leveldb/db/db_table.cc index 4eb16b52e..bb5595c0e 100644 --- a/src/leveldb/db/db_table.cc +++ b/src/leveldb/db/db_table.cc @@ -479,26 +479,19 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { mutex_.Lock(); } if (s.ok()) { - std::vector lg_updates; - lg_updates.resize(lg_list_.size()); - std::fill(lg_updates.begin(), lg_updates.end(), (WriteBatch*)0); - bool created_new_wb = false; + std::vector lg_updates(lg_list_.size()); + updates->SeperateLocalityGroup(&lg_updates); + // kv version may not create snapshot for (uint32_t i = 0; i < 
lg_list_.size(); ++i) { lg_list_[i]->GetSnapshot(last_sequence_); } commit_snapshot_ = last_sequence_; - if (lg_list_.size() > 1) { - updates->SeperateLocalityGroup(&lg_updates); - created_new_wb = true; - } else { - lg_updates[0] = updates; - } + mutex_.Unlock(); //TODO: should be multi-thread distributed for (uint32_t i = 0; i < lg_updates.size(); ++i) { - assert(lg_updates[i] != NULL); - Status lg_s = lg_list_[i]->Write(WriteOptions(), lg_updates[i]); + Status lg_s = lg_list_[i]->Write(WriteOptions(), &lg_updates[i]); if (!lg_s.ok()) { // 这种情况下内存处于不一致状态 Log(options_.info_log, "[%s] [Fatal] Write to lg%u fail", @@ -521,15 +514,6 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) { } commit_snapshot_ = last_sequence_ + WriteBatchInternal::Count(updates); } - - if (created_new_wb) { - for (uint32_t i = 0; i < lg_updates.size(); ++i) { - if (lg_updates[i] != NULL) { - delete lg_updates[i]; - lg_updates[i] = NULL; - } - } - } } // Update last_sequence @@ -868,35 +852,24 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, last_sequence_ = last_seq; } - std::vector lg_updates; - lg_updates.resize(lg_list_.size()); - std::fill(lg_updates.begin(), lg_updates.end(), (WriteBatch*)0); - bool created_new_wb = false; - if (lg_list_.size() > 1) { - status = batch.SeperateLocalityGroup(&lg_updates); - created_new_wb = true; - if (!status.ok()) { - return status; - } - } else { - lg_updates[0] = (&batch); + std::vector lg_updates(lg_list_.size()); + status = batch.SeperateLocalityGroup(&lg_updates); + if (!status.ok()) { + return status; } if (status.ok()) { //TODO: should be multi-thread distributed for (uint32_t i = 0; i < lg_updates.size(); ++i) { - if (lg_updates[i] == NULL) { - continue; - } if (last_seq <= lg_list_[i]->GetLastSequence()) { continue; } - uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]); - uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1; + uint64_t first = 
WriteBatchInternal::Sequence(&lg_updates[i]); + uint64_t last = first + WriteBatchInternal::Count(&lg_updates[i]) - 1; // Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n", // dbname_.c_str(), first, last); - Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]); + Status lg_s = lg_list_[i]->RecoverInsertMem(&lg_updates[i], (*edit_list)[i]); if (!lg_s.ok()) { Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n", dbname_.c_str(), first, last); @@ -904,15 +877,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit, } } } - - if (created_new_wb) { - for (uint32_t i = 0; i < lg_updates.size(); ++i) { - if (lg_updates[i] != NULL) { - delete lg_updates[i]; - lg_updates[i] = NULL; - } - } - } } delete file; diff --git a/src/leveldb/db/repair.cc b/src/leveldb/db/repair.cc index b67fc77b7..3a2ecfeb8 100644 --- a/src/leveldb/db/repair.cc +++ b/src/leveldb/db/repair.cc @@ -618,29 +618,13 @@ class DBRepairer { continue; } - std::vector lg_batchs; - lg_batchs.resize(options_.exist_lg_list->size()); - std::fill(lg_batchs.begin(), lg_batchs.end(), (WriteBatch*)0); - bool created_new_wb = false; - if (options_.exist_lg_list->size() > 1) { - status = batch.SeperateLocalityGroup(&lg_batchs); - created_new_wb = true; - if (!status.ok()) { - return status; - } - for (uint32_t i = 0; i < options_.exist_lg_list->size(); ++i) { - if (lg_batchs[i] != 0) { - WriteBatchInternal::SetSequence(lg_batchs[i], batch_seq); - } - } - } else { - lg_batchs[0] = (&batch); + std::vector lg_batchs(options_.exist_lg_list->size()); + status = batch.SeperateLocalityGroup(&lg_batchs); + if (!status.ok()) { + return status; } for (uint32_t i = 0; i < lg_batchs.size(); ++i) { - if (lg_batchs[i] == NULL) { - continue; - } - status = repairers[i]->InsertMemTable(lg_batchs[i], batch_seq); + status = repairers[i]->InsertMemTable(&lg_batchs[i], batch_seq); if (!status.ok()) { Log(options_.info_log, "[%s][lg:%d] Insert log 
#%llu: ignoring %s", dbname_.c_str(), i, @@ -648,15 +632,7 @@ class DBRepairer { status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file } else { - counter += WriteBatchInternal::Count(lg_batchs[i]); - } - } - if (created_new_wb) { - for (uint32_t i = 0; i < lg_batchs.size(); ++i) { - if (lg_batchs[i] != NULL) { - delete lg_batchs[i]; - lg_batchs[i] = NULL; - } + counter += WriteBatchInternal::Count(&lg_batchs[i]); } } last_sequence_ = batch_seq + batch_count - 1; diff --git a/src/leveldb/db/write_batch.cc b/src/leveldb/db/write_batch.cc index 7abcf7262..0bbf8737e 100644 --- a/src/leveldb/db/write_batch.cc +++ b/src/leveldb/db/write_batch.cc @@ -90,7 +90,7 @@ Status WriteBatch::Iterate(Handler* handler) const { } } -Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const { +Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const { Slice input(rep_); if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); @@ -114,21 +114,17 @@ Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const key = tmp_key; } assert(lg_id < lg_bw->size()); - if ((*lg_bw)[lg_id] == NULL) { - WriteBatch* bw = new WriteBatch(); - (*lg_bw)[lg_id] = bw; - } } switch (tag) { case kTypeValue: if (GetLengthPrefixedSlice(&input, &value)) { - (*lg_bw)[lg_id]->Put(key, value); + (*lg_bw)[lg_id].Put(key, value); } else { return Status::Corruption("bad WriteBatch Put"); } break; case kTypeDeletion: - (*lg_bw)[lg_id]->Delete(key); + (*lg_bw)[lg_id].Delete(key); // std::cout << "find delete key: " << key.ToString() << std::endl; break; default: @@ -139,12 +135,8 @@ Status WriteBatch::SeperateLocalityGroup(std::vector* lg_bw) const uint64_t last_sequence = WriteBatchInternal::Sequence(this) + WriteBatchInternal::Count(this) - 1; for (uint32_t i = 0; i < lg_bw->size(); ++i) { - if ((*lg_bw)[i] == NULL) { - (*lg_bw)[i] = new WriteBatch(); - WriteBatchInternal::SetCount((*lg_bw)[i], 0); - } - int c = 
WriteBatchInternal::Count((*lg_bw)[i]); - WriteBatchInternal::SetSequence((*lg_bw)[i], last_sequence - c + 1); + int c = WriteBatchInternal::Count(&(*lg_bw)[i]); + WriteBatchInternal::SetSequence(&(*lg_bw)[i], last_sequence - c + 1); } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); diff --git a/src/leveldb/include/leveldb/tera_key.h b/src/leveldb/include/leveldb/tera_key.h index 871449d75..5be42067f 100644 --- a/src/leveldb/include/leveldb/tera_key.h +++ b/src/leveldb/include/leveldb/tera_key.h @@ -23,7 +23,9 @@ enum TeraKeyType { TKT_PUT_IFABSENT = 8, TKT_APPEND = 9, TKT_ADDINT64 = 10, - TKT_TYPE_NUM = 11 + TKT_TYPE_NUM = 11, + + TKT_LOCK = 0x82 }; class RawKeyOperator; diff --git a/src/leveldb/include/leveldb/write_batch.h b/src/leveldb/include/leveldb/write_batch.h index ebe9a0571..e74e53233 100644 --- a/src/leveldb/include/leveldb/write_batch.h +++ b/src/leveldb/include/leveldb/write_batch.h @@ -59,7 +59,7 @@ class WriteBatch { }; Status Iterate(Handler* handler) const; - Status SeperateLocalityGroup(std::vector* lg_bw) const; + Status SeperateLocalityGroup(std::vector* lg_bw) const; private: friend class WriteBatchInternal; diff --git a/src/proto/status_code.proto b/src/proto/status_code.proto index d0c133548..e8be07465 100644 --- a/src/proto/status_code.proto +++ b/src/proto/status_code.proto @@ -17,7 +17,7 @@ enum StatusCode { kTabletNodeIsIniting = 24; kTabletNodeIsReadonly = 25; kTabletNodeIsRunning = 29; - + // tablet node manage kTabletNodeReady = 30; kTabletNodeOffLine = 31; @@ -34,7 +34,7 @@ enum StatusCode { kTabletOnMerge = 50; kTabletUnLoading = 52; kTabletUnLoading2 = 68; - + kTableNotFound = 45; kTableCorrupt = 46; kTableNotSupport = 47; @@ -44,7 +44,7 @@ enum StatusCode { kIllegalAccess = 71; kNotPermission = 72; kIOError = 73; - + //// master rpc //// // create&update table @@ -60,7 +60,7 @@ enum StatusCode { // report kTabletNodeNotRegistered = 306; - + // cmdctrl kInvalidArgument = 310; 
@@ -75,7 +75,7 @@ enum StatusCode { // key kKeyNotExist = 402; kKeyNotInRange = 403; - + // meta table kMetaTabletError = 500; @@ -90,12 +90,16 @@ enum StatusCode { kAsyncNotRunning = 801; kAsyncTooBusy = 802; kAsyncFailure = 803; - + // zk kZKError = 900; - + kTableStatusEnable = 1000; kTableStatusDisable = 1001; + + kLockNotOwn = 1101; // lock & unlock + kLockNotExist = 1102; // unlock + kLockDoubleLock = 1103; // lock } enum TabletStatus { diff --git a/src/proto/table_schema.proto b/src/proto/table_schema.proto index b438eab2f..9f6c8727d 100644 --- a/src/proto/table_schema.proto +++ b/src/proto/table_schema.proto @@ -55,6 +55,7 @@ message TableSchema { optional string admin_group = 12; // users in admin_group can admin this table optional string alias = 13; // table alias optional string admin = 14; + optional bool enable_txn = 15 [default = false]; // deprecated, instead by raw_key GeneralKv optional bool kv_only = 9 [default = false]; diff --git a/src/proto/tabletnode_rpc.proto b/src/proto/tabletnode_rpc.proto index c46f9dab1..03b24d49b 100644 --- a/src/proto/tabletnode_rpc.proto +++ b/src/proto/tabletnode_rpc.proto @@ -152,11 +152,16 @@ message Mutation { message RowMutationSequence { required bytes row_key = 1; repeated Mutation mutation_sequence = 2; + optional bool lock_before_write = 3 [default = false]; + optional bool unlock_after_write = 4 [default = false]; + optional bool insure_locked_already = 5 [default = false]; + optional uint64 lock_id = 6; + optional bytes lock_annotation = 7; } message WriteTabletRequest { optional uint64 sequence_id = 1; - required string tablet_name = 2; + required string tablet_name = 2; //repeated KeyValuePair pair_list = 3; // for compatible optional bool is_sync = 4 [default = false]; optional bool is_instant = 5 [default = false]; diff --git a/src/sdk/mutate_impl.cc b/src/sdk/mutate_impl.cc index f5eb22dd3..b174859ef 100644 --- a/src/sdk/mutate_impl.cc +++ b/src/sdk/mutate_impl.cc @@ -18,7 +18,11 @@ 
RowMutationImpl::RowMutationImpl(TableImpl* table, const std::string& row_key) _retry_times(0), _finish(false), _finish_cond(&_finish_mutex), - _commit_times(0) { + _commit_times(0), + _lock_before_write(false), + _insure_locked_already(false), + _unlock_after_write(false), + _lock_id(0) { SetErrorIfInvalid(row_key, kRowkey); } @@ -400,6 +404,22 @@ void RowMutationImpl::RunCallback() { } } +void RowMutationImpl::LockBeforeWrite(uint64_t lock_id, const std::string& lock_annotation) { + _lock_before_write = true; + _lock_id = lock_id; + _lock_annotation = lock_annotation; +} + +void RowMutationImpl::InsureLockedAlready(uint64_t lock_id) { + _insure_locked_already = true; + _lock_id = lock_id; +} + +void RowMutationImpl::UnlockAfterWrite(uint64_t lock_id) { + _unlock_after_write = true; + _lock_id = lock_id; +} + RowMutation::Mutation& RowMutationImpl::AddMutation() { _mu_seq.resize(_mu_seq.size() + 1); return _mu_seq.back(); @@ -470,6 +490,20 @@ void SerializeMutation(const RowMutation::Mutation& src, tera::Mutation* dst) { } } +void RowMutationImpl::Serialize(tera::RowMutationSequence* dst) { + dst->set_row_key(RowKey()); + for (uint32_t i = 0; i < MutationNum(); i++) { + const RowMutation::Mutation& mu = GetMutation(i); + tera::Mutation* mutation = dst->add_mutation_sequence(); + SerializeMutation(mu, mutation); + } + dst->set_lock_before_write(_lock_before_write); + dst->set_unlock_after_write(_unlock_after_write); + dst->set_insure_locked_already(_insure_locked_already); + dst->set_lock_id(_lock_id); + dst->set_lock_annotation(_lock_annotation); +} + } // namespace tera /* vim: set expandtab ts=4 sw=4 sts=4 tw=100: */ diff --git a/src/sdk/mutate_impl.h b/src/sdk/mutate_impl.h index 9fc28e50f..12311ec76 100644 --- a/src/sdk/mutate_impl.h +++ b/src/sdk/mutate_impl.h @@ -173,6 +173,19 @@ class RowMutationImpl : public RowMutation, public SdkTask { void AddCommitTimes() { _commit_times++; } int64_t GetCommitTimes() { return _commit_times; } + void 
LockBeforeWrite(uint64_t lock_id, const std::string& lock_annotation = ""); + void InsureLockedAlready(uint64_t lock_id); + void UnlockAfterWrite(uint64_t lock_id); + + bool IsLockBeforeWrite() { return _lock_before_write; } + bool IsInsureLockedAlready() { return _insure_locked_already; } + bool IsUnlockAfterWrite() { return _unlock_after_write; } + + uint64_t LockId() { return _lock_id; } + const std::string& LockAnnotation() { return _lock_annotation; } + + void Serialize(tera::RowMutationSequence* dst); + protected: /// 增加一个操作 RowMutation::Mutation& AddMutation(); @@ -194,6 +207,13 @@ class RowMutationImpl : public RowMutation, public SdkTask { /// 记录此mutation被提交到ts的次数 int64_t _commit_times; + + /// 锁相关 + bool _lock_before_write; + bool _insure_locked_already; + bool _unlock_after_write; + uint64_t _lock_id; + std::string _lock_annotation; }; void SerializeMutation(const RowMutation::Mutation& src, tera::Mutation* dst); diff --git a/src/sdk/schema.cc b/src/sdk/schema.cc index a0bb9c45e..e06e4b66f 100644 --- a/src/sdk/schema.cc +++ b/src/sdk/schema.cc @@ -119,6 +119,14 @@ bool TableDescriptor::IsWalDisabled() const { return _impl->IsWalDisabled(); } +void TableDescriptor::EnableTxn() { + _impl->EnableTxn(); +} + +bool TableDescriptor::IsTxnEnabled() const { + return _impl->IsTxnEnabled(); +} + int32_t TableDescriptor::AddSnapshot(uint64_t snapshot) { return _impl->AddSnapshot(snapshot); } diff --git a/src/sdk/schema_impl.cc b/src/sdk/schema_impl.cc index 71859f56b..dd82d6cef 100644 --- a/src/sdk/schema_impl.cc +++ b/src/sdk/schema_impl.cc @@ -192,7 +192,8 @@ TableDescImpl::TableDescImpl(const std::string& tb_name) _raw_key_type(kBinary), _split_size(FLAGS_tera_master_split_tablet_size), _merge_size(FLAGS_tera_master_merge_tablet_size), - _disable_wal(false) { + _disable_wal(false), + _enable_txn(false) { } /* @@ -416,6 +417,14 @@ bool TableDescImpl::IsWalDisabled() const { return _disable_wal; } +void TableDescImpl::EnableTxn() { + _enable_txn = true; +} + +bool 
TableDescImpl::IsTxnEnabled() const { + return _enable_txn; +} + /// 插入snapshot int32_t TableDescImpl::AddSnapshot(uint64_t snapshot) { _snapshots.push_back(snapshot); diff --git a/src/sdk/schema_impl.h b/src/sdk/schema_impl.h index 0a55a17d0..626028211 100644 --- a/src/sdk/schema_impl.h +++ b/src/sdk/schema_impl.h @@ -172,6 +172,9 @@ class TableDescImpl { void DisableWal(); bool IsWalDisabled() const; + void EnableTxn(); + bool IsTxnEnabled() const; + /// 插入snapshot int32_t AddSnapshot(uint64_t snapshot); /// 获取snapshot @@ -206,6 +209,7 @@ class TableDescImpl { int64_t _split_size; int64_t _merge_size; bool _disable_wal; + bool _enable_txn; std::string _admin_group; std::string _admin; std::string _alias; diff --git a/src/sdk/sdk_utils.cc b/src/sdk/sdk_utils.cc index 165e1b08a..160268a1a 100644 --- a/src/sdk/sdk_utils.cc +++ b/src/sdk/sdk_utils.cc @@ -137,6 +137,9 @@ void ShowTableSchema(const TableSchema& s, bool is_x) { if (is_x || schema.disable_wal()) { ss << "wal=" << Switch2Str(!schema.disable_wal()) << ","; } + if (is_x || schema.enable_txn()) { + ss << "txn=" << Switch2Str(schema.enable_txn()) << ","; + } ss << "\b> {" << std::endl; size_t lg_num = schema.locality_groups_size(); @@ -236,6 +239,7 @@ void TableDescToSchema(const TableDescriptor& desc, TableSchema* schema) { schema->set_admin_group(desc.AdminGroup()); schema->set_admin(desc.Admin()); schema->set_disable_wal(desc.IsWalDisabled()); + schema->set_enable_txn(desc.IsTxnEnabled()); schema->set_alias(desc.Alias()); // add lg int num = desc.LocalityGroupNum(); @@ -317,6 +321,9 @@ void TableSchemaToDesc(const TableSchema& schema, TableDescriptor* desc) { if (schema.has_disable_wal() && schema.disable_wal()) { desc->DisableWal(); } + if (schema.has_enable_txn() && schema.enable_txn()) { + desc->EnableTxn(); + } if (schema.has_alias()) { desc->SetAlias(schema.alias()); } @@ -511,6 +518,14 @@ bool SetTableProperties(const string& name, const string& value, } else { return false; } + } else if (name == 
"txn") { + if (value == "on") { + desc->EnableTxn(); + } else if (value == "off") { + // do nothing + } else { + return false; + } } else { return false; } @@ -543,6 +558,13 @@ bool CheckTableDescrptor(const TableDescriptor& desc, ErrorCode* err) { return false; } } + if (desc.IsTxnEnabled() && (desc.RawKey() == kGeneralKv || desc.RawKey() == kTTLKv)) { + ss << "kv and ttlkv don't support txn"; + if (err != NULL) { + err->SetFailed(ErrorCode::kBadParam, ss.str()); + } + return false; + } return true; } diff --git a/src/sdk/table_impl.cc b/src/sdk/table_impl.cc index 2affb4658..265a7fb6d 100644 --- a/src/sdk/table_impl.cc +++ b/src/sdk/table_impl.cc @@ -688,12 +688,8 @@ void TableImpl::CommitMutations(const std::string& server_addr, for (uint32_t i = 0; i < mu_list.size(); ++i) { RowMutationImpl* row_mutation = mu_list[i]; RowMutationSequence* mu_seq = request->add_row_list(); - mu_seq->set_row_key(row_mutation->RowKey()); - for (uint32_t j = 0; j < row_mutation->MutationNum(); j++) { - const RowMutation::Mutation& mu = row_mutation->GetMutation(j); - tera::Mutation* mutation = mu_seq->add_mutation_sequence(); - SerializeMutation(mu, mutation); - } + row_mutation->Serialize(mu_seq); + mu_id_list->push_back(row_mutation->GetId()); row_mutation->AddCommitTimes(); row_mutation->DecRef(); diff --git a/src/sdk/tera.h b/src/sdk/tera.h index c3242bda7..bb9ca952f 100644 --- a/src/sdk/tera.h +++ b/src/sdk/tera.h @@ -194,6 +194,9 @@ class TableDescriptor { void DisableWal(); bool IsWalDisabled() const; + void EnableTxn(); + bool IsTxnEnabled() const; + /// 插入snapshot int32_t AddSnapshot(uint64_t snapshot); /// 获取snapshot diff --git a/src/types.h b/src/types.h index bfad100da..c0ca887b4 100644 --- a/src/types.h +++ b/src/types.h @@ -19,7 +19,7 @@ const int64_t kInvalidTimerId = 0; const uint32_t kUnknownId = -1U; const uint32_t kInvalidSessionId = -1U; const std::string kUnknownAddr = "255.255.255.255:0000"; -const uint64_t kMaxTimeStamp = (1ULL << 56) - 1; +const int64_t 
kMaxTimeStamp = (1ULL << 56) - 1; const uint32_t kMaxHostNameSize = 255; const std::string kMasterNodePath = "/master"; const std::string kMasterLockPath = "/master-lock"; @@ -35,6 +35,7 @@ const uint64_t kMaxRpcSize = (16 << 20); // 16MB const uint64_t kRowkeySize = (64 << 10); // 64KB const uint64_t kQualifierSize = (64 << 10); // 64KB const uint64_t kValueSize = (32 << 20); // 32MB +const uint64_t kMaxLocalityGroupNum = 1024; } // namespace tera