Transcribe Audio Files Sent via Email

Tutorials

Intro: Seamless Integration

The versatility of APIs is truly astounding, as they empower developers to interconnect systems, share data, and automate processes in unique and groundbreaking ways. In this blog post, we’ll explore how developers can create a tool that transforms audio files sent to an email address into transcriptions with ease.

Imagine a scenario where an agent wants to transcribe an exceptional customer service conversation. Rather than requiring agents to log into ElevateAI, upload audio files, and download transcriptions, developers can construct an internal service to streamline the process by ingesting audio files, transcribing them, and delivering the transcripts directly.

Let’s now build it. You can download sample code with an implementation from its GitHub repository. If you want to send ElevateAI files in bulk, consider importing multiple audio files using the command line.

The GitHub repo references a submodule, the ElevateAI Python SDK. We’ll use the ElevateAI.py in the SDK to interface with the ElevateAI API. At a high level, what are the steps?

  1. Access an email account and locate an email that has an audio attachment.
  2. Download and save the attachment.
  3. Transcribe the audio file attachment.
  4. Email the transcript back.

For the transcription part of the code, the step are: tell ElevateAI that you want to transcribe an audio file, upload the file, and then download the transcriptions and CX insights when ElevateAI is done. The functions in ElevateAI.py, DeclareAudioInteraction, UploadInteraction, GetPunctuatedTranscript (or GetWordByWordTranscription), and GetAIResults will do the heavy lifting.

Let’s dive in!

 

Step 1. Configure

Read a configuration file that has settings to send and receive emails.

Primarily, we want to pull out the IMAP and SMTP hostnames, usernames, and passwords.

def read_config(filename):
    """
    Read and parse the configuration file.
    """
    try:
        with open(filename, 'r') as f:
            config = json.load(f)
            required_fields = ['imap_server', 'imap_username', 'imap_password',
                               'smtp_server', 'smtp_username', 'smtp_password', 'api_token']
            for field in required_fields:
                if field not in config:
                    raise ValueError(f"Config file is missing required field: {field}")
            return config
    except FileNotFoundError:
        print(f'Error: Config file "{filename}" not found.')
        sys.exit(1)
    except json.JSONDecodeError:
        print(f'Error: Config file "{filename}" is not valid JSON.')
        sys.exit(1)
    except ValueError as e:
        print(f'Error: {e}')
        sys.exit(1)

 

Step 2. Retrieve

Find the latest email with ‘Transcribe’ in the subject.

For the sake of this exercise, we will only retrieve a specific email, but a POC will require a more robust implementation.

# Search for the newest email message with an attachment
search_criteria = 'DATE'
result, data = imap.sort(search_criteria, 'UTF-8', 'SUBJECT "Transcribe"')
latest_email_id = data[0].split()[-1]

# Fetch the email message and extract the attachment
result, data = imap.fetch(latest_email_id, "(RFC822)")
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)

attachment_path = None
sender_address = None

for part in email_message.walk():
    if part.get_content_maintype() == 'multipart':
        continue
    if part.get('Content-Disposition') is None:
        continue

    filename = part.get_filename()
    if not filename:
        continue

    # Save the attachment to a temporary file
    file_name = filename
    attachment_path = os.path.join(tmp_folder, filename)
    with open(attachment_path, 'wb') as f:
        f.write(part.get_payload(decode=True))

 

search_criteria = 'DATE'
result, data = imap.sort(search_criteria, 'UTF-8', 'SUBJECT "Transcribe"')
latest_email_id = data[0].split()[-1]

Step 3. Download

Download the attachment and save it in a temporary directory.

Use Python’s built in functionality email handling functionality to download the email attachment and store it.

for part in email_message.walk():
    if part.get_content_maintype() == 'multipart':
        continue
    if part.get('Content-Disposition') is None:
        continue

    filename = part.get_filename()
    if not filename:
        continue

    # Save the attachment to a temporary file
    file_name = filename
    attachment_path = os.path.join(tmp_folder, filename)
    with open(attachment_path, 'wb') as f:
        f.write(part.get_payload(decode=True))

Step 4. Transcribe

Declare the interaction, upload the audio file, and wait for ElevateAI to transcribe the audio file.

Send the audio to ElevateAI for transcription. Block and wait till the file is processed.

declareResp = ElevateAI.DeclareAudioInteraction(langaugeTag, vert, None, token, transcriptionMode, True)

declareJson = declareResp.json()

interactionId = declareJson["interactionIdentifier"]

if (localFilePath is None):
  raise Exception('Something wrong with attachment')

uploadInteractionResponse =  ElevateAI.UploadInteraction(interactionId, token, localFilePath, fileName)

#Loop over status until processed
while True:
  getInteractionStatusResponse = ElevateAI.GetInteractionStatus(interactionId,token)
  getInteractionStatusResponseJson = getInteractionStatusResponse.json()
  if getInteractionStatusResponseJson["status"] == "processed" or getInteractionStatusResponseJson["status"] == "fileUploadFailed" or getInteractionStatusResponseJson["status"] == "fileDownloadFailed" or getInteractionStatusResponseJson["status"] == "processingFailed" :
        break
  time.sleep(15)

 

Step 5. Convert

Convert the transcription, which is in JSON format, into a regular text file.

Once, we have the JSON, parse it so it reads like a conversation and store it.

def print_conversation(json_str):
  data = json.loads(json_str)
  filename = 'transcript.txt'
  
  # Initialize variables to store the accumulated phrases for each participant
  participantOne_phrases = ""
  participantTwo_phrases = ""
  tmp_folder = tempfile.mkdtemp()
  attachment_path = os.path.join(tmp_folder, filename)
  print("=== Begin Transcription Output ===\n\n")

  with open(attachment_path, 'w') as f:
    # Loop through the sentenceSegments list and accumulate phrases for each participant
    for segment in data['sentenceSegments']:
        if segment['participant'] == 'participantOne':
            participantOne_phrases += segment['phrase'] + " "
        elif segment['participant'] == 'participantTwo':
            participantTwo_phrases += segment['phrase'] + " "

        # If the next segment has a different participant, print the accumulated phrases and reset the variables
        if (data['sentenceSegments'].index(segment) != len(data['sentenceSegments'])-1) and (segment['participant'] != data['sentenceSegments'][data['sentenceSegments'].index(segment)+1]['participant']):
            p1 = participantOne_phrases.strip()
            p2 = participantTwo_phrases.strip()
            if p1:
              print("participantOne:\n" + p1 + "\n")
              f.write("participantOne:\n" + p1 + "\n\n")
            if p2:
              print("participantTwo:\n" + p2 + "\n")
              f.write("participantTwo:\n" + p2 + "\n\n")
            participantOne_phrases = ""
            participantTwo_phrases = ""

    # Print the accumulated phrases for the last participant
    p1 = participantOne_phrases.strip()
    p2 = participantTwo_phrases.strip()
    if p1:
      print("participantOne:\n" + p1 + "\n")
      f.write("participantOne:\n" + p1 + "\n\n")

    if p2:
      print("participantTwo:\n" + p2 + "\n")
      f.write("participantTwo:\n" + p2 + "\n\n")

    print("=== End Transcription Output ===\n\n")

  f.close()

  return attachment_path

Step 6. Email

Send the text file back through email.

Create a new email, attach the transcription, and send it back to the original sender.

def send_email_with_attachment(attachment_path, recipient_address, config):

  smtp_server = config["smtp_server"]
  smtp_username = config["smtp_username"]
  smtp_password = config["smtp_password"]

  # Log in to the SMTP server
  smtp = smtplib.SMTP_SSL(smtp_server)
  smtp.ehlo()
  smtp.login(smtp_username, smtp_password)
  print("SMTP logged in.")

  # Create a message object
  message = MIMEMultipart()
  message['From'] = smtp_username
  message['To'] = recipient_address
  message['Subject'] = "Completed Transcription"

  # Add the attachment to the message
  with open(attachment_path, 'r') as f:
    attachment = MIMEApplication(f.read(), _subtype='txt')
    attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(attachment_path))
    message.attach(attachment)

  # Send the message
  smtp.send_message(message)

  # Log out of the SMTP server
  smtp.quit()

Sample code can be found in GitHub.

Harnessing the Power of CX ...
Efficient Bulk Transcriptio...

Related Posts