当前位置:首页 » 《休闲阅读》 » 正文

实用篇 | 一文快速构建人工智能前端展示streamlit应用

10 人参与  2024年03月23日 08:00  分类 : 《休闲阅读》  评论

点击全文阅读


----------------------- ?API 相关直达 ?--------------------------

?Gradio: 实用篇 | 关于Gradio快速构建人工智能模型实现界面,你想知道的都在这里-CSDN博客

?Streamlit :实用篇 | 一文快速构建人工智能前端展示streamlit应用-CSDN博客

?Flask: 实用篇 | 一文学会人工智能中API的Flask编写(内含模板)-CSDN博客

Streamlit是一个用于机器学习、数据可视化的 Python 框架,它能几行代码就构建出一个精美的在线 app 应用。相比于Gradio,能展示更多的功能~

目录

1.Streamlit的安装

2.Streamlit的语法

2.1.基本语法

2.2.进阶语法

2.2.1.图片,语音,视频

2.2.2.进程提示

2.3.高级语法

2.3.1.@st.cache_data

2.3.2.st.cache_resource

3.创建一个简单的app

实时读取数据并作图

4.人工智能深度学习项目Streamlit实例

4.1.实例1:文本生成

4.1.1ChatGLM的交互

4.1.2.OpenAI的交互

4.2.图像类

4.2.1.图像分类

4.2.2.图片生成

4.3.语音类

4.3.1.语音合成

 4.3.2.语音转文本

参考文献


官网:Get started - Streamlit Docs

1.Streamlit的安装

# 安装pip install streamlitpip install streamlit-chat# 测试streamlit hello

会出现一些案例

2.Streamlit的语法

2.1.基本语法

import streamlit as st

最常用的几种

标题st.title() : st.title("标题")写入st.write(): st.write("Hello world ")文本st.text():单行文本多行文本框st.text_area():st.text_area("文本框",value=''key=None)滑动条st.slider():st.slider(““)按钮st.button():st.button(“按钮“)输入文本st.text_input():st.text_input(“请求用户输入“)单选框组件st.radio()

2.2.进阶语法

2.2.1.图片,语音,视频

都可以输入向量值,比特值,加载文件,文件路径

st.image()st.audio()st.video()

2.2.2.进程提示

st.progress() 显示进度st.spinner()显示执行状态st.error()显示错误信息st.warning - 显示警告信息

2.3.高级语法

2.3.1.@st.cache_data

当使用 Streamlit 的缓存注释标记函数时,它会告诉 Streamlit 每当调用函数时,它应该检查两件事:

用于函数调用的输入参数函数内部的代码

2.3.2.st.cache_resource

用于缓存返回全局资源(例如数据库连接、ML 模型)的函数的装饰器。

缓存的对象在所有用户、会话和重新运行之间共享。他们 必须是线程安全的,因为它们可以从多个线程访问 同时。如果线程安全是一个问题,请考虑改用 st.session_state 来存储每个会话的资源。

默认情况下,cache_resource函数的所有参数都必须是可哈希的。 名称以 _ 开头的任何参数都不会进行哈希处理。

3.创建一个简单的app

实时读取数据并作图

import streamlit as stimport pandas as pdimport numpy as npst.title('Uber pickups in NYC')DATA_COLUMN = 'data/time'DATA_URL = ('https://s3-us-west-2.amazonaws.com/'            'streamlit-demo-data/uber-raw-data-sep14.csv.gz')# 增加缓存@st.cache_data# 下载数据函数def load_data(nrows):    # 读取csv文件    data = pd.rea_csv(data_url,nrows=nrows)    # 转换小写字母    lowercase = lambda x:tr(x).lower()    # 将数据重命名     data.rename(lowercase,axis='columns',inplace=True)    # 将数据以panda的数据列的形式展示出来    data[DATA_COLUMN] = pd.to_datatime(data[DATA_COLUMN])    # 返回最终数据    return data# 直接打印文本信息data_load_state = st.text('正在下载')# 下载一万条数据中的数据data = load_data(10000)# 最后输出文本显示data_load_state.text("完成!(using st.cache_data)")# 检查原始数据if st.checkbox('Show raw data'):    st.subheader('Raw data')    st.write(data)# 绘制直方图# 添加一个子标题st.subheader('Number of pickups by hour')# 使用numpy生成一个直方图,按小时排列hist_values = np.histogram(data[DATE_COLUMN].dt.hour, bins=24, range=(0,24))[0]# 使用Streamlit 的 st.bar_chart() 方法来绘制直方图st.bar_chart(hist_values)# 使用滑动块筛选结果hour_to_filter = st.slider('hour', 0, 23, 17)# 实时更新filtered_data = data[data[DATE_COLUMN].dt.hour == hour_to_filter]# 为地图添加一个副标题st.subheader('Map of all pickups at %s:00' % hour_to_filter)# 使用st.map()函数绘制数据st.map(filtered_data)

运行

streamlit run demo.py

4.人工智能深度学习项目Streamlit实例

4.1.实例1:文本生成

4.1.1ChatGLM的交互

from transformers import AutoModel, AutoTokenizerimport streamlit as stfrom streamlit_chat import messagest.set_page_config(    page_title="ChatGLM-6b 演示",    page_icon=":robot:")@st.cache_resourcedef get_model():    tokenizer = AutoTokenizer.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True)    model = AutoModel.from_pretrained("THUDM/chatglm-6b", trust_remote_code=True).half().cuda()    model = model.eval()    return tokenizer, modelMAX_TURNS = 20MAX_BOXES = MAX_TURNS * 2def predict(input, max_length, top_p, temperature, history=None):    tokenizer, model = get_model()    if history is None:        history = []    with container:        if len(history) > 0:            if len(history)>MAX_BOXES:                history = history[-MAX_TURNS:]            for i, (query, response) in enumerate(history):                message(query, avatar_style="big-smile", key=str(i) + "_user")                message(response, avatar_style="bottts", key=str(i))        message(input, avatar_style="big-smile", key=str(len(history)) + "_user")        st.write("AI正在回复:")        with st.empty():            for response, history in model.stream_chat(tokenizer, input, history, max_length=max_length, top_p=top_p,                                               temperature=temperature):                query, response = history[-1]                st.write(response)    return historycontainer = st.container()# create a prompt text for the text generationprompt_text = st.text_area(label="用户命令输入",            height = 100,            placeholder="请在这儿输入您的命令")max_length = st.sidebar.slider(    'max_length', 0, 4096, 2048, step=1)top_p = st.sidebar.slider(    'top_p', 0.0, 1.0, 0.6, step=0.01)temperature = st.sidebar.slider(    'temperature', 0.0, 1.0, 0.95, step=0.01)if 'state' not in st.session_state:    st.session_state['state'] = []if st.button("发送", key="predict"):    with st.spinner("AI正在思考,请稍等........"):        # text generation        st.session_state["state"] = predict(prompt_text, max_length, top_p, temperature, st.session_state["state"])

4.1.2.OpenAI的交互

from openai import OpenAIimport streamlit as stwith st.sidebar:    openai_api_key = st.text_input("OpenAI API Key", key="chatbot_api_key", type="password")    "[Get an OpenAI API key](https://platform.openai.com/account/api-keys)"    "[View the source code](https://github.com/streamlit/llm-examples/blob/main/Chatbot.py)"    "[![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/streamlit/llm-examples?quickstart=1)"st.title("? Chatbot")st.caption("? A streamlit chatbot powered by OpenAI LLM")if "messages" not in st.session_state:    st.session_state["messages"] = [{"role": "assistant", "content": "How can I help you?"}]for msg in st.session_state.messages:    st.chat_message(msg["role"]).write(msg["content"])if prompt := st.chat_input():    if not openai_api_key:        st.info("Please add your OpenAI API key to continue.")        st.stop()    client = OpenAI(api_key=openai_api_key)    st.session_state.messages.append({"role": "user", "content": prompt})    st.chat_message("user").write(prompt)    response = client.chat.completions.create(model="gpt-3.5-turbo", messages=st.session_state.messages)    msg = response.choices[0].message.content    st.session_state.messages.append({"role": "assistant", "content": msg})    st.chat_message("assistant").write(msg)

4.2.图像类

4.2.1.图像分类

import streamlit as stst.markdown('<h1 style="color:black;">Vgg 19 Image classification model</h1>', unsafe_allow_html=True)st.markdown('<h2 style="color:gray;">The image classification model classifies image into following categories:</h2>', unsafe_allow_html=True)st.markdown('<h3 style="color:gray;"> street,  buildings, forest, sea, mountain, glacier</h3>', unsafe_allow_html=True)# 背景图片background image to streamlit@st.cache(allow_output_mutation=True)# 以base64的方式传输文件def get_base64_of_bin_file(bin_file):    with open(bin_file, 'rb') as f:        data = f.read()    return base64.b64encode(data).decode()#设置背景图片,颜色等def set_png_as_page_bg(png_file):    bin_str = get_base64_of_bin_file(png_file)     page_bg_img = '''    <style>    .stApp {    background-image: url("data:image/png;base64,%s");    background-size: cover;    background-repeat: no-repeat;    background-attachment: scroll; # doesn't work    }    </style>    ''' % bin_str        st.markdown(page_bg_img, unsafe_allow_html=True)    returnset_png_as_page_bg('/content/background.webp')# 上传png/jpg的照片upload= st.file_uploader('Insert image for classification', type=['png','jpg'])c1, c2= st.columns(2)if upload is not None:  im= Image.open(upload)  img= np.asarray(im)  image= cv2.resize(img,(224, 224))  img= preprocess_input(image)  img= np.expand_dims(img, 0)  c1.header('Input Image')  c1.image(im)  c1.write(img.shape) # 下载预训练模型 # 输入尺寸  input_shape = (224, 224, 3) # 定义优化器  optim_1 = Adam(learning_rate=0.0001) # 分类数  n_classes=6 # 定义模型  vgg_model = model(input_shape, n_classes, optim_1, fine_tune=2) # 下载权重  vgg_model.load_weights('/content/drive/MyDrive/vgg/tune_model19.weights.best.hdf5')  #预测  vgg_preds = vgg_model.predict(img)  vgg_pred_classes = np.argmax(vgg_preds, axis=1)  c2.header('Output')  c2.subheader('Predicted class :')  c2.write(classes[vgg_pred_classes[0]] )

4.2.2.图片生成

import streamlit as st from dotenv import load_dotenvimport os import openaifrom diffusers import StableDiffusionPipelineimport torchload_dotenv()openai.api_key = os.getenv("OPENAI_API_KEY")#function to generate AI based images using OpenAI Dall-Edef generate_images_using_openai(text):    response = openai.Image.create(prompt= text, n=1, size="512x512")    image_url = response['data'][0]['url']    return image_url#function to generate AI based images using Huggingface Diffusersdef generate_images_using_huggingface_diffusers(text):    pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)    pipe = pipe.to("cuda")    prompt = text    image = pipe(prompt).images[0]     return image#Streamlit Codechoice = st.sidebar.selectbox("Select your choice", ["Home", "DALL-E", "Huggingface Diffusers"])if choice == "Home":    st.title("AI Image Generation App")    with st.expander("About the App"):        st.write("This is a simple image generation app that uses AI to generates images from text prompt.")elif choice == "DALL-E":    st.subheader("Image generation using Open AI's DALL-E")    input_prompt = st.text_input("Enter your text prompt")    if input_prompt is not None:        if st.button("Generate Image"):            image_url = generate_images_using_openai(input_prompt)            st.image(image_url, caption="Generated by DALL-E")elif choice == "Huggingface Diffusers":    st.subheader("Image generation using Huggingface Diffusers")    input_prompt = st.text_input("Enter your text prompt")    if input_prompt is not None:        if st.button("Generate Image"):            image_output = generate_images_using_huggingface_diffusers(input_prompt)            st.info("Generating image.....")            st.success("Image Generated Successfully")            st.image(image_output, caption="Generated by Huggingface Diffusers")

4.3.语音类

4.3.1.语音合成

import torchimport streamlit as st# 这里使用coqui-tts,直接pip install tts就可以from TTS.api import TTSimport tempfileimport osdevice = "cuda" if torch.cuda.is_available() else "cpu"# 模型选择model_name = 'tts_models/en/jenny/jenny'tts = TTS(model_name).to(device)st.title('Coqui TTS')# 输入文本text_to_speak = st.text_area('Entire article text here:', '')# 点击按钮监听if st.button('Listen'):    if text_to_speak:        # temp path needed for audio to listen to        # 定义合成语音文件名称        temp_audio_path = './temp_audio.wav'        # 使用tts库中的tts_to_file函数        tts.tts_to_file(text=text_to_speak, file_path=temp_audio_path)        #输出语音        st.audio(temp_audio_path, format='audio/wav')        os.unlink(temp_audio_path)


 4.3.2.语音转文本

import loggingimport logging.handlersimport queueimport threadingimport timeimport urllib.requestimport osfrom collections import dequefrom pathlib import Pathfrom typing import Listimport avimport numpy as npimport pydubimport streamlit as stfrom twilio.rest import Clientfrom streamlit_webrtc import WebRtcMode, webrtc_streamerHERE = Path(__file__).parentlogger = logging.getLogger(__name__)# This code is based on https://github.com/streamlit/demo-self-driving/blob/230245391f2dda0cb464008195a470751c01770b/streamlit_app.py#L48  # noqa: E501def download_file(url, download_to: Path, expected_size=None):    # Don't download the file twice.    # (If possible, verify the download using the file length.)    if download_to.exists():        if expected_size:            if download_to.stat().st_size == expected_size:                return        else:            st.info(f"{url} is already downloaded.")            if not st.button("Download again?"):                return    download_to.parent.mkdir(parents=True, exist_ok=True)    # These are handles to two visual elements to animate.    weights_warning, progress_bar = None, None    try:        weights_warning = st.warning("Downloading %s..." % url)        progress_bar = st.progress(0)        with open(download_to, "wb") as output_file:            with urllib.request.urlopen(url) as response:                length = int(response.info()["Content-Length"])                counter = 0.0                MEGABYTES = 2.0 ** 20.0                while True:                    data = response.read(8192)                    if not data:                        break                    counter += len(data)                    output_file.write(data)                    # We perform animation by overwriting the elements.                    weights_warning.warning(                        "Downloading %s... (%6.2f/%6.2f MB)"                        % (url, counter / MEGABYTES, length / MEGABYTES)                    )                    progress_bar.progress(min(counter / length, 1.0))    # Finally, we remove these visual elements by calling .empty().    finally:        if weights_warning is not None:            weights_warning.empty()        if progress_bar is not None:            progress_bar.empty()# This code is based on https://github.com/whitphx/streamlit-webrtc/blob/c1fe3c783c9e8042ce0c95d789e833233fd82e74/sample_utils/turn.py@st.cache_data  # type: ignoredef get_ice_servers():    """Use Twilio's TURN server because Streamlit Community Cloud has changed    its infrastructure and WebRTC connection cannot be established without TURN server now.  # noqa: E501    We considered Open Relay Project (https://www.metered.ca/tools/openrelay/) too,    but it is not stable and hardly works as some people reported like https://github.com/aiortc/aiortc/issues/832#issuecomment-1482420656  # noqa: E501    See https://github.com/whitphx/streamlit-webrtc/issues/1213    """    # Ref: https://www.twilio.com/docs/stun-turn/api    try:        account_sid = os.environ["TWILIO_ACCOUNT_SID"]        auth_token = os.environ["TWILIO_AUTH_TOKEN"]    except KeyError:        logger.warning(            "Twilio credentials are not set. Fallback to a free STUN server from Google."  # noqa: E501        )        return [{"urls": ["stun:stun.l.google.com:19302"]}]    client = Client(account_sid, auth_token)    token = client.tokens.create()    return token.ice_serversdef main():    st.header("Real Time Speech-to-Text")    st.markdown(        """This demo app is using [DeepSpeech](https://github.com/mozilla/DeepSpeech),an open speech-to-text engine.A pre-trained model released with[v0.9.3](https://github.com/mozilla/DeepSpeech/releases/tag/v0.9.3),trained on American English is being served."""    )    # https://github.com/mozilla/DeepSpeech/releases/tag/v0.9.3    MODEL_URL = "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.pbmm"  # noqa    LANG_MODEL_URL = "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.scorer"  # noqa    MODEL_LOCAL_PATH = HERE / "models/deepspeech-0.9.3-models.pbmm"    LANG_MODEL_LOCAL_PATH = HERE / "models/deepspeech-0.9.3-models.scorer"    download_file(MODEL_URL, MODEL_LOCAL_PATH, expected_size=188915987)    download_file(LANG_MODEL_URL, LANG_MODEL_LOCAL_PATH, expected_size=953363776)    lm_alpha = 0.931289039105002    lm_beta = 1.1834137581510284    beam = 100    sound_only_page = "Sound only (sendonly)"    with_video_page = "With video (sendrecv)"    app_mode = st.selectbox("Choose the app mode", [sound_only_page, with_video_page])    if app_mode == sound_only_page:        app_sst(            str(MODEL_LOCAL_PATH), str(LANG_MODEL_LOCAL_PATH), lm_alpha, lm_beta, beam        )    elif app_mode == with_video_page:        app_sst_with_video(            str(MODEL_LOCAL_PATH), str(LANG_MODEL_LOCAL_PATH), lm_alpha, lm_beta, beam        )def app_sst(model_path: str, lm_path: str, lm_alpha: float, lm_beta: float, beam: int):    webrtc_ctx = webrtc_streamer(        key="speech-to-text",        mode=WebRtcMode.SENDONLY,        audio_receiver_size=1024,        rtc_configuration={"iceServers": get_ice_servers()},        media_stream_constraints={"video": False, "audio": True},    )    status_indicator = st.empty()    if not webrtc_ctx.state.playing:        return    status_indicator.write("Loading...")    text_output = st.empty()    stream = None    while True:        if webrtc_ctx.audio_receiver:            if stream is None:                from deepspeech import Model                model = Model(model_path)                model.enableExternalScorer(lm_path)                model.setScorerAlphaBeta(lm_alpha, lm_beta)                model.setBeamWidth(beam)                stream = model.createStream()                status_indicator.write("Model loaded.")            sound_chunk = pydub.AudioSegment.empty()            try:                audio_frames = webrtc_ctx.audio_receiver.get_frames(timeout=1)            except queue.Empty:                time.sleep(0.1)                status_indicator.write("No frame arrived.")                continue            status_indicator.write("Running. Say something!")            for audio_frame in audio_frames:                sound = pydub.AudioSegment(                    data=audio_frame.to_ndarray().tobytes(),                    sample_width=audio_frame.format.bytes,                    frame_rate=audio_frame.sample_rate,                    channels=len(audio_frame.layout.channels),                )                sound_chunk += sound            if len(sound_chunk) > 0:                sound_chunk = sound_chunk.set_channels(1).set_frame_rate(                    model.sampleRate()                )                buffer = np.array(sound_chunk.get_array_of_samples())                stream.feedAudioContent(buffer)                text = stream.intermediateDecode()                text_output.markdown(f"**Text:** {text}")        else:            status_indicator.write("AudioReciver is not set. Abort.")            breakdef app_sst_with_video(    model_path: str, lm_path: str, lm_alpha: float, lm_beta: float, beam: int):    frames_deque_lock = threading.Lock()    frames_deque: deque = deque([])    async def queued_audio_frames_callback(        frames: List[av.AudioFrame],    ) -> av.AudioFrame:        with frames_deque_lock:            frames_deque.extend(frames)        # Return empty frames to be silent.        new_frames = []        for frame in frames:            input_array = frame.to_ndarray()            new_frame = av.AudioFrame.from_ndarray(                np.zeros(input_array.shape, dtype=input_array.dtype),                layout=frame.layout.name,            )            new_frame.sample_rate = frame.sample_rate            new_frames.append(new_frame)        return new_frames    webrtc_ctx = webrtc_streamer(        key="speech-to-text-w-video",        mode=WebRtcMode.SENDRECV,        queued_audio_frames_callback=queued_audio_frames_callback,        rtc_configuration={"iceServers": get_ice_servers()},        media_stream_constraints={"video": True, "audio": True},    )    status_indicator = st.empty()    if not webrtc_ctx.state.playing:        return    status_indicator.write("Loading...")    text_output = st.empty()    stream = None    while True:        if webrtc_ctx.state.playing:            if stream is None:                from deepspeech import Model                model = Model(model_path)                model.enableExternalScorer(lm_path)                model.setScorerAlphaBeta(lm_alpha, lm_beta)                model.setBeamWidth(beam)                stream = model.createStream()                status_indicator.write("Model loaded.")            sound_chunk = pydub.AudioSegment.empty()            audio_frames = []            with frames_deque_lock:                while len(frames_deque) > 0:                    frame = frames_deque.popleft()                    audio_frames.append(frame)            if len(audio_frames) == 0:                time.sleep(0.1)                status_indicator.write("No frame arrived.")                continue            status_indicator.write("Running. Say something!")            for audio_frame in audio_frames:                sound = pydub.AudioSegment(                    data=audio_frame.to_ndarray().tobytes(),                    sample_width=audio_frame.format.bytes,                    frame_rate=audio_frame.sample_rate,                    channels=len(audio_frame.layout.channels),                )                sound_chunk += sound            if len(sound_chunk) > 0:                sound_chunk = sound_chunk.set_channels(1).set_frame_rate(                    model.sampleRate()                )                buffer = np.array(sound_chunk.get_array_of_samples())                stream.feedAudioContent(buffer)                text = stream.intermediateDecode()                text_output.markdown(f"**Text:** {text}")        else:            status_indicator.write("Stopped.")            breakif __name__ == "__main__":    import os    DEBUG = os.environ.get("DEBUG", "false").lower() not in ["false", "no", "0"]    logging.basicConfig(        format="[%(asctime)s] %(levelname)7s from %(name)s in %(pathname)s:%(lineno)d: "        "%(message)s",        force=True,    )    logger.setLevel(level=logging.DEBUG if DEBUG else logging.INFO)    st_webrtc_logger = logging.getLogger("streamlit_webrtc")    st_webrtc_logger.setLevel(logging.DEBUG)    fsevents_logger = logging.getLogger("fsevents")    fsevents_logger.setLevel(logging.WARNING)    main()

参考文献

【1】API Reference - Streamlit Docs

【2】andfanilo/streamlit-lottie: Streamlit component to render Lottie animations (github.com)turner-anderson/streamlit-cropper: A simple image cropper for Streamlit (github.com)andfanilo/streamlit-lottie: Streamlit component to render Lottie animations (github.com) 

【3】awetomate/text-to-speech-streamlit: Text-to-Speech solution using Google's Cloud TTS API and a Streamlit front end (github.com) 【4】Using streamlit for an STT / TTS model demo? - ? Streamlit Components - Streamlit

【5】AI-App/Streamlit-TTS (github.com)

【6】Building a Voice Assistant using ChatGPT API | Vahid's ML-Blog (vahidmirjalili.com) 

【7】streamlit/llm-examples: Streamlit LLM app examples for getting started (github.com) 

【8】whitphx/streamlit-stt-app: Real time web based Speech-to-Text app with Streamlit (github.com)


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/84000.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1