Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- cron: '00 02 */4 * *'
env:
RUST_BACKTRACE: 1
CLC_STABLE_HASH: "b963be478a9b97fd149326dc69581f6733b23c23"
CLC_STABLE_HASH: "5703e607c324ef3a5453000aca5ea79d8a0fc95f"
jobs:
rust-tests:
name: Rust tests
Expand Down
2 changes: 1 addition & 1 deletion couchbase-lite-core-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ fn download_source_code_via_git_if_needed() -> Result<PathBuf, Box<dyn std::erro
use which::which;

const URL: &str = "https://github.com/Dushistov/couchbase-lite-core";
const COMMIT_SHA1: &str = "b963be478a9b97fd149326dc69581f6733b23c23";
const COMMIT_SHA1: &str = "5703e607c324ef3a5453000aca5ea79d8a0fc95f";

let git_path = which("git")?;
let cur_dir = env::current_dir()?;
Expand Down
10 changes: 10 additions & 0 deletions couchbase-lite-core-sys/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ impl Default for FLSliceResult {
}
}

impl AsRef<[u8]> for FLSliceResult {
#[inline]
fn as_ref(&self) -> &[u8] {
self.as_bytes()
}
}

/// looks like according C++ code it is safe from other thread
unsafe impl Send for FLSliceResult {}

impl FLSliceResult {
#[inline]
pub fn as_bytes(&self) -> &[u8] {
Expand Down
8 changes: 4 additions & 4 deletions couchbase-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ documentation = "https://docs.rs/couchbase-lite"
repository = "https://github.com/dushistov/couchbase-lite-rust"
readme = "../README.md"
keywords = ["ffi", "couchbase", "couchbase-lite-core", "database"]
rust-version = "1.81"

[features]
default = ["build-cpp", "git-download-cpp", "use-couchbase-lite-sqlite", "use-tokio-websocket", "use-native-tls"]
Expand All @@ -26,11 +27,10 @@ couchbase-lite-core-sys = { version = "0.13.0", default-features = false }
serde-fleece = { version = "0.9.0", default-features = false }
log = "0.4"
tokio = { version = "1.16.1", optional = true, default-features = false, features = ["rt", "sync", "macros", "time"] }
tokio-tungstenite = { version = "0.23.0", optional = true, default-features = false, features = ["connect"] }
tokio-tungstenite = { version = "0.27.0", optional = true, default-features = false, features = ["connect"] }
futures-util = { version = "0.3", optional = true, default-features = false }
# TODO: remove deps when https://github.com/rust-lang/rust/issues/44930
# was merged
va_list = "0.1.4"
# TODO: remove deps when https://github.com/rust-lang/rust/issues/44930 was merged
va_list = "0.2.1"
serde = { version = "1.0", default-features = false, features = ["std"] }
uuid = { version = "1.1.2", default-features = false, features = ["v4", "serde"] }
bitflags = { version = "2.5.0", default-features = false }
Expand Down
21 changes: 7 additions & 14 deletions couchbase-lite/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::{
},
Database,
};
use log::error;
use std::{mem::MaybeUninit, os::raw::c_void, panic::catch_unwind, process::abort, ptr::NonNull};
use std::{mem::MaybeUninit, os::raw::c_void, ptr::NonNull};

pub(crate) struct DatabaseObserver {
inner: NonNull<C4CollectionObserver>,
Expand Down Expand Up @@ -37,18 +36,12 @@ impl DatabaseObserver {
) where
F: FnMut(*const C4CollectionObserver) + Send,
{
let r = catch_unwind(|| {
let boxed_f = context as *mut F;
assert!(
!boxed_f.is_null(),
"DatabaseObserver: Internal error - null function pointer"
);
(*boxed_f)(obs);
});
if r.is_err() {
error!("DatabaseObserver::call_boxed_closure catch panic aborting");
abort();
}
let boxed_f = context as *mut F;
assert!(
!boxed_f.is_null(),
"DatabaseObserver: Internal error - null function pointer"
);
(*boxed_f)(obs);
}
let boxed_f: *mut F = Box::into_raw(Box::new(callback_f));
let mut error = c4error_init();
Expand Down
64 changes: 23 additions & 41 deletions couchbase-lite/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use crate::{
},
Database,
};
use log::{error, info, trace};
use log::{info, trace};
use std::{
mem::{self, MaybeUninit},
os::raw::c_void,
panic::catch_unwind,
process::abort,
ptr,
ptr::NonNull,
slice, str,
Expand Down Expand Up @@ -233,18 +231,12 @@ impl Replicator {
F2: ReplicatorStatusChangedCallback,
F3: ReplicatorDocumentsEndedCallback,
{
let r = catch_unwind(|| {
let ctx = ctx as *mut CallbackContext<F, F2, F3>;
assert!(
!ctx.is_null(),
"Replicator::call_validation: Internal error - null function pointer"
);
((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
});
r.unwrap_or_else(|_| {
error!("Replicator::call_validation: catch panic aborting");
abort();
})
let ctx = ctx as *mut CallbackContext<F, F2, F3>;
assert!(
!ctx.is_null(),
"Replicator::call_validation: Internal error - null function pointer"
);
((*ctx).validation_cb)(coll_spec, doc_id, rev_id, flags, body)
}

unsafe extern "C" fn call_on_status_changed<F1, F, F3>(
Expand All @@ -257,18 +249,13 @@ impl Replicator {
F3: ReplicatorDocumentsEndedCallback,
{
info!("on_status_changed: repl {c4_repl:?}, status {status:?}");
let r = catch_unwind(|| {
let ctx = ctx as *mut CallbackContext<F1, F, F3>;
assert!(
!ctx.is_null(),
"Replicator::call_on_status_changed: Internal error - null function pointer"
);
((*ctx).state_cb)(ReplicatorState::from(status));
});
if r.is_err() {
error!("Replicator::call_on_status_changed: catch panic aborting");
abort();
}

let ctx = ctx as *mut CallbackContext<F1, F, F3>;
assert!(
!ctx.is_null(),
"Replicator::call_on_status_changed: Internal error - null function pointer"
);
((*ctx).state_cb)(ReplicatorState::from(status));
}

unsafe extern "C" fn call_on_documents_ended<F1, F2, F>(
Expand All @@ -283,20 +270,15 @@ impl Replicator {
F: ReplicatorDocumentsEndedCallback,
{
trace!("on_documents_ended: repl {c4_repl:?} pushing {pushing}, num_docs {num_docs}");
let r = catch_unwind(|| {
let ctx = ctx as *mut CallbackContext<F1, F2, F>;
assert!(
!ctx.is_null(),
"Replicator::call_on_documents_ended: Internal error - null function pointer"
);
let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
let mut it = docs.iter().map(|x| &**x);
((*ctx).docs_ended_cb)(pushing, &mut it);
});
if r.is_err() {
error!("Replicator::call_on_documents_ended: catch panic aborting");
abort();
}

let ctx = ctx as *mut CallbackContext<F1, F2, F>;
assert!(
!ctx.is_null(),
"Replicator::call_on_documents_ended: Internal error - null function pointer"
);
let docs: &[*const C4DocumentEnded] = slice::from_raw_parts(docs, num_docs);
let mut it = docs.iter().map(|x| &**x);
((*ctx).docs_ended_cb)(pushing, &mut it);
}

let ctx = Box::new(CallbackContext {
Expand Down
23 changes: 12 additions & 11 deletions couchbase-lite/src/replicator/tokio_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio_tungstenite::{
handshake::client::{Request, Response},
http::{self, header::HeaderName, HeaderValue, Uri},
protocol::{frame::coding::CloseCode, CloseFrame},
Message,
Bytes, Message,
},
WebSocketStream,
};
Expand All @@ -65,8 +65,8 @@ pub fn c4socket_init(handle: Handle) {
struct SocketImpl {
handle: Handle,
read_push_pull: Arc<ReadPushPull>,
writer: Arc<TokioMutex<(Option<WsWriter>, mpsc::UnboundedReceiver<Vec<u8>>)>>,
send_queue: mpsc::UnboundedSender<Vec<u8>>,
writer: Arc<TokioMutex<(Option<WsWriter>, mpsc::UnboundedReceiver<Bytes>)>>,
send_queue: mpsc::UnboundedSender<Bytes>,
close_control: Arc<CloseControl>,
}

Expand Down Expand Up @@ -202,9 +202,7 @@ unsafe extern "C" fn ws_write(c4sock: *mut C4Socket, allocated_data: C4SliceResu
assert!(!native.is_null());
let socket: &SocketImpl = &*native;
let writer = socket.writer.clone();
//TODO: change this when `Vec` allocator API was stabilized
// https://github.com/rust-lang/rust/issues/32838
let data: Vec<u8> = allocated_data.as_bytes().to_vec();
let data = Bytes::from_owner(allocated_data);
socket
.send_queue
.send(data)
Expand All @@ -219,7 +217,7 @@ unsafe extern "C" fn ws_write(c4sock: *mut C4Socket, allocated_data: C4SliceResu
});
}

async fn send_binary_msg(ctx: C4SocketPtr, writer: &mut WsWriter, data: Vec<u8>) {
async fn send_binary_msg(ctx: C4SocketPtr, writer: &mut WsWriter, data: Bytes) {
let n = data.len();
if let Err(err) = writer.send(Message::Binary(data)).await {
error!("c4sock {ctx:?}: writer.send failure: {err}");
Expand Down Expand Up @@ -330,7 +328,10 @@ unsafe extern "C" fn ws_request_close(c4sock: *mut C4Socket, status: c_int, mess
}
trace!("c4sock {c4sock:?}: sending close message");
if let Err(err) = writer
.send(Message::Close(Some(CloseFrame { code, reason })))
.send(Message::Close(Some(CloseFrame {
code,
reason: reason.to_string().into(),
})))
.await
{
error!("c4sock {c4sock:?}: requestClose, writer.send failure: {err}");
Expand Down Expand Up @@ -524,7 +525,7 @@ async fn do_open(
request: Result<Request, Error>,
mut stop_rx: oneshot::Receiver<()>,
read_push_pull: Arc<ReadPushPull>,
writer: Arc<TokioMutex<(Option<WsWriter>, mpsc::UnboundedReceiver<Vec<u8>>)>>,
writer: Arc<TokioMutex<(Option<WsWriter>, mpsc::UnboundedReceiver<Bytes>)>>,
close_control: Arc<CloseControl>,
handle: Handle,
) -> Result<(), Error> {
Expand Down Expand Up @@ -599,7 +600,7 @@ async fn main_read_loop(
let data = m.into_data();
read_push_pull.nbytes_avaible.store(data.len(), Ordering::Release);
unsafe {
c4socket_received(c4sock.0, data.as_slice().into());
c4socket_received(c4sock.0, data.as_ref().into());
}
read_push_pull.confirm.notified().await;
}
Expand Down Expand Up @@ -694,7 +695,7 @@ unsafe fn tungstenite_err_to_c4_err(err: tungstenite::Error) -> Error {
C4ErrorDomain::NetworkDomain,
C4NetworkErrorCode::kC4NumNetErrorCodesPlus1.0,
),
Utf8 => (
Utf8(_) => (
C4ErrorDomain::WebSocketDomain,
C4WebSocketCloseCode::kWebSocketCloseBadMessageFormat.0,
),
Expand Down
Loading