Compare commits

...

14 commits

Author SHA1 Message Date
Quentin Dufour
85aca61860
[SKIP CI] WIP merkle todo encapsulation 2025-05-02 08:18:46 +02:00
Quentin Dufour
46ebfdba66
set config 2025-05-01 16:30:09 +02:00
Quentin Dufour
ee8fa687ad
add permits to merkle worker 2025-05-01 13:53:38 +02:00
Quentin Dufour
fa457328c8
[SKIP CI] add forget semaphore, add them back not yet implemented 2025-05-01 10:05:04 +02:00
Quentin Dufour
f34558af07
add a rpc in-flight limiter 2025-05-01 08:56:57 +02:00
Quentin Dufour
d78e5f8a1b
fix logic 2025-04-30 15:53:12 +02:00
Quentin Dufour
3172f875ae
pass config 2025-04-30 14:41:16 +02:00
Quentin Dufour
11a6417d11
try to better track queue len evol 2025-04-30 09:01:50 +02:00
Quentin Dufour
b0a9e007bd
try another approach to backpressure 2025-04-30 08:49:44 +02:00
Quentin Dufour
904548d1d1
allow up to 30sec 2025-04-30 08:38:28 +02:00
Quentin Dufour
6cc79bc696
react slower 2025-04-30 08:37:35 +02:00
Quentin Dufour
60b3d28f93
add an opentelemetry metric 2025-04-30 07:24:20 +02:00
Quentin Dufour
7fddf0af9c
first implementation 2025-04-29 10:50:47 +02:00
Quentin Dufour
78882f4040
add a backpressure system 2025-04-29 09:38:47 +02:00
22 changed files with 403 additions and 65 deletions

1
Cargo.lock generated
View file

@ -1470,6 +1470,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util 0.7.14", "tokio-util 0.7.14",
"tracing",
] ]
[[package]] [[package]]

View file

@ -17,6 +17,7 @@ use opentelemetry::{
Context, Context,
}; };
use garage_net::endpoint::RpcInFlightLimiter;
use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream}; use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db; use garage_db as db;
@ -295,6 +296,7 @@ impl BlockManager {
&node_id, &node_id,
BlockRpc::GetBlock(*hash, order_tag), BlockRpc::GetBlock(*hash, order_tag),
priority, priority,
RpcInFlightLimiter::TableWrite,
); );
tokio::select! { tokio::select! {
res = rpc => { res = rpc => {

View file

@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};
use format_table::format_table_to_string; use format_table::format_table_to_string;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
@ -118,6 +120,7 @@ impl AdminRpcHandler {
&node, &node,
AdminRpc::LaunchRepair(opt_to_send.clone()), AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await; .await;
if !matches!(resp, Ok(Ok(_))) { if !matches!(resp, Ok(Ok(_))) {
@ -164,7 +167,12 @@ impl AdminRpcHandler {
let node_id = (*node).into(); let node_id = (*node).into();
match self match self
.endpoint .endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) .call(
&node_id,
AdminRpc::Stats(opt),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await .await
{ {
Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
@ -407,6 +415,7 @@ impl AdminRpcHandler {
variable: variable.clone(), variable: variable.clone(),
}), }),
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await?? .await??
{ {
@ -456,6 +465,7 @@ impl AdminRpcHandler {
value: value.to_string(), value: value.to_string(),
}), }),
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await?? .await??
{ {
@ -488,6 +498,7 @@ impl AdminRpcHandler {
&to, &to,
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await? .await?
})) }))

View file

@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration; use std::time::Duration;
use format_table::format_table; use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::error::*; use garage_util::error::*;
use garage_rpc::layout::*; use garage_rpc::layout::*;
@ -200,7 +201,12 @@ pub async fn cmd_connect(
args: ConnectNodeOpt, args: ConnectNodeOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL) .call(
&rpc_host,
SystemRpc::Connect(args.node),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await?? .await??
{ {
SystemRpc::Ok => { SystemRpc::Ok => {
@ -216,7 +222,10 @@ pub async fn cmd_admin(
rpc_host: NodeID, rpc_host: NodeID,
args: AdminRpc, args: AdminRpc,
) -> Result<(), HelperError> { ) -> Result<(), HelperError> {
match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { match rpc_cli
.call(&rpc_host, args, PRIO_NORMAL, RpcInFlightLimiter::NoLimit)
.await??
{
AdminRpc::Ok(msg) => { AdminRpc::Ok(msg) => {
println!("{}", msg); println!("{}", msg);
} }
@ -271,7 +280,12 @@ pub async fn fetch_status(
rpc_host: NodeID, rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> { ) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .call(
&rpc_host,
SystemRpc::GetKnownNodes,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await?? .await??
{ {
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),

View file

@ -1,6 +1,7 @@
use bytesize::ByteSize; use bytesize::ByteSize;
use format_table::format_table; use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::crdt::Crdt; use garage_util::crdt::Crdt;
use garage_util::error::*; use garage_util::error::*;
@ -45,7 +46,12 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt, args: AssignRoleOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .call(
&rpc_host,
SystemRpc::GetKnownNodes,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await?? .await??
{ {
SystemRpc::ReturnKnownNodes(nodes) => nodes, SystemRpc::ReturnKnownNodes(nodes) => nodes,
@ -475,7 +481,12 @@ pub async fn fetch_layout(
rpc_host: NodeID, rpc_host: NodeID,
) -> Result<LayoutHistory, Error> { ) -> Result<LayoutHistory, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .call(
&rpc_host,
SystemRpc::PullClusterLayout,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await?? .await??
{ {
SystemRpc::AdvertiseClusterLayout(t) => Ok(t), SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@ -493,6 +504,7 @@ pub async fn send_layout(
&rpc_host, &rpc_host,
SystemRpc::AdvertiseClusterLayout(layout), SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await??; .await??;
Ok(()) Ok(())

View file

@ -244,7 +244,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client // Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None, None);
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {

View file

@ -175,7 +175,13 @@ impl Garage {
// ---- admin tables ---- // ---- admin tables ----
info!("Initialize bucket_table..."); info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); let bucket_table = Table::new(
BucketTable,
control_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize bucket_alias_table..."); info!("Initialize bucket_alias_table...");
let bucket_alias_table = Table::new( let bucket_alias_table = Table::new(
@ -183,9 +189,16 @@ impl Garage {
control_rep_param.clone(), control_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&config.experimental.merkle_backpressure,
); );
info!("Initialize key_table_table..."); info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); let key_table = Table::new(
KeyTable,
control_rep_param,
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
// ---- S3 tables ---- // ---- S3 tables ----
info!("Initialize block_ref_table..."); info!("Initialize block_ref_table...");
@ -196,6 +209,7 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&config.experimental.merkle_backpressure,
); );
info!("Initialize version_table..."); info!("Initialize version_table...");
@ -206,10 +220,12 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&config.experimental.merkle_backpressure,
); );
info!("Initialize multipart upload counter table..."); info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); let mpu_counter_table =
IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);
info!("Initialize multipart upload table..."); info!("Initialize multipart upload table...");
let mpu_table = Table::new( let mpu_table = Table::new(
@ -220,10 +236,12 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&config.experimental.merkle_backpressure,
); );
info!("Initialize object counter table..."); info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); let object_counter_table =
IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);
info!("Initialize object_table..."); info!("Initialize object_table...");
#[allow(clippy::redundant_clone)] #[allow(clippy::redundant_clone)]
@ -236,6 +254,7 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&config.experimental.merkle_backpressure,
); );
info!("Load lifecycle worker state..."); info!("Load lifecycle worker state...");
@ -245,7 +264,7 @@ impl Garage {
// ---- K2V ---- // ---- K2V ----
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param, &config);
// ---- setup block refcount recalculation ---- // ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table // this function can be used to fix inconsistencies in the RC table
@ -335,9 +354,14 @@ impl Garage {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
impl GarageK2V { impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { fn new(
system: Arc<System>,
db: &db::Db,
meta_rep_param: TableShardedReplication,
config: &Config,
) -> Self {
info!("Initialize K2V counter table..."); info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, config);
info!("Initialize K2V subscription manager..."); info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new()); let subscriptions = Arc::new(SubscriptionManager::new());
@ -351,6 +375,7 @@ impl GarageK2V {
meta_rep_param, meta_rep_param,
system.clone(), system.clone(),
db, db,
&config.experimental.merkle_backpressure,
); );
info!("Initialize K2V RPC handler..."); info!("Initialize K2V RPC handler...");

View file

@ -10,6 +10,7 @@ use garage_db as db;
use garage_rpc::layout::LayoutHelper; use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::migrate::Migrate; use garage_util::migrate::Migrate;
@ -173,6 +174,7 @@ impl<T: CountedItem> IndexCounter<T> {
system: Arc<System>, system: Arc<System>,
replication: TableShardedReplication, replication: TableShardedReplication,
db: &db::Db, db: &db::Db,
config: &Config,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
this_node: system.id, this_node: system.id,
@ -186,6 +188,7 @@ impl<T: CountedItem> IndexCounter<T> {
replication, replication,
system, system,
db, db,
&config.experimental.merkle_backpressure,
), ),
}) })
} }

View file

@ -39,6 +39,7 @@ kuska-handshake.workspace = true
opentelemetry = { workspace = true, optional = true } opentelemetry = { workspace = true, optional = true }
opentelemetry-contrib = { workspace = true, optional = true } opentelemetry-contrib = { workspace = true, optional = true }
tracing.workspace = true
[dev-dependencies] [dev-dependencies]
pretty_env_logger.workspace = true pretty_env_logger.workspace = true

View file

@ -4,6 +4,7 @@ use std::pin::Pin;
use std::sync::atomic::{self, AtomicU32}; use std::sync::atomic::{self, AtomicU32};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::Poll; use std::task::Poll;
use tracing::*;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use bytes::Bytes; use bytes::Bytes;
@ -14,7 +15,7 @@ use futures::Stream;
use kuska_handshake::async_std::{handshake_client, BoxStream}; use kuska_handshake::async_std::{handshake_client, BoxStream};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch, Semaphore};
use tokio_util::compat::*; use tokio_util::compat::*;
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
@ -25,6 +26,7 @@ use opentelemetry::{
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
use opentelemetry_contrib::trace::propagator::binary::*; use opentelemetry_contrib::trace::propagator::binary::*;
use crate::endpoint::RpcInFlightLimiter;
use crate::error::*; use crate::error::*;
use crate::message::*; use crate::message::*;
use crate::netapp::*; use crate::netapp::*;
@ -41,6 +43,7 @@ pub(crate) struct ClientConn {
next_query_number: AtomicU32, next_query_number: AtomicU32,
inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>, inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
rpc_table_write_inflight_limiter: Option<Semaphore>,
} }
impl ClientConn { impl ClientConn {
@ -98,8 +101,14 @@ impl ClientConn {
next_query_number: AtomicU32::from(RequestID::default()), next_query_number: AtomicU32::from(RequestID::default()),
query_send: ArcSwapOption::new(Some(Arc::new(query_send))), query_send: ArcSwapOption::new(Some(Arc::new(query_send))),
inflight: Mutex::new(HashMap::new()), inflight: Mutex::new(HashMap::new()),
rpc_table_write_inflight_limiter: netapp.max_in_flight_table_write.map(Semaphore::new),
}); });
info!(
"Created conn with table write limit set to {}",
netapp.max_in_flight_table_write.unwrap_or(0)
);
netapp.connected_as_client(peer_id, conn.clone()); netapp.connected_as_client(peer_id, conn.clone());
let debug_name = format!("CLI {}", hex::encode(&peer_id[..8])); let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
@ -144,10 +153,21 @@ impl ClientConn {
req: Req<T>, req: Req<T>,
path: &str, path: &str,
prio: RequestPriority, prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<Resp<T>, Error> ) -> Result<Resp<T>, Error>
where where
T: Message, T: Message,
{ {
let _permit = match (limiter, &self.rpc_table_write_inflight_limiter) {
(RpcInFlightLimiter::TableWrite, Some(sem)) => {
info!(
"Available RPC table write slots: {}",
sem.available_permits()
);
Some(sem.acquire().await.unwrap())
}
_ => None,
};
let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?; let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?;
let id = self let id = self
@ -212,6 +232,7 @@ impl ClientConn {
let stream = Box::pin(canceller.for_stream(stream)); let stream = Box::pin(canceller.for_stream(stream));
let resp_enc = RespEnc::decode(stream).await?; let resp_enc = RespEnc::decode(stream).await?;
drop(_permit);
debug!("client: got response to request {} (path {})", id, path); debug!("client: got response to request {} (path {})", id, path);
Resp::from_enc(resp_enc) Resp::from_enc(resp_enc)
} }

View file

@ -57,6 +57,13 @@ where
} }
} }
#[derive(Debug, Copy, Clone, Default)]
pub enum RpcInFlightLimiter {
#[default]
NoLimit,
TableWrite,
}
// ---- // ----
/// This struct represents an endpoint for message of type `M`. /// This struct represents an endpoint for message of type `M`.
@ -114,6 +121,7 @@ where
target: &NodeID, target: &NodeID,
req: T, req: T,
prio: RequestPriority, prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<Resp<M>, Error> ) -> Result<Resp<M>, Error>
where where
T: IntoReq<M>, T: IntoReq<M>,
@ -136,7 +144,10 @@ where
"Not connected: {}", "Not connected: {}",
hex::encode(&target[..8]) hex::encode(&target[..8])
))), ))),
Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await, Some(c) => {
c.call(req.into_req()?, self.path.as_str(), prio, limiter)
.await
}
} }
} }
} }
@ -149,8 +160,12 @@ where
target: &NodeID, target: &NodeID,
req: M, req: M,
prio: RequestPriority, prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<<M as Message>::Response, Error> { ) -> Result<<M as Message>::Response, Error> {
Ok(self.call_streaming(target, req, prio).await?.into_msg()) Ok(self
.call_streaming(target, req, prio, limiter)
.await?
.into_msg())
} }
} }

View file

@ -74,6 +74,8 @@ pub struct NetApp {
pub id: NodeID, pub id: NodeID,
/// Private key associated with our peer ID /// Private key associated with our peer ID
pub privkey: ed25519::SecretKey, pub privkey: ed25519::SecretKey,
/// Config related to netapp
pub(crate) max_in_flight_table_write: Option<usize>,
pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>, pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>, pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
@ -101,6 +103,7 @@ impl NetApp {
netid: auth::Key, netid: auth::Key,
privkey: ed25519::SecretKey, privkey: ed25519::SecretKey,
bind_outgoing_to: Option<IpAddr>, bind_outgoing_to: Option<IpAddr>,
max_in_flight_table_write: Option<usize>,
) -> Arc<Self> { ) -> Arc<Self> {
let mut version_tag = [0u8; 16]; let mut version_tag = [0u8; 16];
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]); version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
@ -114,6 +117,7 @@ impl NetApp {
netid, netid,
id, id,
privkey, privkey,
max_in_flight_table_write,
server_conns: RwLock::new(HashMap::new()), server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()),
endpoints: RwLock::new(HashMap::new()), endpoints: RwLock::new(HashMap::new()),
@ -427,6 +431,7 @@ impl NetApp {
server_port, server_port,
}, },
PRIO_NORMAL, PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
) )
.await .await
.map(|_| ()) .map(|_| ())

View file

@ -406,7 +406,7 @@ impl PeeringManager {
ping_time ping_time
); );
let ping_response = select! { let ping_response = select! {
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r, r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH, RpcInFlightLimiter::NoLimit) => r,
_ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())), _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
}; };
@ -458,7 +458,12 @@ impl PeeringManager {
let pex_message = PeerListMessage { list: peer_list }; let pex_message = PeerListMessage { list: peer_list };
match self match self
.peer_list_endpoint .peer_list_endpoint
.call(id, pex_message, PRIO_BACKGROUND) .call(
id,
pex_message,
PRIO_BACKGROUND,
RpcInFlightLimiter::NoLimit,
)
.await .await
{ {
Err(e) => warn!("Error doing peer exchange: {}", e), Err(e) => warn!("Error doing peer exchange: {}", e),

View file

@ -6,6 +6,7 @@ use std::time::Duration;
use futures::future::join_all; use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use garage_net::endpoint::RpcInFlightLimiter;
use tokio::select; use tokio::select;
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
@ -44,6 +45,8 @@ pub struct RequestStrategy<T> {
rs_timeout: Timeout, rs_timeout: Timeout,
/// Data to drop when everything completes /// Data to drop when everything completes
rs_drop_on_complete: T, rs_drop_on_complete: T,
/// RPC In Flight Limiter
rs_inflight_limiter: RpcInFlightLimiter,
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -61,6 +64,7 @@ impl Clone for RequestStrategy<()> {
rs_priority: self.rs_priority, rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout, rs_timeout: self.rs_timeout,
rs_drop_on_complete: (), rs_drop_on_complete: (),
rs_inflight_limiter: self.rs_inflight_limiter,
} }
} }
} }
@ -74,6 +78,7 @@ impl RequestStrategy<()> {
rs_priority: prio, rs_priority: prio,
rs_timeout: Timeout::Default, rs_timeout: Timeout::Default,
rs_drop_on_complete: (), rs_drop_on_complete: (),
rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
} }
} }
/// Add an item to be dropped on completion /// Add an item to be dropped on completion
@ -84,6 +89,7 @@ impl RequestStrategy<()> {
rs_priority: self.rs_priority, rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout, rs_timeout: self.rs_timeout,
rs_drop_on_complete: drop_on_complete, rs_drop_on_complete: drop_on_complete,
rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
} }
} }
} }
@ -109,6 +115,10 @@ impl<T> RequestStrategy<T> {
self.rs_timeout = Timeout::Custom(timeout); self.rs_timeout = Timeout::Custom(timeout);
self self
} }
pub fn with_write_limiter(mut self) -> Self {
self.rs_inflight_limiter = RpcInFlightLimiter::TableWrite;
self
}
/// Extract drop_on_complete item /// Extract drop_on_complete item
fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) { fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
( (
@ -118,6 +128,7 @@ impl<T> RequestStrategy<T> {
rs_priority: self.rs_priority, rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout, rs_timeout: self.rs_timeout,
rs_drop_on_complete: (), rs_drop_on_complete: (),
rs_inflight_limiter: self.rs_inflight_limiter,
}, },
self.rs_drop_on_complete, self.rs_drop_on_complete,
) )
@ -185,7 +196,7 @@ impl RpcHelper {
let node_id = to.into(); let node_id = to.into();
let rpc_call = endpoint let rpc_call = endpoint
.call_streaming(&node_id, msg, strat.rs_priority) .call_streaming(&node_id, msg, strat.rs_priority, strat.rs_inflight_limiter)
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.record_duration(&self.0.metrics.rpc_duration, &metric_tags); .record_duration(&self.0.metrics.rpc_duration, &metric_tags);

View file

@ -21,7 +21,7 @@ use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig; use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::config::{Config, DataDirEnum}; use garage_util::config::{Config, DataDirEnum, RpcInFlightLimiterEnum};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::persister::Persister; use garage_util::persister::Persister;
@ -256,7 +256,17 @@ impl System {
let bind_outgoing_to = Some(config) let bind_outgoing_to = Some(config)
.filter(|x| x.rpc_bind_outgoing) .filter(|x| x.rpc_bind_outgoing)
.map(|x| x.rpc_bind_addr.ip()); .map(|x| x.rpc_bind_addr.ip());
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to); let maybe_max_table_write = match &config.experimental.rpc_in_flight_limiters {
RpcInFlightLimiterEnum::None => None,
RpcInFlightLimiterEnum::FixedSize(v) => Some(v.max_table_write),
};
let netapp = NetApp::new(
GARAGE_VERSION_TAG,
network_key,
node_key,
bind_outgoing_to,
maybe_max_table_write,
);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
// ---- setup netapp public listener and full mesh peering strategy ---- // ---- setup netapp public listener and full mesh peering strategy ----

View file

@ -3,10 +3,12 @@ use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use tokio::sync::Notify; use tokio::sync::SemaphorePermit;
use tokio::sync::{Notify, Semaphore};
use garage_db as db; use garage_db as db;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::migrate::Migrate; use garage_util::migrate::Migrate;
@ -20,6 +22,67 @@ use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::util::*; use crate::util::*;
pub(crate) struct MerkleTodo {
merkle_todo: db::Tree,
merkle_todo_notify: Notify,
merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
}
impl Clone for MerkleTodo {
fn clone(&self) -> Self {
Self {
merkle_todo: self.merkle_todo.clone(),
merkle_todo_notify: Notify::new(),
merkle_todo_bounded_queue: self.merkle_todo_bounded_queue.clone(),
}
}
}
impl MerkleTodo {
fn new<F: TableSchema>(db: &db::Db, config: &MerkleBackpressureEnum) -> Self {
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let merkle_todo_bounded_queue = match config {
MerkleBackpressureEnum::None => None,
MerkleBackpressureEnum::FixedQueue(p) => {
Some(Arc::new(Semaphore::new(p.max_queue_size)))
}
};
Self {
merkle_todo,
merkle_todo_notify: Notify::new(),
merkle_todo_bounded_queue,
}
}
pub(crate) fn len(&self) -> Result<usize, db::Error> {
self.merkle_todo.len()
}
pub(crate) async fn with_db<F: FnOnce(&db::Tree, SemaphorePermit)>(&self, f: F) {
let bounded = self
.merkle_todo_bounded_queue
.clone()
.unwrap_or(Arc::new(Semaphore::new(1)));
let permit = bounded.acquire().await.unwrap();
f(&self.merkle_todo, permit);
}
pub(crate) fn appended(&self, permit: SemaphorePermit) {
permit.forget();
self.merkle_todo_notify.notify_one();
}
pub(crate) fn processed(&self) {
let bounded = self
.merkle_todo_bounded_queue
.clone()
.unwrap_or(Arc::new(Semaphore::new(1)));
bounded.add_permits(1);
}
}
pub struct TableData<F: TableSchema, R: TableReplication> { pub struct TableData<F: TableSchema, R: TableReplication> {
system: Arc<System>, system: Arc<System>,
@ -29,8 +92,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub store: db::Tree, pub store: db::Tree,
pub(crate) merkle_tree: db::Tree, pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo: MerkleTodo,
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree, pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>, pub(crate) insert_queue_notify: Arc<Notify>,
@ -38,10 +100,18 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) gc_todo: db::Tree, pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics, pub(crate) metrics: TableMetrics,
pub(crate) config: MerkleBackpressureEnum,
} }
impl<F: TableSchema, R: TableReplication> TableData<F, R> { impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { pub fn new(
system: Arc<System>,
instance: F,
replication: R,
db: &db::Db,
config: &MerkleBackpressureEnum,
) -> Arc<Self> {
let store = db let store = db
.open_tree(format!("{}:table", F::TABLE_NAME)) .open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
@ -49,9 +119,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let merkle_tree = db let merkle_tree = db
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME)) .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree"); .expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) let merkle_todo = MerkleTodo::new::<F>(db, config);
.expect("Unable to open DB Merkle TODO tree");
let insert_queue = db let insert_queue = db
.open_tree(format!("{}:insert_queue", F::TABLE_NAME)) .open_tree(format!("{}:insert_queue", F::TABLE_NAME))
@ -76,11 +145,11 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
store, store,
merkle_tree, merkle_tree,
merkle_todo, merkle_todo,
merkle_todo_notify: Notify::new(),
insert_queue, insert_queue,
insert_queue_notify: Arc::new(Notify::new()), insert_queue_notify: Arc::new(Notify::new()),
gc_todo, gc_todo,
metrics, metrics,
config: config.clone(),
}) })
} }
@ -167,6 +236,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// - When an entry is modified or deleted, add it to the merkle updater's todo list. // - When an entry is modified or deleted, add it to the merkle updater's todo list.
// This has to be done atomically with the modification for the merkle updater // This has to be done atomically with the modification for the merkle updater
// to maintain consistency. The merkle updater must then be notified with todo_notify. // to maintain consistency. The merkle updater must then be notified with todo_notify.
// Also to avoid overloading the merkle updater, you need to sleep a given amount of
// time to enable backpressure (ie. slow down clients).
// - When an entry is updated to be a tombstone, add it to the gc_todo tree // - When an entry is updated to be a tombstone, add it to the gc_todo tree
pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> { pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
@ -201,6 +272,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key); let tree_key = self.tree_key(partition_key, sort_key);
// transaction begins
let changed = self.store.db().transaction(|tx| { let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => { Some(old_bytes) => {
@ -238,12 +310,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
Ok(None) Ok(None)
} }
})?; })?;
// transaction ends
if let Some((new_entry, new_bytes_hash)) = changed { // early return if nothing changed
self.metrics.internal_update_counter.add(1); let (new_entry, new_bytes_hash) = match changed {
Some((e, b)) => (e, b),
None => {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(None);
}
};
// Handle GC in case of tombstone
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.merkle_todo_notify.notify_one();
if is_tombstone { if is_tombstone {
// We are only responsible for GC'ing this item if we are the // We are only responsible for GC'ing this item if we are the
// "leader" of the partition, i.e. the first node in the // "leader" of the partition, i.e. the first node in the
@ -259,10 +341,13 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
} }
} }
// Collect metrics
self.metrics.internal_update_counter.add(1);
// Synchronize with the Merkle Worker
self.merkle_todo_notify.notify_one(); // Wake-up it
Ok(Some(new_entry)) Ok(Some(new_entry))
} else {
Ok(None)
}
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
@ -282,10 +367,16 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
_ => Ok(false), _ => Ok(false),
})?; })?;
if removed { if !removed {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(false);
}
self.metrics.internal_delete_counter.add(1); self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
}
Ok(removed) Ok(removed)
} }
@ -310,11 +401,18 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
_ => Ok(false), _ => Ok(false),
})?; })?;
if removed { if !removed {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(false);
}
self.metrics.internal_delete_counter.add(1); self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
}
Ok(removed) Ok(true)
} }
// ---- Insert queue functions ---- // ---- Insert queue functions ----

View file

@ -262,7 +262,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC has been successful for all of these entries. // GC has been successful for all of these entries.
// We now remove them all from our local table and from the GC todo list. // We now remove them all from our local table and from the GC todo list.
for item in items { for item in items {
self.data let _is_removed = self
.data
.delete_if_equal_hash(&item.key[..], item.value_hash) .delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?; .err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo) item.remove_if_equal(&self.data.gc_todo)
@ -275,14 +276,21 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
match message { match message {
GcRpc::Update(items) => { GcRpc::Update(items) => {
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
self.data.update_many(items)?; self.data.update_many(items)?;
Ok(GcRpc::Ok) Ok(GcRpc::Ok)
} }
GcRpc::DeleteIfEqualHash(items) => { GcRpc::DeleteIfEqualHash(items) => {
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
for (key, vhash) in items.iter() { for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?; let _is_removed = self.data.delete_if_equal_hash(&key[..], *vhash)?;
} }
Ok(GcRpc::Ok) Ok(GcRpc::Ok)
} }
@ -329,7 +337,6 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
} }
async fn wait_for_work(&mut self) -> WorkerState { async fn wait_for_work(&mut self) -> WorkerState {
tokio::time::sleep(self.wait_delay).await;
WorkerState::Busy WorkerState::Busy
} }
} }

View file

@ -9,6 +9,7 @@ use tokio::sync::watch;
use garage_db as db; use garage_db as db;
use garage_util::background::*; use garage_util::background::*;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*; use garage_util::data::*;
use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error; use garage_util::error::Error;
@ -70,6 +71,15 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]); let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
// @FIXME: move in worker
match &data.config {
MerkleBackpressureEnum::None => info!("Merkle Backpressure is not activated"),
MerkleBackpressureEnum::FixedQueue(v) => info!(
"Merkle backpressure with a fixed queue size (qlen={}) is activated.",
v.max_queue_size
),
}
Arc::new(Self { Arc::new(Self {
data, data,
empty_node_hash, empty_node_hash,
@ -125,6 +135,11 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
k k
); );
} }
let maybe_bound = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
Ok(()) Ok(())
} }

View file

@ -1,12 +1,16 @@
use opentelemetry::{global, metrics::*, KeyValue}; use opentelemetry::{global, metrics::*, KeyValue};
use std::convert::TryInto;
use garage_db as db; use garage_db as db;
use crate::data::MerkleTodo;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct TableMetrics { pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>, pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>, pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>, pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _merkle_todo_bounded_queue_free: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>, pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>, pub(crate) get_request_counter: BoundCounter<u64>,
@ -25,7 +29,7 @@ impl TableMetrics {
table_name: &'static str, table_name: &'static str,
store: db::Tree, store: db::Tree,
merkle_tree: db::Tree, merkle_tree: db::Tree,
merkle_todo: db::Tree, merkle_todo: MerkleTodo,
gc_todo: db::Tree, gc_todo: db::Tree,
) -> Self { ) -> Self {
let meter = global::meter(table_name); let meter = global::meter(table_name);
@ -72,6 +76,20 @@ impl TableMetrics {
) )
.with_description("Merkle tree updater TODO queue length") .with_description("Merkle tree updater TODO queue length")
.init(), .init(),
_merkle_todo_bounded_queue_free: meter
.u64_value_observer(
"table.merkle_todo_bounded_queue_free",
move |observer| {
let maybe_bounded = merkle_todo_bounded_queue.clone();
let free: u64 = match &maybe_bounded {
Some(v) => v.available_permits().try_into().unwrap(),
None => 0,
};
observer.observe(free, &[KeyValue::new("table_name", table_name)])
}
)
.with_description("Merkle TODO queue free slots")
.init(),
_gc_todo_len: meter _gc_todo_len: meter
.u64_value_observer( .u64_value_observer(
"table.gc_todo_queue_length", "table.gc_todo_queue_length",

View file

@ -244,8 +244,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// All remote nodes have written those items, now we can delete them locally // All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0; let mut not_removed = 0;
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
for (k, v) in items.iter() { for (k, v) in items.iter() {
if !self.data.delete_if_equal(&k[..], &v[..])? { let removed = self.data.delete_if_equal(&k[..], &v[..])?;
if !removed {
not_removed += 1; not_removed += 1;
} }
} }

View file

@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db; use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
@ -68,12 +69,18 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
impl<F: TableSchema, R: TableReplication> Table<F, R> { impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { pub fn new(
instance: F,
replication: R,
system: Arc<System>,
db: &db::Db,
config: &MerkleBackpressureEnum,
) -> Arc<Self> {
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
let data = TableData::new(system.clone(), instance, replication, db); let data = TableData::new(system.clone(), instance, replication, db, config);
let merkle_updater = MerkleUpdater::new(data.clone()); let merkle_updater = MerkleUpdater::new(data.clone());
@ -131,7 +138,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
who.as_ref(), who.as_ref(),
rpc, rpc,
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()), .with_quorum(self.data.replication.write_quorum())
.with_write_limiter(),
) )
.await?; .await?;
@ -527,6 +535,10 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table
Ok(TableRpc::Update(values)) Ok(TableRpc::Update(values))
} }
TableRpc::Update(pairs) => { TableRpc::Update(pairs) => {
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = maybe_bounded {
b.acquire_many(pairs.len() as u32).await.unwrap().forget();
}
self.data.update_many(pairs)?; self.data.update_many(pairs)?;
Ok(TableRpc::Ok) Ok(TableRpc::Ok)
} }

View file

@ -135,6 +135,10 @@ pub struct Config {
/// Configuration for the admin API endpoint /// Configuration for the admin API endpoint
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub admin: AdminConfig, pub admin: AdminConfig,
/// --- Experimental
#[serde(default = "Default::default")]
pub experimental: ExperimentalConfig,
} }
/// Value for data_dir: either a single directory or a list of dirs with attributes /// Value for data_dir: either a single directory or a list of dirs with attributes
@ -255,6 +259,40 @@ pub struct KubernetesDiscoveryConfig {
pub skip_crd: bool, pub skip_crd: bool,
} }
#[derive(Deserialize, Debug, Clone, Default)]
pub struct ExperimentalConfig {
pub merkle_backpressure: MerkleBackpressureEnum,
pub rpc_in_flight_limiters: RpcInFlightLimiterEnum,
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum MerkleBackpressureEnum {
#[default]
None,
FixedQueue(MerkleFixedQueue),
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum RpcInFlightLimiterEnum {
#[default]
None,
FixedSize(InFlightFixedSize),
}
#[derive(Deserialize, Debug, Clone, Default)]
pub struct InFlightFixedSize {
#[serde(default = "default_max_table_write")]
pub max_table_write: usize,
}
#[derive(Deserialize, Debug, Clone, Default)]
pub struct MerkleFixedQueue {
#[serde(default = "default_max_queue_size")]
pub max_queue_size: usize,
}
/// Read and parse configuration /// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let config = std::fs::read_to_string(config_file)?; let config = std::fs::read_to_string(config_file)?;
@ -281,6 +319,14 @@ fn default_compression() -> Option<i32> {
Some(1) Some(1)
} }
fn default_max_table_write() -> usize {
64
}
fn default_max_queue_size() -> usize {
256
}
fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error> fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
where where
D: de::Deserializer<'de>, D: de::Deserializer<'de>,