feat(storage): Implement multipart upload functionality and caching for file operations

- Added multipart upload methods to the IStorageProvider interface and implemented them in the MinioStorageProvider and AliyunOssStorageProvider classes.
- Integrated Redis caching for user quota management in ConfirmUploadCommandHandler and DeleteFileCommandHandler to ensure updated quota values.
- Enhanced GetUserQuotaQueryHandler to utilize cache-aside pattern for improved performance.
- Updated Dependency Injection to register Redis cache service and configured related settings.
- Introduced database schema changes to support multipart uploads and their parts.
This commit is contained in:
Ho Ngoc Hai
2026-01-13 23:42:35 +07:00
parent 517bea32e7
commit 964e33bee6
29 changed files with 2298 additions and 19 deletions

View File

@@ -4,6 +4,7 @@ using Microsoft.Extensions.Options;
using StorageService.API.Application.Queries;
using StorageService.Domain.AggregatesModel.FileAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Infrastructure.Caching;
using StorageService.Infrastructure.Configuration;
using StorageService.Infrastructure.Storage;
@@ -25,6 +26,7 @@ public class ConfirmUploadCommandHandler : IRequestHandler<ConfirmUploadCommand,
private readonly IQuotaRepository _quotaRepository;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly StorageSettings _settings;
private readonly IRedisCacheService _cache;
private readonly ILogger<ConfirmUploadCommandHandler> _logger;
public ConfirmUploadCommandHandler(
@@ -32,12 +34,14 @@ public class ConfirmUploadCommandHandler : IRequestHandler<ConfirmUploadCommand,
IQuotaRepository quotaRepository,
IStorageProviderFactory storageProviderFactory,
IOptions<StorageSettings> settings,
IRedisCacheService cache,
ILogger<ConfirmUploadCommandHandler> logger)
{
_fileRepository = fileRepository;
_quotaRepository = quotaRepository;
_storageProviderFactory = storageProviderFactory;
_settings = settings.Value;
_cache = cache;
_logger = logger;
}
@@ -124,6 +128,13 @@ public class ConfirmUploadCommandHandler : IRequestHandler<ConfirmUploadCommand,
// EN: Save changes / VI: Lưu thay đổi
await _fileRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
// EN: Invalidate quota cache so next query gets updated value
// VI: Invalidate quota cache để query tiếp theo lấy giá trị mới
if (Guid.TryParse(request.UserId, out var userId))
{
await _cache.DeleteAsync(CacheKeys.UserQuota(userId), cancellationToken);
}
_logger.LogInformation(
"Upload confirmed successfully: fileId={FileId}, objectKey={ObjectKey}, user={UserId}",
storageFile.Id, request.ObjectKey, request.UserId);

View File

@@ -1,6 +1,7 @@
using MediatR;
using StorageService.Domain.AggregatesModel.FileAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Infrastructure.Caching;
using StorageService.Infrastructure.Storage;
namespace StorageService.API.Application.Commands;
@@ -14,17 +15,20 @@ public class DeleteFileCommandHandler : IRequestHandler<DeleteFileCommand, Delet
private readonly IFileRepository _fileRepository;
private readonly IQuotaRepository _quotaRepository;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly IRedisCacheService _cache;
private readonly ILogger<DeleteFileCommandHandler> _logger;
public DeleteFileCommandHandler(
IFileRepository fileRepository,
IQuotaRepository quotaRepository,
IStorageProviderFactory storageProviderFactory,
IRedisCacheService cache,
ILogger<DeleteFileCommandHandler> logger)
{
_fileRepository = fileRepository;
_quotaRepository = quotaRepository;
_storageProviderFactory = storageProviderFactory;
_cache = cache;
_logger = logger;
}
@@ -72,6 +76,12 @@ public class DeleteFileCommandHandler : IRequestHandler<DeleteFileCommand, Delet
// EN: Save changes / VI: Lưu thay đổi
await _fileRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
// EN: Invalidate quota cache / VI: Invalidate quota cache
if (Guid.TryParse(request.UserId, out var userId))
{
await _cache.DeleteAsync(CacheKeys.UserQuota(userId), cancellationToken);
}
_logger.LogInformation("File deleted successfully: {FileId}", request.FileId);
return new DeleteFileResult(true, null);
}

View File

@@ -0,0 +1,25 @@
using MediatR;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Command to abort a multipart upload.
/// VI: Command để hủy multipart upload.
/// </summary>
public record AbortMultipartUploadCommand(
Guid UploadId,
string UserId
) : IRequest<AbortMultipartUploadResult>;
/// <summary>
/// EN: Result of multipart upload abortion.
/// VI: Kết quả hủy multipart upload.
/// </summary>
public record AbortMultipartUploadResult(
bool Success,
string? Error
)
{
public static AbortMultipartUploadResult Ok() => new(true, null);
public static AbortMultipartUploadResult Fail(string error) => new(false, error);
}

View File

@@ -0,0 +1,88 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StorageService.Infrastructure.Persistence;
using StorageService.Infrastructure.Storage;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Handler for AbortMultipartUploadCommand.
/// VI: Handler cho AbortMultipartUploadCommand.
/// </summary>
public class AbortMultipartUploadCommandHandler
: IRequestHandler<AbortMultipartUploadCommand, AbortMultipartUploadResult>
{
private readonly StorageServiceContext _context;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly ILogger<AbortMultipartUploadCommandHandler> _logger;
public AbortMultipartUploadCommandHandler(
StorageServiceContext context,
IStorageProviderFactory storageProviderFactory,
ILogger<AbortMultipartUploadCommandHandler> logger)
{
_context = context;
_storageProviderFactory = storageProviderFactory;
_logger = logger;
}
public async Task<AbortMultipartUploadResult> Handle(
AbortMultipartUploadCommand request,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation("Aborting multipart upload {UploadId}", request.UploadId);
// EN: Get multipart upload / VI: Lấy multipart upload
var upload = await _context.MultipartUploads
.FirstOrDefaultAsync(u => u.Id == request.UploadId, cancellationToken);
if (upload == null)
{
_logger.LogWarning("Multipart upload not found: {UploadId}", request.UploadId);
return AbortMultipartUploadResult.Fail("Multipart upload not found");
}
// EN: Verify ownership / VI: Xác minh quyền sở hữu
if (upload.UserId != request.UserId)
{
_logger.LogWarning("User {UserId} attempted to abort upload owned by {OwnerId}",
request.UserId, upload.UserId);
return AbortMultipartUploadResult.Fail("Unauthorized");
}
// EN: Abort at provider / VI: Hủy tại provider
var provider = _storageProviderFactory.GetProvider();
try
{
await provider.AbortMultipartUploadAsync(
upload.BucketName,
upload.ObjectKey,
upload.ProviderUploadId,
cancellationToken);
}
catch (Exception ex)
{
// EN: Log but continue - provider cleanup is best effort
// VI: Log nhưng tiếp tục - cleanup provider là best effort
_logger.LogWarning(ex, "Failed to abort multipart upload at provider for {UploadId}", request.UploadId);
}
// EN: Mark as aborted in database / VI: Đánh dấu aborted trong database
upload.Abort();
await _context.SaveEntitiesAsync(cancellationToken);
_logger.LogInformation("Multipart upload {UploadId} aborted successfully", request.UploadId);
return AbortMultipartUploadResult.Ok();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error aborting multipart upload {UploadId}", request.UploadId);
return AbortMultipartUploadResult.Fail("An error occurred while aborting multipart upload.");
}
}
}

View File

@@ -0,0 +1,29 @@
using MediatR;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Command to complete a multipart upload.
/// VI: Command để hoàn thành multipart upload.
/// </summary>
public record CompleteMultipartUploadCommand(
Guid UploadId,
string UserId
) : IRequest<CompleteMultipartUploadResult>;
/// <summary>
/// EN: Result of multipart upload completion.
/// VI: Kết quả hoàn thành multipart upload.
/// </summary>
public record CompleteMultipartUploadResult(
bool Success,
Guid? FileId,
string? ObjectKey,
string? Error
)
{
public static CompleteMultipartUploadResult Ok(Guid fileId, string objectKey) =>
new(true, fileId, objectKey, null);
public static CompleteMultipartUploadResult Fail(string error) =>
new(false, null, null, error);
}

View File

@@ -0,0 +1,120 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StorageService.Domain.AggregatesModel.FileAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Infrastructure.Persistence;
using StorageService.Infrastructure.Storage;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Handler for CompleteMultipartUploadCommand.
/// VI: Handler cho CompleteMultipartUploadCommand.
/// </summary>
public class CompleteMultipartUploadCommandHandler
: IRequestHandler<CompleteMultipartUploadCommand, CompleteMultipartUploadResult>
{
private readonly StorageServiceContext _context;
private readonly IQuotaRepository _quotaRepository;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly ILogger<CompleteMultipartUploadCommandHandler> _logger;
public CompleteMultipartUploadCommandHandler(
StorageServiceContext context,
IQuotaRepository quotaRepository,
IStorageProviderFactory storageProviderFactory,
ILogger<CompleteMultipartUploadCommandHandler> logger)
{
_context = context;
_quotaRepository = quotaRepository;
_storageProviderFactory = storageProviderFactory;
_logger = logger;
}
public async Task<CompleteMultipartUploadResult> Handle(
CompleteMultipartUploadCommand request,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation("Completing multipart upload {UploadId}", request.UploadId);
// EN: Get multipart upload with parts / VI: Lấy multipart upload với parts
var upload = await _context.MultipartUploads
.Include(u => u.Parts)
.FirstOrDefaultAsync(u => u.Id == request.UploadId, cancellationToken);
if (upload == null)
{
_logger.LogWarning("Multipart upload not found: {UploadId}", request.UploadId);
return CompleteMultipartUploadResult.Fail("Multipart upload not found");
}
// EN: Verify ownership / VI: Xác minh quyền sở hữu
if (upload.UserId != request.UserId)
{
_logger.LogWarning("User {UserId} attempted to complete upload owned by {OwnerId}",
request.UserId, upload.UserId);
return CompleteMultipartUploadResult.Fail("Unauthorized");
}
// EN: Complete the upload (domain validation) / VI: Hoàn thành upload (domain validation)
try
{
upload.Complete();
}
catch (InvalidOperationException ex)
{
_logger.LogWarning(ex, "Cannot complete upload {UploadId}: {Message}", request.UploadId, ex.Message);
return CompleteMultipartUploadResult.Fail(ex.Message);
}
// EN: Complete multipart upload at provider / VI: Hoàn thành multipart upload tại provider
var provider = _storageProviderFactory.GetProvider();
var parts = upload.Parts
.OrderBy(p => p.PartNumber)
.Select(p => new PartETag(p.PartNumber, p.ETag))
.ToList();
await provider.CompleteMultipartUploadAsync(
upload.BucketName,
upload.ObjectKey,
upload.ProviderUploadId,
parts,
cancellationToken);
// EN: Create StorageFile entity / VI: Tạo StorageFile entity
var storageFile = new StorageFile(
fileName: upload.FileName,
bucketName: upload.BucketName,
objectKey: upload.ObjectKey,
contentType: upload.ContentType,
fileSizeBytes: upload.TotalSizeBytes,
userId: upload.UserId,
provider: provider.ProviderType,
accessLevel: FileAccessLevel.Private // TODO: Get from upload metadata
);
await _context.StorageFiles.AddAsync(storageFile, cancellationToken);
// EN: Update user quota / VI: Cập nhật quota của user
var quota = await _quotaRepository.GetOrCreateAsync(upload.UserId, cancellationToken);
quota.AddUsage(upload.TotalSizeBytes, 1);
// EN: Save all changes / VI: Lưu tất cả thay đổi
await _context.SaveEntitiesAsync(cancellationToken);
_logger.LogInformation(
"Multipart upload completed successfully. FileId: {FileId}, Size: {Size} bytes",
storageFile.Id, upload.TotalSizeBytes);
return CompleteMultipartUploadResult.Ok(storageFile.Id, upload.ObjectKey);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error completing multipart upload {UploadId}", request.UploadId);
return CompleteMultipartUploadResult.Fail("An error occurred while completing multipart upload.");
}
}
}

View File

@@ -0,0 +1,34 @@
using MediatR;
using StorageService.Domain.AggregatesModel.FileAggregate;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Command to initiate a multipart upload.
/// VI: Command để khởi tạo multipart upload.
/// </summary>
public record InitiateMultipartUploadCommand(
string FileName,
long FileSizeBytes,
string ContentType,
string UserId,
int? ChunkSizeBytes = null, // Default: 5MB
FileAccessLevel AccessLevel = FileAccessLevel.Private,
string? TenantId = null
) : IRequest<InitiateMultipartUploadResult>;
/// <summary>
/// EN: Result of multipart upload initiation.
/// VI: Kết quả khởi tạo multipart upload.
/// </summary>
public record InitiateMultipartUploadResult(
bool Success,
Guid? UploadId,
string? ProviderUploadId,
string? ObjectKey,
string? BucketName,
int? TotalChunks,
int? ChunkSizeBytes,
DateTime? ExpiresAt,
string? Error
);

View File

@@ -0,0 +1,173 @@
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StorageService.Domain.AggregatesModel.MultipartUploadAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Infrastructure.Configuration;
using StorageService.Infrastructure.Persistence;
using StorageService.Infrastructure.Storage;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Handler for InitiateMultipartUploadCommand.
/// VI: Handler cho InitiateMultipartUploadCommand.
/// </summary>
public class InitiateMultipartUploadCommandHandler
: IRequestHandler<InitiateMultipartUploadCommand, InitiateMultipartUploadResult>
{
private const int DefaultChunkSizeBytes = 5 * 1024 * 1024; // 5MB
private const int MinChunkSizeBytes = 5 * 1024 * 1024; // 5MB (MinIO minimum)
private const int MaxChunkSizeBytes = 100 * 1024 * 1024; // 100MB
private readonly IQuotaRepository _quotaRepository;
private readonly StorageServiceContext _context;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly StorageSettings _settings;
private readonly ILogger<InitiateMultipartUploadCommandHandler> _logger;
public InitiateMultipartUploadCommandHandler(
IQuotaRepository quotaRepository,
StorageServiceContext context,
IStorageProviderFactory storageProviderFactory,
IOptions<StorageSettings> settings,
ILogger<InitiateMultipartUploadCommandHandler> logger)
{
_quotaRepository = quotaRepository;
_context = context;
_storageProviderFactory = storageProviderFactory;
_settings = settings.Value;
_logger = logger;
}
public async Task<InitiateMultipartUploadResult> Handle(
InitiateMultipartUploadCommand request,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation(
"Initiating multipart upload for user {UserId}, file: {FileName}, size: {Size} bytes",
request.UserId, request.FileName, request.FileSizeBytes);
// EN: Validate file size / VI: Kiểm tra kích thước file
if (request.FileSizeBytes <= 0)
{
return Fail("File size must be greater than 0");
}
// EN: Check user quota / VI: Kiểm tra quota của user
var quota = await _quotaRepository.GetOrCreateAsync(request.UserId, cancellationToken);
if (!quota.CanUpload(request.FileSizeBytes))
{
_logger.LogWarning(
"Quota exceeded for user {UserId}. Used: {Used}/{Max}",
request.UserId, quota.UsedStorageBytes, quota.MaxStorageBytes);
return Fail("Storage quota exceeded. Please upgrade your plan or delete some files.");
}
// EN: Determine chunk size / VI: Xác định chunk size
var chunkSizeBytes = request.ChunkSizeBytes ?? DefaultChunkSizeBytes;
if (chunkSizeBytes < MinChunkSizeBytes || chunkSizeBytes > MaxChunkSizeBytes)
{
chunkSizeBytes = DefaultChunkSizeBytes;
_logger.LogWarning(
"Invalid chunk size {Size}, using default {Default}",
request.ChunkSizeBytes, DefaultChunkSizeBytes);
}
// EN: Generate object key / VI: Tạo object key
var objectKey = GenerateObjectKey(request.UserId, request.FileName, request.AccessLevel);
var bucketName = _settings.DefaultBucket;
// EN: Initiate multipart upload with storage provider / VI: Khởi tạo multipart upload với storage provider
var provider = _storageProviderFactory.GetProvider();
await provider.EnsureBucketExistsAsync(bucketName, cancellationToken);
var providerUploadId = await provider.InitiateMultipartUploadAsync(
bucketName,
objectKey,
request.ContentType,
cancellationToken);
if (string.IsNullOrEmpty(providerUploadId))
{
_logger.LogError("Provider failed to return upload ID");
return Fail("Failed to initiate multipart upload");
}
// EN: Create domain entity / VI: Tạo domain entity
var multipartUpload = new Domain.AggregatesModel.MultipartUploadAggregate.MultipartUpload(
userId: request.UserId,
fileName: request.FileName,
totalSizeBytes: request.FileSizeBytes,
chunkSizeBytes: chunkSizeBytes,
providerUploadId: providerUploadId,
bucketName: bucketName,
objectKey: objectKey,
contentType: request.ContentType,
expirationHours: 24
);
// EN: Save to database / VI: Lưu vào database
await _context.MultipartUploads.AddAsync(multipartUpload, cancellationToken);
await _context.SaveEntitiesAsync(cancellationToken);
_logger.LogInformation(
"Multipart upload initiated. UploadId: {UploadId}, ProviderUploadId: {ProviderUploadId}, TotalChunks: {TotalChunks}",
multipartUpload.Id, providerUploadId, multipartUpload.TotalChunks);
return new InitiateMultipartUploadResult(
Success: true,
UploadId: multipartUpload.Id,
ProviderUploadId: providerUploadId,
ObjectKey: objectKey,
BucketName: bucketName,
TotalChunks: multipartUpload.TotalChunks,
ChunkSizeBytes: chunkSizeBytes,
ExpiresAt: multipartUpload.ExpiresAt,
Error: null
);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error initiating multipart upload for user {UserId}", request.UserId);
return Fail("An error occurred while initiating multipart upload.");
}
}
private static string GenerateObjectKey(string userId, string fileName, Domain.AggregatesModel.FileAggregate.FileAccessLevel accessLevel)
{
var prefix = accessLevel switch
{
Domain.AggregatesModel.FileAggregate.FileAccessLevel.Public => "public",
Domain.AggregatesModel.FileAggregate.FileAccessLevel.Shared => "shared",
_ => "private"
};
var date = DateTime.UtcNow.ToString("yyyyMMdd");
var fileId = Guid.NewGuid().ToString("N")[..8];
var sanitizedName = SanitizeFileName(fileName);
return $"{prefix}/{userId}/{date}/{fileId}_{sanitizedName}";
}
private static string SanitizeFileName(string fileName)
{
var invalidChars = Path.GetInvalidFileNameChars();
var sanitized = string.Join("_", fileName.Split(invalidChars, StringSplitOptions.RemoveEmptyEntries));
if (sanitized.Length > 100)
{
var extension = Path.GetExtension(sanitized);
var nameWithoutExt = Path.GetFileNameWithoutExtension(sanitized);
sanitized = nameWithoutExt[..Math.Min(nameWithoutExt.Length, 90)] + extension;
}
return sanitized;
}
private static InitiateMultipartUploadResult Fail(string error) =>
new(false, null, null, null, null, null, null, null, error);
}

View File

@@ -0,0 +1,28 @@
using MediatR;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Command to upload a single part of a multipart upload.
/// VI: Command để upload một part của multipart upload.
/// </summary>
public record UploadPartCommand(
Guid UploadId,
int PartNumber,
Stream DataStream,
string UserId
) : IRequest<UploadPartResult>;
/// <summary>
/// EN: Result of part upload.
/// VI: Kết quả upload part.
/// </summary>
public record UploadPartResult(
bool Success,
string? ETag,
string? Error
)
{
public static UploadPartResult Ok(string etag) => new(true, etag, null);
public static UploadPartResult Fail(string error) => new(false, null, error);
}

View File

@@ -0,0 +1,105 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StorageService.Infrastructure.Persistence;
using StorageService.Infrastructure.Storage;
namespace StorageService.API.Application.Commands.MultipartUpload;
/// <summary>
/// EN: Handler for UploadPartCommand.
/// VI: Handler cho UploadPartCommand.
/// </summary>
public class UploadPartCommandHandler : IRequestHandler<UploadPartCommand, UploadPartResult>
{
private readonly StorageServiceContext _context;
private readonly IStorageProviderFactory _storageProviderFactory;
private readonly ILogger<UploadPartCommandHandler> _logger;
public UploadPartCommandHandler(
StorageServiceContext context,
IStorageProviderFactory storageProviderFactory,
ILogger<UploadPartCommandHandler> logger)
{
_context = context;
_storageProviderFactory = storageProviderFactory;
_logger = logger;
}
public async Task<UploadPartResult> Handle(
UploadPartCommand request,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation(
"Uploading part {PartNumber} for upload {UploadId}",
request.PartNumber, request.UploadId);
// EN: Get multipart upload / VI: Lấy multipart upload
var upload = await _context.MultipartUploads
.Include(u => u.Parts)
.FirstOrDefaultAsync(u => u.Id == request.UploadId, cancellationToken);
if (upload == null)
{
_logger.LogWarning("Multipart upload not found: {UploadId}", request.UploadId);
return UploadPartResult.Fail("Multipart upload not found");
}
// EN: Verify ownership / VI: Xác minh quyền sở hữu
if (upload.UserId != request.UserId)
{
_logger.LogWarning(
"User {UserId} attempted to upload part for upload owned by {OwnerId}",
request.UserId, upload.UserId);
return UploadPartResult.Fail("Unauthorized");
}
// EN: Check if upload is in progress / VI: Kiểm tra upload đang progress
if (upload.Status != Domain.AggregatesModel.MultipartUploadAggregate.MultipartUploadStatus.InProgress)
{
_logger.LogWarning("Upload {UploadId} is not in progress: {Status}", request.UploadId, upload.Status);
return UploadPartResult.Fail($"Upload is not in progress (status: {upload.Status})");
}
// EN: Check if expired / VI: Kiểm tra đã hết hạn
if (upload.IsExpired())
{
_logger.LogWarning("Upload {UploadId} has expired", request.UploadId);
upload.MarkFailed();
await _context.SaveEntitiesAsync(cancellationToken);
return UploadPartResult.Fail("Upload has expired");
}
// EN: Upload part to storage provider / VI: Upload part lên storage provider
var provider = _storageProviderFactory.GetProvider();
var etag = await provider.UploadPartAsync(
upload.BucketName,
upload.ObjectKey,
upload.ProviderUploadId,
request.PartNumber,
request.DataStream,
cancellationToken);
// EN: Add part to upload / VI: Thêm part vào upload
var partSize = request.DataStream.Length;
upload.AddPart(request.PartNumber, etag, partSize);
// EN: Save changes / VI: Lưu thay đổi
await _context.SaveEntitiesAsync(cancellationToken);
_logger.LogInformation(
"Part {PartNumber} uploaded successfully. Progress: {Progress}%",
request.PartNumber, upload.GetProgressPercentage());
return UploadPartResult.Ok(etag);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error uploading part {PartNumber} for upload {UploadId}",
request.PartNumber, request.UploadId);
return UploadPartResult.Fail("An error occurred while uploading part.");
}
}
}

View File

@@ -79,26 +79,57 @@ public class GetUserFilesQueryHandler : IRequestHandler<GetUserFilesQuery, UserF
public class GetUserQuotaQueryHandler : IRequestHandler<GetUserQuotaQuery, QuotaDto?>
{
private readonly IQuotaRepository _quotaRepository;
private readonly Infrastructure.Caching.IRedisCacheService _cache;
public GetUserQuotaQueryHandler(IQuotaRepository quotaRepository)
public GetUserQuotaQueryHandler(
IQuotaRepository quotaRepository,
Infrastructure.Caching.IRedisCacheService cache)
{
_quotaRepository = quotaRepository;
_cache = cache;
}
public async Task<QuotaDto?> Handle(GetUserQuotaQuery request, CancellationToken cancellationToken)
{
var quota = await _quotaRepository.GetOrCreateAsync(request.UserId, cancellationToken);
// EN: Use cache-aside pattern for quota
// VI: Sử dụng cache-aside pattern cho quota
if (Guid.TryParse(request.UserId, out var userId))
{
var cacheKey = Infrastructure.Caching.CacheKeys.UserQuota(userId);
return await _cache.GetOrSetAsync(
cacheKey,
async () =>
{
var quota = await _quotaRepository.GetOrCreateAsync(request.UserId, cancellationToken);
return new QuotaDto(
quota.UserId,
quota.MaxStorageBytes,
quota.UsedStorageBytes,
quota.RemainingStorageBytes,
quota.MaxFileCount,
quota.CurrentFileCount,
quota.RemainingFileCount,
quota.UsagePercentage,
quota.QuotaTier);
},
Infrastructure.Caching.CacheKeys.DefaultTtl,
cancellationToken);
}
// EN: Fallback if UserId is not valid GUID
// VI: Fallback nếu UserId không phải GUID hợp lệ
var quotaFallback = await _quotaRepository.GetOrCreateAsync(request.UserId, cancellationToken);
return new QuotaDto(
quota.UserId,
quota.MaxStorageBytes,
quota.UsedStorageBytes,
quota.RemainingStorageBytes,
quota.MaxFileCount,
quota.CurrentFileCount,
quota.RemainingFileCount,
quota.UsagePercentage,
quota.QuotaTier);
quotaFallback.UserId,
quotaFallback.MaxStorageBytes,
quotaFallback.UsedStorageBytes,
quotaFallback.RemainingStorageBytes,
quotaFallback.MaxFileCount,
quotaFallback.CurrentFileCount,
quotaFallback.RemainingFileCount,
quotaFallback.UsagePercentage,
quotaFallback.QuotaTier);
}
}

View File

@@ -0,0 +1,80 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using StorageService.API.Controllers;
using StorageService.Infrastructure.Persistence;
namespace StorageService.API.Application.Queries;
/// <summary>
/// EN: Handler for GetMultipartUploadProgressQuery.
/// VI: Handler cho GetMultipartUploadProgressQuery.
/// </summary>
public class GetMultipartUploadProgressQueryHandler
: IRequestHandler<GetMultipartUploadProgressQuery, MultipartUploadProgressResult?>
{
private readonly StorageServiceContext _context;
private readonly ILogger<GetMultipartUploadProgressQueryHandler> _logger;
public GetMultipartUploadProgressQueryHandler(
StorageServiceContext context,
ILogger<GetMultipartUploadProgressQueryHandler> logger)
{
_context = context;
_logger = logger;
}
public async Task<MultipartUploadProgressResult?> Handle(
GetMultipartUploadProgressQuery request,
CancellationToken cancellationToken)
{
try
{
_logger.LogInformation(
"Getting progress for multipart upload {UploadId}",
request.UploadId);
// EN: Get multipart upload with parts / VI: Lấy multipart upload với parts
var upload = await _context.MultipartUploads
.Include(u => u.Parts)
.FirstOrDefaultAsync(u => u.Id == request.UploadId, cancellationToken);
if (upload == null)
{
_logger.LogWarning("Multipart upload not found: {UploadId}", request.UploadId);
return null;
}
// EN: Verify ownership / VI: Xác minh quyền sở hữu
if (upload.UserId != request.UserId)
{
_logger.LogWarning(
"User {UserId} attempted to access upload owned by {OwnerId}",
request.UserId, upload.UserId);
return null;
}
// EN: Calculate progress / VI: Tính toán tiến độ
var uploadedBytes = upload.Parts.Sum(p => p.SizeBytes);
return new MultipartUploadProgressResult
{
UploadId = upload.Id,
FileName = upload.FileName,
TotalSizeBytes = upload.TotalSizeBytes,
UploadedBytes = uploadedBytes,
TotalChunks = upload.TotalChunks,
UploadedChunks = upload.Parts.Count,
ProgressPercentage = upload.GetProgressPercentage(),
Status = upload.Status.ToString(),
CreatedAt = upload.CreatedAt,
ExpiresAt = upload.ExpiresAt
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting progress for multipart upload {UploadId}", request.UploadId);
throw;
}
}
}

View File

@@ -0,0 +1,323 @@
using MediatR;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using StorageService.API.Application.Commands.MultipartUpload;
using Swashbuckle.AspNetCore.Annotations;
using System.Security.Claims;
using Asp.Versioning;
namespace StorageService.API.Controllers;
/// <summary>
/// EN: Controller for multipart file upload operations.
/// VI: Controller cho các thao tác multipart upload file.
/// </summary>
[ApiController]
[ApiVersion("1.0")]
[Route("api/v{version:apiVersion}/files/multipart")]
[SwaggerTag("Multipart Upload - Upload large files in chunks")]
public class MultipartUploadController : ControllerBase
{
private readonly IMediator _mediator;
private readonly ILogger<MultipartUploadController> _logger;
public MultipartUploadController(IMediator mediator, ILogger<MultipartUploadController> logger)
{
_mediator = mediator;
_logger = logger;
}
/// <summary>
/// EN: Initiate a multipart upload session.
/// VI: Khởi tạo phiên multipart upload.
/// </summary>
/// <remarks>
/// EN: Start uploading a large file in chunks. Returns an upload ID to use for uploading parts.
/// VI: Bắt đầu upload file lớn theo từng phần. Trả về upload ID để sử dụng cho việc upload các phần.
/// </remarks>
[HttpPost("initiate")]
[Authorize]
[SwaggerOperation(
Summary = "Initiate multipart upload",
Description = "Start a new multipart upload session for large files")]
[SwaggerResponse(200, "Upload session initiated successfully", typeof(ApiResponse<InitiateMultipartUploadResult>))]
[SwaggerResponse(400, "Invalid request")]
[SwaggerResponse(401, "Unauthorized")]
public async Task<ActionResult<ApiResponse<InitiateMultipartUploadResult>>> Initiate(
[FromBody] InitiateMultipartUploadRequest request,
CancellationToken cancellationToken = default)
{
var userId = GetUserId();
if (string.IsNullOrEmpty(userId))
return Unauthorized(new ApiResponse<InitiateMultipartUploadResult>
{
Success = false,
Error = "User ID not found"
});
var command = new InitiateMultipartUploadCommand(
request.FileName,
request.TotalSizeBytes,
request.ContentType ?? "application/octet-stream",
userId,
request.ChunkSizeBytes);
var result = await _mediator.Send(command, cancellationToken);
if (!result.Success)
return BadRequest(new ApiResponse<InitiateMultipartUploadResult>
{
Success = false,
Error = result.Error
});
return Ok(new ApiResponse<InitiateMultipartUploadResult> { Success = true, Data = result });
}
/// <summary>
/// EN: Upload a single part of a multipart upload.
/// VI: Upload một phần của multipart upload.
/// </summary>
/// <remarks>
/// EN: Upload one chunk of the file. Part numbers must be between 1 and N (total chunks).
/// VI: Upload một phần của file. Số thứ tự phần phải từ 1 đến N (tổng số chunks).
/// </remarks>
[HttpPost("upload-part")]
[Authorize]
[RequestSizeLimit(20971520)] // 20MB per part
[SwaggerOperation(
Summary = "Upload a part",
Description = "Upload a single chunk of the file")]
[SwaggerResponse(200, "Part uploaded successfully", typeof(ApiResponse<UploadPartResult>))]
[SwaggerResponse(400, "Invalid request")]
[SwaggerResponse(401, "Unauthorized")]
[SwaggerResponse(404, "Upload session not found")]
public async Task<ActionResult<ApiResponse<UploadPartResult>>> UploadPart(
[FromForm] Guid uploadId,
[FromForm] int partNumber,
IFormFile file,
CancellationToken cancellationToken = default)
{
var userId = GetUserId();
if (string.IsNullOrEmpty(userId))
return Unauthorized(new ApiResponse<UploadPartResult>
{
Success = false,
Error = "User ID not found"
});
if (file == null || file.Length == 0)
return BadRequest(new ApiResponse<UploadPartResult>
{
Success = false,
Error = "No file data provided"
});
using var stream = file.OpenReadStream();
var command = new UploadPartCommand(uploadId, partNumber, stream, userId);
var result = await _mediator.Send(command, cancellationToken);
if (!result.Success)
return BadRequest(new ApiResponse<UploadPartResult>
{
Success = false,
Error = result.Error
});
return Ok(new ApiResponse<UploadPartResult> { Success = true, Data = result });
}
/// <summary>
/// EN: Complete the multipart upload.
/// VI: Hoàn thành multipart upload.
/// </summary>
/// <remarks>
/// EN: Complete the upload by merging all parts into a single file.
/// VI: Hoàn thành upload bằng cách merge tất cả các phần thành một file duy nhất.
/// </remarks>
[HttpPost("complete")]
[Authorize]
[SwaggerOperation(
Summary = "Complete multipart upload",
Description = "Finalize the upload and merge all parts")]
[SwaggerResponse(200, "Upload completed successfully", typeof(ApiResponse<CompleteMultipartUploadResult>))]
[SwaggerResponse(400, "Invalid request or missing parts")]
[SwaggerResponse(401, "Unauthorized")]
[SwaggerResponse(404, "Upload session not found")]
public async Task<ActionResult<ApiResponse<CompleteMultipartUploadResult>>> Complete(
[FromBody] CompleteMultipartUploadRequest request,
CancellationToken cancellationToken = default)
{
var userId = GetUserId();
if (string.IsNullOrEmpty(userId))
return Unauthorized(new ApiResponse<CompleteMultipartUploadResult>
{
Success = false,
Error = "User ID not found"
});
var command = new CompleteMultipartUploadCommand(request.UploadId, userId);
var result = await _mediator.Send(command, cancellationToken);
if (!result.Success)
return BadRequest(new ApiResponse<CompleteMultipartUploadResult>
{
Success = false,
Error = result.Error
});
return Ok(new ApiResponse<CompleteMultipartUploadResult> { Success = true, Data = result });
}
/// <summary>
/// EN: Abort the multipart upload.
/// VI: Hủy multipart upload.
/// </summary>
/// <remarks>
/// EN: Cancel the upload and delete all uploaded parts.
/// VI: Hủy upload và xóa tất cả các phần đã upload.
/// </remarks>
[HttpDelete("abort")]
[Authorize]
[SwaggerOperation(
Summary = "Abort multipart upload",
Description = "Cancel the upload and cleanup resources")]
[SwaggerResponse(200, "Upload aborted successfully", typeof(ApiResponse<AbortMultipartUploadResult>))]
[SwaggerResponse(400, "Invalid request")]
[SwaggerResponse(401, "Unauthorized")]
[SwaggerResponse(404, "Upload session not found")]
public async Task<ActionResult<ApiResponse<AbortMultipartUploadResult>>> Abort(
[FromBody] AbortMultipartUploadRequest request,
CancellationToken cancellationToken = default)
{
var userId = GetUserId();
if (string.IsNullOrEmpty(userId))
return Unauthorized(new ApiResponse<AbortMultipartUploadResult>
{
Success = false,
Error = "User ID not found"
});
var command = new AbortMultipartUploadCommand(request.UploadId, userId);
var result = await _mediator.Send(command, cancellationToken);
if (!result.Success)
return BadRequest(new ApiResponse<AbortMultipartUploadResult>
{
Success = false,
Error = result.Error
});
return Ok(new ApiResponse<AbortMultipartUploadResult> { Success = true, Data = result });
}
/// <summary>
/// EN: Get multipart upload progress.
/// VI: Lấy tiến độ multipart upload.
/// </summary>
/// <remarks>
/// EN: Get the current status and progress of an ongoing multipart upload.
/// VI: Lấy trạng thái và tiến độ hiện tại của một multipart upload đang diễn ra.
/// </remarks>
[HttpGet("{uploadId:guid}")]
[Authorize]
[SwaggerOperation(
Summary = "Get upload progress",
Description = "Get the current status and progress of a multipart upload")]
[SwaggerResponse(200, "Progress retrieved successfully", typeof(ApiResponse<MultipartUploadProgressResult>))]
[SwaggerResponse(401, "Unauthorized")]
[SwaggerResponse(404, "Upload session not found")]
public async Task<ActionResult<ApiResponse<MultipartUploadProgressResult>>> GetProgress(
Guid uploadId,
CancellationToken cancellationToken = default)
{
var userId = GetUserId();
if (string.IsNullOrEmpty(userId))
return Unauthorized(new ApiResponse<MultipartUploadProgressResult>
{
Success = false,
Error = "User ID not found"
});
var query = new GetMultipartUploadProgressQuery(uploadId, userId);
var result = await _mediator.Send(query, cancellationToken);
if (result == null)
return NotFound(new ApiResponse<MultipartUploadProgressResult>
{
Success = false,
Error = "Upload session not found"
});
return Ok(new ApiResponse<MultipartUploadProgressResult> { Success = true, Data = result });
}
private string? GetUserId() => User.FindFirstValue(ClaimTypes.NameIdentifier);
}
// ========== Request DTOs ==========
/// <summary>
/// EN: Request to initiate a multipart upload.
/// VI: Request để khởi tạo multipart upload.
/// </summary>
public record InitiateMultipartUploadRequest
{
/// <summary>EN: File name / VI: Tên file</summary>
public required string FileName { get; init; }
/// <summary>EN: Total file size in bytes / VI: Tổng kích thước file (bytes)</summary>
public required long TotalSizeBytes { get; init; }
/// <summary>EN: Content type (MIME) / VI: Content type (MIME)</summary>
public string? ContentType { get; init; }
/// <summary>EN: Size of each chunk in bytes (default: 5MB) / VI: Kích thước mỗi chunk (bytes, mặc định: 5MB)</summary>
public int? ChunkSizeBytes { get; init; }
}
/// <summary>
/// EN: Request to complete a multipart upload.
/// VI: Request để hoàn thành multipart upload.
/// </summary>
public record CompleteMultipartUploadRequest
{
/// <summary>EN: Upload ID / VI: ID của upload</summary>
public required Guid UploadId { get; init; }
}
/// <summary>
/// EN: Request to abort a multipart upload.
/// VI: Request để hủy multipart upload.
/// </summary>
public record AbortMultipartUploadRequest
{
/// <summary>EN: Upload ID / VI: ID của upload</summary>
public required Guid UploadId { get; init; }
}
// ========== Query for Progress ==========
/// <summary>
/// EN: Query to get multipart upload progress.
/// VI: Query để lấy tiến độ multipart upload.
/// </summary>
public record GetMultipartUploadProgressQuery(Guid UploadId, string UserId) : IRequest<MultipartUploadProgressResult?>;
/// <summary>
/// EN: Result of multipart upload progress query.
/// VI: Kết quả query tiến độ multipart upload.
/// </summary>
public record MultipartUploadProgressResult
{
public Guid UploadId { get; init; }
public string FileName { get; init; } = default!;
public long TotalSizeBytes { get; init; }
public long UploadedBytes { get; init; }
public int TotalChunks { get; init; }
public int UploadedChunks { get; init; }
public double ProgressPercentage { get; init; }
public string Status { get; init; } = default!;
public DateTime CreatedAt { get; init; }
public DateTime? ExpiresAt { get; init; }
}

View File

@@ -0,0 +1,188 @@
using StorageService.Domain.SeedWork;
using StorageService.Domain.Events;
namespace StorageService.Domain.AggregatesModel.MultipartUploadAggregate;
/// <summary>
/// EN: Represents a multipart upload in progress.
/// VI: Đại diện cho quá trình multipart upload.
/// </summary>
public class MultipartUpload : Entity, IAggregateRoot
{
private readonly List<MultipartUploadPart> _parts = new();
/// <summary>EN: User ID / VI: ID người dùng</summary>
public string UserId { get; private set; } = string.Empty;
/// <summary>EN: Original file name / VI: Tên file gốc</summary>
public string FileName { get; private set; } = string.Empty;
/// <summary>EN: Total file size in bytes / VI: Tổng kích thước file (bytes)</summary>
public long TotalSizeBytes { get; private set; }
/// <summary>EN: Chunk size in bytes / VI: Kích thước chunk (bytes)</summary>
public int ChunkSizeBytes { get; private set; }
/// <summary>EN: Total number of chunks / VI: Tổng số chunk</summary>
public int TotalChunks { get; private set; }
/// <summary>EN: Number of uploaded chunks / VI: Số chunk đã upload</summary>
public int UploadedChunks { get; private set; }
/// <summary>EN: Upload ID from storage provider / VI: Upload ID từ storage provider</summary>
public string ProviderUploadId { get; private set; } = string.Empty;
/// <summary>EN: Bucket name / VI: Tên bucket</summary>
public string BucketName { get; private set; } = string.Empty;
/// <summary>EN: Object key / VI: Object key</summary>
public string ObjectKey { get; private set; } = string.Empty;
/// <summary>EN: Content type / VI: Content type</summary>
public string ContentType { get; private set; } = string.Empty;
/// <summary>EN: Upload status / VI: Trạng thái upload</summary>
public MultipartUploadStatus Status { get; private set; }
/// <summary>EN: Creation timestamp / VI: Thời gian tạo</summary>
public DateTime CreatedAt { get; private set; }
/// <summary>EN: Completion timestamp / VI: Thời gian hoàn thành</summary>
public DateTime? CompletedAt { get; private set; }
/// <summary>EN: Expiration timestamp / VI: Thời gian hết hạn</summary>
public DateTime ExpiresAt { get; private set; }
/// <summary>EN: Upload parts / VI: Các part đã upload</summary>
public IReadOnlyCollection<MultipartUploadPart> Parts => _parts.AsReadOnly();
// EN: EF Core requires parameterless constructor / VI: EF Core cần constructor không tham số
protected MultipartUpload() { }
/// <summary>
/// EN: Create a new multipart upload.
/// VI: Tạo một multipart upload mới.
/// </summary>
public MultipartUpload(
string userId,
string fileName,
long totalSizeBytes,
int chunkSizeBytes,
string providerUploadId,
string bucketName,
string objectKey,
string contentType,
int expirationHours = 24)
{
if (string.IsNullOrWhiteSpace(userId))
throw new ArgumentNullException(nameof(userId));
if (string.IsNullOrWhiteSpace(fileName))
throw new ArgumentNullException(nameof(fileName));
if (totalSizeBytes <= 0)
throw new ArgumentException("Total size must be > 0", nameof(totalSizeBytes));
if (chunkSizeBytes <= 0)
throw new ArgumentException("Chunk size must be > 0", nameof(chunkSizeBytes));
if (string.IsNullOrWhiteSpace(providerUploadId))
throw new ArgumentNullException(nameof(providerUploadId));
if (string.IsNullOrWhiteSpace(bucketName))
throw new ArgumentNullException(nameof(bucketName));
if (string.IsNullOrWhiteSpace(objectKey))
throw new ArgumentNullException(nameof(objectKey));
Id = Guid.NewGuid();
UserId = userId;
FileName = fileName;
TotalSizeBytes = totalSizeBytes;
ChunkSizeBytes = chunkSizeBytes;
TotalChunks = (int)Math.Ceiling((double)totalSizeBytes / chunkSizeBytes);
UploadedChunks = 0;
ProviderUploadId = providerUploadId;
BucketName = bucketName;
ObjectKey = objectKey;
ContentType = contentType ?? "application/octet-stream";
Status = MultipartUploadStatus.InProgress;
CreatedAt = DateTime.UtcNow;
ExpiresAt = DateTime.UtcNow.AddHours(expirationHours);
// EN: Raise domain event / VI: Phát domain event
AddDomainEvent(new MultipartUploadInitiatedDomainEvent(Id, fileName, userId, totalSizeBytes));
}
/// <summary>
/// EN: Add a part to the upload.
/// VI: Thêm một part đã upload.
/// </summary>
public void AddPart(int partNumber, string etag, long sizeBytes)
{
if (Status != MultipartUploadStatus.InProgress)
throw new InvalidOperationException($"Cannot add part to upload with status {Status}");
if (_parts.Any(p => p.PartNumber == partNumber))
throw new InvalidOperationException($"Part {partNumber} already uploaded");
var part = new MultipartUploadPart(Id, partNumber, etag, sizeBytes);
_parts.Add(part);
UploadedChunks = _parts.Count;
}
/// <summary>
/// EN: Complete the multipart upload.
/// VI: Hoàn thành multipart upload.
/// </summary>
public void Complete()
{
if (Status != MultipartUploadStatus.InProgress)
throw new InvalidOperationException($"Cannot complete upload with status {Status}");
if (UploadedChunks != TotalChunks)
throw new InvalidOperationException($"Cannot complete: {UploadedChunks}/{TotalChunks} parts uploaded");
Status = MultipartUploadStatus.Completed;
CompletedAt = DateTime.UtcNow;
// EN: Raise domain event / VI: Phát domain event
AddDomainEvent(new MultipartUploadCompletedDomainEvent(Id, FileName, UserId, TotalSizeBytes, ObjectKey));
}
/// <summary>
/// EN: Abort the multipart upload.
/// VI: Hủy multipart upload.
/// </summary>
public void Abort()
{
if (Status == MultipartUploadStatus.Completed)
throw new InvalidOperationException("Cannot abort completed upload");
Status = MultipartUploadStatus.Aborted;
CompletedAt = DateTime.UtcNow;
// EN: Raise domain event / VI: Phát domain event
AddDomainEvent(new MultipartUploadAbortedDomainEvent(Id, UserId));
}
/// <summary>
/// EN: Mark upload as failed.
/// VI: Đánh dấu upload thất bại.
/// </summary>
public void MarkFailed()
{
Status = MultipartUploadStatus.Failed;
CompletedAt = DateTime.UtcNow;
}
/// <summary>
/// EN: Check if upload has expired.
/// VI: Kiểm tra xem upload đã hết hạn chưa.
/// </summary>
public bool IsExpired() => DateTime.UtcNow > ExpiresAt;
/// <summary>
/// EN: Get progress percentage.
/// VI: Lấy phần trăm tiến độ.
/// </summary>
public double GetProgressPercentage()
{
if (TotalChunks == 0) return 0;
return (double)UploadedChunks / TotalChunks * 100;
}
}

View File

@@ -0,0 +1,53 @@
using StorageService.Domain.SeedWork;
namespace StorageService.Domain.AggregatesModel.MultipartUploadAggregate;
/// <summary>
/// EN: Represents a single part of a multipart upload.
/// VI: Đại diện cho một part của multipart upload.
/// </summary>
public class MultipartUploadPart : Entity
{
/// <summary>EN: Multipart upload ID / VI: ID của multipart upload</summary>
public Guid MultipartUploadId { get; private set; }
/// <summary>EN: Part number (1-indexed) / VI: Số thứ tự part (bắt đầu từ 1)</summary>
public int PartNumber { get; private set; }
/// <summary>EN: ETag returned by storage provider / VI: ETag trả về từ storage provider</summary>
public string ETag { get; private set; } = string.Empty;
/// <summary>EN: Part size in bytes / VI: Kích thước part (bytes)</summary>
public long SizeBytes { get; private set; }
/// <summary>EN: Upload timestamp / VI: Thời gian upload</summary>
public DateTime UploadedAt { get; private set; }
// EN: EF Core requires parameterless constructor / VI: EF Core cần constructor không tham số
protected MultipartUploadPart() { }
/// <summary>
/// EN: Create a new multipart upload part.
/// VI: Tạo một part mới cho multipart upload.
/// </summary>
public MultipartUploadPart(
Guid multipartUploadId,
int partNumber,
string etag,
long sizeBytes)
{
if (partNumber < 1)
throw new ArgumentException("Part number must be >= 1", nameof(partNumber));
if (string.IsNullOrWhiteSpace(etag))
throw new ArgumentNullException(nameof(etag));
if (sizeBytes <= 0)
throw new ArgumentException("Size must be > 0", nameof(sizeBytes));
Id = Guid.NewGuid();
MultipartUploadId = multipartUploadId;
PartNumber = partNumber;
ETag = etag;
SizeBytes = sizeBytes;
UploadedAt = DateTime.UtcNow;
}
}

View File

@@ -0,0 +1,23 @@
namespace StorageService.Domain.AggregatesModel.MultipartUploadAggregate;
/// <summary>
/// EN: Status of multipart upload.
/// VI: Trạng thái của multipart upload.
/// </summary>
public enum MultipartUploadStatus
{
/// <summary>EN: Upload in progress / VI: Đang upload</summary>
InProgress = 0,
/// <summary>EN: Upload completed successfully / VI: Upload hoàn thành</summary>
Completed = 1,
/// <summary>EN: Upload aborted by user / VI: Upload bị hủy bởi user</summary>
Aborted = 2,
/// <summary>EN: Upload failed / VI: Upload thất bại</summary>
Failed = 3,
/// <summary>EN: Upload expired / VI: Upload hết hạn</summary>
Expired = 4
}

View File

@@ -0,0 +1,19 @@
using MediatR;
namespace StorageService.Domain.Events;
/// <summary>
/// EN: Event raised when a multipart upload is aborted.
/// VI: Event được raise khi multipart upload bị hủy.
/// </summary>
public class MultipartUploadAbortedDomainEvent : INotification
{
public Guid UploadId { get; }
public string UserId { get; }
public MultipartUploadAbortedDomainEvent(Guid uploadId, string userId)
{
UploadId = uploadId;
UserId = userId;
}
}

View File

@@ -0,0 +1,25 @@
using MediatR;
namespace StorageService.Domain.Events;
/// <summary>
/// EN: Event raised when a multipart upload is completed.
/// VI: Event được raise khi multipart upload hoàn thành.
/// </summary>
public class MultipartUploadCompletedDomainEvent : INotification
{
public Guid UploadId { get; }
public string FileName { get; }
public string UserId { get; }
public long TotalSizeBytes { get; }
public string ObjectKey { get; }
public MultipartUploadCompletedDomainEvent(Guid uploadId, string fileName, string userId, long totalSizeBytes, string objectKey)
{
UploadId = uploadId;
FileName = fileName;
UserId = userId;
TotalSizeBytes = totalSizeBytes;
ObjectKey = objectKey;
}
}

View File

@@ -0,0 +1,23 @@
using MediatR;
namespace StorageService.Domain.Events;
/// <summary>
/// EN: Event raised when a multipart upload is initiated.
/// VI: Event được raise khi multipart upload được khởi tạo.
/// </summary>
public class MultipartUploadInitiatedDomainEvent : INotification
{
public Guid UploadId { get; }
public string FileName { get; }
public string UserId { get; }
public long TotalSizeBytes { get; }
public MultipartUploadInitiatedDomainEvent(Guid uploadId, string fileName, string userId, long totalSizeBytes)
{
UploadId = uploadId;
FileName = fileName;
UserId = userId;
TotalSizeBytes = totalSizeBytes;
}
}

View File

@@ -0,0 +1,100 @@
namespace StorageService.Infrastructure.Caching;
/// <summary>
/// EN: Cache key constants for Storage Service.
/// VI: Các hằng số cache key cho Storage Service.
/// </summary>
public static class CacheKeys
{
/// <summary>
/// EN: Cache key prefix for the service.
/// VI: Prefix cho cache key của service.
/// </summary>
private const string Prefix = "storage";
/// <summary>
/// EN: Default cache TTL (5 minutes).
/// VI: TTL cache mặc định (5 phút).
/// </summary>
public static readonly TimeSpan DefaultTtl = TimeSpan.FromMinutes(5);
/// <summary>
/// EN: Short cache TTL (1 minute).
/// VI: TTL cache ngắn (1 phút).
/// </summary>
public static readonly TimeSpan ShortTtl = TimeSpan.FromMinutes(1);
/// <summary>
/// EN: Long cache TTL (30 minutes).
/// VI: TTL cache dài (30 phút).
/// </summary>
public static readonly TimeSpan LongTtl = TimeSpan.FromMinutes(30);
/// <summary>
/// EN: Build cache key for user quota.
/// VI: Xây dựng cache key cho user quota.
/// </summary>
/// <param name="userId">User ID</param>
/// <returns>Cache key / Cache key</returns>
public static string UserQuota(Guid userId) => $"{Prefix}:quota:{userId}";
/// <summary>
/// EN: Build cache key for file metadata.
/// VI: Xây dựng cache key cho file metadata.
/// </summary>
/// <param name="fileId">File ID</param>
/// <returns>Cache key / Cache key</returns>
public static string FileMetadata(Guid fileId) => $"{Prefix}:file:{fileId}";
/// <summary>
/// EN: Build cache key for user's file list.
/// VI: Xây dựng cache key cho danh sách file của user.
/// </summary>
/// <param name="userId">User ID</param>
/// <param name="folderId">Folder ID (optional) / Folder ID (tùy chọn)</param>
/// <returns>Cache key / Cache key</returns>
public static string UserFileList(Guid userId, Guid? folderId = null) =>
folderId.HasValue
? $"{Prefix}:files:{userId}:folder:{folderId}"
: $"{Prefix}:files:{userId}:root";
/// <summary>
/// EN: Build pattern for invalidating all user's file cache.
/// VI: Xây dựng pattern để invalidate tất cả cache file của user.
/// </summary>
/// <param name="userId">User ID</param>
/// <returns>Cache pattern / Cache pattern</returns>
public static string UserFilesPattern(Guid userId) => $"{Prefix}:files:{userId}:*";
/// <summary>
/// EN: Build cache key for folder metadata.
/// VI: Xây dựng cache key cho folder metadata.
/// </summary>
/// <param name="folderId">Folder ID</param>
/// <returns>Cache key / Cache key</returns>
public static string FolderMetadata(Guid folderId) => $"{Prefix}:folder:{folderId}";
/// <summary>
/// EN: Build cache key for folder children (files and subfolders).
/// VI: Xây dựng cache key cho folder children (files và subfolders).
/// </summary>
/// <param name="folderId">Folder ID</param>
/// <returns>Cache key / Cache key</returns>
public static string FolderChildren(Guid folderId) => $"{Prefix}:folder:{folderId}:children";
/// <summary>
/// EN: Build pattern for invalidating folder cache.
/// VI: Xây dựng pattern để invalidate cache folder.
/// </summary>
/// <param name="folderId">Folder ID</param>
/// <returns>Cache pattern / Cache pattern</returns>
public static string FolderPattern(Guid folderId) => $"{Prefix}:folder:{folderId}:*";
/// <summary>
/// EN: Build cache key for multipart upload session.
/// VI: Xây dựng cache key cho multipart upload session.
/// </summary>
/// <param name="uploadId">Upload ID</param>
/// <returns>Cache key / Cache key</returns>
public static string MultipartUpload(string uploadId) => $"{Prefix}:multipart:{uploadId}";
}

View File

@@ -0,0 +1,77 @@
namespace StorageService.Infrastructure.Caching;
/// <summary>
/// EN: Interface for Redis caching operations.
/// VI: Interface cho các thao tác Redis caching.
/// </summary>
public interface IRedisCacheService
{
/// <summary>
/// EN: Get value from cache.
/// VI: Lấy giá trị từ cache.
/// </summary>
/// <typeparam name="T">Type of cached value / Kiểu giá trị cache</typeparam>
/// <param name="key">Cache key / Key cache</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Cached value or default / Giá trị cache hoặc default</returns>
Task<T?> GetAsync<T>(string key, CancellationToken cancellationToken = default) where T : class;
/// <summary>
/// EN: Set value in cache with TTL.
/// VI: Lưu giá trị vào cache với TTL.
/// </summary>
/// <typeparam name="T">Type of value / Kiểu giá trị</typeparam>
/// <param name="key">Cache key / Key cache</param>
/// <param name="value">Value to cache / Giá trị cần cache</param>
/// <param name="expiry">Time to live / Thời gian sống của cache</param>
/// <param name="cancellationToken">Cancellation token</param>
Task SetAsync<T>(string key, T value, TimeSpan? expiry = null, CancellationToken cancellationToken = default) where T : class;
/// <summary>
/// EN: Delete key from cache.
/// VI: Xóa key khỏi cache.
/// </summary>
/// <param name="key">Cache key / Key cache</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>True if key existed and was removed / True nếu key tồn tại và đã xóa</returns>
Task<bool> DeleteAsync(string key, CancellationToken cancellationToken = default);
/// <summary>
/// EN: Delete multiple keys matching pattern.
/// VI: Xóa nhiều keys theo pattern.
/// </summary>
/// <param name="pattern">Pattern (e.g., "user:123:*") / Pattern (ví dụ: "user:123:*")</param>
/// <param name="cancellationToken">Cancellation token</param>
Task DeleteByPatternAsync(string pattern, CancellationToken cancellationToken = default);
/// <summary>
/// EN: Check if key exists in cache.
/// VI: Kiểm tra key có tồn tại trong cache.
/// </summary>
/// <param name="key">Cache key / Key cache</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>True if exists / True nếu tồn tại</returns>
Task<bool> ExistsAsync(string key, CancellationToken cancellationToken = default);
/// <summary>
/// EN: Get value from cache, or execute factory and cache result.
/// VI: Lấy giá trị từ cache, hoặc thực thi factory và cache kết quả.
/// </summary>
/// <typeparam name="T">Type of cached value / Kiểu giá trị cache</typeparam>
/// <param name="key">Cache key / Key cache</param>
/// <param name="factory">Factory function to create value if not cached / Factory để tạo value nếu chưa cache</param>
/// <param name="expiry">Time to live / Thời gian sống của cache</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>Cached or newly created value / Giá trị từ cache hoặc mới tạo</returns>
Task<T?> GetOrSetAsync<T>(
string key,
Func<Task<T?>> factory,
TimeSpan? expiry = null,
CancellationToken cancellationToken = default) where T : class;
/// <summary>
/// EN: Check if Redis is connected.
/// VI: Kiểm tra Redis đã kết nối.
/// </summary>
bool IsConnected { get; }
}

View File

@@ -0,0 +1,224 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace StorageService.Infrastructure.Caching;
/// <summary>
/// EN: Redis cache service implementation using StackExchange.Redis.
/// VI: Implementation của Redis cache service sử dụng StackExchange.Redis.
/// </summary>
public class RedisCacheService : IRedisCacheService, IAsyncDisposable
{
private readonly ILogger<RedisCacheService> _logger;
private readonly Configuration.RedisSettings _settings;
private readonly Lazy<ConnectionMultiplexer> _lazyConnection;
private readonly JsonSerializerOptions _jsonOptions;
private bool _disposed;
public RedisCacheService(
IOptions<Configuration.RedisSettings> options,
ILogger<RedisCacheService> logger)
{
_settings = options.Value;
_logger = logger;
_jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
// EN: Lazy initialization for Redis connection
// VI: Khởi tạo lazy cho Redis connection
_lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
{
try
{
var connectionString = _settings.GetConnectionString();
_logger.LogInformation("Connecting to Redis: {Host}:{Port}", _settings.Host, _settings.Port);
return ConnectionMultiplexer.Connect(connectionString);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to connect to Redis");
throw;
}
});
}
/// <summary>
/// EN: Get Redis database instance.
/// VI: Lấy instance Redis database.
/// </summary>
private IDatabase Database => _lazyConnection.Value.GetDatabase(_settings.Database);
/// <inheritdoc />
public bool IsConnected => _lazyConnection.IsValueCreated && _lazyConnection.Value.IsConnected;
/// <inheritdoc />
public async Task<T?> GetAsync<T>(string key, CancellationToken cancellationToken = default) where T : class
{
if (!_settings.Enabled)
{
_logger.LogDebug("Redis caching is disabled");
return default;
}
try
{
var value = await Database.StringGetAsync(key);
if (value.IsNullOrEmpty)
{
_logger.LogDebug("Cache miss for key: {Key}", key);
return default;
}
_logger.LogDebug("Cache hit for key: {Key}", key);
return JsonSerializer.Deserialize<T>(value.ToString(), _jsonOptions);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error getting cache key: {Key}", key);
return default;
}
}
/// <inheritdoc />
public async Task SetAsync<T>(
string key,
T value,
TimeSpan? expiry = null,
CancellationToken cancellationToken = default) where T : class
{
if (!_settings.Enabled)
{
_logger.LogDebug("Redis caching is disabled");
return;
}
try
{
var serialized = JsonSerializer.Serialize(value, _jsonOptions);
await Database.StringSetAsync(key, serialized, expiry);
_logger.LogDebug("Cache set for key: {Key}, TTL: {Expiry}", key, expiry);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error setting cache key: {Key}", key);
}
}
/// <inheritdoc />
public async Task<bool> DeleteAsync(string key, CancellationToken cancellationToken = default)
{
if (!_settings.Enabled)
{
return false;
}
try
{
var result = await Database.KeyDeleteAsync(key);
_logger.LogDebug("Cache deleted for key: {Key}, Result: {Result}", key, result);
return result;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error deleting cache key: {Key}", key);
return false;
}
}
/// <inheritdoc />
public async Task DeleteByPatternAsync(string pattern, CancellationToken cancellationToken = default)
{
if (!_settings.Enabled)
{
return;
}
try
{
var server = _lazyConnection.Value.GetServer(_settings.Host, _settings.Port);
var keys = server.Keys(pattern: pattern, database: _settings.Database);
var keyArray = keys.ToArray();
if (keyArray.Length > 0)
{
await Database.KeyDeleteAsync(keyArray);
_logger.LogDebug("Deleted {Count} keys matching pattern: {Pattern}", keyArray.Length, pattern);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error deleting cache keys by pattern: {Pattern}", pattern);
}
}
/// <inheritdoc />
public async Task<bool> ExistsAsync(string key, CancellationToken cancellationToken = default)
{
if (!_settings.Enabled)
{
return false;
}
try
{
return await Database.KeyExistsAsync(key);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error checking cache key existence: {Key}", key);
return false;
}
}
/// <inheritdoc />
public async Task<T?> GetOrSetAsync<T>(
string key,
Func<Task<T?>> factory,
TimeSpan? expiry = null,
CancellationToken cancellationToken = default) where T : class
{
// EN: Try to get from cache first
// VI: Thử lấy từ cache trước
var cached = await GetAsync<T>(key, cancellationToken);
if (cached is not null)
{
return cached;
}
// EN: Cache miss - execute factory
// VI: Cache miss - thực thi factory
var value = await factory();
if (value is not null)
{
await SetAsync(key, value, expiry, cancellationToken);
}
return value;
}
/// <summary>
/// EN: Dispose Redis connection.
/// VI: Giải phóng Redis connection.
/// </summary>
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
if (_lazyConnection.IsValueCreated)
{
await _lazyConnection.Value.CloseAsync();
_lazyConnection.Value.Dispose();
}
_disposed = true;
GC.SuppressFinalize(this);
}
}

View File

@@ -0,0 +1,54 @@
namespace StorageService.Infrastructure.Configuration;
/// <summary>
/// EN: Redis cache settings.
/// VI: Cấu hình Redis cache.
/// </summary>
public class RedisSettings
{
public const string SectionName = "Redis";
/// <summary>EN: Redis host / VI: Redis host</summary>
public string Host { get; set; } = "localhost";
/// <summary>EN: Redis port / VI: Redis port</summary>
public int Port { get; set; } = 6379;
/// <summary>EN: Redis password (optional) / VI: Redis password (tùy chọn)</summary>
public string? Password { get; set; }
/// <summary>EN: Redis database index / VI: Redis database index</summary>
public int Database { get; set; } = 0;
/// <summary>EN: Connection timeout in ms / VI: Timeout kết nối (ms)</summary>
public int ConnectTimeout { get; set; } = 5000;
/// <summary>EN: Sync operation timeout in ms / VI: Timeout thao tác đồng bộ (ms)</summary>
public int SyncTimeout { get; set; } = 5000;
/// <summary>EN: Enable Redis caching / VI: Bật Redis caching</summary>
public bool Enabled { get; set; } = true;
/// <summary>
/// EN: Build connection string for StackExchange.Redis.
/// VI: Xây dựng connection string cho StackExchange.Redis.
/// </summary>
public string GetConnectionString()
{
var connectionString = $"{Host}:{Port},connectTimeout={ConnectTimeout},syncTimeout={SyncTimeout}";
if (!string.IsNullOrWhiteSpace(Password))
{
connectionString += $",password={Password}";
}
if (Database > 0)
{
connectionString += $",defaultDatabase={Database}";
}
connectionString += ",abortConnect=false";
return connectionString;
}
}

View File

@@ -5,6 +5,7 @@ using Polly;
using Polly.Extensions.Http;
using StorageService.Domain.AggregatesModel.FileAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Infrastructure.Caching;
using StorageService.Infrastructure.Configuration;
using StorageService.Infrastructure.ExternalServices;
using StorageService.Infrastructure.Idempotency;
@@ -32,6 +33,10 @@ public static class DependencyInjection
// EN: Bind configuration sections / VI: Bind các configuration sections
services.Configure<StorageSettings>(configuration.GetSection(StorageSettings.SectionName));
services.Configure<IamServiceSettings>(configuration.GetSection(IamServiceSettings.SectionName));
services.Configure<RedisSettings>(configuration.GetSection(RedisSettings.SectionName));
// EN: Register Redis cache service / VI: Đăng ký Redis cache service
services.AddSingleton<IRedisCacheService, RedisCacheService>();
// EN: Add DbContext with PostgreSQL / VI: Thêm DbContext với PostgreSQL
if (!string.Equals(environmentName, "Testing", StringComparison.OrdinalIgnoreCase))

View File

@@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using StorageService.Domain.AggregatesModel.FileAggregate;
using StorageService.Domain.AggregatesModel.QuotaAggregate;
using StorageService.Domain.AggregatesModel.MultipartUploadAggregate;
using StorageService.Domain.SeedWork;
using System.Data;
@@ -19,6 +20,8 @@ public class StorageServiceContext : DbContext, IUnitOfWork
public DbSet<StorageFile> StorageFiles => Set<StorageFile>();
public DbSet<UserStorageQuota> UserStorageQuotas => Set<UserStorageQuota>();
public DbSet<MultipartUpload> MultipartUploads => Set<MultipartUpload>();
public DbSet<MultipartUploadPart> MultipartUploadParts => Set<MultipartUploadPart>();
public IDbContextTransaction? CurrentTransaction => _currentTransaction;
public bool HasActiveTransaction => _currentTransaction != null;
@@ -171,6 +174,133 @@ public class StorageServiceContext : DbContext, IUnitOfWork
// EN: Ignore domain events / VI: Bỏ qua domain events
entity.Ignore(e => e.DomainEvents);
});
// EN: Configure MultipartUpload entity / VI: Cấu hình entity MultipartUpload
modelBuilder.Entity<MultipartUpload>(entity =>
{
entity.ToTable("multipart_uploads");
entity.HasKey(e => e.Id);
entity.Property(e => e.Id)
.HasColumnName("id");
entity.Property(e => e.UserId)
.HasColumnName("user_id")
.HasMaxLength(255)
.IsRequired();
entity.Property(e => e.FileName)
.HasColumnName("file_name")
.HasMaxLength(255)
.IsRequired();
entity.Property(e => e.TotalSizeBytes)
.HasColumnName("total_size_bytes")
.IsRequired();
entity.Property(e => e.ChunkSizeBytes)
.HasColumnName("chunk_size_bytes")
.IsRequired();
entity.Property(e => e.TotalChunks)
.HasColumnName("total_chunks")
.IsRequired();
entity.Property(e => e.UploadedChunks)
.HasColumnName("uploaded_chunks")
.IsRequired();
entity.Property(e => e.ProviderUploadId)
.HasColumnName("upload_id")
.HasMaxLength(255)
.IsRequired();
entity.Property(e => e.BucketName)
.HasColumnName("bucket_name")
.HasMaxLength(255)
.IsRequired();
entity.Property(e => e.ObjectKey)
.HasColumnName("object_key")
.HasMaxLength(500)
.IsRequired();
entity.Property(e => e.ContentType)
.HasColumnName("content_type")
.HasMaxLength(100)
.IsRequired();
entity.Property(e => e.Status)
.HasColumnName("status")
.HasConversion<string>()
.HasMaxLength(50)
.IsRequired();
entity.Property(e => e.CreatedAt)
.HasColumnName("created_at")
.IsRequired();
entity.Property(e => e.CompletedAt)
.HasColumnName("completed_at");
entity.Property(e => e.ExpiresAt)
.HasColumnName("expires_at")
.IsRequired();
// EN: Indexes / VI: Indexes
entity.HasIndex(e => new { e.UserId, e.Status });
entity.HasIndex(e => e.ProviderUploadId);
entity.HasIndex(e => e.CreatedAt);
// EN: Configure relationship with parts / VI: Cấu hình mối quan hệ với parts
entity.HasMany(e => e.Parts)
.WithOne()
.HasForeignKey(p => p.MultipartUploadId)
.OnDelete(DeleteBehavior.Cascade);
// EN: Ignore domain events / VI: Bỏ qua domain events
entity.Ignore(e => e.DomainEvents);
});
// EN: Configure MultipartUploadPart entity / VI: Cấu hình entity MultipartUploadPart
modelBuilder.Entity<MultipartUploadPart>(entity =>
{
entity.ToTable("multipart_upload_parts");
entity.HasKey(e => e.Id);
entity.Property(e => e.Id)
.HasColumnName("id");
entity.Property(e => e.MultipartUploadId)
.HasColumnName("multipart_upload_id")
.IsRequired();
entity.Property(e => e.PartNumber)
.HasColumnName("part_number")
.IsRequired();
entity.Property(e => e.ETag)
.HasColumnName("etag")
.HasMaxLength(255)
.IsRequired();
entity.Property(e => e.SizeBytes)
.HasColumnName("size_bytes")
.IsRequired();
entity.Property(e => e.UploadedAt)
.HasColumnName("uploaded_at")
.IsRequired();
// EN: Indexes / VI: Indexes
entity.HasIndex(e => new { e.MultipartUploadId, e.PartNumber })
.IsUnique();
// EN: Ignore domain events / VI: Bỏ qua domain events
entity.Ignore(e => e.DomainEvents);
});
}
/// <inheritdoc />

View File

@@ -161,4 +161,57 @@ public class AliyunOssStorageProvider : IStorageProvider
return Task.CompletedTask;
}
// ========== Multipart Upload Methods ==========
/// <inheritdoc />
public Task<string> InitiateMultipartUploadAsync(
string bucketName,
string objectKey,
string contentType,
CancellationToken cancellationToken = default)
{
// TODO: Implement using Aliyun OSS InitiateMultipartUpload
_logger.LogWarning("Multipart upload for Aliyun OSS not yet implemented");
throw new NotImplementedException("Multipart upload will be implemented in next phase");
}
/// <inheritdoc />
public Task<string> UploadPartAsync(
string bucketName,
string objectKey,
string uploadId,
int partNumber,
Stream dataStream,
CancellationToken cancellationToken = default)
{
// TODO: Implement using Aliyun OSS UploadPart
_logger.LogWarning("Upload part for Aliyun OSS not yet implemented");
throw new NotImplementedException("Upload part will be implemented in next phase");
}
/// <inheritdoc />
public Task CompleteMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
List<PartETag> parts,
CancellationToken cancellationToken = default)
{
// TODO: Implement using Aliyun OSS CompleteMultipartUpload
_logger.LogWarning("Complete multipart upload for Aliyun OSS not yet implemented");
throw new NotImplementedException("Complete multipart upload will be implemented in next phase");
}
/// <inheritdoc />
public Task AbortMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
CancellationToken cancellationToken = default)
{
// TODO: Implement using Aliyun OSS AbortMultipartUpload
_logger.LogWarning("Abort multipart upload for Aliyun OSS not yet implemented");
throw new NotImplementedException("Abort multipart upload will be implemented in next phase");
}
}

View File

@@ -96,4 +96,57 @@ public interface IStorageProvider
Task EnsureBucketExistsAsync(
string bucketName,
CancellationToken cancellationToken = default);
// ========== Multipart Upload Methods ==========
/// <summary>
/// EN: Initiate a multipart upload session.
/// VI: Khởi tạo phiên multipart upload.
/// </summary>
/// <returns>Upload ID from provider</returns>
Task<string> InitiateMultipartUploadAsync(
string bucketName,
string objectKey,
string contentType,
CancellationToken cancellationToken = default);
/// <summary>
/// EN: Upload a single part of a multipart upload.
/// VI: Upload một part của multipart upload.
/// </summary>
/// <returns>ETag of the uploaded part</returns>
Task<string> UploadPartAsync(
string bucketName,
string objectKey,
string uploadId,
int partNumber,
Stream dataStream,
CancellationToken cancellationToken = default);
/// <summary>
/// EN: Complete a multipart upload by merging all parts.
/// VI: Hoàn thành multipart upload bằng cách merge tất cả parts.
/// </summary>
Task CompleteMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
List<PartETag> parts,
CancellationToken cancellationToken = default);
/// <summary>
/// EN: Abort a multipart upload and cleanup all parts.
/// VI: Hủy multipart upload và cleanup tất cả parts.
/// </summary>
Task AbortMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
CancellationToken cancellationToken = default);
}
/// <summary>
/// EN: Represents a part ETag for multipart upload completion.
/// VI: Đại diện cho ETag của một part cho multipart upload completion.
/// </summary>
public record PartETag(int PartNumber, string ETag);

View File

@@ -1,3 +1,7 @@
using Amazon;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Minio;
@@ -8,13 +12,15 @@ using StorageService.Infrastructure.Configuration;
namespace StorageService.Infrastructure.Storage;
/// <summary>
/// EN: MinIO storage provider implementation.
/// VI: Implementation storage provider cho MinIO.
/// EN: MinIO storage provider implementation using S3-compatible APIs.
/// VI: Implementation storage provider cho MinIO sử dụng S3-compatible APIs.
/// </summary>
public class MinioStorageProvider : IStorageProvider
{
private readonly IMinioClient _minioClient;
private readonly IAmazonS3 _s3Client;
private readonly ILogger<MinioStorageProvider> _logger;
private readonly MinioSettings _settings;
public StorageProvider ProviderType => StorageProvider.MinIO;
@@ -23,15 +29,31 @@ public class MinioStorageProvider : IStorageProvider
ILogger<MinioStorageProvider> logger)
{
_logger = logger;
var minioSettings = settings.Value.MinIO;
_settings = settings.Value.MinIO;
// EN: Initialize MinIO client for normal operations
// VI: Khởi tạo MinIO client cho các thao tác thông thường
_minioClient = new MinioClient()
.WithEndpoint(minioSettings.Endpoint)
.WithCredentials(minioSettings.AccessKey, minioSettings.SecretKey)
.WithSSL(minioSettings.UseSSL)
.WithEndpoint(_settings.Endpoint)
.WithCredentials(_settings.AccessKey, _settings.SecretKey)
.WithSSL(_settings.UseSSL)
.Build();
_logger.LogInformation("MinIO client initialized with endpoint: {Endpoint}", minioSettings.Endpoint);
// EN: Initialize S3 client for low-level multipart upload operations
// VI: Khởi tạo S3 client cho các thao tác multipart upload low-level
var s3Config = new AmazonS3Config
{
ServiceURL = $"{(_settings.UseSSL ? "https" : "http")}://{_settings.Endpoint}",
ForcePathStyle = true, // EN: Required for MinIO / VI: Bắt buộc cho MinIO
UseHttp = !_settings.UseSSL
};
var credentials = new BasicAWSCredentials(_settings.AccessKey, _settings.SecretKey);
_s3Client = new AmazonS3Client(credentials, s3Config);
_logger.LogInformation(
"MinIO client initialized with endpoint: {Endpoint} (S3-compatible multipart enabled)",
_settings.Endpoint);
}
/// <inheritdoc />
@@ -183,4 +205,173 @@ public class MinioStorageProvider : IStorageProvider
_logger.LogInformation("Created MinIO bucket: {Bucket}", bucketName);
}
}
// ========== Multipart Upload Methods (S3-Compatible) ==========
/// <inheritdoc />
public async Task<string> InitiateMultipartUploadAsync(
string bucketName,
string objectKey,
string contentType,
CancellationToken cancellationToken = default)
{
try
{
_logger.LogInformation(
"Initiating multipart upload for {Bucket}/{ObjectKey}",
bucketName, objectKey);
// EN: Ensure bucket exists / VI: Đảm bảo bucket tồn tại
await EnsureBucketExistsAsync(bucketName, cancellationToken);
// EN: Initiate multipart upload using S3 API / VI: Khởi tạo multipart upload sử dụng S3 API
var request = new InitiateMultipartUploadRequest
{
BucketName = bucketName,
Key = objectKey,
ContentType = contentType
};
var response = await _s3Client.InitiateMultipartUploadAsync(request, cancellationToken);
_logger.LogInformation(
"Multipart upload initiated. UploadId: {UploadId}",
response.UploadId);
return response.UploadId;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to initiate multipart upload for {Bucket}/{ObjectKey}",
bucketName, objectKey);
throw;
}
}
/// <inheritdoc />
public async Task<string> UploadPartAsync(
string bucketName,
string objectKey,
string uploadId,
int partNumber,
Stream dataStream,
CancellationToken cancellationToken = default)
{
try
{
_logger.LogDebug(
"Uploading part {PartNumber} for {Bucket}/{ObjectKey}, UploadId: {UploadId}",
partNumber, bucketName, objectKey, uploadId);
// EN: Upload part using S3 API / VI: Upload part sử dụng S3 API
var request = new UploadPartRequest
{
BucketName = bucketName,
Key = objectKey,
UploadId = uploadId,
PartNumber = partNumber,
InputStream = dataStream
};
var response = await _s3Client.UploadPartAsync(request, cancellationToken);
_logger.LogInformation(
"Part {PartNumber} uploaded. ETag: {ETag}",
partNumber, response.ETag);
return response.ETag;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to upload part {PartNumber} for {Bucket}/{ObjectKey}",
partNumber, bucketName, objectKey);
throw;
}
}
/// <inheritdoc />
public async Task CompleteMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
List<PartETag> parts,
CancellationToken cancellationToken = default)
{
try
{
_logger.LogInformation(
"Completing multipart upload for {Bucket}/{ObjectKey}, UploadId: {UploadId}, Parts: {PartCount}",
bucketName, objectKey, uploadId, parts.Count);
// EN: Convert to S3 SDK format / VI: Chuyển đổi sang định dạng S3 SDK
var s3Parts = parts
.OrderBy(p => p.PartNumber)
.Select(p => new Amazon.S3.Model.PartETag
{
PartNumber = p.PartNumber,
ETag = p.ETag
})
.ToList();
// EN: Complete multipart upload using S3 API / VI: Hoàn thành multipart upload sử dụng S3 API
var request = new CompleteMultipartUploadRequest
{
BucketName = bucketName,
Key = objectKey,
UploadId = uploadId,
PartETags = s3Parts
};
await _s3Client.CompleteMultipartUploadAsync(request, cancellationToken);
_logger.LogInformation(
"Multipart upload completed successfully for {Bucket}/{ObjectKey}",
bucketName, objectKey);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to complete multipart upload for {Bucket}/{ObjectKey}",
bucketName, objectKey);
throw;
}
}
/// <inheritdoc />
public async Task AbortMultipartUploadAsync(
string bucketName,
string objectKey,
string uploadId,
CancellationToken cancellationToken = default)
{
try
{
_logger.LogInformation(
"Aborting multipart upload for {Bucket}/{ObjectKey}, UploadId: {UploadId}",
bucketName, objectKey, uploadId);
// EN: Abort multipart upload using S3 API / VI: Hủy multipart upload sử dụng S3 API
var request = new AbortMultipartUploadRequest
{
BucketName = bucketName,
Key = objectKey,
UploadId = uploadId
};
await _s3Client.AbortMultipartUploadAsync(request, cancellationToken);
_logger.LogInformation(
"Multipart upload aborted for {Bucket}/{ObjectKey}",
bucketName, objectKey);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to abort multipart upload for {Bucket}/{ObjectKey}",
bucketName, objectKey);
throw;
}
}
}

View File

@@ -31,6 +31,10 @@
<!-- EN: MinIO S3 Client / VI: MinIO S3 Client -->
<PackageReference Include="Minio" Version="6.0.4" />
<!-- EN: AWS S3 SDK for low-level multipart upload (S3-compatible with MinIO) -->
<!-- VI: AWS S3 SDK cho multipart upload low-level (S3-compatible với MinIO) -->
<PackageReference Include="AWSSDK.S3" Version="3.7.400" />
<!-- EN: Alibaba Cloud OSS SDK / VI: Alibaba Cloud OSS SDK -->
<PackageReference Include="Aliyun.OSS.SDK.NetCore" Version="2.14.1" />
</ItemGroup>