Skip to main content

hydro_lang/location/
tick.rs

1use sealed::sealed;
2use stageleft::{QuotedWithContext, q};
3
4#[cfg(stageleft_runtime)]
5use super::dynamic::DynLocation;
6use super::{Cluster, Location, LocationId, Process};
7use crate::compile::builder::{ClockId, FlowState};
8use crate::compile::ir::{HydroNode, HydroSource};
9#[cfg(stageleft_runtime)]
10use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
11use crate::forward_handle::{TickCycle, TickCycleHandle};
12use crate::live_collections::boundedness::{Bounded, Unbounded};
13use crate::live_collections::optional::Optional;
14use crate::live_collections::singleton::Singleton;
15use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
16use crate::nondet::nondet;
17
18#[sealed]
19pub trait NoTick {}
20#[sealed]
21impl<T> NoTick for Process<'_, T> {}
22#[sealed]
23impl<T> NoTick for Cluster<'_, T> {}
24
25#[sealed]
26pub trait NoAtomic {}
27#[sealed]
28impl<T> NoAtomic for Process<'_, T> {}
29#[sealed]
30impl<T> NoAtomic for Cluster<'_, T> {}
31#[sealed]
32impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
33
34#[derive(Clone)]
35pub struct Atomic<Loc> {
36    pub(crate) tick: Tick<Loc>,
37}
38
39impl<L: DynLocation> DynLocation for Atomic<L> {
40    fn id(&self) -> LocationId {
41        LocationId::Atomic(Box::new(self.tick.id()))
42    }
43
44    fn flow_state(&self) -> &FlowState {
45        self.tick.flow_state()
46    }
47
48    fn is_top_level() -> bool {
49        L::is_top_level()
50    }
51
52    fn multiversioned(&self) -> bool {
53        self.tick.multiversioned()
54    }
55}
56
57impl<'a, L> Location<'a> for Atomic<L>
58where
59    L: Location<'a>,
60{
61    type Root = L::Root;
62
63    fn root(&self) -> Self::Root {
64        self.tick.root()
65    }
66}
67
68#[sealed]
69impl<L> NoTick for Atomic<L> {}
70
71pub trait DeferTick {
72    fn defer_tick(self) -> Self;
73}
74
75/// Marks the stream as being inside the single global clock domain.
76#[derive(Clone)]
77pub struct Tick<L> {
78    pub(crate) id: ClockId,
79    /// Location.
80    pub(crate) l: L,
81}
82
83impl<L: DynLocation> DynLocation for Tick<L> {
84    fn id(&self) -> LocationId {
85        LocationId::Tick(self.id, Box::new(self.l.id()))
86    }
87
88    fn flow_state(&self) -> &FlowState {
89        self.l.flow_state()
90    }
91
92    fn is_top_level() -> bool {
93        false
94    }
95
96    fn multiversioned(&self) -> bool {
97        self.l.multiversioned()
98    }
99}
100
101impl<'a, L> Location<'a> for Tick<L>
102where
103    L: Location<'a>,
104{
105    type Root = L::Root;
106
107    fn root(&self) -> Self::Root {
108        self.l.root()
109    }
110}
111
112impl<'a, L> Tick<L>
113where
114    L: Location<'a>,
115{
116    pub fn outer(&self) -> &L {
117        &self.l
118    }
119
120    pub fn spin_batch(
121        &self,
122        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
123    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce>
124    where
125        L: NoTick,
126    {
127        let out = self
128            .l
129            .spin()
130            .flat_map_ordered(q!(move |_| 0..batch_size))
131            .map(q!(|_| ()));
132
133        out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */))
134    }
135
136    pub fn singleton<T>(
137        &self,
138        e: impl QuotedWithContext<'a, T, Tick<L>>,
139    ) -> Singleton<T, Self, Bounded>
140    where
141        T: Clone,
142    {
143        let e = e.splice_untyped_ctx(self);
144
145        Singleton::new(
146            self.clone(),
147            HydroNode::SingletonSource {
148                value: e.into(),
149                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
150            },
151        )
152    }
153
154    /// Creates an [`Optional`] which has a null value on every tick.
155    ///
156    /// # Example
157    /// ```rust
158    /// # #[cfg(feature = "deploy")] {
159    /// # use hydro_lang::prelude::*;
160    /// # use futures::StreamExt;
161    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
162    /// let tick = process.tick();
163    /// let optional = tick.none::<i32>();
164    /// optional.unwrap_or(tick.singleton(q!(123)))
165    /// # .all_ticks()
166    /// # }, |mut stream| async move {
167    /// // 123
168    /// # assert_eq!(stream.next().await.unwrap(), 123);
169    /// # }));
170    /// # }
171    /// ```
172    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
173        let e = q!([]);
174        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
175
176        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
177            self.clone(),
178            HydroNode::Source {
179                source: HydroSource::Iter(e.into()),
180                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
181            },
182        );
183
184        unit_optional.map(q!(|_| unreachable!())) // always empty
185    }
186
187    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
188    /// null on all subsequent ticks.
189    ///
190    /// This is useful for bootstrapping stateful computations which need an initial value.
191    ///
192    /// # Example
193    /// ```rust
194    /// # #[cfg(feature = "deploy")] {
195    /// # use hydro_lang::prelude::*;
196    /// # use futures::StreamExt;
197    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
198    /// let tick = process.tick();
199    /// // ticks are lazy by default, forces the second tick to run
200    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
201    /// let optional = tick.optional_first_tick(q!(5));
202    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
203    /// # }, |mut stream| async move {
204    /// // 5, 123, 123, 123, ...
205    /// # assert_eq!(stream.next().await.unwrap(), 5);
206    /// # assert_eq!(stream.next().await.unwrap(), 123);
207    /// # assert_eq!(stream.next().await.unwrap(), 123);
208    /// # assert_eq!(stream.next().await.unwrap(), 123);
209    /// # }));
210    /// # }
211    /// ```
212    pub fn optional_first_tick<T: Clone>(
213        &self,
214        e: impl QuotedWithContext<'a, T, Tick<L>>,
215    ) -> Optional<T, Self, Bounded> {
216        let e_arr = q!([e]);
217        let e = e_arr.splice_untyped_ctx(self);
218
219        Optional::new(
220            self.clone(),
221            HydroNode::Batch {
222                inner: Box::new(HydroNode::Source {
223                    source: HydroSource::Iter(e.into()),
224                    metadata: self
225                        .outer()
226                        .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
227                }),
228                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
229            },
230        )
231    }
232
233    #[expect(
234        private_bounds,
235        reason = "only Hydro collections can implement ReceiverComplete"
236    )]
237    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
238    where
239        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
240        L: NoTick,
241    {
242        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
243        (
244            TickCycleHandle::new(cycle_id, Location::id(self)),
245            S::create_source(cycle_id, self.clone()).defer_tick(),
246        )
247    }
248
249    #[expect(
250        private_bounds,
251        reason = "only Hydro collections can implement ReceiverComplete"
252    )]
253    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
254    where
255        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
256    {
257        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
258        (
259            TickCycleHandle::new(cycle_id, Location::id(self)),
260            // no need to defer_tick, create_source_with_initial does it for us
261            S::create_source_with_initial(cycle_id, initial, self.clone()),
262        )
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    #[cfg(feature = "sim")]
269    use stageleft::q;
270
271    #[cfg(feature = "sim")]
272    use crate::live_collections::sliced::sliced;
273    #[cfg(feature = "sim")]
274    use crate::location::Location;
275    #[cfg(feature = "sim")]
276    use crate::nondet::nondet;
277    #[cfg(feature = "sim")]
278    use crate::prelude::FlowBuilder;
279
280    #[cfg(feature = "sim")]
281    #[test]
282    fn sim_atomic_stream() {
283        let mut flow = FlowBuilder::new();
284        let node = flow.process::<()>();
285
286        let (write_send, write_req) = node.sim_input();
287        let (read_send, read_req) = node.sim_input::<(), _, _>();
288
289        let tick = node.tick();
290        let atomic_write = write_req.atomic(&tick);
291        let current_state = atomic_write.clone().fold(
292            q!(|| 0),
293            q!(|state: &mut i32, v: i32| {
294                *state += v;
295            }),
296        );
297
298        let write_ack_recv = atomic_write.end_atomic().sim_output();
299        let read_response_recv = sliced! {
300            let batch_of_req = use(read_req, nondet!(/** test */));
301            let latest_singleton = use::atomic(current_state, nondet!(/** test */));
302            batch_of_req.cross_singleton(latest_singleton)
303        }
304        .sim_output();
305
306        let sim_compiled = flow.sim().compiled();
307        let instances = sim_compiled.exhaustive(async || {
308            write_send.send(1);
309            write_ack_recv.assert_yields([1]).await;
310            read_send.send(());
311            assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
312        });
313
314        assert_eq!(instances, 1);
315
316        let instances_read_before_write = sim_compiled.exhaustive(async || {
317            write_send.send(1);
318            read_send.send(());
319            write_ack_recv.assert_yields([1]).await;
320            let _ = read_response_recv.next().await;
321        });
322
323        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
324    }
325
326    #[cfg(feature = "sim")]
327    #[test]
328    #[should_panic]
329    fn sim_non_atomic_stream() {
330        // shows that atomic is necessary
331        let mut flow = FlowBuilder::new();
332        let node = flow.process::<()>();
333
334        let (write_send, write_req) = node.sim_input();
335        let (read_send, read_req) = node.sim_input::<(), _, _>();
336
337        let current_state = write_req.clone().fold(
338            q!(|| 0),
339            q!(|state: &mut i32, v: i32| {
340                *state += v;
341            }),
342        );
343
344        let write_ack_recv = write_req.sim_output();
345
346        let read_response_recv = sliced! {
347            let batch_of_req = use(read_req, nondet!(/** test */));
348            let latest_singleton = use(current_state, nondet!(/** test */));
349            batch_of_req.cross_singleton(latest_singleton)
350        }
351        .sim_output();
352
353        flow.sim().exhaustive(async || {
354            write_send.send(1);
355            write_ack_recv.assert_yields([1]).await;
356            read_send.send(());
357
358            if let Some((_, v)) = read_response_recv.next().await {
359                assert_eq!(v, 1);
360            }
361        });
362    }
363}