Clusters
When building scalable distributed systems in Hydro, you'll often need to use clusters, which represent groups of threads all running the same piece of your program (Single-Program-Multiple-Data, or "SPMD"). Hydro clusters can be used to implement scale-out systems using techniques such as sharding or replication. Unlike processes, the number of threads in a cluster does not need to be static, and can be chosen during deployment.
As with a process, you can pass a type parameter when creating a cluster to distinguish it from other clusters. For example, you can create a cluster with a marker type Worker to represent a pool of workers in a distributed system:
struct Worker {}
let flow = FlowBuilder::new();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
You can then instantiate a live collection on the cluster using the same APIs as for processes. For example, you can create a stream of integers on the worker cluster. If you launch this program, each member of the cluster will create a stream containing the elements 1, 2, 3, and 4:
let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
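The "same program on every member" behavior can be pictured with a small plain-Rust model. This is only a sketch of the SPMD idea: `run_on_each_member` is a hypothetical helper, not a Hydro API, and it runs the members sequentially rather than on separate threads.

```rust
/// Runs the same `program` once per cluster member and collects each
/// member's local output. Hypothetical helper, not part of Hydro.
fn run_on_each_member<T>(member_count: u32, program: impl Fn() -> Vec<T>) -> Vec<Vec<T>> {
    // Every member executes an identical copy of the program.
    (0..member_count).map(|_| program()).collect()
}
```

Calling `run_on_each_member(3, || vec![1, 2, 3, 4])` produces three independent copies of the same four elements, one per member.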
Networking
When sending a live collection from a cluster to another location, each member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a single stream of (ID, Data) tuples where the ID uniquely identifies which member of the cluster the data came from. For example, you can send a stream from the worker cluster to another process using the send_bincode method:
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 1), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 1), (ClusterId::<Worker>(3), 1)
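The receiver-side tagging described above can be modeled in plain Rust. This is a sketch under assumptions: `MemberId` and `gather_tagged` are hypothetical names, not Hydro APIs, and the model emits tuples in member-ID order for determinism, whereas in a real deployment the interleaving across members may depend on message arrival.

```rust
/// Hypothetical stand-in for a cluster member ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MemberId(u32);

/// Models the receiver side of a cluster-to-process send: each member's
/// local stream is tagged with the sender's ID and merged into one stream.
fn gather_tagged<T: Clone>(local_streams: &[Vec<T>]) -> Vec<(MemberId, T)> {
    local_streams
        .iter()
        .enumerate()
        // The slice index plays the role of the member ID in this model.
        .flat_map(|(id, items)| {
            items
                .iter()
                .cloned()
                .map(move |item| (MemberId(id as u32), item))
        })
        .collect()
}
```

With four members that each hold `vec![1]`, `gather_tagged` returns one tuple per member, with IDs 0 through 3.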
If you do not need to know which member of the cluster the data came from, you can use the send_bincode_anonymous method instead, which will drop the IDs at the receiver:
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode_anonymous(&process)
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
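In a plain-Rust model, the anonymous variant amounts to merging the members' local streams without attaching a sender ID. `gather_anonymous` is a hypothetical name used for illustration only, and the output order here is an artifact of the model.

```rust
/// Models an anonymous cluster-to-process send: the members' local
/// streams are merged and no sender ID is kept.
fn gather_anonymous<T: Clone>(local_streams: &[Vec<T>]) -> Vec<T> {
    local_streams.iter().flatten().cloned().collect()
}
```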
In the reverse direction, when sending a stream to a cluster, the sender must prepare (ID, Data) tuples, where the ID uniquely identifies which member of the cluster the data is intended for. For example, we can send a stream from a process to the worker cluster using the send_bincode method:
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
    .map(q!(|x| (ClusterId::from_raw(x), x)))
    .send_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 0), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 2), (ClusterId::<Worker>(3), 3)
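The routing of (ID, Data) tuples to individual members can be sketched as a demultiplexing step. This is a plain-Rust model, not Hydro code: `route_to_members` is a hypothetical helper and member IDs are represented as bare `u32` values.

```rust
use std::collections::BTreeMap;

/// Models a process-to-cluster send: each (ID, Data) tuple is delivered
/// only to the inbox of the member named by its ID.
fn route_to_members<T>(messages: Vec<(u32, T)>) -> BTreeMap<u32, Vec<T>> {
    let mut inboxes: BTreeMap<u32, Vec<T>> = BTreeMap::new();
    for (member_id, data) in messages {
        // Create the member's inbox on first use, then append the payload.
        inboxes.entry(member_id).or_default().push(data);
    }
    inboxes
}
```

Routing `vec![(0, 0), (1, 1), (2, 2), (3, 3)]` leaves each of the four members with exactly one element, the one addressed to it.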
Broadcasting and Membership Lists
A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using broadcast_bincode, which takes in a stream of only data elements and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 123), (ClusterId::<Worker>(1), 123), (ClusterId::<Worker>(2), 123), (ClusterId::<Worker>(3), 123)
The current broadcast implementation assumes a static configuration in which members cannot be added or removed at runtime. This is expected to change once Hydro supports dynamically scaled clusters.
Under the hood, the broadcast_bincode API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the members method on a cluster to get a value that can be used inside q!(...) blocks:
let cluster_members = workers.members();
let members_stream: Stream<ClusterId<_>, Process<_>, _> = p1
    .source_iter(q!(cluster_members /* : &[ClusterId<Worker>] */))
    .cloned();
members_stream.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// ClusterId::<Worker>(0), ClusterId::<Worker>(1), ClusterId::<Worker>(2), ClusterId::<Worker>(3)
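One way to picture how a broadcast can be built from a membership list is to expand every data element into one (ID, Data) tuple per member and then send those tuples with an ordinary ID-addressed send. The following is a plain-Rust model of that idea, not Hydro's actual implementation; `broadcast_via_members` is a hypothetical helper and IDs are bare `u32` values.

```rust
/// Expands each data element into one (ID, Data) tuple per cluster member,
/// so that an ID-addressed send delivers a copy to everyone.
fn broadcast_via_members<T: Clone>(members: &[u32], data: &[T]) -> Vec<(u32, T)> {
    data.iter()
        .flat_map(|item| members.iter().map(move |id| (*id, item.clone())))
        .collect()
}
```

For members `[0, 1, 2, 3]` and data `[123]`, this yields four tuples, each pairing one member ID with 123. Because the expansion reads the list once per element, the model shares the static-membership assumption noted above.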
Self-Identification
In some programs, it may be necessary for cluster members to know their own ID (for example, to construct a ballot in Paxos). In Hydro, this can be achieved by using the CLUSTER_SELF_ID constant, which can be used inside q!(...) blocks to get the current cluster member's ID:
let workers: Cluster<()> = flow.cluster::<()>();
let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
    .filter(q!(|x| x.raw_id % 2 == 0))
    .map(q!(|x| format!("hello from {}", x.raw_id)))
    .send_bincode_anonymous(&process)
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
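The effect of self-identification can be modeled in plain Rust by passing each member its own ID as an argument. This is a sketch only: `member_program` and `run_cluster` are hypothetical names, and the explicit `self_id` parameter stands in for what CLUSTER_SELF_ID provides inside q!(...) blocks.

```rust
/// The program every member runs; `self_id` is the only thing that differs
/// between members.
fn member_program(self_id: u32) -> Vec<String> {
    std::iter::once(self_id)
        // Only members with an even ID produce output.
        .filter(|id| id % 2 == 0)
        .map(|id| format!("hello from {}", id))
        .collect()
}

/// Runs the program on members 0..member_count and merges their outputs
/// anonymously, in member-ID order for determinism.
fn run_cluster(member_count: u32) -> Vec<String> {
    (0..member_count).flat_map(member_program).collect()
}
```

With four members, `run_cluster(4)` returns the two greetings from members 0 and 2.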
You can only use CLUSTER_SELF_ID in code that will run on a Cluster<_>, such as a closure passed to Stream::map on a stream that lives on a cluster. If you try to use it in code that will run on a Process<_>, you'll get a compile-time error:
let process: Process<()> = flow.process::<()>();
process.source_iter(q!([CLUSTER_SELF_ID]));
// error[E0277]: the trait bound `ClusterSelfId<'_>: FreeVariableWithContext<hydro_lang::Process<'_>>` is not satisfied
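The general mechanism behind an error of this kind, a trait that is implemented for one location type but not another, can be illustrated in plain Rust. The sketch below is an analogy only: `ProcessLoc`, `ClusterLoc`, `ProvidesSelfId`, and `resolve_self_id` are hypothetical names and do not reflect how Hydro implements CLUSTER_SELF_ID.

```rust
/// Hypothetical location without a member ID.
#[allow(dead_code)]
struct ProcessLoc;

/// Hypothetical location that knows which cluster member it is.
struct ClusterLoc {
    self_id: u32,
}

/// Only locations that have a notion of "my own member ID" implement this.
trait ProvidesSelfId {
    fn self_id(&self) -> u32;
}

impl ProvidesSelfId for ClusterLoc {
    fn self_id(&self) -> u32 {
        self.self_id
    }
}

/// The trait bound turns the restriction into a compile-time check.
fn resolve_self_id<L: ProvidesSelfId>(location: &L) -> u32 {
    location.self_id()
}

// `resolve_self_id(&ProcessLoc)` is rejected by the compiler with E0277,
// because `ProcessLoc` does not implement `ProvidesSelfId`.
```

Encoding the restriction in a trait bound means misuse is caught when the program is compiled rather than when it is deployed.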