Tensorflow2.0 特征处理

0 / 1057

一、编写数据集导入函数,此函数必须返回2个对象:

1. 字典为 {"特征名称": 特征值Tensor 或 SparseTensor}
2. label的Tensor列表

格式:

def input_fn(dataset):
    ...
    return feature_dict, label    # 这2组合为张量迭代器, 就是 dataset

二、 定义特征列( tf.feature_column)

a = tf.feature_column.numeric_column('a')
b = tf.feature_column.numeric_column('b')
c = tf.feature_column.numeric_column('c', normalizer_fn=lambda x: x)

三、实例化相关的预创建的 Estimator

estimator = tf.estimator.LinearClassifier(
    feature_columns=[a,b,c]
)

四、调用训练、评估或推理方法

.train(数据集处理函数句柄,steps=None)
estimator.predict(数据集处理函数句柄,predict_keys=None, steps=None)
.estimator.evaluate(数据集处理函数句柄,steps=None)

常用 estimator

tf.estimator.DNNClassifier(hidden_units,feature_columns)
tf.estimator.DNNRegressor(hidden_units,feature_columns)

tf.estimator.LinearClassifier(feature_columns)
tf.estimator.LinearRegressor(feature_columns)

tf.estimator.DNNLinearCombinedClassifier()
tf.estimator.DNNLinearCombinedRegressor()

常用 feature_column(指定estimator字段的数据类型)

特征处理几种方式

  • 特征归一化(针对连续型)
  • 特征离散化:很少直接使用连续值作为特征,而是将特征离散化后再输入到模型中。离散化特征对于异常值具有更好的鲁棒性,可以为特征引入非线性的能力。并且,离散化可以更好的进行Embedding,可以通过👉等频分桶对特征值离散化
  • 特征组合:,生成更丰富的行为表征,可加速模型的收敛速度
  • Embedding

分桶,交叉特征

age_buckets = tf.feature_column.bucketized_column(
    age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
crossed_columns = [
    tf.feature_column.crossed_column(
        ['education', 'occupation'], hash_bucket_size=1000),
    tf.feature_column.crossed_column(
        [age_buckets, 'education', 'occupation'], hash_bucket_size=1000),
]

tf.feature_column

连续值类型特征 转向量

features = {"price": [[1.], [5.]]}
price = tf.feature_column.numeric_column("price")

将连续值类型特征映射为one-hot(需2步)

aa = tf.feature_column.categorical_column_with_identity(
    key='连续整数特征列',
    num_buckets= num # num_buckets 要 大于 此连续整数列里面所有数的最大值
)

👉注:categorical_column_with_identity这种方式输出结果为 类似SparseTensor的稀疏矩阵。
👉所以我们需要用 tf.feature_column.indicator_column 将稀疏矩阵转为 正常的one-hot👇

tf.feature_column.indicator_column( aa ) # 参数是上面identity包装输出的稀疏矩阵

分桶将连续值特征 按照 数值到校 分段 映射为 离散值类型的one-hot特征列

目标格式:

日期范围	                 表示为…
< 1960 年	                [1, 0, 0, 0]
>= 1960 年 and < 1980 年	[0, 1, 0, 0]
>= 1980 年 and < 2000 年	[0, 0, 1, 0]
>= 2000 年	                [0, 0, 0, 1]

实现:

# 首先,将原始输入转换为一个numeric column
numeric_feature_column = tf.feature_column.numeric_column("Year")

# 然后,按照边界[1960,1980,2000]将numeric column进行bucket,然后自动one-hot
bucketized_feature_column = tf.feature_column.bucketized_column(
    source_column = numeric_feature_column,
    boundaries = [1960, 1980, 2000])

上面说的是连续值类型特征,下面开始聊离散值类型特征及其扩展转化👇

离散值类型特征 转向量 👇

离散值类型特征 多半是不规则数据,所以无法直接转对应的离散向量。
但是可以通过 哈希散列 和 词表散列的方式转换 👇

哈希散列 (离散文本被散列离散为 离散值类型的one-hot特征列)

当类别的数量特别大时,若将每个词汇或整数设置单独的类别,会消耗非常大的内存。
我们可大致给定类别值,让模型自己去做hash,然后映射到设定的类别(桶)中,并学习拟合到这些类别

hashed_feature_column =
    tf.feature_column.categorical_column_with_hash_bucket(
        key = "some_feature",    # 指定的列为字符串组成的列,👉字符串转为ID
        hash_bucket_size = random_n
    ) 
# 假如 random_n 为 100,那么最终的离散值就会随机选取[0-100)闭开区间的 K 个值]
# K为输入的数据类型保持一致的数
# 👉此结果依然是稀疏向量, 需要转为稠密向量 one-hot👇

和将连续值类型特征映射为one-hot的做法类似:
👉我们需要用 tf.feature_column.indicator_column 将稀疏矩阵转为 正常的one-hot👇
具体例子如下:

tf.feature_column.indicator_column(hashed_feature_column)

👉这里还有一个做法,可以把上面的稀疏矩阵转为一个具体的Tensor。但不是one-hot:

import tensorflow as tf
from tensorflow.python.feature_column import feature_column_lib

feature_cache = feature_column_lib.FeatureTransformationCache(features={
    # feature对应的值可以为Tensor,也可以为SparseTensor
    "feature": tf.constant(value=
        [
        ["a", "b"], ["c", "d"]
        ],
    )
})
id_weight_pair = tf.feature_column.categorical_column_with_hash_bucket(
    key='feature',
    hash_bucket_size=100
).get_sparse_tensors(feature_cache, None)
sparse_tensor = id_weight_pair.id_tensor   # id_weight_pair 转为 SparseTensor
tensor = tf.sparse.to_dense(sparse_tensor)
print(tensor)

# 输出结果
# tf.Tensor(
# [[39 22]
#  [72 65]], shape=(2, 2), dtype=int64)

词表散列 (离散文本被散列离散为整数)

vocabulary_feature_column =
    tf.feature_column.categorical_column_with_vocabulary_list(
        key='some_feature',
        vocabulary_list=["哈", "蜜", "瓜"],
        # num_oov_buckets=2,
        # default_value=-1,
)

👉Note: 如果num_oov_buckets这个参数不传,则代表vocabulary_list列表中的vocab被一一映射为和categorical_column_with_hash_bucket类似的效果
👉和categorical_column_with_hash_bucket稍有不同的是:

categorical_column_with_vocabulary_list不是随机映射的
而是从0开始  [0,1,2]对应哈,密,瓜,三个字

👉至于参数 num_oov_buckets=10,它代表着,如果vocabulary_list的词不在 some_feature这个列的值中,那么则会用 hash算法对vocabulary_list中的(不在 some_feature这个列)的值做增扩散列,num_oov_buckets=2 传的是2, 则可能被随机散列为 3或者4,因为(0,1,2)都被占用了。
👉至于参数default_value=-1,代表如果vocabulary_list的词不在 some_feature这个列的值中,那么会被散列,且散列的值设为-1

👉同样的和categorical_column_with_hash_bucket一样,结果也是稀疏矩阵。

所以需要用feature_column把其转为one-hot:

tf.feature_column.indicator_column(vocabulary_feature_column)

将离散文本特征转为embedding特征

👉前提:是在做过categorical_column_with_vocabulary_list 或categorical_column_with_hash_bucket 的离散化基础上,再继续embedding的。
写法:

tf.feature_column.embedding_column(被离散后的特征变量, dimension=5)
# dimension=5 代表转换后每个样本词有5维度的特征。 shape即 [样本词数, 5]

eg:

vocabulary_feature_column = \
    tf.feature_column.categorical_column_with_vocabulary_list(
        key='some_feature',
        vocabulary_list=["哈", "蜜", "瓜"],
        # num_oov_buckets=2,
        # default_value=-1,
)
tf.feature_column.embedding_column(vocabulary_feature_column, dimension=3)

交叉特征

👉Note: 要注意,特征交叉是对应维度做交叉,一般是只对特征做交叉,样本个数不变,eg:

a = [
    [1,2], 
    [3,4] 
]
b = [ 
    [5,6], 
    [7,8] 
]
a与b交叉后得到👇:
[
    [1,5], [1,6], [2,5], [2,6],
    [3,7], [3,8], [4,7], [4,8],
]

tf.feature_column.crossed_column使用例子:

import tensorflow as tf
import tensorflow.feature_column as fc

actual_sex = {'sex': tf.Variable(['male', 'female', 'female', 'male'], dtype=tf.string)}
actual_nationality = {'nationality': tf.Variable(['belgian', 'french', 'belgian', 'belgian'], dtype=tf.string)}
actual_sex_nationality = dict(actual_sex, **actual_nationality)

# hashed_column
sex_hashed_raw = fc.categorical_column_with_hash_bucket("sex", 10)
sex_hashed = fc.indicator_column(sex_hashed_raw)

# crossed column
crossed_sn_raw = fc.crossed_column(['sex', 'nationality'], hash_bucket_size=20)
crossed_sn = fc.indicator_column(crossed_sn_raw) # 交叉后是稀疏矩阵,转稠密

👉Note1: fc.numeric_column不能直接做交叉,需要先转为离散,然后再交叉
👉Note2: hash_bucket_size代表特征交叉后,用hash函数将每个样本的交叉特征映射为0-19的数字
👉Note3: 交叉后的特征为sparse矩阵, 需要用indicator_column转为one-hot或multi-hot形式的dense(稠密)矩阵,(具体是one-hot还是multi-hot,根据稀疏矩阵的格式来转)

将特征输入到模型的2种方式

一. tk.layers.DenseFeatures()

numerical_columns 是没有做任何处理的,就可以直接输入到DNN中

categorical_columns,是经过embedding处理后的

👉Note:训练得出的 User Embed 和 Item Embed 不能直接运算,因为不在同一个向量空间

preprocessing_layer = tf.keras.layers.DenseFeatures(numerical_columns + categorical_columns)

model = tf.keras.Sequential([
    preprocessing_layer,
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid'),
])

model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

model.fit(train_dataset, epochs=10)

test_loss, test_accuracy = model.evaluate(test_dataset)

print('\n\nTest Loss {}, Test Accuracy {}'.format(test_loss, test_accuracy)

二. tf.estimator.模型()

下面会着重说这个方式,此处略

feature_column实例

a = tf.feature_column.numeric_column("数值字段")

# 已知类别个数
b = tf.feature_column.categorical_column_with_vocabulary_file(
    "类别字段名称",
    ["类别", "类别2",...,"类别n"]  # 把所有类别字符串转为 ID 0,1,2,...,n
)

# 和上述类似,未知类别个数时,用分桶
c = tf.feature_column.categorical_column_with_hash_bucket(
    "类别字段名称",
    hash_bucket_size=2000       # 越大,冲突越少
)

最后将 feature_column的结果,组装成列表, 返回给模型构造。
👉eg:

feature_columns = [a, b, c]
classifier = tf.estimator.LinearClassifier(feature_columns)

👉正式训练模型:

classifier.train(数据集处理函数句柄(就是上面返回dataset的那个),steps=None)

👉上面的dataset函数句柄如下(官方CSV操作实例):

TRAIN_DATA_URL = "https://storage.googleapis.com/tf-datasets/titanic/train.csv"
TEST_DATA_URL = "https://storage.googleapis.com/tf-datasets/titanic/eval.csv"

# 把数据下载到此路径下
train_file_path = tf.keras.utils.get_file("train.csv", TRAIN_DATA_URL)
# 'C:\\Users\\lin\\.keras\\datasets\\train.csv'
test_file_path = tf.keras.utils.get_file("eval.csv", TEST_DATA_URL)
# 'C:\\Users\\lin\\.keras\\datasets\\eval.csv'

CSV_COLUMNS = ['survived', 'sex', 'age', 'n_siblings_spouses', 'parch', 'fare', 'class', 'deck', 'embark_town', 'alone']
LABEL_COLUMN = 'survived'
LABELS = [0, 1]

def get_dataset(file_path):
    """
    真正操作CSV的函数
    """
    dataset = tf.data.experimental.make_csv_dataset(
        file_path,
        batch_size=12,
        column_names=CSV_COLUMNS,
        label_name=LABEL_COLUMN,
        na_value="?",
        num_epochs=1,
        ignore_errors=True)
    return dataset
    # 只要我们将  column_names, label_name 这2个字段指定好
    # make_csv_dataset 将会自动帮我们组装 dataset格式 (包括 feature_dict 和 lable的迭代器)并return

👉因为要传函数句柄(就是函数名),而不是调用,所以额外的参数,需要用到偏函数解决

from functools import partial
# 读取上面路径下的CSV
train_func = partial(get_dataset, train_file_path)
test_func = partial(get_dataset, test_file_path)
classifier.train(train_func)
result = classifier.evaluate(test_func) 
print(result)

tf.data(文件)都是 dataset的子集

tf.data.TextLineDataset - 从文本文件中读取行。
tf.data.TFRecordDataset - 从 TFRecord 文件中读取记录。
tf.data.FixedLengthRecordDataset - 从二进制文件中读取具有固定大小的记录。

dataset转iter

dataset1 = tf.data.Dataset.range(10)
c = dataset1.__iter__()
print(c.get_next()) 
print(c.get_next())
print(c.get_next())

👉Note: 不要用next() 也不要用 next()
👉注意:如果 iter() 找不到 get_next()
👉那么就直接用下面即可

iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
dataset = iterator.get_next()
return dataset

tensorboard

使用方式1:keras callback

训练出模型,直接用tensorboard命令接口,前提需要在具有 模型文件的目录下。model.ckpt。

log_dir="logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
callbacks = [ # put it to fit()
    keras.callbacks.TensorBoard(log_dir),
    keras.callbacks.ModelCheckpoint(save_model_file,save_best_only=True),
    keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)
]
# tensorboard --logdir "logs/fit"

使用方式2:(基于Tensorflow2.0 的 gradient)

https://www.tensorflow.org/tensorboard/get_started?hl=zh-cn

写入TFRecords

Note: 一个Example就是👉一个 样本

第一步,生成TFRecord Writer

writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
# path:TFRecord文件的存放路径;
# compression_type:定义TFRecord文件保存的压缩格式;

也可以用纯python接口(好处就是可以用with上下文):

with tf.io.TFRecordWriter() as writer:
    xxx

第二步,tf.train.Feature生成协议信息

feature是一个字典值,它是将某个类型列表编码成特定的feature格式,而该字典键用于读取TFRecords文件时索引得到不同的数据,某个类型列表可能包含零个或多个值,列表类型如下三种:

tf.train.BytesList(value=[value]) # value转化为字符串(二进制)列表
tf.train.FloatList(value=[value]) # value转化为浮点型列表
tf.train.Int64List(value=[value]) # value转化为整型列表

其中,value是你要保存的数据。内层feature编码方式:

feature_internal = {
    "width":tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
    "weights":tf.train.Feature(float_list=tf.train.FloatList(value=[weights])),
    "image_raw":tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))
}

外层features再将内层字典编码:

features_extern = tf.train.Features(feature_internal)
  • tf.train.Feature这个接口可以编码封装列表类型和字典类型,内层用的是tf.train.Feature
  • 外层使用tf.train.Features

第三步,使用tf.train.Example将features编码数据封装成特定的PB协议格式

example = tf.train.Example(features_extern)

第四步,将example数据系列化为字符串

example_str = example.SerializeToString()

第五步,将系列化为字符串的example数据写入协议缓冲区

writer.write(example_str)

以上5步合写

writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
example = tf.train.Example(
    tf.train.Features(
        {
            "width": tf.train.Feature(int64_list=tf.train.Int64List(value=[width])),
            "weights": tf.train.Feature(float_list=tf.train.FloatList(value=[weights])),
            "image_raw": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_raw]))
        }
    )
)
example_str = example.SerializeToString()  # 将example数据系列化为字符串
writer.write(example_str)

正式将业务数据写入到TFRecord

  1. 将特征中心的表读出(121个特征)
  2. 将click列 和 合并好的feature列用select['click','feature']出来,并 collect()取值
  3. 将取出的值转为 pandas 的 df
  4. 将df存储到 TFRecord

eg:

writer = tf.data.experimental.TFRecordWriter(path, compression_type=None)
click = click_batch[i]
feature = feature_batch[i].tostring()
# [18.0, 0.09475817797242475, 0.0543921297305341...

# 构造example,int64, float64, bytes
example = tf.train.Example(features=tf.train.Features(feature={
    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[click])),
    "feature": tf.train.Feature(bytes_list=tf.train.BytesList(value=[feature]))
}))

# 序列化example,写入文件
writer.write(example.SerializeToString())

读取 TFRecord

FEATURE_COLUMNS = ['channel_id', 'vector', 'user_weigths', 'article_weights']
def read_ctr_records():
    # 定义转换函数,输入时序列化的
    def parse_tfrecords_function(features):

        parsed_features = tf.data.experimental.parse_example_dataset(features)

        feature = tf.io.decode_raw(parsed_features['feature'], tf.float64)
        feature = tf.reshape(tf.cast(feature, tf.float32), [1, 121])
        # 特征顺序 1 channel_id,  100 article_vector, 10 user_weights, 10 article_weights
        # 1 channel_id类别型特征, 100维文章向量求平均值当连续特征,10维用户权重求平均值当连续特征
        channel_id = tf.cast(tf.slice(feature, [0, 0], [1, 1]), tf.int32)
        vector = tf.reduce_sum(tf.slice(feature, [0, 1], [1, 100]), axis=1)
        user_weights = tf.reduce_sum(tf.slice(feature, [0, 101], [1, 10]), axis=1)
        article_weights = tf.reduce_sum(tf.slice(feature, [0, 111], [1, 10]), axis=1)

        label = tf.cast(parsed_features['label'], tf.float32)

        # 构造字典 名称-tensor
        tensor_list = [channel_id, vector, user_weights, article_weights]

        feature_dict = dict(zip(FEATURE_COLUMNS, tensor_list))

        return feature_dict, label

    dataset = tf.data.TFRecordDataset(["./xx.tfrecords"])
    dataset = dataset.map(parse_tfrecords_function)

正则化(Regularization)

  • L1:最终结果,可能使得某项的权重参数惩罚为0
  • L2:最终结果,可能使得某项的权重参数大大减小,但不会为0(权重衰减)

带有L2的 loss 反向传播(BP)后求导得:

dW = 对w的偏导 + (λ/m)W  (λ 为正则化因子,是超参数)
W:=W−αdW

最终 w =  w(1-αλ/m) - α * 对w的偏导   
(1-αλ/m) 永远小于1, 所以 w只会衰减,所以特征不会消失。

带有L1的 loss BP求导:

同上,略, 最终会使w为0,让特征消失掉。

TFRL

常出现的问题:

  • 模型更新周期慢,不能有效反映线上的变化,最快小时级别,一般是天级别甚至周级别。
    解决方式:采用Online-learning的算法。
  • 模型参数多线上predict的时候需要内存大,QPS无法保证。
    解决方式:采用一些优化的方法,在保证精度的前提下,尽量获取稀疏解,从而降低模型参数的数量。

常用最优化的方法:FTRL(Follow the Regularized Leader)

FTRL是一种获得稀疏模型并且防止过拟合的优化方法:(就是loss+l1+l2的结合体)公式:
loss + l1 + l2
应用场景:

FTRL更适合大量的稀疏特征和大量数据场景。(低稀疏可能效果并不是很好)

算法参数:

  • lambda1:L1正则系数,参考值:10 ~ 15
  • lambda2:L2正则系数,参考值:10 ~ 15
  • alpha:FTRL参数,参考值:0.1
  • beta:FTRL参数,参考值:1.0
  • batchSize: mini-batch的大小,参考值:10000

最终参数:

lambda1 = 15,lambda2 = 15, alpha = 0.1, beta = 1.0

使用FTRL实例:

classifiry = tf.estimator.LinearClassifier(feature_columns=feature_cl,
    optimizer=tf.train.FtrlOptimizer(
        learning_rate=0.01,
        l1_regularization_strength=10, 
        l2_regularization_strength=15,
    )
)