hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{
17 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
18};
19use crate::compile::builder::CycleId;
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
37
38pub mod networking;
39
40/// Streaming elements of type `V` grouped by a key of type `K`.
41///
42/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
43/// order of keys is non-deterministic but the order *within* each group may be deterministic.
44///
45/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
46/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
47/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
48///
49/// Type Parameters:
50/// - `K`: the type of the key for each group
51/// - `V`: the type of the elements inside each group
52/// - `Loc`: the [`Location`] where the keyed stream is materialized
53/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
54/// - `Order`: tracks whether the elements within each group have deterministic order
55/// ([`TotalOrder`]) or not ([`NoOrder`])
56/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
57/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
58pub struct KeyedStream<
59 K,
60 V,
61 Loc,
62 Bound: Boundedness = Unbounded,
63 Order: Ordering = TotalOrder,
64 Retry: Retries = ExactlyOnce,
65> {
66 pub(crate) location: Loc,
67 pub(crate) ir_node: RefCell<HydroNode>,
68
69 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
70}
71
72impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
73 for KeyedStream<K, V, L, Unbounded, O, R>
74where
75 L: Location<'a>,
76{
77 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
78 let new_meta = stream
79 .location
80 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
81
82 KeyedStream {
83 location: stream.location,
84 ir_node: RefCell::new(HydroNode::Cast {
85 inner: Box::new(stream.ir_node.into_inner()),
86 metadata: new_meta,
87 }),
88 _phantom: PhantomData,
89 }
90 }
91}
92
93impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
94 for KeyedStream<K, V, L, B, NoOrder, R>
95where
96 L: Location<'a>,
97{
98 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
99 stream.weaken_ordering()
100 }
101}
102
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Fully-qualified call so this dispatches to the inherent
        // `KeyedStream::defer_tick` rather than recursing into this trait method.
        KeyedStream::defer_tick(self)
    }
}
111
112impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
113 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
114where
115 L: Location<'a>,
116{
117 type Location = Tick<L>;
118
119 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
120 KeyedStream {
121 location: location.clone(),
122 ir_node: RefCell::new(HydroNode::CycleSource {
123 cycle_id,
124 metadata: location.new_node_metadata(
125 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
126 ),
127 }),
128 _phantom: PhantomData,
129 }
130 }
131}
132
133impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
134 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
135where
136 L: Location<'a>,
137{
138 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
139 assert_eq!(
140 Location::id(&self.location),
141 expected_location,
142 "locations do not match"
143 );
144
145 self.location
146 .flow_state()
147 .borrow_mut()
148 .push_root(HydroRoot::CycleSink {
149 cycle_id,
150 input: Box::new(self.ir_node.into_inner()),
151 op_metadata: HydroIrOpMetadata::new(),
152 });
153 }
154}
155
156impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
157 for KeyedStream<K, V, L, B, O, R>
158where
159 L: Location<'a> + NoTick,
160{
161 type Location = L;
162
163 fn create_source(cycle_id: CycleId, location: L) -> Self {
164 KeyedStream {
165 location: location.clone(),
166 ir_node: RefCell::new(HydroNode::CycleSource {
167 cycle_id,
168 metadata: location
169 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
170 }),
171 _phantom: PhantomData,
172 }
173 }
174}
175
176impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
177 for KeyedStream<K, V, L, B, O, R>
178where
179 L: Location<'a> + NoTick,
180{
181 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
182 assert_eq!(
183 Location::id(&self.location),
184 expected_location,
185 "locations do not match"
186 );
187 self.location
188 .flow_state()
189 .borrow_mut()
190 .push_root(HydroRoot::CycleSink {
191 cycle_id,
192 input: Box::new(self.ir_node.into_inner()),
193 op_metadata: HydroIrOpMetadata::new(),
194 });
195 }
196}
197
impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
{
    fn clone(&self) -> Self {
        // Cloning a live collection shares the underlying IR node via a `Tee`.
        // If this handle's node is not already a `Tee`, rewrite it in place:
        // temporarily swap in a placeholder, then wrap the original node in a
        // shared, ref-counted `Tee` so this handle and all clones point at it.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // The node is now guaranteed to be a `Tee`; the clone gets a fresh `Tee`
        // handle (Rc clone) to the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            KeyedStream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above just ensured the node is a `Tee`.
            unreachable!()
        }
    }
}
225
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// Each variant independently decides (1) whether an element is emitted now and
/// (2) whether later inputs in the same group will still be processed.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
238
239impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
240 KeyedStream<K, V, L, B, O, R>
241{
    /// Internal constructor wrapping a raw IR node.
    ///
    /// Debug-asserts that the node's recorded location and collection kind agree
    /// with this handle's type parameters, catching IR-construction mistakes early.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        KeyedStream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
252
253 /// Returns the [`CollectionKind`] corresponding to this type.
254 pub fn collection_kind() -> CollectionKind {
255 CollectionKind::KeyedStream {
256 bound: B::BOUND_KIND,
257 value_order: O::ORDERING_KIND,
258 value_retry: R::RETRIES_KIND,
259 key_type: stageleft::quote_type::<K>().into(),
260 value_type: stageleft::quote_type::<V>().into(),
261 }
262 }
263
    /// Returns the [`Location`] where this keyed stream is being materialized.
    ///
    /// This is a cheap borrow of the handle stored on the stream; it does not
    /// inspect or modify the underlying IR.
    pub fn location(&self) -> &L {
        &self.location
    }
268
269 /// Explicitly "casts" the keyed stream to a type with a different ordering
270 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
271 /// by the type-system.
272 ///
273 /// # Non-Determinism
274 /// This function is used as an escape hatch, and any mistakes in the
275 /// provided ordering guarantee will propagate into the guarantees
276 /// for the rest of the program.
277 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
278 if O::ORDERING_KIND == O2::ORDERING_KIND {
279 KeyedStream::new(self.location, self.ir_node.into_inner())
280 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
281 // We can always weaken the ordering guarantee
282 KeyedStream::new(
283 self.location.clone(),
284 HydroNode::Cast {
285 inner: Box::new(self.ir_node.into_inner()),
286 metadata: self
287 .location
288 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
289 },
290 )
291 } else {
292 KeyedStream::new(
293 self.location.clone(),
294 HydroNode::ObserveNonDet {
295 inner: Box::new(self.ir_node.into_inner()),
296 trusted: false,
297 metadata: self
298 .location
299 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
300 },
301 )
302 }
303 }
304
305 fn assume_ordering_trusted<O2: Ordering>(
306 self,
307 _nondet: NonDet,
308 ) -> KeyedStream<K, V, L, B, O2, R> {
309 if O::ORDERING_KIND == O2::ORDERING_KIND {
310 KeyedStream::new(self.location, self.ir_node.into_inner())
311 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
312 // We can always weaken the ordering guarantee
313 KeyedStream::new(
314 self.location.clone(),
315 HydroNode::Cast {
316 inner: Box::new(self.ir_node.into_inner()),
317 metadata: self
318 .location
319 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
320 },
321 )
322 } else {
323 KeyedStream::new(
324 self.location.clone(),
325 HydroNode::ObserveNonDet {
326 inner: Box::new(self.ir_node.into_inner()),
327 trusted: true,
328 metadata: self
329 .location
330 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
331 },
332 )
333 }
334 }
335
    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
        // Deprecated alias; simply forwards to the generic form.
        self.weaken_ordering::<NoOrder>()
    }
342
343 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
344 /// enforcing that `O2` is weaker than the input ordering guarantee.
345 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
346 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
347 self.assume_ordering::<O2>(nondet)
348 }
349
350 /// Explicitly "casts" the keyed stream to a type with a different retries
351 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
352 /// be proven by the type-system.
353 ///
354 /// # Non-Determinism
355 /// This function is used as an escape hatch, and any mistakes in the
356 /// provided retries guarantee will propagate into the guarantees
357 /// for the rest of the program.
358 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
359 if R::RETRIES_KIND == R2::RETRIES_KIND {
360 KeyedStream::new(self.location, self.ir_node.into_inner())
361 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
362 // We can always weaken the retries guarantee
363 KeyedStream::new(
364 self.location.clone(),
365 HydroNode::Cast {
366 inner: Box::new(self.ir_node.into_inner()),
367 metadata: self
368 .location
369 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
370 },
371 )
372 } else {
373 KeyedStream::new(
374 self.location.clone(),
375 HydroNode::ObserveNonDet {
376 inner: Box::new(self.ir_node.into_inner()),
377 trusted: false,
378 metadata: self
379 .location
380 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
381 },
382 )
383 }
384 }
385
    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
        // Deprecated alias; simply forwards to the generic form.
        self.weaken_retries::<AtLeastOnce>()
    }
392
393 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
394 /// enforcing that `R2` is weaker than the input retries guarantee.
395 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
396 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
397 self.assume_retries::<R2>(nondet)
398 }
399
    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
    /// implies that `O == TotalOrder`.
    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
    where
        O: IsOrdered,
    {
        // `O: IsOrdered` makes this a type-level no-op, so the nondet is vacuous.
        self.assume_ordering(nondet!(/** no-op */))
    }
408
    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
    /// implies that `R == ExactlyOnce`.
    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
    where
        R: IsExactlyOnce,
    {
        // `R: IsExactlyOnce` makes this a type-level no-op, so the nondet is vacuous.
        self.assume_retries(nondet!(/** no-op */))
    }
417
    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
    /// implies that `B == Bounded`.
    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
    where
        B: IsBounded,
    {
        // No IR node is inserted: `B: IsBounded` implies the kinds already match.
        KeyedStream::new(self.location, self.ir_node.into_inner())
    }
426
427 /// Flattens the keyed stream into an unordered stream of key-value pairs.
428 ///
429 /// # Example
430 /// ```rust
431 /// # #[cfg(feature = "deploy")] {
432 /// # use hydro_lang::prelude::*;
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
435 /// process
436 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
437 /// .into_keyed()
438 /// .entries()
439 /// # }, |mut stream| async move {
440 /// // (1, 2), (1, 3), (2, 4) in any order
441 /// # let mut results = Vec::new();
442 /// # for _ in 0..3 {
443 /// # results.push(stream.next().await.unwrap());
444 /// # }
445 /// # results.sort();
446 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
447 /// # }));
448 /// # }
449 /// ```
450 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
451 Stream::new(
452 self.location.clone(),
453 HydroNode::Cast {
454 inner: Box::new(self.ir_node.into_inner()),
455 metadata: self
456 .location
457 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
458 },
459 )
460 }
461
    /// Flattens the keyed stream into an unordered stream of only the values.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .values()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4 in any order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
        // Flatten to `(K, V)` pairs first, then project away the key.
        self.entries().map(q!(|(_, v)| v))
    }
488
    /// Flattens the keyed stream into an unordered stream of just the keys.
    ///
    /// Each key appears exactly once in the output, regardless of how many values
    /// its group contains.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # process
    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
    /// #     .into_keyed()
    /// #     .keys()
    /// # }, |mut stream| async move {
    /// // 1, 2 in any order
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2]);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
    where
        K: Eq + Hash,
    {
        // `unique()` deduplicates, which is why the output is `ExactlyOnce`
        // even when the input retry guarantee `R` is weaker.
        self.entries().map(q!(|(k, _)| k)).unique()
    }
518
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Defer splicing of `f` until the location context is known, then wrap it
        // in a quoted closure that applies `f` to the value and passes the key through.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let map_f = q!({
            let orig = f;
            move |(k, v)| (k, orig(v))
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
570
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
    /// # }));
    /// # }
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // The key must be cloned because `f` consumes a `(K, V)` pair while the
        // original key is still needed to keep the output under the same group.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                (k, out)
            }
        })
        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
629
    /// Prepends a new value to the key of each element in the stream, producing a new
    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
    /// occurs and the elements in each group preserve their original order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .prefix_key(q!(|&(k, _)| k % 2))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { (1, 1): [2, 3], (0, 2): [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
    /// # }));
    /// # }
    /// ```
    pub fn prefix_key<K2, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<(K2, K), V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> K2 + 'a,
    {
        // `f` only borrows the pair, so the original key and value can be reused
        // to build the compound-keyed output element without cloning.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        let map_f = q!({
            let orig = f;
            move |kv| {
                let out = orig(&kv);
                ((out, kv.0), kv.1)
            }
        })
        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Map {
                f: map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
            },
        )
    }
685
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Lift the value-only predicate to a predicate over `&(K, V)` pairs by
        // applying it to the value component; the key is ignored.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        let filter_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
736
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map_with_key`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_with_key<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&(K, V)) -> bool + 'a,
    {
        // The predicate already has the `&(K, V)` shape the IR expects, so it can
        // be spliced directly without wrapping.
        let filter_f = f
            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
            .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Filter {
                f: filter_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
786
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Lift the value-only closure over `(K, V)` pairs: apply `f` to the value
        // and re-attach the key whenever `f` yields `Some`.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).map(|o| (k, o))
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
839
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// #     .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..1 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(2, 2)]);
    /// # }));
    /// # }
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // The key is cloned because `f` consumes a `(K, V)` pair while the
        // original key is still needed to keep the surviving output grouped.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        let filter_map_f = q!({
            let orig = f;
            move |(k, v)| {
                let out = orig((Clone::clone(&k), v));
                out.map(|o| (k, o))
            }
        })
        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f: filter_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
896
    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
    ///     .into_keyed()
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks().entries()
    /// # }, |mut stream| async move {
    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> KeyedStream<K, (V, O2), L, B, O, R>
    where
        O2: Clone,
    {
        let other: Optional<O2, L, Bounded> = other.into();
        // Both inputs must be materialized at the same location for a local cross.
        check_matching_location(&self.location, &other.location);

        // The cross produces `((K, V), O2)` tuples; the trailing `map` + `into_keyed`
        // reshapes them to `(K, (V, O2))` so the result stays grouped by `K`.
        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.into_inner()),
                right: Box::new(other.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
            },
        )
        .map(q!(|((k, v), o2)| (k, (v, o2))))
        .into_keyed()
    }
948
949 /// For each value `v` in each group, transform `v` using `f` and then treat the
950 /// result as an [`Iterator`] to produce values one by one within the same group.
951 /// The implementation for [`Iterator`] for the output type `I` must produce items
952 /// in a **deterministic** order.
953 ///
954 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
955 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
956 ///
957 /// # Example
958 /// ```rust
959 /// # #[cfg(feature = "deploy")] {
960 /// # use hydro_lang::prelude::*;
961 /// # use futures::StreamExt;
962 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
963 /// process
964 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
965 /// .into_keyed()
966 /// .flat_map_ordered(q!(|x| x))
967 /// # .entries()
968 /// # }, |mut stream| async move {
969 /// // { 1: [2, 3, 4], 2: [5, 6] }
970 /// # let mut results = Vec::new();
971 /// # for _ in 0..5 {
972 /// # results.push(stream.next().await.unwrap());
973 /// # }
974 /// # results.sort();
975 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
976 /// # }));
977 /// # }
978 /// ```
    pub fn flat_map_ordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Splice the user closure in this location's context so it can be
        // embedded inside the generated flat-map closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The underlying IR operates on `(K, V)` entries, so the generated
        // closure re-attaches (a clone of) the key to every produced item.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
            },
        )
    }
1007
1008 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1009 /// for the output type `I` to produce items in any order.
1010 ///
1011 /// # Example
1012 /// ```rust
1013 /// # #[cfg(feature = "deploy")] {
1014 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1015 /// # use futures::StreamExt;
1016 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1017 /// process
1018 /// .source_iter(q!(vec![
1019 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1020 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1021 /// ]))
1022 /// .into_keyed()
1023 /// .flat_map_unordered(q!(|x| x))
1024 /// # .entries()
1025 /// # }, |mut stream| async move {
1026 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1027 /// # let mut results = Vec::new();
1028 /// # for _ in 0..4 {
1029 /// # results.push(stream.next().await.unwrap());
1030 /// # }
1031 /// # results.sort();
1032 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1033 /// # }));
1034 /// # }
1035 /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(V) -> I + 'a,
        K: Clone,
    {
        // Same lowering as `flat_map_ordered`; only the output ordering
        // guarantee differs (`NoOrder`), reflected in the collection kind.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        // The IR operates on `(K, V)` entries, so the generated closure
        // re-attaches (a clone of) the key to every produced item.
        let flat_map_f = q!({
            let orig = f;
            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
        })
        .splice_fn1_ctx::<(K, V), _>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f: flat_map_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
1064
1065 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1066 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1067 /// items in a **deterministic** order.
1068 ///
1069 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1070 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # #[cfg(feature = "deploy")] {
1075 /// # use hydro_lang::prelude::*;
1076 /// # use futures::StreamExt;
1077 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078 /// process
1079 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1080 /// .into_keyed()
1081 /// .flatten_ordered()
1082 /// # .entries()
1083 /// # }, |mut stream| async move {
1084 /// // { 1: [2, 3, 4], 2: [5, 6] }
1085 /// # let mut results = Vec::new();
1086 /// # for _ in 0..5 {
1087 /// # results.push(stream.next().await.unwrap());
1088 /// # }
1089 /// # results.sort();
1090 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1091 /// # }));
1092 /// # }
1093 /// ```
    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Flattening is just a flat-map with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }
1101
1102 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1103 /// for the value type `V` to produce items in any order.
1104 ///
1105 /// # Example
1106 /// ```rust
1107 /// # #[cfg(feature = "deploy")] {
1108 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1109 /// # use futures::StreamExt;
1110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1111 /// process
1112 /// .source_iter(q!(vec![
1113 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1114 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1115 /// ]))
1116 /// .into_keyed()
1117 /// .flatten_unordered()
1118 /// # .entries()
1119 /// # }, |mut stream| async move {
1120 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1121 /// # let mut results = Vec::new();
1122 /// # for _ in 0..4 {
1123 /// # results.push(stream.next().await.unwrap());
1124 /// # }
1125 /// # results.sort();
1126 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1127 /// # }));
1128 /// # }
1129 /// ```
    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
    where
        V: IntoIterator<Item = U>,
        K: Clone,
    {
        // Flattening is just a flat-map with the identity function.
        self.flat_map_unordered(q!(|d| d))
    }
1137
1138 /// An operator which allows you to "inspect" each element of a stream without
1139 /// modifying it. The closure `f` is called on a reference to each value. This is
1140 /// mainly useful for debugging, and should not be used to generate side-effects.
1141 ///
1142 /// # Example
1143 /// ```rust
1144 /// # #[cfg(feature = "deploy")] {
1145 /// # use hydro_lang::prelude::*;
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148 /// process
1149 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1150 /// .into_keyed()
1151 /// .inspect(q!(|v| println!("{}", v)))
1152 /// # .entries()
1153 /// # }, |mut stream| async move {
1154 /// # let mut results = Vec::new();
1155 /// # for _ in 0..3 {
1156 /// # results.push(stream.next().await.unwrap());
1157 /// # }
1158 /// # results.sort();
1159 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1160 /// # }));
1161 /// # }
1162 /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
    where
        F: Fn(&V) + 'a,
    {
        // Splice the user closure so it can be embedded in the generated
        // wrapper below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        // The IR carries `(K, V)` entries; only the value half is exposed to
        // the user closure.
        let inspect_f = q!({
            let orig = f;
            move |t: &(_, _)| orig(&t.1)
        })
        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
        .into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1184
1185 /// An operator which allows you to "inspect" each element of a stream without
1186 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1187 /// mainly useful for debugging, and should not be used to generate side-effects.
1188 ///
1189 /// # Example
1190 /// ```rust
1191 /// # #[cfg(feature = "deploy")] {
1192 /// # use hydro_lang::prelude::*;
1193 /// # use futures::StreamExt;
1194 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1195 /// process
1196 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1197 /// .into_keyed()
1198 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1199 /// # .entries()
1200 /// # }, |mut stream| async move {
1201 /// # let mut results = Vec::new();
1202 /// # for _ in 0..3 {
1203 /// # results.push(stream.next().await.unwrap());
1204 /// # }
1205 /// # results.sort();
1206 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1207 /// # }));
1208 /// # }
1209 /// ```
    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&(K, V)) + 'a,
    {
        // Unlike [`KeyedStream::inspect`], the closure already takes the full
        // `(K, V)` entry, so it can be spliced directly with no wrapper.
        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();

        KeyedStream::new(
            self.location.clone(),
            HydroNode::Inspect {
                f: inspect_f,
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
1225
1226 /// An operator which allows you to "name" a `HydroNode`.
1227 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1228 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1229 {
1230 let mut node = self.ir_node.borrow_mut();
1231 let metadata = node.metadata_mut();
1232 metadata.tag = Some(name.to_owned());
1233 }
1234 self
1235 }
1236
1237 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1238 ///
1239 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1240 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1241 /// early by returning `None`.
1242 ///
1243 /// The function takes a mutable reference to the accumulator and the current element, and returns
1244 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1245 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1246 ///
1247 /// # Example
1248 /// ```rust
1249 /// # #[cfg(feature = "deploy")] {
1250 /// # use hydro_lang::prelude::*;
1251 /// # use futures::StreamExt;
1252 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1253 /// process
1254 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1255 /// .into_keyed()
1256 /// .scan(
1257 /// q!(|| 0),
1258 /// q!(|acc, x| {
1259 /// *acc += x;
1260 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1261 /// }),
1262 /// )
1263 /// # .entries()
1264 /// # }, |mut stream| async move {
1265 /// // Output: { 0: [1], 1: [3, 7] }
1266 /// # let mut results = Vec::new();
1267 /// # for _ in 0..3 {
1268 /// # results.push(stream.next().await.unwrap());
1269 /// # }
1270 /// # results.sort();
1271 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1272 /// # }));
1273 /// # }
1274 /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // `scan` is a special case of `generator`: `Some(out)` yields the
        // intermediate value, while `None` terminates the group.
        self.make_totally_ordered().make_exactly_once().generator(
            init,
            q!({
                let orig = f;
                move |state, v| {
                    if let Some(out) = orig(state, v) {
                        Generate::Yield(out)
                    } else {
                        Generate::Break
                    }
                }
            }),
        )
    }
1302
1303 /// Iteratively processes the elements in each group using a state machine that can yield
1304 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1305 /// syntax in Rust, without requiring special syntax.
1306 ///
1307 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1308 /// state for each group. The second argument defines the processing logic, taking in a
1309 /// mutable reference to the group's state and the value to be processed. It emits a
1310 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1311 /// should be processed.
1312 ///
1313 /// # Example
1314 /// ```rust
1315 /// # #[cfg(feature = "deploy")] {
1316 /// # use hydro_lang::prelude::*;
1317 /// # use futures::StreamExt;
1318 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1319 /// process
1320 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1321 /// .into_keyed()
1322 /// .generator(
1323 /// q!(|| 0),
1324 /// q!(|acc, x| {
1325 /// *acc += x;
1326 /// if *acc > 100 {
1327 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1328 /// "done!".to_owned()
1329 /// )
1330 /// } else if *acc % 2 == 0 {
1331 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1332 /// "even".to_owned()
1333 /// )
1334 /// } else {
1335 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1336 /// }
1337 /// }),
1338 /// )
1339 /// # .entries()
1340 /// # }, |mut stream| async move {
1341 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1342 /// # let mut results = Vec::new();
1343 /// # for _ in 0..3 {
1344 /// # results.push(stream.next().await.unwrap());
1345 /// # }
1346 /// # results.sort();
1347 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1348 /// # }));
1349 /// # }
1350 /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Generate<U> + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // The scan state maps each key to its generator state; `None` marks a
        // group that has terminated (via `Return` or `Break`) and should
        // ignore any further input for that key.
        let scan_init = q!(|| HashMap::new())
            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
            .into();
        // Each input yields `Some(Some((k, out)))` when a value is emitted and
        // `Some(None)` otherwise; the outer `Some` keeps the scan itself alive.
        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
            if let Some(existing_state_value) = existing_state {
                match f(existing_state_value, v) {
                    Generate::Yield(out) => Some(Some((k, out))),
                    Generate::Return(out) => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(Some((k, out)))
                    }
                    Generate::Break => {
                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
                        Some(None)
                    }
                    Generate::Continue => Some(None),
                }
            } else {
                Some(None)
            }
        })
        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.into_inner()),
            metadata: this.location.new_node_metadata(Stream::<
                Option<(K, U)>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten away the `None` placeholders, leaving only emitted entries.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this.location.new_node_metadata(KeyedStream::<
                K,
                U,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        KeyedStream::new(this.location, flatten_node)
    }
1424
1425 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1426 /// in-order across the values in each group. But the aggregation function returns a boolean,
1427 /// which when true indicates that the aggregated result is complete and can be released to
1428 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1429 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1430 /// normal stream elements.
1431 ///
1432 /// # Example
1433 /// ```rust
1434 /// # #[cfg(feature = "deploy")] {
1435 /// # use hydro_lang::prelude::*;
1436 /// # use futures::StreamExt;
1437 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438 /// process
1439 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1440 /// .into_keyed()
1441 /// .fold_early_stop(
1442 /// q!(|| 0),
1443 /// q!(|acc, x| {
1444 /// *acc += x;
1445 /// x % 2 == 0
1446 /// }),
1447 /// )
1448 /// # .entries()
1449 /// # }, |mut stream| async move {
1450 /// // Output: { 0: 2, 1: 9 }
1451 /// # let mut results = Vec::new();
1452 /// # for _ in 0..2 {
1453 /// # results.push(stream.next().await.unwrap());
1454 /// # }
1455 /// # results.sort();
1456 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1457 /// # }));
1458 /// # }
1459 /// ```
    pub fn fold_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        K: Clone + Eq + Hash,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // Implemented on top of `generator`: the accumulator is wrapped in an
        // `Option` so it can be moved out (`take`) when the fold completes.
        let out_without_bound_cast = self.generator(
            q!(move || Some(init())),
            q!(move |key_state, v| {
                if let Some(key_state_value) = key_state.as_mut() {
                    if f(key_state_value, v) {
                        Generate::Return(key_state.take().unwrap())
                    } else {
                        Generate::Continue
                    }
                } else {
                    // `generator` stops feeding values to a terminated group.
                    unreachable!()
                }
            }),
        );

        // Cast the generator output to a keyed singleton: each group emits at
        // most one final value, so the value side becomes bounded.
        KeyedSingleton::new(
            out_without_bound_cast.location.clone(),
            HydroNode::Cast {
                inner: Box::new(out_without_bound_cast.ir_node.into_inner()),
                metadata: out_without_bound_cast
                    .location
                    .new_node_metadata(
                        KeyedSingleton::<K, A, L, B::WhenValueBounded>::collection_kind(),
                    ),
            },
        )
    }
1501
1502 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1503 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1504 /// otherwise the first element would be non-deterministic.
1505 ///
1506 /// # Example
1507 /// ```rust
1508 /// # #[cfg(feature = "deploy")] {
1509 /// # use hydro_lang::prelude::*;
1510 /// # use futures::StreamExt;
1511 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1512 /// process
1513 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1514 /// .into_keyed()
1515 /// .first()
1516 /// # .entries()
1517 /// # }, |mut stream| async move {
1518 /// // Output: { 0: 2, 1: 3 }
1519 /// # let mut results = Vec::new();
1520 /// # for _ in 0..2 {
1521 /// # results.push(stream.next().await.unwrap());
1522 /// # }
1523 /// # results.sort();
1524 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1525 /// # }));
1526 /// # }
1527 /// ```
1528 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
1529 where
1530 O: IsOrdered,
1531 R: IsExactlyOnce,
1532 K: Clone + Eq + Hash,
1533 {
1534 self.fold_early_stop(
1535 q!(|| None),
1536 q!(|acc, v| {
1537 *acc = Some(v);
1538 true
1539 }),
1540 )
1541 .map(q!(|v| v.unwrap()))
1542 }
1543
1544 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1545 ///
1546 /// # Example
1547 /// ```rust
1548 /// # #[cfg(feature = "deploy")] {
1549 /// # use hydro_lang::prelude::*;
1550 /// # use futures::StreamExt;
1551 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1552 /// let tick = process.tick();
1553 /// let numbers = process
1554 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1555 /// .into_keyed();
1556 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1557 /// batch
1558 /// .value_counts()
1559 /// .entries()
1560 /// .all_ticks()
1561 /// # }, |mut stream| async move {
1562 /// // (1, 3), (2, 2)
1563 /// # let mut results = Vec::new();
1564 /// # for _ in 0..2 {
1565 /// # results.push(stream.next().await.unwrap());
1566 /// # }
1567 /// # results.sort();
1568 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1569 /// # }));
1570 /// # }
1571 /// ```
1572 pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
1573 where
1574 R: IsExactlyOnce,
1575 K: Eq + Hash,
1576 {
1577 self.make_exactly_once()
1578 .assume_ordering_trusted(
1579 nondet!(/** ordering within each group affects neither result nor intermediates */),
1580 )
1581 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1582 }
1583
1584 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1585 /// group via the `comb` closure.
1586 ///
1587 /// Depending on the input stream guarantees, the closure may need to be commutative
1588 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1589 ///
1590 /// If the input and output value types are the same and do not require initialization then use
1591 /// [`KeyedStream::reduce`].
1592 ///
1593 /// # Example
1594 /// ```rust
1595 /// # #[cfg(feature = "deploy")] {
1596 /// # use hydro_lang::prelude::*;
1597 /// # use futures::StreamExt;
1598 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1599 /// let tick = process.tick();
1600 /// let numbers = process
1601 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1602 /// .into_keyed();
1603 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1604 /// batch
1605 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1606 /// .entries()
1607 /// .all_ticks()
1608 /// # }, |mut stream| async move {
1609 /// // (1, false), (2, true)
1610 /// # let mut results = Vec::new();
1611 /// # for _ in 0..2 {
1612 /// # results.push(stream.next().await.unwrap());
1613 /// # }
1614 /// # results.sort();
1615 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1616 /// # }));
1617 /// # }
1618 /// ```
    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        // Extract the combinator along with its declared algebraic properties
        // (commutativity / idempotence) and register the proof for later use.
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `ValidCommutativityFor` / `ValidIdempotenceFor` bounds justify
        // these assumptions, letting the guarantees be upgraded before folding.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::FoldKeyed {
                init,
                acc: comb.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    A,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1652
1653 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1654 /// group via the `comb` closure.
1655 ///
1656 /// Depending on the input stream guarantees, the closure may need to be commutative
1657 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1658 ///
1659 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1660 ///
1661 /// # Example
1662 /// ```rust
1663 /// # #[cfg(feature = "deploy")] {
1664 /// # use hydro_lang::prelude::*;
1665 /// # use futures::StreamExt;
1666 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1667 /// let tick = process.tick();
1668 /// let numbers = process
1669 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1670 /// .into_keyed();
1671 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1672 /// batch
1673 /// .reduce(q!(|acc, x| *acc |= x))
1674 /// .entries()
1675 /// .all_ticks()
1676 /// # }, |mut stream| async move {
1677 /// // (1, false), (2, true)
1678 /// # let mut results = Vec::new();
1679 /// # for _ in 0..2 {
1680 /// # results.push(stream.next().await.unwrap());
1681 /// # }
1682 /// # results.sort();
1683 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1684 /// # }));
1685 /// # }
1686 /// ```
    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        // Extract the combinator along with its declared algebraic properties
        // (commutativity / idempotence) and register the proof for later use.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        // The `ValidCommutativityFor` / `ValidIdempotenceFor` bounds justify
        // these assumptions, letting the guarantees be upgraded before reducing.
        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyed {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1717
1718 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1719 /// are automatically deleted.
1720 ///
1721 /// Depending on the input stream guarantees, the closure may need to be commutative
1722 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1723 ///
1724 /// # Example
1725 /// ```rust
1726 /// # #[cfg(feature = "deploy")] {
1727 /// # use hydro_lang::prelude::*;
1728 /// # use futures::StreamExt;
1729 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1730 /// let tick = process.tick();
1731 /// let watermark = tick.singleton(q!(1));
1732 /// let numbers = process
1733 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1734 /// .into_keyed();
1735 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1736 /// batch
1737 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1738 /// .entries()
1739 /// .all_ticks()
1740 /// # }, |mut stream| async move {
1741 /// // (2, true)
1742 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1743 /// # }));
1744 /// # }
1745 /// ```
    pub fn reduce_watermark<O2, F, C, Idemp>(
        self,
        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
    where
        K: Eq + Hash,
        O2: Clone,
        F: Fn(&mut V, V) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
        // The watermark lives inside a tick at this location's root, so the
        // check compares the root against the tick's enclosing location.
        check_matching_location(&self.location.root(), other.location.outer());
        // Extract the combinator along with its declared algebraic properties
        // (commutativity / idempotence) and register the proof for later use.
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        let ordered = self
            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));

        KeyedSingleton::new(
            ordered.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f: f.into(),
                input: Box::new(ordered.ir_node.into_inner()),
                watermark: Box::new(other.ir_node.into_inner()),
                metadata: ordered.location.new_node_metadata(KeyedSingleton::<
                    K,
                    V,
                    L,
                    B::WhenValueUnbounded,
                >::collection_kind()),
            },
        )
    }
1782
1783 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1784 /// whose keys are not in the bounded stream.
1785 ///
1786 /// # Example
1787 /// ```rust
1788 /// # #[cfg(feature = "deploy")] {
1789 /// # use hydro_lang::prelude::*;
1790 /// # use futures::StreamExt;
1791 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1792 /// let tick = process.tick();
1793 /// let keyed_stream = process
1794 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1795 /// .batch(&tick, nondet!(/** test */))
1796 /// .into_keyed();
1797 /// let keys_to_remove = process
1798 /// .source_iter(q!(vec![1, 2]))
1799 /// .batch(&tick, nondet!(/** test */));
1800 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1801 /// # .entries()
1802 /// # }, |mut stream| async move {
1803 /// // { 3: ['c'], 4: ['d'] }
1804 /// # let mut results = Vec::new();
1805 /// # for _ in 0..2 {
1806 /// # results.push(stream.next().await.unwrap());
1807 /// # }
1808 /// # results.sort();
1809 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
1810 /// # }));
1811 /// # }
1812 /// ```
1813 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1814 self,
1815 other: Stream<K, L, Bounded, O2, R2>,
1816 ) -> Self
1817 where
1818 K: Eq + Hash,
1819 {
1820 check_matching_location(&self.location, &other.location);
1821
1822 KeyedStream::new(
1823 self.location.clone(),
1824 HydroNode::AntiJoin {
1825 pos: Box::new(self.ir_node.into_inner()),
1826 neg: Box::new(other.ir_node.into_inner()),
1827 metadata: self.location.new_node_metadata(Self::collection_kind()),
1828 },
1829 )
1830 }
1831
1832 /// Emit a keyed stream containing keys shared between two keyed streams,
1833 /// where each value in the output keyed stream is a tuple of
1834 /// (self's value, other's value).
1835 /// If there are multiple values for the same key, this performs a cross product
1836 /// for each matching key.
1837 ///
1838 /// # Example
1839 /// ```rust
1840 /// # #[cfg(feature = "deploy")] {
1841 /// # use hydro_lang::prelude::*;
1842 /// # use futures::StreamExt;
1843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1844 /// let tick = process.tick();
1845 /// let keyed_data = process
1846 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1847 /// .into_keyed()
1848 /// .batch(&tick, nondet!(/** test */));
1849 /// let other_data = process
1850 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
1851 /// .into_keyed()
1852 /// .batch(&tick, nondet!(/** test */));
1853 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
1854 /// # }, |mut stream| async move {
1855 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
1856 /// # let mut results = vec![];
1857 /// # for _ in 0..4 {
1858 /// # results.push(stream.next().await.unwrap());
1859 /// # }
1860 /// # results.sort();
1861 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
1862 /// # }));
1863 /// # }
1864 /// ```
1865 pub fn join_keyed_stream<V2, O2: Ordering, R2: Retries>(
1866 self,
1867 other: KeyedStream<K, V2, L, B, O2, R2>,
1868 ) -> KeyedStream<K, (V, V2), L, B, NoOrder, <R as MinRetries<R2>>::Min>
1869 where
1870 K: Eq + Hash,
1871 R: MinRetries<R2>,
1872 {
1873 self.entries().join(other.entries()).into_keyed()
1874 }
1875
    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
    /// is only present in one of the inputs, its values are passed through as-is). The output has
    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 0: [2, 1], 1: [4, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        // Coerce the `B: IsBounded` parameter to the concrete `Bounded` marker.
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        // `Chain` emits all of `first` before any of `second`, giving the
        // documented self-then-other order within each key.
        KeyedStream::new(
            this.location.clone(),
            HydroNode::Chain {
                first: Box::new(this.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: this.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Bounded,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
1935
1936 /// Emit a keyed stream containing keys shared between the keyed stream and the
1937 /// keyed singleton, where each value in the output keyed stream is a tuple of
1938 /// (the keyed stream's value, the keyed singleton's value).
1939 ///
1940 /// # Example
1941 /// ```rust
1942 /// # #[cfg(feature = "deploy")] {
1943 /// # use hydro_lang::prelude::*;
1944 /// # use futures::StreamExt;
1945 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1946 /// let tick = process.tick();
1947 /// let keyed_data = process
1948 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1949 /// .into_keyed()
1950 /// .batch(&tick, nondet!(/** test */));
1951 /// let singleton_data = process
1952 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
1953 /// .into_keyed()
1954 /// .batch(&tick, nondet!(/** test */))
1955 /// .first();
1956 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
1957 /// # }, |mut stream| async move {
1958 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
1959 /// # let mut results = vec![];
1960 /// # for _ in 0..3 {
1961 /// # results.push(stream.next().await.unwrap());
1962 /// # }
1963 /// # results.sort();
1964 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
1965 /// # }));
1966 /// # }
1967 /// ```
1968 pub fn join_keyed_singleton<V2: Clone>(
1969 self,
1970 keyed_singleton: KeyedSingleton<K, V2, L, Bounded>,
1971 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R>
1972 where
1973 B: IsBounded,
1974 K: Eq + Hash,
1975 {
1976 keyed_singleton
1977 .join_keyed_stream(self.make_bounded())
1978 .map(q!(|(v2, v)| (v, v2)))
1979 }
1980
1981 /// Gets the values associated with a specific key from the keyed stream.
1982 ///
1983 /// # Example
1984 /// ```rust
1985 /// # #[cfg(feature = "deploy")] {
1986 /// # use hydro_lang::prelude::*;
1987 /// # use futures::StreamExt;
1988 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1989 /// let tick = process.tick();
1990 /// let keyed_data = process
1991 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
1992 /// .into_keyed()
1993 /// .batch(&tick, nondet!(/** test */));
1994 /// let key = tick.singleton(q!(1));
1995 /// keyed_data.get(key).all_ticks()
1996 /// # }, |mut stream| async move {
1997 /// // 10, 11 in any order
1998 /// # let mut results = vec![];
1999 /// # for _ in 0..2 {
2000 /// # results.push(stream.next().await.unwrap());
2001 /// # }
2002 /// # results.sort();
2003 /// # assert_eq!(results, vec![10, 11]);
2004 /// # }));
2005 /// # }
2006 /// ```
2007 pub fn get(self, key: Singleton<K, L, Bounded>) -> Stream<V, L, Bounded, NoOrder, R>
2008 where
2009 B: IsBounded,
2010 K: Eq + Hash,
2011 {
2012 self.make_bounded()
2013 .entries()
2014 .join(key.into_stream().map(q!(|k| (k, ()))))
2015 .map(q!(|(_, (v, _))| v))
2016 }
2017
2018 /// For each value in `self`, find the matching key in `lookup`.
2019 /// The output is a keyed stream with the key from `self`, and a value
2020 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2021 /// If the key is not present in `lookup`, the option will be [`None`].
2022 ///
2023 /// # Example
2024 /// ```rust
2025 /// # #[cfg(feature = "deploy")] {
2026 /// # use hydro_lang::prelude::*;
2027 /// # use futures::StreamExt;
2028 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2029 /// # let tick = process.tick();
2030 /// let requests = // { 1: [10, 11], 2: 20 }
2031 /// # process
2032 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2033 /// # .into_keyed()
2034 /// # .batch(&tick, nondet!(/** test */));
2035 /// let other_data = // { 10: 100, 11: 110 }
2036 /// # process
2037 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2038 /// # .into_keyed()
2039 /// # .batch(&tick, nondet!(/** test */))
2040 /// # .first();
2041 /// requests.lookup_keyed_singleton(other_data)
2042 /// # .entries().all_ticks()
2043 /// # }, |mut stream| async move {
2044 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2045 /// # let mut results = vec![];
2046 /// # for _ in 0..3 {
2047 /// # results.push(stream.next().await.unwrap());
2048 /// # }
2049 /// # results.sort();
2050 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2051 /// # }));
2052 /// # }
2053 /// ```
2054 pub fn lookup_keyed_singleton<V2>(
2055 self,
2056 lookup: KeyedSingleton<V, V2, L, Bounded>,
2057 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2058 where
2059 B: IsBounded,
2060 K: Eq + Hash + Clone,
2061 V: Eq + Hash + Clone,
2062 V2: Clone,
2063 {
2064 self.lookup_keyed_stream(
2065 lookup
2066 .into_keyed_stream()
2067 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2068 )
2069 }
2070
    /// For each value in `self`, find the matching key in `lookup`.
    /// The output is a keyed stream with the key from `self`, and a value
    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
    /// If the key is not present in `lookup`, the option will be [`None`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let requests = // { 1: [10, 11], 2: 20 }
    /// # process
    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// let other_data = // { 10: [100, 101], 11: 110 }
    /// # process
    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// requests.lookup_keyed_stream(other_data)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
    /// # let mut results = vec![];
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::type_complexity, reason = "retries propagation")]
    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
        self,
        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        K: Eq + Hash + Clone,
        V: Eq + Hash + Clone,
        V2: Clone,
        R: MinRetries<R2>,
    {
        // Stage 1: re-key `self` by its *value* (the lookup key), remembering the
        // original key as the payload.
        let inverted = self
            .make_bounded()
            .entries()
            .map(q!(|(key, lookup_value)| (lookup_value, key)))
            .into_keyed();
        // Stage 2 (hits): join on the lookup key, then restore the original key
        // with `Some(value)` attached.
        let found = inverted
            .clone()
            .join_keyed_stream(lookup.clone())
            .entries()
            .map(q!(|(lookup_value, (key, value))| (
                key,
                (lookup_value, Some(value))
            )))
            .into_keyed();
        // Stage 3 (misses): anti-join against the lookup keys, restoring the
        // original key with `None`.
        let not_found = inverted
            .filter_key_not_in(lookup.keys())
            .entries()
            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
            .into_keyed();

        // Hits first, then misses; misses are weakened to the common retry bound
        // so both branches agree on the output type.
        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
    }
2140
    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
    ///
    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
    /// argument that declares where the stream will be atomically processed. Batching a stream into
    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
    /// [`Tick`] will introduce asynchrony.
    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
        // The atomic location wraps the tick that defines the synchronous region.
        let out_location = Atomic { tick: tick.clone() };
        KeyedStream::new(
            out_location.clone(),
            // `BeginAtomic` marks the IR boundary where the atomic section starts;
            // it is closed by the matching `EndAtomic` in `end_atomic`.
            HydroNode::BeginAtomic {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: out_location
                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
            },
        )
    }
2160
    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch(
        self,
        tick: &Tick<L>,
        nondet: NonDet,
    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // The `NonDet` token only documents that the caller accepted the
        // non-deterministic batch boundaries; it carries no runtime data.
        let _ = nondet;
        // A tick may only batch streams that live at its own location.
        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
        KeyedStream::new(
            tick.clone(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: tick.new_node_metadata(
                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
                ),
            },
        )
    }
2184}
2185
2186impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2187 KeyedStream<(K1, K2), V, L, B, O, R>
2188{
2189 /// Produces a new keyed stream by dropping the first element of the compound key.
2190 ///
2191 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2192 /// of the values under the new keys. The values across groups with the same new key
2193 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2194 ///
2195 /// # Example
2196 /// ```rust
2197 /// # #[cfg(feature = "deploy")] {
2198 /// # use hydro_lang::prelude::*;
2199 /// # use futures::StreamExt;
2200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201 /// process
2202 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2203 /// .into_keyed()
2204 /// .drop_key_prefix()
2205 /// # .entries()
2206 /// # }, |mut stream| async move {
2207 /// // { 10: [2, 3], 20: [4] }
2208 /// # let mut results = Vec::new();
2209 /// # for _ in 0..3 {
2210 /// # results.push(stream.next().await.unwrap());
2211 /// # }
2212 /// # results.sort();
2213 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2214 /// # }));
2215 /// # }
2216 /// ```
2217 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2218 self.entries()
2219 .map(q!(|((_k1, k2), v)| (k2, v)))
2220 .into_keyed()
2221 }
2222}
2223
impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
    KeyedStream<K, V, L, Unbounded, O, R>
{
    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
    ///
    /// Currently, both input streams must be [`Unbounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
    /// numbers1.interleave(numbers2)
    /// # .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Lowered to the same `Chain` IR node as `chain`; this is sound here
        // because the output type is `NoOrder`, so consumers may not rely on
        // any particular interleaving of the two inputs.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.into_inner()),
                second: Box::new(other.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }
}
2281
impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
where
    L: Location<'a> + NoTick,
{
    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
    /// used to create the atomic section.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch_atomic(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // Token only records the caller's acknowledgement of non-determinism.
        let _ = nondet;
        KeyedStream::new(
            // Batches are cut by the atomic section's own tick, preserving synchrony.
            self.location.clone().tick,
            HydroNode::Batch {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.tick.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Tick<L>,
                    Bounded,
                    O,
                    R,
                >::collection_kind(
                )),
            },
        )
    }

    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
    /// See [`KeyedStream::atomic`] for more details.
    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
        // `EndAtomic` closes the region opened by `BeginAtomic`; the result lives
        // at the tick's underlying (non-atomic) location `L`.
        KeyedStream::new(
            self.location.tick.l.clone(),
            HydroNode::EndAtomic {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self
                    .location
                    .tick
                    .l
                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
            },
        )
    }
}
2328
2329impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2330where
2331 L: Location<'a>,
2332{
    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
        // `YieldConcat` concatenates each tick's batch, per key, into the
        // unbounded output at the tick's outer location.
        KeyedStream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: self.location.outer().new_node_metadata(KeyedStream::<
                    K,
                    V,
                    L,
                    Unbounded,
                    O,
                    R,
                >::collection_kind(
                )),
            },
        )
    }
2353
    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
    /// each key.
    ///
    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
        // The output stays tied to this tick via an `Atomic` location, so
        // downstream logic remains synchronous with the tick's execution.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        KeyedStream::new(
            out_location.clone(),
            // Same `YieldConcat` lowering as `all_ticks`, but targeting the
            // atomic location rather than the tick's outer location.
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.into_inner()),
                metadata: out_location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Atomic<L>,
                    Unbounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }
2381
    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory for each key across ticks rather than resetting across batches of each key.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
    ///         *sum += new;
    ///     }))).entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
    /// # results.clear();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift into the atomic (cross-tick) context so stateful operators keep
        // their per-key state, apply the user transform, then re-batch the
        // result back into this tick.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }
2436
    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
    /// tick `T` always has the entries of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![(1, 2), (1, 3)]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .into_keyed();
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![(1, 4), (2, 5)]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick()
    /// #     .into_keyed(); // appears on the second tick
    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
    /// # batch_first_tick.chain(batch_second_tick);
    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
    ///     changes_across_ticks // from the current tick
    /// )
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // First tick: { 1: [2, 3] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..4 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
    /// // Third tick: { 1: [4], 2: [5] }
    /// # results.clear();
    /// # for _ in 0..2 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
        // `DeferTick` delays every element by exactly one tick iteration.
        KeyedStream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.into_inner()),
                metadata: self.location.new_node_metadata(KeyedStream::<
                    K,
                    V,
                    Tick<L>,
                    Bounded,
                    O,
                    R,
                >::collection_kind()),
            },
        )
    }
2510}
2511
2512#[cfg(test)]
2513mod tests {
2514 #[cfg(feature = "deploy")]
2515 use futures::{SinkExt, StreamExt};
2516 #[cfg(feature = "deploy")]
2517 use hydro_deploy::Deployment;
2518 #[cfg(any(feature = "deploy", feature = "sim"))]
2519 use stageleft::q;
2520
2521 #[cfg(any(feature = "deploy", feature = "sim"))]
2522 use crate::compile::builder::FlowBuilder;
2523 #[cfg(feature = "deploy")]
2524 use crate::live_collections::stream::ExactlyOnce;
2525 #[cfg(feature = "sim")]
2526 use crate::live_collections::stream::{NoOrder, TotalOrder};
2527 #[cfg(any(feature = "deploy", feature = "sim"))]
2528 use crate::location::Location;
2529 #[cfg(any(feature = "deploy", feature = "sim"))]
2530 use crate::nondet::nondet;
2531 #[cfg(feature = "deploy")]
2532 use crate::properties::ManualProof;
2533
2534 #[cfg(feature = "deploy")]
2535 #[tokio::test]
2536 async fn reduce_watermark_filter() {
2537 let mut deployment = Deployment::new();
2538
2539 let mut flow = FlowBuilder::new();
2540 let node = flow.process::<()>();
2541 let external = flow.external::<()>();
2542
2543 let node_tick = node.tick();
2544 let watermark = node_tick.singleton(q!(1));
2545
2546 let sum = node
2547 .source_stream(q!(tokio_stream::iter([
2548 (0, 100),
2549 (1, 101),
2550 (2, 102),
2551 (2, 102)
2552 ])))
2553 .into_keyed()
2554 .reduce_watermark(
2555 watermark,
2556 q!(|acc, v| {
2557 *acc += v;
2558 }),
2559 )
2560 .snapshot(&node_tick, nondet!(/** test */))
2561 .entries()
2562 .all_ticks()
2563 .send_bincode_external(&external);
2564
2565 let nodes = flow
2566 .with_process(&node, deployment.Localhost())
2567 .with_external(&external, deployment.Localhost())
2568 .deploy(&mut deployment);
2569
2570 deployment.deploy().await.unwrap();
2571
2572 let mut out = nodes.connect(sum).await;
2573
2574 deployment.start().await.unwrap();
2575
2576 assert_eq!(out.next().await.unwrap(), (2, 204));
2577 }
2578
2579 #[cfg(feature = "deploy")]
2580 #[tokio::test]
2581 async fn reduce_watermark_bounded() {
2582 let mut deployment = Deployment::new();
2583
2584 let mut flow = FlowBuilder::new();
2585 let node = flow.process::<()>();
2586 let external = flow.external::<()>();
2587
2588 let node_tick = node.tick();
2589 let watermark = node_tick.singleton(q!(1));
2590
2591 let sum = node
2592 .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2593 .into_keyed()
2594 .reduce_watermark(
2595 watermark,
2596 q!(|acc, v| {
2597 *acc += v;
2598 }),
2599 )
2600 .entries()
2601 .send_bincode_external(&external);
2602
2603 let nodes = flow
2604 .with_process(&node, deployment.Localhost())
2605 .with_external(&external, deployment.Localhost())
2606 .deploy(&mut deployment);
2607
2608 deployment.deploy().await.unwrap();
2609
2610 let mut out = nodes.connect(sum).await;
2611
2612 deployment.start().await.unwrap();
2613
2614 assert_eq!(out.next().await.unwrap(), (2, 204));
2615 }
2616
2617 #[cfg(feature = "deploy")]
2618 #[tokio::test]
2619 async fn reduce_watermark_garbage_collect() {
2620 let mut deployment = Deployment::new();
2621
2622 let mut flow = FlowBuilder::new();
2623 let node = flow.process::<()>();
2624 let external = flow.external::<()>();
2625 let (tick_send, tick_trigger) =
2626 node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
2627
2628 let node_tick = node.tick();
2629 let (watermark_complete_cycle, watermark) =
2630 node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
2631 let next_watermark = watermark.clone().map(q!(|v| v + 1));
2632 watermark_complete_cycle.complete_next_tick(next_watermark);
2633
2634 let tick_triggered_input = node_tick
2635 .singleton(q!((3, 103)))
2636 .into_stream()
2637 .filter_if_some(
2638 tick_trigger
2639 .clone()
2640 .batch(&node_tick, nondet!(/** test */))
2641 .first(),
2642 )
2643 .all_ticks();
2644
2645 let sum = node
2646 .source_stream(q!(tokio_stream::iter([
2647 (0, 100),
2648 (1, 101),
2649 (2, 102),
2650 (2, 102)
2651 ])))
2652 .interleave(tick_triggered_input)
2653 .into_keyed()
2654 .reduce_watermark(
2655 watermark,
2656 q!(
2657 |acc, v| {
2658 *acc += v;
2659 },
2660 commutative = ManualProof(/* integer addition is commutative */)
2661 ),
2662 )
2663 .snapshot(&node_tick, nondet!(/** test */))
2664 .entries()
2665 .all_ticks()
2666 .send_bincode_external(&external);
2667
2668 let nodes = flow
2669 .with_default_optimize()
2670 .with_process(&node, deployment.Localhost())
2671 .with_external(&external, deployment.Localhost())
2672 .deploy(&mut deployment);
2673
2674 deployment.deploy().await.unwrap();
2675
2676 let mut tick_send = nodes.connect(tick_send).await;
2677 let mut out_recv = nodes.connect(sum).await;
2678
2679 deployment.start().await.unwrap();
2680
2681 assert_eq!(out_recv.next().await.unwrap(), (2, 204));
2682
2683 tick_send.send(()).await.unwrap();
2684
2685 assert_eq!(out_recv.next().await.unwrap(), (3, 103));
2686 }
2687
2688 #[cfg(feature = "sim")]
2689 #[test]
2690 #[should_panic]
2691 fn sim_batch_nondet_size() {
2692 let mut flow = FlowBuilder::new();
2693 let node = flow.process::<()>();
2694
2695 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2696
2697 let tick = node.tick();
2698 let out_recv = input
2699 .batch(&tick, nondet!(/** test */))
2700 .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
2701 .entries()
2702 .all_ticks()
2703 .sim_output();
2704
2705 flow.sim().exhaustive(async || {
2706 out_recv
2707 .assert_yields_only_unordered([(1, vec![1, 2])])
2708 .await;
2709 });
2710 }
2711
2712 #[cfg(feature = "sim")]
2713 #[test]
2714 fn sim_batch_preserves_group_order() {
2715 let mut flow = FlowBuilder::new();
2716 let node = flow.process::<()>();
2717
2718 let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
2719
2720 let tick = node.tick();
2721 let out_recv = input
2722 .batch(&tick, nondet!(/** test */))
2723 .all_ticks()
2724 .fold_early_stop(
2725 q!(|| 0),
2726 q!(|acc, v| {
2727 *acc = std::cmp::max(v, *acc);
2728 *acc >= 2
2729 }),
2730 )
2731 .entries()
2732 .sim_output();
2733
2734 let instances = flow.sim().exhaustive(async || {
2735 out_recv
2736 .assert_yields_only_unordered([(1, 2), (2, 3)])
2737 .await;
2738 });
2739
2740 assert_eq!(instances, 8);
2741 // - three cases: all three in a separate tick (pick where (2, 3) is)
2742 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
2743 // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
2744 // - one case: all three together
2745 }
2746
2747 #[cfg(feature = "sim")]
2748 #[test]
2749 fn sim_batch_unordered_shuffles() {
2750 let mut flow = FlowBuilder::new();
2751 let node = flow.process::<()>();
2752
2753 let input = node
2754 .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
2755 .into_keyed()
2756 .weaken_ordering::<NoOrder>();
2757
2758 let tick = node.tick();
2759 let out_recv = input
2760 .batch(&tick, nondet!(/** test */))
2761 .all_ticks()
2762 .entries()
2763 .sim_output();
2764
2765 let instances = flow.sim().exhaustive(async || {
2766 out_recv
2767 .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
2768 .await;
2769 });
2770
2771 assert_eq!(instances, 13);
2772 // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
2773 // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2774 // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
2775 // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
2776 }
2777
2778 #[cfg(feature = "sim")]
2779 #[test]
2780 #[should_panic]
2781 fn sim_observe_order_batched() {
2782 let mut flow = FlowBuilder::new();
2783 let node = flow.process::<()>();
2784
2785 let (port, input) = node.sim_input::<_, NoOrder, _>();
2786
2787 let tick = node.tick();
2788 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2789 let out_recv = batch
2790 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2791 .all_ticks()
2792 .first()
2793 .entries()
2794 .sim_output();
2795
2796 flow.sim().exhaustive(async || {
2797 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2798 out_recv
2799 .assert_yields_only_unordered([(1, 1), (2, 1)])
2800 .await; // fails with assume_ordering
2801 });
2802 }
2803
2804 #[cfg(feature = "sim")]
2805 #[test]
2806 fn sim_observe_order_batched_count() {
2807 let mut flow = FlowBuilder::new();
2808 let node = flow.process::<()>();
2809
2810 let (port, input) = node.sim_input::<_, NoOrder, _>();
2811
2812 let tick = node.tick();
2813 let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
2814 let out_recv = batch
2815 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2816 .all_ticks()
2817 .entries()
2818 .sim_output();
2819
2820 let instance_count = flow.sim().exhaustive(async || {
2821 port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
2822 let _ = out_recv.collect_sorted::<Vec<_>>().await;
2823 });
2824
2825 assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
2826 }
2827
2828 #[cfg(feature = "sim")]
2829 #[test]
2830 fn sim_top_level_assume_ordering() {
2831 use std::collections::HashMap;
2832
2833 let mut flow = FlowBuilder::new();
2834 let node = flow.process::<()>();
2835
2836 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2837
2838 let out_recv = input
2839 .into_keyed()
2840 .assume_ordering::<TotalOrder>(nondet!(/** test */))
2841 .fold_early_stop(
2842 q!(|| Vec::new()),
2843 q!(|acc, v| {
2844 acc.push(v);
2845 acc.len() >= 2
2846 }),
2847 )
2848 .entries()
2849 .sim_output();
2850
2851 let instance_count = flow.sim().exhaustive(async || {
2852 in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
2853 let out: HashMap<_, _> = out_recv
2854 .collect_sorted::<Vec<_>>()
2855 .await
2856 .into_iter()
2857 .collect();
2858 // Each key accumulates its values; we get one entry per key
2859 assert_eq!(out.len(), 2);
2860 });
2861
2862 assert_eq!(instance_count, 24)
2863 }
2864
2865 #[cfg(feature = "sim")]
2866 #[test]
2867 fn sim_top_level_assume_ordering_cycle_back() {
2868 use std::collections::HashMap;
2869
2870 let mut flow = FlowBuilder::new();
2871 let node = flow.process::<()>();
2872
2873 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2874
2875 let (complete_cycle_back, cycle_back) =
2876 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2877 let ordered = input
2878 .into_keyed()
2879 .interleave(cycle_back)
2880 .assume_ordering::<TotalOrder>(nondet!(/** test */));
2881 complete_cycle_back.complete(
2882 ordered
2883 .clone()
2884 .map(q!(|v| v + 1))
2885 .filter(q!(|v| v % 2 == 1)),
2886 );
2887
2888 let out_recv = ordered
2889 .fold_early_stop(
2890 q!(|| Vec::new()),
2891 q!(|acc, v| {
2892 acc.push(v);
2893 acc.len() >= 2
2894 }),
2895 )
2896 .entries()
2897 .sim_output();
2898
2899 let mut saw = false;
2900 let instance_count = flow.sim().exhaustive(async || {
2901 // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
2902 // We want to see [0, 1] - the cycled back value interleaved
2903 in_send.send_many_unordered([(1, 0), (1, 2)]);
2904 let out: HashMap<_, _> = out_recv
2905 .collect_sorted::<Vec<_>>()
2906 .await
2907 .into_iter()
2908 .collect();
2909
2910 // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
2911 if let Some(values) = out.get(&1)
2912 && *values == vec![0, 1]
2913 {
2914 saw = true;
2915 }
2916 });
2917
2918 assert!(
2919 saw,
2920 "did not see an instance with key 1 having [0, 1] in order"
2921 );
2922 assert_eq!(instance_count, 6);
2923 }
2924
2925 #[cfg(feature = "sim")]
2926 #[test]
2927 fn sim_top_level_assume_ordering_cross_key_cycle() {
2928 use std::collections::HashMap;
2929
2930 // This test demonstrates why releasing one entry at a time is important:
2931 // When one key's observed order cycles back into a different key, we need
2932 // to be able to interleave the cycled-back entry with pending items for
2933 // that other key.
2934 let mut flow = FlowBuilder::new();
2935 let node = flow.process::<()>();
2936
2937 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
2938
2939 let (complete_cycle_back, cycle_back) =
2940 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
2941 let ordered = input
2942 .into_keyed()
2943 .interleave(cycle_back)
2944 .assume_ordering::<TotalOrder>(nondet!(/** test */));
2945
2946 // Cycle back: when we see (1, 10), emit (2, 100) to key 2
2947 complete_cycle_back.complete(
2948 ordered
2949 .clone()
2950 .filter(q!(|v| *v == 10))
2951 .map(q!(|_| 100))
2952 .entries()
2953 .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
2954 .into_keyed(),
2955 );
2956
2957 let out_recv = ordered
2958 .fold_early_stop(
2959 q!(|| Vec::new()),
2960 q!(|acc, v| {
2961 acc.push(v);
2962 acc.len() >= 2
2963 }),
2964 )
2965 .entries()
2966 .sim_output();
2967
2968 // We want to see an instance where:
2969 // - (1, 10) is released first
2970 // - This causes (2, 100) to be cycled back
2971 // - (2, 100) is released BEFORE (2, 20) which was already pending
2972 let mut saw_cross_key_interleave = false;
2973 let instance_count = flow.sim().exhaustive(async || {
2974 // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
2975 in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
2976 let out: HashMap<_, _> = out_recv
2977 .collect_sorted::<Vec<_>>()
2978 .await
2979 .into_iter()
2980 .collect();
2981
2982 // Check if we see the cross-key interleaving:
2983 // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
2984 if let Some(values) = out.get(&2)
2985 && values.len() >= 2
2986 && values[0] == 100
2987 {
2988 saw_cross_key_interleave = true;
2989 }
2990 });
2991
2992 assert!(
2993 saw_cross_key_interleave,
2994 "did not see an instance where cycled-back 100 was released before pending items for key 2"
2995 );
2996 assert_eq!(instance_count, 60);
2997 }
2998
2999 #[cfg(feature = "sim")]
3000 #[test]
3001 fn sim_top_level_assume_ordering_cycle_back_tick() {
3002 use std::collections::HashMap;
3003
3004 let mut flow = FlowBuilder::new();
3005 let node = flow.process::<()>();
3006
3007 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3008
3009 let (complete_cycle_back, cycle_back) =
3010 node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3011 let ordered = input
3012 .into_keyed()
3013 .interleave(cycle_back)
3014 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3015 complete_cycle_back.complete(
3016 ordered
3017 .clone()
3018 .batch(&node.tick(), nondet!(/** test */))
3019 .all_ticks()
3020 .map(q!(|v| v + 1))
3021 .filter(q!(|v| v % 2 == 1)),
3022 );
3023
3024 let out_recv = ordered
3025 .fold_early_stop(
3026 q!(|| Vec::new()),
3027 q!(|acc, v| {
3028 acc.push(v);
3029 acc.len() >= 2
3030 }),
3031 )
3032 .entries()
3033 .sim_output();
3034
3035 let mut saw = false;
3036 let instance_count = flow.sim().exhaustive(async || {
3037 in_send.send_many_unordered([(1, 0), (1, 2)]);
3038 let out: HashMap<_, _> = out_recv
3039 .collect_sorted::<Vec<_>>()
3040 .await
3041 .into_iter()
3042 .collect();
3043
3044 if let Some(values) = out.get(&1)
3045 && *values == vec![0, 1]
3046 {
3047 saw = true;
3048 }
3049 });
3050
3051 assert!(
3052 saw,
3053 "did not see an instance with key 1 having [0, 1] in order"
3054 );
3055 assert_eq!(instance_count, 58);
3056 }
3057}