Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource};
34use crate::forward_handle::ForwardRef;
35#[cfg(stageleft_runtime)]
36use crate::forward_handle::{CycleCollection, ForwardHandle};
37use crate::live_collections::boundedness::{Bounded, Unbounded};
38use crate::live_collections::keyed_stream::KeyedStream;
39use crate::live_collections::singleton::Singleton;
40use crate::live_collections::stream::{
41    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
42};
43use crate::location::dynamic::LocationId;
44use crate::location::external_process::{
45    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
46};
47use crate::nondet::NonDet;
48#[cfg(feature = "sim")]
49use crate::sim::SimSender;
50use crate::staging_util::get_this_crate;
51
52pub mod dynamic;
53
54#[expect(missing_docs, reason = "TODO")]
55pub mod external_process;
56pub use external_process::External;
57
58#[expect(missing_docs, reason = "TODO")]
59pub mod process;
60pub use process::Process;
61
62#[expect(missing_docs, reason = "TODO")]
63pub mod cluster;
64pub use cluster::Cluster;
65
66#[expect(missing_docs, reason = "TODO")]
67pub mod member_id;
68pub use member_id::{MemberId, TaglessMemberId};
69
70#[expect(missing_docs, reason = "TODO")]
71pub mod tick;
72pub use tick::{Atomic, NoTick, Tick};
73
/// A change in the membership of a group of peers.
///
/// Within this module it is emitted per cluster member by
/// `Location::source_cluster_members` and per external client by
/// `Location::bidi_external_many_bytes`.
#[expect(missing_docs, reason = "TODO")]
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    // The member (or client) became part of the group.
    Joined,
    // The member (or client) is no longer part of the group.
    Left,
}
80
/// A hint describing how the port for an external connection should be set up.
///
/// It is passed as the `port_hint` of the `bind_*` / `bidi_*` methods on [`Location`] and
/// stored verbatim in the resulting `HydroNode::ExternalInput` node.
#[expect(missing_docs, reason = "TODO")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    // No preference; this is what the bincode helpers in this module always pass.
    Auto,
    // NOTE(review): presumably requests TCP, with `Some(port)` pinning a specific port number and
    // `None` leaving the choice to the deployment -- confirm against the deploy backends.
    TcpPort(Option<u16>),
}
87
88pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
89    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
90}
91
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique identifier for a location.
    pub struct LocationKey;
}
97
98impl std::fmt::Display for LocationKey {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
101    }
102}
103
104/// This is used for the ECS membership stream.
105/// TODO(mingwei): Make this more robust?
106impl std::str::FromStr for LocationKey {
107    type Err = Option<ParseIntError>;
108
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        let nvn = s.strip_prefix("loc").ok_or(None)?;
111        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
112        let idx: u64 = idx.parse()?;
113        let ver: u64 = ver.parse()?;
114        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
115    }
116}
117
118impl LocationKey {
119    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
120    /// The first location key, used by the simulator as the default external location.
121    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
122
123    /// A key for testing with index 1.
124    #[cfg(test)]
125    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
126
127    /// A key for testing with index 2.
128    #[cfg(test)]
129    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
130}
131
132/// This is used within `q!` code in docker and ECS.
133impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
134    type O = LocationKey;
135
136    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
137    where
138        Self: Sized,
139    {
140        let root = get_this_crate();
141        let n = Key::data(&self).as_ffi();
142        (
143            QuoteTokens {
144                prelude: None,
145                expr: Some(quote! {
146                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
147                }),
148            },
149            (),
150        )
151    }
152}
153
/// A simple enum for the type of a root location.
///
/// The variants mirror the root location handles re-exported by this module:
/// [`Process`], [`Cluster`] and [`External`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process (single node).
    Process,
    /// A cluster (multiple nodes).
    Cluster,
    /// An external client.
    External,
}
164
165/// A location where data can be materialized and computation can be executed.
166///
167/// Hydro is a **global**, **distributed** programming model. This means that the data
168/// and computation in a Hydro program can be spread across multiple machines, data
169/// centers, and even continents. To achieve this, Hydro uses the concept of
170/// **locations** to keep track of _where_ data is located and computation is executed.
171///
172/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
173/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
174/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
175/// to allow live collections to be _moved_ between locations via network send/receive.
176///
177/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
178#[expect(
179    private_bounds,
180    reason = "only internal Hydro code can define location types"
181)]
182pub trait Location<'a>: dynamic::DynLocation {
    /// The root location type for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
    /// For nested locations like [`Tick`], this is the root location that contains it.
    /// The root must itself be a [`Location`] for the same lifetime `'a`.
    type Root: Location<'a>;

    /// Returns the root location for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
    /// For nested locations like [`Tick`], this returns the root location that contains it.
    /// The root is handed back by value rather than as a borrow of `self`.
    fn root(&self) -> Self::Root;
194
195    /// Attempts to create a new [`Tick`] clock domain at this location.
196    ///
197    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
198    /// or `None` if this location is already inside a tick (nested ticks are not supported).
199    ///
200    /// Prefer using [`Location::tick`] when you know the location is top-level.
201    fn try_tick(&self) -> Option<Tick<Self>> {
202        if Self::is_top_level() {
203            let id = self.flow_state().borrow_mut().next_clock_id();
204            Some(Tick {
205                id,
206                l: self.clone(),
207            })
208        } else {
209            None
210        }
211    }
212
213    /// Returns the unique identifier for this location.
214    fn id(&self) -> LocationId {
215        dynamic::DynLocation::id(self)
216    }
217
218    /// Creates a new [`Tick`] clock domain at this location.
219    ///
220    /// A tick represents a logical clock that can be used to batch streaming data
221    /// into discrete time steps. This is useful for implementing iterative algorithms
222    /// or for synchronizing data across multiple streams.
223    ///
224    /// # Example
225    /// ```rust
226    /// # #[cfg(feature = "deploy")] {
227    /// # use hydro_lang::prelude::*;
228    /// # use futures::StreamExt;
229    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
230    /// let tick = process.tick();
231    /// let inside_tick = process
232    ///     .source_iter(q!(vec![1, 2, 3, 4]))
233    ///     .batch(&tick, nondet!(/** test */));
234    /// inside_tick.all_ticks()
235    /// # }, |mut stream| async move {
236    /// // 1, 2, 3, 4
237    /// # for w in vec![1, 2, 3, 4] {
238    /// #     assert_eq!(stream.next().await.unwrap(), w);
239    /// # }
240    /// # }));
241    /// # }
242    /// ```
243    fn tick(&self) -> Tick<Self>
244    where
245        Self: NoTick,
246    {
247        let id = self.flow_state().borrow_mut().next_clock_id();
248        Tick {
249            id,
250            l: self.clone(),
251        }
252    }
253
254    /// Creates an unbounded stream that continuously emits unit values `()`.
255    ///
256    /// This is useful for driving computations that need to run continuously,
257    /// such as polling or heartbeat mechanisms.
258    ///
259    /// # Example
260    /// ```rust
261    /// # #[cfg(feature = "deploy")] {
262    /// # use hydro_lang::prelude::*;
263    /// # use futures::StreamExt;
264    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
265    /// let tick = process.tick();
266    /// process.spin()
267    ///     .batch(&tick, nondet!(/** test */))
268    ///     .map(q!(|_| 42))
269    ///     .all_ticks()
270    /// # }, |mut stream| async move {
271    /// // 42, 42, 42, ...
272    /// # assert_eq!(stream.next().await.unwrap(), 42);
273    /// # assert_eq!(stream.next().await.unwrap(), 42);
274    /// # assert_eq!(stream.next().await.unwrap(), 42);
275    /// # }));
276    /// # }
277    /// ```
278    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
279    where
280        Self: Sized + NoTick,
281    {
282        Stream::new(
283            self.clone(),
284            HydroNode::Source {
285                source: HydroSource::Spin(),
286                metadata: self.new_node_metadata(Stream::<
287                    (),
288                    Self,
289                    Unbounded,
290                    TotalOrder,
291                    ExactlyOnce,
292                >::collection_kind()),
293            },
294        )
295    }
296
297    /// Creates a stream from an async [`FuturesStream`].
298    ///
299    /// This is useful for integrating with external async data sources,
300    /// such as network connections or file readers.
301    ///
302    /// # Example
303    /// ```rust
304    /// # #[cfg(feature = "deploy")] {
305    /// # use hydro_lang::prelude::*;
306    /// # use futures::StreamExt;
307    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
308    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
309    /// # }, |mut stream| async move {
310    /// // 1, 2, 3
311    /// # for w in vec![1, 2, 3] {
312    /// #     assert_eq!(stream.next().await.unwrap(), w);
313    /// # }
314    /// # }));
315    /// # }
316    /// ```
317    fn source_stream<T, E>(
318        &self,
319        e: impl QuotedWithContext<'a, E, Self>,
320    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
321    where
322        E: FuturesStream<Item = T> + Unpin,
323        Self: Sized + NoTick,
324    {
325        let e = e.splice_untyped_ctx(self);
326
327        Stream::new(
328            self.clone(),
329            HydroNode::Source {
330                source: HydroSource::Stream(e.into()),
331                metadata: self.new_node_metadata(Stream::<
332                    T,
333                    Self,
334                    Unbounded,
335                    TotalOrder,
336                    ExactlyOnce,
337                >::collection_kind()),
338            },
339        )
340    }
341
342    /// Creates a bounded stream from an iterator.
343    ///
344    /// The iterator is evaluated once at runtime, and all elements are emitted
345    /// in order. This is useful for creating streams from static data or
346    /// for testing.
347    ///
348    /// # Example
349    /// ```rust
350    /// # #[cfg(feature = "deploy")] {
351    /// # use hydro_lang::prelude::*;
352    /// # use futures::StreamExt;
353    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
354    /// process.source_iter(q!(vec![1, 2, 3, 4]))
355    /// # }, |mut stream| async move {
356    /// // 1, 2, 3, 4
357    /// # for w in vec![1, 2, 3, 4] {
358    /// #     assert_eq!(stream.next().await.unwrap(), w);
359    /// # }
360    /// # }));
361    /// # }
362    /// ```
363    fn source_iter<T, E>(
364        &self,
365        e: impl QuotedWithContext<'a, E, Self>,
366    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
367    where
368        E: IntoIterator<Item = T>,
369        Self: Sized + NoTick,
370    {
371        let e = e.splice_typed_ctx(self);
372
373        Stream::new(
374            self.clone(),
375            HydroNode::Source {
376                source: HydroSource::Iter(e.into()),
377                metadata: self.new_node_metadata(
378                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
379                ),
380            },
381        )
382    }
383
384    /// Creates a stream of membership events for a cluster.
385    ///
386    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
387    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
388    /// keyed by the [`MemberId`] of the cluster member.
389    ///
390    /// This is useful for implementing protocols that need to track cluster membership,
391    /// such as broadcasting to all members or detecting failures.
392    ///
393    /// # Example
394    /// ```rust
395    /// # #[cfg(feature = "deploy")] {
396    /// # use hydro_lang::prelude::*;
397    /// # use futures::StreamExt;
398    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
399    /// let p1 = flow.process::<()>();
400    /// let workers: Cluster<()> = flow.cluster::<()>();
401    /// # // do nothing on each worker
402    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
403    /// let cluster_members = p1.source_cluster_members(&workers);
404    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
405    /// // if there are 4 members in the cluster, we would see a join event for each
406    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
407    /// # }, |mut stream| async move {
408    /// # let mut results = Vec::new();
409    /// # for w in 0..4 {
410    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
411    /// # }
412    /// # results.sort();
413    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
414    /// # }));
415    /// # }
416    /// ```
417    fn source_cluster_members<C: 'a>(
418        &self,
419        cluster: &Cluster<'a, C>,
420    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
421    where
422        Self: Sized + NoTick,
423    {
424        Stream::new(
425            self.clone(),
426            HydroNode::Source {
427                source: HydroSource::ClusterMembers(cluster.id()),
428                metadata: self.new_node_metadata(Stream::<
429                    (TaglessMemberId, MembershipEvent),
430                    Self,
431                    Unbounded,
432                    TotalOrder,
433                    ExactlyOnce,
434                >::collection_kind()),
435            },
436        )
437        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
438        .into_keyed()
439    }
440
441    /// Creates a one-way connection from an external process to receive raw bytes.
442    ///
443    /// Returns a port handle for the external process to connect to, and a stream
444    /// of received byte buffers.
445    ///
446    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
447    /// or [`Location::source_external_bincode`].
448    fn source_external_bytes<L>(
449        &self,
450        from: &External<L>,
451    ) -> (
452        ExternalBytesPort,
453        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
454    )
455    where
456        Self: Sized + NoTick,
457    {
458        let (port, stream, sink) =
459            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
460
461        sink.complete(self.source_iter(q!([])));
462
463        (port, stream)
464    }
465
466    /// Creates a one-way connection from an external process to receive bincode-serialized data.
467    ///
468    /// Returns a sink handle for the external process to send data to, and a stream
469    /// of received values.
470    ///
471    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
472    #[expect(clippy::type_complexity, reason = "stream markers")]
473    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
474        &self,
475        from: &External<L>,
476    ) -> (
477        ExternalBincodeSink<T, NotMany, O, R>,
478        Stream<T, Self, Unbounded, O, R>,
479    )
480    where
481        Self: Sized + NoTick,
482        T: Serialize + DeserializeOwned,
483    {
484        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
485        sink.complete(self.source_iter(q!([])));
486
487        (
488            ExternalBincodeSink {
489                process_key: from.key,
490                port_id: port.port_id,
491                _phantom: PhantomData,
492            },
493            stream.weaken_ordering().weaken_retries(),
494        )
495    }
496
497    /// Sets up a simulated input port on this location for testing.
498    ///
499    /// Returns a handle to send messages to the location as well as a stream
500    /// of received messages. This is only available when the `sim` feature is enabled.
501    #[cfg(feature = "sim")]
502    #[expect(clippy::type_complexity, reason = "stream markers")]
503    fn sim_input<T, O: Ordering, R: Retries>(
504        &self,
505    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
506    where
507        Self: Sized + NoTick,
508        T: Serialize + DeserializeOwned,
509    {
510        let external_location: External<'a, ()> = External {
511            key: LocationKey::FIRST,
512            flow_state: self.flow_state().clone(),
513            _phantom: PhantomData,
514        };
515
516        let (external, stream) = self.source_external_bincode(&external_location);
517
518        (SimSender(external.port_id, PhantomData), stream)
519    }
520
521    /// Creates an external input stream for embedded deployment mode.
522    ///
523    /// The `name` parameter specifies the name of the generated function parameter
524    /// that will supply data to this stream at runtime. The generated function will
525    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
526    fn embedded_input<T>(
527        &self,
528        name: impl Into<String>,
529    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
530    where
531        Self: Sized + NoTick,
532    {
533        let ident = syn::Ident::new(&name.into(), Span::call_site());
534
535        Stream::new(
536            self.clone(),
537            HydroNode::Source {
538                source: HydroSource::Embedded(ident),
539                metadata: self.new_node_metadata(Stream::<
540                    T,
541                    Self,
542                    Unbounded,
543                    TotalOrder,
544                    ExactlyOnce,
545                >::collection_kind()),
546            },
547        )
548    }
549
    /// Establishes a server on this location to receive a bidirectional connection from a single
    /// client, identified by the given `External` handle. Returns a port handle for the external
    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
    /// messages.
    ///
    /// `Codec` decodes incoming frames into `<Codec as Decoder>::Item` and encodes outgoing
    /// values of type `T`. Frames that the codec fails to decode are dropped from the incoming
    /// stream rather than surfaced as errors.
    ///
    /// The outgoing stream is supplied later by completing the returned [`ForwardHandle`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_deploy::Deployment;
    /// # use futures::{SinkExt, StreamExt};
    /// # tokio_test::block_on(async {
    /// # use bytes::Bytes;
    /// # use hydro_lang::location::NetworkHint;
    /// # use tokio_util::codec::LengthDelimitedCodec;
    /// # let mut flow = FlowBuilder::new();
    /// let node = flow.process::<()>();
    /// let external = flow.external::<()>();
    /// let (port, incoming, outgoing) =
    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
    ///     let mut resp: Vec<u8> = data.into();
    ///     resp.push(42);
    ///     resp.into() // : Bytes
    /// })));
    ///
    /// # let mut deployment = Deployment::new();
    /// let nodes = flow // ... with_process and with_external
    /// #     .with_process(&node, deployment.Localhost())
    /// #     .with_external(&external, deployment.Localhost())
    /// #     .deploy(&mut deployment);
    ///
    /// deployment.deploy().await.unwrap();
    /// deployment.start().await.unwrap();
    ///
    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
    /// assert_eq!(
    ///     external_out.next().await.unwrap().unwrap(),
    ///     vec![1, 2, 3, 42]
    /// );
    /// # });
    /// # }
    /// ```
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<NotMany>,
        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Allocate a fresh port id from the external location's flow state. The borrow is a
        // temporary and is released at the end of this statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        // `fwd_ref` is handed to the caller to be completed later; `to_sink` stands in for
        // that not-yet-known outgoing stream in the IR built below.
        // NOTE(review): `forward_ref` is called before the long-lived mutable borrow on the
        // next line, presumably because it touches the flow state itself -- keep this order.
        let (fwd_ref, to_sink) =
            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
        // Held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: register a root that sends the placeholder stream to the external
        // port. `to_many: false` marks this as a single-client connection, and no serializer
        // is attached because values go through `Codec` directly.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: an external input on the same port id, yielding one `Result` per
        // decoded frame.
        let raw_stream: Stream<
            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: self.new_node_metadata(Stream::<
                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // Flattening a `Result` keeps only `Ok` values, so decode errors are silently
            // dropped here.
            raw_stream.flatten_ordered(),
            fwd_ref,
        )
    }
660
    /// Establishes a bidirectional connection from a single external client using bincode serialization.
    ///
    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
    /// and a handle to send outgoing messages. This is the bincode counterpart of
    /// [`Location::bind_single_client`]: frames are always delimited with
    /// [`LengthDelimitedCodec`], payloads are (de)serialized with bincode, and the port hint is
    /// always [`NetworkHint::Auto`].
    ///
    /// NOTE(review): the generated (de)serialization closures call `unwrap()`, so a frame that
    /// fails to decode, or a payload that is not valid bincode for `InT`, panics at runtime
    /// instead of being dropped.
    ///
    /// # Type Parameters
    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, NotMany>,
        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Allocate a fresh port id from the external location's flow state. The borrow is a
        // temporary and is released at the end of this statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        // `fwd_ref` is handed to the caller to be completed later; `to_sink` stands in for
        // that not-yet-known outgoing stream in the IR built below.
        // NOTE(review): `forward_ref` is called before the long-lived mutable borrow on the
        // next line, presumably because it touches the flow state itself -- keep this order.
        let (fwd_ref, to_sink) =
            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
        // Held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Serializer spliced into the generated code: bincode-encodes each outgoing `OutT`.
        // The `fn1_type_hint` wrapper pins the closure's argument type to `OutT`.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        // Outgoing half: register a root that serializes the placeholder stream and sends it
        // to the external port. `to_many: false` marks this as a single-client connection.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Deserializer spliced into the generated code: unwraps the framing result, then
        // bincode-decodes the bytes into `InT`. Both steps panic on failure.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
            }
        };

        // Incoming half: an external input on the same port id. Because the deserializer is
        // attached to the node, the stream already carries `InT` values.
        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: self.new_node_metadata(Stream::<
                    InT,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBincodeBidi {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            fwd_ref,
        )
    }
747
    /// Establishes a server on this location to receive bidirectional connections from multiple
    /// external clients using raw bytes.
    ///
    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
    /// connections. Each client is assigned a unique `u64` identifier.
    ///
    /// `Codec` decodes incoming frames into `<Codec as Decoder>::Item` and encodes outgoing
    /// values of type `T`. Frames that the codec fails to decode are currently dropped from the
    /// incoming stream rather than surfaced as errors.
    ///
    /// Returns:
    /// - A port handle for external processes to connect to
    /// - A keyed stream of incoming messages, keyed by client ID
    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
    /// - A handle to send outgoing messages, keyed by client ID
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<Many>,
        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // Allocate a fresh port id from the external location's flow state. The borrow is a
        // temporary and is released at the end of this statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        // `fwd_ref` is handed to the caller to be completed later; `to_sink` stands in for
        // that not-yet-known outgoing keyed stream in the IR built below.
        // NOTE(review): `forward_ref` is called before the long-lived mutable borrow on the
        // next line, presumably because it touches the flow state itself -- keep this order.
        let (fwd_ref, to_sink) =
            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
        // Held until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: the keyed placeholder is turned into its entries so each message
        // travels with the id of the client it is addressed to. `to_many: true` marks this as
        // a multi-client connection.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: an external input on the same port id, yielding one `Result` per
        // decoded frame, with the sending client's id paired with the decoded item.
        let raw_stream: Stream<
            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: self.new_node_metadata(Stream::<
                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // Membership is read from a stream variable referenced purely by name; the name is
        // derived from the external location key and the port id.
        // NOTE(review): a variable with this exact name is presumably defined by the deployment
        // code generator -- the two naming schemes must stay in sync.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership: client id mapped to a flag that is `true` on join and `false` on leave
        // (see the mapping into `MembershipEvent` below).
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: self.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream
                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
                .into_keyed(),
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
865
866    /// Establishes a server on this location to receive bidirectional connections from multiple
867    /// external clients using bincode serialization.
868    ///
869    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
870    /// client connections. Each client is assigned a unique `u64` identifier.
871    ///
872    /// Returns:
873    /// - A port handle for external processes to connect to
874    /// - A keyed stream of incoming messages, keyed by client ID
875    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
876    /// - A handle to send outgoing messages, keyed by client ID
877    ///
878    /// # Type Parameters
879    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
880    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
881    #[expect(clippy::type_complexity, reason = "stream markers")]
882    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
883        &self,
884        from: &External<L>,
885    ) -> (
886        ExternalBincodeBidi<InT, OutT, Many>,
887        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
888        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
889        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
890    )
891    where
892        Self: Sized + NoTick,
893    {
894        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
895
896        let (fwd_ref, to_sink) =
897            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
898        let mut flow_state_borrow = self.flow_state().borrow_mut();
899
900        let root = get_this_crate();
901
902        let out_t_type = quote_type::<OutT>();
903        let ser_fn: syn::Expr = syn::parse_quote! {
904            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
905                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
906            )
907        };
908
909        flow_state_borrow.push_root(HydroRoot::SendExternal {
910            to_external_key: from.key,
911            to_port_id: next_external_port_id,
912            to_many: true,
913            unpaired: false,
914            serialize_fn: Some(ser_fn.into()),
915            instantiate_fn: DebugInstantiate::Building,
916            input: Box::new(to_sink.entries().ir_node.into_inner()),
917            op_metadata: HydroIrOpMetadata::new(),
918        });
919
920        let in_t_type = quote_type::<InT>();
921
922        let deser_fn: syn::Expr = syn::parse_quote! {
923            |res| {
924                let (id, b) = res.unwrap();
925                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
926            }
927        };
928
929        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
930            KeyedStream::new(
931                self.clone(),
932                HydroNode::ExternalInput {
933                    from_external_key: from.key,
934                    from_port_id: next_external_port_id,
935                    from_many: true,
936                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
937                    port_hint: NetworkHint::Auto,
938                    instantiate_fn: DebugInstantiate::Building,
939                    deserialize_fn: Some(deser_fn.into()),
940                    metadata: self.new_node_metadata(KeyedStream::<
941                        u64,
942                        InT,
943                        Self,
944                        Unbounded,
945                        TotalOrder,
946                        ExactlyOnce,
947                    >::collection_kind()),
948                },
949            );
950
951        let membership_stream_ident = syn::Ident::new(
952            &format!(
953                "__hydro_deploy_many_{}_{}_membership",
954                from.key, next_external_port_id
955            ),
956            Span::call_site(),
957        );
958        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
959        let raw_membership_stream: KeyedStream<
960            u64,
961            bool,
962            Self,
963            Unbounded,
964            TotalOrder,
965            ExactlyOnce,
966        > = KeyedStream::new(
967            self.clone(),
968            HydroNode::Source {
969                source: HydroSource::Stream(membership_stream_expr.into()),
970                metadata: self.new_node_metadata(KeyedStream::<
971                    u64,
972                    bool,
973                    Self,
974                    Unbounded,
975                    TotalOrder,
976                    ExactlyOnce,
977                >::collection_kind()),
978            },
979        );
980
981        (
982            ExternalBincodeBidi {
983                process_key: from.key,
984                port_id: next_external_port_id,
985                _phantom: PhantomData,
986            },
987            raw_stream,
988            raw_membership_stream.map(q!(|join| {
989                if join {
990                    MembershipEvent::Joined
991                } else {
992                    MembershipEvent::Left
993                }
994            })),
995            fwd_ref,
996        )
997    }
998
999    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1000    ///
1001    /// # Example
1002    /// ```rust
1003    /// # #[cfg(feature = "deploy")] {
1004    /// # use hydro_lang::prelude::*;
1005    /// # use futures::StreamExt;
1006    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1007    /// let tick = process.tick();
1008    /// let singleton = tick.singleton(q!(5));
1009    /// # singleton.all_ticks()
1010    /// # }, |mut stream| async move {
1011    /// // 5
1012    /// # assert_eq!(stream.next().await.unwrap(), 5);
1013    /// # }));
1014    /// # }
1015    /// ```
1016    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1017    where
1018        T: Clone,
1019        Self: Sized,
1020    {
1021        let e = e.splice_untyped_ctx(self);
1022
1023        Singleton::new(
1024            self.clone(),
1025            HydroNode::SingletonSource {
1026                value: e.into(),
1027                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1028            },
1029        )
1030    }
1031
1032    /// Generates a stream with values emitted at a fixed interval, with
1033    /// each value being the current time (as an [`tokio::time::Instant`]).
1034    ///
1035    /// The clock source used is monotonic, so elements will be emitted in
1036    /// increasing order.
1037    ///
1038    /// # Non-Determinism
1039    /// Because this stream is generated by an OS timer, it will be
1040    /// non-deterministic because each timestamp will be arbitrary.
1041    fn source_interval(
1042        &self,
1043        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1044        _nondet: NonDet,
1045    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1046    where
1047        Self: Sized + NoTick,
1048    {
1049        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1050            tokio::time::interval(interval)
1051        )))
1052    }
1053
1054    /// Generates a stream with values emitted at a fixed interval (with an
1055    /// initial delay), with each value being the current time
1056    /// (as an [`tokio::time::Instant`]).
1057    ///
1058    /// The clock source used is monotonic, so elements will be emitted in
1059    /// increasing order.
1060    ///
1061    /// # Non-Determinism
1062    /// Because this stream is generated by an OS timer, it will be
1063    /// non-deterministic because each timestamp will be arbitrary.
1064    fn source_interval_delayed(
1065        &self,
1066        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1067        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1068        _nondet: NonDet,
1069    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1070    where
1071        Self: Sized + NoTick,
1072    {
1073        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1074            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1075        )))
1076    }
1077
1078    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1079    ///
1080    /// Returns a handle that must be completed with the actual stream, and a placeholder
1081    /// stream that can be used in the dataflow graph before the actual stream is defined.
1082    ///
1083    /// This is useful for implementing feedback loops or recursive computations where
1084    /// a stream depends on its own output.
1085    ///
1086    /// # Example
1087    /// ```rust
1088    /// # #[cfg(feature = "deploy")] {
1089    /// # use hydro_lang::prelude::*;
1090    /// # use hydro_lang::live_collections::stream::NoOrder;
1091    /// # use futures::StreamExt;
1092    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1093    /// // Create a forward reference for the feedback stream
1094    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1095    ///
1096    /// // Combine initial input with feedback, then increment
1097    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1098    /// let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
1099    ///
1100    /// // Complete the forward reference with the output
1101    /// complete.complete(output.clone());
1102    /// output
1103    /// # }, |mut stream| async move {
1104    /// // 2, 3, 4, 5, ...
1105    /// # assert_eq!(stream.next().await.unwrap(), 2);
1106    /// # assert_eq!(stream.next().await.unwrap(), 3);
1107    /// # assert_eq!(stream.next().await.unwrap(), 4);
1108    /// # }));
1109    /// # }
1110    /// ```
1111    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1112    where
1113        S: CycleCollection<'a, ForwardRef, Location = Self>,
1114    {
1115        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1116        (
1117            ForwardHandle::new(cycle_id, Location::id(self)),
1118            S::create_source(cycle_id, self.clone()),
1119        )
1120    }
1121}
1122
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;
    use tokio_util::codec::LengthDelimitedCodec;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
    use crate::location::{Location, NetworkHint};
    use crate::nondet::nondet;

    /// A singleton created at the top level of a process is crossed into each batch once:
    /// every input comes back paired with the value `123` and a count of `1`.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let constant = process.singleton(q!(123));
        let tick = process.tick();

        // Pair every batched input with the singleton's value, then with the number of
        // elements the singleton yields when viewed as a stream.
        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value =
            batched.cross_singleton(constant.clone().snapshot(&tick, nondet!(/** test */)));
        let with_count = with_value.cross_singleton(
            constant
                .snapshot(&tick, nondet!(/** test */))
                .into_stream()
                .count(),
        );
        let out_port = with_count.all_ticks().send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = nodes.connect(in_port).await;
        let mut from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for value in [1, 2] {
            to_process.send(value).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), ((value, 123), 1));
        }
    }

    /// Same check as above, but with the singleton created directly inside the tick.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, requests) =
            process.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = process.tick();
        let constant = tick.singleton(q!(123));

        let batched = requests.batch(&tick, nondet!(/** test */));
        let with_value = batched.cross_singleton(constant.clone());
        let with_count = with_value.cross_singleton(constant.into_stream().count());
        let out_port = with_count.all_ticks().send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_process = nodes.connect(in_port).await;
        let mut from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for value in [1, 2] {
            to_process.send(value).await.unwrap();
            assert_eq!(from_process.next().await.unwrap(), ((value, 123), 1));
        }
    }

    /// Raw bytes sent into a process through an external port come back unchanged when
    /// forwarded to a bincode output port.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, payloads) = process.source_external_bytes(&external);
        let out_port = payloads.send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Only the second element of the connection (the one that is sent on) is needed.
        let mut to_process = nodes.connect(in_port).await.1;
        let mut from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        to_process.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(from_process.next().await.unwrap(), vec![1, 2, 3]);
    }

    /// Two bincode clients on the same many-client port are keyed `0` and `1`, and each
    /// client's message arrives tagged with its own key.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, outgoing_handle) =
            process.bidi_external_many_bincode(&external);
        let out_port = incoming.entries().send_bincode_external(&external);

        // Nothing is ever sent back to the clients, so the handle is completed with an
        // empty stream.
        outgoing_handle.complete(
            process
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (_, mut first_client) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut second_client) = nodes.connect_bincode(in_port).await;
        let from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        first_client.send(123).await.unwrap();
        second_client.send(456).await.unwrap();

        // Arrival order across clients is not asserted; only the set of results is.
        let received = from_process.take(2).collect::<HashSet<_>>().await;
        let expected = HashSet::from([(0, 123), (1, 456)]);
        assert_eq!(received, expected);
    }

    /// A message from the second client is delivered even though the first client never
    /// sends anything.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, outgoing_handle) =
            process.bidi_external_many_bincode(&external);
        let out_port = incoming.entries().send_bincode_external(&external);
        outgoing_handle.complete(
            process
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The first client connects but is intentionally never used, to test the stream
        // waking logic. The binding keeps it alive for the rest of the test.
        let (_, _idle_client) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut second_client) = nodes.connect_bincode(in_port).await;
        let mut from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        second_client.send(456).await.unwrap();

        assert_eq!(from_process.next().await.unwrap(), (1, 456));
    }

    /// Two length-delimited byte clients on the same many-client port are keyed `0` and
    /// `1`, and each payload arrives tagged with its sender's key.
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, incoming, _membership, outgoing_handle) = process
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        let out_port = incoming.entries().send_bincode_external(&external);

        // Nothing is ever sent back to the clients, so the handle is completed with an
        // empty stream.
        outgoing_handle.complete(
            process
                .source_iter(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut first_client = nodes.connect(in_port.clone()).await.1;
        let mut second_client = nodes.connect(in_port).await.1;
        let from_process = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        first_client.send(vec![1, 2, 3].into()).await.unwrap();
        second_client.send(vec![4, 5].into()).await.unwrap();

        // Arrival order across clients is not asserted; only the set of results is.
        let received = from_process.take(2).collect::<HashSet<_>>().await;
        let expected = HashSet::from([
            (0, (&[1u8, 2, 3] as &[u8]).into()),
            (1, (&[4u8, 5] as &[u8]).into()),
        ]);
        assert_eq!(received, expected);
    }

    /// A single-client byte server answers each frame with the same bytes plus a trailing
    /// `42`.
    #[tokio::test]
    async fn single_client_external_bytes() {
        let mut deployment = Deployment::new();
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();
        let (port, incoming, outgoing_handle) = process
            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        outgoing_handle.complete(incoming.map(q!(|data| {
            let mut resp: Vec<u8> = data.into();
            resp.push(42);
            resp.into() // : Bytes
        })));

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        // Unlike the other tests in this module, the deployment is started before the
        // client connects.
        deployment.deploy().await.unwrap();
        deployment.start().await.unwrap();

        let (mut from_process, mut to_process) = nodes.connect(port).await;

        to_process.send(vec![1, 2, 3].into()).await.unwrap();
        assert_eq!(
            from_process.next().await.unwrap().unwrap(),
            vec![1, 2, 3, 42]
        );
    }

    /// A many-client byte server that increments every byte replies to each client on that
    /// client's own connection.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, incoming, _membership, outgoing_handle) = process
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
        outgoing_handle
            .complete(incoming.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = nodes.connect(port.clone()).await;
        let (mut second_rx, mut second_tx) = nodes.connect(port).await;

        deployment.start().await.unwrap();

        first_tx.send(vec![1, 2, 3].into()).await.unwrap();
        second_tx.send(vec![4, 5].into()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(second_rx.next().await.unwrap().unwrap(), vec![5, 6]);
    }

    /// A many-client bincode server that upper-cases strings replies to each client on
    /// that client's own connection.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let (port, incoming, _membership, outgoing_handle) =
            process.bidi_external_many_bincode(&external);
        outgoing_handle.complete(incoming.map(q!(|text: String| { text.to_uppercase() })));

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = nodes.connect_bincode(port.clone()).await;
        let (mut second_rx, mut second_tx) = nodes.connect_bincode(port).await;

        deployment.start().await.unwrap();

        first_tx.send("hi".to_owned()).await.unwrap();
        second_tx.send("hello".to_owned()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap(), "HI");
        assert_eq!(second_rx.next().await.unwrap(), "HELLO");
    }
}