iroh-blobs 0.95 - New features
by rklaehn
Iroh-blobs 0.95 contains a number of significant new features that are worth explaining in detail. Several of them are useful not only for blobs users but for iroh users in general.
Let's start with a feature that is essential for blobs itself, but can also be useful for many other protocols.
Connection pool
There is a new connection pool in util::connection_pool. It is useful whenever a protocol has to talk to a large number of endpoints while keeping an upper bound on the number of concurrently open connections. In blobs, it is used whenever you use the downloader to orchestrate blob downloads from multiple providers.
Iroh connections are relatively lightweight, but even so you don't want to keep thousands of them open at the same time. On the other hand, opening a new connection every time you do a small exchange with a peer is very wasteful. The ConnectionPool gives you an API to deal with these tradeoffs.
Basic usage
Let's first look at basic usage:
let pool = ConnectionPool::new(ep, iroh_blobs::ALPN, Options::default());
let conn = pool.get_or_connect(remote_id)?;
// use the connection as usual
get_or_connect will try to get an existing connection from the pool. If there is none, it will create one and store it. Connections are kept in the pool for a configurable time, and idle connections are closed as needed. So you can use this as a drop-in replacement for endpoint.connect and be sure that you will never create an unbounded number of connections.
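To make the tradeoff concrete, here is a minimal, synchronous sketch of the bounded get-or-create pattern a pool like this implements, using only the standard library. The names (Pool, Conn, get_or_connect) and the oldest-first eviction policy are illustrative, not the actual iroh-blobs implementation.

```rust
use std::collections::HashMap;

/// A toy "connection" that only records which remote it points at.
#[derive(Clone, Debug, PartialEq)]
struct Conn {
    remote: String,
}

/// Minimal get-or-create pool with an upper bound on retained connections.
struct Pool {
    max_connections: usize,
    // Insertion order doubles as a crude "oldest first" eviction order.
    order: Vec<String>,
    conns: HashMap<String, Conn>,
}

impl Pool {
    fn new(max_connections: usize) -> Self {
        Self { max_connections, order: Vec::new(), conns: HashMap::new() }
    }

    /// Return the pooled connection for `remote`, creating it if necessary.
    /// If the pool is full, the oldest connection is evicted first.
    fn get_or_connect(&mut self, remote: &str) -> Conn {
        if let Some(conn) = self.conns.get(remote) {
            return conn.clone();
        }
        if self.conns.len() == self.max_connections {
            // Evict the oldest entry to stay within the bound.
            let oldest = self.order.remove(0);
            self.conns.remove(&oldest);
        }
        let conn = Conn { remote: remote.to_string() };
        self.conns.insert(remote.to_string(), conn.clone());
        self.order.push(remote.to_string());
        conn
    }
}
```

The real pool additionally closes connections after an idle timeout and tracks which connections are currently in use, but the bounded get-or-create core is the same idea.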
Advanced features
There are some advanced features that can be configured using non-default options.
pub struct Options {
    pub idle_timeout: Duration,
    pub connect_timeout: Duration,
    pub max_connections: usize,
    pub on_connected: Option<OnConnected>,
}
You can configure the maximum number of connections to be retained, the maximum tolerable duration for connection establishment, and the maximum duration idle connections are kept.
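Since the fields are public, a plain struct literal works. Here is a sketch, with the struct definition repeated and a stand-in OnConnected type so it compiles standalone; the concrete timeout and limit values are example choices, not recommendations.

```rust
use std::time::Duration;

// Stand-in for the pool's OnConnected callback type, just so this
// sketch is self-contained; the real type lives in util::connection_pool.
type OnConnected = ();

pub struct Options {
    pub idle_timeout: Duration,
    pub connect_timeout: Duration,
    pub max_connections: usize,
    pub on_connected: Option<OnConnected>,
}

// Example values; pick numbers that match your protocol's usage pattern.
fn options() -> Options {
    Options {
        // Close connections that have been unused for 30 seconds.
        idle_timeout: Duration::from_secs(30),
        // Give up on connection establishment after 5 seconds.
        connect_timeout: Duration::from_secs(5),
        // Retain at most 64 connections.
        max_connections: 64,
        // No extra setup before handing out connections.
        on_connected: None,
    }
}
```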
So far, pretty straightforward. There is an additional option to perform some setup before the connection is handed out to the user. For example, you can reject connections based on the data available at this time from the endpoint and the connection, or wait for the connection to reach a certain state before handing it out.
As an example, you might want to do iroh-blobs transfers only on direct connections in order to get good performance or reduce bandwidth use on the relay. If establishing direct connections is not possible, the connection establishment would time out, and you would never even attempt a transfer from such a node.
async fn on_connected(ep: Endpoint, conn: Connection) -> io::Result<()> {
    let Ok(id) = conn.remote_node_id() else {
        return Err(io::Error::other("unable to get node id"));
    };
    let Some(watcher) = ep.conn_type(id) else {
        return Err(io::Error::other("unable to get conn_type watcher"));
    };
    let mut stream = watcher.stream();
    while let Some(status) = stream.next().await {
        if let ConnectionType::Direct { .. } = status {
            return Ok(());
        }
    }
    Err(io::Error::other("connection closed before becoming direct"))
}
let options = Options::default().with_on_connected(on_connected);
let pool = ConnectionPool::new(ep, iroh_blobs::ALPN, options);
let conn = pool.get_or_connect(remote_id)?;
// use the connection as usual
The code to await a direct connection will change quite a bit once we have QUIC multipath. But the capability will remain, and we will update the test code to reflect the new API.
The connection pool is generic enough that it will move to its own crate together with some other iroh utilities. It lives in blobs only until iroh 1.0 is released.
Until then, just depend on iroh-blobs. Iroh-blobs without persistent storage is a very lightweight dependency.
One thing to keep in mind when using the connection pool: it needs the ability to track which connections are currently being used. To do this, the pool does not return a Connection but a ConnectionRef, a struct that derefs to Connection but contains some additional lifetime tracking.
But Connection is Clone, so in principle there is nothing stopping you from cloning the wrapped connection and losing the lifetime tracking. Don't do this. If you work with connections from the pool, pass around either a ConnectionRef or a &Connection to make sure the underlying ConnectionRef stays alive.
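The mechanics can be sketched with plain Arc reference counting, which is one simple way such tracking could work (hypothetical, not the actual implementation): the pool can tell a connection is idle when only its own reference is left, but cloning the inner connection out of the guard escapes that accounting.

```rust
use std::ops::Deref;
use std::sync::Arc;

// Stand-in connection type for this sketch.
#[derive(Clone)]
struct Connection {
    remote: String,
}

/// Guard handed out by the pool; derefs to Connection and keeps
/// the Arc alive so the pool can see the connection is in use.
struct ConnectionRef(Arc<Connection>);

impl Deref for ConnectionRef {
    type Target = Connection;
    fn deref(&self) -> &Connection {
        &self.0
    }
}

/// The pool holds one Arc per pooled connection; the connection counts
/// as idle when no ConnectionRef guards are alive besides the pool's own.
fn is_idle(pooled: &Arc<Connection>) -> bool {
    Arc::strong_count(pooled) == 1
}
```

Cloning the inner Connection produces a value the reference count knows nothing about, which is exactly why the pool may then close a connection that is still in use.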
Incorrect usage of ConnectionRef:
fn handle_connection(connection: Connection) { tokio::spawn(...) }
let conn = pool.get_or_connect(remote_id)?;
handle_connection(conn.clone()); // clones the Connection out of the ConnectionRef
// The ConnectionRef will be dropped here, and the pool will consider the connection idle!
Correct usage of ConnectionRef:
fn handle_connection(connection: ConnectionRef) { tokio::spawn(...) }
let conn = pool.get_or_connect(remote_id)?;
handle_connection(conn.clone());
// The ConnectionRef will be moved into the task, and its lifetime will be properly tracked!
We experimented with a safer callback-based API, but it turned out to be just too inconvenient to use.
Abstract request and response streams
Iroh-blobs is a protocol that tries to avoid overabstraction. For example, as of now you can only use the BLAKE3 hash function, and we hardcode the chunk group size to a value that should work well for all users.
But sometimes a bit of abstraction is needed. There was a user request to be able to use compression with iroh-blobs in sendme. One way to do this is to compress files before adding them to the blob store, but this has various downsides: it requires creating a copy of all the data before adding it to the store, and it also does not achieve good compression rates when dealing with a large number of small files, since each file has to be compressed in isolation.
It would be better to compress the request and response streams of the entire protocol and expose the resulting protocol under a different ALPN. With this approach, the compression algorithm can find redundancies between multiple files when handling a request for multiple blobs.
This was previously impossible, since iroh-blobs worked directly with iroh::endpoint::SendStream and iroh::endpoint::RecvStream. So we added traits that allow wrapping the send and receive streams in a transform such as compression/decompression.
By default, iroh-blobs still works directly with iroh::endpoint::SendStream and iroh::endpoint::RecvStream, so for normal use nothing changes.
The traits are a bit similar to Stream and Sink, but with two important additions:
- We allow sending and receiving Bytes, since iroh streams work with Bytes internally. That way we avoid a copy in the default case.
- We have methods stop and reset to close the stream, and on the send stream a method stopped that returns a future that resolves when the remote side has closed the stream.
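As a rough sketch of the shape of such traits (synchronous here, with Vec<u8> standing in for Bytes, and with hypothetical names; the real traits are async and use the actual iroh stream types), including a transform wrapper of the kind compression would use:

```rust
use std::io;

/// Simplified sketch of an abstract send stream; the real trait is async,
/// works with Bytes, and also offers a `stopped` future.
trait AbstractSendStream {
    fn send(&mut self, data: Vec<u8>) -> io::Result<()>;
    fn reset(&mut self);
}

/// Simplified sketch of an abstract receive stream.
#[allow(dead_code)]
trait AbstractRecvStream {
    fn recv(&mut self) -> io::Result<Option<Vec<u8>>>;
    fn stop(&mut self);
}

/// A transform wrapper: anything that can wrap a send stream, e.g. to
/// compress each chunk before forwarding it. Here the "transform" is a
/// trivial placeholder that just prepends the chunk length.
struct LengthPrefixed<S>(S);

impl<S: AbstractSendStream> AbstractSendStream for LengthPrefixed<S> {
    fn send(&mut self, data: Vec<u8>) -> io::Result<()> {
        let mut framed = (data.len() as u32).to_be_bytes().to_vec();
        framed.extend_from_slice(&data);
        self.0.send(framed)
    }
    fn reset(&mut self) {
        self.0.reset();
    }
}

/// In-memory send stream, useful for exercising the wrapper.
#[derive(Default)]
struct VecSink {
    chunks: Vec<Vec<u8>>,
    was_reset: bool,
}

impl AbstractSendStream for VecSink {
    fn send(&mut self, data: Vec<u8>) -> io::Result<()> {
        self.chunks.push(data);
        Ok(())
    }
    fn reset(&mut self) {
        self.was_reset = true;
    }
}
```

A compression wrapper would have the same structure as LengthPrefixed, with the transform applied in send and the inverse applied on the receive side.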
Wrapping the entire iroh-blobs protocol in compression is pretty straightforward, apart from some boilerplate. We have an example, compression.rs, that shows how to do this.
We will have this as an optional feature of sendme in one of the next releases.
Just like the connection pool, these traits are generally useful whenever you want to derive iroh protocols by wrapping existing protocols, so they will move to a separate crate once iroh 1.0 is released.
Enhanced provider events
This change is from iroh-blobs 0.93
On the provider side, it is now possible to get very detailed events about what the provider is doing. The provider events are now implemented as an irpc protocol. For each request type, you can use an event mask to configure whether you want to be notified at all, and whether you need the ability to intercept the request, e.g. if you only want to serve certain hashes.
There is an example of how to use the new provider events to limit requests by provider node id or hash.
Here is a provider event handler that serves only blobs requests for hashes in a fixed set of allowed hashes:
fn limit_by_hash(allowed_hashes: HashSet<Hash>) -> EventSender {
    let mask = EventMask {
        // We want to get a request for each get request that we can answer
        // with OK or not OK depending on the hash. We do not want detailed
        // events once it has been decided to handle a request.
        get: RequestMode::Intercept,
        ..EventMask::DEFAULT
    };
    let (tx, mut rx) = EventSender::channel(32, mask);
    n0_future::task::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if let ProviderMessage::GetRequestReceived(msg) = msg {
                let res = if !msg.request.ranges.is_blob() {
                    Err(AbortReason::Permission)
                } else if !allowed_hashes.contains(&msg.request.hash) {
                    Err(AbortReason::Permission)
                } else {
                    Ok(())
                };
                msg.tx.send(res).await.ok();
            }
        }
    });
    tx
}
What's next
The next major feature in iroh-blobs will be a minimal version of multiprovider downloads for individual blobs.
As soon as iroh 1.0 is released, several generic parts of iroh-blobs will move to a separate iroh utilities crate.
To get started, take a look at our docs, dive directly into the code, or chat with us in our Discord channel.