Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.nextcloud.client.database.entity.toUploadEntity
import com.nextcloud.client.device.BatteryStatus
import com.nextcloud.client.device.PowerManagementService
import com.nextcloud.client.jobs.BackgroundJobManager
import com.nextcloud.client.jobs.upload.FileUploadWorker.Companion.currentUploadFileOperation
import com.nextcloud.client.jobs.upload.FileUploadWorker.Companion.activeUploadFileOperations
import com.nextcloud.client.network.Connectivity
import com.nextcloud.client.network.ConnectivityService
import com.nextcloud.utils.extensions.getUploadIds
Expand Down Expand Up @@ -360,17 +360,12 @@ class FileUploadHelper {

@Suppress("ReturnCount")
fun isUploadingNow(upload: OCUpload?): Boolean {
val currentUploadFileOperation = currentUploadFileOperation
if (currentUploadFileOperation == null || currentUploadFileOperation.user == null) return false
if (upload == null || upload.accountName != currentUploadFileOperation.user.accountName) return false

return if (currentUploadFileOperation.oldFile != null) {
// For file conflicts check old file remote path
upload.remotePath == currentUploadFileOperation.remotePath ||
upload.remotePath == currentUploadFileOperation.oldFile!!
.remotePath
} else {
upload.remotePath == currentUploadFileOperation.remotePath
upload ?: return false

return activeUploadFileOperations.values.any { operation ->
operation.user?.accountName == upload.accountName &&
(upload.remotePath == operation.remotePath ||
upload.remotePath == operation.oldFile?.remotePath)
}
}

Expand Down
184 changes: 124 additions & 60 deletions app/src/main/java/com/nextcloud/client/jobs/upload/FileUploadWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ import com.owncloud.android.operations.UploadFileOperation
import com.owncloud.android.ui.notifications.NotificationUtils
import com.owncloud.android.utils.theme.ViewThemeUtils
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random

@Suppress("LongParameterList", "TooGenericExceptionCaught")
Expand All @@ -73,10 +81,15 @@ class FileUploadWorker(
const val UPLOAD_IDS = "uploads_ids"
const val CURRENT_BATCH_INDEX = "batch_index"
const val TOTAL_UPLOAD_SIZE = "total_upload_size"
const val SHOW_SAME_FILE_ALREADY_EXISTS_NOTIFICATION = "show_same_file_already_exists_notification"

var currentUploadFileOperation: UploadFileOperation? = null
/**
* The maximum number of concurrent parallel uploads
*/
const val MAX_CONCURRENT_UPLOADS = 10
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the user be given flexibility over this field?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpfull as depending on the phone specs and "bloat" on the phone it may freeze or lag substantially.


const val SHOW_SAME_FILE_ALREADY_EXISTS_NOTIFICATION = "show_same_file_already_exists_notification"

val activeUploadFileOperations = ConcurrentHashMap<String, UploadFileOperation>()
private const val UPLOADS_ADDED_MESSAGE = "UPLOADS_ADDED"
private const val UPLOAD_START_MESSAGE = "UPLOAD_START"
private const val UPLOAD_FINISH_MESSAGE = "UPLOAD_FINISH"
Expand All @@ -103,20 +116,18 @@ class FileUploadWorker(
fun getUploadFinishMessage(): String = FileUploadWorker::class.java.name + UPLOAD_FINISH_MESSAGE

fun cancelCurrentUpload(remotePath: String, accountName: String, onCompleted: () -> Unit) {
currentUploadFileOperation?.let {
activeUploadFileOperations.values.forEach {
if (it.remotePath == remotePath && it.user.accountName == accountName) {
it.cancel(ResultCode.USER_CANCELLED)
onCompleted()
}
}
onCompleted()
}

fun isUploading(remotePath: String?, accountName: String?): Boolean {
currentUploadFileOperation?.let {
return it.remotePath == remotePath && it.user.accountName == accountName
return activeUploadFileOperations.values.any {
it.remotePath == remotePath && it.user.accountName == accountName
}

return false
}

fun getUploadAction(action: String): Int = when (action) {
Expand All @@ -127,7 +138,9 @@ class FileUploadWorker(
}
}

private var lastPercent = 0
private val lastPercents = ConcurrentHashMap<String, Int>()
private val lastUpdateTimes = ConcurrentHashMap<String, Long>()

private val notificationId = Random.nextInt()
private val notificationManager = UploadNotificationManager(context, viewThemeUtils, notificationId)
private val intents = FileUploaderIntents(context)
Expand Down Expand Up @@ -202,7 +215,7 @@ class FileUploadWorker(
Log_OC.e(TAG, "FileUploadWorker stopped")

setIdleWorkerState()
currentUploadFileOperation?.cancel(null)
activeUploadFileOperations.values.forEach { it.cancel(null) }
notificationManager.dismissNotification()
}

Expand All @@ -211,7 +224,8 @@ class FileUploadWorker(
}

private fun setIdleWorkerState() {
WorkerStateObserver.send(WorkerState.FileUploadCompleted(currentUploadFileOperation?.file))
val lastOp = activeUploadFileOperations.values.lastOrNull()
WorkerStateObserver.send(WorkerState.FileUploadCompleted(lastOp?.file))
}

@Suppress("ReturnCount", "LongMethod", "DEPRECATION")
Expand Down Expand Up @@ -253,52 +267,95 @@ class FileUploadWorker(
val ocAccount = OwnCloudAccount(user.toPlatformAccount(), context)
val client = OwnCloudClientManagerFactory.getDefaultSingleton().getClientFor(ocAccount, context)

for ((index, upload) in uploads.withIndex()) {
ensureActive()
return@withContext parallelUpload(
uploads,
user,
previouslyUploadedFileSize,
totalUploadSize,
client,
accountName
)
}

if (preferences.isGlobalUploadPaused) {
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
notificationManager.notifyPaused(
intents.openUploadListIntent(null)
)
return@withContext Result.success()
}
private suspend fun parallelUpload(
uploads: List<OCUpload>?,
user: User,
previouslyUploadedFileSize: Int,
totalUploadSize: Int,
client: OwnCloudClient,
accountName: String
): Result {
if (uploads.isNullOrEmpty()) {
return Result.success()
}

if (canExitEarly()) {
notificationManager.showConnectionErrorNotification()
return@withContext Result.failure()
}
val semaphore = Semaphore(MAX_CONCURRENT_UPLOADS)
val quotaExceeded = AtomicBoolean(false)
val completedCount = AtomicInteger(0)
val storageManager = FileDataStorageManager(user, context.contentResolver)

coroutineScope {
for (upload in uploads) {
if (quotaExceeded.get()) break
ensureActive()

launch {
if (preferences.isGlobalUploadPaused) {
Log_OC.d(TAG, "Upload is paused, skip uploading files!")
notificationManager.notifyPaused(intents.openUploadListIntent(null))
return@launch
}

setWorkerState(user)
val operation = createUploadFileOperation(upload, user)
currentUploadFileOperation = operation
semaphore.withPermit {
if (quotaExceeded.get() || isStopped) return@launch

val currentIndex = (index + 1)
val currentUploadIndex = (currentIndex + previouslyUploadedFileSize)
notificationManager.prepareForStart(
operation,
startIntent = intents.openUploadListIntent(operation),
currentUploadIndex = currentUploadIndex,
totalUploadSize = totalUploadSize
)
if (canExitEarly()) {
notificationManager.showConnectionErrorNotification()
return@launch
}

val result = withContext(Dispatchers.IO) {
upload(operation, user, client)
}
val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
uploadsStorageManager.updateStatus(entity, result.isSuccess)
currentUploadFileOperation = null

if (result.code == ResultCode.QUOTA_EXCEEDED) {
Log_OC.w(TAG, "Quota exceeded, stopping uploads")
notificationManager.showQuotaExceedNotification(operation)
break
}
setWorkerState(user)
val operation = createUploadFileOperation(upload, user, storageManager)
activeUploadFileOperations[operation.originalStoragePath] = operation

try {
val currentUploadIndex = previouslyUploadedFileSize + completedCount.incrementAndGet()

// Synchronize notification updates
synchronized(notificationManager) {
notificationManager.prepareForStart(
operation,
startIntent = intents.openUploadListIntent(operation),
currentUploadIndex = currentUploadIndex,
totalUploadSize = totalUploadSize
)
}

val result = upload(operation, user, client)

val entity = uploadsStorageManager.uploadDao.getUploadById(upload.uploadId, accountName)
uploadsStorageManager.updateStatus(entity, result.isSuccess)

if (result.code == ResultCode.QUOTA_EXCEEDED) {
Log_OC.w(TAG, "Quota exceeded, stopping uploads")
notificationManager.showQuotaExceedNotification(operation)
quotaExceeded.set(true)
[email protected]("Quota exceeded")
return@launch
}

sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
sendUploadFinishEvent(totalUploadSize, currentUploadIndex, operation, result)
} finally {
activeUploadFileOperations.remove(operation.originalStoragePath)
lastPercents.remove(operation.originalStoragePath)
lastUpdateTimes.remove(operation.originalStoragePath)
}
}
}
}
}

return@withContext Result.success()
return if (quotaExceeded.get()) Result.failure() else Result.success()
}

private fun sendUploadFinishEvent(
Expand Down Expand Up @@ -336,7 +393,11 @@ class FileUploadWorker(
return result
}

private fun createUploadFileOperation(upload: OCUpload, user: User): UploadFileOperation = UploadFileOperation(
private fun createUploadFileOperation(
upload: OCUpload,
user: User,
storageManager: FileDataStorageManager
): UploadFileOperation = UploadFileOperation(
uploadsStorageManager,
connectivityService,
powerManagementService,
Expand All @@ -349,7 +410,7 @@ class FileUploadWorker(
upload.isUseWifiOnly,
upload.isWhileChargingOnly,
true,
FileDataStorageManager(user, context.contentResolver)
storageManager
).apply {
addDataTransferProgressListener(this@FileUploadWorker)
}
Expand Down Expand Up @@ -410,20 +471,24 @@ class FileUploadWorker(
totalToTransfer: Long,
fileAbsoluteName: String
) {
val operation = activeUploadFileOperations[fileAbsoluteName] ?: return
val percent = getPercent(totalTransferredSoFar, totalToTransfer)
val currentTime = System.currentTimeMillis()

val lastPercent = lastPercents[fileAbsoluteName] ?: 0
val lastUpdateTime = lastUpdateTimes[fileAbsoluteName] ?: 0L

if (percent != lastPercent && (currentTime - lastUpdateTime) >= minProgressUpdateInterval) {
notificationManager.run {
val accountName = currentUploadFileOperation?.user?.accountName
val remotePath = currentUploadFileOperation?.remotePath
synchronized(notificationManager) {
val accountName = operation.user.accountName
val remotePath = operation.remotePath

updateUploadProgress(percent, currentUploadFileOperation)
notificationManager.updateUploadProgress(percent, operation)

if (accountName != null && remotePath != null) {
val key: String = FileUploadHelper.buildRemoteName(accountName, remotePath)
val boundListener = FileUploadHelper.mBoundListeners[key]
val filename = currentUploadFileOperation?.fileName ?: ""
val filename = operation.fileName ?: ""

boundListener?.onTransferProgress(
progressRate,
Expand All @@ -433,11 +498,10 @@ class FileUploadWorker(
)
}

dismissOldErrorNotification(currentUploadFileOperation)
notificationManager.dismissOldErrorNotification(operation)
}
lastUpdateTime = currentTime
lastUpdateTimes[fileAbsoluteName] = currentTime
lastPercents[fileAbsoluteName] = percent
}

lastPercent = percent
}
}