//! # Hydroflow Deterministic Simulation Testing Framework
//! This module provides a deterministic simulation framework for testing Hydroflow
//! processes.
//! It can be used to test complex interactions between multiple Hydroflow processes in a
//! deterministic manner by running them in a single-threaded environment. The framework also
//! provides a "virtual network" implementation that allows production processes to exchange
//! messages within the simulation. More importantly, the network is fully under control of the
//! unit test and the test can introduce faults such as message delays, message drops and
//! network partitions.
//! ## Overview
//! Conceptually, the simulation contains a "Fleet", which is a collection of "Hosts". These
//! aren't real hosts, but rather a collection of individual Hydroflow processes (one per host)
//! that can communicate with each other over a virtual network. Every host has a "hostname"
//! which uniquely identifies it within the fleet.
//! ```text
//! ┌────────────────────────────────────────────────────────────────────────────┐
//! │ ┌────────────────────────────────────────────────────────────────────────┐ │
//! │ │FLEET                                                                   │ │
//! │ │ ┌──────────────────────────────┐      ┌──────────────────────────────┐ │ │
//! │ │ │HOST                          │      │HOST                          │ │ │
//! │ │ │ ┌──────┐ ┌──────┐ ┌──────┐   │      │ ┌──────┐ ┌──────┐ ┌──────┐   │ │ │
//! │ │ │ │INBOX │ │INBOX │ │INBOX │   │    ┌─┼─►INBOX │ │INBOX │ │INBOX │   │ │ │
//! │ │ │ └──┬───┘ └──┬───┘ └──┬───┘   │    │ │ └──┬───┘ └──┬───┘ └──┬───┘   │ │ │
//! │ │ │ ┌──▼────────▼────────▼─────┐ │    │ │ ┌──▼────────▼────────▼─────┐ │ │ │
//! │ │ │ │                          │ │    │ │ │                          │ │ │ │
//! │ │ │ │        TRANSDUCER        │ │    │ │ │        TRANSDUCER        │ │ │ │
//! │ │ │ │                          │ │    │ │ │                          │ │ │ │
//! │ │ │ └──┬────────┬────────┬─────┘ │    │ │ └──┬────────┬────────┬─────┘ │ │ │
//! │ │ │ ┌──▼───┐ ┌──▼───┐ ┌──▼───┐   │    │ │ ┌──▼───┐ ┌──▼───┐ ┌──▼───┐   │ │ │
//! │ │ │ │OUTBOX│ │OUTBOX│ │OUTBOX├───┼─┐  │ │ │OUTBOX│ │OUTBOX│ │OUTBOX│   │ │ │
//! │ │ │ └──────┘ └──────┘ └──────┘   │ │  │ │ └──────┘ └──────┘ └──────┘   │ │ │
//! │ │ └──────────────────────────────┘ │  │ └──────────────────────────────┘ │ │
//! │ └──────────────────────────────────┼──┼──────────────────────────────────┘ │
//! │                              ┌─────┼──┼─────┐                              │
//! │                              │     └──┘     │                              │
//! │                              │  PROCESSING  │                              │
//! │                              └──────────────┘                              │
//! └────────────────────────────────────────────────────────────────────────────┘
//! ```
//! ## Network Processing
//! ### Outboxes & Inboxes
//! When a process wishes to send a message to another process, it sends the message to an
//! "outbox" on its host. The unit test invokes the simulation's network message processing logic
//! at some desired cadence to pick up all messages from all outboxes and deliver them to the
//! corresponding inboxes on the destination hosts. The network message processing logic is the
//! point at which failures can be injected to change the behavior of the network.
//! ### Interface Names
//! Every inbox and outbox is associated with an "interface name". This is a string that uniquely
//! identifies the interface on the host. When a process sends a message, it specifies the
//! destination hostname and the interface name on that host to which the message should be
//! delivered.
//! ## Progress of Time in the Simulation
//! The single-threaded unit test can drive time forward on every host by invoking the `run_tick`
//! method on the host. This ultimately runs a single tick on the process. The unit test is
//! also responsible for invoking the network message processing at the time of its choosing and
//! can interleave the progress of time on various hosts and network processing as it sees fit.
//! ## Examples
//! See the `tests` module for examples of how to use the simulation framework.
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::ready;
use std::pin::Pin;

use futures::{sink, Sink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::Stream;
use tracing::trace;

use crate::scheduled::graph::Dfir;
use crate::util::{collect_ready_async, unbounded_channel};
/// Uniquely identifies a host within the simulation. Messages are addressed to a host (and
/// therefore to the single Hydroflow process running on it) by this name.
pub type Hostname = String;

/// Uniquely identifies an inbox or an outbox on a host.
type InterfaceName = String;
/// An address is a combination of a hostname and an interface name.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Address {
host: Hostname,
interface: InterfaceName,
impl Address {
/// Create a new address with the given hostname and interface name.
pub fn new(host: Hostname, interface: InterfaceName) -> Self {
Address { host, interface }
/// A message sender is used to send messages to an inbox on a host.
pub trait MessageSender {
/// Send a message to the inbox on the host.
fn send(&self, message: MessageWithAddress);
impl<T: 'static> MessageSender for UnboundedSender<(T, Address)> {
fn send(&self, message: (Box<dyn Any>, Address)) {
match message.0.downcast::<T>() {
Ok(msg) => {
self.send((*msg, message.1)).unwrap();
Err(e) => {
panic!("Failed to downcast message to expected type: {:?}", e);
/// A message with an delivery address.
pub type MessageWithAddress = (Box<dyn Any>, Address);
/// An inbox is used by a host to receive messages for the process.
pub struct Inbox {
sender: Box<dyn MessageSender>,
/// Processes can send messages to other processes by putting those messages in an outbox
/// on their host.
pub struct Outbox {
receiver: Pin<Box<dyn Stream<Item = MessageWithAddress>>>,
/// A host is a single Hydroflow process running in the simulation. It has a unique hostname
/// and can communicate with other hosts over the virtual network. It has a collection of inboxes
/// and outboxes.
pub struct Host {
name: Hostname,
process: Dfir<'static>,
inputs: HashMap<InterfaceName, Inbox>,
output: HashMap<InterfaceName, Outbox>,
impl Host {
/// Run a single tick on the host's process. Returns true if any work was done by the
/// process. This effectively "advances" time on the process.
pub fn run_tick(&mut self) -> bool {
/// A builder for constructing a host in the simulation.
pub struct HostBuilder {
name: Hostname,
process: Option<Dfir<'static>>,
inboxes: HashMap<InterfaceName, Inbox>,
outboxes: HashMap<InterfaceName, Outbox>,
/// Used in conjunction with the `HostBuilder` to construct a host in the simulation.
pub struct ProcessBuilderContext<'context> {
inboxes: &'context mut HashMap<InterfaceName, Inbox>,
outboxes: &'context mut HashMap<InterfaceName, Outbox>,
fn sink_from_fn<T>(mut f: impl FnMut(T)) -> impl Sink<T, Error = Infallible> {
sink::drain().with(move |item| {
ready(Result::<(), Infallible>::Ok(()))
impl ProcessBuilderContext<'_> {
/// Create a new inbox on the host with the given interface name. Returns a stream that can
/// be read by the process using the source_stream dfir operator.
pub fn new_inbox<T: 'static>(
&mut self,
interface: InterfaceName,
) -> UnboundedReceiverStream<(T, Address)> {
let (sender, receiver) = unbounded_channel::<(T, Address)>();
Inbox {
sender: Box::new(sender),
/// Creates a new outbox on the host with the given interface name. Returns a sink that can
/// be written to by the process using the dest_sink dfir operator.
pub fn new_outbox<T: 'static>(
&mut self,
interface: InterfaceName,
) -> impl Sink<(T, Address), Error = Infallible> {
let (sender, receiver) = unbounded_channel::<(T, Address)>();
let receiver = receiver.map(|(msg, addr)| (Box::new(msg) as Box<dyn Any>, addr));
Outbox {
receiver: Box::pin(receiver),
sink_from_fn(move |message: (T, Address)| sender.send((message.0, message.1)).unwrap())
impl HostBuilder {
/// Creates a new instance of HostBuilder for a given hostname,
pub fn new(name: Hostname) -> Self {
HostBuilder {
process: None,
inboxes: Default::default(),
outboxes: Default::default(),
/// Supplies the (mandatory) process that runs on this host.
pub fn with_process<F>(mut self, builder: F) -> Self
F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
let mut context = ProcessBuilderContext {
inboxes: &mut self.inboxes,
outboxes: &mut self.outboxes,
let process = builder(&mut context);
self.process = Some(process);
/// Builds the host with the supplied configuration.
pub fn build(self) -> Host {
if self.process.is_none() {
panic!("Process is required to build a host");
Host {
name: self.name,
process: self.process.unwrap(),
inputs: self.inboxes,
output: self.outboxes,
/// A fleet is a collection of hosts in the simulation. It is responsible for running the
/// simulation and processing network messages.
pub struct Fleet {
hosts: HashMap<String, Host>,
impl Fleet {
/// Creates a new instance of Fleet.
pub fn new() -> Self {
Fleet {
hosts: HashMap::new(),
/// Adds a new host to the fleet with the given name and process.
pub fn add_host<F>(&mut self, name: String, process_builder: F) -> &Host
F: FnOnce(&mut ProcessBuilderContext) -> Dfir<'static>,
let host = HostBuilder::new(name.clone())
self.hosts.insert(host.name.clone(), host).is_none(),
"Host with name {} already exists",
/// Get a host by name.
pub fn get_host(&self, name: &str) -> Option<&Host> {
/// Get a host by name.
pub fn get_host_mut(&mut self, name: &str) -> Option<&mut Host> {
/// Advance time on all hosts by a single tick. Returns true if any work was done by any of the
/// hosts. After ticking once on all the hosts, the method also processes network messages.
/// The order in which the ticks are processed is not guaranteed.
pub async fn run_single_tick_all_hosts(&mut self) -> bool {
let mut work_done: bool = false;
for (name, host) in self.hosts.iter_mut() {
trace!("Running tick for host: {}", name);
work_done |= host.run_tick();
/// Process all network messages in the simulation. This method picks up all messages from all
/// outboxes on all hosts and delivers them to the corresponding inboxes on the destination.
/// The order in which the messages are processed is not guaranteed.
pub async fn process_network(&mut self) {
let mut all_messages: Vec<(Address, MessageWithAddress)> = Vec::new();
// Collect all messages from all outboxes on all hosts.
for (name, host) in self.hosts.iter_mut() {
for (interface, output) in host.output.iter_mut() {
let src_address = Address::new(name.clone(), interface.clone());
let all_messages_on_interface: Vec<_> =
collect_ready_async(&mut output.receiver).await;
for message_on_interface in all_messages_on_interface {
all_messages.push((src_address.clone(), message_on_interface));
// Deliver all messages to the corresponding inboxes on the destination hosts.
for (src_address, (msg, addr)) in all_messages {
if let Some(destination_host) = self.hosts.get(&addr.host) {
if let Some(input) = destination_host.inputs.get(&addr.interface) {
input.sender.send((msg, src_address.clone()));
} else {
"No interface named {:?} found on host {:?}. Dropping message {:?}.",
} else {
"No host named {:?} found. Dropping message {:?}.",
/// Tick all hosts until all hosts are quiescent (i.e. no new work is done by any host). Ticking
/// is done in "rounds". At each round, all hosts are ticked once and then network messages are
/// processed. The process continues until no work is done by any host in a round.
pub async fn run_until_quiescent(&mut self) {
while self.run_single_tick_all_hosts().await {}
impl Default for Fleet {
fn default() -> Self {
// Example tests showing how to drive the simulation framework.
// NOTE(review): no `#[cfg(test)]` attribute is visible on this module — confirm it is only
// compiled for test builds.
mod tests {
use dfir_macro::{dfir_syntax, dfir_test};
use futures::StreamExt;
use crate::util::simulation::{Address, Fleet, Hostname};
use crate::util::unbounded_channel;
/// A simple test to demonstrate use of the simulation framework. Implements an echo server
/// and client.
///
/// The client sends one message to the server's "echo" inbox. The server writes each
/// `(message, sender_address)` pair it receives straight back to its outbox, which routes the
/// message to the client's "echo" inbox. The client forwards it to `client_response_tx`.
// NOTE(review): no `#[dfir_test]` (or other async-test) attribute is visible on this fn even
// though `dfir_test` is imported above — without one it will not run as a test; confirm.
async fn test_echo() {
let mut fleet = Fleet::new();
// Hostnames for the server and client
let server: Hostname = "server".to_string();
let client: Hostname = "client".to_string();
// Interface name for the echo "protocol"
let interface: String = "echo".to_string();
// Destination used by the client: the server's "echo" inbox.
let server_address = Address::new(server.clone(), interface.clone());
// Create the echo server
fleet.add_host(server.clone(), |ctx| {
let network_input = ctx.new_inbox::<String>(interface.clone());
let network_output = ctx.new_outbox::<String>(interface.clone());
// Inbox items are (message, sender address) pairs; writing them unchanged to the outbox
// sends each message back to its sender.
// NOTE(review): the pipeline below begins with `->`, but no source such as
// `source_stream(network_input)` is visible and `network_input` is otherwise unused —
// confirm the source operator is present.
dfir_syntax! {
out = dest_sink(network_output);
-> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
-> out;
// The client trigger channel is used to trigger the client into sending a message to the
// server. This allows the unit test to control when the client sends a message.
let (client_trigger_tx, client_trigger_rx) = unbounded_channel::<String>();
// Replies received by the client are forwarded here so the test can observe them.
let (client_response_tx, mut client_response_rx) = unbounded_channel::<String>();
fleet.add_host(client.clone(), |ctx| {
let network_out = ctx.new_outbox::<String>(interface.clone());
let network_in = ctx.new_inbox::<String>(interface.clone());
// NOTE(review): both pipelines below begin with `->`, but their sources (presumably
// `source_stream(client_trigger_rx)` and `source_stream(network_in)`) are not visible, and
// `client_trigger_rx` / `network_in` are otherwise unused — confirm.
dfir_syntax! {
out = dest_sink(network_out);
// Address each triggered message to the server's echo inbox.
-> map(|msg| (msg, server_address.clone()))
-> out;
// Hand each reply (without its sender address) to the test.
-> inspect(|(msg, addr)| println!("Received {:?} from {:?}", msg, addr))
-> for_each(|(msg, _addr)| client_response_tx.send(msg).unwrap());
// Trigger the client to send a message.
client_trigger_tx.send("Hello, world!".to_string()).unwrap();
// Run the simulation until no new work is done by any host.
// NOTE(review): no call such as `fleet.run_until_quiescent().await` is visible after the
// comment above — without one no host is ticked and nothing reaches `client_response_rx`.
// Check that the message was received.
let response = client_response_rx.next().await.unwrap();
assert_eq!(response, "Hello, world!");