From db8cae3f1e316c02334d0aae6dc8eb5d232bed42 Mon Sep 17 00:00:00 2001
From: Kould <kould2333@gmail.com>
Date: Fri, 29 Dec 2023 10:09:23 +0800
Subject: [PATCH] feat: impl websocket upload file for doc_info (#20)

---
 Cargo.toml                                    |  9 +-
 .../src/m20220101_000001_create_table.rs      |  3 +-
 src/api/doc_info.rs                           | 24 +++--
 src/main.rs                                   |  4 +
 src/web_socket/doc_info.rs                    | 97 +++++++++++++++++++
 src/web_socket/mod.rs                         |  1 +
 6 files changed, 126 insertions(+), 12 deletions(-)
 create mode 100644 src/web_socket/doc_info.rs
 create mode 100644 src/web_socket/mod.rs

diff --git a/Cargo.toml b/Cargo.toml
index 2a08cd0..aa229a5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,9 +13,15 @@ actix-multipart = "0.4"
 actix-session = { version = "0.5" }
 actix-identity = { version = "0.4" }
 actix-web-httpauth = { version = "0.6" }
+actix-ws = "0.2.5"
+uuid = { version = "1.6.1", features = [
+    "v4",
+    "fast-rng",
+    "macro-diagnostics",
+] }
 thiserror = "1.0"
 postgres = "0.19.7"
-sea-orm = {version = "0.12.9", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros"]}
+sea-orm = { version = "0.12.9", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1.0"
 tracing-subscriber = "0.3.18"
@@ -27,6 +33,7 @@ minio = "0.1.0"
 futures-util = "0.3.29"
 actix-multipart-extract = "0.1.5"
 regex = "1.10.2"
+tokio = { version = "1.35.1", features = ["rt", "time", "macros"] }
 
 [[bin]]
 name = "doc_gpt"
diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs
index e6892d7..439dc4b 100644
--- a/migration/src/m20220101_000001_create_table.rs
+++ b/migration/src/m20220101_000001_create_table.rs
@@ -1,6 +1,7 @@
-use sea_orm_migration::{ prelude::*, sea_orm::Statement };
+use sea_orm_migration::prelude::*;
 use chrono::{ FixedOffset, Utc };
 
+#[allow(dead_code)]
 fn now() -> chrono::DateTime<FixedOffset> {
     Utc::now().with_timezone(&FixedOffset::east_opt(3600 * 8).unwrap())
 }
diff --git a/src/api/doc_info.rs b/src/api/doc_info.rs
index af1ee13..6d3f5a3 100644
--- a/src/api/doc_info.rs
+++ b/src/api/doc_info.rs
@@ -107,8 +107,12 @@ async fn upload(
     payload: Multipart<UploadForm>,
     data: web::Data<AppState>
 ) -> Result<HttpResponse, AppError> {
-    let uid = payload.uid;
-    let file_name = payload.file_field.name.as_str();
+
+
+    Ok(HttpResponse::Ok().body("File uploaded successfully"))
+}
+
+pub(crate) async fn _upload_file(uid: i64, did: i64, file_name: &str, bytes: &[u8], data: &web::Data<AppState>) -> Result<(), AppError> {
     async fn add_number_to_filename(
         file_name: &str,
         conn: &DbConn,
@@ -138,9 +142,9 @@ async fn upload(
         }
         new_file_name
     }
-    let fnm = add_number_to_filename(file_name, &data.conn, uid, payload.did).await;
+    let fnm = add_number_to_filename(file_name, &data.conn, uid, did).await;
 
-    let bucket_name = format!("{}-upload", payload.uid);
+    let bucket_name = format!("{}-upload", uid);
     let s3_client: &minio::s3::client::Client = &data.s3_client;
     let buckets_exists = s3_client
         .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap()).await
@@ -152,7 +156,7 @@ async fn upload(
         print!("Existing bucket: {}", bucket_name.clone());
     }
 
-    let location = format!("/{}/{}", payload.did, fnm)
+    let location = format!("/{}/{}", did, fnm)
         .as_bytes()
         .to_vec()
         .iter()
@@ -164,8 +168,8 @@ async fn upload(
         &mut PutObjectArgs::new(
             &bucket_name,
             &location,
-            &mut BufReader::new(payload.file_field.bytes.as_slice()),
-            Some(payload.file_field.bytes.len()),
+            &mut BufReader::new(bytes),
+            Some(bytes.len()),
             None
         )?
     ).await?;
@@ -174,7 +178,7 @@ async fn upload(
         did: Default::default(),
         uid: uid,
         doc_name: fnm.clone(),
-        size: payload.file_field.bytes.len() as i64,
+        size: bytes.len() as i64,
         location,
         r#type: file_type(&fnm),
         thumbnail_base64: Default::default(),
@@ -183,9 +187,9 @@ async fn upload(
         is_deleted: Default::default(),
     }).await?;
 
-    let _ = Mutation::place_doc(&data.conn, payload.did, doc.did.unwrap()).await?;
+    let _ = Mutation::place_doc(&data.conn, did, doc.did.unwrap()).await?;
 
-    Ok(HttpResponse::Ok().body("File uploaded successfully"))
+    Ok(())
 }
 
 #[derive(Deserialize, Debug)]
diff --git a/src/main.rs b/src/main.rs
index e301677..0b2a284 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ mod api;
 mod entity;
 mod service;
 mod errors;
+mod web_socket;
 
 use std::env;
 use actix_files::Files;
@@ -19,6 +20,7 @@ use minio::s3::http::BaseUrl;
 use sea_orm::{ Database, DatabaseConnection };
 use migration::{ Migrator, MigratorTrait };
 use crate::errors::{ AppError, UserError };
+use crate::web_socket::doc_info::upload_file_ws;
 
 #[derive(Debug, Clone)]
 struct AppState {
@@ -138,4 +140,6 @@ fn init(cfg: &mut web::ServiceConfig) {
     cfg.service(api::user_info::login);
     cfg.service(api::user_info::register);
     cfg.service(api::user_info::setting);
+
+    cfg.service(web::resource("/ws-upload-doc").route(web::get().to(upload_file_ws)));
 }
diff --git a/src/web_socket/doc_info.rs b/src/web_socket/doc_info.rs
new file mode 100644
index 0000000..70b8022
--- /dev/null
+++ b/src/web_socket/doc_info.rs
@@ -0,0 +1,97 @@
+use std::io::{Cursor, Write};
+use std::time::{Duration, Instant};
+use actix_rt::time::interval;
+use actix_web::{HttpRequest, HttpResponse, rt, web};
+use actix_web::web::Buf;
+use actix_ws::Message;
+use futures_util::{future, StreamExt};
+use futures_util::future::Either;
+use uuid::Uuid;
+use crate::api::doc_info::_upload_file;
+use crate::AppState;
+use crate::errors::AppError;
+
+const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
+
+/// How long before lack of client response causes a timeout.
+const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub async fn upload_file_ws(req: HttpRequest, stream: web::Payload, data: web::Data<AppState>) -> Result<HttpResponse, AppError> {
+    let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
+
+    // spawn websocket handler (and don't await it) so that the response is returned immediately
+    rt::spawn(upload_file_handler(data, session, msg_stream));
+
+    Ok(res)
+}
+
+async fn upload_file_handler(
+    data: web::Data<AppState>,
+    mut session: actix_ws::Session,
+    mut msg_stream: actix_ws::MessageStream,
+) {
+    let mut bytes = Cursor::new(vec![]);
+    let mut last_heartbeat = Instant::now();
+    let mut interval = interval(HEARTBEAT_INTERVAL);
+
+    let reason = loop {
+        let tick = interval.tick();
+        tokio::pin!(tick);
+
+        match future::select(msg_stream.next(), tick).await {
+            // received message from WebSocket client
+            Either::Left((Some(Ok(msg)), _)) => {
+                match msg {
+                    Message::Text(text) => {
+                        session.text(text).await.unwrap();
+                    }
+
+                    Message::Binary(bin) => {
+                        let mut pos = 0;  // notice the name of the file that will be written
+                        while pos < bin.len() {
+                            let bytes_written = bytes.write(&bin[pos..]).unwrap();
+                            pos += bytes_written
+                        };
+                        session.binary(bin).await.unwrap();
+                    }
+
+                    Message::Close(reason) => {
+                        break reason;
+                    }
+
+                    Message::Ping(bytes) => {
+                        last_heartbeat = Instant::now();
+                        let _ = session.pong(&bytes).await;
+                    }
+
+                    Message::Pong(_) => {
+                        last_heartbeat = Instant::now();
+                    }
+
+                    Message::Continuation(_) | Message::Nop => {}
+                };
+            }
+            Either::Left((Some(Err(_)), _)) => {
+                break None;
+            }
+            Either::Left((None, _)) => break None,
+            Either::Right((_inst, _)) => {
+                if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
+                    break None;
+                }
+
+                let _ = session.ping(b"").await;
+            }
+        }
+    };
+    let _ = session.close(reason).await;
+
+    if !bytes.has_remaining() {
+        return;
+    }
+
+    let uid = bytes.get_i64();
+    let did = bytes.get_i64();
+
+    _upload_file(uid, did, &Uuid::new_v4().to_string(), &bytes.into_inner(), &data).await.unwrap();
+}
\ No newline at end of file
diff --git a/src/web_socket/mod.rs b/src/web_socket/mod.rs
new file mode 100644
index 0000000..2753903
--- /dev/null
+++ b/src/web_socket/mod.rs
@@ -0,0 +1 @@
+pub mod doc_info;
\ No newline at end of file
-- 
GitLab