Streamsets binlog采集时区问题

通过Streamsets采集mysql binglog增量数据时候,出现数据库中datetime时区问题。

要注意一点是,streamsets的前端展示的时间也是有时区的,后端返回的数据是时间戳,等于做了两次时区的转换

后端binglog时区转换->时间戳->前端时区转换(默认是CST时区),这部门的时区问题涉及到前端的修改,暂时不做,仅修改后端返回的时间戳时区问题

  通过返回的接口查看,差了12个小时

 通过查看streamsets源码可知,binglog用的采集为:mysql-binlog-connector-java GitHub - osheroff/mysql-binlog-connector-java: MySQL Binary Log connector

当前用的streamsets为3.23.0,对于版本为mysql-binlog-connector-java 0.23.4

查找相关的github issue,发现有人遇到了相同的问题

https://github.com/osheroff/mysql-binlog-connector-java/issues/13

其相关修改的代码为:https://github.com/shyiko/mysql-binlog-connector-java/pull/292/commits/5b29c33e0c8643ff3a7b6d315689ccead28b3782

按照其相关commit修改源代码在

AbstractRowsEventDataDeserializer类下

添加方法

 private long convertLocalTimestamp(long millis) {
        TimeZone tz = TimeZone.getDefault();
        Calendar c = Calendar.getInstance(tz);
        long localMillis = millis;
        int offset, time;

        c.set(1970, Calendar.JANUARY, 1, 0, 0, 0);

        // Add milliseconds
        while (localMillis > Integer.MAX_VALUE)
        {
            c.add(Calendar.MILLISECOND, Integer.MAX_VALUE);
            localMillis -= Integer.MAX_VALUE;
        }
        c.add(Calendar.MILLISECOND, (int)localMillis);

        // Stupidly, the Calendar will give us the wrong result if we use getTime() directly.
        // Instead, we calculate the offset and do the math ourselves.
        time = c.get(Calendar.MILLISECOND);
        time += c.get(Calendar.SECOND) * 1000;
        time += c.get(Calendar.MINUTE) * 60 * 1000;
        time += c.get(Calendar.HOUR_OF_DAY) * 60 * 60 * 1000;
        offset = tz.getOffset(c.get(Calendar.ERA), c.get(Calendar.YEAR), c.get(Calendar.MONTH), c.get(Calendar.DAY_OF_MONTH), c.get(Calendar.DAY_OF_WEEK), time);

        return (millis - offset);
    }

修改方法asUnixTime返回值

 protected Long asUnixTime(int year, int month, int day, int hour, int minute, int second, int millis) {
        // https://dev.mysql.com/doc/refman/5.0/en/datetime.html
        if (year == 0 || month == 0 || day == 0) {
            return invalidDateAndTimeRepresentation;
        }
//        return UnixTime.from(year, month, day, hour, minute, second, millis);
        return convertLocalTimestamp(UnixTime.from(year, month, day, hour, minute, second, millis));
    }

重新打包,替换到streamsets相关路径

/streamsets-datacollector/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib

重启服务,测试,解决问题