hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
/// A *nullable* Rust value that can asynchronously change over time.
///
/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
///
/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
///
/// Type Parameters:
/// - `Type`: the type of the value in this optional (when it is not null)
/// - `Loc`: the [`Location`] where the optional is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// The location where this optional is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this optional. It sits in a [`RefCell`] so that it can be
    /// swapped in place through a shared reference (the `Clone` impl relies on this).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Zero-sized marker carrying the `Type` and `Bound` parameters, which no field stores
    // directly.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
44
45impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
46where
47 T: Clone,
48 L: Location<'a> + NoTick,
49{
50 fn from(value: Optional<T, L, Bounded>) -> Self {
51 let tick = value.location().tick();
52 value.clone_into_tick(&tick).latest()
53 }
54}
55
/// Implements the [`DeferTick`] trait for bounded optionals that live inside a tick.
impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    /// Forwards to `Optional::defer_tick`.
    // NOTE(review): presumably this resolves to an inherent `defer_tick` method defined
    // elsewhere in this file rather than back to this trait method — confirm.
    fn defer_tick(self) -> Self {
        Optional::defer_tick(self)
    }
}
64
65impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
66where
67 L: Location<'a>,
68{
69 type Location = Tick<L>;
70
71 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
72 Optional::new(
73 location.clone(),
74 HydroNode::CycleSource {
75 cycle_id,
76 metadata: location.new_node_metadata(Self::collection_kind()),
77 },
78 )
79 }
80}
81
82impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
83where
84 L: Location<'a>,
85{
86 type Location = Tick<L>;
87
88 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
89 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
90 location.clone(),
91 HydroNode::DeferTick {
92 input: Box::new(HydroNode::CycleSource {
93 cycle_id,
94 metadata: location.new_node_metadata(Self::collection_kind()),
95 }),
96 metadata: location
97 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
98 },
99 );
100
101 from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
102 }
103}
104
105impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
106where
107 L: Location<'a>,
108{
109 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
110 assert_eq!(
111 Location::id(&self.location),
112 expected_location,
113 "locations do not match"
114 );
115 self.location
116 .flow_state()
117 .borrow_mut()
118 .push_root(HydroRoot::CycleSink {
119 cycle_id,
120 input: Box::new(self.ir_node.into_inner()),
121 op_metadata: HydroIrOpMetadata::new(),
122 });
123 }
124}
125
126impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
127where
128 L: Location<'a>,
129{
130 type Location = Tick<L>;
131
132 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
133 Optional::new(
134 location.clone(),
135 HydroNode::CycleSource {
136 cycle_id,
137 metadata: location.new_node_metadata(Self::collection_kind()),
138 },
139 )
140 }
141}
142
143impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
144where
145 L: Location<'a>,
146{
147 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
148 assert_eq!(
149 Location::id(&self.location),
150 expected_location,
151 "locations do not match"
152 );
153 self.location
154 .flow_state()
155 .borrow_mut()
156 .push_root(HydroRoot::CycleSink {
157 cycle_id,
158 input: Box::new(self.ir_node.into_inner()),
159 op_metadata: HydroIrOpMetadata::new(),
160 });
161 }
162}
163
164impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
165where
166 L: Location<'a> + NoTick,
167{
168 type Location = L;
169
170 fn create_source(cycle_id: CycleId, location: L) -> Self {
171 Optional::new(
172 location.clone(),
173 HydroNode::CycleSource {
174 cycle_id,
175 metadata: location.new_node_metadata(Self::collection_kind()),
176 },
177 )
178 }
179}
180
181impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
182where
183 L: Location<'a> + NoTick,
184{
185 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
186 assert_eq!(
187 Location::id(&self.location),
188 expected_location,
189 "locations do not match"
190 );
191 self.location
192 .flow_state()
193 .borrow_mut()
194 .push_root(HydroRoot::CycleSink {
195 cycle_id,
196 input: Box::new(self.ir_node.into_inner()),
197 op_metadata: HydroIrOpMetadata::new(),
198 });
199 }
200}
201
202impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
203where
204 L: Location<'a>,
205{
206 fn from(singleton: Singleton<T, L, B>) -> Self {
207 Optional::new(
208 singleton.location.clone(),
209 HydroNode::Cast {
210 inner: Box::new(singleton.ir_node.into_inner()),
211 metadata: singleton
212 .location
213 .new_node_metadata(Self::collection_kind()),
214 },
215 )
216 }
217}
218
/// Pairs two optionals at the same location through a `CrossSingleton` IR node.
///
/// Both inputs are checked with `check_matching_location` before the node is built.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());
    let paired = HydroNode::CrossSingleton {
        left: Box::new(me.ir_node.into_inner()),
        right: Box::new(other.ir_node.into_inner()),
        metadata,
    };
    Optional::new(me.location, paired)
}
237
/// Combines two optionals at the same location through a `ChainFirst` IR node, with `me` as
/// the first input and `other` as the second.
///
/// Both inputs are checked with `check_matching_location` before the node is built.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());
    let chained = HydroNode::ChainFirst {
        first: Box::new(me.ir_node.into_inner()),
        second: Box::new(other.ir_node.into_inner()),
        metadata,
    };
    Optional::new(me.location, chained)
}
256
/// Cloning shares the underlying IR node between the original and the copy by routing both
/// through a `HydroNode::Tee`.
impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
where
    T: Clone,
    L: Location<'a>,
{
    /// Returns a new handle whose `Tee` node points at the same shared inner node as `self`.
    ///
    /// If `self` is not yet backed by a `Tee`, its node is first replaced in place (through
    /// the `RefCell`) by a `Tee` wrapping the original node.
    fn clone(&self) -> Self {
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Take the current node out; the placeholder only lives until the `Tee` below is
            // written back.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Optional {
                location: self.location.clone(),
                ir_node: HydroNode::Tee {
                    // Cloning the `Rc` shares the inner node instead of copying it.
                    inner: TeeNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            // The branch above guarantees that `self.ir_node` holds a `Tee` by now.
            unreachable!()
        }
    }
}
286
287impl<'a, T, L, B: Boundedness> Optional<T, L, B>
288where
289 L: Location<'a>,
290{
291 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
292 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
293 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
294 Optional {
295 location,
296 ir_node: RefCell::new(ir_node),
297 _phantom: PhantomData,
298 }
299 }
300
301 pub(crate) fn collection_kind() -> CollectionKind {
302 CollectionKind::Optional {
303 bound: B::BOUND_KIND,
304 element_type: stageleft::quote_type::<T>().into(),
305 }
306 }
307
    /// Returns the [`Location`] where this optional is being materialized.
    ///
    /// The location is returned by reference; the optional itself is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
312
313 /// Transforms the optional value by applying a function `f` to it,
314 /// continuously as the input is updated.
315 ///
316 /// Whenever the optional is empty, the output optional is also empty.
317 ///
318 /// # Example
319 /// ```rust
320 /// # #[cfg(feature = "deploy")] {
321 /// # use hydro_lang::prelude::*;
322 /// # use futures::StreamExt;
323 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
324 /// let tick = process.tick();
325 /// let optional = tick.optional_first_tick(q!(1));
326 /// optional.map(q!(|v| v + 1)).all_ticks()
327 /// # }, |mut stream| async move {
328 /// // 2
329 /// # assert_eq!(stream.next().await.unwrap(), 2);
330 /// # }));
331 /// # }
332 /// ```
333 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
334 where
335 F: Fn(T) -> U + 'a,
336 {
337 let f = f.splice_fn1_ctx(&self.location).into();
338 Optional::new(
339 self.location.clone(),
340 HydroNode::Map {
341 f,
342 input: Box::new(self.ir_node.into_inner()),
343 metadata: self
344 .location
345 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
346 },
347 )
348 }
349
350 /// Transforms the optional value by applying a function `f` to it and then flattening
351 /// the result into a stream, preserving the order of elements.
352 ///
353 /// If the optional is empty, the output stream is also empty. If the optional contains
354 /// a value, `f` is applied to produce an iterator, and all items from that iterator
355 /// are emitted in the output stream in deterministic order.
356 ///
357 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
358 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
359 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
360 ///
361 /// # Example
362 /// ```rust
363 /// # #[cfg(feature = "deploy")] {
364 /// # use hydro_lang::prelude::*;
365 /// # use futures::StreamExt;
366 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367 /// let tick = process.tick();
368 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
369 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
370 /// # }, |mut stream| async move {
371 /// // 1, 2, 3
372 /// # for w in vec![1, 2, 3] {
373 /// # assert_eq!(stream.next().await.unwrap(), w);
374 /// # }
375 /// # }));
376 /// # }
377 /// ```
378 pub fn flat_map_ordered<U, I, F>(
379 self,
380 f: impl IntoQuotedMut<'a, F, L>,
381 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
382 where
383 I: IntoIterator<Item = U>,
384 F: Fn(T) -> I + 'a,
385 {
386 let f = f.splice_fn1_ctx(&self.location).into();
387 Stream::new(
388 self.location.clone(),
389 HydroNode::FlatMap {
390 f,
391 input: Box::new(self.ir_node.into_inner()),
392 metadata: self.location.new_node_metadata(
393 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
394 ),
395 },
396 )
397 }
398
399 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
400 /// for the output type `I` to produce items in any order.
401 ///
402 /// If the optional is empty, the output stream is also empty. If the optional contains
403 /// a value, `f` is applied to produce an iterator, and all items from that iterator
404 /// are emitted in the output stream in non-deterministic order.
405 ///
406 /// # Example
407 /// ```rust
408 /// # #[cfg(feature = "deploy")] {
409 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
410 /// # use futures::StreamExt;
411 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
412 /// let tick = process.tick();
413 /// let optional = tick.optional_first_tick(q!(
414 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
415 /// ));
416 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
417 /// # }, |mut stream| async move {
418 /// // 1, 2, 3, but in no particular order
419 /// # let mut results = Vec::new();
420 /// # for _ in 0..3 {
421 /// # results.push(stream.next().await.unwrap());
422 /// # }
423 /// # results.sort();
424 /// # assert_eq!(results, vec![1, 2, 3]);
425 /// # }));
426 /// # }
427 /// ```
428 pub fn flat_map_unordered<U, I, F>(
429 self,
430 f: impl IntoQuotedMut<'a, F, L>,
431 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
432 where
433 I: IntoIterator<Item = U>,
434 F: Fn(T) -> I + 'a,
435 {
436 let f = f.splice_fn1_ctx(&self.location).into();
437 Stream::new(
438 self.location.clone(),
439 HydroNode::FlatMap {
440 f,
441 input: Box::new(self.ir_node.into_inner()),
442 metadata: self
443 .location
444 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
445 },
446 )
447 }
448
    /// Flattens the optional value into a stream, preserving the order of elements.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in deterministic order.
    ///
    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
    ///
    /// This is [`Optional::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
    /// optional.flatten_ordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|v| v))
    }
482
    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// If the optional is empty, the output stream is also empty. If the optional contains
    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
    /// in the output stream in non-deterministic order.
    ///
    /// This is [`Optional::flat_map_unordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let optional = tick.optional_first_tick(q!(
    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
    /// ));
    /// optional.flatten_unordered().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, but in no particular order
    /// # let mut results = Vec::new();
    /// # for _ in 0..3 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|v| v))
    }
518
519 /// Creates an optional containing only the value if it satisfies a predicate `f`.
520 ///
521 /// If the optional is empty, the output optional is also empty. If the optional contains
522 /// a value and the predicate returns `true`, the output optional contains the same value.
523 /// If the predicate returns `false`, the output optional is empty.
524 ///
525 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
526 /// not modify or take ownership of the value. If you need to modify the value while filtering
527 /// use [`Optional::filter_map`] instead.
528 ///
529 /// # Example
530 /// ```rust
531 /// # #[cfg(feature = "deploy")] {
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let tick = process.tick();
536 /// let optional = tick.optional_first_tick(q!(5));
537 /// optional.filter(q!(|&x| x > 3)).all_ticks()
538 /// # }, |mut stream| async move {
539 /// // 5
540 /// # assert_eq!(stream.next().await.unwrap(), 5);
541 /// # }));
542 /// # }
543 /// ```
544 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
545 where
546 F: Fn(&T) -> bool + 'a,
547 {
548 let f = f.splice_fn1_borrow_ctx(&self.location).into();
549 Optional::new(
550 self.location.clone(),
551 HydroNode::Filter {
552 f,
553 input: Box::new(self.ir_node.into_inner()),
554 metadata: self.location.new_node_metadata(Self::collection_kind()),
555 },
556 )
557 }
558
559 /// An operator that both filters and maps. It yields only the value if the supplied
560 /// closure `f` returns `Some(value)`.
561 ///
562 /// If the optional is empty, the output optional is also empty. If the optional contains
563 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
564 /// If the closure returns `None`, the output optional is empty.
565 ///
566 /// # Example
567 /// ```rust
568 /// # #[cfg(feature = "deploy")] {
569 /// # use hydro_lang::prelude::*;
570 /// # use futures::StreamExt;
571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572 /// let tick = process.tick();
573 /// let optional = tick.optional_first_tick(q!("42"));
574 /// optional
575 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
576 /// .all_ticks()
577 /// # }, |mut stream| async move {
578 /// // 42
579 /// # assert_eq!(stream.next().await.unwrap(), 42);
580 /// # }));
581 /// # }
582 /// ```
583 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
584 where
585 F: Fn(T) -> Option<U> + 'a,
586 {
587 let f = f.splice_fn1_ctx(&self.location).into();
588 Optional::new(
589 self.location.clone(),
590 HydroNode::FilterMap {
591 f,
592 input: Box::new(self.ir_node.into_inner()),
593 metadata: self
594 .location
595 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
596 },
597 )
598 }
599
    /// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
    ///
    /// If the other value is an [`Optional`], the output will be non-null only if the argument is
    /// non-null. This is useful for combining several pieces of state together.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process
    ///     .source_iter(q!(vec![123, 456, 789]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let min = numbers.clone().min(); // Optional
    /// let max = numbers.max(); // Optional
    /// min.zip(max).all_ticks()
    /// # }, |mut stream| async move {
    /// // [(123, 789)]
    /// # for w in vec![(123, 789)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
    where
        O: Clone,
    {
        let other: Optional<O, L, B> = other.into();
        check_matching_location(&self.location, &other.location);

        // At a top-level location that can provide a tick, snapshot both inputs into that
        // tick, zip them there, and expose the result again with `latest`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the node so the result carries this method's declared return type.
            Optional::new(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise the two inputs can be zipped directly where they are.
            zip_inside_tick(self, other)
        }
    }
647
    /// Passes through `self` when it has a value, otherwise passes through `other`.
    ///
    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
    ///
    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
    /// of the inputs change (including to/from null states).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_first_tick = tick.optional_first_tick(q!(123));
    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
    /// some_first_tick.or(some_second_tick).all_ticks()
    /// # }, |mut stream| async move {
    /// // [123 /* first tick */, 456 /* second tick */]
    /// # for w in vec![123, 456] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
        check_matching_location(&self.location, &other.location);

        // For an unbounded optional at a top-level location that can provide a tick, snapshot
        // both inputs into that tick, combine them there, and expose the result with `latest`.
        if L::is_top_level()
            && !B::BOUNDED // only if unbounded we need to use a tick
            && let Some(tick) = self.location.try_tick()
        {
            let out = or_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the node so the result carries this method's declared return type.
            Optional::new(out.location, out.ir_node.into_inner())
        } else {
            // Otherwise chain the two inputs directly, with `self` first.
            Optional::new(
                self.location.clone(),
                HydroNode::ChainFirst {
                    first: Box::new(self.ir_node.into_inner()),
                    second: Box::new(other.ir_node.into_inner()),
                    metadata: self.location.new_node_metadata(Self::collection_kind()),
                },
            )
        }
    }
703
704 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
705 ///
706 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
707 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
708 ///
709 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
710 /// of the inputs change (including to/from null states).
711 ///
712 /// # Example
713 /// ```rust
714 /// # #[cfg(feature = "deploy")] {
715 /// # use hydro_lang::prelude::*;
716 /// # use futures::StreamExt;
717 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718 /// let tick = process.tick();
719 /// // ticks are lazy by default, forces the later ticks to run
720 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
721 ///
722 /// let some_first_tick = tick.optional_first_tick(q!(123));
723 /// some_first_tick
724 /// .unwrap_or(tick.singleton(q!(456)))
725 /// .all_ticks()
726 /// # }, |mut stream| async move {
727 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
728 /// # for w in vec![123, 456, 456, 456] {
729 /// # assert_eq!(stream.next().await.unwrap(), w);
730 /// # }
731 /// # }));
732 /// # }
733 /// ```
734 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
735 let res_option = self.or(other.into());
736 Singleton::new(
737 res_option.location.clone(),
738 HydroNode::Cast {
739 inner: Box::new(res_option.ir_node.into_inner()),
740 metadata: res_option
741 .location
742 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
743 },
744 )
745 }
746
747 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
748 ///
749 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
750 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
751 /// so that Hydro can skip any computation on null values.
752 ///
753 /// # Example
754 /// ```rust
755 /// # #[cfg(feature = "deploy")] {
756 /// # use hydro_lang::prelude::*;
757 /// # use futures::StreamExt;
758 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
759 /// let tick = process.tick();
760 /// // ticks are lazy by default, forces the later ticks to run
761 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
762 ///
763 /// let some_first_tick = tick.optional_first_tick(q!(123));
764 /// some_first_tick.into_singleton().all_ticks()
765 /// # }, |mut stream| async move {
766 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
767 /// # for w in vec![Some(123), None, None, None] {
768 /// # assert_eq!(stream.next().await.unwrap(), w);
769 /// # }
770 /// # }));
771 /// # }
772 /// ```
773 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
774 where
775 T: Clone,
776 {
777 let none: syn::Expr = parse_quote!(::std::option::Option::None);
778
779 let none_singleton = Singleton::new(
780 self.location.clone(),
781 HydroNode::SingletonSource {
782 value: none.into(),
783 metadata: self
784 .location
785 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
786 },
787 );
788
789 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
790 }
791
792 /// An operator which allows you to "name" a `HydroNode`.
793 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
794 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
795 {
796 let mut node = self.ir_node.borrow_mut();
797 let metadata = node.metadata_mut();
798 metadata.tag = Some(name.to_owned());
799 }
800 self
801 }
802
803 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
804 /// implies that `B == Bounded`.
805 pub fn make_bounded(self) -> Optional<T, L, Bounded>
806 where
807 B: IsBounded,
808 {
809 Optional::new(self.location, self.ir_node.into_inner())
810 }
811
    /// Clones this bounded optional into a tick, returning an optional that has the
    /// same value as the outer optional. Because the outer optional is bounded, this
    /// is deterministic because there is only a single immutable version.
    ///
    /// Implemented as a `snapshot` into `tick`; the `nondet!` marker records why the
    /// snapshot is considered deterministic here.
    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
    where
        B: IsBounded,
        T: Clone,
    {
        // TODO(shadaj): avoid printing simulator logs for this snapshot
        self.snapshot(
            tick,
            nondet!(/** bounded top-level optional so deterministic */),
        )
    }
826
827 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
828 /// non-null. Otherwise, the stream is empty.
829 ///
830 /// # Example
831 /// ```rust
832 /// # #[cfg(feature = "deploy")] {
833 /// # use hydro_lang::prelude::*;
834 /// # use futures::StreamExt;
835 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
836 /// # let tick = process.tick();
837 /// # // ticks are lazy by default, forces the second tick to run
838 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
839 /// # let batch_first_tick = process
840 /// # .source_iter(q!(vec![]))
841 /// # .batch(&tick, nondet!(/** test */));
842 /// # let batch_second_tick = process
843 /// # .source_iter(q!(vec![123, 456]))
844 /// # .batch(&tick, nondet!(/** test */))
845 /// # .defer_tick(); // appears on the second tick
846 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
847 /// input_batch // first tick: [], second tick: [123, 456]
848 /// .clone()
849 /// .max()
850 /// .into_stream()
851 /// .chain(input_batch)
852 /// .all_ticks()
853 /// # }, |mut stream| async move {
854 /// // [456, 123, 456]
855 /// # for w in vec![456, 123, 456] {
856 /// # assert_eq!(stream.next().await.unwrap(), w);
857 /// # }
858 /// # }));
859 /// # }
860 /// ```
861 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
862 where
863 B: IsBounded,
864 {
865 Stream::new(
866 self.location.clone(),
867 HydroNode::Cast {
868 inner: Box::new(self.ir_node.into_inner()),
869 metadata: self.location.new_node_metadata(Stream::<
870 T,
871 Tick<L>,
872 Bounded,
873 TotalOrder,
874 ExactlyOnce,
875 >::collection_kind()),
876 },
877 )
878 }
879
    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting an optional's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![456]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).first()
    ///   .filter_if_some(some_on_first_tick)
    ///   .unwrap_or(tick.singleton(q!(789)))
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [789, 789]
    /// # for w in vec![789, 789] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
    where
        B: IsBounded,
    {
        // Zip with the signal (its value replaced by `()`), then keep only this optional's
        // own value from the resulting pair.
        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
    }
922
    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
    ///
    /// Useful for conditionally processing, such as only emitting an optional's value outside
    /// a tick if some other condition is satisfied.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///   .source_iter(q!(vec![]))
    ///   .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///   .source_iter(q!(vec![456]))
    ///   .batch(&tick, nondet!(/** test */))
    ///   .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick).first()
    ///   .filter_if_none(some_on_first_tick)
    ///   .unwrap_or(tick.singleton(q!(789)))
    ///   .all_ticks()
    /// # }, |mut stream| async move {
    /// // [789, 456]
    /// # for w in vec![789, 456] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
    where
        B: IsBounded,
    {
        // Turn `other` into a signal that is present exactly when `other` is null: convert it
        // to a singleton holding an `Option`, then keep that singleton only when it is `None`.
        self.filter_if_some(
            other
                .map(q!(|_| ()))
                .into_singleton()
                .filter(q!(|o| o.is_none())),
        )
    }
970
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
    ///
    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
    /// having a value, such as only releasing a piece of state if the node is the leader.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// some_on_first_tick
    ///     .if_some_then(tick.singleton(q!(456)))
    ///     .unwrap_or(tick.singleton(q!(123)))
    /// # .all_ticks()
    /// # }, |mut stream| async move {
    /// // 456 (first tick) ~> 123 (second tick onwards)
    /// # for w in vec![456, 123, 123] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
    where
        B: IsBounded,
    {
        // Delegates to the singleton's own `filter_if_some`, with `self` acting as the signal
        // that gates `value`.
        value.filter_if_some(self)
    }
1005}
1006
1007impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1008where
1009 L: Location<'a> + NoTick,
1010{
1011 /// Returns an optional value corresponding to the latest snapshot of the optional
1012 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1013 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1014 /// all snapshots of this optional into the atomic-associated tick will observe the
1015 /// same value each tick.
1016 ///
1017 /// # Non-Determinism
1018 /// Because this picks a snapshot of a optional whose value is continuously changing,
1019 /// the output optional has a non-deterministic value since the snapshot can be at an
1020 /// arbitrary point in time.
1021 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1022 Optional::new(
1023 self.location.clone().tick,
1024 HydroNode::Batch {
1025 inner: Box::new(self.ir_node.into_inner()),
1026 metadata: self
1027 .location
1028 .tick
1029 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1030 },
1031 )
1032 }
1033
1034 /// Returns this optional back into a top-level, asynchronous execution context where updates
1035 /// to the value will be asynchronously propagated.
1036 pub fn end_atomic(self) -> Optional<T, L, B> {
1037 Optional::new(
1038 self.location.tick.l.clone(),
1039 HydroNode::EndAtomic {
1040 inner: Box::new(self.ir_node.into_inner()),
1041 metadata: self
1042 .location
1043 .tick
1044 .l
1045 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1046 },
1047 )
1048 }
1049}
1050
1051impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1052where
1053 L: Location<'a>,
1054{
1055 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1056 /// will observe the same version of the value and will be executed synchronously before any
1057 /// outputs are yielded (in [`Optional::end_atomic`]).
1058 ///
1059 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1060 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1061 /// a different version).
1062 ///
1063 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
1064 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
1065 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
1066 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
1067 let out_location = Atomic { tick: tick.clone() };
1068 Optional::new(
1069 out_location.clone(),
1070 HydroNode::BeginAtomic {
1071 inner: Box::new(self.ir_node.into_inner()),
1072 metadata: out_location
1073 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1074 },
1075 )
1076 }
1077
1078 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1079 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1080 /// relevant data that contributed to the snapshot at tick `t`.
1081 ///
1082 /// # Non-Determinism
1083 /// Because this picks a snapshot of a optional whose value is continuously changing,
1084 /// the output optional has a non-deterministic value since the snapshot can be at an
1085 /// arbitrary point in time.
1086 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1087 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1088 Optional::new(
1089 tick.clone(),
1090 HydroNode::Batch {
1091 inner: Box::new(self.ir_node.into_inner()),
1092 metadata: tick
1093 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1094 },
1095 )
1096 }
1097
1098 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1099 /// with order corresponding to increasing prefixes of data contributing to the optional.
1100 ///
1101 /// # Non-Determinism
1102 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1103 /// to non-deterministic batching and arrival of inputs, the output stream is
1104 /// non-deterministic.
1105 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1106 where
1107 L: NoTick,
1108 {
1109 let tick = self.location.tick();
1110 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1111 }
1112
1113 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1114 /// value taken at various points in time. Because the input optional may be
1115 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1116 /// represent the value of the optional given some prefix of the streams leading up to
1117 /// it.
1118 ///
1119 /// # Non-Determinism
1120 /// The output stream is non-deterministic in which elements are sampled, since this
1121 /// is controlled by a clock.
1122 pub fn sample_every(
1123 self,
1124 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1125 nondet: NonDet,
1126 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1127 where
1128 L: NoTick + NoAtomic,
1129 {
1130 let samples = self.location.source_interval(interval, nondet);
1131 let tick = self.location.tick();
1132
1133 self.snapshot(&tick, nondet)
1134 .filter_if_some(samples.batch(&tick, nondet).first())
1135 .all_ticks()
1136 .weaken_retries()
1137 }
1138}
1139
1140impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1141where
1142 L: Location<'a>,
1143{
1144 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1145 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1146 /// null values).
1147 ///
1148 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1149 /// producing one element in the output for each (non-null) tick. This is useful for batched
1150 /// computations, where the results from each tick must be combined together.
1151 ///
1152 /// # Example
1153 /// ```rust
1154 /// # #[cfg(feature = "deploy")] {
1155 /// # use hydro_lang::prelude::*;
1156 /// # use futures::StreamExt;
1157 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158 /// # let tick = process.tick();
1159 /// # // ticks are lazy by default, forces the second tick to run
1160 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1161 /// # let batch_first_tick = process
1162 /// # .source_iter(q!(vec![]))
1163 /// # .batch(&tick, nondet!(/** test */));
1164 /// # let batch_second_tick = process
1165 /// # .source_iter(q!(vec![1, 2, 3]))
1166 /// # .batch(&tick, nondet!(/** test */))
1167 /// # .defer_tick(); // appears on the second tick
1168 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1169 /// input_batch // first tick: [], second tick: [1, 2, 3]
1170 /// .max()
1171 /// .all_ticks()
1172 /// # }, |mut stream| async move {
1173 /// // [3]
1174 /// # for w in vec![3] {
1175 /// # assert_eq!(stream.next().await.unwrap(), w);
1176 /// # }
1177 /// # }));
1178 /// # }
1179 /// ```
1180 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1181 self.into_stream().all_ticks()
1182 }
1183
1184 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1185 /// which will stream the value computed in _each_ tick as a separate stream element.
1186 ///
1187 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1188 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1189 /// optional's [`Tick`] context.
1190 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1191 self.into_stream().all_ticks_atomic()
1192 }
1193
1194 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1195 /// be asynchronously updated with the latest value of the optional inside the tick, including
1196 /// whether the optional is null or not.
1197 ///
1198 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1199 /// tick that tracks the inner value. This is useful for getting the value as of the
1200 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1201 ///
1202 /// # Example
1203 /// ```rust
1204 /// # #[cfg(feature = "deploy")] {
1205 /// # use hydro_lang::prelude::*;
1206 /// # use futures::StreamExt;
1207 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1208 /// # let tick = process.tick();
1209 /// # // ticks are lazy by default, forces the second tick to run
1210 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1211 /// # let batch_first_tick = process
1212 /// # .source_iter(q!(vec![]))
1213 /// # .batch(&tick, nondet!(/** test */));
1214 /// # let batch_second_tick = process
1215 /// # .source_iter(q!(vec![1, 2, 3]))
1216 /// # .batch(&tick, nondet!(/** test */))
1217 /// # .defer_tick(); // appears on the second tick
1218 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1219 /// input_batch // first tick: [], second tick: [1, 2, 3]
1220 /// .max()
1221 /// .latest()
1222 /// # .into_singleton()
1223 /// # .sample_eager(nondet!(/** test */))
1224 /// # }, |mut stream| async move {
1225 /// // asynchronously changes from None ~> 3
1226 /// # for w in vec![None, Some(3)] {
1227 /// # assert_eq!(stream.next().await.unwrap(), w);
1228 /// # }
1229 /// # }));
1230 /// # }
1231 /// ```
1232 pub fn latest(self) -> Optional<T, L, Unbounded> {
1233 Optional::new(
1234 self.location.outer().clone(),
1235 HydroNode::YieldConcat {
1236 inner: Box::new(self.ir_node.into_inner()),
1237 metadata: self
1238 .location
1239 .outer()
1240 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1241 },
1242 )
1243 }
1244
1245 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1246 /// be updated with the latest value of the optional inside the tick.
1247 ///
1248 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1249 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1250 /// optional's [`Tick`] context.
1251 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1252 let out_location = Atomic {
1253 tick: self.location.clone(),
1254 };
1255
1256 Optional::new(
1257 out_location.clone(),
1258 HydroNode::YieldConcat {
1259 inner: Box::new(self.ir_node.into_inner()),
1260 metadata: out_location
1261 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1262 },
1263 )
1264 }
1265
1266 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1267 /// always has the state of `self` at tick `T - 1`.
1268 ///
1269 /// At tick `0`, the output optional is null, since there is no previous tick.
1270 ///
1271 /// This operator enables stateful iterative processing with ticks, by sending data from one
1272 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1273 ///
1274 /// # Example
1275 /// ```rust
1276 /// # #[cfg(feature = "deploy")] {
1277 /// # use hydro_lang::prelude::*;
1278 /// # use futures::StreamExt;
1279 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1280 /// let tick = process.tick();
1281 /// // ticks are lazy by default, forces the second tick to run
1282 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1283 ///
1284 /// let batch_first_tick = process
1285 /// .source_iter(q!(vec![1, 2]))
1286 /// .batch(&tick, nondet!(/** test */));
1287 /// let batch_second_tick = process
1288 /// .source_iter(q!(vec![3, 4]))
1289 /// .batch(&tick, nondet!(/** test */))
1290 /// .defer_tick(); // appears on the second tick
1291 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1292 /// .reduce(q!(|state, v| *state += v));
1293 ///
1294 /// current_tick_sum.clone().into_singleton().zip(
1295 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1296 /// ).all_ticks()
1297 /// # }, |mut stream| async move {
1298 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1299 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1300 /// # assert_eq!(stream.next().await.unwrap(), w);
1301 /// # }
1302 /// # }));
1303 /// # }
1304 /// ```
1305 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1306 Optional::new(
1307 self.location.clone(),
1308 HydroNode::DeferTick {
1309 input: Box::new(self.ir_node.into_inner()),
1310 metadata: self.location.new_node_metadata(Self::collection_kind()),
1311 },
1312 )
1313 }
1314}
1315
// Integration-style tests for `Optional`. Tests gated on the `deploy` feature launch a real
// localhost deployment and read results through an external port; tests gated on the `sim`
// feature run the flow in the simulator instead.
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    // `or`-ing an inhabited optional with (a clone of) itself must still contain a single
    // element: the count of the resulting stream is asserted to be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let tick_singleton = node_tick.singleton(q!(123));
        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
        let counts = tick_optional_inhabited
            .clone()
            .or(tick_optional_inhabited)
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting so that no results are missed.
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // A top-level optional that is always null (a singleton filtered with `false`) is converted
    // with `into_singleton`; each snapshot of the result must contain exactly one element. The
    // count is checked for the first three outputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
        let into_singleton = top_level_none.into_singleton();

        // NOTE(review): presumably `spin` keeps ticks firing so that several snapshots are
        // produced; only its per-tick count is zipped in and then discarded below.
        let tick_driver = node.spin();

        let counts = into_singleton
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting so that no results are missed.
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // Same check as `into_singleton_top_level_none_cardinality`, but the null optional is
    // derived from an unbounded value (`latest()` of a tick singleton) before being filtered.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
        let into_singleton = top_level_none.into_singleton();

        // NOTE(review): presumably `spin` keeps ticks firing so that several snapshots are
        // produced; only its per-tick count is zipped in and then discarded below.
        let tick_driver = node.spin();

        let counts = into_singleton
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count()
            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting so that no results are missed.
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // Converting a non-null top-level optional into a stream must yield its value exactly once
    // (the fold result `10`) and nothing else, across all simulated executions.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
        let filtered_some = folded.filter(q!(|_| true));

        let out_recv = filtered_some.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }

    // Converting a null top-level optional into a stream must yield no elements at all, across
    // all simulated executions.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
        let filtered_none = folded.filter(q!(|_| false));

        let out_recv = filtered_none.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            // The explicit array type is needed because an empty literal has no element type.
            out_recv.assert_yields_only([] as [i32; 0]).await;
        });
    }
}