feat:实现相当牛逼的链式反应

This commit is contained in:
wangyu 2021-12-20 11:48:52 +08:00
parent 7deb4fc580
commit 3d617679cd
14 changed files with 302 additions and 44 deletions

View File

@ -7,7 +7,6 @@ import com.flyfish.framework.approval.domain.todo.ApprovalDto;
import com.flyfish.framework.approval.enums.ApproveAction;
import com.flyfish.framework.approval.enums.ApproveStatus;
import com.flyfish.framework.beans.meta.RestBean;
import com.flyfish.framework.domain.base.DomainService;
import com.flyfish.framework.exception.biz.InvalidBusinessException;
import com.flyfish.framework.service.BaseReactiveService;
import com.flyfish.framework.utils.ReflectionUtils;
@ -48,7 +47,7 @@ public class ModuleDelegateService {
.map(clazz -> ClassUtils.isAssignable(clazz, ApprovalDomain.class))
.orElse(false))
.map(service -> (BaseReactiveService<ApprovalDomain>) service)
.collect(Collectors.toMap(DomainService::getCollectionName, s -> s));
.collect(Collectors.toMap(s -> s.getEntityInformation().getCollectionName(), s -> s));
}
/**

View File

@ -1,5 +1,7 @@
package com.flyfish.framework.domain.base;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
/**
* 占位标记service
*/
@ -10,5 +12,5 @@ public interface DomainService {
*
* @return 结果
*/
String getCollectionName();
MongoEntityInformation<? extends Domain, String> getEntityInformation();
}

View File

@ -0,0 +1,44 @@
package com.flyfish.framework.domain.po;
import com.flyfish.framework.domain.base.AuditDomain;
import com.flyfish.framework.enums.NamedEnum;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
/**
* 系统备份
*
* @author wangyu
*/
@Getter
@Setter
public class Backup extends AuditDomain {
// 文件路径
private String filepath;
// 备份日志
private String log;
// 备份状态
private Status status;
// 备份耗时
private Long period;
// 备份大小
private String size;
/**
* 备份状态
*/
@Getter
@AllArgsConstructor
public enum Status implements NamedEnum {
RUNNING("备份中"), SUCCESS("备份成功"), FAILED("备份失败");
private final String name;
}
}

View File

@ -1,5 +1,6 @@
package com.flyfish.framework.repository;
import com.flyfish.framework.repository.base.DomainRepository;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.repository.NoRepositoryBean;
@ -10,6 +11,7 @@ import org.springframework.data.repository.NoRepositoryBean;
* @param <T> 泛型
*/
@NoRepositoryBean
public interface DefaultReactiveRepository<T> extends ReactiveMongoRepository<T, String>, ReactiveQueryModelExecutor<T> {
public interface DefaultReactiveRepository<T> extends ReactiveMongoRepository<T, String>, ReactiveQueryModelExecutor<T>,
DomainRepository<T> {
}

View File

@ -1,6 +1,7 @@
package com.flyfish.framework.repository;
import com.flyfish.framework.domain.base.Domain;
import com.flyfish.framework.repository.base.DomainRepository;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.repository.NoRepositoryBean;
@ -10,6 +11,7 @@ import org.springframework.data.repository.NoRepositoryBean;
* @author wangyu
*/
@NoRepositoryBean
public interface DefaultRepository<T extends Domain> extends MongoRepository<T, String>, QueryModelExecutor<T> {
public interface DefaultRepository<T extends Domain> extends MongoRepository<T, String>, QueryModelExecutor<T>,
DomainRepository<T> {
}

View File

@ -89,11 +89,4 @@ public interface QueryModelExecutor<T> {
* @return {@literal true} if the data store contains elements that match the given {@link Qo}.
*/
boolean exists(Qo<T> query);
/**
* 获取集合名称
*
* @return 结果
*/
String getCollectionName();
}

View File

@ -99,11 +99,4 @@ public interface ReactiveQueryModelExecutor<T> {
* @return 结果
*/
Mono<Void> deleteAll(Qo<T> qo);
/**
* 获取集合名称
*
* @return 结果
*/
String getCollectionName();
}

View File

@ -0,0 +1,18 @@
package com.flyfish.framework.repository.base;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
/**
* 标记实体的仓库
*
* @param <T> 泛型
*/
public interface DomainRepository<T> {
/**
* 获取集合名称
*
* @return 结果
*/
MongoEntityInformation<T, String> getEntityInformation();
}

View File

@ -3,6 +3,7 @@ package com.flyfish.framework.repository.impl;
import com.flyfish.framework.domain.base.Domain;
import com.flyfish.framework.domain.base.Qo;
import com.flyfish.framework.repository.DefaultReactiveRepository;
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
@ -28,6 +29,7 @@ import java.util.Optional;
public class DefaultReactiveRepositoryImpl<T extends Domain> extends SimpleReactiveMongoRepository<T, String>
implements DefaultReactiveRepository<T> {
@Getter
private final MongoEntityInformation<T, String> entityInformation;
private final ReactiveMongoOperations mongoOperations;
@ -177,14 +179,4 @@ public class DefaultReactiveRepositoryImpl<T extends Domain> extends SimpleReact
entityInformation.getCollectionName()))
.then(Mono.empty());
}
/**
* 获取集合名称
*
* @return 结果
*/
@Override
public String getCollectionName() {
return entityInformation.getCollectionName();
}
}

View File

@ -3,6 +3,7 @@ package com.flyfish.framework.repository.impl;
import com.flyfish.framework.domain.base.Domain;
import com.flyfish.framework.domain.base.Qo;
import com.flyfish.framework.repository.DefaultRepository;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.domain.Page;
@ -30,6 +31,7 @@ public class DefaultRepositoryImpl<T extends Domain> extends SimpleMongoReposito
implements DefaultRepository<T> {
private MongoOperations mongoOperations;
@Getter
private MongoEntityInformation<T, String> entityInformation;
/**
@ -168,15 +170,4 @@ public class DefaultRepositoryImpl<T extends Domain> extends SimpleMongoReposito
entityInformation.getJavaType(), entityInformation.getCollectionName()))
.orElse(false);
}
/**
* 获取集合名称
*
* @return 结果
*/
@Override
public String getCollectionName() {
return entityInformation.getCollectionName();
}
}

View File

@ -0,0 +1,55 @@
package com.flyfish.framework.scheduler;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.nio.ByteBuffer;
import java.util.List;
/**
* 备份索引
*
* @author wangyu
*/
@Data
public class BackupIndex {
// 备份uuid与数据库对应
private String id;
// 创建时间
private String createTime;
// 备份条目
private List<BackupItem> items;
/**
* 备份条目
*/
@Data
public static class BackupItem {
// 集合名称
private String collection;
// 文件路径
private String path;
// 备份大小
private String size;
}
/**
* 备份内容
*/
@Data
@AllArgsConstructor
public static class BackupContent {
// 集合名称
private String collection;
// 内容
private byte[] content;
}
}

View File

@ -0,0 +1,165 @@
package com.flyfish.framework.scheduler;
import com.alibaba.fastjson.JSON;
import com.flyfish.framework.domain.base.DomainService;
import com.flyfish.framework.domain.po.Backup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.unit.DataSize;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* 备份因功能小而简单作为一个bean一直存在定时跑就可以
*
* @author wangyu
*/
@Component
@Slf4j
public class BackupScheduler {
// data buffer 工厂
private final DataBufferFactory factory = new DefaultDataBufferFactory();
// 用于注入所有集合名称
private List<MongoEntityInformation<?, String>> collections;
// 异步的mongo操作可以快速备份
private ReactiveMongoOperations operations;
// 备份路径
private String backupPath = "/opt/flyfish/backup";
@Autowired
public void setServices(ObjectProvider<DomainService> services) {
this.collections = services.stream().map(DomainService::getEntityInformation).collect(Collectors.toList());
}
@Autowired
public void setOperations(ReactiveMongoOperations reactiveMongoOperations) {
this.operations = reactiveMongoOperations;
}
@Autowired
public void setBackupPath(@Value("${flyfish.backup.path}") String backupPath) {
if (StringUtils.isNotBlank(backupPath)) {
this.backupPath = backupPath;
}
}
/**
* 每天凌晨2点执行
* 按日期建立目录备份所有集合到对应目录下
* 建立index.json将绝对路径放入遍历该文件即可还原
*/
@Scheduled(cron = "0 0 2 * * ?")
public void backup() {
// 本次备份任务代号
String code = UUID.randomUUID().toString();
// 本次备份父目录
String parent = backupPath + "/" + code;
createIfNotExists(parent);
// 开始备份先构造一个指示器
Backup backup = new Backup();
backup.setCreateTime(new Date());
backup.setCreator("系统");
backup.setCode(code);
backup.setStatus(Backup.Status.RUNNING);
backup.setFilepath(parent + "/back.zip");
// 开始备份
operations.save(backup)
.thenMany(Flux.fromIterable(this.collections))
.flatMap(info -> operations
.findAll(info.getJavaType(), info.getCollectionName()).collectList()
.map(list -> new BackupIndex.BackupContent(info.getCollectionName(), JSON.toJSONBytes(list)))
)
.flatMap(content -> writeContents(content, parent))
.collectList()
.flatMap(list -> {
BackupIndex index = new BackupIndex();
index.setId(code);
index.setItems(list.stream().map(content -> {
DataSize size = DataSize.parse(new String(content.getContent(), StandardCharsets.UTF_8));
BackupIndex.BackupItem item = new BackupIndex.BackupItem();
item.setCollection(content.getCollection());
item.setPath(parent + "/" + content.getCollection() + ".json");
item.setSize(size.toKilobytes() + "KB");
return item;
}).collect(Collectors.toList()));
// 写入备份结果
return write(JSON.toJSONBytes(index), parent + "/meta.json");
})
.then(Mono.defer(() -> {
backup.setLog("成功备份");
backup.setPeriod(new Date().getTime() - backup.getCreateTime().getTime());
backup.setStatus(Backup.Status.SUCCESS);
return operations.save(backup);
}))
.subscribe(v -> {
log.info("成功完成备份");
}, e -> {
backup.setStatus(Backup.Status.FAILED);
backup.setLog(e.getMessage());
backup.setPeriod(new Date().getTime() - backup.getCreateTime().getTime());
operations.save(backup).subscribe();
});
}
/**
* 如果不存在就创建
*
* @param filepath 路径
*/
@SneakyThrows
private void createIfNotExists(String filepath) {
Path path = Paths.get(filepath);
if (!Files.exists(path)) {
Files.createDirectories(path);
}
}
/**
* 写入文件内容
*
* @param content 文件内容
* @return 结果
*/
private Mono<BackupIndex.BackupContent> writeContents(BackupIndex.BackupContent content, String parent) {
String path = parent + "/" + content.getCollection() + ".json";
return write(content.getContent(), path).thenReturn(content);
}
/**
* 写入文件
*
* @param content 内容
* @param path 路径
* @return 结果
*/
private Mono<Void> write(byte[] content, String path) {
return Mono.fromCallable(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.WRITE))
.flatMapMany(channel -> DataBufferUtils.write(Mono.just(factory.wrap(content)), channel, 0))
.then();
}
}

View File

@ -18,6 +18,7 @@ import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
import org.springframework.data.util.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -411,8 +412,8 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
* @return 结果
*/
@Override
public String getCollectionName() {
return getRepository().getCollectionName();
public MongoEntityInformation<? extends Domain, String> getEntityInformation() {
return repository.getEntityInformation();
}
}

View File

@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
import java.util.*;
import java.util.stream.Collectors;
@ -381,7 +382,7 @@ public class BaseServiceImpl<T extends Domain> implements BaseService<T> {
* @return 结果
*/
@Override
public String getCollectionName() {
return getRepository().getCollectionName();
public MongoEntityInformation<? extends Domain, String> getEntityInformation() {
return repository.getEntityInformation();
}
}