// hydro_lang/live_collections/stream/mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::ManualProof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
36/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
37#[sealed::sealed]
38pub trait Ordering:
39    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
40{
41    /// The [`StreamOrder`] corresponding to this type.
42    const ORDERING_KIND: StreamOrder;
43}
44
45/// Marks the stream as being totally ordered, which means that there are
46/// no sources of non-determinism (other than intentional ones) that will
47/// affect the order of elements.
48pub enum TotalOrder {}
49
50#[sealed::sealed]
51impl Ordering for TotalOrder {
52    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
53}
54
55/// Marks the stream as having no order, which means that the order of
56/// elements may be affected by non-determinism.
57///
58/// This restricts certain operators, such as `fold` and `reduce`, to only
59/// be used with commutative aggregation functions.
60pub enum NoOrder {}
61
62#[sealed::sealed]
63impl Ordering for NoOrder {
64    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
65}
66
67/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
68/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
69/// have `Self` guarantees instead.
70#[sealed::sealed]
71pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
72#[sealed::sealed]
73impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
75/// Helper trait for determining the weakest of two orderings.
76#[sealed::sealed]
77pub trait MinOrder<Other: ?Sized> {
78    /// The weaker of the two orderings.
79    type Min: Ordering;
80}
81
82#[sealed::sealed]
83impl<O: Ordering> MinOrder<O> for TotalOrder {
84    type Min = O;
85}
86
87#[sealed::sealed]
88impl<O: Ordering> MinOrder<O> for NoOrder {
89    type Min = NoOrder;
90}
91
92/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
93#[sealed::sealed]
94pub trait Retries:
95    MinRetries<Self, Min = Self>
96    + MinRetries<ExactlyOnce, Min = Self>
97    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
98{
99    /// The [`StreamRetry`] corresponding to this type.
100    const RETRIES_KIND: StreamRetry;
101}
102
103/// Marks the stream as having deterministic message cardinality, with no
104/// possibility of duplicates.
105pub enum ExactlyOnce {}
106
107#[sealed::sealed]
108impl Retries for ExactlyOnce {
109    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
110}
111
112/// Marks the stream as having non-deterministic message cardinality, which
113/// means that duplicates may occur, but messages will not be dropped.
114pub enum AtLeastOnce {}
115
116#[sealed::sealed]
117impl Retries for AtLeastOnce {
118    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
119}
120
121/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
122/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
123/// have `Self` guarantees instead.
124#[sealed::sealed]
125pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
126#[sealed::sealed]
127impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
129/// Helper trait for determining the weakest of two retry guarantees.
130#[sealed::sealed]
131pub trait MinRetries<Other: ?Sized> {
132    /// The weaker of the two retry guarantees.
133    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
134}
135
136#[sealed::sealed]
137impl<R: Retries> MinRetries<R> for ExactlyOnce {
138    type Min = R;
139}
140
141#[sealed::sealed]
142impl<R: Retries> MinRetries<R> for AtLeastOnce {
143    type Min = AtLeastOnce;
144}
145
146#[sealed::sealed]
147#[diagnostic::on_unimplemented(
148    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
149    label = "required here",
150    note = "To intentionally process the stream with a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
151)]
152/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
153pub trait IsOrdered: Ordering {}
154
155#[sealed::sealed]
156#[diagnostic::do_not_recommend]
157impl IsOrdered for TotalOrder {}
158
159#[sealed::sealed]
160#[diagnostic::on_unimplemented(
161    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
162    label = "required here",
163    note = "To intentionally process the stream with non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
164)]
165/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
166pub trait IsExactlyOnce: Retries {}
167
168#[sealed::sealed]
169#[diagnostic::do_not_recommend]
170impl IsExactlyOnce for ExactlyOnce {}
171
172/// Streaming sequence of elements with type `Type`.
173///
174/// This live collection represents a growing sequence of elements, with new elements being
175/// asynchronously appended to the end of the sequence. This can be used to model the arrival
176/// of network input, such as API requests, or streaming ingestion.
177///
178/// By default, all streams have deterministic ordering and each element is materialized exactly
179/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
180/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
181/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
182///
183/// Type Parameters:
184/// - `Type`: the type of elements in the stream
185/// - `Loc`: the location where the stream is being materialized
186/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
187/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
188///   (default is [`TotalOrder`])
189/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
190///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
191pub struct Stream<
192    Type,
193    Loc,
194    Bound: Boundedness = Unbounded,
195    Order: Ordering = TotalOrder,
196    Retry: Retries = ExactlyOnce,
197> {
198    pub(crate) location: Loc,
199    pub(crate) ir_node: RefCell<HydroNode>,
200
201    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
202}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205    for Stream<T, L, Unbounded, O, R>
206where
207    L: Location<'a>,
208{
209    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210        let new_meta = stream
211            .location
212            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214        Stream {
215            location: stream.location,
216            ir_node: RefCell::new(HydroNode::Cast {
217                inner: Box::new(stream.ir_node.into_inner()),
218                metadata: new_meta,
219            }),
220            _phantom: PhantomData,
221        }
222    }
223}
224
225impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
226    for Stream<T, L, B, NoOrder, R>
227where
228    L: Location<'a>,
229{
230    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
231        stream.weaken_ordering()
232    }
233}
234
235impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
236    for Stream<T, L, B, O, AtLeastOnce>
237where
238    L: Location<'a>,
239{
240    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
241        stream.weaken_retries()
242    }
243}
244
245impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
246where
247    L: Location<'a>,
248{
249    fn defer_tick(self) -> Self {
250        Stream::defer_tick(self)
251    }
252}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255    for Stream<T, Tick<L>, Bounded, O, R>
256where
257    L: Location<'a>,
258{
259    type Location = Tick<L>;
260
261    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262        Stream::new(
263            location.clone(),
264            HydroNode::CycleSource {
265                cycle_id,
266                metadata: location.new_node_metadata(Self::collection_kind()),
267            },
268        )
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
278        assert_eq!(
279            Location::id(&self.location),
280            expected_location,
281            "locations do not match"
282        );
283        self.location
284            .flow_state()
285            .borrow_mut()
286            .push_root(HydroRoot::CycleSink {
287                cycle_id,
288                input: Box::new(self.ir_node.into_inner()),
289                op_metadata: HydroIrOpMetadata::new(),
290            });
291    }
292}
293
294impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
295    for Stream<T, L, B, O, R>
296where
297    L: Location<'a> + NoTick,
298{
299    type Location = L;
300
301    fn create_source(cycle_id: CycleId, location: L) -> Self {
302        Stream::new(
303            location.clone(),
304            HydroNode::CycleSource {
305                cycle_id,
306                metadata: location.new_node_metadata(Self::collection_kind()),
307            },
308        )
309    }
310}
311
312impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
313    for Stream<T, L, B, O, R>
314where
315    L: Location<'a> + NoTick,
316{
317    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
318        assert_eq!(
319            Location::id(&self.location),
320            expected_location,
321            "locations do not match"
322        );
323        self.location
324            .flow_state()
325            .borrow_mut()
326            .push_root(HydroRoot::CycleSink {
327                cycle_id,
328                input: Box::new(self.ir_node.into_inner()),
329                op_metadata: HydroIrOpMetadata::new(),
330            });
331    }
332}
333
334impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
335where
336    T: Clone,
337    L: Location<'a>,
338{
339    fn clone(&self) -> Self {
340        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
341            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
342            *self.ir_node.borrow_mut() = HydroNode::Tee {
343                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
344                metadata: self.location.new_node_metadata(Self::collection_kind()),
345            };
346        }
347
348        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
349            Stream {
350                location: self.location.clone(),
351                ir_node: HydroNode::Tee {
352                    inner: TeeNode(inner.0.clone()),
353                    metadata: metadata.clone(),
354                }
355                .into(),
356                _phantom: PhantomData,
357            }
358        } else {
359            unreachable!()
360        }
361    }
362}
363
364impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
365where
366    L: Location<'a>,
367{
368    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
369        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
370        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
371
372        Stream {
373            location,
374            ir_node: RefCell::new(ir_node),
375            _phantom: PhantomData,
376        }
377    }
378
379    /// Returns the [`Location`] where this stream is being materialized.
380    pub fn location(&self) -> &L {
381        &self.location
382    }
383
384    pub(crate) fn collection_kind() -> CollectionKind {
385        CollectionKind::Stream {
386            bound: B::BOUND_KIND,
387            order: O::ORDERING_KIND,
388            retry: R::RETRIES_KIND,
389            element_type: quote_type::<T>().into(),
390        }
391    }
392
393    /// Produces a stream based on invoking `f` on each element.
394    /// If you do not want to modify the stream and instead only want to view
395    /// each item use [`Stream::inspect`] instead.
396    ///
397    /// # Example
398    /// ```rust
399    /// # #[cfg(feature = "deploy")] {
400    /// # use hydro_lang::prelude::*;
401    /// # use futures::StreamExt;
402    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
403    /// let words = process.source_iter(q!(vec!["hello", "world"]));
404    /// words.map(q!(|x| x.to_uppercase()))
405    /// # }, |mut stream| async move {
406    /// # for w in vec!["HELLO", "WORLD"] {
407    /// #     assert_eq!(stream.next().await.unwrap(), w);
408    /// # }
409    /// # }));
410    /// # }
411    /// ```
412    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
413    where
414        F: Fn(T) -> U + 'a,
415    {
416        let f = f.splice_fn1_ctx(&self.location).into();
417        Stream::new(
418            self.location.clone(),
419            HydroNode::Map {
420                f,
421                input: Box::new(self.ir_node.into_inner()),
422                metadata: self
423                    .location
424                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
425            },
426        )
427    }
428
429    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
430    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
431    /// for the output type `U` must produce items in a **deterministic** order.
432    ///
433    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
434    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
435    ///
436    /// # Example
437    /// ```rust
438    /// # #[cfg(feature = "deploy")] {
439    /// # use hydro_lang::prelude::*;
440    /// # use futures::StreamExt;
441    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
442    /// process
443    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
444    ///     .flat_map_ordered(q!(|x| x))
445    /// # }, |mut stream| async move {
446    /// // 1, 2, 3, 4
447    /// # for w in (1..5) {
448    /// #     assert_eq!(stream.next().await.unwrap(), w);
449    /// # }
450    /// # }));
451    /// # }
452    /// ```
453    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454    where
455        I: IntoIterator<Item = U>,
456        F: Fn(T) -> I + 'a,
457    {
458        let f = f.splice_fn1_ctx(&self.location).into();
459        Stream::new(
460            self.location.clone(),
461            HydroNode::FlatMap {
462                f,
463                input: Box::new(self.ir_node.into_inner()),
464                metadata: self
465                    .location
466                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
467            },
468        )
469    }
470
471    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
472    /// for the output type `U` to produce items in any order.
473    ///
474    /// # Example
475    /// ```rust
476    /// # #[cfg(feature = "deploy")] {
477    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
478    /// # use futures::StreamExt;
479    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
480    /// process
481    ///     .source_iter(q!(vec![
482    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
483    ///         std::collections::HashSet::from_iter(vec![3, 4]),
484    ///     ]))
485    ///     .flat_map_unordered(q!(|x| x))
486    /// # }, |mut stream| async move {
487    /// // 1, 2, 3, 4, but in no particular order
488    /// # let mut results = Vec::new();
489    /// # for w in (1..5) {
490    /// #     results.push(stream.next().await.unwrap());
491    /// # }
492    /// # results.sort();
493    /// # assert_eq!(results, vec![1, 2, 3, 4]);
494    /// # }));
495    /// # }
496    /// ```
497    pub fn flat_map_unordered<U, I, F>(
498        self,
499        f: impl IntoQuotedMut<'a, F, L>,
500    ) -> Stream<U, L, B, NoOrder, R>
501    where
502        I: IntoIterator<Item = U>,
503        F: Fn(T) -> I + 'a,
504    {
505        let f = f.splice_fn1_ctx(&self.location).into();
506        Stream::new(
507            self.location.clone(),
508            HydroNode::FlatMap {
509                f,
510                input: Box::new(self.ir_node.into_inner()),
511                metadata: self
512                    .location
513                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
514            },
515        )
516    }
517
518    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
519    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
520    ///
521    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
522    /// not deterministic, use [`Stream::flatten_unordered`] instead.
523    ///
524    /// ```rust
525    /// # #[cfg(feature = "deploy")] {
526    /// # use hydro_lang::prelude::*;
527    /// # use futures::StreamExt;
528    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
529    /// process
530    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
531    ///     .flatten_ordered()
532    /// # }, |mut stream| async move {
533    /// // 1, 2, 3, 4
534    /// # for w in (1..5) {
535    /// #     assert_eq!(stream.next().await.unwrap(), w);
536    /// # }
537    /// # }));
538    /// # }
539    /// ```
540    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
541    where
542        T: IntoIterator<Item = U>,
543    {
544        self.flat_map_ordered(q!(|d| d))
545    }
546
547    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
548    /// for the element type `T` to produce items in any order.
549    ///
550    /// # Example
551    /// ```rust
552    /// # #[cfg(feature = "deploy")] {
553    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
554    /// # use futures::StreamExt;
555    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
556    /// process
557    ///     .source_iter(q!(vec![
558    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
559    ///         std::collections::HashSet::from_iter(vec![3, 4]),
560    ///     ]))
561    ///     .flatten_unordered()
562    /// # }, |mut stream| async move {
563    /// // 1, 2, 3, 4, but in no particular order
564    /// # let mut results = Vec::new();
565    /// # for w in (1..5) {
566    /// #     results.push(stream.next().await.unwrap());
567    /// # }
568    /// # results.sort();
569    /// # assert_eq!(results, vec![1, 2, 3, 4]);
570    /// # }));
571    /// # }
572    /// ```
573    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
574    where
575        T: IntoIterator<Item = U>,
576    {
577        self.flat_map_unordered(q!(|d| d))
578    }
579
580    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
581    /// `f`, preserving the order of the elements.
582    ///
583    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
584    /// not modify or take ownership of the values. If you need to modify the values while filtering
585    /// use [`Stream::filter_map`] instead.
586    ///
587    /// # Example
588    /// ```rust
589    /// # #[cfg(feature = "deploy")] {
590    /// # use hydro_lang::prelude::*;
591    /// # use futures::StreamExt;
592    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593    /// process
594    ///     .source_iter(q!(vec![1, 2, 3, 4]))
595    ///     .filter(q!(|&x| x > 2))
596    /// # }, |mut stream| async move {
597    /// // 3, 4
598    /// # for w in (3..5) {
599    /// #     assert_eq!(stream.next().await.unwrap(), w);
600    /// # }
601    /// # }));
602    /// # }
603    /// ```
604    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
605    where
606        F: Fn(&T) -> bool + 'a,
607    {
608        let f = f.splice_fn1_borrow_ctx(&self.location).into();
609        Stream::new(
610            self.location.clone(),
611            HydroNode::Filter {
612                f,
613                input: Box::new(self.ir_node.into_inner()),
614                metadata: self.location.new_node_metadata(Self::collection_kind()),
615            },
616        )
617    }
618
619    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
620    ///
621    /// # Example
622    /// ```rust
623    /// # #[cfg(feature = "deploy")] {
624    /// # use hydro_lang::prelude::*;
625    /// # use futures::StreamExt;
626    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627    /// process
628    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
629    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
630    /// # }, |mut stream| async move {
631    /// // 1, 2
632    /// # for w in (1..3) {
633    /// #     assert_eq!(stream.next().await.unwrap(), w);
634    /// # }
635    /// # }));
636    /// # }
637    /// ```
638    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
639    where
640        F: Fn(T) -> Option<U> + 'a,
641    {
642        let f = f.splice_fn1_ctx(&self.location).into();
643        Stream::new(
644            self.location.clone(),
645            HydroNode::FilterMap {
646                f,
647                input: Box::new(self.ir_node.into_inner()),
648                metadata: self
649                    .location
650                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
651            },
652        )
653    }
654
655    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
656    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
657    /// If `other` is an empty [`Optional`], no values will be produced.
658    ///
659    /// # Example
660    /// ```rust
661    /// # #[cfg(feature = "deploy")] {
662    /// # use hydro_lang::prelude::*;
663    /// # use futures::StreamExt;
664    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
665    /// let tick = process.tick();
666    /// let batch = process
667    ///   .source_iter(q!(vec![1, 2, 3, 4]))
668    ///   .batch(&tick, nondet!(/** test */));
669    /// let count = batch.clone().count(); // `count()` returns a singleton
670    /// batch.cross_singleton(count).all_ticks()
671    /// # }, |mut stream| async move {
672    /// // (1, 4), (2, 4), (3, 4), (4, 4)
673    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
674    /// #     assert_eq!(stream.next().await.unwrap(), w);
675    /// # }
676    /// # }));
677    /// # }
678    /// ```
679    pub fn cross_singleton<O2>(
680        self,
681        other: impl Into<Optional<O2, L, Bounded>>,
682    ) -> Stream<(T, O2), L, B, O, R>
683    where
684        O2: Clone,
685    {
686        let other: Optional<O2, L, Bounded> = other.into();
687        check_matching_location(&self.location, &other.location);
688
689        Stream::new(
690            self.location.clone(),
691            HydroNode::CrossSingleton {
692                left: Box::new(self.ir_node.into_inner()),
693                right: Box::new(other.ir_node.into_inner()),
694                metadata: self
695                    .location
696                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
697            },
698        )
699    }
700
701    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
702    ///
703    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
704    /// leader of a cluster.
705    ///
706    /// # Example
707    /// ```rust
708    /// # #[cfg(feature = "deploy")] {
709    /// # use hydro_lang::prelude::*;
710    /// # use futures::StreamExt;
711    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
712    /// let tick = process.tick();
713    /// // ticks are lazy by default, forces the second tick to run
714    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
715    ///
716    /// let batch_first_tick = process
717    ///   .source_iter(q!(vec![1, 2, 3, 4]))
718    ///   .batch(&tick, nondet!(/** test */));
719    /// let batch_second_tick = process
720    ///   .source_iter(q!(vec![5, 6, 7, 8]))
721    ///   .batch(&tick, nondet!(/** test */))
722    ///   .defer_tick(); // appears on the second tick
723    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
724    /// batch_first_tick.chain(batch_second_tick)
725    ///   .filter_if_some(some_on_first_tick)
726    ///   .all_ticks()
727    /// # }, |mut stream| async move {
728    /// // [1, 2, 3, 4]
729    /// # for w in vec![1, 2, 3, 4] {
730    /// #     assert_eq!(stream.next().await.unwrap(), w);
731    /// # }
732    /// # }));
733    /// # }
734    /// ```
735    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
736        self.cross_singleton(signal.map(q!(|_u| ())))
737            .map(q!(|(d, _signal)| d))
738    }
739
740    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
741    ///
742    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
743    /// some local state.
744    ///
745    /// # Example
746    /// ```rust
747    /// # #[cfg(feature = "deploy")] {
748    /// # use hydro_lang::prelude::*;
749    /// # use futures::StreamExt;
750    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
751    /// let tick = process.tick();
752    /// // ticks are lazy by default, forces the second tick to run
753    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
754    ///
755    /// let batch_first_tick = process
756    ///   .source_iter(q!(vec![1, 2, 3, 4]))
757    ///   .batch(&tick, nondet!(/** test */));
758    /// let batch_second_tick = process
759    ///   .source_iter(q!(vec![5, 6, 7, 8]))
760    ///   .batch(&tick, nondet!(/** test */))
761    ///   .defer_tick(); // appears on the second tick
762    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
763    /// batch_first_tick.chain(batch_second_tick)
764    ///   .filter_if_none(some_on_first_tick)
765    ///   .all_ticks()
766    /// # }, |mut stream| async move {
767    /// // [5, 6, 7, 8]
768    /// # for w in vec![5, 6, 7, 8] {
769    /// #     assert_eq!(stream.next().await.unwrap(), w);
770    /// # }
771    /// # }));
772    /// # }
773    /// ```
774    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
775        self.filter_if_some(
776            other
777                .map(q!(|_| ()))
778                .into_singleton()
779                .filter(q!(|o| o.is_none())),
780        )
781    }
782
783    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
784    /// tupled pairs in a non-deterministic order.
785    ///
786    /// # Example
787    /// ```rust
788    /// # #[cfg(feature = "deploy")] {
789    /// # use hydro_lang::prelude::*;
790    /// # use std::collections::HashSet;
791    /// # use futures::StreamExt;
792    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
793    /// let tick = process.tick();
794    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
795    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
796    /// stream1.cross_product(stream2)
797    /// # }, |mut stream| async move {
798    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
799    /// # stream.map(|i| assert!(expected.contains(&i)));
800    /// # }));
801    /// # }
802    /// ```
803    pub fn cross_product<T2, O2: Ordering>(
804        self,
805        other: Stream<T2, L, B, O2, R>,
806    ) -> Stream<(T, T2), L, B, NoOrder, R>
807    where
808        T: Clone,
809        T2: Clone,
810    {
811        check_matching_location(&self.location, &other.location);
812
813        Stream::new(
814            self.location.clone(),
815            HydroNode::CrossProduct {
816                left: Box::new(self.ir_node.into_inner()),
817                right: Box::new(other.ir_node.into_inner()),
818                metadata: self
819                    .location
820                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
821            },
822        )
823    }
824
825    /// Takes one stream as input and filters out any duplicate occurrences. The output
826    /// contains all unique values from the input.
827    ///
828    /// # Example
829    /// ```rust
830    /// # #[cfg(feature = "deploy")] {
831    /// # use hydro_lang::prelude::*;
832    /// # use futures::StreamExt;
833    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
834    /// let tick = process.tick();
835    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
836    /// # }, |mut stream| async move {
837    /// # for w in vec![1, 2, 3, 4] {
838    /// #     assert_eq!(stream.next().await.unwrap(), w);
839    /// # }
840    /// # }));
841    /// # }
842    /// ```
843    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
844    where
845        T: Eq + Hash,
846    {
847        Stream::new(
848            self.location.clone(),
849            HydroNode::Unique {
850                input: Box::new(self.ir_node.into_inner()),
851                metadata: self
852                    .location
853                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
854            },
855        )
856    }
857
858    /// Outputs everything in this stream that is *not* contained in the `other` stream.
859    ///
860    /// The `other` stream must be [`Bounded`], since this function will wait until
861    /// all its elements are available before producing any output.
862    /// # Example
863    /// ```rust
864    /// # #[cfg(feature = "deploy")] {
865    /// # use hydro_lang::prelude::*;
866    /// # use futures::StreamExt;
867    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868    /// let tick = process.tick();
869    /// let stream = process
870    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
871    ///   .batch(&tick, nondet!(/** test */));
872    /// let batch = process
873    ///   .source_iter(q!(vec![1, 2]))
874    ///   .batch(&tick, nondet!(/** test */));
875    /// stream.filter_not_in(batch).all_ticks()
876    /// # }, |mut stream| async move {
877    /// # for w in vec![3, 4] {
878    /// #     assert_eq!(stream.next().await.unwrap(), w);
879    /// # }
880    /// # }));
881    /// # }
882    /// ```
883    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
884    where
885        T: Eq + Hash,
886        B2: IsBounded,
887    {
888        check_matching_location(&self.location, &other.location);
889
890        Stream::new(
891            self.location.clone(),
892            HydroNode::Difference {
893                pos: Box::new(self.ir_node.into_inner()),
894                neg: Box::new(other.ir_node.into_inner()),
895                metadata: self
896                    .location
897                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
898            },
899        )
900    }
901
902    /// An operator which allows you to "inspect" each element of a stream without
903    /// modifying it. The closure `f` is called on a reference to each item. This is
904    /// mainly useful for debugging, and should not be used to generate side-effects.
905    ///
906    /// # Example
907    /// ```rust
908    /// # #[cfg(feature = "deploy")] {
909    /// # use hydro_lang::prelude::*;
910    /// # use futures::StreamExt;
911    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
912    /// let nums = process.source_iter(q!(vec![1, 2]));
913    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
914    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
915    /// # }, |mut stream| async move {
916    /// # for w in vec![1, 2] {
917    /// #     assert_eq!(stream.next().await.unwrap(), w);
918    /// # }
919    /// # }));
920    /// # }
921    /// ```
922    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
923    where
924        F: Fn(&T) + 'a,
925    {
926        let f = f.splice_fn1_borrow_ctx(&self.location).into();
927
928        Stream::new(
929            self.location.clone(),
930            HydroNode::Inspect {
931                f,
932                input: Box::new(self.ir_node.into_inner()),
933                metadata: self.location.new_node_metadata(Self::collection_kind()),
934            },
935        )
936    }
937
938    /// Executes the provided closure for every element in this stream.
939    ///
940    /// Because the closure may have side effects, the stream must have deterministic order
941    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
942    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
943    /// [`Stream::assume_retries`] with an explanation for why this is the case.
944    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
945    where
946        O: IsOrdered,
947        R: IsExactlyOnce,
948    {
949        let f = f.splice_fn1_ctx(&self.location).into();
950        self.location
951            .flow_state()
952            .borrow_mut()
953            .push_root(HydroRoot::ForEach {
954                input: Box::new(self.ir_node.into_inner()),
955                f,
956                op_metadata: HydroIrOpMetadata::new(),
957            });
958    }
959
960    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
961    /// TCP socket to some other server. You should _not_ use this API for interacting with
962    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
963    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
964    /// interaction with asynchronous sinks.
965    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
966    where
967        O: IsOrdered,
968        R: IsExactlyOnce,
969        S: 'a + futures::Sink<T> + Unpin,
970    {
971        self.location
972            .flow_state()
973            .borrow_mut()
974            .push_root(HydroRoot::DestSink {
975                sink: sink.splice_typed_ctx(&self.location).into(),
976                input: Box::new(self.ir_node.into_inner()),
977                op_metadata: HydroIrOpMetadata::new(),
978            });
979    }
980
981    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
982    ///
983    /// # Example
984    /// ```rust
985    /// # #[cfg(feature = "deploy")] {
986    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
987    /// # use futures::StreamExt;
988    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
989    /// let tick = process.tick();
990    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
991    /// numbers.enumerate()
992    /// # }, |mut stream| async move {
993    /// // (0, 1), (1, 2), (2, 3), (3, 4)
994    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
995    /// #     assert_eq!(stream.next().await.unwrap(), w);
996    /// # }
997    /// # }));
998    /// # }
999    /// ```
1000    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1001    where
1002        O: IsOrdered,
1003        R: IsExactlyOnce,
1004    {
1005        Stream::new(
1006            self.location.clone(),
1007            HydroNode::Enumerate {
1008                input: Box::new(self.ir_node.into_inner()),
1009                metadata: self.location.new_node_metadata(Stream::<
1010                    (usize, T),
1011                    L,
1012                    B,
1013                    TotalOrder,
1014                    ExactlyOnce,
1015                >::collection_kind()),
1016            },
1017        )
1018    }
1019
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1021    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1022    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1023    ///
1024    /// Depending on the input stream guarantees, the closure may need to be commutative
1025    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1026    ///
1027    /// # Example
1028    /// ```rust
1029    /// # #[cfg(feature = "deploy")] {
1030    /// # use hydro_lang::prelude::*;
1031    /// # use futures::StreamExt;
1032    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1033    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1034    /// words
1035    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1036    ///     .into_stream()
1037    /// # }, |mut stream| async move {
1038    /// // "HELLOWORLD"
1039    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1040    /// # }));
1041    /// # }
1042    /// ```
1043    pub fn fold<A, I, F, C, Idemp>(
1044        self,
1045        init: impl IntoQuotedMut<'a, I, L>,
1046        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1047    ) -> Singleton<A, L, B>
1048    where
1049        I: Fn() -> A + 'a,
1050        F: Fn(&mut A, T),
1051        C: ValidCommutativityFor<O>,
1052        Idemp: ValidIdempotenceFor<R>,
1053    {
1054        let init = init.splice_fn0_ctx(&self.location).into();
1055        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1056        proof.register_proof(&comb);
1057
1058        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1059        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1060
1061        let core = HydroNode::Fold {
1062            init,
1063            acc: comb.into(),
1064            input: Box::new(ordered_etc.ir_node.into_inner()),
1065            metadata: ordered_etc
1066                .location
1067                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1068        };
1069
1070        Singleton::new(ordered_etc.location, core)
1071    }
1072
1073    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1074    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1075    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1076    /// reference, so that it can be modified in place.
1077    ///
1078    /// Depending on the input stream guarantees, the closure may need to be commutative
1079    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1080    ///
1081    /// # Example
1082    /// ```rust
1083    /// # #[cfg(feature = "deploy")] {
1084    /// # use hydro_lang::prelude::*;
1085    /// # use futures::StreamExt;
1086    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1087    /// let bools = process.source_iter(q!(vec![false, true, false]));
1088    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1089    /// # }, |mut stream| async move {
1090    /// // true
1091    /// # assert_eq!(stream.next().await.unwrap(), true);
1092    /// # }));
1093    /// # }
1094    /// ```
1095    pub fn reduce<F, C, Idemp>(
1096        self,
1097        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1098    ) -> Optional<T, L, B>
1099    where
1100        F: Fn(&mut T, T) + 'a,
1101        C: ValidCommutativityFor<O>,
1102        Idemp: ValidIdempotenceFor<R>,
1103    {
1104        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1105        proof.register_proof(&f);
1106
1107        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1108        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1109
1110        let core = HydroNode::Reduce {
1111            f: f.into(),
1112            input: Box::new(ordered_etc.ir_node.into_inner()),
1113            metadata: ordered_etc
1114                .location
1115                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1116        };
1117
1118        Optional::new(ordered_etc.location, core)
1119    }
1120
1121    /// Computes the maximum element in the stream as an [`Optional`], which
1122    /// will be empty until the first element in the input arrives.
1123    ///
1124    /// # Example
1125    /// ```rust
1126    /// # #[cfg(feature = "deploy")] {
1127    /// # use hydro_lang::prelude::*;
1128    /// # use futures::StreamExt;
1129    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1130    /// let tick = process.tick();
1131    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1132    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1133    /// batch.max().all_ticks()
1134    /// # }, |mut stream| async move {
1135    /// // 4
1136    /// # assert_eq!(stream.next().await.unwrap(), 4);
1137    /// # }));
1138    /// # }
1139    /// ```
1140    pub fn max(self) -> Optional<T, L, B>
1141    where
1142        T: Ord,
1143    {
1144        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1145            .assume_ordering_trusted_bounded::<TotalOrder>(
1146                nondet!(/** max is commutative, but order affects intermediates */),
1147            )
1148            .reduce(q!(|curr, new| {
1149                if new > *curr {
1150                    *curr = new;
1151                }
1152            }))
1153    }
1154
1155    /// Computes the minimum element in the stream as an [`Optional`], which
1156    /// will be empty until the first element in the input arrives.
1157    ///
1158    /// # Example
1159    /// ```rust
1160    /// # #[cfg(feature = "deploy")] {
1161    /// # use hydro_lang::prelude::*;
1162    /// # use futures::StreamExt;
1163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164    /// let tick = process.tick();
1165    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1166    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1167    /// batch.min().all_ticks()
1168    /// # }, |mut stream| async move {
1169    /// // 1
1170    /// # assert_eq!(stream.next().await.unwrap(), 1);
1171    /// # }));
1172    /// # }
1173    /// ```
1174    pub fn min(self) -> Optional<T, L, B>
1175    where
1176        T: Ord,
1177    {
1178        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1179            .assume_ordering_trusted_bounded::<TotalOrder>(
1180                nondet!(/** max is commutative, but order affects intermediates */),
1181            )
1182            .reduce(q!(|curr, new| {
1183                if new < *curr {
1184                    *curr = new;
1185                }
1186            }))
1187    }
1188
1189    /// Computes the first element in the stream as an [`Optional`], which
1190    /// will be empty until the first element in the input arrives.
1191    ///
1192    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1193    /// re-ordering of elements may cause the first element to change.
1194    ///
1195    /// # Example
1196    /// ```rust
1197    /// # #[cfg(feature = "deploy")] {
1198    /// # use hydro_lang::prelude::*;
1199    /// # use futures::StreamExt;
1200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201    /// let tick = process.tick();
1202    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1203    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1204    /// batch.first().all_ticks()
1205    /// # }, |mut stream| async move {
1206    /// // 1
1207    /// # assert_eq!(stream.next().await.unwrap(), 1);
1208    /// # }));
1209    /// # }
1210    /// ```
1211    pub fn first(self) -> Optional<T, L, B>
1212    where
1213        O: IsOrdered,
1214    {
1215        self.make_totally_ordered()
1216            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1217            .reduce(q!(|_, _| {}))
1218    }
1219
1220    /// Computes the last element in the stream as an [`Optional`], which
1221    /// will be empty until an element in the input arrives.
1222    ///
1223    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1224    /// re-ordering of elements may cause the last element to change.
1225    ///
1226    /// # Example
1227    /// ```rust
1228    /// # #[cfg(feature = "deploy")] {
1229    /// # use hydro_lang::prelude::*;
1230    /// # use futures::StreamExt;
1231    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232    /// let tick = process.tick();
1233    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235    /// batch.last().all_ticks()
1236    /// # }, |mut stream| async move {
1237    /// // 4
1238    /// # assert_eq!(stream.next().await.unwrap(), 4);
1239    /// # }));
1240    /// # }
1241    /// ```
1242    pub fn last(self) -> Optional<T, L, B>
1243    where
1244        O: IsOrdered,
1245    {
1246        self.make_totally_ordered()
1247            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1248            .reduce(q!(|curr, new| *curr = new))
1249    }
1250
1251    /// Collects all the elements of this stream into a single [`Vec`] element.
1252    ///
1253    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1254    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1255    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1256    /// the vector at an arbitrary point in time.
1257    ///
1258    /// # Example
1259    /// ```rust
1260    /// # #[cfg(feature = "deploy")] {
1261    /// # use hydro_lang::prelude::*;
1262    /// # use futures::StreamExt;
1263    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1264    /// let tick = process.tick();
1265    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1266    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1267    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1268    /// # }, |mut stream| async move {
1269    /// // [ vec![1, 2, 3, 4] ]
1270    /// # for w in vec![vec![1, 2, 3, 4]] {
1271    /// #     assert_eq!(stream.next().await.unwrap(), w);
1272    /// # }
1273    /// # }));
1274    /// # }
1275    /// ```
1276    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1277    where
1278        O: IsOrdered,
1279        R: IsExactlyOnce,
1280    {
1281        self.make_totally_ordered().make_exactly_once().fold(
1282            q!(|| vec![]),
1283            q!(|acc, v| {
1284                acc.push(v);
1285            }),
1286        )
1287    }
1288
1289    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1290    /// and emitting each intermediate result.
1291    ///
1292    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1293    /// containing all intermediate accumulated values. The scan operation can also terminate early
1294    /// by returning `None`.
1295    ///
1296    /// The function takes a mutable reference to the accumulator and the current element, and returns
1297    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1298    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1299    ///
1300    /// # Examples
1301    ///
1302    /// Basic usage - running sum:
1303    /// ```rust
1304    /// # #[cfg(feature = "deploy")] {
1305    /// # use hydro_lang::prelude::*;
1306    /// # use futures::StreamExt;
1307    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1308    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1309    ///     q!(|| 0),
1310    ///     q!(|acc, x| {
1311    ///         *acc += x;
1312    ///         Some(*acc)
1313    ///     }),
1314    /// )
1315    /// # }, |mut stream| async move {
1316    /// // Output: 1, 3, 6, 10
1317    /// # for w in vec![1, 3, 6, 10] {
1318    /// #     assert_eq!(stream.next().await.unwrap(), w);
1319    /// # }
1320    /// # }));
1321    /// # }
1322    /// ```
1323    ///
1324    /// Early termination example:
1325    /// ```rust
1326    /// # #[cfg(feature = "deploy")] {
1327    /// # use hydro_lang::prelude::*;
1328    /// # use futures::StreamExt;
1329    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1330    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1331    ///     q!(|| 1),
1332    ///     q!(|state, x| {
1333    ///         *state = *state * x;
1334    ///         if *state > 6 {
1335    ///             None // Terminate the stream
1336    ///         } else {
1337    ///             Some(-*state)
1338    ///         }
1339    ///     }),
1340    /// )
1341    /// # }, |mut stream| async move {
1342    /// // Output: -1, -2, -6
1343    /// # for w in vec![-1, -2, -6] {
1344    /// #     assert_eq!(stream.next().await.unwrap(), w);
1345    /// # }
1346    /// # }));
1347    /// # }
1348    /// ```
1349    pub fn scan<A, U, I, F>(
1350        self,
1351        init: impl IntoQuotedMut<'a, I, L>,
1352        f: impl IntoQuotedMut<'a, F, L>,
1353    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1354    where
1355        O: IsOrdered,
1356        R: IsExactlyOnce,
1357        I: Fn() -> A + 'a,
1358        F: Fn(&mut A, T) -> Option<U> + 'a,
1359    {
1360        let init = init.splice_fn0_ctx(&self.location).into();
1361        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1362
1363        Stream::new(
1364            self.location.clone(),
1365            HydroNode::Scan {
1366                init,
1367                acc: f,
1368                input: Box::new(self.ir_node.into_inner()),
1369                metadata: self.location.new_node_metadata(
1370                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1371                ),
1372            },
1373        )
1374    }
1375
1376    /// Given a time interval, returns a stream corresponding to samples taken from the
1377    /// stream roughly at that interval. The output will have elements in the same order
1378    /// as the input, but with arbitrary elements skipped between samples. There is also
1379    /// no guarantee on the exact timing of the samples.
1380    ///
1381    /// # Non-Determinism
1382    /// The output stream is non-deterministic in which elements are sampled, since this
1383    /// is controlled by a clock.
1384    pub fn sample_every(
1385        self,
1386        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1387        nondet: NonDet,
1388    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1389    where
1390        L: NoTick + NoAtomic,
1391    {
1392        let samples = self.location.source_interval(interval, nondet);
1393
1394        let tick = self.location.tick();
1395        self.batch(&tick, nondet)
1396            .filter_if_some(samples.batch(&tick, nondet).first())
1397            .all_ticks()
1398            .weaken_retries()
1399    }
1400
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value within that duration.
1403    ///
1404    /// # Non-Determinism
1405    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1406    /// samples take place, timeouts may be non-deterministically generated or missed,
1407    /// and the notification of the timeout may be delayed as well. There is also no
1408    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1409    /// detected based on when the next sample is taken.
1410    pub fn timeout(
1411        self,
1412        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1413        nondet: NonDet,
1414    ) -> Optional<(), L, Unbounded>
1415    where
1416        L: NoTick + NoAtomic,
1417    {
1418        let tick = self.location.tick();
1419
1420        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1421            q!(|| None),
1422            q!(
1423                |latest, _| {
1424                    *latest = Some(Instant::now());
1425                },
1426                commutative = ManualProof(/* TODO */)
1427            ),
1428        );
1429
1430        latest_received
1431            .snapshot(&tick, nondet)
1432            .filter_map(q!(move |latest_received| {
1433                if let Some(latest_received) = latest_received {
1434                    if Instant::now().duration_since(latest_received) > duration {
1435                        Some(())
1436                    } else {
1437                        None
1438                    }
1439                } else {
1440                    Some(())
1441                }
1442            }))
1443            .latest()
1444    }
1445
1446    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1447    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1448    ///
1449    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1450    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1451    /// argument that declares where the stream will be atomically processed. Batching a stream into
1452    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1453    /// [`Tick`] will introduce asynchrony.
1454    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1455        let out_location = Atomic { tick: tick.clone() };
1456        Stream::new(
1457            out_location.clone(),
1458            HydroNode::BeginAtomic {
1459                inner: Box::new(self.ir_node.into_inner()),
1460                metadata: out_location
1461                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1462            },
1463        )
1464    }
1465
1466    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1467    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input. The output stream will execute in the provided [`Tick`].
1470    ///
1471    /// # Non-Determinism
1472    /// The batch boundaries are non-deterministic and may change across executions.
1473    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1474        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1475        Stream::new(
1476            tick.clone(),
1477            HydroNode::Batch {
1478                inner: Box::new(self.ir_node.into_inner()),
1479                metadata: tick
1480                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1481            },
1482        )
1483    }
1484
1485    /// An operator which allows you to "name" a `HydroNode`.
1486    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1487    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1488        {
1489            let mut node = self.ir_node.borrow_mut();
1490            let metadata = node.metadata_mut();
1491            metadata.tag = Some(name.to_owned());
1492        }
1493        self
1494    }
1495
1496    /// Explicitly "casts" the stream to a type with a different ordering
1497    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1498    /// by the type-system.
1499    ///
1500    /// # Non-Determinism
1501    /// This function is used as an escape hatch, and any mistakes in the
1502    /// provided ordering guarantee will propagate into the guarantees
1503    /// for the rest of the program.
1504    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1505        if O::ORDERING_KIND == O2::ORDERING_KIND {
1506            Stream::new(self.location, self.ir_node.into_inner())
1507        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1508            // We can always weaken the ordering guarantee
1509            Stream::new(
1510                self.location.clone(),
1511                HydroNode::Cast {
1512                    inner: Box::new(self.ir_node.into_inner()),
1513                    metadata: self
1514                        .location
1515                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1516                },
1517            )
1518        } else {
1519            Stream::new(
1520                self.location.clone(),
1521                HydroNode::ObserveNonDet {
1522                    inner: Box::new(self.ir_node.into_inner()),
1523                    trusted: false,
1524                    metadata: self
1525                        .location
1526                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1527                },
1528            )
1529        }
1530    }
1531
1532    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1533    // intermediate states will not be revealed
1534    fn assume_ordering_trusted_bounded<O2: Ordering>(
1535        self,
1536        nondet: NonDet,
1537    ) -> Stream<T, L, B, O2, R> {
1538        if B::BOUNDED {
1539            self.assume_ordering_trusted(nondet)
1540        } else {
1541            self.assume_ordering(nondet)
1542        }
1543    }
1544
1545    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1546    // is not observable
1547    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1548        self,
1549        _nondet: NonDet,
1550    ) -> Stream<T, L, B, O2, R> {
1551        if O::ORDERING_KIND == O2::ORDERING_KIND {
1552            Stream::new(self.location, self.ir_node.into_inner())
1553        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1554            // We can always weaken the ordering guarantee
1555            Stream::new(
1556                self.location.clone(),
1557                HydroNode::Cast {
1558                    inner: Box::new(self.ir_node.into_inner()),
1559                    metadata: self
1560                        .location
1561                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1562                },
1563            )
1564        } else {
1565            Stream::new(
1566                self.location.clone(),
1567                HydroNode::ObserveNonDet {
1568                    inner: Box::new(self.ir_node.into_inner()),
1569                    trusted: true,
1570                    metadata: self
1571                        .location
1572                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1573                },
1574            )
1575        }
1576    }
1577
1578    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1579    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1580    /// which is always safe because that is the weakest possible guarantee.
1581    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1582        self.weaken_ordering::<NoOrder>()
1583    }
1584
1585    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1586    /// enforcing that `O2` is weaker than the input ordering guarantee.
1587    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1588        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1589        self.assume_ordering::<O2>(nondet)
1590    }
1591
1592    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1593    /// implies that `O == TotalOrder`.
1594    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1595    where
1596        O: IsOrdered,
1597    {
1598        self.assume_ordering(nondet!(/** no-op */))
1599    }
1600
1601    /// Explicitly "casts" the stream to a type with a different retries
1602    /// guarantee. Useful in unsafe code where the lack of retries cannot
1603    /// be proven by the type-system.
1604    ///
1605    /// # Non-Determinism
1606    /// This function is used as an escape hatch, and any mistakes in the
1607    /// provided retries guarantee will propagate into the guarantees
1608    /// for the rest of the program.
1609    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1610        if R::RETRIES_KIND == R2::RETRIES_KIND {
1611            Stream::new(self.location, self.ir_node.into_inner())
1612        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1613            // We can always weaken the retries guarantee
1614            Stream::new(
1615                self.location.clone(),
1616                HydroNode::Cast {
1617                    inner: Box::new(self.ir_node.into_inner()),
1618                    metadata: self
1619                        .location
1620                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1621                },
1622            )
1623        } else {
1624            Stream::new(
1625                self.location.clone(),
1626                HydroNode::ObserveNonDet {
1627                    inner: Box::new(self.ir_node.into_inner()),
1628                    trusted: false,
1629                    metadata: self
1630                        .location
1631                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1632                },
1633            )
1634        }
1635    }
1636
1637    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1638    // is not observable
1639    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1640        if R::RETRIES_KIND == R2::RETRIES_KIND {
1641            Stream::new(self.location, self.ir_node.into_inner())
1642        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1643            // We can always weaken the retries guarantee
1644            Stream::new(
1645                self.location.clone(),
1646                HydroNode::Cast {
1647                    inner: Box::new(self.ir_node.into_inner()),
1648                    metadata: self
1649                        .location
1650                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1651                },
1652            )
1653        } else {
1654            Stream::new(
1655                self.location.clone(),
1656                HydroNode::ObserveNonDet {
1657                    inner: Box::new(self.ir_node.into_inner()),
1658                    trusted: true,
1659                    metadata: self
1660                        .location
1661                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1662                },
1663            )
1664        }
1665    }
1666
1667    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1668    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1669    /// which is always safe because that is the weakest possible guarantee.
1670    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1671        self.weaken_retries::<AtLeastOnce>()
1672    }
1673
1674    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1675    /// enforcing that `R2` is weaker than the input retries guarantee.
1676    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1677        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1678        self.assume_retries::<R2>(nondet)
1679    }
1680
1681    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1682    /// implies that `R == ExactlyOnce`.
1683    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1684    where
1685        R: IsExactlyOnce,
1686    {
1687        self.assume_retries(nondet!(/** no-op */))
1688    }
1689
1690    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1691    /// implies that `B == Bounded`.
1692    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1693    where
1694        B: IsBounded,
1695    {
1696        Stream::new(self.location, self.ir_node.into_inner())
1697    }
1698}
1699
1700impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1701where
1702    L: Location<'a>,
1703{
1704    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1705    ///
1706    /// # Example
1707    /// ```rust
1708    /// # #[cfg(feature = "deploy")] {
1709    /// # use hydro_lang::prelude::*;
1710    /// # use futures::StreamExt;
1711    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1712    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1713    /// # }, |mut stream| async move {
1714    /// // 1, 2, 3
1715    /// # for w in vec![1, 2, 3] {
1716    /// #     assert_eq!(stream.next().await.unwrap(), w);
1717    /// # }
1718    /// # }));
1719    /// # }
1720    /// ```
1721    pub fn cloned(self) -> Stream<T, L, B, O, R>
1722    where
1723        T: Clone,
1724    {
1725        self.map(q!(|d| d.clone()))
1726    }
1727}
1728
1729impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1730where
1731    L: Location<'a>,
1732{
1733    /// Computes the number of elements in the stream as a [`Singleton`].
1734    ///
1735    /// # Example
1736    /// ```rust
1737    /// # #[cfg(feature = "deploy")] {
1738    /// # use hydro_lang::prelude::*;
1739    /// # use futures::StreamExt;
1740    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1741    /// let tick = process.tick();
1742    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1743    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1744    /// batch.count().all_ticks()
1745    /// # }, |mut stream| async move {
1746    /// // 4
1747    /// # assert_eq!(stream.next().await.unwrap(), 4);
1748    /// # }));
1749    /// # }
1750    /// ```
1751    pub fn count(self) -> Singleton<usize, L, B> {
1752        self.assume_ordering_trusted::<TotalOrder>(nondet!(
1753            /// Order does not affect eventual count, and also does not affect intermediate states.
1754        ))
1755        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1756    }
1757}
1758
1759impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1760    /// Produces a new stream that interleaves the elements of the two input streams.
1761    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1762    ///
1763    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1764    /// [`Bounded`], you can use [`Stream::chain`] instead.
1765    ///
1766    /// # Example
1767    /// ```rust
1768    /// # #[cfg(feature = "deploy")] {
1769    /// # use hydro_lang::prelude::*;
1770    /// # use futures::StreamExt;
1771    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1772    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1773    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1774    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1775    /// # }, |mut stream| async move {
1776    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1777    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1778    /// #     assert_eq!(stream.next().await.unwrap(), w);
1779    /// # }
1780    /// # }));
1781    /// # }
1782    /// ```
1783    pub fn interleave<O2: Ordering, R2: Retries>(
1784        self,
1785        other: Stream<T, L, Unbounded, O2, R2>,
1786    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1787    where
1788        R: MinRetries<R2>,
1789    {
1790        Stream::new(
1791            self.location.clone(),
1792            HydroNode::Chain {
1793                first: Box::new(self.ir_node.into_inner()),
1794                second: Box::new(other.ir_node.into_inner()),
1795                metadata: self.location.new_node_metadata(Stream::<
1796                    T,
1797                    L,
1798                    Unbounded,
1799                    NoOrder,
1800                    <R as MinRetries<R2>>::Min,
1801                >::collection_kind()),
1802            },
1803        )
1804    }
1805}
1806
1807impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1808    /// Produces a new stream that combines the elements of the two input streams,
1809    /// preserving the relative order of elements within each input.
1810    ///
1811    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1812    /// [`Bounded`], you can use [`Stream::chain`] instead.
1813    ///
1814    /// # Non-Determinism
1815    /// The order in which elements *across* the two streams will be interleaved is
1816    /// non-deterministic, so the order of elements will vary across runs. If the output order
1817    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1818    /// unordered stream.
1819    ///
1820    /// # Example
1821    /// ```rust
1822    /// # #[cfg(feature = "deploy")] {
1823    /// # use hydro_lang::prelude::*;
1824    /// # use futures::StreamExt;
1825    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1826    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1827    /// # process.source_iter(q!(vec![1, 3])).into();
1828    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1829    /// # }, |mut stream| async move {
1830    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1831    /// # for w in vec![1, 3, 2, 4] {
1832    /// #     assert_eq!(stream.next().await.unwrap(), w);
1833    /// # }
1834    /// # }));
1835    /// # }
1836    /// ```
1837    pub fn merge_ordered<R2: Retries>(
1838        self,
1839        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1840        _nondet: NonDet,
1841    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1842    where
1843        R: MinRetries<R2>,
1844    {
1845        Stream::new(
1846            self.location.clone(),
1847            HydroNode::Chain {
1848                first: Box::new(self.ir_node.into_inner()),
1849                second: Box::new(other.ir_node.into_inner()),
1850                metadata: self.location.new_node_metadata(Stream::<
1851                    T,
1852                    L,
1853                    Unbounded,
1854                    TotalOrder,
1855                    <R as MinRetries<R2>>::Min,
1856                >::collection_kind()),
1857            },
1858        )
1859    }
1860}
1861
1862impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1863where
1864    L: Location<'a>,
1865{
1866    /// Produces a new stream that emits the input elements in sorted order.
1867    ///
1868    /// The input stream can have any ordering guarantee, but the output stream
1869    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1870    /// elements in the input stream are available, so it requires the input stream
1871    /// to be [`Bounded`].
1872    ///
1873    /// # Example
1874    /// ```rust
1875    /// # #[cfg(feature = "deploy")] {
1876    /// # use hydro_lang::prelude::*;
1877    /// # use futures::StreamExt;
1878    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1879    /// let tick = process.tick();
1880    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1881    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1882    /// batch.sort().all_ticks()
1883    /// # }, |mut stream| async move {
1884    /// // 1, 2, 3, 4
1885    /// # for w in (1..5) {
1886    /// #     assert_eq!(stream.next().await.unwrap(), w);
1887    /// # }
1888    /// # }));
1889    /// # }
1890    /// ```
1891    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1892    where
1893        B: IsBounded,
1894        T: Ord,
1895    {
1896        let this = self.make_bounded();
1897        Stream::new(
1898            this.location.clone(),
1899            HydroNode::Sort {
1900                input: Box::new(this.ir_node.into_inner()),
1901                metadata: this
1902                    .location
1903                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1904            },
1905        )
1906    }
1907
1908    /// Produces a new stream that first emits the elements of the `self` stream,
1909    /// and then emits the elements of the `other` stream. The output stream has
1910    /// a [`TotalOrder`] guarantee if and only if both input streams have a
1911    /// [`TotalOrder`] guarantee.
1912    ///
1913    /// Currently, both input streams must be [`Bounded`]. This operator will block
1914    /// on the first stream until all its elements are available. In a future version,
1915    /// we will relax the requirement on the `other` stream.
1916    ///
1917    /// # Example
1918    /// ```rust
1919    /// # #[cfg(feature = "deploy")] {
1920    /// # use hydro_lang::prelude::*;
1921    /// # use futures::StreamExt;
1922    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1923    /// let tick = process.tick();
1924    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1925    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1926    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1927    /// # }, |mut stream| async move {
1928    /// // 2, 3, 4, 5, 1, 2, 3, 4
1929    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1930    /// #     assert_eq!(stream.next().await.unwrap(), w);
1931    /// # }
1932    /// # }));
1933    /// # }
1934    /// ```
1935    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1936        self,
1937        other: Stream<T, L, B2, O2, R2>,
1938    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1939    where
1940        B: IsBounded,
1941        O: MinOrder<O2>,
1942        R: MinRetries<R2>,
1943    {
1944        check_matching_location(&self.location, &other.location);
1945
1946        Stream::new(
1947            self.location.clone(),
1948            HydroNode::Chain {
1949                first: Box::new(self.ir_node.into_inner()),
1950                second: Box::new(other.ir_node.into_inner()),
1951                metadata: self.location.new_node_metadata(Stream::<
1952                    T,
1953                    L,
1954                    B2,
1955                    <O as MinOrder<O2>>::Min,
1956                    <R as MinRetries<R2>>::Min,
1957                >::collection_kind()),
1958            },
1959        )
1960    }
1961
1962    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1963    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1964    /// because this is compiled into a nested loop.
1965    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1966        self,
1967        other: Stream<T2, L, Bounded, O2, R>,
1968    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1969    where
1970        B: IsBounded,
1971        T: Clone,
1972        T2: Clone,
1973    {
1974        let this = self.make_bounded();
1975        check_matching_location(&this.location, &other.location);
1976
1977        Stream::new(
1978            this.location.clone(),
1979            HydroNode::CrossProduct {
1980                left: Box::new(this.ir_node.into_inner()),
1981                right: Box::new(other.ir_node.into_inner()),
1982                metadata: this.location.new_node_metadata(Stream::<
1983                    (T, T2),
1984                    L,
1985                    Bounded,
1986                    <O2 as MinOrder<O>>::Min,
1987                    R,
1988                >::collection_kind()),
1989            },
1990        )
1991    }
1992
1993    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1994    /// `self` used as the values for *each* key.
1995    ///
1996    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1997    /// values. For example, it can be used to send the same set of elements to several cluster
1998    /// members, if the membership information is available as a [`KeyedSingleton`].
1999    ///
2000    /// # Example
2001    /// ```rust
2002    /// # #[cfg(feature = "deploy")] {
2003    /// # use hydro_lang::prelude::*;
2004    /// # use futures::StreamExt;
2005    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006    /// # let tick = process.tick();
2007    /// let keyed_singleton = // { 1: (), 2: () }
2008    /// # process
2009    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2010    /// #     .into_keyed()
2011    /// #     .batch(&tick, nondet!(/** test */))
2012    /// #     .first();
2013    /// let stream = // [ "a", "b" ]
2014    /// # process
2015    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2016    /// #     .batch(&tick, nondet!(/** test */));
2017    /// stream.repeat_with_keys(keyed_singleton)
2018    /// # .entries().all_ticks()
2019    /// # }, |mut stream| async move {
2020    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2021    /// # let mut results = Vec::new();
2022    /// # for _ in 0..4 {
2023    /// #     results.push(stream.next().await.unwrap());
2024    /// # }
2025    /// # results.sort();
2026    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2027    /// # }));
2028    /// # }
2029    /// ```
2030    pub fn repeat_with_keys<K, V2>(
2031        self,
2032        keys: KeyedSingleton<K, V2, L, Bounded>,
2033    ) -> KeyedStream<K, T, L, Bounded, O, R>
2034    where
2035        B: IsBounded,
2036        K: Clone,
2037        T: Clone,
2038    {
2039        keys.keys()
2040            .weaken_retries()
2041            .assume_ordering_trusted::<TotalOrder>(
2042                nondet!(/** keyed stream does not depend on ordering of keys */),
2043            )
2044            .cross_product_nested_loop(self.make_bounded())
2045            .into_keyed()
2046    }
2047}
2048
2049impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2050where
2051    L: Location<'a>,
2052{
2053    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2054    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2055    /// by equi-joining the two streams on the key attribute `K`.
2056    ///
2057    /// # Example
2058    /// ```rust
2059    /// # #[cfg(feature = "deploy")] {
2060    /// # use hydro_lang::prelude::*;
2061    /// # use std::collections::HashSet;
2062    /// # use futures::StreamExt;
2063    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2064    /// let tick = process.tick();
2065    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2066    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2067    /// stream1.join(stream2)
2068    /// # }, |mut stream| async move {
2069    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2070    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2071    /// # stream.map(|i| assert!(expected.contains(&i)));
2072    /// # }));
2073    /// # }
2074    pub fn join<V2, O2: Ordering, R2: Retries>(
2075        self,
2076        n: Stream<(K, V2), L, B, O2, R2>,
2077    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2078    where
2079        K: Eq + Hash,
2080        R: MinRetries<R2>,
2081    {
2082        check_matching_location(&self.location, &n.location);
2083
2084        Stream::new(
2085            self.location.clone(),
2086            HydroNode::Join {
2087                left: Box::new(self.ir_node.into_inner()),
2088                right: Box::new(n.ir_node.into_inner()),
2089                metadata: self.location.new_node_metadata(Stream::<
2090                    (K, (V1, V2)),
2091                    L,
2092                    B,
2093                    NoOrder,
2094                    <R as MinRetries<R2>>::Min,
2095                >::collection_kind()),
2096            },
2097        )
2098    }
2099
2100    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2101    /// computes the anti-join of the items in the input -- i.e. returns
2102    /// unique items in the first input that do not have a matching key
2103    /// in the second input.
2104    ///
2105    /// # Example
2106    /// ```rust
2107    /// # #[cfg(feature = "deploy")] {
2108    /// # use hydro_lang::prelude::*;
2109    /// # use futures::StreamExt;
2110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2111    /// let tick = process.tick();
2112    /// let stream = process
2113    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2114    ///   .batch(&tick, nondet!(/** test */));
2115    /// let batch = process
2116    ///   .source_iter(q!(vec![1, 2]))
2117    ///   .batch(&tick, nondet!(/** test */));
2118    /// stream.anti_join(batch).all_ticks()
2119    /// # }, |mut stream| async move {
2120    /// # for w in vec![(3, 'c'), (4, 'd')] {
2121    /// #     assert_eq!(stream.next().await.unwrap(), w);
2122    /// # }
2123    /// # }));
2124    /// # }
2125    pub fn anti_join<O2: Ordering, R2: Retries>(
2126        self,
2127        n: Stream<K, L, Bounded, O2, R2>,
2128    ) -> Stream<(K, V1), L, B, O, R>
2129    where
2130        K: Eq + Hash,
2131    {
2132        check_matching_location(&self.location, &n.location);
2133
2134        Stream::new(
2135            self.location.clone(),
2136            HydroNode::AntiJoin {
2137                pos: Box::new(self.ir_node.into_inner()),
2138                neg: Box::new(n.ir_node.into_inner()),
2139                metadata: self
2140                    .location
2141                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2142            },
2143        )
2144    }
2145}
2146
2147impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2148    Stream<(K, V), L, B, O, R>
2149{
2150    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2151    /// is used as the key and the second element is added to the entries associated with that key.
2152    ///
2153    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2154    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2155    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2156    /// total ordering _within_ each group but no ordering _across_ groups.
2157    ///
2158    /// # Example
2159    /// ```rust
2160    /// # #[cfg(feature = "deploy")] {
2161    /// # use hydro_lang::prelude::*;
2162    /// # use futures::StreamExt;
2163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2164    /// process
2165    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2166    ///     .into_keyed()
2167    /// #   .entries()
2168    /// # }, |mut stream| async move {
2169    /// // { 1: [2, 3], 2: [4] }
2170    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2171    /// #     assert_eq!(stream.next().await.unwrap(), w);
2172    /// # }
2173    /// # }));
2174    /// # }
2175    /// ```
2176    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2177        KeyedStream::new(
2178            self.location.clone(),
2179            HydroNode::Cast {
2180                inner: Box::new(self.ir_node.into_inner()),
2181                metadata: self
2182                    .location
2183                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2184            },
2185        )
2186    }
2187}
2188
2189impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2190where
2191    K: Eq + Hash,
2192    L: Location<'a>,
2193{
2194    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2195    /// # Example
2196    /// ```rust
2197    /// # #[cfg(feature = "deploy")] {
2198    /// # use hydro_lang::prelude::*;
2199    /// # use futures::StreamExt;
2200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201    /// let tick = process.tick();
2202    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2203    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2204    /// batch.keys().all_ticks()
2205    /// # }, |mut stream| async move {
2206    /// // 1, 2
2207    /// # assert_eq!(stream.next().await.unwrap(), 1);
2208    /// # assert_eq!(stream.next().await.unwrap(), 2);
2209    /// # }));
2210    /// # }
2211    /// ```
2212    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2213        self.into_keyed()
2214            .fold(
2215                q!(|| ()),
2216                q!(
2217                    |_, _| {},
2218                    commutative = ManualProof(/* values are ignored */),
2219                    idempotent = ManualProof(/* values are ignored */)
2220                ),
2221            )
2222            .keys()
2223    }
2224}
2225
2226impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2227where
2228    L: Location<'a> + NoTick,
2229{
2230    /// Returns a stream corresponding to the latest batch of elements being atomically
2231    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2232    /// the order of the input.
2233    ///
2234    /// # Non-Determinism
2235    /// The batch boundaries are non-deterministic and may change across executions.
2236    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2237        Stream::new(
2238            self.location.clone().tick,
2239            HydroNode::Batch {
2240                inner: Box::new(self.ir_node.into_inner()),
2241                metadata: self
2242                    .location
2243                    .tick
2244                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2245            },
2246        )
2247    }
2248
2249    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2250    /// See [`Stream::atomic`] for more details.
2251    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2252        Stream::new(
2253            self.location.tick.l.clone(),
2254            HydroNode::EndAtomic {
2255                inner: Box::new(self.ir_node.into_inner()),
2256                metadata: self
2257                    .location
2258                    .tick
2259                    .l
2260                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2261            },
2262        )
2263    }
2264}
2265
2266impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2267where
2268    L: Location<'a> + NoTick + NoAtomic,
2269    F: Future<Output = T>,
2270{
2271    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2272    /// Future outputs are produced as available, regardless of input arrival order.
2273    ///
2274    /// # Example
2275    /// ```rust
2276    /// # #[cfg(feature = "deploy")] {
2277    /// # use std::collections::HashSet;
2278    /// # use futures::StreamExt;
2279    /// # use hydro_lang::prelude::*;
2280    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2281    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2282    ///     .map(q!(|x| async move {
2283    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2284    ///         x
2285    ///     }))
2286    ///     .resolve_futures()
2287    /// #   },
2288    /// #   |mut stream| async move {
2289    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2290    /// #       let mut output = HashSet::new();
2291    /// #       for _ in 1..10 {
2292    /// #           output.insert(stream.next().await.unwrap());
2293    /// #       }
2294    /// #       assert_eq!(
2295    /// #           output,
2296    /// #           HashSet::<i32>::from_iter(1..10)
2297    /// #       );
2298    /// #   },
2299    /// # ));
2300    /// # }
2301    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2302        Stream::new(
2303            self.location.clone(),
2304            HydroNode::ResolveFutures {
2305                input: Box::new(self.ir_node.into_inner()),
2306                metadata: self
2307                    .location
2308                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2309            },
2310        )
2311    }
2312
2313    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2314    /// Future outputs are produced in the same order as the input stream.
2315    ///
2316    /// # Example
2317    /// ```rust
2318    /// # #[cfg(feature = "deploy")] {
2319    /// # use std::collections::HashSet;
2320    /// # use futures::StreamExt;
2321    /// # use hydro_lang::prelude::*;
2322    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2323    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2324    ///     .map(q!(|x| async move {
2325    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2326    ///         x
2327    ///     }))
2328    ///     .resolve_futures_ordered()
2329    /// #   },
2330    /// #   |mut stream| async move {
2331    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2332    /// #       let mut output = Vec::new();
2333    /// #       for _ in 1..10 {
2334    /// #           output.push(stream.next().await.unwrap());
2335    /// #       }
2336    /// #       assert_eq!(
2337    /// #           output,
2338    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2339    /// #       );
2340    /// #   },
2341    /// # ));
2342    /// # }
2343    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2344        Stream::new(
2345            self.location.clone(),
2346            HydroNode::ResolveFuturesOrdered {
2347                input: Box::new(self.ir_node.into_inner()),
2348                metadata: self
2349                    .location
2350                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2351            },
2352        )
2353    }
2354}
2355
2356impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2357where
2358    L: Location<'a>,
2359{
2360    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2361    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2362    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2363        Stream::new(
2364            self.location.outer().clone(),
2365            HydroNode::YieldConcat {
2366                inner: Box::new(self.ir_node.into_inner()),
2367                metadata: self
2368                    .location
2369                    .outer()
2370                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2371            },
2372        )
2373    }
2374
2375    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2376    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2377    ///
2378    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2379    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2380    /// stream's [`Tick`] context.
2381    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2382        let out_location = Atomic {
2383            tick: self.location.clone(),
2384        };
2385
2386        Stream::new(
2387            out_location.clone(),
2388            HydroNode::YieldConcat {
2389                inner: Box::new(self.ir_node.into_inner()),
2390                metadata: out_location
2391                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2392            },
2393        )
2394    }
2395
2396    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2397    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2398    /// input.
2399    ///
2400    /// This API is particularly useful for stateful computation on batches of data, such as
2401    /// maintaining an accumulated state that is up to date with the current batch.
2402    ///
2403    /// # Example
2404    /// ```rust
2405    /// # #[cfg(feature = "deploy")] {
2406    /// # use hydro_lang::prelude::*;
2407    /// # use futures::StreamExt;
2408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2409    /// let tick = process.tick();
2410    /// # // ticks are lazy by default, forces the second tick to run
2411    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2412    /// # let batch_first_tick = process
2413    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2414    /// #  .batch(&tick, nondet!(/** test */));
2415    /// # let batch_second_tick = process
2416    /// #   .source_iter(q!(vec![5, 6, 7]))
2417    /// #   .batch(&tick, nondet!(/** test */))
2418    /// #   .defer_tick(); // appears on the second tick
2419    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2420    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2421    ///
2422    /// input.batch(&tick, nondet!(/** test */))
2423    ///     .across_ticks(|s| s.count()).all_ticks()
2424    /// # }, |mut stream| async move {
2425    /// // [4, 7]
2426    /// assert_eq!(stream.next().await.unwrap(), 4);
2427    /// assert_eq!(stream.next().await.unwrap(), 7);
2428    /// # }));
2429    /// # }
2430    /// ```
2431    pub fn across_ticks<Out: BatchAtomic>(
2432        self,
2433        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2434    ) -> Out::Batched {
2435        thunk(self.all_ticks_atomic()).batched_atomic()
2436    }
2437
2438    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2439    /// always has the elements of `self` at tick `T - 1`.
2440    ///
2441    /// At tick `0`, the output stream is empty, since there is no previous tick.
2442    ///
2443    /// This operator enables stateful iterative processing with ticks, by sending data from one
2444    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2445    ///
2446    /// # Example
2447    /// ```rust
2448    /// # #[cfg(feature = "deploy")] {
2449    /// # use hydro_lang::prelude::*;
2450    /// # use futures::StreamExt;
2451    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2452    /// let tick = process.tick();
2453    /// // ticks are lazy by default, forces the second tick to run
2454    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2455    ///
2456    /// let batch_first_tick = process
2457    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2458    ///   .batch(&tick, nondet!(/** test */));
2459    /// let batch_second_tick = process
2460    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2461    ///   .batch(&tick, nondet!(/** test */))
2462    ///   .defer_tick(); // appears on the second tick
2463    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2464    ///
2465    /// changes_across_ticks.clone().filter_not_in(
2466    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2467    /// ).all_ticks()
2468    /// # }, |mut stream| async move {
2469    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2470    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2471    /// #     assert_eq!(stream.next().await.unwrap(), w);
2472    /// # }
2473    /// # }));
2474    /// # }
2475    /// ```
2476    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2477        Stream::new(
2478            self.location.clone(),
2479            HydroNode::DeferTick {
2480                input: Box::new(self.ir_node.into_inner()),
2481                metadata: self
2482                    .location
2483                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2484            },
2485        )
2486    }
2487}
2488
2489#[cfg(test)]
2490mod tests {
2491    #[cfg(feature = "deploy")]
2492    use futures::{SinkExt, StreamExt};
2493    #[cfg(feature = "deploy")]
2494    use hydro_deploy::Deployment;
2495    #[cfg(feature = "deploy")]
2496    use serde::{Deserialize, Serialize};
2497    #[cfg(any(feature = "deploy", feature = "sim"))]
2498    use stageleft::q;
2499
2500    #[cfg(any(feature = "deploy", feature = "sim"))]
2501    use crate::compile::builder::FlowBuilder;
2502    #[cfg(feature = "deploy")]
2503    use crate::live_collections::sliced::sliced;
2504    #[cfg(feature = "deploy")]
2505    use crate::live_collections::stream::ExactlyOnce;
2506    #[cfg(feature = "sim")]
2507    use crate::live_collections::stream::NoOrder;
2508    #[cfg(any(feature = "deploy", feature = "sim"))]
2509    use crate::live_collections::stream::TotalOrder;
2510    #[cfg(any(feature = "deploy", feature = "sim"))]
2511    use crate::location::Location;
2512    #[cfg(any(feature = "deploy", feature = "sim"))]
2513    use crate::nondet::nondet;
2514
2515    mod backtrace_chained_ops;
2516
2517    #[cfg(feature = "deploy")]
2518    struct P1 {}
2519    #[cfg(feature = "deploy")]
2520    struct P2 {}
2521
2522    #[cfg(feature = "deploy")]
2523    #[derive(Serialize, Deserialize, Debug)]
2524    struct SendOverNetwork {
2525        n: u32,
2526    }
2527
2528    #[cfg(feature = "deploy")]
2529    #[tokio::test]
2530    async fn first_ten_distributed() {
2531        use crate::networking::TCP;
2532
2533        let mut deployment = Deployment::new();
2534
2535        let mut flow = FlowBuilder::new();
2536        let first_node = flow.process::<P1>();
2537        let second_node = flow.process::<P2>();
2538        let external = flow.external::<P2>();
2539
2540        let numbers = first_node.source_iter(q!(0..10));
2541        let out_port = numbers
2542            .map(q!(|n| SendOverNetwork { n }))
2543            .send(&second_node, TCP.fail_stop().bincode())
2544            .send_bincode_external(&external);
2545
2546        let nodes = flow
2547            .with_process(&first_node, deployment.Localhost())
2548            .with_process(&second_node, deployment.Localhost())
2549            .with_external(&external, deployment.Localhost())
2550            .deploy(&mut deployment);
2551
2552        deployment.deploy().await.unwrap();
2553
2554        let mut external_out = nodes.connect(out_port).await;
2555
2556        deployment.start().await.unwrap();
2557
2558        for i in 0..10 {
2559            assert_eq!(external_out.next().await.unwrap().n, i);
2560        }
2561    }
2562
2563    #[cfg(feature = "deploy")]
2564    #[tokio::test]
2565    async fn first_cardinality() {
2566        let mut deployment = Deployment::new();
2567
2568        let mut flow = FlowBuilder::new();
2569        let node = flow.process::<()>();
2570        let external = flow.external::<()>();
2571
2572        let node_tick = node.tick();
2573        let count = node_tick
2574            .singleton(q!([1, 2, 3]))
2575            .into_stream()
2576            .flatten_ordered()
2577            .first()
2578            .into_stream()
2579            .count()
2580            .all_ticks()
2581            .send_bincode_external(&external);
2582
2583        let nodes = flow
2584            .with_process(&node, deployment.Localhost())
2585            .with_external(&external, deployment.Localhost())
2586            .deploy(&mut deployment);
2587
2588        deployment.deploy().await.unwrap();
2589
2590        let mut external_out = nodes.connect(count).await;
2591
2592        deployment.start().await.unwrap();
2593
2594        assert_eq!(external_out.next().await.unwrap(), 1);
2595    }
2596
2597    #[cfg(feature = "deploy")]
2598    #[tokio::test]
2599    async fn unbounded_reduce_remembers_state() {
2600        let mut deployment = Deployment::new();
2601
2602        let mut flow = FlowBuilder::new();
2603        let node = flow.process::<()>();
2604        let external = flow.external::<()>();
2605
2606        let (input_port, input) = node.source_external_bincode(&external);
2607        let out = input
2608            .reduce(q!(|acc, v| *acc += v))
2609            .sample_eager(nondet!(/** test */))
2610            .send_bincode_external(&external);
2611
2612        let nodes = flow
2613            .with_process(&node, deployment.Localhost())
2614            .with_external(&external, deployment.Localhost())
2615            .deploy(&mut deployment);
2616
2617        deployment.deploy().await.unwrap();
2618
2619        let mut external_in = nodes.connect(input_port).await;
2620        let mut external_out = nodes.connect(out).await;
2621
2622        deployment.start().await.unwrap();
2623
2624        external_in.send(1).await.unwrap();
2625        assert_eq!(external_out.next().await.unwrap(), 1);
2626
2627        external_in.send(2).await.unwrap();
2628        assert_eq!(external_out.next().await.unwrap(), 3);
2629    }
2630
2631    #[cfg(feature = "deploy")]
2632    #[tokio::test]
2633    async fn top_level_bounded_cross_singleton() {
2634        let mut deployment = Deployment::new();
2635
2636        let mut flow = FlowBuilder::new();
2637        let node = flow.process::<()>();
2638        let external = flow.external::<()>();
2639
2640        let (input_port, input) =
2641            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2642
2643        let out = input
2644            .cross_singleton(
2645                node.source_iter(q!(vec![1, 2, 3]))
2646                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2647            )
2648            .send_bincode_external(&external);
2649
2650        let nodes = flow
2651            .with_process(&node, deployment.Localhost())
2652            .with_external(&external, deployment.Localhost())
2653            .deploy(&mut deployment);
2654
2655        deployment.deploy().await.unwrap();
2656
2657        let mut external_in = nodes.connect(input_port).await;
2658        let mut external_out = nodes.connect(out).await;
2659
2660        deployment.start().await.unwrap();
2661
2662        external_in.send(1).await.unwrap();
2663        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2664
2665        external_in.send(2).await.unwrap();
2666        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2667    }
2668
2669    #[cfg(feature = "deploy")]
2670    #[tokio::test]
2671    async fn top_level_bounded_reduce_cardinality() {
2672        let mut deployment = Deployment::new();
2673
2674        let mut flow = FlowBuilder::new();
2675        let node = flow.process::<()>();
2676        let external = flow.external::<()>();
2677
2678        let (input_port, input) =
2679            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2680
2681        let out = sliced! {
2682            let input = use(input, nondet!(/** test */));
2683            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
2684            input.cross_singleton(v.into_stream().count())
2685        }
2686        .send_bincode_external(&external);
2687
2688        let nodes = flow
2689            .with_process(&node, deployment.Localhost())
2690            .with_external(&external, deployment.Localhost())
2691            .deploy(&mut deployment);
2692
2693        deployment.deploy().await.unwrap();
2694
2695        let mut external_in = nodes.connect(input_port).await;
2696        let mut external_out = nodes.connect(out).await;
2697
2698        deployment.start().await.unwrap();
2699
2700        external_in.send(1).await.unwrap();
2701        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2702
2703        external_in.send(2).await.unwrap();
2704        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2705    }
2706
2707    #[cfg(feature = "deploy")]
2708    #[tokio::test]
2709    async fn top_level_bounded_into_singleton_cardinality() {
2710        let mut deployment = Deployment::new();
2711
2712        let mut flow = FlowBuilder::new();
2713        let node = flow.process::<()>();
2714        let external = flow.external::<()>();
2715
2716        let (input_port, input) =
2717            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2718
2719        let out = sliced! {
2720            let input = use(input, nondet!(/** test */));
2721            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
2722            input.cross_singleton(v.into_stream().count())
2723        }
2724        .send_bincode_external(&external);
2725
2726        let nodes = flow
2727            .with_process(&node, deployment.Localhost())
2728            .with_external(&external, deployment.Localhost())
2729            .deploy(&mut deployment);
2730
2731        deployment.deploy().await.unwrap();
2732
2733        let mut external_in = nodes.connect(input_port).await;
2734        let mut external_out = nodes.connect(out).await;
2735
2736        deployment.start().await.unwrap();
2737
2738        external_in.send(1).await.unwrap();
2739        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2740
2741        external_in.send(2).await.unwrap();
2742        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2743    }
2744
2745    #[cfg(feature = "deploy")]
2746    #[tokio::test]
2747    async fn atomic_fold_replays_each_tick() {
2748        let mut deployment = Deployment::new();
2749
2750        let mut flow = FlowBuilder::new();
2751        let node = flow.process::<()>();
2752        let external = flow.external::<()>();
2753
2754        let (input_port, input) =
2755            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2756        let tick = node.tick();
2757
2758        let out = input
2759            .batch(&tick, nondet!(/** test */))
2760            .cross_singleton(
2761                node.source_iter(q!(vec![1, 2, 3]))
2762                    .atomic(&tick)
2763                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2764                    .snapshot_atomic(nondet!(/** test */)),
2765            )
2766            .all_ticks()
2767            .send_bincode_external(&external);
2768
2769        let nodes = flow
2770            .with_process(&node, deployment.Localhost())
2771            .with_external(&external, deployment.Localhost())
2772            .deploy(&mut deployment);
2773
2774        deployment.deploy().await.unwrap();
2775
2776        let mut external_in = nodes.connect(input_port).await;
2777        let mut external_out = nodes.connect(out).await;
2778
2779        deployment.start().await.unwrap();
2780
2781        external_in.send(1).await.unwrap();
2782        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2783
2784        external_in.send(2).await.unwrap();
2785        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2786    }
2787
2788    #[cfg(feature = "deploy")]
2789    #[tokio::test]
2790    async fn unbounded_scan_remembers_state() {
2791        let mut deployment = Deployment::new();
2792
2793        let mut flow = FlowBuilder::new();
2794        let node = flow.process::<()>();
2795        let external = flow.external::<()>();
2796
2797        let (input_port, input) = node.source_external_bincode(&external);
2798        let out = input
2799            .scan(
2800                q!(|| 0),
2801                q!(|acc, v| {
2802                    *acc += v;
2803                    Some(*acc)
2804                }),
2805            )
2806            .send_bincode_external(&external);
2807
2808        let nodes = flow
2809            .with_process(&node, deployment.Localhost())
2810            .with_external(&external, deployment.Localhost())
2811            .deploy(&mut deployment);
2812
2813        deployment.deploy().await.unwrap();
2814
2815        let mut external_in = nodes.connect(input_port).await;
2816        let mut external_out = nodes.connect(out).await;
2817
2818        deployment.start().await.unwrap();
2819
2820        external_in.send(1).await.unwrap();
2821        assert_eq!(external_out.next().await.unwrap(), 1);
2822
2823        external_in.send(2).await.unwrap();
2824        assert_eq!(external_out.next().await.unwrap(), 3);
2825    }
2826
2827    #[cfg(feature = "deploy")]
2828    #[tokio::test]
2829    async fn unbounded_enumerate_remembers_state() {
2830        let mut deployment = Deployment::new();
2831
2832        let mut flow = FlowBuilder::new();
2833        let node = flow.process::<()>();
2834        let external = flow.external::<()>();
2835
2836        let (input_port, input) = node.source_external_bincode(&external);
2837        let out = input.enumerate().send_bincode_external(&external);
2838
2839        let nodes = flow
2840            .with_process(&node, deployment.Localhost())
2841            .with_external(&external, deployment.Localhost())
2842            .deploy(&mut deployment);
2843
2844        deployment.deploy().await.unwrap();
2845
2846        let mut external_in = nodes.connect(input_port).await;
2847        let mut external_out = nodes.connect(out).await;
2848
2849        deployment.start().await.unwrap();
2850
2851        external_in.send(1).await.unwrap();
2852        assert_eq!(external_out.next().await.unwrap(), (0, 1));
2853
2854        external_in.send(2).await.unwrap();
2855        assert_eq!(external_out.next().await.unwrap(), (1, 2));
2856    }
2857
2858    #[cfg(feature = "deploy")]
2859    #[tokio::test]
2860    async fn unbounded_unique_remembers_state() {
2861        let mut deployment = Deployment::new();
2862
2863        let mut flow = FlowBuilder::new();
2864        let node = flow.process::<()>();
2865        let external = flow.external::<()>();
2866
2867        let (input_port, input) =
2868            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2869        let out = input.unique().send_bincode_external(&external);
2870
2871        let nodes = flow
2872            .with_process(&node, deployment.Localhost())
2873            .with_external(&external, deployment.Localhost())
2874            .deploy(&mut deployment);
2875
2876        deployment.deploy().await.unwrap();
2877
2878        let mut external_in = nodes.connect(input_port).await;
2879        let mut external_out = nodes.connect(out).await;
2880
2881        deployment.start().await.unwrap();
2882
2883        external_in.send(1).await.unwrap();
2884        assert_eq!(external_out.next().await.unwrap(), 1);
2885
2886        external_in.send(2).await.unwrap();
2887        assert_eq!(external_out.next().await.unwrap(), 2);
2888
2889        external_in.send(1).await.unwrap();
2890        external_in.send(3).await.unwrap();
2891        assert_eq!(external_out.next().await.unwrap(), 3);
2892    }
2893
2894    #[cfg(feature = "sim")]
2895    #[test]
2896    #[should_panic]
2897    fn sim_batch_nondet_size() {
2898        let mut flow = FlowBuilder::new();
2899        let node = flow.process::<()>();
2900
2901        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2902
2903        let tick = node.tick();
2904        let out_recv = input
2905            .batch(&tick, nondet!(/** test */))
2906            .count()
2907            .all_ticks()
2908            .sim_output();
2909
2910        flow.sim().exhaustive(async || {
2911            in_send.send(());
2912            in_send.send(());
2913            in_send.send(());
2914
2915            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
2916        });
2917    }
2918
2919    #[cfg(feature = "sim")]
2920    #[test]
2921    fn sim_batch_preserves_order() {
2922        let mut flow = FlowBuilder::new();
2923        let node = flow.process::<()>();
2924
2925        let (in_send, input) = node.sim_input();
2926
2927        let tick = node.tick();
2928        let out_recv = input
2929            .batch(&tick, nondet!(/** test */))
2930            .all_ticks()
2931            .sim_output();
2932
2933        flow.sim().exhaustive(async || {
2934            in_send.send(1);
2935            in_send.send(2);
2936            in_send.send(3);
2937
2938            out_recv.assert_yields_only([1, 2, 3]).await;
2939        });
2940    }
2941
2942    #[cfg(feature = "sim")]
2943    #[test]
2944    #[should_panic]
2945    fn sim_batch_unordered_shuffles() {
2946        let mut flow = FlowBuilder::new();
2947        let node = flow.process::<()>();
2948
2949        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2950
2951        let tick = node.tick();
2952        let batch = input.batch(&tick, nondet!(/** test */));
2953        let out_recv = batch
2954            .clone()
2955            .min()
2956            .zip(batch.max())
2957            .all_ticks()
2958            .sim_output();
2959
2960        flow.sim().exhaustive(async || {
2961            in_send.send_many_unordered([1, 2, 3]);
2962
2963            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
2964                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
2965            }
2966        });
2967    }
2968
2969    #[cfg(feature = "sim")]
2970    #[test]
2971    fn sim_batch_unordered_shuffles_count() {
2972        let mut flow = FlowBuilder::new();
2973        let node = flow.process::<()>();
2974
2975        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2976
2977        let tick = node.tick();
2978        let batch = input.batch(&tick, nondet!(/** test */));
2979        let out_recv = batch.all_ticks().sim_output();
2980
2981        let instance_count = flow.sim().exhaustive(async || {
2982            in_send.send_many_unordered([1, 2, 3, 4]);
2983            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
2984        });
2985
2986        assert_eq!(
2987            instance_count,
2988            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
2989        )
2990    }
2991
2992    #[cfg(feature = "sim")]
2993    #[test]
2994    #[should_panic]
2995    fn sim_observe_order_batched() {
2996        let mut flow = FlowBuilder::new();
2997        let node = flow.process::<()>();
2998
2999        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3000
3001        let tick = node.tick();
3002        let batch = input.batch(&tick, nondet!(/** test */));
3003        let out_recv = batch
3004            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3005            .all_ticks()
3006            .sim_output();
3007
3008        flow.sim().exhaustive(async || {
3009            in_send.send_many_unordered([1, 2, 3, 4]);
3010            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3011        });
3012    }
3013
3014    #[cfg(feature = "sim")]
3015    #[test]
3016    fn sim_observe_order_batched_count() {
3017        let mut flow = FlowBuilder::new();
3018        let node = flow.process::<()>();
3019
3020        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3021
3022        let tick = node.tick();
3023        let batch = input.batch(&tick, nondet!(/** test */));
3024        let out_recv = batch
3025            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3026            .all_ticks()
3027            .sim_output();
3028
3029        let instance_count = flow.sim().exhaustive(async || {
3030            in_send.send_many_unordered([1, 2, 3, 4]);
3031            let _ = out_recv.collect::<Vec<_>>().await;
3032        });
3033
3034        assert_eq!(
3035            instance_count,
3036            192 // 4! * 2^{4 - 1}
3037        )
3038    }
3039
3040    #[cfg(feature = "sim")]
3041    #[test]
3042    fn sim_unordered_count_instance_count() {
3043        let mut flow = FlowBuilder::new();
3044        let node = flow.process::<()>();
3045
3046        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3047
3048        let tick = node.tick();
3049        let out_recv = input
3050            .count()
3051            .snapshot(&tick, nondet!(/** test */))
3052            .all_ticks()
3053            .sim_output();
3054
3055        let instance_count = flow.sim().exhaustive(async || {
3056            in_send.send_many_unordered([1, 2, 3, 4]);
3057            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3058        });
3059
3060        assert_eq!(
3061            instance_count,
3062            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3063        )
3064    }
3065
3066    #[cfg(feature = "sim")]
3067    #[test]
3068    fn sim_top_level_assume_ordering() {
3069        let mut flow = FlowBuilder::new();
3070        let node = flow.process::<()>();
3071
3072        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3073
3074        let out_recv = input
3075            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3076            .sim_output();
3077
3078        let instance_count = flow.sim().exhaustive(async || {
3079            in_send.send_many_unordered([1, 2, 3]);
3080            let mut out = out_recv.collect::<Vec<_>>().await;
3081            out.sort();
3082            assert_eq!(out, vec![1, 2, 3]);
3083        });
3084
3085        assert_eq!(instance_count, 6)
3086    }
3087
3088    #[cfg(feature = "sim")]
3089    #[test]
3090    fn sim_top_level_assume_ordering_cycle_back() {
3091        let mut flow = FlowBuilder::new();
3092        let node = flow.process::<()>();
3093
3094        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3095
3096        let (complete_cycle_back, cycle_back) =
3097            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3098        let ordered = input
3099            .interleave(cycle_back)
3100            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3101        complete_cycle_back.complete(
3102            ordered
3103                .clone()
3104                .map(q!(|v| v + 1))
3105                .filter(q!(|v| v % 2 == 1)),
3106        );
3107
3108        let out_recv = ordered.sim_output();
3109
3110        let mut saw = false;
3111        let instance_count = flow.sim().exhaustive(async || {
3112            in_send.send_many_unordered([0, 2]);
3113            let out = out_recv.collect::<Vec<_>>().await;
3114
3115            if out.starts_with(&[0, 1, 2]) {
3116                saw = true;
3117            }
3118        });
3119
3120        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3121        assert_eq!(instance_count, 6)
3122    }
3123
3124    #[cfg(feature = "sim")]
3125    #[test]
3126    fn sim_top_level_assume_ordering_cycle_back_tick() {
3127        let mut flow = FlowBuilder::new();
3128        let node = flow.process::<()>();
3129
3130        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3131
3132        let (complete_cycle_back, cycle_back) =
3133            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3134        let ordered = input
3135            .interleave(cycle_back)
3136            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3137        complete_cycle_back.complete(
3138            ordered
3139                .clone()
3140                .batch(&node.tick(), nondet!(/** test */))
3141                .all_ticks()
3142                .map(q!(|v| v + 1))
3143                .filter(q!(|v| v % 2 == 1)),
3144        );
3145
3146        let out_recv = ordered.sim_output();
3147
3148        let mut saw = false;
3149        let instance_count = flow.sim().exhaustive(async || {
3150            in_send.send_many_unordered([0, 2]);
3151            let out = out_recv.collect::<Vec<_>>().await;
3152
3153            if out.starts_with(&[0, 1, 2]) {
3154                saw = true;
3155            }
3156        });
3157
3158        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3159        assert_eq!(instance_count, 58)
3160    }
3161
3162    #[cfg(feature = "sim")]
3163    #[test]
3164    fn sim_top_level_assume_ordering_multiple() {
3165        let mut flow = FlowBuilder::new();
3166        let node = flow.process::<()>();
3167
3168        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3169        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3170
3171        let (complete_cycle_back, cycle_back) =
3172            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3173        let input1_ordered = input
3174            .clone()
3175            .interleave(cycle_back)
3176            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3177        let foo = input1_ordered
3178            .clone()
3179            .map(q!(|v| v + 3))
3180            .weaken_ordering::<NoOrder>()
3181            .interleave(input2)
3182            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3183
3184        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3185
3186        let out_recv = input1_ordered.sim_output();
3187
3188        let mut saw = false;
3189        let instance_count = flow.sim().exhaustive(async || {
3190            in_send.send_many_unordered([0, 1]);
3191            let out = out_recv.collect::<Vec<_>>().await;
3192
3193            if out.starts_with(&[0, 3, 1]) {
3194                saw = true;
3195            }
3196        });
3197
3198        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3199        assert_eq!(instance_count, 24)
3200    }
3201
3202    #[cfg(feature = "sim")]
3203    #[test]
3204    fn sim_atomic_assume_ordering_cycle_back() {
3205        let mut flow = FlowBuilder::new();
3206        let node = flow.process::<()>();
3207
3208        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3209
3210        let (complete_cycle_back, cycle_back) =
3211            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3212        let ordered = input
3213            .interleave(cycle_back)
3214            .atomic(&node.tick())
3215            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3216            .end_atomic();
3217        complete_cycle_back.complete(
3218            ordered
3219                .clone()
3220                .map(q!(|v| v + 1))
3221                .filter(q!(|v| v % 2 == 1)),
3222        );
3223
3224        let out_recv = ordered.sim_output();
3225
3226        let instance_count = flow.sim().exhaustive(async || {
3227            in_send.send_many_unordered([0, 2]);
3228            let out = out_recv.collect::<Vec<_>>().await;
3229            assert_eq!(out.len(), 4);
3230        });
3231
3232        assert_eq!(instance_count, 22)
3233    }
3234}