haste_operation_executor/providers/deno_embedded/
pool.rs1use crate::extract_code_from_operation_definition;
2use crate::providers::deno_embedded::run_code;
3use crate::structs::PluginCodeType;
4use crate::traits::OperationExecutor;
5use deno_core::{error::AnyError, serde_json};
6use haste_fhir_client::FHIRClient;
7use haste_fhir_client::request::InvocationRequest;
8use haste_fhir_model::r4::generated::resources::{OperationDefinition, Parameters};
9use haste_fhir_model::r4::generated::terminology::IssueType;
10use haste_fhir_operation_error::OperationOutcomeError;
11use std::io;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::mpsc;
15use std::thread::{self, JoinHandle};
16use tokio::runtime::Runtime;
17use tokio::sync::oneshot;
18
19type JobResult = Result<Option<serde_json::Value>, AnyError>;
20
21pub struct DenoPool {
22 next_worker: AtomicUsize,
23 workers: Vec<WorkerHandle>,
24}
25
26impl DenoPool {
27 pub fn new(thread_count: usize) -> Result<Self, AnyError> {
28 if thread_count == 0 {
29 return Err(io::Error::other("DenoPool requires at least one worker thread").into());
30 }
31
32 let mut workers = Vec::with_capacity(thread_count);
33
34 for index in 0..thread_count {
35 let result = spawn_worker(index);
36
37 match result {
38 Ok(worker) => workers.push(worker),
39 Err(error) => {
40 shutdown_workers(&mut workers);
41 return Err(error);
42 }
43 }
44 }
45
46 Ok(Self {
47 next_worker: AtomicUsize::new(0),
48 workers,
49 })
50 }
51
52 async fn execute<
53 CTX: Clone + Send + 'static,
54 Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
55 >(
56 &self,
57 ctx: CTX,
58 client: Arc<Client>,
59 media_type: PluginCodeType,
60 code: impl Into<String>,
61 ) -> JobResult {
62 let worker_index = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
63 let worker = &self.workers[worker_index];
64 let (response_tx, response_rx) = oneshot::channel();
65 let code = code.into();
66
67 let task = Box::new(move |runtime: &Runtime| {
68 let result = runtime.block_on(async move {
69 let output = run_code(ctx, client, media_type, code).await?;
70
71 output
72 .map(serde_json::from_value)
73 .transpose()
74 .map_err(AnyError::from)
75 });
76
77 let _ = response_tx.send(result);
78 }) as Box<dyn WorkerTask>;
79
80 worker
81 .command_tx
82 .send(WorkerCommand::Run(task))
83 .map_err(|_| io::Error::other("DenoPool worker is not accepting jobs"))?;
84
85 response_rx
86 .await
87 .map_err(|_| io::Error::other("DenoPool worker dropped the response channel"))?
88 }
89}
90
91impl Drop for DenoPool {
92 fn drop(&mut self) {
93 shutdown_workers(&mut self.workers);
94 }
95}
96
97impl OperationExecutor for DenoPool {
98 async fn execute_operation<
99 CTX: Clone + Send + 'static,
100 Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
101 >(
102 &self,
103 context: CTX,
104 client: Arc<Client>,
105 operation: &OperationDefinition,
106 _input: &InvocationRequest,
107 ) -> Result<Parameters, OperationOutcomeError> {
108 let (code, media_type) =
109 extract_code_from_operation_definition(operation).ok_or_else(|| {
110 OperationOutcomeError::error(
111 IssueType::Invalid(None),
112 "OperationDefinition missing custom code extension metadata".to_string(),
113 )
114 })?;
115
116 let media_type = PluginCodeType::try_from(media_type)?;
117
118 let output = self
119 .execute(context, client, media_type, code.to_string())
120 .await
121 .map_err(|error| {
122 OperationOutcomeError::error(
123 IssueType::Processing(None),
124 format!("Failed to execute operation custom code: {error}"),
125 )
126 })?
127 .ok_or_else(|| {
128 OperationOutcomeError::error(
129 IssueType::Processing(None),
130 "Operation custom code returned no output".to_string(),
131 )
132 })?;
133
134 haste_fhir_serialization_json::from_serde_value(output).map_err(|error| {
135 OperationOutcomeError::error(
136 IssueType::Invalid(None),
137 format!("Operation custom code returned invalid Parameters payload: {error}"),
138 )
139 })
140 }
141}
142
143struct WorkerHandle {
144 command_tx: mpsc::Sender<WorkerCommand>,
145 join_handle: Option<JoinHandle<()>>,
146}
147
148enum WorkerCommand {
149 Run(Box<dyn WorkerTask>),
150 Shutdown,
151}
152
153trait WorkerTask: Send + 'static {
154 fn run(self: Box<Self>, runtime: &Runtime);
155}
156
157impl<Function> WorkerTask for Function
158where
159 Function: FnOnce(&Runtime) + Send + 'static,
160{
161 fn run(self: Box<Self>, runtime: &Runtime) {
162 (*self)(runtime);
163 }
164}
165
166fn spawn_worker(index: usize) -> Result<WorkerHandle, AnyError> {
167 let (command_tx, command_rx) = mpsc::channel();
168 let (startup_tx, startup_rx) = mpsc::sync_channel(1);
169
170 let join_handle = thread::Builder::new()
171 .name(format!("deno-pool-{index}"))
172 .spawn(move || {
173 let runtime = match tokio::runtime::Builder::new_current_thread()
174 .enable_all()
175 .build()
176 {
177 Ok(runtime) => {
178 let _ = startup_tx.send(Ok(()));
179 runtime
180 }
181 Err(error) => {
182 let _ = startup_tx.send(Err::<(), AnyError>(error.into()));
183 return;
184 }
185 };
186
187 while let Ok(command) = command_rx.recv() {
188 match command {
189 WorkerCommand::Run(task) => task.run(&runtime),
190 WorkerCommand::Shutdown => break,
191 }
192 }
193 })?;
194
195 startup_rx
196 .recv()
197 .map_err(|_| io::Error::other("DenoPool worker failed during startup"))??;
198
199 Ok(WorkerHandle {
200 command_tx,
201 join_handle: Some(join_handle),
202 })
203}
204
205fn shutdown_workers(workers: &mut [WorkerHandle]) {
206 for worker in workers.iter() {
207 let _ = worker.command_tx.send(WorkerCommand::Shutdown);
208 }
209
210 for worker in workers.iter_mut() {
211 if let Some(join_handle) = worker.join_handle.take() {
212 let _ = join_handle.join();
213 }
214 }
215}