电影网站源码系统,wordpress 安装乱码,如何管理网站域名,网站前端是做啥的项目场景#xff1a;
项目中需要把Mysql数据同步到ElasticSearch中 问题描述
数据传输过程中数据不时出现丢失的情况#xff0c;偶尔会丢失一部分数据#xff0c;本地测试也无法复现#xff0c;后台程序也没有报错#xff0c;一到正式环境就有问题,很崩溃
这里是批量操…项目场景
项目中需要把Mysql数据同步到ElasticSearch中 问题描述
数据传输过程中数据不时出现丢失的情况偶尔会丢失一部分数据本地测试也无法复现后台程序也没有报错一到正式环境就有问题,很崩溃
这里是批量操作的代码
private void bulk(ListIndexRequest indexRequests) throws Exception {try {// 在这里可以对你获取到的批量结果数据进行需要的业务处理BulkProcessor bulkProcessor BulkProcessor.builder((req, bulkListener) - restHighLevelClient.bulkAsync(req, RequestOptions.DEFAULT, bulkListener),new BulkProcessor.Listener() {private int totalCount 0;Overridepublic void beforeBulk(long executionId, BulkRequest request) {}Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {// 统计条数并输出信息int count response.getItems().length;totalCount count;log.info(批量操作 [{}] 成功执行了{}条请求共处理了{}条数据, executionId, count, totalCount);}Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {log.error(数据处理失败,执行id为{}错误信息为{}, executionId, failure);}}).setConcurrentRequests(esproperties.getThreadSize())/*并发请求的数量。默认为1。*/.setFlushInterval(TimeValue.timeValueSeconds(30)) // 固定30s必须刷新一次.setBulkSize(new ByteSizeValue(10L, ByteSizeUnit.MB)) // 5MB batch size.setBulkActions(esproperties.getBulkActions()) // 每次执行最多处理5000个请求.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();indexRequests.forEach(bulkProcessor::add);bulkProcessor.close();} catch (Exception e) {e.printStackTrace();throw new Exception(e);}}原因分析 当时想到的问题是这里是不是数据格式有问题因为采用的是异步就是错误了也不会影响到其它数据的插入 接着就定位到了这段代码想想是不是哪里没有处理错误的数据信息所以没有打印出来果然发现了BulkResponse 这个类是可以处理每个错误信息的接着就优化了代码如下
其实只需要修改afterBulk 方法遍历出现的异常就能够打印出导入不进去的错误信息 Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {// 统计条数并输出信息
// int count response.getItems().length;
// totalCount count;
// log.info(批量操作 [{}] 成功执行了{}条请求共处理了{}条数据, executionId, count, totalCount);if (response.hasFailures()){for (BulkItemResponse itemResponse : response) {if (itemResponse.isFailed()) {log.info(数据写入失败错误信息为{},itemResponse.getFailureMessage());}}
// log.info(数据写入失败{},response.buildFailureMessage());}
}解决方案 接着修改代码后把新的包放上去执行终于找到了错误信息 下面是错误信息的截图
报错 Limit of total fields 1000 这里就能看出来是字段数量大于1000了因为我的是宽表而之前创建的索引字段数量都是小于1000的新的索引结构数量大于1000找到问题就好办了
在kibana执行下面脚本修改字段限制根据实际情况来没有kibana就写出curl 请求
PUT 你的索引名/_settings
{index: {mapping.total_fields.limit: 2000}
}总结
没有测试好宽表字段比较多的情况写代码的时候以为很简单不会出现问题所以日志也比较随便。日常开发要打印好日志它能够在出现错误的情况下很快的帮我们定位出问题所在。