1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
42#[derive(Clone, Debug)]
44pub struct DockerNetwork {
45 name: String,
46}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
57#[derive(Clone)]
59pub struct DockerDeployProcess {
60 key: LocationKey,
61 name: String,
62 next_port: Rc<RefCell<u16>>,
63 rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65 exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67 docker_container_name: Rc<RefCell<Option<String>>>,
68
69 compilation_options: Option<String>,
70
71 config: Vec<String>,
72
73 network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: &[syn::Stmt],
103 sidecars: &[syn::Expr],
104 ) {
105 let (bin_name, config) = create_graph_trybuild(
106 graph,
107 extra_stmts,
108 sidecars,
109 Some(&self.name),
110 crate::compile::trybuild::generate::DeployMode::Containerized,
111 LinkingMode::Static,
112 );
113
114 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115 .target_dir(config.target_dir)
116 .example(bin_name)
117 .no_default_features();
118
119 ret = ret.display_name("test_display_name");
120
121 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123 if let Some(features) = config.features {
124 ret = ret.features(features);
125 }
126
127 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128 ret = ret.config("build.incremental = false");
129
130 *self.rust_crate.borrow_mut() = Some(ret);
131 }
132}
133
134#[derive(Clone)]
136pub struct DockerDeployCluster {
137 key: LocationKey,
138 name: String,
139 next_port: Rc<RefCell<u16>>,
140 rust_crate: Rc<RefCell<Option<RustCrate>>>,
141
142 docker_container_name: Rc<RefCell<Vec<String>>>,
143
144 compilation_options: Option<String>,
145
146 config: Vec<String>,
147
148 count: usize,
149}
150
151impl Node for DockerDeployCluster {
152 type Port = u16;
153 type Meta = ();
154 type InstantiateEnv = DockerDeploy;
155
156 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157 fn next_port(&self) -> Self::Port {
158 let port = {
159 let mut borrow = self.next_port.borrow_mut();
160 let port = *borrow;
161 *borrow += 1;
162 port
163 };
164
165 port
166 }
167
168 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169 fn update_meta(&self, _meta: &Self::Meta) {}
170
171 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172 fn instantiate(
173 &self,
174 _env: &mut Self::InstantiateEnv,
175 _meta: &mut Self::Meta,
176 graph: DfirGraph,
177 extra_stmts: &[syn::Stmt],
178 sidecars: &[syn::Expr],
179 ) {
180 let (bin_name, config) = create_graph_trybuild(
181 graph,
182 extra_stmts,
183 sidecars,
184 Some(&self.name),
185 crate::compile::trybuild::generate::DeployMode::Containerized,
186 LinkingMode::Static,
187 );
188
189 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190 .target_dir(config.target_dir)
191 .example(bin_name)
192 .no_default_features();
193
194 ret = ret.display_name("test_display_name");
195
196 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198 if let Some(features) = config.features {
199 ret = ret.features(features);
200 }
201
202 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203 ret = ret.config("build.incremental = false");
204
205 *self.rust_crate.borrow_mut() = Some(ret);
206 }
207}
208
209#[derive(Clone, Debug)]
211pub struct DockerDeployExternal {
212 name: String,
213 next_port: Rc<RefCell<u16>>,
214
215 ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
216
217 #[expect(clippy::type_complexity, reason = "internal code")]
218 connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
219}
220
221impl Node for DockerDeployExternal {
222 type Port = u16;
223 type Meta = ();
224 type InstantiateEnv = DockerDeploy;
225
226 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227 fn next_port(&self) -> Self::Port {
228 let port = {
229 let mut borrow = self.next_port.borrow_mut();
230 let port = *borrow;
231 *borrow += 1;
232 port
233 };
234
235 port
236 }
237
238 #[instrument(level = "trace", skip_all, fields(name = self.name))]
239 fn update_meta(&self, _meta: &Self::Meta) {}
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242 fn instantiate(
243 &self,
244 _env: &mut Self::InstantiateEnv,
245 meta: &mut Self::Meta,
246 graph: DfirGraph,
247 extra_stmts: &[syn::Stmt],
248 sidecars: &[syn::Expr],
249 ) {
250 trace!(name: "surface", surface = graph.surface_syntax_string());
251 }
252}
253
254type DynSourceSink<Out, In, InErr> = (
255 Pin<Box<dyn Stream<Item = Out>>>,
256 Pin<Box<dyn Sink<In, Error = InErr>>>,
257);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262 self.ports.borrow_mut().insert(external_port_id, port);
263 }
264
265 fn as_bytes_bidi(
266 &self,
267 external_port_id: ExternalPortId,
268 ) -> impl Future<
269 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270 > + 'a {
271 let guard =
272 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275 let (docker_container_name, remote_port, _) = self
276 .connection_info
277 .borrow()
278 .get(&local_port)
279 .unwrap()
280 .clone();
281
282 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284 async move {
285 let local_port =
286 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287 let remote_ip_address = "localhost";
288
289 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292 .await
293 .unwrap();
294
295 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297 let (rx, tx) = stream.into_split();
298
299 let source = Box::pin(
300 FramedRead::new(rx, LengthDelimitedCodec::new()),
301 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306 (source, sink)
307 }
308 .instrument(guard.exit())
309 }
310
311 fn as_bincode_bidi<InT, OutT>(
312 &self,
313 external_port_id: ExternalPortId,
314 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315 where
316 InT: serde::Serialize + 'static,
317 OutT: serde::de::DeserializeOwned + 'static,
318 {
319 let guard =
320 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323 let (docker_container_name, remote_port, _) = self
324 .connection_info
325 .borrow()
326 .get(&local_port)
327 .unwrap()
328 .clone();
329
330 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332 async move {
333 let local_port =
334 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335 let remote_ip_address = "localhost";
336
337 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340 .await
341 .unwrap();
342
343 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345 let (rx, tx) = stream.into_split();
346
347 let source = Box::pin(
348 FramedRead::new(rx, LengthDelimitedCodec::new())
349 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350 ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352 let sink = Box::pin(
353 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355 }),
356 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358 (source, sink)
359 }
360 .instrument(guard.exit())
361 }
362
363 fn as_bincode_sink<T>(
364 &self,
365 external_port_id: ExternalPortId,
366 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367 where
368 T: serde::Serialize + 'static,
369 {
370 let guard =
371 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374 let (docker_container_name, remote_port, _) = self
375 .connection_info
376 .borrow()
377 .get(&local_port)
378 .unwrap()
379 .clone();
380
381 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383 async move {
384 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385 let remote_ip_address = "localhost";
386
387 Box::pin(
388 LazySink::new(move || {
389 Box::pin(async move {
390 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392 let stream =
393 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394 .await?;
395
396 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398 Result::<_, std::io::Error>::Ok(FramedWrite::new(
399 stream,
400 LengthDelimitedCodec::new(),
401 ))
402 })
403 })
404 .with(move |v| async move {
405 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406 }),
407 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408 }
409 .instrument(guard.exit())
410 }
411
412 fn as_bincode_source<T>(
413 &self,
414 external_port_id: ExternalPortId,
415 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416 where
417 T: serde::de::DeserializeOwned + 'static,
418 {
419 let guard =
420 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423 let (docker_container_name, remote_port, _) = self
424 .connection_info
425 .borrow()
426 .get(&local_port)
427 .unwrap()
428 .clone();
429
430 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432 async move {
433
434 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435 let remote_ip_address = "localhost";
436
437 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440 .await
441 .unwrap();
442
443 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445 Box::pin(
446 FramedRead::new(stream, LengthDelimitedCodec::new())
447 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448 ) as Pin<Box<dyn Stream<Item = T>>>
449 }
450 .instrument(guard.exit())
451 }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456 docker_container_name: &str,
457 destination_port: u16,
458) -> u16 {
459 let docker = Docker::connect_with_local_defaults().unwrap();
460
461 let container_info = docker
462 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463 .await
464 .unwrap();
465
466 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468 let remote_port = container_info
470 .network_settings
471 .as_ref()
472 .unwrap()
473 .ports
474 .as_ref()
475 .unwrap()
476 .get(&format!("{destination_port}/tcp"))
477 .unwrap()
478 .as_ref()
479 .unwrap()
480 .iter()
481 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482 .unwrap()
483 .host_port
484 .as_ref()
485 .unwrap()
486 .parse()
487 .unwrap();
488
489 remote_port
490}
491
492pub struct DockerDeploy {
494 docker_processes: Vec<DockerDeployProcessSpec>,
495 docker_clusters: Vec<DockerDeployClusterSpec>,
496 network: DockerNetwork,
497 deployment_instance: String,
498}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502 docker: &Docker,
503 container_name: &str,
504 image_name: &str,
505 network_name: &str,
506 deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508 let config = ContainerCreateBody {
509 image: Some(image_name.to_owned()),
510 hostname: Some(container_name.to_owned()),
511 host_config: Some(HostConfig {
512 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513 publish_all_ports: Some(true),
514 port_bindings: Some(HashMap::new()), ..Default::default()
516 }),
517 env: Some(vec![
518 format!("CONTAINER_NAME={container_name}"),
519 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520 format!("RUST_LOG=trace"),
521 ]),
522 networking_config: Some(NetworkingConfig {
523 endpoints_config: Some(HashMap::from([(
524 network_name.to_owned(),
525 EndpointSettings {
526 ..Default::default()
527 },
528 )])),
529 }),
530 tty: Some(true),
531 ..Default::default()
532 };
533
534 let options = CreateContainerOptions {
535 name: Some(container_name.to_owned()),
536 ..Default::default()
537 };
538
539 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540 docker.create_container(Some(options), config).await?;
541 docker
542 .start_container(container_name, None::<StartContainerOptions>)
543 .await?;
544
545 Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551 compilation_options: Option<&str>,
552 config: &[String],
553 exposed_ports: &[u16],
554 image_name: &str,
555) -> Result<(), anyhow::Error> {
556 let mut rust_crate = rust_crate
557 .borrow_mut()
558 .take()
559 .unwrap()
560 .rustflags(compilation_options.unwrap_or_default());
561
562 for cfg in config {
563 rust_crate = rust_crate.config(cfg);
564 }
565
566 let build_output = match build_crate_memoized(
567 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568 )
569 .await
570 {
571 Ok(build_output) => build_output,
572 Err(BuildError::FailedToBuildCrate {
573 exit_status,
574 diagnostics,
575 text_lines,
576 stderr_lines,
577 }) => {
578 let diagnostics = diagnostics
579 .into_iter()
580 .map(|d| d.rendered.unwrap())
581 .collect::<Vec<_>>()
582 .join("\n");
583 let text_lines = text_lines.join("\n");
584 let stderr_lines = stderr_lines.join("\n");
585
586 anyhow::bail!(
587 r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611 );
612 }
613 Err(err) => {
614 anyhow::bail!("Failed to build crate {err:?}");
615 }
616 };
617
618 let docker = Docker::connect_with_local_defaults()?;
619
620 let mut tar_data = Vec::new();
621 {
622 let mut tar = Builder::new(&mut tar_data);
623
624 let exposed_ports = exposed_ports
625 .iter()
626 .map(|port| format!("EXPOSE {port}/tcp"))
627 .collect::<Vec<_>>()
628 .join("\n");
629
630 let dockerfile_content = format!(
631 r#"
632 FROM scratch
633 {exposed_ports}
634 COPY app /app
635 CMD ["/app"]
636 "#,
637 );
638
639 trace!(name: "dockerfile", %dockerfile_content);
640
641 let mut header = Header::new_gnu();
642 header.set_path("Dockerfile")?;
643 header.set_size(dockerfile_content.len() as u64);
644 header.set_cksum();
645 tar.append(&header, dockerfile_content.as_bytes())?;
646
647 let mut header = Header::new_gnu();
648 header.set_path("app")?;
649 header.set_size(build_output.bin_data.len() as u64);
650 header.set_mode(0o755);
651 header.set_cksum();
652 tar.append(&header, &build_output.bin_data[..])?;
653
654 tar.finish()?;
655 }
656
657 let build_options = BuildImageOptions {
658 dockerfile: "Dockerfile".to_owned(),
659 t: Some(image_name.to_owned()),
660 rm: true,
661 ..Default::default()
662 };
663
664 use bollard::errors::Error;
665
666 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667 let mut build_stream = docker.build_image(build_options, None, Some(body));
668 while let Some(msg) = build_stream.next().await {
669 match msg {
670 Ok(_) => {}
671 Err(e) => match e {
672 Error::DockerStreamError { error } => {
673 return Err(anyhow::anyhow!(
674 "Docker build failed: DockerStreamError: {{ error: {error} }}"
675 ));
676 }
677 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678 },
679 }
680 }
681
682 Ok(())
683}
684
685impl DockerDeploy {
686 pub fn new(network: DockerNetwork) -> Self {
688 Self {
689 docker_processes: Vec::new(),
690 docker_clusters: Vec::new(),
691 network,
692 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693 }
694 }
695
696 pub fn add_localhost_docker(
698 &mut self,
699 compilation_options: Option<String>,
700 config: Vec<String>,
701 ) -> DockerDeployProcessSpec {
702 let process = DockerDeployProcessSpec {
703 compilation_options,
704 config,
705 network: self.network.clone(),
706 deployment_instance: self.deployment_instance.clone(),
707 };
708
709 self.docker_processes.push(process.clone());
710
711 process
712 }
713
714 pub fn add_localhost_docker_cluster(
716 &mut self,
717 compilation_options: Option<String>,
718 config: Vec<String>,
719 count: usize,
720 ) -> DockerDeployClusterSpec {
721 let cluster = DockerDeployClusterSpec {
722 compilation_options,
723 config,
724 count,
725 deployment_instance: self.deployment_instance.clone(),
726 };
727
728 self.docker_clusters.push(cluster.clone());
729
730 cluster
731 }
732
733 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735 DockerDeployExternalSpec { name }
736 }
737
738 pub fn get_deployment_instance(&self) -> String {
740 self.deployment_instance.clone()
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746 for (_, _, process) in nodes.get_all_processes() {
747 let exposed_ports = process.exposed_ports.borrow().clone();
748
749 build_and_create_image(
750 &process.rust_crate,
751 process.compilation_options.as_deref(),
752 &process.config,
753 &exposed_ports,
754 &process.name,
755 )
756 .await?;
757 }
758
759 for (_, _, cluster) in nodes.get_all_clusters() {
760 build_and_create_image(
761 &cluster.rust_crate,
762 cluster.compilation_options.as_deref(),
763 &cluster.config,
764 &[], &cluster.name,
766 )
767 .await?;
768 }
769
770 Ok(())
771 }
772
773 #[instrument(level = "trace", skip_all)]
775 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776 let docker = Docker::connect_with_local_defaults()?;
777
778 match docker
779 .create_network(NetworkCreateRequest {
780 name: self.network.name.clone(),
781 driver: Some("bridge".to_owned()),
782 ..Default::default()
783 })
784 .await
785 {
786 Ok(v) => v.id,
787 Err(e) => {
788 panic!("Failed to create docker network: {e:?}");
789 }
790 };
791
792 for (_, _, process) in nodes.get_all_processes() {
793 let docker_container_name: String = get_docker_container_name(&process.name, None);
794 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796 create_and_start_container(
797 &docker,
798 &docker_container_name,
799 &process.name,
800 &self.network.name,
801 &self.deployment_instance,
802 )
803 .await?;
804 }
805
806 for (_, _, cluster) in nodes.get_all_clusters() {
807 for num in 0..cluster.count {
808 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809 cluster
810 .docker_container_name
811 .borrow_mut()
812 .push(docker_container_name.clone());
813
814 create_and_start_container(
815 &docker,
816 &docker_container_name,
817 &cluster.name,
818 &self.network.name,
819 &self.deployment_instance,
820 )
821 .await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 #[instrument(level = "trace", skip_all)]
830 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831 let docker = Docker::connect_with_local_defaults()?;
832
833 for (_, _, process) in nodes.get_all_processes() {
834 let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836 docker
837 .kill_container(&docker_container_name, None::<KillContainerOptions>)
838 .await?;
839 }
840
841 for (_, _, cluster) in nodes.get_all_clusters() {
842 for num in 0..cluster.count {
843 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845 docker
846 .kill_container(&docker_container_name, None::<KillContainerOptions>)
847 .await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 #[instrument(level = "trace", skip_all)]
856 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857 let docker = Docker::connect_with_local_defaults()?;
858
859 for (_, _, process) in nodes.get_all_processes() {
860 let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862 docker
863 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864 .await?;
865 }
866
867 for (_, _, cluster) in nodes.get_all_clusters() {
868 for num in 0..cluster.count {
869 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871 docker
872 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873 .await?;
874 }
875 }
876
877 docker
878 .remove_network(&self.network.name)
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882 use bollard::query_parameters::RemoveImageOptions;
883
884 for (_, _, process) in nodes.get_all_processes() {
885 docker
886 .remove_image(&process.name, None::<RemoveImageOptions>, None)
887 .await?;
888 }
889
890 for (_, _, cluster) in nodes.get_all_clusters() {
891 docker
892 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893 .await?;
894 }
895
896 Ok(())
897 }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901 type Meta = ();
902 type InstantiateEnv = Self;
903
904 type Process = DockerDeployProcess;
905 type Cluster = DockerDeployCluster;
906 type External = DockerDeployExternal;
907
908 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909 fn o2o_sink_source(
910 _env: &mut Self::InstantiateEnv,
911 p1: &Self::Process,
912 p1_port: &<Self::Process as Node>::Port,
913 p2: &Self::Process,
914 p2_port: &<Self::Process as Node>::Port,
915 _name: Option<&str>,
916 ) -> (syn::Expr, syn::Expr) {
917 let bind_addr = format!("0.0.0.0:{}", p2_port);
918 let target = format!("{}:{p2_port}", p2.name);
919
920 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
921 }
922
923 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
924 fn o2o_connect(
925 p1: &Self::Process,
926 p1_port: &<Self::Process as Node>::Port,
927 p2: &Self::Process,
928 p2_port: &<Self::Process as Node>::Port,
929 ) -> Box<dyn FnOnce()> {
930 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
931
932 Box::new(move || {
933 trace!(name: "o2o_connect thunk", %serialized);
934 })
935 }
936
937 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
938 fn o2m_sink_source(
939 p1: &Self::Process,
940 p1_port: &<Self::Process as Node>::Port,
941 c2: &Self::Cluster,
942 c2_port: &<Self::Cluster as Node>::Port,
943 ) -> (syn::Expr, syn::Expr) {
944 deploy_containerized_o2m(*c2_port)
945 }
946
947 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
948 fn o2m_connect(
949 p1: &Self::Process,
950 p1_port: &<Self::Process as Node>::Port,
951 c2: &Self::Cluster,
952 c2_port: &<Self::Cluster as Node>::Port,
953 ) -> Box<dyn FnOnce()> {
954 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
955
956 Box::new(move || {
957 trace!(name: "o2m_connect thunk", %serialized);
958 })
959 }
960
961 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
962 fn m2o_sink_source(
963 c1: &Self::Cluster,
964 c1_port: &<Self::Cluster as Node>::Port,
965 p2: &Self::Process,
966 p2_port: &<Self::Process as Node>::Port,
967 ) -> (syn::Expr, syn::Expr) {
968 deploy_containerized_m2o(*p2_port, &p2.name)
969 }
970
971 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
972 fn m2o_connect(
973 c1: &Self::Cluster,
974 c1_port: &<Self::Cluster as Node>::Port,
975 p2: &Self::Process,
976 p2_port: &<Self::Process as Node>::Port,
977 ) -> Box<dyn FnOnce()> {
978 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
979
980 Box::new(move || {
981 trace!(name: "m2o_connect thunk", %serialized);
982 })
983 }
984
985 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
986 fn m2m_sink_source(
987 c1: &Self::Cluster,
988 c1_port: &<Self::Cluster as Node>::Port,
989 c2: &Self::Cluster,
990 c2_port: &<Self::Cluster as Node>::Port,
991 ) -> (syn::Expr, syn::Expr) {
992 deploy_containerized_m2m(*c2_port)
993 }
994
995 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
996 fn m2m_connect(
997 c1: &Self::Cluster,
998 c1_port: &<Self::Cluster as Node>::Port,
999 c2: &Self::Cluster,
1000 c2_port: &<Self::Cluster as Node>::Port,
1001 ) -> Box<dyn FnOnce()> {
1002 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1003
1004 Box::new(move || {
1005 trace!(name: "m2m_connect thunk", %serialized);
1006 })
1007 }
1008
1009 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1010 fn e2o_many_source(
1011 extra_stmts: &mut Vec<syn::Stmt>,
1012 p2: &Self::Process,
1013 p2_port: &<Self::Process as Node>::Port,
1014 codec_type: &syn::Type,
1015 shared_handle: String,
1016 ) -> syn::Expr {
1017 p2.exposed_ports.borrow_mut().push(*p2_port);
1018
1019 let socket_ident = syn::Ident::new(
1020 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1021 Span::call_site(),
1022 );
1023
1024 let source_ident = syn::Ident::new(
1025 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1026 Span::call_site(),
1027 );
1028
1029 let sink_ident = syn::Ident::new(
1030 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1031 Span::call_site(),
1032 );
1033
1034 let membership_ident = syn::Ident::new(
1035 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1036 Span::call_site(),
1037 );
1038
1039 let bind_addr = format!("0.0.0.0:{}", p2_port);
1040
1041 extra_stmts.push(syn::parse_quote! {
1042 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1043 });
1044
1045 let root = crate::staging_util::get_this_crate();
1046
1047 extra_stmts.push(syn::parse_quote! {
1048 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1049 });
1050
1051 parse_quote!(#source_ident)
1052 }
1053
1054 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1055 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1056 let sink_ident = syn::Ident::new(
1057 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1058 Span::call_site(),
1059 );
1060 parse_quote!(#sink_ident)
1061 }
1062
1063 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1064 fn e2o_source(
1065 extra_stmts: &mut Vec<syn::Stmt>,
1066 p1: &Self::External,
1067 p1_port: &<Self::External as Node>::Port,
1068 p2: &Self::Process,
1069 p2_port: &<Self::Process as Node>::Port,
1070 _codec_type: &syn::Type,
1071 shared_handle: String,
1072 ) -> syn::Expr {
1073 p1.connection_info.borrow_mut().insert(
1074 *p1_port,
1075 (
1076 p2.docker_container_name.clone(),
1077 *p2_port,
1078 p2.network.clone(),
1079 ),
1080 );
1081
1082 p2.exposed_ports.borrow_mut().push(*p2_port);
1083
1084 let socket_ident = syn::Ident::new(
1085 &format!("__hydro_deploy_{}_socket", &shared_handle),
1086 Span::call_site(),
1087 );
1088
1089 let source_ident = syn::Ident::new(
1090 &format!("__hydro_deploy_{}_source", &shared_handle),
1091 Span::call_site(),
1092 );
1093
1094 let sink_ident = syn::Ident::new(
1095 &format!("__hydro_deploy_{}_sink", &shared_handle),
1096 Span::call_site(),
1097 );
1098
1099 let bind_addr = format!("0.0.0.0:{}", p2_port);
1100
1101 extra_stmts.push(syn::parse_quote! {
1102 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1103 });
1104
1105 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1106
1107 extra_stmts.push(syn::parse_quote! {
1108 let (#sink_ident, #source_ident) = (#create_expr).split();
1109 });
1110
1111 parse_quote!(#source_ident)
1112 }
1113
1114 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1115 fn e2o_connect(
1116 p1: &Self::External,
1117 p1_port: &<Self::External as Node>::Port,
1118 p2: &Self::Process,
1119 p2_port: &<Self::Process as Node>::Port,
1120 many: bool,
1121 server_hint: NetworkHint,
1122 ) -> Box<dyn FnOnce()> {
1123 if server_hint != NetworkHint::Auto {
1124 panic!(
1125 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1126 server_hint
1127 );
1128 }
1129
1130 if many {
1132 p1.connection_info.borrow_mut().insert(
1133 *p1_port,
1134 (
1135 p2.docker_container_name.clone(),
1136 *p2_port,
1137 p2.network.clone(),
1138 ),
1139 );
1140 }
1141
1142 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1143
1144 Box::new(move || {
1145 trace!(name: "e2o_connect thunk", %serialized);
1146 })
1147 }
1148
1149 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1150 fn o2e_sink(
1151 p1: &Self::Process,
1152 p1_port: &<Self::Process as Node>::Port,
1153 p2: &Self::External,
1154 p2_port: &<Self::External as Node>::Port,
1155 shared_handle: String,
1156 ) -> syn::Expr {
1157 let sink_ident = syn::Ident::new(
1158 &format!("__hydro_deploy_{}_sink", &shared_handle),
1159 Span::call_site(),
1160 );
1161 parse_quote!(#sink_ident)
1162 }
1163
1164 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1165 fn cluster_ids(
1166 of_cluster: LocationKey,
1167 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1168 cluster_ids()
1169 }
1170
1171 #[instrument(level = "trace", skip_all)]
1172 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1173 cluster_self_id()
1174 }
1175
1176 #[instrument(level = "trace", skip_all, fields(?location_id))]
1177 fn cluster_membership_stream(
1178 location_id: &LocationId,
1179 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1180 {
1181 cluster_membership_stream(location_id)
1182 }
1183}
1184
1185const CONTAINER_ALPHABET: [char; 36] = [
1186 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1187 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1188];
1189
1190#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1191fn get_docker_image_name(
1192 name_hint: &str,
1193 location_key: LocationKey,
1194 deployment_instance: &str,
1195) -> String {
1196 let name_hint = name_hint
1197 .split("::")
1198 .last()
1199 .unwrap()
1200 .to_ascii_lowercase()
1201 .replace(".", "-")
1202 .replace("_", "-")
1203 .replace("::", "-");
1204
1205 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1206
1207 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1208}
1209
1210#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1211fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1212 if let Some(instance) = instance {
1213 format!("{image_name}-{instance}")
1214 } else {
1215 image_name.to_owned()
1216 }
1217}
1218#[derive(Clone)]
1220pub struct DockerDeployProcessSpec {
1221 compilation_options: Option<String>,
1222 config: Vec<String>,
1223 network: DockerNetwork,
1224 deployment_instance: String,
1225}
1226
1227impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1228 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1229 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1230 DockerDeployProcess {
1231 key,
1232 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1233
1234 next_port: Rc::new(RefCell::new(1000)),
1235 rust_crate: Rc::new(RefCell::new(None)),
1236
1237 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1238
1239 docker_container_name: Rc::new(RefCell::new(None)),
1240
1241 compilation_options: self.compilation_options,
1242 config: self.config,
1243
1244 network: self.network.clone(),
1245 }
1246 }
1247}
1248
1249#[derive(Clone)]
1251pub struct DockerDeployClusterSpec {
1252 compilation_options: Option<String>,
1253 config: Vec<String>,
1254 count: usize,
1255 deployment_instance: String,
1256}
1257
1258impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1259 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1260 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1261 DockerDeployCluster {
1262 key,
1263 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1264
1265 next_port: Rc::new(RefCell::new(1000)),
1266 rust_crate: Rc::new(RefCell::new(None)),
1267
1268 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1269
1270 compilation_options: self.compilation_options,
1271 config: self.config,
1272
1273 count: self.count,
1274 }
1275 }
1276}
1277
1278pub struct DockerDeployExternalSpec {
1280 name: String,
1281}
1282
1283impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1284 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1285 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1286 DockerDeployExternal {
1287 name: self.name,
1288 next_port: Rc::new(RefCell::new(10000)),
1289 ports: Rc::new(RefCell::new(HashMap::new())),
1290 connection_info: Rc::new(RefCell::new(HashMap::new())),
1291 }
1292 }
1293}