// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, StreamOrder, StreamRetry, TeeNode,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::ManualProof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds pin down the full `MinOrder` lattice for any ordering:
/// combining with itself or with [`TotalOrder`] preserves the ordering, while
/// combining with [`NoOrder`] always weakens to [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
44
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
// Uninhabited enum: used purely as a type-level marker, never instantiated.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
54
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
// Uninhabited enum: used purely as a type-level marker, never instantiated.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
66
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `Self` is weaker than `O2` exactly when combining them via
// `MinOrder` yields `Self` back.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
75/// Helper trait for determining the weakest of two orderings.
76#[sealed::sealed]
77pub trait MinOrder<Other: ?Sized> {
78 /// The weaker of the two orderings.
79 type Min: Ordering;
80}
81
// `TotalOrder` is the strongest ordering: combining it with any other ordering
// yields that other ordering.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering: combining it with anything stays `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
91
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds pin down the full `MinRetries` lattice for any retry
/// guarantee: combining with itself or with [`ExactlyOnce`] preserves the
/// guarantee, while combining with [`AtLeastOnce`] always weakens to
/// [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
102
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
// Uninhabited enum: used purely as a type-level marker, never instantiated.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
111
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
// Uninhabited enum: used purely as a type-level marker, never instantiated.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
120
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `Self` is weaker than `R2` exactly when combining them via
// `MinRetries` yields `Self` back.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    // The `WeakerRetryThan` bounds guarantee that either input guarantee can be
    // converted to the combined result.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee: combining it with any other retry
// guarantee yields that other guarantee.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee: combining it with anything stays
// `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
145
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream with a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
// Used as a bound on order-sensitive operators (e.g. `for_each`, `dest_sink`)
// so misuse produces the custom diagnostic above.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
158
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream with non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
// Used as a bound on retry-sensitive operators (e.g. `for_each`, `dest_sink`)
// so misuse produces the custom diagnostic above.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
171
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location this stream is materialized at.
    pub(crate) location: Loc,
    // The IR node producing this stream. Wrapped in a `RefCell` so that `clone`
    // can rewrite it in place into a shared `Tee` node (see the `Clone` impl).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Marker for the element type and the type-level guarantees.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205 for Stream<T, L, Unbounded, O, R>
206where
207 L: Location<'a>,
208{
209 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210 let new_meta = stream
211 .location
212 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214 Stream {
215 location: stream.location,
216 ir_node: RefCell::new(HydroNode::Cast {
217 inner: Box::new(stream.ir_node.into_inner()),
218 metadata: new_meta,
219 }),
220 _phantom: PhantomData,
221 }
222 }
223}
224
// Weakening `TotalOrder` to `NoOrder` is always safe: it only discards a
// static guarantee, so this delegates to `weaken_ordering`.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
234
// Weakening `ExactlyOnce` to `AtLeastOnce` is always safe: it only discards a
// static guarantee, so this delegates to `weaken_retries`.
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
244
// Delegates to the inherent `Stream::defer_tick`, which shifts a tick-local
// bounded stream's contents to the next tick.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255 for Stream<T, Tick<L>, Bounded, O, R>
256where
257 L: Location<'a>,
258{
259 type Location = Tick<L>;
260
261 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262 Stream::new(
263 location.clone(),
264 HydroNode::CycleSource {
265 cycle_id,
266 metadata: location.new_node_metadata(Self::collection_kind()),
267 },
268 )
269 }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
278 assert_eq!(
279 Location::id(&self.location),
280 expected_location,
281 "locations do not match"
282 );
283 self.location
284 .flow_state()
285 .borrow_mut()
286 .push_root(HydroRoot::CycleSink {
287 cycle_id,
288 input: Box::new(self.ir_node.into_inner()),
289 op_metadata: HydroIrOpMetadata::new(),
290 });
291 }
292}
293
294impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
295 for Stream<T, L, B, O, R>
296where
297 L: Location<'a> + NoTick,
298{
299 type Location = L;
300
301 fn create_source(cycle_id: CycleId, location: L) -> Self {
302 Stream::new(
303 location.clone(),
304 HydroNode::CycleSource {
305 cycle_id,
306 metadata: location.new_node_metadata(Self::collection_kind()),
307 },
308 )
309 }
310}
311
312impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
313 for Stream<T, L, B, O, R>
314where
315 L: Location<'a> + NoTick,
316{
317 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
318 assert_eq!(
319 Location::id(&self.location),
320 expected_location,
321 "locations do not match"
322 );
323 self.location
324 .flow_state()
325 .borrow_mut()
326 .push_root(HydroRoot::CycleSink {
327 cycle_id,
328 input: Box::new(self.ir_node.into_inner()),
329 op_metadata: HydroIrOpMetadata::new(),
330 });
331 }
332}
333
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // Cloning shares the underlying IR node. On the first clone, rewrite this
        // stream's node in place into a `Tee` that owns the original node behind an
        // `Rc<RefCell<..>>`, so `self` and all clones reference the same subtree.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Swap in a placeholder so the node can be moved out of the RefCell.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // At this point `self.ir_node` is guaranteed to be a `Tee`; hand out a new
        // `Tee` handle sharing the same inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
363
364impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
365where
366 L: Location<'a>,
367{
    /// Wraps an IR node as a typed stream at `location`.
    ///
    /// Debug-asserts that the node was built for this exact location and for this
    /// exact collection kind (including the bound/order/retry markers), catching
    /// mismatched metadata at construction time.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        Stream {
            location,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }
378
    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
383
    /// Describes this stream's type-level guarantees as runtime IR metadata,
    /// reflecting the `B`/`O`/`R` markers and the element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
392
393 /// Produces a stream based on invoking `f` on each element.
394 /// If you do not want to modify the stream and instead only want to view
395 /// each item use [`Stream::inspect`] instead.
396 ///
397 /// # Example
398 /// ```rust
399 /// # #[cfg(feature = "deploy")] {
400 /// # use hydro_lang::prelude::*;
401 /// # use futures::StreamExt;
402 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
403 /// let words = process.source_iter(q!(vec!["hello", "world"]));
404 /// words.map(q!(|x| x.to_uppercase()))
405 /// # }, |mut stream| async move {
406 /// # for w in vec!["HELLO", "WORLD"] {
407 /// # assert_eq!(stream.next().await.unwrap(), w);
408 /// # }
409 /// # }));
410 /// # }
411 /// ```
412 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
413 where
414 F: Fn(T) -> U + 'a,
415 {
416 let f = f.splice_fn1_ctx(&self.location).into();
417 Stream::new(
418 self.location.clone(),
419 HydroNode::Map {
420 f,
421 input: Box::new(self.ir_node.into_inner()),
422 metadata: self
423 .location
424 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
425 },
426 )
427 }
428
429 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
430 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
431 /// for the output type `U` must produce items in a **deterministic** order.
432 ///
433 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
434 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
435 ///
436 /// # Example
437 /// ```rust
438 /// # #[cfg(feature = "deploy")] {
439 /// # use hydro_lang::prelude::*;
440 /// # use futures::StreamExt;
441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
442 /// process
443 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
444 /// .flat_map_ordered(q!(|x| x))
445 /// # }, |mut stream| async move {
446 /// // 1, 2, 3, 4
447 /// # for w in (1..5) {
448 /// # assert_eq!(stream.next().await.unwrap(), w);
449 /// # }
450 /// # }));
451 /// # }
452 /// ```
453 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454 where
455 I: IntoIterator<Item = U>,
456 F: Fn(T) -> I + 'a,
457 {
458 let f = f.splice_fn1_ctx(&self.location).into();
459 Stream::new(
460 self.location.clone(),
461 HydroNode::FlatMap {
462 f,
463 input: Box::new(self.ir_node.into_inner()),
464 metadata: self
465 .location
466 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
467 },
468 )
469 }
470
471 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
472 /// for the output type `U` to produce items in any order.
473 ///
474 /// # Example
475 /// ```rust
476 /// # #[cfg(feature = "deploy")] {
477 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
478 /// # use futures::StreamExt;
479 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
480 /// process
481 /// .source_iter(q!(vec![
482 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
483 /// std::collections::HashSet::from_iter(vec![3, 4]),
484 /// ]))
485 /// .flat_map_unordered(q!(|x| x))
486 /// # }, |mut stream| async move {
487 /// // 1, 2, 3, 4, but in no particular order
488 /// # let mut results = Vec::new();
489 /// # for w in (1..5) {
490 /// # results.push(stream.next().await.unwrap());
491 /// # }
492 /// # results.sort();
493 /// # assert_eq!(results, vec![1, 2, 3, 4]);
494 /// # }));
495 /// # }
496 /// ```
497 pub fn flat_map_unordered<U, I, F>(
498 self,
499 f: impl IntoQuotedMut<'a, F, L>,
500 ) -> Stream<U, L, B, NoOrder, R>
501 where
502 I: IntoIterator<Item = U>,
503 F: Fn(T) -> I + 'a,
504 {
505 let f = f.splice_fn1_ctx(&self.location).into();
506 Stream::new(
507 self.location.clone(),
508 HydroNode::FlatMap {
509 f,
510 input: Box::new(self.ir_node.into_inner()),
511 metadata: self
512 .location
513 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
514 },
515 )
516 }
517
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates to `flat_map_ordered` with the identity closure.
        self.flat_map_ordered(q!(|d| d))
    }
546
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates to `flat_map_unordered` with the identity closure.
        self.flat_map_unordered(q!(|d| d))
    }
579
580 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
581 /// `f`, preserving the order of the elements.
582 ///
583 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
584 /// not modify or take ownership of the values. If you need to modify the values while filtering
585 /// use [`Stream::filter_map`] instead.
586 ///
587 /// # Example
588 /// ```rust
589 /// # #[cfg(feature = "deploy")] {
590 /// # use hydro_lang::prelude::*;
591 /// # use futures::StreamExt;
592 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593 /// process
594 /// .source_iter(q!(vec![1, 2, 3, 4]))
595 /// .filter(q!(|&x| x > 2))
596 /// # }, |mut stream| async move {
597 /// // 3, 4
598 /// # for w in (3..5) {
599 /// # assert_eq!(stream.next().await.unwrap(), w);
600 /// # }
601 /// # }));
602 /// # }
603 /// ```
604 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
605 where
606 F: Fn(&T) -> bool + 'a,
607 {
608 let f = f.splice_fn1_borrow_ctx(&self.location).into();
609 Stream::new(
610 self.location.clone(),
611 HydroNode::Filter {
612 f,
613 input: Box::new(self.ir_node.into_inner()),
614 metadata: self.location.new_node_metadata(Self::collection_kind()),
615 },
616 )
617 }
618
619 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
620 ///
621 /// # Example
622 /// ```rust
623 /// # #[cfg(feature = "deploy")] {
624 /// # use hydro_lang::prelude::*;
625 /// # use futures::StreamExt;
626 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
627 /// process
628 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
629 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
630 /// # }, |mut stream| async move {
631 /// // 1, 2
632 /// # for w in (1..3) {
633 /// # assert_eq!(stream.next().await.unwrap(), w);
634 /// # }
635 /// # }));
636 /// # }
637 /// ```
638 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
639 where
640 F: Fn(T) -> Option<U> + 'a,
641 {
642 let f = f.splice_fn1_ctx(&self.location).into();
643 Stream::new(
644 self.location.clone(),
645 HydroNode::FilterMap {
646 f,
647 input: Box::new(self.ir_node.into_inner()),
648 metadata: self
649 .location
650 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
651 },
652 )
653 }
654
655 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
656 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
657 /// If `other` is an empty [`Optional`], no values will be produced.
658 ///
659 /// # Example
660 /// ```rust
661 /// # #[cfg(feature = "deploy")] {
662 /// # use hydro_lang::prelude::*;
663 /// # use futures::StreamExt;
664 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
665 /// let tick = process.tick();
666 /// let batch = process
667 /// .source_iter(q!(vec![1, 2, 3, 4]))
668 /// .batch(&tick, nondet!(/** test */));
669 /// let count = batch.clone().count(); // `count()` returns a singleton
670 /// batch.cross_singleton(count).all_ticks()
671 /// # }, |mut stream| async move {
672 /// // (1, 4), (2, 4), (3, 4), (4, 4)
673 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
674 /// # assert_eq!(stream.next().await.unwrap(), w);
675 /// # }
676 /// # }));
677 /// # }
678 /// ```
679 pub fn cross_singleton<O2>(
680 self,
681 other: impl Into<Optional<O2, L, Bounded>>,
682 ) -> Stream<(T, O2), L, B, O, R>
683 where
684 O2: Clone,
685 {
686 let other: Optional<O2, L, Bounded> = other.into();
687 check_matching_location(&self.location, &other.location);
688
689 Stream::new(
690 self.location.clone(),
691 HydroNode::CrossSingleton {
692 left: Box::new(self.ir_node.into_inner()),
693 right: Box::new(other.ir_node.into_inner()),
694 metadata: self
695 .location
696 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
697 },
698 )
699 }
700
701 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
702 ///
703 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
704 /// leader of a cluster.
705 ///
706 /// # Example
707 /// ```rust
708 /// # #[cfg(feature = "deploy")] {
709 /// # use hydro_lang::prelude::*;
710 /// # use futures::StreamExt;
711 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
712 /// let tick = process.tick();
713 /// // ticks are lazy by default, forces the second tick to run
714 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
715 ///
716 /// let batch_first_tick = process
717 /// .source_iter(q!(vec![1, 2, 3, 4]))
718 /// .batch(&tick, nondet!(/** test */));
719 /// let batch_second_tick = process
720 /// .source_iter(q!(vec![5, 6, 7, 8]))
721 /// .batch(&tick, nondet!(/** test */))
722 /// .defer_tick(); // appears on the second tick
723 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
724 /// batch_first_tick.chain(batch_second_tick)
725 /// .filter_if_some(some_on_first_tick)
726 /// .all_ticks()
727 /// # }, |mut stream| async move {
728 /// // [1, 2, 3, 4]
729 /// # for w in vec![1, 2, 3, 4] {
730 /// # assert_eq!(stream.next().await.unwrap(), w);
731 /// # }
732 /// # }));
733 /// # }
734 /// ```
735 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
736 self.cross_singleton(signal.map(q!(|_u| ())))
737 .map(q!(|(d, _signal)| d))
738 }
739
740 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
741 ///
742 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
743 /// some local state.
744 ///
745 /// # Example
746 /// ```rust
747 /// # #[cfg(feature = "deploy")] {
748 /// # use hydro_lang::prelude::*;
749 /// # use futures::StreamExt;
750 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
751 /// let tick = process.tick();
752 /// // ticks are lazy by default, forces the second tick to run
753 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
754 ///
755 /// let batch_first_tick = process
756 /// .source_iter(q!(vec![1, 2, 3, 4]))
757 /// .batch(&tick, nondet!(/** test */));
758 /// let batch_second_tick = process
759 /// .source_iter(q!(vec![5, 6, 7, 8]))
760 /// .batch(&tick, nondet!(/** test */))
761 /// .defer_tick(); // appears on the second tick
762 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
763 /// batch_first_tick.chain(batch_second_tick)
764 /// .filter_if_none(some_on_first_tick)
765 /// .all_ticks()
766 /// # }, |mut stream| async move {
767 /// // [5, 6, 7, 8]
768 /// # for w in vec![5, 6, 7, 8] {
769 /// # assert_eq!(stream.next().await.unwrap(), w);
770 /// # }
771 /// # }));
772 /// # }
773 /// ```
774 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
775 self.filter_if_some(
776 other
777 .map(q!(|_| ()))
778 .into_singleton()
779 .filter(q!(|o| o.is_none())),
780 )
781 }
782
783 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
784 /// tupled pairs in a non-deterministic order.
785 ///
786 /// # Example
787 /// ```rust
788 /// # #[cfg(feature = "deploy")] {
789 /// # use hydro_lang::prelude::*;
790 /// # use std::collections::HashSet;
791 /// # use futures::StreamExt;
792 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
793 /// let tick = process.tick();
794 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
795 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
796 /// stream1.cross_product(stream2)
797 /// # }, |mut stream| async move {
798 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
799 /// # stream.map(|i| assert!(expected.contains(&i)));
800 /// # }));
801 /// # }
802 /// ```
803 pub fn cross_product<T2, O2: Ordering>(
804 self,
805 other: Stream<T2, L, B, O2, R>,
806 ) -> Stream<(T, T2), L, B, NoOrder, R>
807 where
808 T: Clone,
809 T2: Clone,
810 {
811 check_matching_location(&self.location, &other.location);
812
813 Stream::new(
814 self.location.clone(),
815 HydroNode::CrossProduct {
816 left: Box::new(self.ir_node.into_inner()),
817 right: Box::new(other.ir_node.into_inner()),
818 metadata: self
819 .location
820 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
821 },
822 )
823 }
824
825 /// Takes one stream as input and filters out any duplicate occurrences. The output
826 /// contains all unique values from the input.
827 ///
828 /// # Example
829 /// ```rust
830 /// # #[cfg(feature = "deploy")] {
831 /// # use hydro_lang::prelude::*;
832 /// # use futures::StreamExt;
833 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
834 /// let tick = process.tick();
835 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
836 /// # }, |mut stream| async move {
837 /// # for w in vec![1, 2, 3, 4] {
838 /// # assert_eq!(stream.next().await.unwrap(), w);
839 /// # }
840 /// # }));
841 /// # }
842 /// ```
843 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
844 where
845 T: Eq + Hash,
846 {
847 Stream::new(
848 self.location.clone(),
849 HydroNode::Unique {
850 input: Box::new(self.ir_node.into_inner()),
851 metadata: self
852 .location
853 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
854 },
855 )
856 }
857
858 /// Outputs everything in this stream that is *not* contained in the `other` stream.
859 ///
860 /// The `other` stream must be [`Bounded`], since this function will wait until
861 /// all its elements are available before producing any output.
862 /// # Example
863 /// ```rust
864 /// # #[cfg(feature = "deploy")] {
865 /// # use hydro_lang::prelude::*;
866 /// # use futures::StreamExt;
867 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868 /// let tick = process.tick();
869 /// let stream = process
870 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
871 /// .batch(&tick, nondet!(/** test */));
872 /// let batch = process
873 /// .source_iter(q!(vec![1, 2]))
874 /// .batch(&tick, nondet!(/** test */));
875 /// stream.filter_not_in(batch).all_ticks()
876 /// # }, |mut stream| async move {
877 /// # for w in vec![3, 4] {
878 /// # assert_eq!(stream.next().await.unwrap(), w);
879 /// # }
880 /// # }));
881 /// # }
882 /// ```
883 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
884 where
885 T: Eq + Hash,
886 B2: IsBounded,
887 {
888 check_matching_location(&self.location, &other.location);
889
890 Stream::new(
891 self.location.clone(),
892 HydroNode::Difference {
893 pos: Box::new(self.ir_node.into_inner()),
894 neg: Box::new(other.ir_node.into_inner()),
895 metadata: self
896 .location
897 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
898 },
899 )
900 }
901
902 /// An operator which allows you to "inspect" each element of a stream without
903 /// modifying it. The closure `f` is called on a reference to each item. This is
904 /// mainly useful for debugging, and should not be used to generate side-effects.
905 ///
906 /// # Example
907 /// ```rust
908 /// # #[cfg(feature = "deploy")] {
909 /// # use hydro_lang::prelude::*;
910 /// # use futures::StreamExt;
911 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
912 /// let nums = process.source_iter(q!(vec![1, 2]));
913 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
914 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
915 /// # }, |mut stream| async move {
916 /// # for w in vec![1, 2] {
917 /// # assert_eq!(stream.next().await.unwrap(), w);
918 /// # }
919 /// # }));
920 /// # }
921 /// ```
922 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
923 where
924 F: Fn(&T) + 'a,
925 {
926 let f = f.splice_fn1_borrow_ctx(&self.location).into();
927
928 Stream::new(
929 self.location.clone(),
930 HydroNode::Inspect {
931 f,
932 input: Box::new(self.ir_node.into_inner()),
933 metadata: self.location.new_node_metadata(Self::collection_kind()),
934 },
935 )
936 }
937
938 /// Executes the provided closure for every element in this stream.
939 ///
940 /// Because the closure may have side effects, the stream must have deterministic order
941 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
942 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
943 /// [`Stream::assume_retries`] with an explanation for why this is the case.
944 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
945 where
946 O: IsOrdered,
947 R: IsExactlyOnce,
948 {
949 let f = f.splice_fn1_ctx(&self.location).into();
950 self.location
951 .flow_state()
952 .borrow_mut()
953 .push_root(HydroRoot::ForEach {
954 input: Box::new(self.ir_node.into_inner()),
955 f,
956 op_metadata: HydroIrOpMetadata::new(),
957 });
958 }
959
960 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
961 /// TCP socket to some other server. You should _not_ use this API for interacting with
962 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
963 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
964 /// interaction with asynchronous sinks.
965 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
966 where
967 O: IsOrdered,
968 R: IsExactlyOnce,
969 S: 'a + futures::Sink<T> + Unpin,
970 {
971 self.location
972 .flow_state()
973 .borrow_mut()
974 .push_root(HydroRoot::DestSink {
975 sink: sink.splice_typed_ctx(&self.location).into(),
976 input: Box::new(self.ir_node.into_inner()),
977 op_metadata: HydroIrOpMetadata::new(),
978 });
979 }
980
981 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
982 ///
983 /// # Example
984 /// ```rust
985 /// # #[cfg(feature = "deploy")] {
986 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
987 /// # use futures::StreamExt;
988 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
989 /// let tick = process.tick();
990 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
991 /// numbers.enumerate()
992 /// # }, |mut stream| async move {
993 /// // (0, 1), (1, 2), (2, 3), (3, 4)
994 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
995 /// # assert_eq!(stream.next().await.unwrap(), w);
996 /// # }
997 /// # }));
998 /// # }
999 /// ```
1000 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1001 where
1002 O: IsOrdered,
1003 R: IsExactlyOnce,
1004 {
1005 Stream::new(
1006 self.location.clone(),
1007 HydroNode::Enumerate {
1008 input: Box::new(self.ir_node.into_inner()),
1009 metadata: self.location.new_node_metadata(Stream::<
1010 (usize, T),
1011 L,
1012 B,
1013 TotalOrder,
1014 ExactlyOnce,
1015 >::collection_kind()),
1016 },
1017 )
1018 }
1019
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    /// .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Singleton<A, L, B>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        // The `C` / `Idemp` bounds have already checked the required algebraic
        // properties, so erasing the ordering / retry markers here is sound.
        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.into_inner()),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
        };

        Singleton::new(ordered_etc.location, core)
    }
1072
1073 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1074 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1075 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1076 /// reference, so that it can be modified in place.
1077 ///
1078 /// Depending on the input stream guarantees, the closure may need to be commutative
1079 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1080 ///
1081 /// # Example
1082 /// ```rust
1083 /// # #[cfg(feature = "deploy")] {
1084 /// # use hydro_lang::prelude::*;
1085 /// # use futures::StreamExt;
1086 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1087 /// let bools = process.source_iter(q!(vec![false, true, false]));
1088 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1089 /// # }, |mut stream| async move {
1090 /// // true
1091 /// # assert_eq!(stream.next().await.unwrap(), true);
1092 /// # }));
1093 /// # }
1094 /// ```
1095 pub fn reduce<F, C, Idemp>(
1096 self,
1097 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1098 ) -> Optional<T, L, B>
1099 where
1100 F: Fn(&mut T, T) + 'a,
1101 C: ValidCommutativityFor<O>,
1102 Idemp: ValidIdempotenceFor<R>,
1103 {
1104 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1105 proof.register_proof(&f);
1106
1107 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1108 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1109
1110 let core = HydroNode::Reduce {
1111 f: f.into(),
1112 input: Box::new(ordered_etc.ir_node.into_inner()),
1113 metadata: ordered_etc
1114 .location
1115 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1116 };
1117
1118 Optional::new(ordered_etc.location, core)
1119 }
1120
1121 /// Computes the maximum element in the stream as an [`Optional`], which
1122 /// will be empty until the first element in the input arrives.
1123 ///
1124 /// # Example
1125 /// ```rust
1126 /// # #[cfg(feature = "deploy")] {
1127 /// # use hydro_lang::prelude::*;
1128 /// # use futures::StreamExt;
1129 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1130 /// let tick = process.tick();
1131 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1132 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1133 /// batch.max().all_ticks()
1134 /// # }, |mut stream| async move {
1135 /// // 4
1136 /// # assert_eq!(stream.next().await.unwrap(), 4);
1137 /// # }));
1138 /// # }
1139 /// ```
1140 pub fn max(self) -> Optional<T, L, B>
1141 where
1142 T: Ord,
1143 {
1144 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1145 .assume_ordering_trusted_bounded::<TotalOrder>(
1146 nondet!(/** max is commutative, but order affects intermediates */),
1147 )
1148 .reduce(q!(|curr, new| {
1149 if new > *curr {
1150 *curr = new;
1151 }
1152 }))
1153 }
1154
1155 /// Computes the minimum element in the stream as an [`Optional`], which
1156 /// will be empty until the first element in the input arrives.
1157 ///
1158 /// # Example
1159 /// ```rust
1160 /// # #[cfg(feature = "deploy")] {
1161 /// # use hydro_lang::prelude::*;
1162 /// # use futures::StreamExt;
1163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1164 /// let tick = process.tick();
1165 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1166 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1167 /// batch.min().all_ticks()
1168 /// # }, |mut stream| async move {
1169 /// // 1
1170 /// # assert_eq!(stream.next().await.unwrap(), 1);
1171 /// # }));
1172 /// # }
1173 /// ```
1174 pub fn min(self) -> Optional<T, L, B>
1175 where
1176 T: Ord,
1177 {
1178 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1179 .assume_ordering_trusted_bounded::<TotalOrder>(
1180 nondet!(/** max is commutative, but order affects intermediates */),
1181 )
1182 .reduce(q!(|curr, new| {
1183 if new < *curr {
1184 *curr = new;
1185 }
1186 }))
1187 }
1188
1189 /// Computes the first element in the stream as an [`Optional`], which
1190 /// will be empty until the first element in the input arrives.
1191 ///
1192 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1193 /// re-ordering of elements may cause the first element to change.
1194 ///
1195 /// # Example
1196 /// ```rust
1197 /// # #[cfg(feature = "deploy")] {
1198 /// # use hydro_lang::prelude::*;
1199 /// # use futures::StreamExt;
1200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1201 /// let tick = process.tick();
1202 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1203 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1204 /// batch.first().all_ticks()
1205 /// # }, |mut stream| async move {
1206 /// // 1
1207 /// # assert_eq!(stream.next().await.unwrap(), 1);
1208 /// # }));
1209 /// # }
1210 /// ```
1211 pub fn first(self) -> Optional<T, L, B>
1212 where
1213 O: IsOrdered,
1214 {
1215 self.make_totally_ordered()
1216 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1217 .reduce(q!(|_, _| {}))
1218 }
1219
1220 /// Computes the last element in the stream as an [`Optional`], which
1221 /// will be empty until an element in the input arrives.
1222 ///
1223 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1224 /// re-ordering of elements may cause the last element to change.
1225 ///
1226 /// # Example
1227 /// ```rust
1228 /// # #[cfg(feature = "deploy")] {
1229 /// # use hydro_lang::prelude::*;
1230 /// # use futures::StreamExt;
1231 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1232 /// let tick = process.tick();
1233 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1234 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1235 /// batch.last().all_ticks()
1236 /// # }, |mut stream| async move {
1237 /// // 4
1238 /// # assert_eq!(stream.next().await.unwrap(), 4);
1239 /// # }));
1240 /// # }
1241 /// ```
1242 pub fn last(self) -> Optional<T, L, B>
1243 where
1244 O: IsOrdered,
1245 {
1246 self.make_totally_ordered()
1247 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1248 .reduce(q!(|curr, new| *curr = new))
1249 }
1250
1251 /// Collects all the elements of this stream into a single [`Vec`] element.
1252 ///
1253 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1254 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1255 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1256 /// the vector at an arbitrary point in time.
1257 ///
1258 /// # Example
1259 /// ```rust
1260 /// # #[cfg(feature = "deploy")] {
1261 /// # use hydro_lang::prelude::*;
1262 /// # use futures::StreamExt;
1263 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1264 /// let tick = process.tick();
1265 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1266 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1267 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1268 /// # }, |mut stream| async move {
1269 /// // [ vec![1, 2, 3, 4] ]
1270 /// # for w in vec![vec![1, 2, 3, 4]] {
1271 /// # assert_eq!(stream.next().await.unwrap(), w);
1272 /// # }
1273 /// # }));
1274 /// # }
1275 /// ```
1276 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1277 where
1278 O: IsOrdered,
1279 R: IsExactlyOnce,
1280 {
1281 self.make_totally_ordered().make_exactly_once().fold(
1282 q!(|| vec![]),
1283 q!(|acc, v| {
1284 acc.push(v);
1285 }),
1286 )
1287 }
1288
1289 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1290 /// and emitting each intermediate result.
1291 ///
1292 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1293 /// containing all intermediate accumulated values. The scan operation can also terminate early
1294 /// by returning `None`.
1295 ///
1296 /// The function takes a mutable reference to the accumulator and the current element, and returns
1297 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1298 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1299 ///
1300 /// # Examples
1301 ///
1302 /// Basic usage - running sum:
1303 /// ```rust
1304 /// # #[cfg(feature = "deploy")] {
1305 /// # use hydro_lang::prelude::*;
1306 /// # use futures::StreamExt;
1307 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1308 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1309 /// q!(|| 0),
1310 /// q!(|acc, x| {
1311 /// *acc += x;
1312 /// Some(*acc)
1313 /// }),
1314 /// )
1315 /// # }, |mut stream| async move {
1316 /// // Output: 1, 3, 6, 10
1317 /// # for w in vec![1, 3, 6, 10] {
1318 /// # assert_eq!(stream.next().await.unwrap(), w);
1319 /// # }
1320 /// # }));
1321 /// # }
1322 /// ```
1323 ///
1324 /// Early termination example:
1325 /// ```rust
1326 /// # #[cfg(feature = "deploy")] {
1327 /// # use hydro_lang::prelude::*;
1328 /// # use futures::StreamExt;
1329 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1330 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1331 /// q!(|| 1),
1332 /// q!(|state, x| {
1333 /// *state = *state * x;
1334 /// if *state > 6 {
1335 /// None // Terminate the stream
1336 /// } else {
1337 /// Some(-*state)
1338 /// }
1339 /// }),
1340 /// )
1341 /// # }, |mut stream| async move {
1342 /// // Output: -1, -2, -6
1343 /// # for w in vec![-1, -2, -6] {
1344 /// # assert_eq!(stream.next().await.unwrap(), w);
1345 /// # }
1346 /// # }));
1347 /// # }
1348 /// ```
1349 pub fn scan<A, U, I, F>(
1350 self,
1351 init: impl IntoQuotedMut<'a, I, L>,
1352 f: impl IntoQuotedMut<'a, F, L>,
1353 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1354 where
1355 O: IsOrdered,
1356 R: IsExactlyOnce,
1357 I: Fn() -> A + 'a,
1358 F: Fn(&mut A, T) -> Option<U> + 'a,
1359 {
1360 let init = init.splice_fn0_ctx(&self.location).into();
1361 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1362
1363 Stream::new(
1364 self.location.clone(),
1365 HydroNode::Scan {
1366 init,
1367 acc: f,
1368 input: Box::new(self.ir_node.into_inner()),
1369 metadata: self.location.new_node_metadata(
1370 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1371 ),
1372 },
1373 )
1374 }
1375
1376 /// Given a time interval, returns a stream corresponding to samples taken from the
1377 /// stream roughly at that interval. The output will have elements in the same order
1378 /// as the input, but with arbitrary elements skipped between samples. There is also
1379 /// no guarantee on the exact timing of the samples.
1380 ///
1381 /// # Non-Determinism
1382 /// The output stream is non-deterministic in which elements are sampled, since this
1383 /// is controlled by a clock.
1384 pub fn sample_every(
1385 self,
1386 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1387 nondet: NonDet,
1388 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1389 where
1390 L: NoTick + NoAtomic,
1391 {
1392 let samples = self.location.source_interval(interval, nondet);
1393
1394 let tick = self.location.tick();
1395 self.batch(&tick, nondet)
1396 .filter_if_some(samples.batch(&tick, nondet).first())
1397 .all_ticks()
1398 .weaken_retries()
1399 }
1400
1401 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1402 /// stream has not emitted a value since that duration.
1403 ///
1404 /// # Non-Determinism
1405 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1406 /// samples take place, timeouts may be non-deterministically generated or missed,
1407 /// and the notification of the timeout may be delayed as well. There is also no
1408 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1409 /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the most recent element; `None` means
        // nothing has been received yet.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = ManualProof(/* TODO */)
            ),
        );

        // On each tick, emit `Some(())` when the most recent element is older
        // than `duration` — or when no element has arrived at all.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1445
1446 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1447 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1448 ///
1449 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1450 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1451 /// argument that declares where the stream will be atomically processed. Batching a stream into
1452 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1453 /// [`Tick`] will introduce asynchrony.
1454 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1455 let out_location = Atomic { tick: tick.clone() };
1456 Stream::new(
1457 out_location.clone(),
1458 HydroNode::BeginAtomic {
1459 inner: Box::new(self.ir_node.into_inner()),
1460 metadata: out_location
1461 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1462 },
1463 )
1464 }
1465
1466 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1467 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1468 /// the order of the input. The output stream will execute in the [`Tick`] that was
1469 /// used to create the atomic section.
1470 ///
1471 /// # Non-Determinism
1472 /// The batch boundaries are non-deterministic and may change across executions.
1473 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1474 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1475 Stream::new(
1476 tick.clone(),
1477 HydroNode::Batch {
1478 inner: Box::new(self.ir_node.into_inner()),
1479 metadata: tick
1480 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1481 },
1482 )
1483 }
1484
1485 /// An operator which allows you to "name" a `HydroNode`.
1486 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1487 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1488 {
1489 let mut node = self.ir_node.borrow_mut();
1490 let metadata = node.metadata_mut();
1491 metadata.tag = Some(name.to_owned());
1492 }
1493 self
1494 }
1495
1496 /// Explicitly "casts" the stream to a type with a different ordering
1497 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1498 /// by the type-system.
1499 ///
1500 /// # Non-Determinism
1501 /// This function is used as an escape hatch, and any mistakes in the
1502 /// provided ordering guarantee will propagate into the guarantees
1503 /// for the rest of the program.
1504 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1505 if O::ORDERING_KIND == O2::ORDERING_KIND {
1506 Stream::new(self.location, self.ir_node.into_inner())
1507 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1508 // We can always weaken the ordering guarantee
1509 Stream::new(
1510 self.location.clone(),
1511 HydroNode::Cast {
1512 inner: Box::new(self.ir_node.into_inner()),
1513 metadata: self
1514 .location
1515 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1516 },
1517 )
1518 } else {
1519 Stream::new(
1520 self.location.clone(),
1521 HydroNode::ObserveNonDet {
1522 inner: Box::new(self.ir_node.into_inner()),
1523 trusted: false,
1524 metadata: self
1525 .location
1526 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1527 },
1528 )
1529 }
1530 }
1531
1532 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1533 // intermediate states will not be revealed
1534 fn assume_ordering_trusted_bounded<O2: Ordering>(
1535 self,
1536 nondet: NonDet,
1537 ) -> Stream<T, L, B, O2, R> {
1538 if B::BOUNDED {
1539 self.assume_ordering_trusted(nondet)
1540 } else {
1541 self.assume_ordering(nondet)
1542 }
1543 }
1544
1545 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1546 // is not observable
1547 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1548 self,
1549 _nondet: NonDet,
1550 ) -> Stream<T, L, B, O2, R> {
1551 if O::ORDERING_KIND == O2::ORDERING_KIND {
1552 Stream::new(self.location, self.ir_node.into_inner())
1553 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1554 // We can always weaken the ordering guarantee
1555 Stream::new(
1556 self.location.clone(),
1557 HydroNode::Cast {
1558 inner: Box::new(self.ir_node.into_inner()),
1559 metadata: self
1560 .location
1561 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1562 },
1563 )
1564 } else {
1565 Stream::new(
1566 self.location.clone(),
1567 HydroNode::ObserveNonDet {
1568 inner: Box::new(self.ir_node.into_inner()),
1569 trusted: true,
1570 metadata: self
1571 .location
1572 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1573 },
1574 )
1575 }
1576 }
1577
    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        // Kept as a thin alias for backwards compatibility.
        self.weaken_ordering::<NoOrder>()
    }
1584
1585 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1586 /// enforcing that `O2` is weaker than the input ordering guarantee.
1587 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1588 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1589 self.assume_ordering::<O2>(nondet)
1590 }
1591
1592 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1593 /// implies that `O == TotalOrder`.
1594 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1595 where
1596 O: IsOrdered,
1597 {
1598 self.assume_ordering(nondet!(/** no-op */))
1599 }
1600
1601 /// Explicitly "casts" the stream to a type with a different retries
1602 /// guarantee. Useful in unsafe code where the lack of retries cannot
1603 /// be proven by the type-system.
1604 ///
1605 /// # Non-Determinism
1606 /// This function is used as an escape hatch, and any mistakes in the
1607 /// provided retries guarantee will propagate into the guarantees
1608 /// for the rest of the program.
1609 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1610 if R::RETRIES_KIND == R2::RETRIES_KIND {
1611 Stream::new(self.location, self.ir_node.into_inner())
1612 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1613 // We can always weaken the retries guarantee
1614 Stream::new(
1615 self.location.clone(),
1616 HydroNode::Cast {
1617 inner: Box::new(self.ir_node.into_inner()),
1618 metadata: self
1619 .location
1620 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1621 },
1622 )
1623 } else {
1624 Stream::new(
1625 self.location.clone(),
1626 HydroNode::ObserveNonDet {
1627 inner: Box::new(self.ir_node.into_inner()),
1628 trusted: false,
1629 metadata: self
1630 .location
1631 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1632 },
1633 )
1634 }
1635 }
1636
1637 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1638 // is not observable
1639 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1640 if R::RETRIES_KIND == R2::RETRIES_KIND {
1641 Stream::new(self.location, self.ir_node.into_inner())
1642 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1643 // We can always weaken the retries guarantee
1644 Stream::new(
1645 self.location.clone(),
1646 HydroNode::Cast {
1647 inner: Box::new(self.ir_node.into_inner()),
1648 metadata: self
1649 .location
1650 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1651 },
1652 )
1653 } else {
1654 Stream::new(
1655 self.location.clone(),
1656 HydroNode::ObserveNonDet {
1657 inner: Box::new(self.ir_node.into_inner()),
1658 trusted: true,
1659 metadata: self
1660 .location
1661 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1662 },
1663 )
1664 }
1665 }
1666
    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        // Kept as a thin alias for backwards compatibility.
        self.weaken_retries::<AtLeastOnce>()
    }
1673
1674 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1675 /// enforcing that `R2` is weaker than the input retries guarantee.
1676 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1677 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1678 self.assume_retries::<R2>(nondet)
1679 }
1680
1681 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1682 /// implies that `R == ExactlyOnce`.
1683 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1684 where
1685 R: IsExactlyOnce,
1686 {
1687 self.assume_retries(nondet!(/** no-op */))
1688 }
1689
1690 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1691 /// implies that `B == Bounded`.
    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
    where
        B: IsBounded,
    {
        // `B == Bounded` statically, so no IR cast is needed — just re-wrap.
        Stream::new(self.location, self.ir_node.into_inner())
    }
1698}
1699
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Convenience wrapper: the clone happens inside the generated dataflow.
        self.map(q!(|d| d.clone()))
    }
}
1728
1729impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1730where
1731 L: Location<'a>,
1732{
1733 /// Computes the number of elements in the stream as a [`Singleton`].
1734 ///
1735 /// # Example
1736 /// ```rust
1737 /// # #[cfg(feature = "deploy")] {
1738 /// # use hydro_lang::prelude::*;
1739 /// # use futures::StreamExt;
1740 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1741 /// let tick = process.tick();
1742 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1743 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1744 /// batch.count().all_ticks()
1745 /// # }, |mut stream| async move {
1746 /// // 4
1747 /// # assert_eq!(stream.next().await.unwrap(), 4);
1748 /// # }));
1749 /// # }
1750 /// ```
1751 pub fn count(self) -> Singleton<usize, L, B> {
1752 self.assume_ordering_trusted::<TotalOrder>(nondet!(
1753 /// Order does not affect eventual count, and also does not affect intermediate states.
1754 ))
1755 .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1756 }
1757}
1758
1759impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1760 /// Produces a new stream that interleaves the elements of the two input streams.
1761 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1762 ///
1763 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1764 /// [`Bounded`], you can use [`Stream::chain`] instead.
1765 ///
1766 /// # Example
1767 /// ```rust
1768 /// # #[cfg(feature = "deploy")] {
1769 /// # use hydro_lang::prelude::*;
1770 /// # use futures::StreamExt;
1771 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1772 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1773 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1774 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1775 /// # }, |mut stream| async move {
1776 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1777 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1778 /// # assert_eq!(stream.next().await.unwrap(), w);
1779 /// # }
1780 /// # }));
1781 /// # }
1782 /// ```
1783 pub fn interleave<O2: Ordering, R2: Retries>(
1784 self,
1785 other: Stream<T, L, Unbounded, O2, R2>,
1786 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1787 where
1788 R: MinRetries<R2>,
1789 {
1790 Stream::new(
1791 self.location.clone(),
1792 HydroNode::Chain {
1793 first: Box::new(self.ir_node.into_inner()),
1794 second: Box::new(other.ir_node.into_inner()),
1795 metadata: self.location.new_node_metadata(Stream::<
1796 T,
1797 L,
1798 Unbounded,
1799 NoOrder,
1800 <R as MinRetries<R2>>::Min,
1801 >::collection_kind()),
1802 },
1803 )
1804 }
1805}
1806
1807impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1808 /// Produces a new stream that combines the elements of the two input streams,
1809 /// preserving the relative order of elements within each input.
1810 ///
1811 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1812 /// [`Bounded`], you can use [`Stream::chain`] instead.
1813 ///
1814 /// # Non-Determinism
1815 /// The order in which elements *across* the two streams will be interleaved is
1816 /// non-deterministic, so the order of elements will vary across runs. If the output order
1817 /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1818 /// unordered stream.
1819 ///
1820 /// # Example
1821 /// ```rust
1822 /// # #[cfg(feature = "deploy")] {
1823 /// # use hydro_lang::prelude::*;
1824 /// # use futures::StreamExt;
1825 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1826 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1827 /// # process.source_iter(q!(vec![1, 3])).into();
1828 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1829 /// # }, |mut stream| async move {
1830 /// // 1, 3 and 2, 4 in some order, preserving the original local order
1831 /// # for w in vec![1, 3, 2, 4] {
1832 /// # assert_eq!(stream.next().await.unwrap(), w);
1833 /// # }
1834 /// # }));
1835 /// # }
1836 /// ```
1837 pub fn merge_ordered<R2: Retries>(
1838 self,
1839 other: Stream<T, L, Unbounded, TotalOrder, R2>,
1840 _nondet: NonDet,
1841 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1842 where
1843 R: MinRetries<R2>,
1844 {
1845 Stream::new(
1846 self.location.clone(),
1847 HydroNode::Chain {
1848 first: Box::new(self.ir_node.into_inner()),
1849 second: Box::new(other.ir_node.into_inner()),
1850 metadata: self.location.new_node_metadata(Stream::<
1851 T,
1852 L,
1853 Unbounded,
1854 TotalOrder,
1855 <R as MinRetries<R2>>::Min,
1856 >::collection_kind()),
1857 },
1858 )
1859 }
1860}
1861
1862impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1863where
1864 L: Location<'a>,
1865{
1866 /// Produces a new stream that emits the input elements in sorted order.
1867 ///
1868 /// The input stream can have any ordering guarantee, but the output stream
1869 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1870 /// elements in the input stream are available, so it requires the input stream
1871 /// to be [`Bounded`].
1872 ///
1873 /// # Example
1874 /// ```rust
1875 /// # #[cfg(feature = "deploy")] {
1876 /// # use hydro_lang::prelude::*;
1877 /// # use futures::StreamExt;
1878 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1879 /// let tick = process.tick();
1880 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1881 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1882 /// batch.sort().all_ticks()
1883 /// # }, |mut stream| async move {
1884 /// // 1, 2, 3, 4
1885 /// # for w in (1..5) {
1886 /// # assert_eq!(stream.next().await.unwrap(), w);
1887 /// # }
1888 /// # }));
1889 /// # }
1890 /// ```
1891 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1892 where
1893 B: IsBounded,
1894 T: Ord,
1895 {
1896 let this = self.make_bounded();
1897 Stream::new(
1898 this.location.clone(),
1899 HydroNode::Sort {
1900 input: Box::new(this.ir_node.into_inner()),
1901 metadata: this
1902 .location
1903 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1904 },
1905 )
1906 }
1907
1908 /// Produces a new stream that first emits the elements of the `self` stream,
1909 /// and then emits the elements of the `other` stream. The output stream has
1910 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1911 /// [`TotalOrder`] guarantee.
1912 ///
1913 /// Currently, both input streams must be [`Bounded`]. This operator will block
1914 /// on the first stream until all its elements are available. In a future version,
1915 /// we will relax the requirement on the `other` stream.
1916 ///
1917 /// # Example
1918 /// ```rust
1919 /// # #[cfg(feature = "deploy")] {
1920 /// # use hydro_lang::prelude::*;
1921 /// # use futures::StreamExt;
1922 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1923 /// let tick = process.tick();
1924 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1925 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1926 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1927 /// # }, |mut stream| async move {
1928 /// // 2, 3, 4, 5, 1, 2, 3, 4
1929 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1930 /// # assert_eq!(stream.next().await.unwrap(), w);
1931 /// # }
1932 /// # }));
1933 /// # }
1934 /// ```
1935 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
1936 self,
1937 other: Stream<T, L, B2, O2, R2>,
1938 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
1939 where
1940 B: IsBounded,
1941 O: MinOrder<O2>,
1942 R: MinRetries<R2>,
1943 {
1944 check_matching_location(&self.location, &other.location);
1945
1946 Stream::new(
1947 self.location.clone(),
1948 HydroNode::Chain {
1949 first: Box::new(self.ir_node.into_inner()),
1950 second: Box::new(other.ir_node.into_inner()),
1951 metadata: self.location.new_node_metadata(Stream::<
1952 T,
1953 L,
1954 B2,
1955 <O as MinOrder<O2>>::Min,
1956 <R as MinRetries<R2>>::Min,
1957 >::collection_kind()),
1958 },
1959 )
1960 }
1961
1962 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1963 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1964 /// because this is compiled into a nested loop.
1965 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
1966 self,
1967 other: Stream<T2, L, Bounded, O2, R>,
1968 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
1969 where
1970 B: IsBounded,
1971 T: Clone,
1972 T2: Clone,
1973 {
1974 let this = self.make_bounded();
1975 check_matching_location(&this.location, &other.location);
1976
1977 Stream::new(
1978 this.location.clone(),
1979 HydroNode::CrossProduct {
1980 left: Box::new(this.ir_node.into_inner()),
1981 right: Box::new(other.ir_node.into_inner()),
1982 metadata: this.location.new_node_metadata(Stream::<
1983 (T, T2),
1984 L,
1985 Bounded,
1986 <O2 as MinOrder<O>>::Min,
1987 R,
1988 >::collection_kind()),
1989 },
1990 )
1991 }
1992
1993 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
1994 /// `self` used as the values for *each* key.
1995 ///
1996 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
1997 /// values. For example, it can be used to send the same set of elements to several cluster
1998 /// members, if the membership information is available as a [`KeyedSingleton`].
1999 ///
2000 /// # Example
2001 /// ```rust
2002 /// # #[cfg(feature = "deploy")] {
2003 /// # use hydro_lang::prelude::*;
2004 /// # use futures::StreamExt;
2005 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2006 /// # let tick = process.tick();
2007 /// let keyed_singleton = // { 1: (), 2: () }
2008 /// # process
2009 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2010 /// # .into_keyed()
2011 /// # .batch(&tick, nondet!(/** test */))
2012 /// # .first();
2013 /// let stream = // [ "a", "b" ]
2014 /// # process
2015 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2016 /// # .batch(&tick, nondet!(/** test */));
2017 /// stream.repeat_with_keys(keyed_singleton)
2018 /// # .entries().all_ticks()
2019 /// # }, |mut stream| async move {
2020 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2021 /// # let mut results = Vec::new();
2022 /// # for _ in 0..4 {
2023 /// # results.push(stream.next().await.unwrap());
2024 /// # }
2025 /// # results.sort();
2026 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2027 /// # }));
2028 /// # }
2029 /// ```
2030 pub fn repeat_with_keys<K, V2>(
2031 self,
2032 keys: KeyedSingleton<K, V2, L, Bounded>,
2033 ) -> KeyedStream<K, T, L, Bounded, O, R>
2034 where
2035 B: IsBounded,
2036 K: Clone,
2037 T: Clone,
2038 {
2039 keys.keys()
2040 .weaken_retries()
2041 .assume_ordering_trusted::<TotalOrder>(
2042 nondet!(/** keyed stream does not depend on ordering of keys */),
2043 )
2044 .cross_product_nested_loop(self.make_bounded())
2045 .into_keyed()
2046 }
2047}
2048
2049impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2050where
2051 L: Location<'a>,
2052{
2053 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2054 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2055 /// by equi-joining the two streams on the key attribute `K`.
2056 ///
2057 /// # Example
2058 /// ```rust
2059 /// # #[cfg(feature = "deploy")] {
2060 /// # use hydro_lang::prelude::*;
2061 /// # use std::collections::HashSet;
2062 /// # use futures::StreamExt;
2063 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2064 /// let tick = process.tick();
2065 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2066 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2067 /// stream1.join(stream2)
2068 /// # }, |mut stream| async move {
2069 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2070 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2071 /// # stream.map(|i| assert!(expected.contains(&i)));
2072 /// # }));
2073 /// # }
2074 pub fn join<V2, O2: Ordering, R2: Retries>(
2075 self,
2076 n: Stream<(K, V2), L, B, O2, R2>,
2077 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2078 where
2079 K: Eq + Hash,
2080 R: MinRetries<R2>,
2081 {
2082 check_matching_location(&self.location, &n.location);
2083
2084 Stream::new(
2085 self.location.clone(),
2086 HydroNode::Join {
2087 left: Box::new(self.ir_node.into_inner()),
2088 right: Box::new(n.ir_node.into_inner()),
2089 metadata: self.location.new_node_metadata(Stream::<
2090 (K, (V1, V2)),
2091 L,
2092 B,
2093 NoOrder,
2094 <R as MinRetries<R2>>::Min,
2095 >::collection_kind()),
2096 },
2097 )
2098 }
2099
2100 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2101 /// computes the anti-join of the items in the input -- i.e. returns
2102 /// unique items in the first input that do not have a matching key
2103 /// in the second input.
2104 ///
2105 /// # Example
2106 /// ```rust
2107 /// # #[cfg(feature = "deploy")] {
2108 /// # use hydro_lang::prelude::*;
2109 /// # use futures::StreamExt;
2110 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2111 /// let tick = process.tick();
2112 /// let stream = process
2113 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2114 /// .batch(&tick, nondet!(/** test */));
2115 /// let batch = process
2116 /// .source_iter(q!(vec![1, 2]))
2117 /// .batch(&tick, nondet!(/** test */));
2118 /// stream.anti_join(batch).all_ticks()
2119 /// # }, |mut stream| async move {
2120 /// # for w in vec![(3, 'c'), (4, 'd')] {
2121 /// # assert_eq!(stream.next().await.unwrap(), w);
2122 /// # }
2123 /// # }));
2124 /// # }
2125 pub fn anti_join<O2: Ordering, R2: Retries>(
2126 self,
2127 n: Stream<K, L, Bounded, O2, R2>,
2128 ) -> Stream<(K, V1), L, B, O, R>
2129 where
2130 K: Eq + Hash,
2131 {
2132 check_matching_location(&self.location, &n.location);
2133
2134 Stream::new(
2135 self.location.clone(),
2136 HydroNode::AntiJoin {
2137 pos: Box::new(self.ir_node.into_inner()),
2138 neg: Box::new(n.ir_node.into_inner()),
2139 metadata: self
2140 .location
2141 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2142 },
2143 )
2144 }
2145}
2146
2147impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2148 Stream<(K, V), L, B, O, R>
2149{
2150 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2151 /// is used as the key and the second element is added to the entries associated with that key.
2152 ///
2153 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2154 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2155 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2156 /// total ordering _within_ each group but no ordering _across_ groups.
2157 ///
2158 /// # Example
2159 /// ```rust
2160 /// # #[cfg(feature = "deploy")] {
2161 /// # use hydro_lang::prelude::*;
2162 /// # use futures::StreamExt;
2163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2164 /// process
2165 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2166 /// .into_keyed()
2167 /// # .entries()
2168 /// # }, |mut stream| async move {
2169 /// // { 1: [2, 3], 2: [4] }
2170 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2171 /// # assert_eq!(stream.next().await.unwrap(), w);
2172 /// # }
2173 /// # }));
2174 /// # }
2175 /// ```
2176 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2177 KeyedStream::new(
2178 self.location.clone(),
2179 HydroNode::Cast {
2180 inner: Box::new(self.ir_node.into_inner()),
2181 metadata: self
2182 .location
2183 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2184 },
2185 )
2186 }
2187}
2188
2189impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2190where
2191 K: Eq + Hash,
2192 L: Location<'a>,
2193{
2194 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2195 /// # Example
2196 /// ```rust
2197 /// # #[cfg(feature = "deploy")] {
2198 /// # use hydro_lang::prelude::*;
2199 /// # use futures::StreamExt;
2200 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2201 /// let tick = process.tick();
2202 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2203 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2204 /// batch.keys().all_ticks()
2205 /// # }, |mut stream| async move {
2206 /// // 1, 2
2207 /// # assert_eq!(stream.next().await.unwrap(), 1);
2208 /// # assert_eq!(stream.next().await.unwrap(), 2);
2209 /// # }));
2210 /// # }
2211 /// ```
2212 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2213 self.into_keyed()
2214 .fold(
2215 q!(|| ()),
2216 q!(
2217 |_, _| {},
2218 commutative = ManualProof(/* values are ignored */),
2219 idempotent = ManualProof(/* values are ignored */)
2220 ),
2221 )
2222 .keys()
2223 }
2224}
2225
2226impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2227where
2228 L: Location<'a> + NoTick,
2229{
2230 /// Returns a stream corresponding to the latest batch of elements being atomically
2231 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2232 /// the order of the input.
2233 ///
2234 /// # Non-Determinism
2235 /// The batch boundaries are non-deterministic and may change across executions.
2236 pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2237 Stream::new(
2238 self.location.clone().tick,
2239 HydroNode::Batch {
2240 inner: Box::new(self.ir_node.into_inner()),
2241 metadata: self
2242 .location
2243 .tick
2244 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2245 },
2246 )
2247 }
2248
2249 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2250 /// See [`Stream::atomic`] for more details.
2251 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2252 Stream::new(
2253 self.location.tick.l.clone(),
2254 HydroNode::EndAtomic {
2255 inner: Box::new(self.ir_node.into_inner()),
2256 metadata: self
2257 .location
2258 .tick
2259 .l
2260 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2261 },
2262 )
2263 }
2264}
2265
2266impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2267where
2268 L: Location<'a> + NoTick + NoAtomic,
2269 F: Future<Output = T>,
2270{
2271 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2272 /// Future outputs are produced as available, regardless of input arrival order.
2273 ///
2274 /// # Example
2275 /// ```rust
2276 /// # #[cfg(feature = "deploy")] {
2277 /// # use std::collections::HashSet;
2278 /// # use futures::StreamExt;
2279 /// # use hydro_lang::prelude::*;
2280 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2281 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2282 /// .map(q!(|x| async move {
2283 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2284 /// x
2285 /// }))
2286 /// .resolve_futures()
2287 /// # },
2288 /// # |mut stream| async move {
2289 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2290 /// # let mut output = HashSet::new();
2291 /// # for _ in 1..10 {
2292 /// # output.insert(stream.next().await.unwrap());
2293 /// # }
2294 /// # assert_eq!(
2295 /// # output,
2296 /// # HashSet::<i32>::from_iter(1..10)
2297 /// # );
2298 /// # },
2299 /// # ));
2300 /// # }
2301 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2302 Stream::new(
2303 self.location.clone(),
2304 HydroNode::ResolveFutures {
2305 input: Box::new(self.ir_node.into_inner()),
2306 metadata: self
2307 .location
2308 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2309 },
2310 )
2311 }
2312
2313 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2314 /// Future outputs are produced in the same order as the input stream.
2315 ///
2316 /// # Example
2317 /// ```rust
2318 /// # #[cfg(feature = "deploy")] {
2319 /// # use std::collections::HashSet;
2320 /// # use futures::StreamExt;
2321 /// # use hydro_lang::prelude::*;
2322 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2323 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2324 /// .map(q!(|x| async move {
2325 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2326 /// x
2327 /// }))
2328 /// .resolve_futures_ordered()
2329 /// # },
2330 /// # |mut stream| async move {
2331 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2332 /// # let mut output = Vec::new();
2333 /// # for _ in 1..10 {
2334 /// # output.push(stream.next().await.unwrap());
2335 /// # }
2336 /// # assert_eq!(
2337 /// # output,
2338 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2339 /// # );
2340 /// # },
2341 /// # ));
2342 /// # }
2343 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2344 Stream::new(
2345 self.location.clone(),
2346 HydroNode::ResolveFuturesOrdered {
2347 input: Box::new(self.ir_node.into_inner()),
2348 metadata: self
2349 .location
2350 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2351 },
2352 )
2353 }
2354}
2355
2356impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2357where
2358 L: Location<'a>,
2359{
2360 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2361 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2362 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2363 Stream::new(
2364 self.location.outer().clone(),
2365 HydroNode::YieldConcat {
2366 inner: Box::new(self.ir_node.into_inner()),
2367 metadata: self
2368 .location
2369 .outer()
2370 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2371 },
2372 )
2373 }
2374
2375 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2376 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2377 ///
2378 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2379 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2380 /// stream's [`Tick`] context.
2381 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2382 let out_location = Atomic {
2383 tick: self.location.clone(),
2384 };
2385
2386 Stream::new(
2387 out_location.clone(),
2388 HydroNode::YieldConcat {
2389 inner: Box::new(self.ir_node.into_inner()),
2390 metadata: out_location
2391 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2392 },
2393 )
2394 }
2395
2396 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2397 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2398 /// input.
2399 ///
2400 /// This API is particularly useful for stateful computation on batches of data, such as
2401 /// maintaining an accumulated state that is up to date with the current batch.
2402 ///
2403 /// # Example
2404 /// ```rust
2405 /// # #[cfg(feature = "deploy")] {
2406 /// # use hydro_lang::prelude::*;
2407 /// # use futures::StreamExt;
2408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2409 /// let tick = process.tick();
2410 /// # // ticks are lazy by default, forces the second tick to run
2411 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2412 /// # let batch_first_tick = process
2413 /// # .source_iter(q!(vec![1, 2, 3, 4]))
2414 /// # .batch(&tick, nondet!(/** test */));
2415 /// # let batch_second_tick = process
2416 /// # .source_iter(q!(vec![5, 6, 7]))
2417 /// # .batch(&tick, nondet!(/** test */))
2418 /// # .defer_tick(); // appears on the second tick
2419 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2420 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2421 ///
2422 /// input.batch(&tick, nondet!(/** test */))
2423 /// .across_ticks(|s| s.count()).all_ticks()
2424 /// # }, |mut stream| async move {
2425 /// // [4, 7]
2426 /// assert_eq!(stream.next().await.unwrap(), 4);
2427 /// assert_eq!(stream.next().await.unwrap(), 7);
2428 /// # }));
2429 /// # }
2430 /// ```
2431 pub fn across_ticks<Out: BatchAtomic>(
2432 self,
2433 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2434 ) -> Out::Batched {
2435 thunk(self.all_ticks_atomic()).batched_atomic()
2436 }
2437
2438 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2439 /// always has the elements of `self` at tick `T - 1`.
2440 ///
2441 /// At tick `0`, the output stream is empty, since there is no previous tick.
2442 ///
2443 /// This operator enables stateful iterative processing with ticks, by sending data from one
2444 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2445 ///
2446 /// # Example
2447 /// ```rust
2448 /// # #[cfg(feature = "deploy")] {
2449 /// # use hydro_lang::prelude::*;
2450 /// # use futures::StreamExt;
2451 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2452 /// let tick = process.tick();
2453 /// // ticks are lazy by default, forces the second tick to run
2454 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2455 ///
2456 /// let batch_first_tick = process
2457 /// .source_iter(q!(vec![1, 2, 3, 4]))
2458 /// .batch(&tick, nondet!(/** test */));
2459 /// let batch_second_tick = process
2460 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
2461 /// .batch(&tick, nondet!(/** test */))
2462 /// .defer_tick(); // appears on the second tick
2463 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2464 ///
2465 /// changes_across_ticks.clone().filter_not_in(
2466 /// changes_across_ticks.defer_tick() // the elements from the previous tick
2467 /// ).all_ticks()
2468 /// # }, |mut stream| async move {
2469 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2470 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2471 /// # assert_eq!(stream.next().await.unwrap(), w);
2472 /// # }
2473 /// # }));
2474 /// # }
2475 /// ```
2476 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2477 Stream::new(
2478 self.location.clone(),
2479 HydroNode::DeferTick {
2480 input: Box::new(self.ir_node.into_inner()),
2481 metadata: self
2482 .location
2483 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2484 },
2485 )
2486 }
2487}
2488
2489#[cfg(test)]
2490mod tests {
2491 #[cfg(feature = "deploy")]
2492 use futures::{SinkExt, StreamExt};
2493 #[cfg(feature = "deploy")]
2494 use hydro_deploy::Deployment;
2495 #[cfg(feature = "deploy")]
2496 use serde::{Deserialize, Serialize};
2497 #[cfg(any(feature = "deploy", feature = "sim"))]
2498 use stageleft::q;
2499
2500 #[cfg(any(feature = "deploy", feature = "sim"))]
2501 use crate::compile::builder::FlowBuilder;
2502 #[cfg(feature = "deploy")]
2503 use crate::live_collections::sliced::sliced;
2504 #[cfg(feature = "deploy")]
2505 use crate::live_collections::stream::ExactlyOnce;
2506 #[cfg(feature = "sim")]
2507 use crate::live_collections::stream::NoOrder;
2508 #[cfg(any(feature = "deploy", feature = "sim"))]
2509 use crate::live_collections::stream::TotalOrder;
2510 #[cfg(any(feature = "deploy", feature = "sim"))]
2511 use crate::location::Location;
2512 #[cfg(any(feature = "deploy", feature = "sim"))]
2513 use crate::nondet::nondet;
2514
2515 mod backtrace_chained_ops;
2516
    /// Marker type naming the first process in the distributed test topology.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type naming the second process in the distributed test topology.
    #[cfg(feature = "deploy")]
    struct P2 {}

    /// Payload used to exercise bincode (de)serialization over the network.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        // Value checked by the external receiver after the network hop.
        n: u32,
    }
2527
    /// End-to-end deployment test: process `P1` sends `0..10` wrapped in
    /// `SendOverNetwork` over TCP to `P2`, which forwards them to an external
    /// client; asserts the values arrive in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        // Pipeline: P1 --TCP/bincode--> P2 --> external output port.
        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
2562
    /// Checks that `first()` on a flattened singleton yields a single element:
    /// counting the resulting stream within a tick must produce exactly 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        // singleton([1,2,3]) -> stream -> flatten -> first -> count should be 1.
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
2596
    /// Verifies that `reduce` on an unbounded stream keeps its accumulator
    /// across inputs: sending 1 then 2 yields running sums 1 then 3.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Running sum after each input: 1, then 1 + 2 = 3.
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
2630
    /// Verifies `cross_singleton` at the top level against a bounded fold:
    /// each external input is paired with the fold's final value (1+2+3 = 6).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2668
    /// Inside a `sliced!` block, a bounded `reduce` should produce exactly one
    /// value: each input is paired with the count of that singleton stream (1).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2706
    /// Same as `top_level_bounded_reduce_cardinality`, but routes the reduce
    /// through `into_singleton()` first; the cardinality must still be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2744
    /// Verifies that a fold inside an atomic section, snapshotted per tick,
    /// replays its full value (1+2+3 = 6) on every tick's batch.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic(&tick)
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Both ticks observe the same fully-folded snapshot value (6).
        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
2787
2788 #[cfg(feature = "deploy")]
2789 #[tokio::test]
2790 async fn unbounded_scan_remembers_state() {
2791 let mut deployment = Deployment::new();
2792
2793 let mut flow = FlowBuilder::new();
2794 let node = flow.process::<()>();
2795 let external = flow.external::<()>();
2796
2797 let (input_port, input) = node.source_external_bincode(&external);
2798 let out = input
2799 .scan(
2800 q!(|| 0),
2801 q!(|acc, v| {
2802 *acc += v;
2803 Some(*acc)
2804 }),
2805 )
2806 .send_bincode_external(&external);
2807
2808 let nodes = flow
2809 .with_process(&node, deployment.Localhost())
2810 .with_external(&external, deployment.Localhost())
2811 .deploy(&mut deployment);
2812
2813 deployment.deploy().await.unwrap();
2814
2815 let mut external_in = nodes.connect(input_port).await;
2816 let mut external_out = nodes.connect(out).await;
2817
2818 deployment.start().await.unwrap();
2819
2820 external_in.send(1).await.unwrap();
2821 assert_eq!(external_out.next().await.unwrap(), 1);
2822
2823 external_in.send(2).await.unwrap();
2824 assert_eq!(external_out.next().await.unwrap(), 3);
2825 }
2826
2827 #[cfg(feature = "deploy")]
2828 #[tokio::test]
2829 async fn unbounded_enumerate_remembers_state() {
2830 let mut deployment = Deployment::new();
2831
2832 let mut flow = FlowBuilder::new();
2833 let node = flow.process::<()>();
2834 let external = flow.external::<()>();
2835
2836 let (input_port, input) = node.source_external_bincode(&external);
2837 let out = input.enumerate().send_bincode_external(&external);
2838
2839 let nodes = flow
2840 .with_process(&node, deployment.Localhost())
2841 .with_external(&external, deployment.Localhost())
2842 .deploy(&mut deployment);
2843
2844 deployment.deploy().await.unwrap();
2845
2846 let mut external_in = nodes.connect(input_port).await;
2847 let mut external_out = nodes.connect(out).await;
2848
2849 deployment.start().await.unwrap();
2850
2851 external_in.send(1).await.unwrap();
2852 assert_eq!(external_out.next().await.unwrap(), (0, 1));
2853
2854 external_in.send(2).await.unwrap();
2855 assert_eq!(external_out.next().await.unwrap(), (1, 2));
2856 }
2857
2858 #[cfg(feature = "deploy")]
2859 #[tokio::test]
2860 async fn unbounded_unique_remembers_state() {
2861 let mut deployment = Deployment::new();
2862
2863 let mut flow = FlowBuilder::new();
2864 let node = flow.process::<()>();
2865 let external = flow.external::<()>();
2866
2867 let (input_port, input) =
2868 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2869 let out = input.unique().send_bincode_external(&external);
2870
2871 let nodes = flow
2872 .with_process(&node, deployment.Localhost())
2873 .with_external(&external, deployment.Localhost())
2874 .deploy(&mut deployment);
2875
2876 deployment.deploy().await.unwrap();
2877
2878 let mut external_in = nodes.connect(input_port).await;
2879 let mut external_out = nodes.connect(out).await;
2880
2881 deployment.start().await.unwrap();
2882
2883 external_in.send(1).await.unwrap();
2884 assert_eq!(external_out.next().await.unwrap(), 1);
2885
2886 external_in.send(2).await.unwrap();
2887 assert_eq!(external_out.next().await.unwrap(), 2);
2888
2889 external_in.send(1).await.unwrap();
2890 external_in.send(3).await.unwrap();
2891 assert_eq!(external_out.next().await.unwrap(), 3);
2892 }
2893
    // Batch boundaries are nondeterministic under the simulator: asserting that
    // the first batch's count saw all three sends must fail in at least one
    // explored schedule, so the exhaustive run is expected to panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            // Only holds when all three elements land in the same batch; other
            // batchings produce a smaller first count and fail the assertion.
            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
2918
2919 #[cfg(feature = "sim")]
2920 #[test]
2921 fn sim_batch_preserves_order() {
2922 let mut flow = FlowBuilder::new();
2923 let node = flow.process::<()>();
2924
2925 let (in_send, input) = node.sim_input();
2926
2927 let tick = node.tick();
2928 let out_recv = input
2929 .batch(&tick, nondet!(/** test */))
2930 .all_ticks()
2931 .sim_output();
2932
2933 flow.sim().exhaustive(async || {
2934 in_send.send(1);
2935 in_send.send(2);
2936 in_send.send(3);
2937
2938 out_recv.assert_yields_only([1, 2, 3]).await;
2939 });
2940 }
2941
    // With a `NoOrder` input, batching must actually explore shuffled splits.
    // An instance whose per-tick (min, max) pairs are (1, 3) then (2, 2) means
    // the first batch held {1, 3} and the second {2} — elements were split out
    // of arrival order — and that instance panics, satisfying `should_panic`.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
2968
    // Counts the schedules the simulator explores when an unordered 4-element
    // input is split into batches: every ordered set partition of the four
    // elements, i.e. the sum over k of S(4, k) * k! = 75 (Stirling numbers of
    // the second kind times orderings of the batches).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            // Every schedule must still deliver exactly these four elements,
            // in some order.
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
2991
2992 #[cfg(feature = "sim")]
2993 #[test]
2994 #[should_panic]
2995 fn sim_observe_order_batched() {
2996 let mut flow = FlowBuilder::new();
2997 let node = flow.process::<()>();
2998
2999 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3000
3001 let tick = node.tick();
3002 let batch = input.batch(&tick, nondet!(/** test */));
3003 let out_recv = batch
3004 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3005 .all_ticks()
3006 .sim_output();
3007
3008 flow.sim().exhaustive(async || {
3009 in_send.send_many_unordered([1, 2, 3, 4]);
3010 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3011 });
3012 }
3013
    // Counts the schedules explored when `assume_ordering` is applied to an
    // unordered, batched 4-element input: 4! element orderings times 2^(4-1)
    // choices of batch boundary between consecutive elements = 192.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            // No assertion on contents here — this test only measures how many
            // distinct instances the exhaustive explorer produces.
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3039
    // Snapshotting a running `count` of an unordered input: each of the four
    // intermediate counts {0, 1, 2, 3} may or may not be observed as a
    // snapshot (2^4 = 16 instances), while the final count 4 always appears.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            // Whatever snapshots were taken, the last one must be the total.
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }
3065
3066 #[cfg(feature = "sim")]
3067 #[test]
3068 fn sim_top_level_assume_ordering() {
3069 let mut flow = FlowBuilder::new();
3070 let node = flow.process::<()>();
3071
3072 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3073
3074 let out_recv = input
3075 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3076 .sim_output();
3077
3078 let instance_count = flow.sim().exhaustive(async || {
3079 in_send.send_many_unordered([1, 2, 3]);
3080 let mut out = out_recv.collect::<Vec<_>>().await;
3081 out.sort();
3082 assert_eq!(out, vec![1, 2, 3]);
3083 });
3084
3085 assert_eq!(instance_count, 6)
3086 }
3087
    // `assume_ordering` on a stream interleaved with its own feedback cycle:
    // each even input v feeds back v + 1 (odds only), so inputs [0, 2] generate
    // fed-back elements [1, 3]. Some explored interleaving must place them as
    // the prefix [0, 1, 2], and the instance count must stay at 6.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback: increment and keep odd results, closing the cycle.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        // `saw` is mutated across instances explored by the same exhaustive
        // run; the property only needs to hold for at least one instance.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }
3123
    // Same feedback-cycle scenario as `sim_top_level_assume_ordering_cycle_back`,
    // but the fed-back stream is routed through a batch/all_ticks pair. The
    // extra nondeterministic batch boundaries expand the explored schedule
    // space (58 instances), yet the [0, 1, 2] prefix must still be reachable.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback path goes through a tick: batch, release, then transform.
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        // Set if any explored instance yields the fully interleaved prefix.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }
3161
    // Two chained `assume_ordering` calls with a feedback cycle: input1 is
    // interleaved with the cycle and ordered; that result (shifted by +3 and
    // weakened back to NoOrder) is interleaved with a second input and ordered
    // again, and only the value 3 is fed back. Verifies that some schedule
    // yields the prefix [0, 3, 1] and that 24 instances are explored in total.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        // input2 is never sent to; it only participates in the dataflow shape.
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .interleave(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .interleave(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only 3 (from input element 0, mapped to 0 + 3) re-enters the cycle.
        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        // Set when at least one instance produces 0, 3, 1 in that order.
        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }
3201
    // Feedback-cycle variant where `assume_ordering` is applied inside an
    // atomic region (atomic/end_atomic around the ordering assumption).
    // Inputs [0, 2] plus fed-back [1, 3] always yield 4 total elements; the
    // atomic scoping changes the explored schedule count (22) relative to the
    // non-atomic version of this test.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .interleave(cycle_back)
            .atomic(&node.tick())
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        // Feedback: increment and keep odd results, closing the cycle.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            // 2 external inputs + 2 fed-back odds = 4 outputs in every schedule.
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }
3234}