Compare commits

...

14 commits

Author          SHA1        Date                        Message
Quentin Dufour  85aca61860  2025-05-02 08:18:46 +02:00  [SKIP CI] WIP merkle todo encapsulation
Quentin Dufour  46ebfdba66  2025-05-01 16:30:09 +02:00  set config
Quentin Dufour  ee8fa687ad  2025-05-01 13:53:38 +02:00  add permits to merkle worker
Quentin Dufour  fa457328c8  2025-05-01 10:05:04 +02:00  [SKIP CI] add forget semaphore, add them back not yet implemented
Quentin Dufour  f34558af07  2025-05-01 08:56:57 +02:00  add a rpc in-flight limiter
Quentin Dufour  d78e5f8a1b  2025-04-30 15:53:12 +02:00  fix logic
Quentin Dufour  3172f875ae  2025-04-30 14:41:16 +02:00  pass config
Quentin Dufour  11a6417d11  2025-04-30 09:01:50 +02:00  try to better track queue len evol
Quentin Dufour  b0a9e007bd  2025-04-30 08:49:44 +02:00  try another approach to backpressure
Quentin Dufour  904548d1d1  2025-04-30 08:38:28 +02:00  allow up to 30sec
Quentin Dufour  6cc79bc696  2025-04-30 08:37:35 +02:00  react slower
Quentin Dufour  60b3d28f93  2025-04-30 07:24:20 +02:00  add an opentelemetry metric
Quentin Dufour  7fddf0af9c  2025-04-29 10:50:47 +02:00  first implementation
Quentin Dufour  78882f4040  2025-04-29 09:38:47 +02:00  add a backpressure system
22 changed files with 403 additions and 65 deletions

Cargo.lock (generated)
View file

@ -1470,6 +1470,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util 0.7.14",
"tracing",
]
[[package]]

View file

@ -17,6 +17,7 @@ use opentelemetry::{
Context,
};
use garage_net::endpoint::RpcInFlightLimiter;
use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
@ -295,6 +296,7 @@ impl BlockManager {
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
priority,
RpcInFlightLimiter::TableWrite,
);
tokio::select! {
res = rpc => {

View file

@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@ -118,6 +120,7 @@ impl AdminRpcHandler {
&node,
AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await;
if !matches!(resp, Ok(Ok(_))) {
@ -164,7 +167,12 @@ impl AdminRpcHandler {
let node_id = (*node).into();
match self
.endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.call(
&node_id,
AdminRpc::Stats(opt),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await
{
Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
@ -407,6 +415,7 @@ impl AdminRpcHandler {
variable: variable.clone(),
}),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
@ -456,6 +465,7 @@ impl AdminRpcHandler {
value: value.to_string(),
}),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
@ -488,6 +498,7 @@ impl AdminRpcHandler {
&to,
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await?
}))

View file

@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration;
use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::error::*;
use garage_rpc::layout::*;
@ -200,7 +201,12 @@ pub async fn cmd_connect(
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.call(
&rpc_host,
SystemRpc::Connect(args.node),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
SystemRpc::Ok => {
@ -216,7 +222,10 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
match rpc_cli
.call(&rpc_host, args, PRIO_NORMAL, RpcInFlightLimiter::NoLimit)
.await??
{
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
@ -271,7 +280,12 @@ pub async fn fetch_status(
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.call(
&rpc_host,
SystemRpc::GetKnownNodes,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),

View file

@ -1,6 +1,7 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_net::endpoint::RpcInFlightLimiter;
use garage_util::crdt::Crdt;
use garage_util::error::*;
@ -45,7 +46,12 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.call(
&rpc_host,
SystemRpc::GetKnownNodes,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@ -475,7 +481,12 @@ pub async fn fetch_layout(
rpc_host: NodeID,
) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.call(
&rpc_host,
SystemRpc::PullClusterLayout,
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@ -493,6 +504,7 @@ pub async fn send_layout(
&rpc_host,
SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await??;
Ok(())

View file

@ -244,7 +244,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None);
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None, None);
// Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {

View file

@ -175,7 +175,13 @@ impl Garage {
// ---- admin tables ----
info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
let bucket_table = Table::new(
BucketTable,
control_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize bucket_alias_table...");
let bucket_alias_table = Table::new(
@ -183,9 +189,16 @@ impl Garage {
control_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
let key_table = Table::new(
KeyTable,
control_rep_param,
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
// ---- S3 tables ----
info!("Initialize block_ref_table...");
@ -196,6 +209,7 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize version_table...");
@ -206,10 +220,12 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
let mpu_counter_table =
IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);
info!("Initialize multipart upload table...");
let mpu_table = Table::new(
@ -220,10 +236,12 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
let object_counter_table =
IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &config);
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
@ -236,6 +254,7 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&config.experimental.merkle_backpressure,
);
info!("Load lifecycle worker state...");
@ -245,7 +264,7 @@ impl Garage {
// ---- K2V ----
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param, &config);
// ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table
@ -335,9 +354,14 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
fn new(
system: Arc<System>,
db: &db::Db,
meta_rep_param: TableShardedReplication,
config: &Config,
) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, config);
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
@ -351,6 +375,7 @@ impl GarageK2V {
meta_rep_param,
system.clone(),
db,
&config.experimental.merkle_backpressure,
);
info!("Initialize K2V RPC handler...");

View file

@ -10,6 +10,7 @@ use garage_db as db;
use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::migrate::Migrate;
@ -173,6 +174,7 @@ impl<T: CountedItem> IndexCounter<T> {
system: Arc<System>,
replication: TableShardedReplication,
db: &db::Db,
config: &Config,
) -> Arc<Self> {
Arc::new(Self {
this_node: system.id,
@ -186,6 +188,7 @@ impl<T: CountedItem> IndexCounter<T> {
replication,
system,
db,
&config.experimental.merkle_backpressure,
),
})
}

View file

@ -39,6 +39,7 @@ kuska-handshake.workspace = true
opentelemetry = { workspace = true, optional = true }
opentelemetry-contrib = { workspace = true, optional = true }
tracing.workspace = true
[dev-dependencies]
pretty_env_logger.workspace = true

View file

@ -4,6 +4,7 @@ use std::pin::Pin;
use std::sync::atomic::{self, AtomicU32};
use std::sync::{Arc, Mutex};
use std::task::Poll;
use tracing::*;
use arc_swap::ArcSwapOption;
use bytes::Bytes;
@ -14,7 +15,7 @@ use futures::Stream;
use kuska_handshake::async_std::{handshake_client, BoxStream};
use tokio::net::TcpStream;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::sync::{mpsc, oneshot, watch, Semaphore};
use tokio_util::compat::*;
#[cfg(feature = "telemetry")]
@ -25,6 +26,7 @@ use opentelemetry::{
#[cfg(feature = "telemetry")]
use opentelemetry_contrib::trace::propagator::binary::*;
use crate::endpoint::RpcInFlightLimiter;
use crate::error::*;
use crate::message::*;
use crate::netapp::*;
@ -41,6 +43,7 @@ pub(crate) struct ClientConn {
next_query_number: AtomicU32,
inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
rpc_table_write_inflight_limiter: Option<Semaphore>,
}
impl ClientConn {
@ -98,8 +101,14 @@ impl ClientConn {
next_query_number: AtomicU32::from(RequestID::default()),
query_send: ArcSwapOption::new(Some(Arc::new(query_send))),
inflight: Mutex::new(HashMap::new()),
rpc_table_write_inflight_limiter: netapp.max_in_flight_table_write.map(Semaphore::new),
});
info!(
"Created conn with table write limit set to {}",
netapp.max_in_flight_table_write.unwrap_or(0)
);
netapp.connected_as_client(peer_id, conn.clone());
let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
@ -144,10 +153,21 @@ impl ClientConn {
req: Req<T>,
path: &str,
prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<Resp<T>, Error>
where
T: Message,
{
let _permit = match (limiter, &self.rpc_table_write_inflight_limiter) {
(RpcInFlightLimiter::TableWrite, Some(sem)) => {
info!(
"Available RPC table write slots: {}",
sem.available_permits()
);
Some(sem.acquire().await.unwrap())
}
_ => None,
};
let query_send = self.query_send.load_full().ok_or(Error::ConnectionClosed)?;
let id = self
@ -212,6 +232,7 @@ impl ClientConn {
let stream = Box::pin(canceller.for_stream(stream));
let resp_enc = RespEnc::decode(stream).await?;
drop(_permit);
debug!("client: got response to request {} (path {})", id, path);
Resp::from_enc(resp_enc)
}
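The hunk above is the heart of the RPC in-flight limiter: each ClientConn optionally owns a tokio Semaphore sized from max_in_flight_table_write, a TableWrite call must obtain a permit before it is sent, and the permit is dropped only after the response stream has been decoded. Below is a minimal, self-contained sketch of that pattern; call_limited and the simulated round-trip are illustrative stand-ins, not Garage's actual ClientConn::call.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// Gate a request on an optional per-connection semaphore, holding the permit
// for the whole round-trip (mirrors rpc_table_write_inflight_limiter).
async fn call_limited(limiter: Option<Arc<Semaphore>>, req_id: u32) -> String {
    let _permit = match &limiter {
        Some(sem) => Some(sem.acquire().await.expect("semaphore closed")),
        None => None, // RpcInFlightLimiter::NoLimit: no gating at all
    };
    // Simulated network round-trip; the permit is released when _permit drops,
    // i.e. only once the "response" has been produced.
    tokio::time::sleep(Duration::from_millis(10)).await;
    format!("response to request {}", req_id)
}

#[tokio::main]
async fn main() {
    // e.g. a configured limit of 4 concurrent table-write RPCs per connection
    let limiter = Some(Arc::new(Semaphore::new(4)));
    let handles: Vec<_> = (0..16)
        .map(|i| tokio::spawn(call_limited(limiter.clone(), i)))
        .collect();
    for h in handles {
        h.await.unwrap();
    }
    println!("16 requests completed, at most 4 in flight at any time");
}

Holding the permit until the response is decoded, rather than only while sending, is what actually bounds the work outstanding on the remote node, at the cost of making slow responders delay later requests on the same connection.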

View file

@ -57,6 +57,13 @@ where
}
}
#[derive(Debug, Copy, Clone, Default)]
pub enum RpcInFlightLimiter {
#[default]
NoLimit,
TableWrite,
}
// ----
/// This struct represents an endpoint for message of type `M`.
@ -114,6 +121,7 @@ where
target: &NodeID,
req: T,
prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<Resp<M>, Error>
where
T: IntoReq<M>,
@ -136,7 +144,10 @@ where
"Not connected: {}",
hex::encode(&target[..8])
))),
Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
Some(c) => {
c.call(req.into_req()?, self.path.as_str(), prio, limiter)
.await
}
}
}
}
@ -149,8 +160,12 @@ where
target: &NodeID,
req: M,
prio: RequestPriority,
limiter: RpcInFlightLimiter,
) -> Result<<M as Message>::Response, Error> {
Ok(self.call_streaming(target, req, prio).await?.into_msg())
Ok(self
.call_streaming(target, req, prio, limiter)
.await?
.into_msg())
}
}

View file

@ -74,6 +74,8 @@ pub struct NetApp {
pub id: NodeID,
/// Private key associated with our peer ID
pub privkey: ed25519::SecretKey,
/// Maximum number of in-flight table write RPCs per client connection (None = unlimited)
pub(crate) max_in_flight_table_write: Option<usize>,
pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
@ -101,6 +103,7 @@ impl NetApp {
netid: auth::Key,
privkey: ed25519::SecretKey,
bind_outgoing_to: Option<IpAddr>,
max_in_flight_table_write: Option<usize>,
) -> Arc<Self> {
let mut version_tag = [0u8; 16];
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
@ -114,6 +117,7 @@ impl NetApp {
netid,
id,
privkey,
max_in_flight_table_write,
server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()),
endpoints: RwLock::new(HashMap::new()),
@ -427,6 +431,7 @@ impl NetApp {
server_port,
},
PRIO_NORMAL,
RpcInFlightLimiter::NoLimit,
)
.await
.map(|_| ())

View file

@ -406,7 +406,7 @@ impl PeeringManager {
ping_time
);
let ping_response = select! {
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH, RpcInFlightLimiter::NoLimit) => r,
_ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
};
@ -458,7 +458,12 @@ impl PeeringManager {
let pex_message = PeerListMessage { list: peer_list };
match self
.peer_list_endpoint
.call(id, pex_message, PRIO_BACKGROUND)
.call(
id,
pex_message,
PRIO_BACKGROUND,
RpcInFlightLimiter::NoLimit,
)
.await
{
Err(e) => warn!("Error doing peer exchange: {}", e),

View file

@ -6,6 +6,7 @@ use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use garage_net::endpoint::RpcInFlightLimiter;
use tokio::select;
use opentelemetry::KeyValue;
@ -44,6 +45,8 @@ pub struct RequestStrategy<T> {
rs_timeout: Timeout,
/// Data to drop when everything completes
rs_drop_on_complete: T,
/// RPC In Flight Limiter
rs_inflight_limiter: RpcInFlightLimiter,
}
#[derive(Copy, Clone)]
@ -61,6 +64,7 @@ impl Clone for RequestStrategy<()> {
rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout,
rs_drop_on_complete: (),
rs_inflight_limiter: self.rs_inflight_limiter,
}
}
}
@ -74,6 +78,7 @@ impl RequestStrategy<()> {
rs_priority: prio,
rs_timeout: Timeout::Default,
rs_drop_on_complete: (),
rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
}
}
/// Add an item to be dropped on completion
@ -84,6 +89,7 @@ impl RequestStrategy<()> {
rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout,
rs_drop_on_complete: drop_on_complete,
rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
}
}
}
@ -109,6 +115,10 @@ impl<T> RequestStrategy<T> {
self.rs_timeout = Timeout::Custom(timeout);
self
}
pub fn with_write_limiter(mut self) -> Self {
self.rs_inflight_limiter = RpcInFlightLimiter::TableWrite;
self
}
/// Extract drop_on_complete item
fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
(
@ -118,6 +128,7 @@ impl<T> RequestStrategy<T> {
rs_priority: self.rs_priority,
rs_timeout: self.rs_timeout,
rs_drop_on_complete: (),
rs_inflight_limiter: self.rs_inflight_limiter,
},
self.rs_drop_on_complete,
)
@ -185,7 +196,7 @@ impl RpcHelper {
let node_id = to.into();
let rpc_call = endpoint
.call_streaming(&node_id, msg, strat.rs_priority)
.call_streaming(&node_id, msg, strat.rs_priority, strat.rs_inflight_limiter)
.with_context(Context::current_with_span(span))
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
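RequestStrategy now records which limiter applies, defaulting to RpcInFlightLimiter::NoLimit, and forwards it to call_streaming; only call sites that opt in via with_write_limiter() (table.rs does this for write RPCs further down) are gated. The following is a trimmed-down, illustrative sketch of that builder shape, keeping only the two fields relevant here; priorities are reduced to a bare u8.

// Stand-in for the RequestStrategy builder: the limiter defaults to NoLimit
// and individual call sites opt in. The real struct also carries quorum,
// timeout and drop_on_complete.
#[derive(Debug, Copy, Clone, Default)]
enum RpcInFlightLimiter {
    #[default]
    NoLimit,
    TableWrite,
}

#[derive(Debug, Clone)]
struct RequestStrategy {
    rs_priority: u8,
    rs_inflight_limiter: RpcInFlightLimiter,
}

impl RequestStrategy {
    fn with_priority(prio: u8) -> Self {
        Self {
            rs_priority: prio,
            rs_inflight_limiter: RpcInFlightLimiter::NoLimit,
        }
    }
    fn with_write_limiter(mut self) -> Self {
        self.rs_inflight_limiter = RpcInFlightLimiter::TableWrite;
        self
    }
}

fn main() {
    let prio_normal: u8 = 0; // stand-in for PRIO_NORMAL
    let read = RequestStrategy::with_priority(prio_normal);
    let write = RequestStrategy::with_priority(prio_normal).with_write_limiter();
    println!("read: {:?}", read);
    println!("write: {:?}", write);
}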

View file

@ -21,7 +21,7 @@ use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::config::{Config, DataDirEnum};
use garage_util::config::{Config, DataDirEnum, RpcInFlightLimiterEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
@ -256,7 +256,17 @@ impl System {
let bind_outgoing_to = Some(config)
.filter(|x| x.rpc_bind_outgoing)
.map(|x| x.rpc_bind_addr.ip());
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
let maybe_max_table_write = match &config.experimental.rpc_in_flight_limiters {
RpcInFlightLimiterEnum::None => None,
RpcInFlightLimiterEnum::FixedSize(v) => Some(v.max_table_write),
};
let netapp = NetApp::new(
GARAGE_VERSION_TAG,
network_key,
node_key,
bind_outgoing_to,
maybe_max_table_write,
);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
// ---- setup netapp public listener and full mesh peering strategy ----

View file

@ -3,10 +3,12 @@ use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use tokio::sync::SemaphorePermit;
use tokio::sync::{Notify, Semaphore};
use garage_db as db;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::migrate::Migrate;
@ -20,6 +22,67 @@ use crate::replication::*;
use crate::schema::*;
use crate::util::*;
pub(crate) struct MerkleTodo {
merkle_todo: db::Tree,
merkle_todo_notify: Notify,
merkle_todo_bounded_queue: Option<Arc<Semaphore>>,
}
impl Clone for MerkleTodo {
fn clone(&self) -> Self {
Self {
merkle_todo: self.merkle_todo.clone(),
merkle_todo_notify: Notify::new(),
merkle_todo_bounded_queue: self.merkle_todo_bounded_queue.clone(),
}
}
}
impl MerkleTodo {
fn new<F: TableSchema>(db: &db::Db, config: &MerkleBackpressureEnum) -> Self {
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let merkle_todo_bounded_queue = match config {
MerkleBackpressureEnum::None => None,
MerkleBackpressureEnum::FixedQueue(p) => {
Some(Arc::new(Semaphore::new(p.max_queue_size)))
}
};
Self {
merkle_todo,
merkle_todo_notify: Notify::new(),
merkle_todo_bounded_queue,
}
}
pub(crate) fn len(&self) -> Result<usize, db::Error> {
self.merkle_todo.len()
}
pub(crate) async fn with_db<F: FnOnce(&db::Tree, SemaphorePermit)>(&self, f: F) {
let bounded = self
.merkle_todo_bounded_queue
.clone()
.unwrap_or(Arc::new(Semaphore::new(1)));
let permit = bounded.acquire().await.unwrap();
f(&self.merkle_todo, permit);
}
pub(crate) fn appended(&self, permit: SemaphorePermit) {
permit.forget();
self.merkle_todo_notify.notify_one();
}
pub(crate) fn processed(&self) {
let bounded = self
.merkle_todo_bounded_queue
.clone()
.unwrap_or(Arc::new(Semaphore::new(1)));
bounded.add_permits(1);
}
}
pub struct TableData<F: TableSchema, R: TableReplication> {
system: Arc<System>,
@ -29,8 +92,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub store: db::Tree,
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
pub(crate) merkle_todo: MerkleTodo,
pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>,
@ -38,10 +100,18 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics,
pub(crate) config: MerkleBackpressureEnum,
}
impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
pub fn new(
system: Arc<System>,
instance: F,
replication: R,
db: &db::Db,
config: &MerkleBackpressureEnum,
) -> Arc<Self> {
let store = db
.open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@ -49,9 +119,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let merkle_tree = db
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let merkle_todo = MerkleTodo::new::<F>(db, config);
let insert_queue = db
.open_tree(format!("{}:insert_queue", F::TABLE_NAME))
@ -76,11 +145,11 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
store,
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
insert_queue,
insert_queue_notify: Arc::new(Notify::new()),
gc_todo,
metrics,
config: config.clone(),
})
}
@ -167,6 +236,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// - When an entry is modified or deleted, add it to the merkle updater's todo list.
// This has to be done atomically with the modification for the merkle updater
// to maintain consistency. The merkle updater must then be notified with todo_notify.
// Also, to avoid overloading the merkle updater, callers may have to wait for a free
// slot in the bounded todo queue, which applies backpressure (i.e. slows down clients).
// - When an entry is updated to be a tombstone, add it to the gc_todo tree
pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
@ -201,6 +272,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
// transaction begins
let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
@ -238,12 +310,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
Ok(None)
}
})?;
// transaction ends
if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
// early return if nothing changed
let (new_entry, new_bytes_hash) = match changed {
Some((e, b)) => (e, b),
None => {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(None);
}
};
// Handle GC in case of tombstone
let is_tombstone = new_entry.is_tombstone();
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
// "leader" of the partition, i.e. the first node in the
@ -259,10 +341,13 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
// Collect metrics
self.metrics.internal_update_counter.add(1);
// Synchronize with the Merkle Worker
self.merkle_todo_notify.notify_one(); // wake it up
Ok(Some(new_entry))
} else {
Ok(None)
}
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
@ -282,10 +367,16 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
_ => Ok(false),
})?;
if removed {
if !removed {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(false);
}
self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
}
@ -310,11 +401,18 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
_ => Ok(false),
})?;
if removed {
if !removed {
let maybe_bound = self.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
return Ok(false);
}
self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one();
}
Ok(removed)
Ok(true)
}
// ---- Insert queue functions ----
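Taken together, the data.rs changes wrap the merkle_todo tree in a MerkleTodo type whose optional Semaphore bounds the number of pending entries: a writer acquires a permit before appending and forgets it so the slot stays occupied, and the merkle worker (or the early-return paths above, when nothing actually changed) hands the slot back with add_permits(1). Here is a self-contained sketch of that accounting, with a plain in-memory queue standing in for the db::Tree; the names are illustrative.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::sync::{Notify, Semaphore};

// Bounded "todo" queue: appends block once max_len entries are pending.
struct BoundedTodo {
    queue: Mutex<VecDeque<u64>>,
    notify: Notify,   // wakes the worker, like merkle_todo_notify
    slots: Semaphore, // free slots, like merkle_todo_bounded_queue
}

impl BoundedTodo {
    fn new(max_len: usize) -> Arc<Self> {
        Arc::new(Self {
            queue: Mutex::new(VecDeque::new()),
            notify: Notify::new(),
            slots: Semaphore::new(max_len),
        })
    }

    // Writer side: wait for a free slot, push the entry, keep the slot taken.
    async fn append(&self, item: u64) {
        let permit = self.slots.acquire().await.expect("semaphore closed");
        permit.forget(); // the slot is only freed once the worker is done
        self.queue.lock().unwrap().push_back(item);
        self.notify.notify_one();
    }

    // Worker side: process one entry, then hand its slot back to writers.
    fn process_one(&self) -> Option<u64> {
        let item = self.queue.lock().unwrap().pop_front()?;
        // ... update the merkle tree for `item` here ...
        self.slots.add_permits(1);
        Some(item)
    }
}

#[tokio::main]
async fn main() {
    let todo = BoundedTodo::new(2);
    todo.append(1).await;
    todo.append(2).await;
    // The queue is now full: the next append waits until a slot is freed.
    let worker = todo.clone();
    let drain = tokio::spawn(async move { while worker.process_one().is_some() {} });
    todo.append(3).await; // unblocks once the worker returns a slot
    drain.await.unwrap();
    println!("pending entries never exceeded 2");
}

The forget/add_permits split is what lets a slot reservation outlive the writer's call: the permit is not held as an RAII guard but handed over conceptually to the merkle worker, which runs later and in another task.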

View file

@ -262,7 +262,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC has been successful for all of these entries.
// We now remove them all from our local table and from the GC todo list.
for item in items {
self.data
let _is_removed = self
.data
.delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo)
@ -275,14 +276,21 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
match message {
GcRpc::Update(items) => {
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
self.data.update_many(items)?;
Ok(GcRpc::Ok)
}
GcRpc::DeleteIfEqualHash(items) => {
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?;
let _is_removed = self.data.delete_if_equal_hash(&key[..], *vhash)?;
}
Ok(GcRpc::Ok)
}
@ -329,7 +337,6 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
}
async fn wait_for_work(&mut self) -> WorkerState {
tokio::time::sleep(self.wait_delay).await;
WorkerState::Busy
}
}

View file

@ -9,6 +9,7 @@ use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
@ -70,6 +71,15 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
// @FIXME: move in worker
match &data.config {
MerkleBackpressureEnum::None => info!("Merkle backpressure is not activated"),
MerkleBackpressureEnum::FixedQueue(v) => info!(
"Merkle backpressure with a fixed queue size (qlen={}) is activated.",
v.max_queue_size
),
}
Arc::new(Self {
data,
empty_node_hash,
@ -125,6 +135,11 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
k
);
}
let maybe_bound = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = &maybe_bound {
b.add_permits(1);
}
Ok(())
}

View file

@ -1,12 +1,16 @@
use opentelemetry::{global, metrics::*, KeyValue};
use std::convert::TryInto;
use garage_db as db;
use crate::data::MerkleTodo;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _merkle_todo_bounded_queue_free: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>,
@ -25,7 +29,7 @@ impl TableMetrics {
table_name: &'static str,
store: db::Tree,
merkle_tree: db::Tree,
merkle_todo: db::Tree,
merkle_todo: MerkleTodo,
gc_todo: db::Tree,
) -> Self {
let meter = global::meter(table_name);
@ -72,6 +76,20 @@ impl TableMetrics {
)
.with_description("Merkle tree updater TODO queue length")
.init(),
_merkle_todo_bounded_queue_free: meter
.u64_value_observer(
"table.merkle_todo_bounded_queue_free",
move |observer| {
let maybe_bounded = merkle_todo_bounded_queue.clone();
let free: u64 = match &maybe_bounded {
Some(v) => v.available_permits().try_into().unwrap(),
None => 0,
};
observer.observe(free, &[KeyValue::new("table_name", table_name)])
}
)
.with_description("Merkle TODO queue free slots")
.init(),
_gc_todo_len: meter
.u64_value_observer(
"table.gc_todo_queue_length",

View file

@ -244,8 +244,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = maybe_bounded {
b.acquire_many(items.len() as u32).await.unwrap().forget();
}
for (k, v) in items.iter() {
if !self.data.delete_if_equal(&k[..], &v[..])? {
let removed = self.data.delete_if_equal(&k[..], &v[..])?;
if !removed {
not_removed += 1;
}
}

View file

@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db;
use garage_util::background::BackgroundRunner;
use garage_util::config::MerkleBackpressureEnum;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@ -68,12 +69,18 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
pub fn new(
instance: F,
replication: R,
system: Arc<System>,
db: &db::Db,
config: &MerkleBackpressureEnum,
) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
let data = TableData::new(system.clone(), instance, replication, db);
let data = TableData::new(system.clone(), instance, replication, db, config);
let merkle_updater = MerkleUpdater::new(data.clone());
@ -131,7 +138,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
.with_quorum(self.data.replication.write_quorum())
.with_write_limiter(),
)
.await?;
@ -527,6 +535,10 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table
Ok(TableRpc::Update(values))
}
TableRpc::Update(pairs) => {
let maybe_bounded = self.data.merkle_todo_bounded_queue.clone();
if let Some(b) = maybe_bounded {
b.acquire_many(pairs.len() as u32).await.unwrap().forget();
}
self.data.update_many(pairs)?;
Ok(TableRpc::Ok)
}
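On the receiving side, a node admits an incoming Update batch only when the local merkle todo queue has one free slot per pair, reserving them all at once with acquire_many and forgetting them so the merkle worker can release them one by one as it catches up. A short illustrative sketch of that batch admission, detached from the Garage types:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Reserve one todo-queue slot per item of an incoming batch before applying it.
async fn admit_batch(slots: &Semaphore, items: &[&str]) {
    let permits = slots
        .acquire_many(items.len() as u32)
        .await
        .expect("semaphore closed");
    permits.forget(); // slots come back later, one per processed entry
    for item in items {
        println!("applied {}", item);
    }
}

#[tokio::main]
async fn main() {
    let slots = Arc::new(Semaphore::new(8)); // e.g. a queue bound of 8
    admit_batch(&slots, &["a", "b", "c"]).await;
    println!("free slots after admission: {}", slots.available_permits()); // 5
    // The merkle worker would later return the slots as it processes entries:
    slots.add_permits(3);
    println!("free slots after processing: {}", slots.available_permits()); // 8
}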

View file

@ -135,6 +135,10 @@ pub struct Config {
/// Configuration for the admin API endpoint
#[serde(default = "Default::default")]
pub admin: AdminConfig,
/// Configuration for experimental features
#[serde(default = "Default::default")]
pub experimental: ExperimentalConfig,
}
/// Value for data_dir: either a single directory or a list of dirs with attributes
@ -255,6 +259,40 @@ pub struct KubernetesDiscoveryConfig {
pub skip_crd: bool,
}
#[derive(Deserialize, Debug, Clone, Default)]
pub struct ExperimentalConfig {
pub merkle_backpressure: MerkleBackpressureEnum,
pub rpc_in_flight_limiters: RpcInFlightLimiterEnum,
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum MerkleBackpressureEnum {
#[default]
None,
FixedQueue(MerkleFixedQueue),
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
pub enum RpcInFlightLimiterEnum {
#[default]
None,
FixedSize(InFlightFixedSize),
}
#[derive(Deserialize, Debug, Clone, Default)]
pub struct InFlightFixedSize {
#[serde(default = "default_max_table_write")]
pub max_table_write: usize,
}
#[derive(Deserialize, Debug, Clone, Default)]
pub struct MerkleFixedQueue {
#[serde(default = "default_max_queue_size")]
pub max_queue_size: usize,
}
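Reading the serde attributes above (internally tagged on kind, lowercase variant names, per-field defaults for the sizes), the new experimental options would be spelled roughly as below. This is a sketch of how the derives parse, using local copies of the structs and the toml crate in place of Garage's actual config loading; it is not documented configuration syntax.

use serde::Deserialize;

#[derive(Deserialize, Debug, Default)]
struct ExperimentalConfig {
    merkle_backpressure: MerkleBackpressureEnum,
    rpc_in_flight_limiters: RpcInFlightLimiterEnum,
}

#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
enum MerkleBackpressureEnum {
    #[default]
    None,
    FixedQueue(MerkleFixedQueue),
}

#[derive(Deserialize, Debug, Default)]
struct MerkleFixedQueue {
    #[serde(default = "default_max_queue_size")]
    max_queue_size: usize,
}

#[derive(Deserialize, Debug, Default)]
#[serde(rename_all = "lowercase", tag = "kind")]
enum RpcInFlightLimiterEnum {
    #[default]
    None,
    FixedSize(InFlightFixedSize),
}

#[derive(Deserialize, Debug, Default)]
struct InFlightFixedSize {
    #[serde(default = "default_max_table_write")]
    max_table_write: usize,
}

fn default_max_queue_size() -> usize { 256 }
fn default_max_table_write() -> usize { 64 }

fn main() {
    // What an [experimental] block of the config file might contain (assumption).
    let raw = r#"
        [merkle_backpressure]
        kind = "fixedqueue"
        max_queue_size = 256

        [rpc_in_flight_limiters]
        kind = "fixedsize"
        max_table_write = 64
    "#;
    let cfg: ExperimentalConfig = toml::from_str(raw).unwrap();
    println!("{:?}", cfg);
}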
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let config = std::fs::read_to_string(config_file)?;
@ -281,6 +319,14 @@ fn default_compression() -> Option<i32> {
Some(1)
}
fn default_max_table_write() -> usize {
64
}
fn default_max_queue_size() -> usize {
256
}
fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
where
D: de::Deserializer<'de>,