Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    fn o2o_sink_source(
26        env: &mut Self::InstantiateEnv,
27        p1: &Self::Process,
28        p1_port: &<Self::Process as Node>::Port,
29        p2: &Self::Process,
30        p2_port: &<Self::Process as Node>::Port,
31        name: Option<&str>,
32    ) -> (syn::Expr, syn::Expr);
33    fn o2o_connect(
34        p1: &Self::Process,
35        p1_port: &<Self::Process as Node>::Port,
36        p2: &Self::Process,
37        p2_port: &<Self::Process as Node>::Port,
38    ) -> Box<dyn FnOnce()>;
39
40    fn o2m_sink_source(
41        p1: &Self::Process,
42        p1_port: &<Self::Process as Node>::Port,
43        c2: &Self::Cluster,
44        c2_port: &<Self::Cluster as Node>::Port,
45    ) -> (syn::Expr, syn::Expr);
46    fn o2m_connect(
47        p1: &Self::Process,
48        p1_port: &<Self::Process as Node>::Port,
49        c2: &Self::Cluster,
50        c2_port: &<Self::Cluster as Node>::Port,
51    ) -> Box<dyn FnOnce()>;
52
53    fn m2o_sink_source(
54        c1: &Self::Cluster,
55        c1_port: &<Self::Cluster as Node>::Port,
56        p2: &Self::Process,
57        p2_port: &<Self::Process as Node>::Port,
58    ) -> (syn::Expr, syn::Expr);
59    fn m2o_connect(
60        c1: &Self::Cluster,
61        c1_port: &<Self::Cluster as Node>::Port,
62        p2: &Self::Process,
63        p2_port: &<Self::Process as Node>::Port,
64    ) -> Box<dyn FnOnce()>;
65
66    fn m2m_sink_source(
67        c1: &Self::Cluster,
68        c1_port: &<Self::Cluster as Node>::Port,
69        c2: &Self::Cluster,
70        c2_port: &<Self::Cluster as Node>::Port,
71    ) -> (syn::Expr, syn::Expr);
72    fn m2m_connect(
73        c1: &Self::Cluster,
74        c1_port: &<Self::Cluster as Node>::Port,
75        c2: &Self::Cluster,
76        c2_port: &<Self::Cluster as Node>::Port,
77    ) -> Box<dyn FnOnce()>;
78
79    fn e2o_many_source(
80        extra_stmts: &mut Vec<syn::Stmt>,
81        p2: &Self::Process,
82        p2_port: &<Self::Process as Node>::Port,
83        codec_type: &syn::Type,
84        shared_handle: String,
85    ) -> syn::Expr;
86    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
87
88    fn e2o_source(
89        extra_stmts: &mut Vec<syn::Stmt>,
90        p1: &Self::External,
91        p1_port: &<Self::External as Node>::Port,
92        p2: &Self::Process,
93        p2_port: &<Self::Process as Node>::Port,
94        codec_type: &syn::Type,
95        shared_handle: String,
96    ) -> syn::Expr;
97    fn e2o_connect(
98        p1: &Self::External,
99        p1_port: &<Self::External as Node>::Port,
100        p2: &Self::Process,
101        p2_port: &<Self::Process as Node>::Port,
102        many: bool,
103        server_hint: NetworkHint,
104    ) -> Box<dyn FnOnce()>;
105
106    fn o2e_sink(
107        p1: &Self::Process,
108        p1_port: &<Self::Process as Node>::Port,
109        p2: &Self::External,
110        p2_port: &<Self::External as Node>::Port,
111        shared_handle: String,
112    ) -> syn::Expr;
113
114    fn cluster_ids(
115        of_cluster: LocationKey,
116    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
117
118    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
119
120    fn cluster_membership_stream(
121        location_id: &LocationId,
122    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
123
124    /// Registers an embedded input for the given ident and element type.
125    ///
126    /// Only meaningful for the embedded deployment backend. The default
127    /// implementation panics.
128    fn register_embedded_input(
129        _env: &mut Self::InstantiateEnv,
130        _location_key: LocationKey,
131        _ident: &syn::Ident,
132        _element_type: &syn::Type,
133    ) {
134        panic!("register_embedded_input is only supported by EmbeddedDeploy");
135    }
136
137    /// Registers an embedded output for the given ident and element type.
138    ///
139    /// Only meaningful for the embedded deployment backend. The default
140    /// implementation panics.
141    fn register_embedded_output(
142        _env: &mut Self::InstantiateEnv,
143        _location_key: LocationKey,
144        _ident: &syn::Ident,
145        _element_type: &syn::Type,
146    ) {
147        panic!("register_embedded_output is only supported by EmbeddedDeploy");
148    }
149}
150
151pub trait ProcessSpec<'a, D>
152where
153    D: Deploy<'a> + ?Sized,
154{
155    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
156}
157
158pub trait IntoProcessSpec<'a, D>
159where
160    D: Deploy<'a> + ?Sized,
161{
162    type ProcessSpec: ProcessSpec<'a, D>;
163    fn into_process_spec(self) -> Self::ProcessSpec;
164}
165
166impl<'a, D, T> IntoProcessSpec<'a, D> for T
167where
168    D: Deploy<'a> + ?Sized,
169    T: ProcessSpec<'a, D>,
170{
171    type ProcessSpec = T;
172    fn into_process_spec(self) -> Self::ProcessSpec {
173        self
174    }
175}
176
177pub trait ClusterSpec<'a, D>
178where
179    D: Deploy<'a> + ?Sized,
180{
181    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
182}
183
184pub trait ExternalSpec<'a, D>
185where
186    D: Deploy<'a> + ?Sized,
187{
188    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
189}
190
191pub trait Node {
192    /// A logical communication endpoint for this node.
193    ///
194    /// Implementors are free to choose the concrete representation (for example,
195    /// a handle or identifier), but it must be `Clone` so that a single logical
196    /// port can be duplicated and passed to multiple consumers. New ports are
197    /// allocated via [`Self::next_port`].
198    type Port: Clone;
199    type Meta: Default;
200    type InstantiateEnv;
201
202    /// Allocates and returns a new port.
203    fn next_port(&self) -> Self::Port;
204
205    fn update_meta(&self, meta: &Self::Meta);
206
207    fn instantiate(
208        &self,
209        env: &mut Self::InstantiateEnv,
210        meta: &mut Self::Meta,
211        graph: DfirGraph,
212        extra_stmts: &[syn::Stmt],
213        sidecars: &[syn::Expr],
214    );
215}
216
217pub type DynSourceSink<Out, In, InErr> = (
218    Pin<Box<dyn Stream<Item = Out>>>,
219    Pin<Box<dyn Sink<In, Error = InErr>>>,
220);
221
222pub trait RegisterPort<'a, D>: Node + Clone
223where
224    D: Deploy<'a> + ?Sized,
225{
226    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
227
228    fn as_bytes_bidi(
229        &self,
230        external_port_id: ExternalPortId,
231    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
232
233    fn as_bincode_bidi<InT, OutT>(
234        &self,
235        external_port_id: ExternalPortId,
236    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
237    where
238        InT: Serialize + 'static,
239        OutT: DeserializeOwned + 'static;
240
241    fn as_bincode_sink<T>(
242        &self,
243        external_port_id: ExternalPortId,
244    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
245    where
246        T: Serialize + 'static;
247
248    fn as_bincode_source<T>(
249        &self,
250        external_port_id: ExternalPortId,
251    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
252    where
253        T: DeserializeOwned + 'static;
254}