diff --git a/src/main/java/com/magamochi/content/controller/ContentController.java b/src/main/java/com/magamochi/content/controller/ContentController.java index d671553..68d25fa 100644 --- a/src/main/java/com/magamochi/content/controller/ContentController.java +++ b/src/main/java/com/magamochi/content/controller/ContentController.java @@ -4,6 +4,8 @@ import com.magamochi.common.model.dto.DefaultResponseDTO; import com.magamochi.content.model.dto.FileImportRequestDTO; import com.magamochi.content.model.dto.MangaContentDTO; import com.magamochi.content.model.dto.MangaContentImagesDTO; +import com.magamochi.content.model.dto.PresignedImportRequestDTO; +import com.magamochi.content.model.dto.PresignedImportResponseDTO; import com.magamochi.content.model.enumeration.ContentArchiveFileType; import com.magamochi.content.service.ContentDownloadService; import com.magamochi.content.service.ContentImportService; @@ -93,4 +95,16 @@ public class ContentController { return DefaultResponseDTO.ok().build(); } + + @Operation( + summary = "Request presigned URL for import", + description = + "Generates a presigned URL to upload a file directly to S3 and registers a pending import job.", + tags = {"Content"}, + operationId = "requestPresignedImport") + @PostMapping(value = "/import/presigned") + public DefaultResponseDTO requestPresignedImport( + @RequestBody PresignedImportRequestDTO request) { + return DefaultResponseDTO.ok(contentImportService.requestPresignedImport(request)); + } } diff --git a/src/main/java/com/magamochi/content/model/dto/PresignedImportRequestDTO.java b/src/main/java/com/magamochi/content/model/dto/PresignedImportRequestDTO.java new file mode 100644 index 0000000..6f9daff --- /dev/null +++ b/src/main/java/com/magamochi/content/model/dto/PresignedImportRequestDTO.java @@ -0,0 +1,8 @@ +package com.magamochi.content.model.dto; + +import jakarta.validation.constraints.NotBlank; +import lombok.Builder; + +@Builder +public record PresignedImportRequestDTO( + Long malId, Long aniListId, @NotBlank String originalFilename) {} diff --git a/src/main/java/com/magamochi/content/model/dto/PresignedImportResponseDTO.java b/src/main/java/com/magamochi/content/model/dto/PresignedImportResponseDTO.java new file mode 100644 index 0000000..791a90e --- /dev/null +++ b/src/main/java/com/magamochi/content/model/dto/PresignedImportResponseDTO.java @@ -0,0 +1,6 @@ +package com.magamochi.content.model.dto; + +import lombok.Builder; + +@Builder +public record PresignedImportResponseDTO(Long jobId, String presignedUrl, String fileKey) {} diff --git a/src/main/java/com/magamochi/content/model/entity/MangaImportJob.java b/src/main/java/com/magamochi/content/model/entity/MangaImportJob.java new file mode 100644 index 0000000..0e4f885 --- /dev/null +++ b/src/main/java/com/magamochi/content/model/entity/MangaImportJob.java @@ -0,0 +1,36 @@ +package com.magamochi.content.model.entity; + +import com.magamochi.content.model.enumeration.ImportJobStatus; +import jakarta.persistence.*; +import java.time.Instant; +import lombok.*; +import org.hibernate.annotations.CreationTimestamp; +import org.hibernate.annotations.UpdateTimestamp; + +@Getter +@Setter +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Entity +@Table(name = "manga_import_job") +public class MangaImportJob { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + private Long malId; + + private Long aniListId; + + private String originalFilename; + + private String s3FileKey; + + @Enumerated(EnumType.STRING) + private ImportJobStatus status; + + @CreationTimestamp private Instant createdAt; + + @UpdateTimestamp private Instant updatedAt; +} diff --git a/src/main/java/com/magamochi/content/model/enumeration/ImportJobStatus.java b/src/main/java/com/magamochi/content/model/enumeration/ImportJobStatus.java new file mode 100644 index 0000000..d3ef8ae --- /dev/null +++ b/src/main/java/com/magamochi/content/model/enumeration/ImportJobStatus.java @@ -0,0 +1,8 @@ +package com.magamochi.content.model.enumeration; + +public enum ImportJobStatus { + PENDING, + PROCESSING, + SUCCESS, + FAILED +} diff --git a/src/main/java/com/magamochi/content/model/repository/MangaImportJobRepository.java b/src/main/java/com/magamochi/content/model/repository/MangaImportJobRepository.java new file mode 100644 index 0000000..7fa8d0c --- /dev/null +++ b/src/main/java/com/magamochi/content/model/repository/MangaImportJobRepository.java @@ -0,0 +1,12 @@ +package com.magamochi.content.model.repository; + +import com.magamochi.content.model.entity.MangaImportJob; +import com.magamochi.content.model.enumeration.ImportJobStatus; +import java.util.List; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface MangaImportJobRepository extends JpaRepository { + List findByStatus(ImportJobStatus status); +} diff --git a/src/main/java/com/magamochi/content/queue/command/FileImportCommand.java b/src/main/java/com/magamochi/content/queue/command/FileImportCommand.java index 5243531..759915b 100644 --- a/src/main/java/com/magamochi/content/queue/command/FileImportCommand.java +++ b/src/main/java/com/magamochi/content/queue/command/FileImportCommand.java @@ -1,3 +1,4 @@ package com.magamochi.content.queue.command; -public record FileImportCommand(long mangaContentProviderId, String filename) {} +public record FileImportCommand( + long mangaContentProviderId, String filename, Long mangaImportJobId) {} diff --git a/src/main/java/com/magamochi/content/queue/consumer/FileImportConsumer.java b/src/main/java/com/magamochi/content/queue/consumer/FileImportConsumer.java index 285765b..55de27e 100644 --- a/src/main/java/com/magamochi/content/queue/consumer/FileImportConsumer.java +++ b/src/main/java/com/magamochi/content/queue/consumer/FileImportConsumer.java @@ -1,5 +1,7 @@ package com.magamochi.content.queue.consumer; +import static java.util.Objects.nonNull; + import com.magamochi.content.queue.command.FileImportCommand; import com.magamochi.content.service.ContentImportService; import lombok.RequiredArgsConstructor; @@ -16,6 +18,23 @@ public class FileImportConsumer { @RabbitListener(queues = "${queues.file-import}") public void receiveFileImportCommand(FileImportCommand command) { log.info("Received file import command: {}", command); - contentImportService.importFile(command.mangaContentProviderId(), command.filename()); + try { + contentImportService.importFile( + command.mangaContentProviderId(), command.filename(), command.mangaImportJobId()); + + if (nonNull(command.mangaImportJobId())) { + contentImportService.updateJobStatus( + command.mangaImportJobId(), + com.magamochi.content.model.enumeration.ImportJobStatus.SUCCESS); + } + } catch (Exception e) { + if (nonNull(command.mangaImportJobId())) { + contentImportService.updateJobStatus( + command.mangaImportJobId(), + com.magamochi.content.model.enumeration.ImportJobStatus.FAILED); + } + + throw e; + } } } diff --git a/src/main/java/com/magamochi/content/service/ContentImportService.java b/src/main/java/com/magamochi/content/service/ContentImportService.java index 9adafef..4d87c90 100644 --- a/src/main/java/com/magamochi/content/service/ContentImportService.java +++ b/src/main/java/com/magamochi/content/service/ContentImportService.java @@ -1,14 +1,20 @@ package com.magamochi.content.service; import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import com.magamochi.catalog.service.MangaContentProviderService; import com.magamochi.catalog.service.MangaResolutionService; import com.magamochi.common.exception.UnprocessableException; import com.magamochi.common.model.enumeration.ContentType; +import com.magamochi.content.model.dto.PresignedImportRequestDTO; +import com.magamochi.content.model.dto.PresignedImportResponseDTO; import com.magamochi.content.model.entity.MangaContentImage; +import com.magamochi.content.model.entity.MangaImportJob; +import com.magamochi.content.model.enumeration.ImportJobStatus; import com.magamochi.content.model.repository.MangaContentImageRepository; +import com.magamochi.content.model.repository.MangaImportJobRepository; import com.magamochi.content.queue.command.FileImportCommand; import com.magamochi.content.queue.producer.FileImportProducer; import com.magamochi.image.service.ImageFetchService; @@ -25,6 +31,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import net.greypanther.natsort.CaseInsensitiveSimpleNaturalComparator; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; @@ -41,6 +48,7 @@ public class ContentImportService { private final FileImportProducer fileImportProducer; private final MangaContentImageRepository mangaContentImageRepository; + private final MangaImportJobRepository mangaImportJobRepository; private final ImageService imageService; public void importFiles(String malId, String aniListId, @NotNull List files) { @@ -73,43 +81,86 @@ public class ContentImportService { log.info("Temp file uploaded to S3: {}", filename); fileImportProducer.sendFileImportCommand( - new FileImportCommand(mangaContentProvider.getId(), filename)); + new FileImportCommand(mangaContentProvider.getId(), filename, null)); } catch (IOException e) { throw new UnprocessableException("Failed to upload file to S3"); } }); } + public PresignedImportResponseDTO requestPresignedImport(PresignedImportRequestDTO request) { + if (isNull(request.malId()) && isNull(request.aniListId())) { + throw new UnprocessableException("Either MyAnimeList or AniList IDs are required."); + } + + var uuid = UUID.randomUUID().toString(); + var fileKey = "temp/import/" + uuid + "-" + request.originalFilename(); + + var job = + mangaImportJobRepository.save( + MangaImportJob.builder() + .malId(request.malId()) + .aniListId(request.aniListId()) + .originalFilename(request.originalFilename()) + .s3FileKey(fileKey) + .status(ImportJobStatus.PENDING) + .build()); + + var presignedUrl = s3Service.generatePresignedUploadUrl(fileKey); + + return PresignedImportResponseDTO.builder() + .jobId(job.getId()) + .fileKey(fileKey) + .presignedUrl(presignedUrl) + .build(); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void updateJobStatus(Long jobId, ImportJobStatus status) { + mangaImportJobRepository + .findById(jobId) + .ifPresent( + job -> { + job.setStatus(status); + mangaImportJobRepository.save(job); + }); + } + @Transactional - public void importFile(Long mangaContentProviderId, String filename) { + public void importFile(Long mangaContentProviderId, String filename, Long mangaImportJobId) { + var contentName = removeImportPrefix(removeFileExtension(filename)); + + if (nonNull(mangaImportJobId)) { + var jobOpt = mangaImportJobRepository.findById(mangaImportJobId); + if (jobOpt.isPresent()) { + contentName = removeFileExtension(jobOpt.get().getOriginalFilename()); + } + } + var mangaContent = - contentIngestService.ingest( - mangaContentProviderId, - removeImportPrefix(removeFileExtension(filename)), - null, - "en-US"); + contentIngestService.ingest(mangaContentProviderId, contentName, null, "en-US"); try (var is = s3Service.getFileStream(filename); var zis = new ZipInputStream(is)) { Map entryMap = - new TreeMap<>( - CaseInsensitiveSimpleNaturalComparator - .getInstance()); // TreeMap keeps keys sorted naturally + new TreeMap<>(CaseInsensitiveSimpleNaturalComparator.getInstance()); ZipEntry entry; while ((entry = zis.getNextEntry()) != null) { - if (entry.isDirectory()) continue; + if (entry.isDirectory()) { + continue; + } - ByteArrayOutputStream os = new ByteArrayOutputStream(); + var os = new ByteArrayOutputStream(); zis.transferTo(os); entryMap.put(entry.getName(), os.toByteArray()); zis.closeEntry(); } - int position = 0; - for (Map.Entry sortedEntry : entryMap.entrySet()) { - byte[] bytes = sortedEntry.getValue(); + var position = 0; + for (var sortedEntry : entryMap.entrySet()) { + var bytes = sortedEntry.getValue(); var imageId = imageFetchService.uploadImage(bytes, null, ContentType.CONTENT_IMAGE); var image = imageService.find(imageId); @@ -134,7 +185,7 @@ public class ContentImportService { return filename; } - int lastDotIndex = filename.lastIndexOf('.'); + var lastDotIndex = filename.lastIndexOf('.'); // No dot, or dot is the first character (like .gitignore) if (lastDotIndex <= 0) { diff --git a/src/main/java/com/magamochi/content/task/PendingImportScannerTask.java b/src/main/java/com/magamochi/content/task/PendingImportScannerTask.java new file mode 100644 index 0000000..55175d2 --- /dev/null +++ b/src/main/java/com/magamochi/content/task/PendingImportScannerTask.java @@ -0,0 +1,56 @@ +package com.magamochi.content.task; + +import com.magamochi.catalog.service.MangaContentProviderService; +import com.magamochi.catalog.service.MangaResolutionService; +import com.magamochi.content.model.enumeration.ImportJobStatus; +import com.magamochi.content.model.repository.MangaImportJobRepository; +import com.magamochi.content.queue.command.FileImportCommand; +import com.magamochi.content.queue.producer.FileImportProducer; +import com.magamochi.image.service.S3Service; +import com.magamochi.ingestion.service.ContentProviderService; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Log4j2 +@Component +@RequiredArgsConstructor +public class PendingImportScannerTask { + private final MangaImportJobRepository mangaImportJobRepository; + private final S3Service s3Service; + private final MangaResolutionService mangaResolutionService; + private final ContentProviderService contentProviderService; + private final MangaContentProviderService mangaContentProviderService; + private final FileImportProducer fileImportProducer; + + @Scheduled(fixedDelayString = "${tasks.pending-import-scanner.delay:30000}") + public void scanPendingImports() { + var pendingJobs = mangaImportJobRepository.findByStatus(ImportJobStatus.PENDING); + + for (var job : pendingJobs) { + if (!s3Service.objectExists(job.getS3FileKey())) { + return; + } + + log.info("Found file for job {} in S3: {}", job.getId(), job.getS3FileKey()); + + try { + var manga = mangaResolutionService.findOrCreateManga(job.getAniListId(), job.getMalId()); + + var contentProvider = contentProviderService.findManualImportContentProvider(); + var mangaContentProvider = mangaContentProviderService.findOrCreate(manga, contentProvider); + + job.setStatus(ImportJobStatus.PROCESSING); + mangaImportJobRepository.save(job); + + fileImportProducer.sendFileImportCommand( + new FileImportCommand(mangaContentProvider.getId(), job.getS3FileKey(), job.getId())); + } catch (Exception e) { + log.error("Failed to enqueue job {}", job.getId(), e); + job.setStatus(ImportJobStatus.FAILED); + mangaImportJobRepository.save(job); + } + } + } +} diff --git a/src/main/java/com/magamochi/image/config/S3ClientConfig.java b/src/main/java/com/magamochi/image/config/S3ClientConfig.java index 64dbb19..e6dedcc 100644 --- a/src/main/java/com/magamochi/image/config/S3ClientConfig.java +++ b/src/main/java/com/magamochi/image/config/S3ClientConfig.java @@ -9,6 +9,7 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; @Configuration public class S3ClientConfig { @@ -34,4 +35,18 @@ public class S3ClientConfig { .serviceConfiguration(configuration) .build(); } + + @Bean + public S3Presigner s3Presigner() { + var credentials = AwsBasicCredentials.create(accessKey, secretKey); + + var configuration = S3Configuration.builder().pathStyleAccessEnabled(true).build(); + + return S3Presigner.builder() + .endpointOverride(URI.create(endpoint)) + .credentialsProvider(StaticCredentialsProvider.create(credentials)) + .region(Region.US_EAST_1) + .serviceConfiguration(configuration) + .build(); + } } diff --git a/src/main/java/com/magamochi/image/service/S3Service.java b/src/main/java/com/magamochi/image/service/S3Service.java index 2adb5a1..125b8f2 100644 --- a/src/main/java/com/magamochi/image/service/S3Service.java +++ b/src/main/java/com/magamochi/image/service/S3Service.java @@ -3,18 +3,23 @@ package com.magamochi.image.service; import static java.util.Objects.nonNull; import java.io.InputStream; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Set; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; +import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest; +@Log4j2 @Service @RequiredArgsConstructor public class S3Service { @@ -26,6 +31,7 @@ public class S3Service { private String baseUrl; private final S3Client s3Client; + private final S3Presigner s3Presigner; public String uploadFile(byte[] data, String contentType, String filename) { var request = @@ -64,14 +70,14 @@ public class S3Service { } final int BATCH_SIZE = 500; - List allObjects = + var allObjects = objectKeys.stream().map(key -> ObjectIdentifier.builder().key(key).build()).toList(); - for (int i = 0; i < allObjects.size(); i += BATCH_SIZE) { + for (var i = 0; i < allObjects.size(); i += BATCH_SIZE) { var end = Math.min(i + BATCH_SIZE, allObjects.size()); - List batch = allObjects.subList(i, end); + var batch = allObjects.subList(i, end); - DeleteObjectsRequest deleteRequest = + var deleteRequest = DeleteObjectsRequest.builder() .bucket(bucket) .delete(Delete.builder().objects(batch).build()) @@ -84,23 +90,17 @@ public class S3Service { response .errors() .forEach( - error -> - System.err.println( - "Error deleting key: " + error.key() + " -> " + error.message())); - } else { - System.out.println( - "Deleted " - + batch.size() - + " objects successfully (batch " - + (i / BATCH_SIZE + 1) - + ")"); + error -> log.error("Error deleting key: {} -> {}", error.key(), error.message())); + + continue; } + + log.info("Deleted {} objects successfully (batch {})", batch.size(), i / BATCH_SIZE + 1); } catch (S3Exception e) { - System.err.println( - "Failed to delete batch starting at index " - + i - + ": " - + e.awsErrorDetails().errorMessage()); + log.error( + "Failed to delete batch starting at index {}: {}", + i, + e.awsErrorDetails().errorMessage()); } } } @@ -110,4 +110,33 @@ public class S3Service { return s3Client.getObject(request); } + + public String generatePresignedUploadUrl(String key) { + try { + var putObjectRequest = PutObjectRequest.builder().bucket(bucket).key(key).build(); + + var presignRequest = + PutObjectPresignRequest.builder() + .signatureDuration(Duration.ofMinutes(60)) + .putObjectRequest(putObjectRequest) + .build(); + + return s3Presigner.presignPutObject(presignRequest).url().toString(); + } catch (Exception e) { + log.error("Error generating presigned url for key: {}", key, e); + throw new RuntimeException("Failed to generate presigned url", e); + } + } + + public boolean objectExists(String key) { + try { + s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()); + return true; + } catch (NoSuchKeyException e) { + return false; + } catch (Exception e) { + log.error("Error checking object existence for key: {}", key, e); + return false; + } + } } diff --git a/src/main/resources/db/migration/V0007__MANGA_IMPORT_JOB.sql b/src/main/resources/db/migration/V0007__MANGA_IMPORT_JOB.sql new file mode 100644 index 0000000..6ee2d3a --- /dev/null +++ b/src/main/resources/db/migration/V0007__MANGA_IMPORT_JOB.sql @@ -0,0 +1,11 @@ +CREATE TABLE manga_import_job +( + id BIGSERIAL PRIMARY KEY, + mal_id BIGINT, + ani_list_id BIGINT, + original_filename VARCHAR(1000) NOT NULL, + s3_file_key VARCHAR(1000) NOT NULL, + status VARCHAR(50) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +);