// hydro_lang/live_collections/keyed_stream/mod.rs
//! Definitions for the [`KeyedStream`] live collection.

use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::Deref;
use std::rc::Rc;

use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};

use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
use super::keyed_singleton::KeyedSingleton;
use super::optional::Optional;
use super::singleton::Singleton;
use super::stream::{
    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
};
use crate::compile::builder::CycleId;
use crate::compile::ir::{
    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
};
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, ReceiverComplete};
use crate::forward_handle::{ForwardRef, TickCycle};
use crate::live_collections::batch_atomic::BatchAtomic;
use crate::live_collections::stream::{
    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
};
#[cfg(stageleft_runtime)]
use crate::location::dynamic::{DynLocation, LocationId};
use crate::location::tick::DeferTick;
use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
use crate::manual_expr::ManualExpr;
use crate::nondet::{NonDet, nondet};
use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};

pub mod networking;
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this collection is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection. Wrapped in a `RefCell` so that the
    // `Clone` impl can rewrite it in place into a shared `Tee` node.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct without storing
    // any values of those types.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
71
72impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
73    for KeyedStream<K, V, L, Unbounded, O, R>
74where
75    L: Location<'a>,
76{
77    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
78        let new_meta = stream
79            .location
80            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
81
82        KeyedStream {
83            location: stream.location,
84            ir_node: RefCell::new(HydroNode::Cast {
85                inner: Box::new(stream.ir_node.into_inner()),
86                metadata: new_meta,
87            }),
88            _phantom: PhantomData,
89        }
90    }
91}
92
93impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
94    for KeyedStream<K, V, L, B, NoOrder, R>
95where
96    L: Location<'a>,
97{
98    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
99        stream.weaken_ordering()
100    }
101}
102
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Delegates to `KeyedStream::defer_tick`; the qualified path resolves
        // to the inherent method (defined elsewhere in this module) rather
        // than recursing into this trait method.
        KeyedStream::defer_tick(self)
    }
}
111
112impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
113    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
114where
115    L: Location<'a>,
116{
117    type Location = Tick<L>;
118
119    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
120        KeyedStream {
121            location: location.clone(),
122            ir_node: RefCell::new(HydroNode::CycleSource {
123                cycle_id,
124                metadata: location.new_node_metadata(
125                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
126                ),
127            }),
128            _phantom: PhantomData,
129        }
130    }
131}
132
133impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
134    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
135where
136    L: Location<'a>,
137{
138    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
139        assert_eq!(
140            Location::id(&self.location),
141            expected_location,
142            "locations do not match"
143        );
144
145        self.location
146            .flow_state()
147            .borrow_mut()
148            .push_root(HydroRoot::CycleSink {
149                cycle_id,
150                input: Box::new(self.ir_node.into_inner()),
151                op_metadata: HydroIrOpMetadata::new(),
152            });
153    }
154}
155
156impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
157    for KeyedStream<K, V, L, B, O, R>
158where
159    L: Location<'a> + NoTick,
160{
161    type Location = L;
162
163    fn create_source(cycle_id: CycleId, location: L) -> Self {
164        KeyedStream {
165            location: location.clone(),
166            ir_node: RefCell::new(HydroNode::CycleSource {
167                cycle_id,
168                metadata: location
169                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
170            }),
171            _phantom: PhantomData,
172        }
173    }
174}
175
176impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
177    for KeyedStream<K, V, L, B, O, R>
178where
179    L: Location<'a> + NoTick,
180{
181    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
182        assert_eq!(
183            Location::id(&self.location),
184            expected_location,
185            "locations do not match"
186        );
187        self.location
188            .flow_state()
189            .borrow_mut()
190            .push_root(HydroRoot::CycleSink {
191                cycle_id,
192                input: Box::new(self.ir_node.into_inner()),
193                op_metadata: HydroIrOpMetadata::new(),
194            });
195    }
196}
197
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // On the first clone, rewrite this node in place into a `Tee` so that
        // all handles share one underlying IR subtree instead of duplicating
        // it. `replace` swaps in a `Placeholder` to take ownership of the
        // current node through the shared `RefCell`.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self` is now guaranteed to be a `Tee`; hand out another `Tee` that
        // points at the same shared inner node (cloning the `Rc`, not the IR).
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above just installed a `Tee`, so this cannot happen.
            unreachable!()
        }
    }
}
225
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant independently decides whether to emit an element and whether
/// processing of subsequent inputs should continue.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
238
239impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
240    KeyedStream<K, V, L, B, O, R>
241{
242    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
243        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
244        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
245
246        KeyedStream {
247            location,
248            ir_node: RefCell::new(ir_node),
249            _phantom: PhantomData,
250        }
251    }
252
253    /// Returns the [`CollectionKind`] corresponding to this type.
254    pub fn collection_kind() -> CollectionKind {
255        CollectionKind::KeyedStream {
256            bound: B::BOUND_KIND,
257            value_order: O::ORDERING_KIND,
258            value_retry: R::RETRIES_KIND,
259            key_type: stageleft::quote_type::<K>().into(),
260            value_type: stageleft::quote_type::<V>().into(),
261        }
262    }
263
    /// Returns the [`Location`] where this keyed stream is being materialized.
    ///
    /// This borrows the location; callers that need an owned handle can clone
    /// the returned reference.
    pub fn location(&self) -> &L {
        &self.location
    }
268
269    /// Explicitly "casts" the keyed stream to a type with a different ordering
270    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
271    /// by the type-system.
272    ///
273    /// # Non-Determinism
274    /// This function is used as an escape hatch, and any mistakes in the
275    /// provided ordering guarantee will propagate into the guarantees
276    /// for the rest of the program.
277    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
278        if O::ORDERING_KIND == O2::ORDERING_KIND {
279            KeyedStream::new(self.location, self.ir_node.into_inner())
280        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
281            // We can always weaken the ordering guarantee
282            KeyedStream::new(
283                self.location.clone(),
284                HydroNode::Cast {
285                    inner: Box::new(self.ir_node.into_inner()),
286                    metadata: self
287                        .location
288                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
289                },
290            )
291        } else {
292            KeyedStream::new(
293                self.location.clone(),
294                HydroNode::ObserveNonDet {
295                    inner: Box::new(self.ir_node.into_inner()),
296                    trusted: false,
297                    metadata: self
298                        .location
299                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
300                },
301            )
302        }
303    }
304
305    fn assume_ordering_trusted<O2: Ordering>(
306        self,
307        _nondet: NonDet,
308    ) -> KeyedStream<K, V, L, B, O2, R> {
309        if O::ORDERING_KIND == O2::ORDERING_KIND {
310            KeyedStream::new(self.location, self.ir_node.into_inner())
311        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
312            // We can always weaken the ordering guarantee
313            KeyedStream::new(
314                self.location.clone(),
315                HydroNode::Cast {
316                    inner: Box::new(self.ir_node.into_inner()),
317                    metadata: self
318                        .location
319                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
320                },
321            )
322        } else {
323            KeyedStream::new(
324                self.location.clone(),
325                HydroNode::ObserveNonDet {
326                    inner: Box::new(self.ir_node.into_inner()),
327                    trusted: true,
328                    metadata: self
329                        .location
330                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
331                },
332            )
333        }
334    }
335
    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
        // Kept for backwards compatibility; delegates to the type-checked
        // weakening operator.
        self.weaken_ordering::<NoOrder>()
    }
342
343    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
344    /// enforcing that `O2` is weaker than the input ordering guarantee.
345    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
346        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
347        self.assume_ordering::<O2>(nondet)
348    }
349
350    /// Explicitly "casts" the keyed stream to a type with a different retries
351    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
352    /// be proven by the type-system.
353    ///
354    /// # Non-Determinism
355    /// This function is used as an escape hatch, and any mistakes in the
356    /// provided retries guarantee will propagate into the guarantees
357    /// for the rest of the program.
358    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
359        if R::RETRIES_KIND == R2::RETRIES_KIND {
360            KeyedStream::new(self.location, self.ir_node.into_inner())
361        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
362            // We can always weaken the retries guarantee
363            KeyedStream::new(
364                self.location.clone(),
365                HydroNode::Cast {
366                    inner: Box::new(self.ir_node.into_inner()),
367                    metadata: self
368                        .location
369                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
370                },
371            )
372        } else {
373            KeyedStream::new(
374                self.location.clone(),
375                HydroNode::ObserveNonDet {
376                    inner: Box::new(self.ir_node.into_inner()),
377                    trusted: false,
378                    metadata: self
379                        .location
380                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
381                },
382            )
383        }
384    }
385
    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
        // Kept for backwards compatibility; delegates to the type-checked
        // weakening operator.
        self.weaken_retries::<AtLeastOnce>()
    }
392
393    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
394    /// enforcing that `R2` is weaker than the input retries guarantee.
395    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
396        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
397        self.assume_retries::<R2>(nondet)
398    }
399
400    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
401    /// implies that `O == TotalOrder`.
402    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
403    where
404        O: IsOrdered,
405    {
406        self.assume_ordering(nondet!(/** no-op */))
407    }
408
409    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
410    /// implies that `R == ExactlyOnce`.
411    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
412    where
413        R: IsExactlyOnce,
414    {
415        self.assume_retries(nondet!(/** no-op */))
416    }
417
418    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
419    /// implies that `B == Bounded`.
420    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
421    where
422        B: IsBounded,
423    {
424        KeyedStream::new(self.location, self.ir_node.into_inner())
425    }
426
427    /// Flattens the keyed stream into an unordered stream of key-value pairs.
428    ///
429    /// # Example
430    /// ```rust
431    /// # #[cfg(feature = "deploy")] {
432    /// # use hydro_lang::prelude::*;
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
435    /// process
436    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
437    ///     .into_keyed()
438    ///     .entries()
439    /// # }, |mut stream| async move {
440    /// // (1, 2), (1, 3), (2, 4) in any order
441    /// # let mut results = Vec::new();
442    /// # for _ in 0..3 {
443    /// #     results.push(stream.next().await.unwrap());
444    /// # }
445    /// # results.sort();
446    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
447    /// # }));
448    /// # }
449    /// ```
450    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
451        Stream::new(
452            self.location.clone(),
453            HydroNode::Cast {
454                inner: Box::new(self.ir_node.into_inner()),
455                metadata: self
456                    .location
457                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
458            },
459        )
460    }
461
    /// Flattens the keyed stream into an unordered stream of only the values.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .values()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4 in any order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
        // `entries` already weakens the result to `NoOrder`; all that remains
        // is projecting away the key.
        self.entries().map(q!(|(_, v)| v))
    }
488
    /// Flattens the keyed stream into an unordered stream of just the keys.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # process
    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
    /// #     .into_keyed()
    /// #     .keys()
    /// # }, |mut stream| async move {
    /// // 1, 2 in any order
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2]);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
    where
        K: Eq + Hash,
    {
        // A key appears once per value in `entries`, so deduplicate with
        // `unique` — hence the `Eq + Hash` bounds and the `ExactlyOnce` result.
        self.entries().map(q!(|(k, _)| k)).unique()
    }
518
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Defer splicing of the user closure until the staging context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift the value-level function to key-value pairs, passing the key
        // through unchanged so grouping is preserved.
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
570
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the staging context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The key is cloned so it can be both passed to `f` and retained as the
        // group key of the output (hence the `K: Clone` bound).
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
629
    /// Prepends a new value to the key of each element in the stream, producing a new
    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
    /// occurs and the elements in each group preserve their original order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .prefix_key(q!(|&(k, _)| k % 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { (1, 1): [2, 3], (0, 2): [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // Defer splicing of the user closure until the staging context is known;
        // `f` only borrows the pair, so `splice_fn1_borrow_ctx` is used.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Compute the new key prefix from a borrow of the pair, then rebuild the
        // element as `((prefix, original_key), value)`.
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
685
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Defer splicing of the user closure until the staging context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // Wrap the value-level predicate so it applies to the value component
        // of each key-value pair, ignoring the key.
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
736
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_with_key<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> bool + 'a,
    {
        // Unlike `filter`, `f` already takes the full key-value pair, so it can
        // be spliced directly without a wrapping closure.
        let filter_f = f
            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
            .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
786
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Defer splicing of the user closure until the staging context is known.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Lift the value-level function to pairs: a `Some(out)` keeps the
        // original key, a `None` drops the element entirely.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
839
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..1 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The user closure consumes an owned `(K, V)`, so the key is cloned for
        // it; the original key is then re-attached to whatever it returns.
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
896
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///   .into_keyed()
    ///   .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        check_matching_location(&self.location, &other.location);

        // Build the cross product over the flat `(key, value)` entries, then
        // rearrange each `((k, v), o2)` into `(k, (v, o2))` and restore keying.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
948
949    /// For each value `v` in each group, transform `v` using `f` and then treat the
950    /// result as an [`Iterator`] to produce values one by one within the same group.
951    /// The implementation for [`Iterator`] for the output type `I` must produce items
952    /// in a **deterministic** order.
953    ///
954    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
955    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
956    ///
957    /// # Example
958    /// ```rust
959    /// # #[cfg(feature = "deploy")] {
960    /// # use hydro_lang::prelude::*;
961    /// # use futures::StreamExt;
962    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
963    /// process
964    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
965    ///     .into_keyed()
966    ///     .flat_map_ordered(q!(|x| x))
967    /// #   .entries()
968    /// # }, |mut stream| async move {
969    /// // { 1: [2, 3, 4], 2: [5, 6] }
970    /// # let mut results = Vec::new();
971    /// # for _ in 0..5 {
972    /// #     results.push(stream.next().await.unwrap());
973    /// # }
974    /// # results.sort();
975    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
976    /// # }));
977    /// # }
978    /// ```
979    pub fn flat_map_ordered<U, I, F>(
980        self,
981        f: impl IntoQuotedMut<'a, F, L> + Copy,
982    ) -> KeyedStream<K, U, L, B, O, R>
983    where
984        I: IntoIterator<Item = U>,
985        F: Fn(V) -> I + 'a,
986        K: Clone,
987    {
988        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
989        let flat_map_f = q!({
990            let orig = f;
991            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
992        })
993        .splice_fn1_ctx::<(K, V), _>(&self.location)
994        .into();
995
996        KeyedStream::new(
997            self.location.clone(),
998            HydroNode::FlatMap {
999                f: flat_map_f,
1000                input: Box::new(self.ir_node.into_inner()),
1001                metadata: self
1002                    .location
1003                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1004            },
1005        )
1006    }
1007
    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `I` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
    ///     ]))
    ///     .into_keyed()
    ///     .flat_map_unordered(q!(|x| x))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Defer splicing of the user closure until the context is available.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // Expand each value into an iterator, cloning the key onto every item;
        // the output collection kind is downgraded to `NoOrder` in the return
        // type since the iterator may yield in any order.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1064
1065    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1066    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1067    /// items in a **deterministic** order.
1068    ///
1069    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1070    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1071    ///
1072    /// # Example
1073    /// ```rust
1074    /// # #[cfg(feature = "deploy")] {
1075    /// # use hydro_lang::prelude::*;
1076    /// # use futures::StreamExt;
1077    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078    /// process
1079    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1080    ///     .into_keyed()
1081    ///     .flatten_ordered()
1082    /// #   .entries()
1083    /// # }, |mut stream| async move {
1084    /// // { 1: [2, 3, 4], 2: [5, 6] }
1085    /// # let mut results = Vec::new();
1086    /// # for _ in 0..5 {
1087    /// #     results.push(stream.next().await.unwrap());
1088    /// # }
1089    /// # results.sort();
1090    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1091    /// # }));
1092    /// # }
1093    /// ```
1094    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1095    where
1096        V: IntoIterator<Item = U>,
1097        K: Clone,
1098    {
1099        self.flat_map_ordered(q!(|d| d))
1100    }
1101
1102    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1103    /// for the value type `V` to produce items in any order.
1104    ///
1105    /// # Example
1106    /// ```rust
1107    /// # #[cfg(feature = "deploy")] {
1108    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1109    /// # use futures::StreamExt;
1110    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1111    /// process
1112    ///     .source_iter(q!(vec![
1113    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1114    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1115    ///     ]))
1116    ///     .into_keyed()
1117    ///     .flatten_unordered()
1118    /// #   .entries()
1119    /// # }, |mut stream| async move {
1120    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1121    /// # let mut results = Vec::new();
1122    /// # for _ in 0..4 {
1123    /// #     results.push(stream.next().await.unwrap());
1124    /// # }
1125    /// # results.sort();
1126    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1127    /// # }));
1128    /// # }
1129    /// ```
1130    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1131    where
1132        V: IntoIterator<Item = U>,
1133        K: Clone,
1134    {
1135        self.flat_map_unordered(q!(|d| d))
1136    }
1137
1138    /// An operator which allows you to "inspect" each element of a stream without
1139    /// modifying it. The closure `f` is called on a reference to each value. This is
1140    /// mainly useful for debugging, and should not be used to generate side-effects.
1141    ///
1142    /// # Example
1143    /// ```rust
1144    /// # #[cfg(feature = "deploy")] {
1145    /// # use hydro_lang::prelude::*;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// process
1149    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1150    ///     .into_keyed()
1151    ///     .inspect(q!(|v| println!("{}", v)))
1152    /// #   .entries()
1153    /// # }, |mut stream| async move {
1154    /// # let mut results = Vec::new();
1155    /// # for _ in 0..3 {
1156    /// #     results.push(stream.next().await.unwrap());
1157    /// # }
1158    /// # results.sort();
1159    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1160    /// # }));
1161    /// # }
1162    /// ```
1163    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1164    where
1165        F: Fn(&V) + 'a,
1166    {
1167        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1168        let inspect_f = q!({
1169            let orig = f;
1170            move |t: &(_, _)| orig(&t.1)
1171        })
1172        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1173        .into();
1174
1175        KeyedStream::new(
1176            self.location.clone(),
1177            HydroNode::Inspect {
1178                f: inspect_f,
1179                input: Box::new(self.ir_node.into_inner()),
1180                metadata: self.location.new_node_metadata(Self::collection_kind()),
1181            },
1182        )
1183    }
1184
1185    /// An operator which allows you to "inspect" each element of a stream without
1186    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1187    /// mainly useful for debugging, and should not be used to generate side-effects.
1188    ///
1189    /// # Example
1190    /// ```rust
1191    /// # #[cfg(feature = "deploy")] {
1192    /// # use hydro_lang::prelude::*;
1193    /// # use futures::StreamExt;
1194    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1195    /// process
1196    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1197    ///     .into_keyed()
1198    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1199    /// #   .entries()
1200    /// # }, |mut stream| async move {
1201    /// # let mut results = Vec::new();
1202    /// # for _ in 0..3 {
1203    /// #     results.push(stream.next().await.unwrap());
1204    /// # }
1205    /// # results.sort();
1206    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1207    /// # }));
1208    /// # }
1209    /// ```
1210    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1211    where
1212        F: Fn(&(K, V)) + 'a,
1213    {
1214        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1215
1216        KeyedStream::new(
1217            self.location.clone(),
1218            HydroNode::Inspect {
1219                f: inspect_f,
1220                input: Box::new(self.ir_node.into_inner()),
1221                metadata: self.location.new_node_metadata(Self::collection_kind()),
1222            },
1223        )
1224    }
1225
1226    /// An operator which allows you to "name" a `HydroNode`.
1227    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1228    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1229        {
1230            let mut node = self.ir_node.borrow_mut();
1231            let metadata = node.metadata_mut();
1232            metadata.tag = Some(name.to_owned());
1233        }
1234        self
1235    }
1236
    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
    ///
    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
    /// early by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the value stream for that key is terminated and no more
    /// elements are processed for that group (other groups are unaffected).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
    ///     .into_keyed()
    ///     .scan(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: [1], 1: [3, 7] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Implemented on top of `generator`: `Some(out)` becomes a yielded
        // element, `None` becomes `Generate::Break`, which (per `generator`)
        // retires only the state of the key that produced it.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1302
    /// Iteratively processes the elements in each group using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
    /// state for each group. The second argument defines the processing logic, taking in a
    /// mutable reference to the group's state and the value to be processed. It emits a
    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
    /// should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
    ///     .into_keyed()
    ///     .generator(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             if *acc > 100 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
    ///                     "done!".to_owned()
    ///                 )
    ///             } else if *acc % 2 == 0 {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
    ///                     "even".to_owned()
    ///                 )
    ///             } else {
    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///             }
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        // Defer splicing of the user closures until the context is available.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Lowered to a `Scan` over a per-key state table. Each key maps to
        // `Option<A>`: `Some(state)` while the group is live, `None` once it
        // has terminated via `Return`/`Break`.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // The scan step uses a nested `Option` encoding: the outer `Some(..)`
        // means "keep scanning" (the scan itself never stops), the inner
        // `Option<(K, U)>` is `Some` only when an element should be emitted.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                // Group already terminated; drop the value, emit nothing.
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.into_inner()),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the inner `Option`, discarding the `None` placeholders so
        // only actual yields/returns appear downstream.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location, flatten_node)
    }
1424
    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
    /// in-order across the values in each group. But the aggregation function returns a boolean,
    /// which when true indicates that the aggregated result is complete and can be released to
    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
    /// normal stream elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .into_keyed()
    ///     .fold_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // Output: { 0: 2, 1: 9 }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
    /// # }));
    /// # }
    /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Defer splicing of the user closures until the context is available.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Drive a generator whose per-key state is `Some(accumulator)`; when
        // the user combinator reports completion, the accumulator is taken out
        // and emitted via `Generate::Return`, which also retires the group.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` stops invoking us after `Return`, so the
                    // state can never be observed as `None` here.
                    unreachable!()
                }
            }),
        );

        // Cast the generator output to the `KeyedSingleton` collection kind
        // with the appropriate value boundedness.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }
1501
1502    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1503    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1504    /// otherwise the first element would be non-deterministic.
1505    ///
1506    /// # Example
1507    /// ```rust
1508    /// # #[cfg(feature = "deploy")] {
1509    /// # use hydro_lang::prelude::*;
1510    /// # use futures::StreamExt;
1511    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512    /// process
1513    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1514    ///     .into_keyed()
1515    ///     .first()
1516    /// #   .entries()
1517    /// # }, |mut stream| async move {
1518    /// // Output: { 0: 2, 1: 3 }
1519    /// # let mut results = Vec::new();
1520    /// # for _ in 0..2 {
1521    /// #     results.push(stream.next().await.unwrap());
1522    /// # }
1523    /// # results.sort();
1524    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1525    /// # }));
1526    /// # }
1527    /// ```
1528    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1529    where
1530        O: IsOrdered,
1531        R: IsExactlyOnce,
1532        K: Clone + Eq + Hash,
1533    {
1534        self.fold_early_stop(
1535            q!(|| None),
1536            q!(|acc, v| {
1537                *acc = Some(v);
1538                true
1539            }),
1540        )
1541        .map(q!(|v| v.unwrap()))
1542    }
1543
    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .value_counts()
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 3), (2, 2)
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
    where
        R: IsExactlyOnce,
        K: Eq + Hash,
    {
        // Counting ignores element values entirely, so the per-group ordering
        // assumption below cannot affect the result (see the nondet note).
        self.make_exactly_once()
            .assume_ordering_trusted(
                nondet!(/** ordering within each group affects neither result nor intermediates */),
            )
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
    }
1583
1584    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1585    /// group via the `comb` closure.
1586    ///
1587    /// Depending on the input stream guarantees, the closure may need to be commutative
1588    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1589    ///
1590    /// If the input and output value types are the same and do not require initialization then use
1591    /// [`KeyedStream::reduce`].
1592    ///
1593    /// # Example
1594    /// ```rust
1595    /// # #[cfg(feature = "deploy")] {
1596    /// # use hydro_lang::prelude::*;
1597    /// # use futures::StreamExt;
1598    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1599    /// let tick = process.tick();
1600    /// let numbers = process
1601    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1602    ///     .into_keyed();
1603    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1604    /// batch
1605    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1606    ///     .entries()
1607    ///     .all_ticks()
1608    /// # }, |mut stream| async move {
1609    /// // (1, false), (2, true)
1610    /// # let mut results = Vec::new();
1611    /// # for _ in 0..2 {
1612    /// #     results.push(stream.next().await.unwrap());
1613    /// # }
1614    /// # results.sort();
1615    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1616    /// # }));
1617    /// # }
1618    /// ```
1619    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
1620        self,
1621        init: impl IntoQuotedMut<'a, I, L>,
1622        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1623    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
1624    where
1625        K: Eq + Hash,
1626        C: ValidCommutativityFor<O>,
1627        Idemp: ValidIdempotenceFor<R>,
1628    {
1629        let init = init.splice_fn0_ctx(&self.location).into();
1630        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1631        proof.register_proof(&comb);
1632
1633        let ordered = self
1634            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1635            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1636
1637        KeyedSingleton::new(
1638            ordered.location.clone(),
1639            HydroNode::FoldKeyed {
1640                init,
1641                acc: comb.into(),
1642                input: Box::new(ordered.ir_node.into_inner()),
1643                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1644                    K,
1645                    A,
1646                    L,
1647                    B::WhenValueUnbounded,
1648                >::collection_kind()),
1649            },
1650        )
1651    }
1652
1653    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1654    /// group via the `comb` closure.
1655    ///
1656    /// Depending on the input stream guarantees, the closure may need to be commutative
1657    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1658    ///
1659    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1660    ///
1661    /// # Example
1662    /// ```rust
1663    /// # #[cfg(feature = "deploy")] {
1664    /// # use hydro_lang::prelude::*;
1665    /// # use futures::StreamExt;
1666    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1667    /// let tick = process.tick();
1668    /// let numbers = process
1669    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1670    ///     .into_keyed();
1671    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672    /// batch
1673    ///     .reduce(q!(|acc, x| *acc |= x))
1674    ///     .entries()
1675    ///     .all_ticks()
1676    /// # }, |mut stream| async move {
1677    /// // (1, false), (2, true)
1678    /// # let mut results = Vec::new();
1679    /// # for _ in 0..2 {
1680    /// #     results.push(stream.next().await.unwrap());
1681    /// # }
1682    /// # results.sort();
1683    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1684    /// # }));
1685    /// # }
1686    /// ```
1687    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1688        self,
1689        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1690    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1691    where
1692        K: Eq + Hash,
1693        C: ValidCommutativityFor<O>,
1694        Idemp: ValidIdempotenceFor<R>,
1695    {
1696        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1697        proof.register_proof(&f);
1698
1699        let ordered = self
1700            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1701            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1702
1703        KeyedSingleton::new(
1704            ordered.location.clone(),
1705            HydroNode::ReduceKeyed {
1706                f: f.into(),
1707                input: Box::new(ordered.ir_node.into_inner()),
1708                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1709                    K,
1710                    V,
1711                    L,
1712                    B::WhenValueUnbounded,
1713                >::collection_kind()),
1714            },
1715        )
1716    }
1717
1718    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1719    /// are automatically deleted.
1720    ///
1721    /// Depending on the input stream guarantees, the closure may need to be commutative
1722    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1723    ///
1724    /// # Example
1725    /// ```rust
1726    /// # #[cfg(feature = "deploy")] {
1727    /// # use hydro_lang::prelude::*;
1728    /// # use futures::StreamExt;
1729    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1730    /// let tick = process.tick();
1731    /// let watermark = tick.singleton(q!(1));
1732    /// let numbers = process
1733    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1734    ///     .into_keyed();
1735    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1736    /// batch
1737    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1738    ///     .entries()
1739    ///     .all_ticks()
1740    /// # }, |mut stream| async move {
1741    /// // (2, true)
1742    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1743    /// # }));
1744    /// # }
1745    /// ```
1746    pub fn reduce_watermark<O2, F, C, Idemp>(
1747        self,
1748        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1749        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1750    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1751    where
1752        K: Eq + Hash,
1753        O2: Clone,
1754        F: Fn(&mut V, V) + 'a,
1755        C: ValidCommutativityFor<O>,
1756        Idemp: ValidIdempotenceFor<R>,
1757    {
1758        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1759        check_matching_location(&self.location.root(), other.location.outer());
1760        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1761        proof.register_proof(&f);
1762
1763        let ordered = self
1764            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1765            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1766
1767        KeyedSingleton::new(
1768            ordered.location.clone(),
1769            HydroNode::ReduceKeyedWatermark {
1770                f: f.into(),
1771                input: Box::new(ordered.ir_node.into_inner()),
1772                watermark: Box::new(other.ir_node.into_inner()),
1773                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
1774                    K,
1775                    V,
1776                    L,
1777                    B::WhenValueUnbounded,
1778                >::collection_kind()),
1779            },
1780        )
1781    }
1782
1783    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1784    /// whose keys are not in the bounded stream.
1785    ///
1786    /// # Example
1787    /// ```rust
1788    /// # #[cfg(feature = "deploy")] {
1789    /// # use hydro_lang::prelude::*;
1790    /// # use futures::StreamExt;
1791    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1792    /// let tick = process.tick();
1793    /// let keyed_stream = process
1794    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1795    ///     .batch(&tick, nondet!(/** test */))
1796    ///     .into_keyed();
1797    /// let keys_to_remove = process
1798    ///     .source_iter(q!(vec![1, 2]))
1799    ///     .batch(&tick, nondet!(/** test */));
1800    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1801    /// #   .entries()
1802    /// # }, |mut stream| async move {
1803    /// // { 3: ['c'], 4: ['d'] }
1804    /// # let mut results = Vec::new();
1805    /// # for _ in 0..2 {
1806    /// #     results.push(stream.next().await.unwrap());
1807    /// # }
1808    /// # results.sort();
1809    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1810    /// # }));
1811    /// # }
1812    /// ```
1813    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1814        self,
1815        other: Stream<K, L, Bounded, O2, R2>,
1816    ) -> Self
1817    where
1818        K: Eq + Hash,
1819    {
1820        check_matching_location(&self.location, &other.location);
1821
1822        KeyedStream::new(
1823            self.location.clone(),
1824            HydroNode::AntiJoin {
1825                pos: Box::new(self.ir_node.into_inner()),
1826                neg: Box::new(other.ir_node.into_inner()),
1827                metadata: self.location.new_node_metadata(Self::collection_kind()),
1828            },
1829        )
1830    }
1831
1832    /// Emit a keyed stream containing keys shared between two keyed streams,
1833    /// where each value in the output keyed stream is a tuple of
1834    /// (self's value, other's value).
1835    /// If there are multiple values for the same key, this performs a cross product
1836    /// for each matching key.
1837    ///
1838    /// # Example
1839    /// ```rust
1840    /// # #[cfg(feature = "deploy")] {
1841    /// # use hydro_lang::prelude::*;
1842    /// # use futures::StreamExt;
1843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1844    /// let tick = process.tick();
1845    /// let keyed_data = process
1846    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1847    ///     .into_keyed()
1848    ///     .batch(&tick, nondet!(/** test */));
1849    /// let other_data = process
1850    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1851    ///     .into_keyed()
1852    ///     .batch(&tick, nondet!(/** test */));
1853    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1854    /// # }, |mut stream| async move {
1855    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1856    /// # let mut results = vec![];
1857    /// # for _ in 0..4 {
1858    /// #     results.push(stream.next().await.unwrap());
1859    /// # }
1860    /// # results.sort();
1861    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1862    /// # }));
1863    /// # }
1864    /// ```
1865    pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1866        self,
1867        other: KeyedStream<K, V2, L, B, O2, R2>,
1868    ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1869    where
1870        K: Eq + Hash,
1871        R: MinRetries<R2>,
1872    {
1873        self.entries().join(other.entries()).into_keyed()
1874    }
1875
1876    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1877    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1878    /// is only present in one of the inputs, its values are passed through as-is). The output has
1879    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1880    ///
1881    /// Currently, both input streams must be [`Bounded`]. This operator will block
1882    /// on the first stream until all its elements are available. In a future version,
1883    /// we will relax the requirement on the `other` stream.
1884    ///
1885    /// # Example
1886    /// ```rust
1887    /// # #[cfg(feature = "deploy")] {
1888    /// # use hydro_lang::prelude::*;
1889    /// # use futures::StreamExt;
1890    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1891    /// let tick = process.tick();
1892    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1893    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1894    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1895    /// # .entries()
1896    /// # }, |mut stream| async move {
1897    /// // { 0: [2, 1], 1: [4, 3] }
1898    /// # let mut results = Vec::new();
1899    /// # for _ in 0..4 {
1900    /// #     results.push(stream.next().await.unwrap());
1901    /// # }
1902    /// # results.sort();
1903    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
1904    /// # }));
1905    /// # }
1906    /// ```
1907    pub fn chain<O2: Ordering, R2: Retries>(
1908        self,
1909        other: KeyedStream<K, V, L, Bounded, O2, R2>,
1910    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1911    where
1912        B: IsBounded,
1913        O: MinOrder<O2>,
1914        R: MinRetries<R2>,
1915    {
1916        let this = self.make_bounded();
1917        check_matching_location(&this.location, &other.location);
1918
1919        KeyedStream::new(
1920            this.location.clone(),
1921            HydroNode::Chain {
1922                first: Box::new(this.ir_node.into_inner()),
1923                second: Box::new(other.ir_node.into_inner()),
1924                metadata: this.location.new_node_metadata(KeyedStream::<
1925                    K,
1926                    V,
1927                    L,
1928                    Bounded,
1929                    <O as MinOrder<O2>>::Min,
1930                    <R as MinRetries<R2>>::Min,
1931                >::collection_kind()),
1932            },
1933        )
1934    }
1935
1936    /// Emit a keyed stream containing keys shared between the keyed stream and the
1937    /// keyed singleton, where each value in the output keyed stream is a tuple of
1938    /// (the keyed stream's value, the keyed singleton's value).
1939    ///
1940    /// # Example
1941    /// ```rust
1942    /// # #[cfg(feature = "deploy")] {
1943    /// # use hydro_lang::prelude::*;
1944    /// # use futures::StreamExt;
1945    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1946    /// let tick = process.tick();
1947    /// let keyed_data = process
1948    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1949    ///     .into_keyed()
1950    ///     .batch(&tick, nondet!(/** test */));
1951    /// let singleton_data = process
1952    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
1953    ///     .into_keyed()
1954    ///     .batch(&tick, nondet!(/** test */))
1955    ///     .first();
1956    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
1957    /// # }, |mut stream| async move {
1958    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
1959    /// # let mut results = vec![];
1960    /// # for _ in 0..3 {
1961    /// #     results.push(stream.next().await.unwrap());
1962    /// # }
1963    /// # results.sort();
1964    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
1965    /// # }));
1966    /// # }
1967    /// ```
1968    pub fn join_keyed_singleton<V2: Clone>(
1969        self,
1970        keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
1971    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
1972    where
1973        B: IsBounded,
1974        K: Eq + Hash,
1975    {
1976        keyed_singleton
1977            .join_keyed_stream(self.make_bounded())
1978            .map(q!(|(v2, v)| (v, v2)))
1979    }
1980
1981    /// Gets the values associated with a specific key from the keyed stream.
1982    ///
1983    /// # Example
1984    /// ```rust
1985    /// # #[cfg(feature = "deploy")] {
1986    /// # use hydro_lang::prelude::*;
1987    /// # use futures::StreamExt;
1988    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1989    /// let tick = process.tick();
1990    /// let keyed_data = process
1991    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1992    ///     .into_keyed()
1993    ///     .batch(&tick, nondet!(/** test */));
1994    /// let key = tick.singleton(q!(1));
1995    /// keyed_data.get(key).all_ticks()
1996    /// # }, |mut stream| async move {
1997    /// // 10, 11 in any order
1998    /// # let mut results = vec![];
1999    /// # for _ in 0..2 {
2000    /// #     results.push(stream.next().await.unwrap());
2001    /// # }
2002    /// # results.sort();
2003    /// # assert_eq!(results, vec![10, 11]);
2004    /// # }));
2005    /// # }
2006    /// ```
2007    pub fn get(self, key: Singleton<K, L, Bounded>) -> Stream<V, L, Bounded, NoOrder, R>
2008    where
2009        B: IsBounded,
2010        K: Eq + Hash,
2011    {
2012        self.make_bounded()
2013            .entries()
2014            .join(key.into_stream().map(q!(|k| (k, ()))))
2015            .map(q!(|(_, (v, _))| v))
2016    }
2017
2018    /// For each value in `self`, find the matching key in `lookup`.
2019    /// The output is a keyed stream with the key from `self`, and a value
2020    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2021    /// If the key is not present in `lookup`, the option will be [`None`].
2022    ///
2023    /// # Example
2024    /// ```rust
2025    /// # #[cfg(feature = "deploy")] {
2026    /// # use hydro_lang::prelude::*;
2027    /// # use futures::StreamExt;
2028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2029    /// # let tick = process.tick();
2030    /// let requests = // { 1: [10, 11], 2: 20 }
2031    /// # process
2032    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2033    /// #     .into_keyed()
2034    /// #     .batch(&tick, nondet!(/** test */));
2035    /// let other_data = // { 10: 100, 11: 110 }
2036    /// # process
2037    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2038    /// #     .into_keyed()
2039    /// #     .batch(&tick, nondet!(/** test */))
2040    /// #     .first();
2041    /// requests.lookup_keyed_singleton(other_data)
2042    /// # .entries().all_ticks()
2043    /// # }, |mut stream| async move {
2044    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2045    /// # let mut results = vec![];
2046    /// # for _ in 0..3 {
2047    /// #     results.push(stream.next().await.unwrap());
2048    /// # }
2049    /// # results.sort();
2050    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2051    /// # }));
2052    /// # }
2053    /// ```
2054    pub fn lookup_keyed_singleton<V2>(
2055        self,
2056        lookup: KeyedSingleton<V, V2, L, Bounded>,
2057    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2058    where
2059        B: IsBounded,
2060        K: Eq + Hash + Clone,
2061        V: Eq + Hash + Clone,
2062        V2: Clone,
2063    {
2064        self.lookup_keyed_stream(
2065            lookup
2066                .into_keyed_stream()
2067                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2068        )
2069    }
2070
2071    /// For each value in `self`, find the matching key in `lookup`.
2072    /// The output is a keyed stream with the key from `self`, and a value
2073    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2074    /// If the key is not present in `lookup`, the option will be [`None`].
2075    ///
2076    /// # Example
2077    /// ```rust
2078    /// # #[cfg(feature = "deploy")] {
2079    /// # use hydro_lang::prelude::*;
2080    /// # use futures::StreamExt;
2081    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2082    /// # let tick = process.tick();
2083    /// let requests = // { 1: [10, 11], 2: 20 }
2084    /// # process
2085    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2086    /// #     .into_keyed()
2087    /// #     .batch(&tick, nondet!(/** test */));
2088    /// let other_data = // { 10: [100, 101], 11: 110 }
2089    /// # process
2090    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2091    /// #     .into_keyed()
2092    /// #     .batch(&tick, nondet!(/** test */));
2093    /// requests.lookup_keyed_stream(other_data)
2094    /// # .entries().all_ticks()
2095    /// # }, |mut stream| async move {
2096    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2097    /// # let mut results = vec![];
2098    /// # for _ in 0..4 {
2099    /// #     results.push(stream.next().await.unwrap());
2100    /// # }
2101    /// # results.sort();
2102    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2103    /// # }));
2104    /// # }
2105    /// ```
    #[expect(clippy::type_complexity, reason = "retries propagation")]
    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
        self,
        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        K: Eq + Hash + Clone,
        V: Eq + Hash + Clone,
        V2: Clone,
        R: MinRetries<R2>,
    {
        // Re-key `self` by its *value* so each value can be matched against
        // the keys of `lookup`.
        let inverted = self
            .make_bounded()
            .entries()
            .map(q!(|(key, lookup_value)| (lookup_value, key)))
            .into_keyed();
        // Values with a match in `lookup`: the join yields one output per
        // matching (value, lookup-entry) pair, re-keyed by the original key
        // with the looked-up value wrapped in `Some`.
        let found = inverted
            .clone()
            .join_keyed_stream(lookup.clone())
            .entries()
            .map(q!(|(lookup_value, (key, value))| (
                key,
                (lookup_value, Some(value))
            )))
            .into_keyed();
        // Values with no match in `lookup` pass through paired with `None`,
        // via an anti-join against `lookup`'s key set.
        let not_found = inverted
            .filter_key_not_in(lookup.keys())
            .entries()
            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
            .into_keyed();

        // `chain` requires both sides to agree on the retry guarantee, so the
        // not-found branch is weakened to the common minimum.
        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
    }
2140
2141    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2142    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2143    ///
2144    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2145    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
2146    /// argument that declares where the stream will be atomically processed. Batching a stream into
2147    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
2148    /// [`Tick`] will introduce asynchrony.
2149    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2150        let out_location = Atomic { tick: tick.clone() };
2151        KeyedStream::new(
2152            out_location.clone(),
2153            HydroNode::BeginAtomic {
2154                inner: Box::new(self.ir_node.into_inner()),
2155                metadata: out_location
2156                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2157            },
2158        )
2159    }
2160
2161    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2162    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2163    /// the order of the input.
2164    ///
2165    /// # Non-Determinism
2166    /// The batch boundaries are non-deterministic and may change across executions.
2167    pub fn batch(
2168        self,
2169        tick: &Tick<L>,
2170        nondet: NonDet,
2171    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2172        let _ = nondet;
2173        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2174        KeyedStream::new(
2175            tick.clone(),
2176            HydroNode::Batch {
2177                inner: Box::new(self.ir_node.into_inner()),
2178                metadata: tick.new_node_metadata(
2179                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2180                ),
2181            },
2182        )
2183    }
2184}
2185
2186impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2187    KeyedStream<(K1, K2), V, L, B, O, R>
2188{
2189    /// Produces a new keyed stream by dropping the first element of the compound key.
2190    ///
2191    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2192    /// of the values under the new keys. The values across groups with the same new key
2193    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2194    ///
2195    /// # Example
2196    /// ```rust
2197    /// # #[cfg(feature = "deploy")] {
2198    /// # use hydro_lang::prelude::*;
2199    /// # use futures::StreamExt;
2200    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201    /// process
2202    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2203    ///     .into_keyed()
2204    ///     .drop_key_prefix()
2205    /// #   .entries()
2206    /// # }, |mut stream| async move {
2207    /// // { 10: [2, 3], 20: [4] }
2208    /// # let mut results = Vec::new();
2209    /// # for _ in 0..3 {
2210    /// #     results.push(stream.next().await.unwrap());
2211    /// # }
2212    /// # results.sort();
2213    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2214    /// # }));
2215    /// # }
2216    /// ```
2217    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2218        self.entries()
2219            .map(q!(|((_k1, k2), v)| (k2, v)))
2220            .into_keyed()
2221    }
2222}
2223
2224impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2225    KeyedStream<K, V, L, Unbounded, O, R>
2226{
2227    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2228    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2229    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2230    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2231    ///
2232    /// Currently, both input streams must be [`Unbounded`].
2233    ///
2234    /// # Example
2235    /// ```rust
2236    /// # #[cfg(feature = "deploy")] {
2237    /// # use hydro_lang::prelude::*;
2238    /// # use futures::StreamExt;
2239    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2240    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2241    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2242    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2243    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2244    /// numbers1.interleave(numbers2)
2245    /// #   .entries()
2246    /// # }, |mut stream| async move {
2247    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2248    /// # let mut results = Vec::new();
2249    /// # for _ in 0..4 {
2250    /// #     results.push(stream.next().await.unwrap());
2251    /// # }
2252    /// # results.sort();
2253    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2254    /// # }));
2255    /// # }
2256    /// ```
2257    pub fn interleave<O2: Ordering, R2: Retries>(
2258        self,
2259        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2260    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2261    where
2262        R: MinRetries<R2>,
2263    {
2264        KeyedStream::new(
2265            self.location.clone(),
2266            HydroNode::Chain {
2267                first: Box::new(self.ir_node.into_inner()),
2268                second: Box::new(other.ir_node.into_inner()),
2269                metadata: self.location.new_node_metadata(KeyedStream::<
2270                    K,
2271                    V,
2272                    L,
2273                    Unbounded,
2274                    NoOrder,
2275                    <R as MinRetries<R2>>::Min,
2276                >::collection_kind()),
2277            },
2278        )
2279    }
2280}
2281
2282impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2283where
2284    L: Location<'a> + NoTick,
2285{
2286    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2287    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2288    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2289    /// used to create the atomic section.
2290    ///
2291    /// # Non-Determinism
2292    /// The batch boundaries are non-deterministic and may change across executions.
2293    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2294        let _ = nondet;
2295        KeyedStream::new(
2296            self.location.clone().tick,
2297            HydroNode::Batch {
2298                inner: Box::new(self.ir_node.into_inner()),
2299                metadata: self.location.tick.new_node_metadata(KeyedStream::<
2300                    K,
2301                    V,
2302                    Tick<L>,
2303                    Bounded,
2304                    O,
2305                    R,
2306                >::collection_kind(
2307                )),
2308            },
2309        )
2310    }
2311
2312    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2313    /// See [`KeyedStream::atomic`] for more details.
2314    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2315        KeyedStream::new(
2316            self.location.tick.l.clone(),
2317            HydroNode::EndAtomic {
2318                inner: Box::new(self.ir_node.into_inner()),
2319                metadata: self
2320                    .location
2321                    .tick
2322                    .l
2323                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2324            },
2325        )
2326    }
2327}
2328
// Operations available on a bounded, tick-scoped batch of keyed elements; these
// move data out of the tick (`all_ticks`, `all_ticks_atomic`, `across_ticks`) or
// between consecutive ticks (`defer_tick`).
impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
        KeyedStream::new(
            // The output lives at the tick's outer (top-level) location.
            self.location.outer().clone(),
            // `YieldConcat` emits each tick's batch into the outer context,
            // concatenating the per-key batches across tick iterations.
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.outer().new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    O,
                    R,
                >::collection_kind(
                )),
            },
        )
    }

    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    ///
    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
        // Wrap the output in an `Atomic` context tied to this tick so downstream
        // operators stay synchronized with the tick's execution.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        KeyedStream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: out_location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Atomic<L>,
                    Unbounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory for each key across ticks rather than resetting across batches of each key.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
    /// #   .into_keyed()
    /// #   .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
    /// #   .into_keyed()
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick(); // appears on the second tick
    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
    ///         *sum += new;
    ///     }))).entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
    /// # results.clear();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Run the user transformation in the atomic cross-tick context, then batch
        // the result back into this tick so it can be consumed synchronously.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
    /// tick `T` always has the entries of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .into_keyed();
    /// # let batch_second_tick = process
    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
    /// #   .batch(&tick, nondet!(/** test */))
    /// #   .defer_tick()
    /// #   .into_keyed(); // appears on the second tick
    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
    /// # batch_first_tick.chain(batch_second_tick);
    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
    ///     changes_across_ticks // from the current tick
    /// )
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: { 1: [2, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
    /// // Third tick: { 1: [4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        KeyedStream::new(
            self.location.clone(),
            // `DeferTick` delays every entry by exactly one tick iteration; the
            // output stays in the same tick location and remains bounded.
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Tick<L>,
                    Bounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }
}
2511
#[cfg(test)]
mod tests {
    // Tests for keyed-stream operators: `reduce_watermark` under deployment, and
    // simulator (`sim`) tests exercising nondeterministic batching and ordering.
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::{NoOrder, TotalOrder};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;
    #[cfg(feature = "deploy")]
    use crate::properties::ManualProof;

    // `reduce_watermark` on an unbounded (streamed) input with watermark 1: the first
    // snapshot entry observed externally is key 2 summed to 102 + 102 = 204.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Same aggregation but over a bounded `source_iter` input, read directly through
    // `entries()` without snapshotting; expects the same (2, 204) result.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_bounded() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let watermark = node_tick.singleton(q!(1));

        let sum = node
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .entries()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut out = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out.next().await.unwrap(), (2, 204));
    }

    // Advances the watermark by one each tick, then injects a late entry (3, 103) via an
    // external trigger; checks the aggregation still emits correctly after earlier keys
    // have passed below the (moving) watermark. Runs with default optimizations enabled.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();
        let (tick_send, tick_trigger) =
            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Watermark starts at 1 and is incremented on every tick via a cycle.
        let (watermark_complete_cycle, watermark) =
            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
        let next_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_complete_cycle.complete_next_tick(next_watermark);

        // (3, 103) is only released once the external trigger fires.
        let tick_triggered_input = node_tick
            .singleton(q!((3, 103)))
            .into_stream()
            .filter_if_some(
                tick_trigger
                    .clone()
                    .batch(&node_tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sum = node
            .source_stream(q!(tokio_stream::iter([
                (0, 100),
                (1, 101),
                (2, 102),
                (2, 102)
            ])))
            .interleave(tick_triggered_input)
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(
                    |acc, v| {
                        *acc += v;
                    },
                    commutative = ManualProof(/* integer addition is commutative */)
                ),
            )
            .snapshot(&node_tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_default_optimize()
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_send = nodes.connect(tick_send).await;
        let mut out_recv = nodes.connect(sum).await;

        deployment.start().await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (2, 204));

        tick_send.send(()).await.unwrap();

        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
    }

    // Batch boundaries are nondeterministic in the simulator, so asserting one fixed
    // per-batch fold result must fail in some explored schedule — hence `should_panic`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
            .entries()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, vec![1, 2])])
                .await;
        });
    }

    // Batching a (totally ordered) keyed stream must preserve per-key value order in
    // every explored schedule: the fold always sees 1 before 2 for key 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_group_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .fold_early_stop(
                q!(|| 0),
                q!(|acc, v| {
                    *acc = std::cmp::max(v, *acc);
                    *acc >= 2
                }),
            )
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 8);
        // - three cases: all three in a separate tick (pick where (2, 3) is)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
        // - one case: all three together
    }

    // With the ordering weakened to `NoOrder`, the simulator additionally shuffles
    // values within a key, raising the explored-instance count from 8 to 13.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let input = node
            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
            .into_keyed()
            .weaken_ordering::<NoOrder>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instances = flow.sim().exhaustive(async || {
            out_recv
                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
                .await;
        });

        assert_eq!(instances, 13);
        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    }

    // `assume_ordering` on a `NoOrder` input means the simulator explores per-key
    // orders; a fixed expectation of the first elements fails for some schedule,
    // hence `should_panic`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .first()
            .entries()
            .sim_output();

        flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            out_recv
                .assert_yields_only_unordered([(1, 1), (2, 1)])
                .await; // fails with assume_ordering
        });
    }

    // Pins the number of schedules the simulator explores when assuming a total
    // order over a batched `NoOrder` keyed input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (port, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
            let _ = out_recv.collect_sorted::<Vec<_>>().await;
        });

        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
    }

    // Top-level (non-tick) `assume_ordering`: each key folds its first two values;
    // pins the explored-instance count at 24.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .into_keyed()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();
            // Each key accumulates its values; we get one entry per key
            assert_eq!(out.len(), 2);
        });

        assert_eq!(instance_count, 24)
    }

    // `assume_ordering` with a feedback cycle: values emitted by the observed stream
    // are mapped and filtered back into the same stream; verifies at least one schedule
    // interleaves the cycled-back value into the same key's order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
            // We want to see [0, 1] - the cycled back value interleaved
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 6);
    }

    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cross_key_cycle() {
        use std::collections::HashMap;

        // This test demonstrates why releasing one entry at a time is important:
        // When one key's observed order cycles back into a different key, we need
        // to be able to interleave the cycled-back entry with pending items for
        // that other key.
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
        complete_cycle_back.complete(
            ordered
                .clone()
                .filter(q!(|v| *v == 10))
                .map(q!(|_| 100))
                .entries()
                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
                .into_keyed(),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        // We want to see an instance where:
        // - (1, 10) is released first
        // - This causes (2, 100) to be cycled back
        // - (2, 100) is released BEFORE (2, 20) which was already pending
        let mut saw_cross_key_interleave = false;
        let instance_count = flow.sim().exhaustive(async || {
            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            // Check if we see the cross-key interleaving:
            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
            if let Some(values) = out.get(&2)
                && values.len() >= 2
                && values[0] == 100
            {
                saw_cross_key_interleave = true;
            }
        });

        assert!(
            saw_cross_key_interleave,
            "did not see an instance where cycled-back 100 was released before pending items for key 2"
        );
        assert_eq!(instance_count, 60);
    }

    // Variant of `sim_top_level_assume_ordering_cycle_back` where the cycled-back
    // values additionally pass through a tick batch, which changes the schedule count.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
        let ordered = input
            .into_keyed()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered
            .fold_early_stop(
                q!(|| Vec::new()),
                q!(|acc, v| {
                    acc.push(v);
                    acc.len() >= 2
                }),
            )
            .entries()
            .sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([(1, 0), (1, 2)]);
            let out: HashMap<_, _> = out_recv
                .collect_sorted::<Vec<_>>()
                .await
                .into_iter()
                .collect();

            if let Some(values) = out.get(&1)
                && *values == vec![0, 1]
            {
                saw = true;
            }
        });

        assert!(
            saw,
            "did not see an instance with key 1 having [0, 1] in order"
        );
        assert_eq!(instance_count, 58);
    }
}