haste_operation_executor/providers/deno_embedded/
pool.rs1use crate::providers::deno_embedded::run_code;
2use crate::structs::PluginCodeType;
3use crate::traits::OperationExecutor;
4use crate::validate::validate_parameters;
5use crate::{CUSTOM_CODE_EXTENSION_URL, extract_code_from_operation_definition};
6use deno_core::serde_json::json;
7use deno_core::{error::AnyError, serde_json};
8use haste_fhir_client::FHIRClient;
9use haste_fhir_client::request::InvocationRequest;
10use haste_fhir_model::r4::generated::resources::{OperationDefinition, Parameters};
11use haste_fhir_model::r4::generated::terminology::{IssueType, OperationParameterUse};
12use haste_fhir_operation_error::OperationOutcomeError;
13use std::io;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::mpsc;
17use std::thread::{self, JoinHandle};
18use tokio::runtime::Runtime;
19use tokio::sync::oneshot;
20
21type JobResult = Result<Option<serde_json::Value>, AnyError>;
22
23pub struct DenoPool {
24 next_worker: AtomicUsize,
25 workers: Vec<WorkerHandle>,
26}
27
28impl DenoPool {
29 pub fn new(thread_count: usize) -> Result<Self, AnyError> {
30 if thread_count == 0 {
31 return Err(io::Error::other("DenoPool requires at least one worker thread").into());
32 }
33
34 let mut workers = Vec::with_capacity(thread_count);
35
36 for index in 0..thread_count {
37 let result = spawn_worker(index);
38
39 match result {
40 Ok(worker) => workers.push(worker),
41 Err(error) => {
42 shutdown_workers(&mut workers);
43 return Err(error);
44 }
45 }
46 }
47
48 Ok(Self {
49 next_worker: AtomicUsize::new(0),
50 workers,
51 })
52 }
53
54 async fn execute<
55 CTX: Clone + Send + 'static,
56 Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
57 >(
58 &self,
59 ctx: CTX,
60 client: Arc<Client>,
61 media_type: PluginCodeType,
62 code: impl Into<String>,
63 input: serde_json::Value,
64 ) -> JobResult {
65 let worker_index = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
66 let worker = &self.workers[worker_index];
67
68 let (response_tx, response_rx) = oneshot::channel();
69 let code = code.into();
70
71 let task = Box::new(move |runtime: &Runtime| {
72 let result = runtime.block_on(async move {
73 let output = run_code(ctx, client, media_type, &code, input).await?;
74
75 output
76 .map(serde_json::from_value)
77 .transpose()
78 .map_err(AnyError::from)
79 });
80
81 let _ = response_tx.send(result);
82 }) as Box<dyn WorkerTask>;
83
84 worker
85 .command_tx
86 .send(WorkerCommand::Run(task))
87 .map_err(|_| io::Error::other("DenoPool worker is not accepting jobs"))?;
88
89 response_rx
90 .await
91 .map_err(|_| io::Error::other("DenoPool worker dropped the response channel"))?
92 }
93}
94
95impl Drop for DenoPool {
96 fn drop(&mut self) {
97 shutdown_workers(&mut self.workers);
98 }
99}
100
101fn get_parameters<'a>(input: &'a InvocationRequest) -> &'a Parameters {
102 match input {
103 InvocationRequest::Instance(instance_request) => &instance_request.parameters,
104 InvocationRequest::Type(type_request) => &type_request.parameters,
105 InvocationRequest::System(system_request) => &system_request.parameters,
106 }
107}
108
109fn request_to_json(input: &InvocationRequest) -> Result<serde_json::Value, OperationOutcomeError> {
110 let parameter_json: serde_json::Value =
111 serde_json::to_value(get_parameters(input)).map_err(|_| {
112 OperationOutcomeError::error(
113 IssueType::Invalid(None),
114 "Failed to convert operation input parameters to JSON value".to_string(),
115 )
116 })?;
117
118 match input {
119 InvocationRequest::Instance(instance_request) => Ok(json!({
120 "id": &instance_request.id,
121 "resource": instance_request.resource_type.as_ref(),
122 "parameters": parameter_json,
123
124 })),
125 InvocationRequest::Type(type_request) => Ok(json!({
126 "resource": type_request.resource_type.as_ref(),
127 "parameters": parameter_json,
128 })),
129 InvocationRequest::System(_system_request) => Ok(json!({
130 "parameters": parameter_json,
131 })),
132 }
133}
134
135impl OperationExecutor for DenoPool {
136 async fn execute_operation<
137 CTX: Clone + Send + 'static,
138 Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
139 >(
140 &self,
141 context: CTX,
142 client: Arc<Client>,
143 operation: &OperationDefinition,
144 input: &InvocationRequest,
145 ) -> Result<Parameters, OperationOutcomeError> {
146 validate_parameters(
147 get_parameters(input),
148 &operation.parameter.as_deref().unwrap_or_default(),
149 &OperationParameterUse::In(None),
150 )?;
151
152 let (code, media_type) =
153 extract_code_from_operation_definition(operation).ok_or_else(|| {
154 OperationOutcomeError::error(
155 IssueType::Invalid(None),
156 format!(
157 "OperationDefinition missing custom code extension metadata '{}'",
158 CUSTOM_CODE_EXTENSION_URL
159 ),
160 )
161 })?;
162
163 let media_type = PluginCodeType::try_from(media_type)?;
164
165 let output = self
166 .execute(
167 context,
168 client,
169 media_type,
170 code.to_string(),
171 request_to_json(input)?,
172 )
173 .await
174 .map_err(|error| {
175 OperationOutcomeError::error(
176 IssueType::Processing(None),
177 format!("Failed to execute operation custom code: {error}"),
178 )
179 })?
180 .ok_or_else(|| {
181 OperationOutcomeError::error(
182 IssueType::Processing(None),
183 "Operation custom code returned no output".to_string(),
184 )
185 })?;
186
187 let output = serde_json::from_value::<Parameters>(output).map_err(|error| {
188 OperationOutcomeError::error(
189 IssueType::Invalid(None),
190 format!("Operation custom code returned invalid Parameters payload: {error}"),
191 )
192 })?;
193
194 validate_parameters(
195 &output,
196 &operation.parameter.as_deref().unwrap_or_default(),
197 &OperationParameterUse::Out(None),
198 )?;
199
200 Ok(output)
201 }
202}
203
204struct WorkerHandle {
205 command_tx: mpsc::Sender<WorkerCommand>,
206 join_handle: Option<JoinHandle<()>>,
207}
208
209enum WorkerCommand {
210 Run(Box<dyn WorkerTask>),
211 Shutdown,
212}
213
214trait WorkerTask: Send + 'static {
215 fn run(self: Box<Self>, runtime: &Runtime);
216}
217
218impl<Function> WorkerTask for Function
219where
220 Function: FnOnce(&Runtime) + Send + 'static,
221{
222 fn run(self: Box<Self>, runtime: &Runtime) {
223 (*self)(runtime);
224 }
225}
226
227fn spawn_worker(index: usize) -> Result<WorkerHandle, AnyError> {
228 let (command_tx, command_rx) = mpsc::channel();
229 let (startup_tx, startup_rx) = mpsc::sync_channel(1);
230
231 let join_handle = thread::Builder::new()
232 .name(format!("deno-pool-{index}"))
233 .spawn(move || {
234 let runtime = match tokio::runtime::Builder::new_current_thread()
235 .enable_all()
236 .build()
237 {
238 Ok(runtime) => {
239 let _ = startup_tx.send(Ok(()));
240 runtime
241 }
242 Err(error) => {
243 let _ = startup_tx.send(Err::<(), AnyError>(error.into()));
244 return;
245 }
246 };
247
248 while let Ok(command) = command_rx.recv() {
249 match command {
250 WorkerCommand::Run(task) => task.run(&runtime),
251 WorkerCommand::Shutdown => break,
252 }
253 }
254 })?;
255
256 startup_rx
257 .recv()
258 .map_err(|_| io::Error::other("DenoPool worker failed during startup"))??;
259
260 Ok(WorkerHandle {
261 command_tx,
262 join_handle: Some(join_handle),
263 })
264}
265
266fn shutdown_workers(workers: &mut [WorkerHandle]) {
267 for worker in workers.iter() {
268 let _ = worker.command_tx.send(WorkerCommand::Shutdown);
269 }
270
271 for worker in workers.iter_mut() {
272 if let Some(join_handle) = worker.join_handle.take() {
273 let _ = join_handle.join();
274 }
275 }
276}