Skip to main content

haste_indexing_worker/
lib.rs

1use crate::indexing_lock::IndexLockProvider;
2use haste_config::get_config;
3use haste_fhir_model::r4::generated::resources::ResourceTypeError;
4use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
5use haste_fhir_search::{IndexResource, SearchEngine, elastic_search::ElasticSearchEngine};
6use haste_fhirpath::FHIRPathError;
7use haste_jwt::TenantId;
8use haste_repository::{fhir::FHIRRepository, types::SupportedFHIRVersions};
9use sqlx::{Pool, Postgres, query_as, types::time::OffsetDateTime};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12
13mod indexing_lock;
14
15#[derive(OperationOutcomeError, Debug)]
16pub enum IndexingWorkerError {
17    #[fatal(code = "exception", diagnostic = "Database error: '{arg0}'")]
18    DatabaseConnectionError(#[from] sqlx::Error),
19    #[fatal(code = "exception", diagnostic = "Lock error: '{arg0}'")]
20    OperationError(#[from] OperationOutcomeError),
21    #[fatal(code = "exception", diagnostic = "Elasticsearch error: '{arg0}'")]
22    ElasticsearchError(#[from] elasticsearch::Error),
23    #[fatal(code = "exception", diagnostic = "FHIRPath error: '{arg0}'")]
24    FHIRPathError(#[from] FHIRPathError),
25    #[fatal(
26        code = "exception",
27        diagnostic = "Missing search parameters for resource: '{arg0}'"
28    )]
29    MissingSearchParameters(String),
30    #[fatal(
31        code = "exception",
32        diagnostic = "Fatal error occurred during indexing"
33    )]
34    Fatal,
35    #[fatal(
36        code = "exception",
37        diagnostic = "Artifact error: Invalid resource type '{arg0}'"
38    )]
39    ResourceTypeError(#[from] ResourceTypeError),
40}
41
42struct TenantReturn {
43    id: TenantId,
44    created_at: OffsetDateTime,
45}
46
47async fn get_tenants(
48    client: &Pool<Postgres>,
49    cursor: &OffsetDateTime,
50    count: usize,
51) -> Result<Vec<TenantReturn>, OperationOutcomeError> {
52    let result = query_as!(
53        TenantReturn,
54        r#"SELECT id as "id: TenantId", created_at FROM tenants WHERE created_at > $1 ORDER BY created_at DESC LIMIT $2"#,
55        cursor,
56        count as i64
57    )
58    .fetch_all(client)
59    .await
60    .map_err(IndexingWorkerError::from)?;
61
62    Ok(result)
63}
64
65static TOTAL_INDEXED: std::sync::LazyLock<Mutex<usize>> =
66    std::sync::LazyLock::new(|| Mutex::new(0));
67
68async fn index_tenant_next_sequence<
69    Repo: FHIRRepository + IndexLockProvider,
70    Engine: SearchEngine,
71>(
72    search_client: Arc<Engine>,
73    tx: &Repo,
74    tenant_id: &TenantId,
75) -> Result<(), IndexingWorkerError> {
76    let start = std::time::Instant::now();
77    let tenant_locks = tx.get_available_locks(vec![tenant_id]).await?;
78
79    if tenant_locks.is_empty() {
80        return Ok(());
81    }
82
83    let resources = tx
84        .get_sequence(
85            tenant_id,
86            tenant_locks[0].index_sequence_position as u64,
87            Some(1000),
88        )
89        .await?;
90
91    let resources_total = resources.len();
92    let start_sequence = resources.first().map(|r| r.sequence);
93    let last_value = resources.last().cloned();
94
95    // Perform indexing if there are resources to index.
96    if !resources.is_empty() {
97        let result = search_client
98            .index(
99                SupportedFHIRVersions::R4,
100                tenant_id.clone(),
101                resources
102                    .into_iter()
103                    .map(|r| IndexResource {
104                        id: r.id,
105                        version_id: r.version_id,
106                        project: r.project,
107                        fhir_method: r.fhir_method,
108                        resource_type: r.resource_type,
109                        resource: r.resource.0,
110                    })
111                    .collect(),
112            )
113            .await?;
114
115        if result.0 != resources_total {
116            tracing::error!(
117                "Indexed resource count '{}' does not match retrieved resource count '{}'",
118                result.0,
119                resources_total
120            );
121            return Err(IndexingWorkerError::Fatal);
122        }
123
124        if let Some(resource) = last_value {
125            let diff = (resource.sequence + 1) - start_sequence.unwrap_or(0);
126            let total = resources_total;
127
128            if total != diff as usize {
129                tracing::event!(
130                    tracing::Level::INFO,
131                    // safe_seq = resource.max_safe_seq.unwrap_or(0),
132                    first_seq = start_sequence.unwrap_or(0),
133                    last_seq = resource.sequence,
134                    total = resources_total,
135                    diff = (resource.sequence + 1) - start_sequence.unwrap_or(0)
136                );
137            }
138
139            tx.update_lock(tenant_id.as_ref(), resource.sequence as usize)
140                .await?;
141
142            let elapsed = start.elapsed();
143            tracing::info!(
144                "Indexed {} resources for tenant '{}' in {:.2?} (up to sequence {})",
145                result.0,
146                tenant_id.as_ref(),
147                elapsed,
148                resource.sequence
149            );
150        }
151
152        *(TOTAL_INDEXED.lock().await) += result.0;
153    }
154
155    Ok(())
156}
157
158async fn index_for_tenant<Search: SearchEngine, Repository: FHIRRepository + IndexLockProvider>(
159    repo: Arc<Repository>,
160    search_client: Arc<Search>,
161    tenant_id: &TenantId,
162) -> Result<(), IndexingWorkerError> {
163    let search_client = search_client.clone();
164
165    let tx = repo.transaction(false).await.unwrap();
166
167    let res = index_tenant_next_sequence(search_client, &tx, &tenant_id).await;
168
169    match res {
170        Ok(res) => {
171            tx.commit().await?;
172            Ok(res)
173        }
174        Err(e) => {
175            tx.rollback().await?;
176            Err(e)
177        }
178    }
179}
180
/// Environment variables the indexing worker reads its configuration from.
///
/// `From<IndexingWorkerEnvironmentVariables> for String` maps each variant to its
/// variable name. The enum is `Copy`, so a variant can be used for both the lookup and an
/// error message without being rebuilt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexingWorkerEnvironmentVariables {
    /// Postgres connection string used to build the worker's connection pool (`DATABASE_URL`).
    DatabaseURL,
    /// Elasticsearch endpoint (`ELASTICSEARCH_URL`).
    ElasticSearchURL,
    /// Elasticsearch user name (`ELASTICSEARCH_USERNAME`).
    ElasticSearchUsername,
    /// Elasticsearch password (`ELASTICSEARCH_PASSWORD`).
    ElasticSearchPassword,
}
187
188impl From<IndexingWorkerEnvironmentVariables> for String {
189    fn from(value: IndexingWorkerEnvironmentVariables) -> Self {
190        match value {
191            IndexingWorkerEnvironmentVariables::DatabaseURL => "DATABASE_URL".to_string(),
192            IndexingWorkerEnvironmentVariables::ElasticSearchURL => "ELASTICSEARCH_URL".to_string(),
193            IndexingWorkerEnvironmentVariables::ElasticSearchUsername => {
194                "ELASTICSEARCH_USERNAME".to_string()
195            }
196            IndexingWorkerEnvironmentVariables::ElasticSearchPassword => {
197                "ELASTICSEARCH_PASSWORD".to_string()
198            }
199        }
200    }
201}
202
203pub async fn run_worker() -> Result<(), OperationOutcomeError> {
204    let config = get_config::<IndexingWorkerEnvironmentVariables>("environment".into());
205    let fp_engine = Arc::new(haste_fhirpath::FPEngine::new());
206
207    let search_engine = Arc::new(
208        ElasticSearchEngine::new(
209            fp_engine.clone(),
210            &config
211                .get(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
212                .expect(&format!(
213                    "'{}' variable not set",
214                    String::from(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
215                )),
216            config
217                .get(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
218                .expect(&format!(
219                    "'{}' variable not set",
220                    String::from(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
221                )),
222            config
223                .get(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
224                .expect(&format!(
225                    "'{}' variable not set",
226                    String::from(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
227                )),
228        )
229        .expect("Failed to create Elasticsearch client"),
230    );
231
232    let mut attempts = 0;
233    while !search_engine.is_connected().await.is_ok() && attempts < 5 {
234        tracing::error!("Elasticsearch is not connected, retrying in 5 seconds...");
235        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
236        attempts += 1;
237    }
238
239    let pg_pool = sqlx::PgPool::connect(
240        &config
241            .get(IndexingWorkerEnvironmentVariables::DatabaseURL)
242            .unwrap(),
243    )
244    .await
245    .expect("Failed to connect to the database");
246
247    let repo = Arc::new(haste_repository::pg::PGConnection::pool(pg_pool.clone()));
248    let mut cursor = OffsetDateTime::UNIX_EPOCH;
249    let tenants_limit: usize = 100;
250
251    tracing::info!("Starting indexing worker...");
252
253    let mut k = *TOTAL_INDEXED.lock().await;
254
255    loop {
256        let tenants_to_check = get_tenants(&pg_pool, &cursor, tenants_limit).await;
257        if let Ok(tenants_to_check) = tenants_to_check {
258            if tenants_to_check.is_empty() || tenants_to_check.len() < tenants_limit {
259                cursor = OffsetDateTime::UNIX_EPOCH; // Reset cursor if no tenants found
260            } else {
261                cursor = tenants_to_check[0].created_at;
262            }
263
264            for tenant in tenants_to_check {
265                let result =
266                    index_for_tenant(repo.clone(), search_engine.clone(), &tenant.id).await;
267
268                if let Err(_error) = result {
269                    tracing::error!(
270                        "Failed to index tenant: '{}' cause: '{:?}'",
271                        &tenant.id,
272                        _error
273                    );
274                }
275            }
276        } else if let Err(error) = tenants_to_check {
277            tracing::error!("Failed to retrieve tenants: {:?}", error);
278        }
279
280        if k != *TOTAL_INDEXED.lock().await {
281            k = *TOTAL_INDEXED.lock().await;
282            tracing::info!("TOTAL INDEXED SO FAR: {}", k);
283        }
284    }
285}