Merge pull request 'feat: add presigned URL import functionality' (#33) from refactor-architecture into main
Reviewed-on: #33
This commit is contained in:
commit
92dce8eb00
@ -4,6 +4,8 @@ import com.magamochi.common.model.dto.DefaultResponseDTO;
|
||||
import com.magamochi.content.model.dto.FileImportRequestDTO;
|
||||
import com.magamochi.content.model.dto.MangaContentDTO;
|
||||
import com.magamochi.content.model.dto.MangaContentImagesDTO;
|
||||
import com.magamochi.content.model.dto.PresignedImportRequestDTO;
|
||||
import com.magamochi.content.model.dto.PresignedImportResponseDTO;
|
||||
import com.magamochi.content.model.enumeration.ContentArchiveFileType;
|
||||
import com.magamochi.content.service.ContentDownloadService;
|
||||
import com.magamochi.content.service.ContentImportService;
|
||||
@ -93,4 +95,16 @@ public class ContentController {
|
||||
|
||||
return DefaultResponseDTO.ok().build();
|
||||
}
|
||||
|
||||
@Operation(
|
||||
summary = "Request presigned URL for import",
|
||||
description =
|
||||
"Generates a presigned URL to upload a file directly to S3 and registers a pending import job.",
|
||||
tags = {"Content"},
|
||||
operationId = "requestPresignedImport")
|
||||
@PostMapping(value = "/import/presigned")
|
||||
public DefaultResponseDTO<PresignedImportResponseDTO> requestPresignedImport(
|
||||
@RequestBody PresignedImportRequestDTO request) {
|
||||
return DefaultResponseDTO.ok(contentImportService.requestPresignedImport(request));
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,8 @@
|
||||
package com.magamochi.content.model.dto;
|
||||
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import lombok.Builder;
|
||||
|
||||
@Builder
|
||||
public record PresignedImportRequestDTO(
|
||||
Long malId, Long aniListId, @NotBlank String originalFilename) {}
|
||||
@ -0,0 +1,6 @@
|
||||
package com.magamochi.content.model.dto;
|
||||
|
||||
import lombok.Builder;
|
||||
|
||||
@Builder
|
||||
public record PresignedImportResponseDTO(Long jobId, String presignedUrl, String fileKey) {}
|
||||
@ -0,0 +1,36 @@
|
||||
package com.magamochi.content.model.entity;
|
||||
|
||||
import com.magamochi.content.model.enumeration.ImportJobStatus;
|
||||
import jakarta.persistence.*;
|
||||
import java.time.Instant;
|
||||
import lombok.*;
|
||||
import org.hibernate.annotations.CreationTimestamp;
|
||||
import org.hibernate.annotations.UpdateTimestamp;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Entity
|
||||
@Table(name = "manga_import_job")
|
||||
public class MangaImportJob {
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private Long id;
|
||||
|
||||
private Long malId;
|
||||
|
||||
private Long aniListId;
|
||||
|
||||
private String originalFilename;
|
||||
|
||||
private String s3FileKey;
|
||||
|
||||
@Enumerated(EnumType.STRING)
|
||||
private ImportJobStatus status;
|
||||
|
||||
@CreationTimestamp private Instant createdAt;
|
||||
|
||||
@UpdateTimestamp private Instant updatedAt;
|
||||
}
|
||||
@ -0,0 +1,8 @@
|
||||
package com.magamochi.content.model.enumeration;
|
||||
|
||||
public enum ImportJobStatus {
|
||||
PENDING,
|
||||
PROCESSING,
|
||||
SUCCESS,
|
||||
FAILED
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package com.magamochi.content.model.repository;
|
||||
|
||||
import com.magamochi.content.model.entity.MangaImportJob;
|
||||
import com.magamochi.content.model.enumeration.ImportJobStatus;
|
||||
import java.util.List;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface MangaImportJobRepository extends JpaRepository<MangaImportJob, Long> {
|
||||
List<MangaImportJob> findByStatus(ImportJobStatus status);
|
||||
}
|
||||
@ -1,3 +1,4 @@
|
||||
package com.magamochi.content.queue.command;
|
||||
|
||||
public record FileImportCommand(long mangaContentProviderId, String filename) {}
|
||||
public record FileImportCommand(
|
||||
long mangaContentProviderId, String filename, Long mangaImportJobId) {}
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
package com.magamochi.content.queue.consumer;
|
||||
|
||||
import static java.util.Objects.nonNull;
|
||||
|
||||
import com.magamochi.content.queue.command.FileImportCommand;
|
||||
import com.magamochi.content.service.ContentImportService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -16,6 +18,23 @@ public class FileImportConsumer {
|
||||
@RabbitListener(queues = "${queues.file-import}")
|
||||
public void receiveFileImportCommand(FileImportCommand command) {
|
||||
log.info("Received file import command: {}", command);
|
||||
contentImportService.importFile(command.mangaContentProviderId(), command.filename());
|
||||
try {
|
||||
contentImportService.importFile(
|
||||
command.mangaContentProviderId(), command.filename(), command.mangaImportJobId());
|
||||
|
||||
if (nonNull(command.mangaImportJobId())) {
|
||||
contentImportService.updateJobStatus(
|
||||
command.mangaImportJobId(),
|
||||
com.magamochi.content.model.enumeration.ImportJobStatus.SUCCESS);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (nonNull(command.mangaImportJobId())) {
|
||||
contentImportService.updateJobStatus(
|
||||
command.mangaImportJobId(),
|
||||
com.magamochi.content.model.enumeration.ImportJobStatus.FAILED);
|
||||
}
|
||||
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,14 +1,20 @@
|
||||
package com.magamochi.content.service;
|
||||
|
||||
import static java.util.Objects.isNull;
|
||||
import static java.util.Objects.nonNull;
|
||||
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||
|
||||
import com.magamochi.catalog.service.MangaContentProviderService;
|
||||
import com.magamochi.catalog.service.MangaResolutionService;
|
||||
import com.magamochi.common.exception.UnprocessableException;
|
||||
import com.magamochi.common.model.enumeration.ContentType;
|
||||
import com.magamochi.content.model.dto.PresignedImportRequestDTO;
|
||||
import com.magamochi.content.model.dto.PresignedImportResponseDTO;
|
||||
import com.magamochi.content.model.entity.MangaContentImage;
|
||||
import com.magamochi.content.model.entity.MangaImportJob;
|
||||
import com.magamochi.content.model.enumeration.ImportJobStatus;
|
||||
import com.magamochi.content.model.repository.MangaContentImageRepository;
|
||||
import com.magamochi.content.model.repository.MangaImportJobRepository;
|
||||
import com.magamochi.content.queue.command.FileImportCommand;
|
||||
import com.magamochi.content.queue.producer.FileImportProducer;
|
||||
import com.magamochi.image.service.ImageFetchService;
|
||||
@ -25,6 +31,7 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import net.greypanther.natsort.CaseInsensitiveSimpleNaturalComparator;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
@ -41,6 +48,7 @@ public class ContentImportService {
|
||||
|
||||
private final FileImportProducer fileImportProducer;
|
||||
private final MangaContentImageRepository mangaContentImageRepository;
|
||||
private final MangaImportJobRepository mangaImportJobRepository;
|
||||
private final ImageService imageService;
|
||||
|
||||
public void importFiles(String malId, String aniListId, @NotNull List<MultipartFile> files) {
|
||||
@ -73,43 +81,86 @@ public class ContentImportService {
|
||||
log.info("Temp file uploaded to S3: {}", filename);
|
||||
|
||||
fileImportProducer.sendFileImportCommand(
|
||||
new FileImportCommand(mangaContentProvider.getId(), filename));
|
||||
new FileImportCommand(mangaContentProvider.getId(), filename, null));
|
||||
} catch (IOException e) {
|
||||
throw new UnprocessableException("Failed to upload file to S3");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public PresignedImportResponseDTO requestPresignedImport(PresignedImportRequestDTO request) {
|
||||
if (isNull(request.malId()) && isNull(request.aniListId())) {
|
||||
throw new UnprocessableException("Either MyAnimeList or AniList IDs are required.");
|
||||
}
|
||||
|
||||
var uuid = UUID.randomUUID().toString();
|
||||
var fileKey = "temp/import/" + uuid + "-" + request.originalFilename();
|
||||
|
||||
var job =
|
||||
mangaImportJobRepository.save(
|
||||
MangaImportJob.builder()
|
||||
.malId(request.malId())
|
||||
.aniListId(request.aniListId())
|
||||
.originalFilename(request.originalFilename())
|
||||
.s3FileKey(fileKey)
|
||||
.status(ImportJobStatus.PENDING)
|
||||
.build());
|
||||
|
||||
var presignedUrl = s3Service.generatePresignedUploadUrl(fileKey);
|
||||
|
||||
return PresignedImportResponseDTO.builder()
|
||||
.jobId(job.getId())
|
||||
.fileKey(fileKey)
|
||||
.presignedUrl(presignedUrl)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public void updateJobStatus(Long jobId, ImportJobStatus status) {
|
||||
mangaImportJobRepository
|
||||
.findById(jobId)
|
||||
.ifPresent(
|
||||
job -> {
|
||||
job.setStatus(status);
|
||||
mangaImportJobRepository.save(job);
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void importFile(Long mangaContentProviderId, String filename) {
|
||||
public void importFile(Long mangaContentProviderId, String filename, Long mangaImportJobId) {
|
||||
var contentName = removeImportPrefix(removeFileExtension(filename));
|
||||
|
||||
if (nonNull(mangaImportJobId)) {
|
||||
var jobOpt = mangaImportJobRepository.findById(mangaImportJobId);
|
||||
if (jobOpt.isPresent()) {
|
||||
contentName = removeFileExtension(jobOpt.get().getOriginalFilename());
|
||||
}
|
||||
}
|
||||
|
||||
var mangaContent =
|
||||
contentIngestService.ingest(
|
||||
mangaContentProviderId,
|
||||
removeImportPrefix(removeFileExtension(filename)),
|
||||
null,
|
||||
"en-US");
|
||||
contentIngestService.ingest(mangaContentProviderId, contentName, null, "en-US");
|
||||
|
||||
try (var is = s3Service.getFileStream(filename);
|
||||
var zis = new ZipInputStream(is)) {
|
||||
|
||||
Map<String, byte[]> entryMap =
|
||||
new TreeMap<>(
|
||||
CaseInsensitiveSimpleNaturalComparator
|
||||
.getInstance()); // TreeMap keeps keys sorted naturally
|
||||
new TreeMap<>(CaseInsensitiveSimpleNaturalComparator.getInstance());
|
||||
|
||||
ZipEntry entry;
|
||||
while ((entry = zis.getNextEntry()) != null) {
|
||||
if (entry.isDirectory()) continue;
|
||||
if (entry.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||
var os = new ByteArrayOutputStream();
|
||||
zis.transferTo(os);
|
||||
entryMap.put(entry.getName(), os.toByteArray());
|
||||
zis.closeEntry();
|
||||
}
|
||||
|
||||
int position = 0;
|
||||
for (Map.Entry<String, byte[]> sortedEntry : entryMap.entrySet()) {
|
||||
byte[] bytes = sortedEntry.getValue();
|
||||
var position = 0;
|
||||
for (var sortedEntry : entryMap.entrySet()) {
|
||||
var bytes = sortedEntry.getValue();
|
||||
|
||||
var imageId = imageFetchService.uploadImage(bytes, null, ContentType.CONTENT_IMAGE);
|
||||
var image = imageService.find(imageId);
|
||||
@ -134,7 +185,7 @@ public class ContentImportService {
|
||||
return filename;
|
||||
}
|
||||
|
||||
int lastDotIndex = filename.lastIndexOf('.');
|
||||
var lastDotIndex = filename.lastIndexOf('.');
|
||||
|
||||
// No dot, or dot is the first character (like .gitignore)
|
||||
if (lastDotIndex <= 0) {
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
package com.magamochi.content.task;
|
||||
|
||||
import com.magamochi.catalog.service.MangaContentProviderService;
|
||||
import com.magamochi.catalog.service.MangaResolutionService;
|
||||
import com.magamochi.content.model.enumeration.ImportJobStatus;
|
||||
import com.magamochi.content.model.repository.MangaImportJobRepository;
|
||||
import com.magamochi.content.queue.command.FileImportCommand;
|
||||
import com.magamochi.content.queue.producer.FileImportProducer;
|
||||
import com.magamochi.image.service.S3Service;
|
||||
import com.magamochi.ingestion.service.ContentProviderService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class PendingImportScannerTask {
|
||||
private final MangaImportJobRepository mangaImportJobRepository;
|
||||
private final S3Service s3Service;
|
||||
private final MangaResolutionService mangaResolutionService;
|
||||
private final ContentProviderService contentProviderService;
|
||||
private final MangaContentProviderService mangaContentProviderService;
|
||||
private final FileImportProducer fileImportProducer;
|
||||
|
||||
@Scheduled(fixedDelayString = "${tasks.pending-import-scanner.delay:30000}")
|
||||
public void scanPendingImports() {
|
||||
var pendingJobs = mangaImportJobRepository.findByStatus(ImportJobStatus.PENDING);
|
||||
|
||||
for (var job : pendingJobs) {
|
||||
if (!s3Service.objectExists(job.getS3FileKey())) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Found file for job {} in S3: {}", job.getId(), job.getS3FileKey());
|
||||
|
||||
try {
|
||||
var manga = mangaResolutionService.findOrCreateManga(job.getAniListId(), job.getMalId());
|
||||
|
||||
var contentProvider = contentProviderService.findManualImportContentProvider();
|
||||
var mangaContentProvider = mangaContentProviderService.findOrCreate(manga, contentProvider);
|
||||
|
||||
job.setStatus(ImportJobStatus.PROCESSING);
|
||||
mangaImportJobRepository.save(job);
|
||||
|
||||
fileImportProducer.sendFileImportCommand(
|
||||
new FileImportCommand(mangaContentProvider.getId(), job.getS3FileKey(), job.getId()));
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to enqueue job {}", job.getId(), e);
|
||||
job.setStatus(ImportJobStatus.FAILED);
|
||||
mangaImportJobRepository.save(job);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -9,6 +9,7 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
import software.amazon.awssdk.services.s3.S3Client;
|
||||
import software.amazon.awssdk.services.s3.S3Configuration;
|
||||
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
|
||||
|
||||
@Configuration
|
||||
public class S3ClientConfig {
|
||||
@ -34,4 +35,18 @@ public class S3ClientConfig {
|
||||
.serviceConfiguration(configuration)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public S3Presigner s3Presigner() {
|
||||
var credentials = AwsBasicCredentials.create(accessKey, secretKey);
|
||||
|
||||
var configuration = S3Configuration.builder().pathStyleAccessEnabled(true).build();
|
||||
|
||||
return S3Presigner.builder()
|
||||
.endpointOverride(URI.create(endpoint))
|
||||
.credentialsProvider(StaticCredentialsProvider.create(credentials))
|
||||
.region(Region.US_EAST_1)
|
||||
.serviceConfiguration(configuration)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,18 +3,23 @@ package com.magamochi.image.service;
|
||||
import static java.util.Objects.nonNull;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import software.amazon.awssdk.core.sync.RequestBody;
|
||||
import software.amazon.awssdk.services.s3.S3Client;
|
||||
import software.amazon.awssdk.services.s3.model.*;
|
||||
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
|
||||
import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest;
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class S3Service {
|
||||
@ -26,6 +31,7 @@ public class S3Service {
|
||||
private String baseUrl;
|
||||
|
||||
private final S3Client s3Client;
|
||||
private final S3Presigner s3Presigner;
|
||||
|
||||
public String uploadFile(byte[] data, String contentType, String filename) {
|
||||
var request =
|
||||
@ -64,14 +70,14 @@ public class S3Service {
|
||||
}
|
||||
|
||||
final int BATCH_SIZE = 500;
|
||||
List<ObjectIdentifier> allObjects =
|
||||
var allObjects =
|
||||
objectKeys.stream().map(key -> ObjectIdentifier.builder().key(key).build()).toList();
|
||||
|
||||
for (int i = 0; i < allObjects.size(); i += BATCH_SIZE) {
|
||||
for (var i = 0; i < allObjects.size(); i += BATCH_SIZE) {
|
||||
var end = Math.min(i + BATCH_SIZE, allObjects.size());
|
||||
List<ObjectIdentifier> batch = allObjects.subList(i, end);
|
||||
var batch = allObjects.subList(i, end);
|
||||
|
||||
DeleteObjectsRequest deleteRequest =
|
||||
var deleteRequest =
|
||||
DeleteObjectsRequest.builder()
|
||||
.bucket(bucket)
|
||||
.delete(Delete.builder().objects(batch).build())
|
||||
@ -84,23 +90,17 @@ public class S3Service {
|
||||
response
|
||||
.errors()
|
||||
.forEach(
|
||||
error ->
|
||||
System.err.println(
|
||||
"Error deleting key: " + error.key() + " -> " + error.message()));
|
||||
} else {
|
||||
System.out.println(
|
||||
"Deleted "
|
||||
+ batch.size()
|
||||
+ " objects successfully (batch "
|
||||
+ (i / BATCH_SIZE + 1)
|
||||
+ ")");
|
||||
error -> log.error("Error deleting key: {} -> {}", error.key(), error.message()));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
log.info("Deleted {} objects successfully (batch {})", batch.size(), i / BATCH_SIZE + 1);
|
||||
} catch (S3Exception e) {
|
||||
System.err.println(
|
||||
"Failed to delete batch starting at index "
|
||||
+ i
|
||||
+ ": "
|
||||
+ e.awsErrorDetails().errorMessage());
|
||||
log.error(
|
||||
"Failed to delete batch starting at index {}: {}",
|
||||
i,
|
||||
e.awsErrorDetails().errorMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -110,4 +110,33 @@ public class S3Service {
|
||||
|
||||
return s3Client.getObject(request);
|
||||
}
|
||||
|
||||
public String generatePresignedUploadUrl(String key) {
|
||||
try {
|
||||
var putObjectRequest = PutObjectRequest.builder().bucket(bucket).key(key).build();
|
||||
|
||||
var presignRequest =
|
||||
PutObjectPresignRequest.builder()
|
||||
.signatureDuration(Duration.ofMinutes(60))
|
||||
.putObjectRequest(putObjectRequest)
|
||||
.build();
|
||||
|
||||
return s3Presigner.presignPutObject(presignRequest).url().toString();
|
||||
} catch (Exception e) {
|
||||
log.error("Error generating presigned url for key: {}", key, e);
|
||||
throw new RuntimeException("Failed to generate presigned url", e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean objectExists(String key) {
|
||||
try {
|
||||
s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
|
||||
return true;
|
||||
} catch (NoSuchKeyException e) {
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
log.error("Error checking object existence for key: {}", key, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
11
src/main/resources/db/migration/V0007__MANGA_IMPORT_JOB.sql
Normal file
11
src/main/resources/db/migration/V0007__MANGA_IMPORT_JOB.sql
Normal file
@ -0,0 +1,11 @@
|
||||
CREATE TABLE manga_import_job
|
||||
(
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
mal_id BIGINT,
|
||||
ani_list_id BIGINT,
|
||||
original_filename VARCHAR(1000) NOT NULL,
|
||||
s3_file_key VARCHAR(1000) NOT NULL,
|
||||
status VARCHAR(50) NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
Loading…
x
Reference in New Issue
Block a user