Skip to main content

haste_server/
services.rs

1use crate::{
2    ServerEnvironmentVariables,
3    fhir_client::{FHIRServerClient, ServerClientConfig},
4};
5use haste_config::Config;
6use haste_fhir_model::r4::generated::terminology::IssueType;
7use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
8use haste_fhir_search::{SearchEngine, elastic_search::ElasticSearchEngine};
9use haste_fhir_terminology::{
10    FHIRTerminology,
11    client::FHIRCanonicalTerminology,
12    resolvers::{self, remote::LRUCanonicalRemoteResolver},
13};
14use haste_fhirpath::FPEngine;
15use haste_repository::{Repository, pg::PGConnection};
16use sqlx::{Pool, Postgres};
17use sqlx_postgres::PgPoolOptions;
18use std::{env::VarError, sync::Arc};
19use tokio::sync::OnceCell;
20use tracing::info;
21
22// Singleton for the database connection pool in postgres.
23static POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();
24pub async fn get_pool(config: &dyn Config<ServerEnvironmentVariables>) -> &'static Pool<Postgres> {
25    POOL.get_or_init(async || {
26        let database_url = config
27            .get(ServerEnvironmentVariables::DataBaseURL)
28            .expect(&format!(
29                "'{}' must be set",
30                String::from(ServerEnvironmentVariables::DataBaseURL)
31            ));
32        info!("Connecting to postgres database");
33        let connection = PgPoolOptions::new()
34            .max_connections(5)
35            .connect(&database_url)
36            .await
37            .expect("Failed to create database connection pool");
38        connection
39    })
40    .await
41}
42
/// Error variants covering dotenv loading, sessions, the database, environment
/// variables and template rendering.
///
/// NOTE(review): the `OperationOutcomeError` derive presumably builds the outcome
/// from each variant's `code` / `diagnostic` attribute values — confirm against
/// the derive macro.
#[derive(OperationOutcomeError, Debug)]
pub enum ConfigError {
    /// Wraps a `dotenvy::Error`.
    #[error(code = "invalid", diagnostic = "Invalid environment!")]
    DotEnv(#[from] dotenvy::Error),
    /// Wraps a `tower_sessions::session::Error`.
    #[error(code = "invalid", diagnostic = "Invalid session!")]
    Session(#[from] tower_sessions::session::Error),
    /// Wraps a `sqlx::Error`.
    #[error(code = "invalid", diagnostic = "Database error")]
    Database(#[from] sqlx::Error),
    /// Wraps a `std::env::VarError`.
    // NOTE(review): `{arg0}` presumably interpolates the wrapped `VarError` — confirm.
    #[error(code = "invalid", diagnostic = "Environment variable not set {arg0}")]
    EnvironmentVariable(#[from] VarError),
    /// A template failed to render; carries no further detail.
    #[error(code = "invalid", diagnostic = "Failed to render template.")]
    TemplateRender,
}
56
/// Error variants for FHIRPath evaluation, JSON deserialization and a generic
/// internal failure.
///
/// NOTE(review): presumably used by custom operation handlers (going by the
/// name) — confirm against the call sites.
#[derive(OperationOutcomeError, Debug)]
pub enum CustomOpError {
    /// Wraps a `haste_fhirpath::FHIRPathError`.
    #[error(code = "invalid", diagnostic = "FHIRPath error")]
    FHIRPath(#[from] haste_fhirpath::FHIRPathError),
    /// Wraps a `serde_json::Error`.
    #[error(code = "invalid", diagnostic = "Failed to deserialize resource")]
    Deserialize(#[from] serde_json::Error),
    /// Catch-all failure; carries no further detail.
    #[error(code = "invalid", diagnostic = "Internal server error")]
    InternalServerError,
}
66
67pub struct AppState<
68    Repo: Repository + Send + Sync + 'static,
69    Search: SearchEngine + Send + Sync + 'static,
70    Terminology: FHIRTerminology + Send + Sync + 'static,
71> {
72    pub terminology: Arc<Terminology>,
73    pub search: Arc<Search>,
74    pub repo: Arc<Repo>,
75    pub rate_limit: Arc<dyn haste_rate_limit::RateLimit>,
76    pub fhir_client: Arc<FHIRServerClient<Repo, Search, Terminology>>,
77    pub config: Arc<dyn Config<ServerEnvironmentVariables>>,
78}
79
80impl<
81    Repo: Repository + Send + Sync + 'static,
82    Search: SearchEngine + Send + Sync + 'static,
83    Terminology: FHIRTerminology + Send + Sync + 'static,
84> AppState<Repo, Search, Terminology>
85{
86    pub async fn transaction(&self) -> Result<Self, OperationOutcomeError> {
87        self.repo.transaction(true).await.map(|tx_repo| {
88            let tx_repo = Arc::new(tx_repo);
89            AppState {
90                terminology: self.terminology.clone(),
91                search: self.search.clone(),
92                repo: tx_repo.clone(),
93                rate_limit: self.rate_limit.clone(),
94                fhir_client: Arc::new(FHIRServerClient::new(ServerClientConfig::new(
95                    tx_repo,
96                    self.search.clone(),
97                    self.terminology.clone(),
98                    self.config.clone(),
99                ))),
100                config: self.config.clone(),
101            }
102        })
103    }
104    pub async fn commit(self) -> Result<(), OperationOutcomeError> {
105        let repo = self.repo.clone();
106        drop(self);
107
108        Arc::try_unwrap(repo)
109            .map_err(|_e| {
110                OperationOutcomeError::fatal(
111                    IssueType::Exception(None),
112                    "Failed to unwrap transaction client".to_string(),
113                )
114            })?
115            .commit()
116            .await?;
117
118        Ok(())
119    }
120}
121
122pub async fn create_services(
123    config: Arc<dyn Config<ServerEnvironmentVariables>>,
124) -> Result<
125    Arc<
126        AppState<
127            PGConnection,
128            ElasticSearchEngine,
129            FHIRCanonicalTerminology<LRUCanonicalRemoteResolver<PGConnection, ElasticSearchEngine>>,
130        >,
131    >,
132    OperationOutcomeError,
133> {
134    let pool = get_pool(config.as_ref()).await;
135    let search_engine = Arc::new(
136        haste_fhir_search::elastic_search::ElasticSearchEngine::new(
137            Arc::new(FPEngine::new()),
138            &config
139                .get(ServerEnvironmentVariables::ElasticSearchURL)
140                .expect(&format!(
141                    "'{}' variable not set",
142                    String::from(ServerEnvironmentVariables::ElasticSearchURL)
143                )),
144            config
145                .get(ServerEnvironmentVariables::ElasticSearchUsername)
146                .expect(&format!(
147                    "'{}' variable not set",
148                    String::from(ServerEnvironmentVariables::ElasticSearchUsername)
149                )),
150            config
151                .get(ServerEnvironmentVariables::ElasticSearchPassword)
152                .expect(&format!(
153                    "'{}' variable not set",
154                    String::from(ServerEnvironmentVariables::ElasticSearchPassword)
155                )),
156        )
157        .expect("Failed to create Elasticsearch client"),
158    );
159
160    let pool = Arc::new(PGConnection::pool(pool.clone()));
161
162    let terminology = Arc::new(FHIRCanonicalTerminology::new(
163        resolvers::remote::LRUCanonicalRemoteResolver::new(pool.clone(), search_engine.clone()),
164    ));
165
166    let can_mutate: String = config
167        .get(ServerEnvironmentVariables::AllowArtifactMutations)
168        .unwrap_or("false".into());
169
170    let fhir_client = Arc::new(FHIRServerClient::new(if can_mutate == "true" {
171        ServerClientConfig::allow_mutate_artifacts(
172            pool.clone(),
173            search_engine.clone(),
174            terminology.clone(),
175            config.clone(),
176        )
177    } else {
178        ServerClientConfig::new(
179            pool.clone(),
180            search_engine.clone(),
181            terminology.clone(),
182            config.clone(),
183        )
184    }));
185
186    let shared_state = Arc::new(AppState {
187        config,
188        rate_limit: pool.clone(),
189        repo: pool,
190        terminology: terminology,
191        search: search_engine,
192        fhir_client,
193    });
194
195    Ok(shared_state)
196}