Flink Sink HBase

2023-12-12


Table of Contents

  • Sinking Data to HBase with Flink
    • Flink Main Class
    • Deserialization
    • BeanUtil
    • HTableBase Interface
    • UserHTable
    • HTableBaseMap
    • HTableRow
    • HbaseSinkFunction

Sinking Data to HBase with Flink

Flink Main Class

package flink.sink2hbase;

import flink.sink2hbase.deserialization.JsonDeserializationSchema;
import flink.sink.HbaseSinkFunction;
import flink.sink2hbase.map.HTableBaseMap;
import flink.sink2hbase.table.UserHTable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import flink.sink2hbase.pojo.User;
import util.BeanUtil;
import util.Property;

import java.util.Properties;

public class FlinkSinkHbase {

    private static OutputTag<UserHTable> userOutputTag =
            new OutputTag<>("用戶表", TypeInformation.of(UserHTable.class));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties prop = Property.getKafkaProperties();
        prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1");

        // Deserialize the JSON strings read from Kafka into User objects
        FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
                "test", new JsonDeserializationSchema<>(User.class), prop);
        DataStreamSource<User> mainStream = env.addSource(consumer);

        SingleOutputStreamOperator<User> dataStream = mainStream.process(new ProcessFunction<User, User>() {
            @Override
            public void processElement(User user, Context context, Collector<User> collector) throws Exception {
                UserHTable userHTable = UserHTable.builder().build();
                // Copy the User fields into the HTable object
                BeanUtil.copy(userHTable, user);
                context.output(userOutputTag, userHTable);
            }
        });

        dataStream.getSideOutput(userOutputTag)
                .map(new HTableBaseMap<>())
                .addSink(new HbaseSinkFunction("t_user", "info"));

        env.execute("mainStream");
    }
}
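
The main class imports flink.sink2hbase.pojo.User, but the article never shows that POJO. Below is a minimal sketch, assuming it carries the same name and age fields as UserHTable so that BeanUtils.copyProperties can match them by name; a Kafka message such as {"name":"tom","age":18} on topic test would then flow through the pipeline.

package flink.sink2hbase.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    // Assumed fields: they must mirror UserHTable so BeanUtils.copyProperties can copy them by name
    private String name;
    private int age;
}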

Deserialization

package flink.sink2hbase.deserialization;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.io.IOException;

@Slf4j
public class JsonDeserializationSchema<T> implements DeserializationSchema<T> {

    private Class<T> clazz;

    public JsonDeserializationSchema(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T deserialize(byte[] bytes) throws IOException {
        try {
            return JSON.parseObject(new String(bytes), clazz);
        } catch (Throwable t) {
            // Log the full exception so the stack trace is not lost
            log.error("json parse error", t);
        }
        return null;
    }

    @Override
    public boolean isEndOfStream(T t) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(clazz);
    }
}
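
Because deserialize swallows parse failures and returns null (which makes the Kafka source skip the corrupted record), the schema can also be exercised outside of Flink. A minimal sketch, assuming the User POJO sketched earlier and a made-up sample payload:

package flink.sink2hbase.deserialization;

import flink.sink2hbase.pojo.User;

// Hypothetical quick check of the schema outside of Flink; the sample payload is made up
public class JsonDeserializationSchemaDemo {

    public static void main(String[] args) throws Exception {
        JsonDeserializationSchema<User> schema = new JsonDeserializationSchema<>(User.class);
        byte[] payload = "{\"name\":\"tom\",\"age\":18}".getBytes();
        // deserialize returns null (and logs the error) when the payload is not valid JSON
        User user = schema.deserialize(payload);
        System.out.println(user); // Lombok's @Data toString -> User(name=tom, age=18)
    }
}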

BeanUtil

package util;

import exception.BeanUtilException;
import org.apache.commons.beanutils.BeanUtils;

public class BeanUtil {

    public static void copy(Object dest, Object orig) {
        try {
            BeanUtils.copyProperties(dest, orig);
        } catch (Exception e) {
            throw new BeanUtilException(e.getMessage(), e.getCause());
        }
    }
}
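
exception.BeanUtilException is referenced above but never shown in the article. A minimal sketch, assuming it is simply an unchecked wrapper so callers of BeanUtil.copy do not need checked-exception handling:

package exception;

// Assumed shape: an unchecked wrapper around whatever BeanUtils.copyProperties throws
public class BeanUtilException extends RuntimeException {

    public BeanUtilException(String message, Throwable cause) {
        super(message, cause);
    }
}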

HTableBase Interface

package flink.sink2hbase.table;

public interface HTableBase {

    byte[] rowKey(byte[] prefix);
}
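
The UserHTable implementation in the next section ignores the prefix argument. For a table that does need a salted or prefixed row key, an implementation could concatenate the prefix with HBase's Bytes.add, as in this hypothetical OrderHTable (not part of the original article):

package flink.sink2hbase.table;

import org.apache.hadoop.hbase.util.Bytes;

import java.io.Serializable;

// Hypothetical example: a table whose row key is prefix + orderId
public class OrderHTable implements Serializable, HTableBase {

    private String orderId;

    @Override
    public byte[] rowKey(byte[] prefix) {
        byte[] key = Bytes.toBytes(orderId);
        // Bytes.add concatenates two byte arrays; fall back to the plain key when no prefix is given
        return prefix == null ? key : Bytes.add(prefix, key);
    }
}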

UserHTable

package flink.sink2hbase.table;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserHTable implements Serializable, HTableBase {

    private String name;
    private int age;

    @Override
    public byte[] rowKey(byte[] prefix) {
        return Bytes.toBytes(name);
    }
}

HTableBaseMap

package flink.sink2hbase.map;

import flink.sink2hbase.table.HTableBase;
import flink.sink2hbase.table.HTableRow;
import org.apache.flink.api.common.functions.MapFunction;

public class HTableBaseMap<T extends HTableBase> implements MapFunction<T, HTableRow> {

    @Override
    public HTableRow map(T hTableBase) throws Exception {
        return HTableRow.builder(hTableBase, null);
    }
}

HTableRow

package flink.sink2hbase.table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import util.ByteUtil;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
@Data
@AllArgsConstructor
public class HTableRow implements Serializable {

    private byte[] row;
    private Map<String, byte[]> colValueMap;

    public static HTableRow builder(HTableBase src, byte[] prefix) {
        byte[] rowKey = src.rowKey(prefix);
        // Collect every non-null field of the POJO as column name -> byte[] value
        Map<String, byte[]> colValueMap = Arrays.stream(src.getClass().getDeclaredFields())
                .filter(f -> fieldValue(src, f) != null)
                .collect(Collectors.toMap(Field::getName, f -> fieldValue(src, f)));
        return new HTableRow(rowKey, colValueMap);
    }

    public static byte[] fieldValue(Object src, Field f) {
        f.setAccessible(true);
        try {
            return ByteUtil.toByte(f.get(src));
        } catch (IllegalAccessException e) {
            log.error("Method fieldValue get value failed", e);
        }
        return null;
    }
}
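
util.ByteUtil is used by fieldValue above but never shown in the article. A minimal sketch, assuming it dispatches on the runtime type and delegates to HBase's Bytes.toBytes overloads:

package util;

import org.apache.hadoop.hbase.util.Bytes;

// Assumed implementation: convert common field types to byte[]; the original class is not shown in the article
public class ByteUtil {

    public static byte[] toByte(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return Bytes.toBytes((String) value);
        }
        if (value instanceof Integer) {
            return Bytes.toBytes((Integer) value);
        }
        if (value instanceof Long) {
            return Bytes.toBytes((Long) value);
        }
        if (value instanceof Double) {
            return Bytes.toBytes((Double) value);
        }
        if (value instanceof Boolean) {
            return Bytes.toBytes((Boolean) value);
        }
        // Fall back to the string representation for any other type
        return Bytes.toBytes(value.toString());
    }
}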

HbaseSinkFunction

package flink.sink;

import flink.sink2hbase.table.HTableRow;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import util.Property;

import java.util.Map;

@Slf4j
public class HbaseSinkFunction extends RichSinkFunction<HTableRow> {

    private Connection conn = null;
    private byte[] cfBytes;
    private String tableNameStr;
    private BufferedMutator mutator;
    private int count;

    public HbaseSinkFunction(String tableName, String cFamily) {
        this.tableNameStr = tableName;
        this.cfBytes = Bytes.toBytes(cFamily);
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",
                Property.getContextProperties().getProperty("hbase.zookeeper.quorum"));
        conf.set("hbase.zookeeper.property.clientPort",
                Property.getContextProperties().getProperty("hbase.zookeeper.property.clientPort"));
        conn = ConnectionFactory.createConnection(conf);

        TableName tableName = TableName.valueOf(tableNameStr);
        // A BufferedMutator batches Puts, which is more efficient than writing one row at a time
        BufferedMutatorParams params = new BufferedMutatorParams(tableName);
        params.maxKeyValueSize(10485760);
        params.writeBufferSize(1024 * 1024);
        mutator = conn.getBufferedMutator(params);
        count = 0;
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Flush any buffered mutations before releasing resources
        if (mutator != null) {
            mutator.flush();
            mutator.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

    @Override
    public void invoke(HTableRow hTableRow, Context context) throws Exception {
        Put put = new Put(hTableRow.getRow());
        for (Map.Entry<String, byte[]> entry : hTableRow.getColValueMap().entrySet()) {
            if (entry.getValue() != null) {
                put.addColumn(cfBytes, Bytes.toBytes(entry.getKey()), entry.getValue());
            }
        }
        log.info("put into hbase: \n" + put.toJSON());
        mutator.mutate(put);

        // On top of the size-based buffer, flush explicitly every 100 records
        count++;
        if (count >= 100) {
            mutator.flush();
            count = 0;
        }
    }
}
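
util.Property supplies the Kafka and HBase connection settings but is also never shown. A minimal sketch, assuming it loads two classpath properties files whose names (kafka.properties, context.properties) are guesses for illustration:

package util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Assumed implementation: the article never shows util.Property; the resource names are illustrative
public class Property {

    public static Properties getKafkaProperties() {
        return load("kafka.properties");
    }

    public static Properties getContextProperties() {
        return load("context.properties");
    }

    private static Properties load(String resource) {
        Properties props = new Properties();
        try (InputStream in = Property.class.getClassLoader().getResourceAsStream(resource)) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to load " + resource, e);
        }
        return props;
    }
}

Note that the sink never creates the target table, so t_user with column family info must already exist (for example via create 't_user', 'info' in the HBase shell), and getKafkaProperties() is expected to provide at least bootstrap.servers for the consumer.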
