Skip to main content

haste_operation_executor/providers/deno_embedded/
pool.rs

1use crate::providers::deno_embedded::run_code;
2use crate::structs::PluginCodeType;
3use crate::traits::OperationExecutor;
4use crate::validate::validate_parameters;
5use crate::{CUSTOM_CODE_EXTENSION_URL, extract_code_from_operation_definition};
6use deno_core::serde_json::json;
7use deno_core::{error::AnyError, serde_json};
8use haste_fhir_client::FHIRClient;
9use haste_fhir_client::request::InvocationRequest;
10use haste_fhir_model::r4::generated::resources::{OperationDefinition, Parameters};
11use haste_fhir_model::r4::generated::terminology::{IssueType, OperationParameterUse};
12use haste_fhir_operation_error::OperationOutcomeError;
13use std::io;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::mpsc;
17use std::thread::{self, JoinHandle};
18use tokio::runtime::Runtime;
19use tokio::sync::oneshot;
20
21type JobResult = Result<Option<serde_json::Value>, AnyError>;
22
23pub struct DenoPool {
24    next_worker: AtomicUsize,
25    workers: Vec<WorkerHandle>,
26}
27
28impl DenoPool {
29    pub fn new(thread_count: usize) -> Result<Self, AnyError> {
30        if thread_count == 0 {
31            return Err(io::Error::other("DenoPool requires at least one worker thread").into());
32        }
33
34        let mut workers = Vec::with_capacity(thread_count);
35
36        for index in 0..thread_count {
37            let result = spawn_worker(index);
38
39            match result {
40                Ok(worker) => workers.push(worker),
41                Err(error) => {
42                    shutdown_workers(&mut workers);
43                    return Err(error);
44                }
45            }
46        }
47
48        Ok(Self {
49            next_worker: AtomicUsize::new(0),
50            workers,
51        })
52    }
53
54    async fn execute<
55        CTX: Clone + Send + 'static,
56        Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
57    >(
58        &self,
59        ctx: CTX,
60        client: Arc<Client>,
61        media_type: PluginCodeType,
62        code: impl Into<String>,
63        input: serde_json::Value,
64    ) -> JobResult {
65        let worker_index = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
66        let worker = &self.workers[worker_index];
67
68        let (response_tx, response_rx) = oneshot::channel();
69        let code = code.into();
70
71        let task = Box::new(move |runtime: &Runtime| {
72            let result = runtime.block_on(async move {
73                let output = run_code(ctx, client, media_type, &code, input).await?;
74
75                output
76                    .map(serde_json::from_value)
77                    .transpose()
78                    .map_err(AnyError::from)
79            });
80
81            let _ = response_tx.send(result);
82        }) as Box<dyn WorkerTask>;
83
84        worker
85            .command_tx
86            .send(WorkerCommand::Run(task))
87            .map_err(|_| io::Error::other("DenoPool worker is not accepting jobs"))?;
88
89        response_rx
90            .await
91            .map_err(|_| io::Error::other("DenoPool worker dropped the response channel"))?
92    }
93}
94
95impl Drop for DenoPool {
96    fn drop(&mut self) {
97        shutdown_workers(&mut self.workers);
98    }
99}
100
101fn get_parameters<'a>(input: &'a InvocationRequest) -> &'a Parameters {
102    match input {
103        InvocationRequest::Instance(instance_request) => &instance_request.parameters,
104        InvocationRequest::Type(type_request) => &type_request.parameters,
105        InvocationRequest::System(system_request) => &system_request.parameters,
106    }
107}
108
109fn request_to_json(input: &InvocationRequest) -> Result<serde_json::Value, OperationOutcomeError> {
110    let parameter_json: serde_json::Value =
111        serde_json::to_value(get_parameters(input)).map_err(|_| {
112            OperationOutcomeError::error(
113                IssueType::Invalid(None),
114                "Failed to convert operation input parameters to JSON value".to_string(),
115            )
116        })?;
117
118    match input {
119        InvocationRequest::Instance(instance_request) => Ok(json!({
120            "id": &instance_request.id,
121            "resource": instance_request.resource_type.as_ref(),
122            "parameters": parameter_json,
123
124        })),
125        InvocationRequest::Type(type_request) => Ok(json!({
126            "resource": type_request.resource_type.as_ref(),
127            "parameters": parameter_json,
128        })),
129        InvocationRequest::System(_system_request) => Ok(json!({
130            "parameters": parameter_json,
131        })),
132    }
133}
134
135impl OperationExecutor for DenoPool {
136    async fn execute_operation<
137        CTX: Clone + Send + 'static,
138        Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
139    >(
140        &self,
141        context: CTX,
142        client: Arc<Client>,
143        operation: &OperationDefinition,
144        input: &InvocationRequest,
145    ) -> Result<Parameters, OperationOutcomeError> {
146        validate_parameters(
147            get_parameters(input),
148            &operation.parameter.as_deref().unwrap_or_default(),
149            &OperationParameterUse::In(None),
150        )?;
151
152        let (code, media_type) =
153            extract_code_from_operation_definition(operation).ok_or_else(|| {
154                OperationOutcomeError::error(
155                    IssueType::Invalid(None),
156                    format!(
157                        "OperationDefinition missing custom code extension metadata '{}'",
158                        CUSTOM_CODE_EXTENSION_URL
159                    ),
160                )
161            })?;
162
163        let media_type = PluginCodeType::try_from(media_type)?;
164
165        let output = self
166            .execute(
167                context,
168                client,
169                media_type,
170                code.to_string(),
171                request_to_json(input)?,
172            )
173            .await
174            .map_err(|error| {
175                OperationOutcomeError::error(
176                    IssueType::Processing(None),
177                    format!("Failed to execute operation custom code: {error}"),
178                )
179            })?
180            .ok_or_else(|| {
181                OperationOutcomeError::error(
182                    IssueType::Processing(None),
183                    "Operation custom code returned no output".to_string(),
184                )
185            })?;
186
187        let output = serde_json::from_value::<Parameters>(output).map_err(|error| {
188            OperationOutcomeError::error(
189                IssueType::Invalid(None),
190                format!("Operation custom code returned invalid Parameters payload: {error}"),
191            )
192        })?;
193
194        validate_parameters(
195            &output,
196            &operation.parameter.as_deref().unwrap_or_default(),
197            &OperationParameterUse::Out(None),
198        )?;
199
200        Ok(output)
201    }
202}
203
204struct WorkerHandle {
205    command_tx: mpsc::Sender<WorkerCommand>,
206    join_handle: Option<JoinHandle<()>>,
207}
208
209enum WorkerCommand {
210    Run(Box<dyn WorkerTask>),
211    Shutdown,
212}
213
214trait WorkerTask: Send + 'static {
215    fn run(self: Box<Self>, runtime: &Runtime);
216}
217
218impl<Function> WorkerTask for Function
219where
220    Function: FnOnce(&Runtime) + Send + 'static,
221{
222    fn run(self: Box<Self>, runtime: &Runtime) {
223        (*self)(runtime);
224    }
225}
226
227fn spawn_worker(index: usize) -> Result<WorkerHandle, AnyError> {
228    let (command_tx, command_rx) = mpsc::channel();
229    let (startup_tx, startup_rx) = mpsc::sync_channel(1);
230
231    let join_handle = thread::Builder::new()
232        .name(format!("deno-pool-{index}"))
233        .spawn(move || {
234            let runtime = match tokio::runtime::Builder::new_current_thread()
235                .enable_all()
236                .build()
237            {
238                Ok(runtime) => {
239                    let _ = startup_tx.send(Ok(()));
240                    runtime
241                }
242                Err(error) => {
243                    let _ = startup_tx.send(Err::<(), AnyError>(error.into()));
244                    return;
245                }
246            };
247
248            while let Ok(command) = command_rx.recv() {
249                match command {
250                    WorkerCommand::Run(task) => task.run(&runtime),
251                    WorkerCommand::Shutdown => break,
252                }
253            }
254        })?;
255
256    startup_rx
257        .recv()
258        .map_err(|_| io::Error::other("DenoPool worker failed during startup"))??;
259
260    Ok(WorkerHandle {
261        command_tx,
262        join_handle: Some(join_handle),
263    })
264}
265
266fn shutdown_workers(workers: &mut [WorkerHandle]) {
267    for worker in workers.iter() {
268        let _ = worker.command_tx.send(WorkerCommand::Shutdown);
269    }
270
271    for worker in workers.iter_mut() {
272        if let Some(join_handle) = worker.join_handle.take() {
273            let _ = join_handle.join();
274        }
275    }
276}