CTIgor: An Agentic CTI Assistant
Continuing down the road of my ongoing experimentation with Generative AI, one of these experiments took shape as a Cyber Threat Intelligence assistant to help facilitate the threat post/report consumption process. This post follows the steps of gradually building it out, as an educational exercise.

Introduction
Not too long ago, I came across this AI Agents for Beginners 10-lesson mini-course shared by one of my contacts. Working through the course, I decided to try applying some of the example use-cases to a practical area of cyber security that presents a common scaling problem: the review and cataloging of Cyber Threat Intelligence (CTI) reports. The outcome is a project I chose to name CTIgor, an assistant for threat report review. In this blog post, I’ll discuss the steps in building it (somewhat) from scratch.
Azure OpenAI Deployment
As the AI Agents Courselet is a Microsoft publication, many of the examples are Azure-centric. For those wishing
to utilize alternative services, or even locally-hosted LLMs, the
semantic-kernel AI connectors offer support for
AWS Bedrock,
Ollama, and
others. In the Python code at https://github.com/ckane/ctigor/tree/semantic-kernel-port/, the AzureChatCompletion and AzureChatPromptExecutionSettings types can be swapped for the equivalent types for the target LLM platform or interface. The instructions below will use Azure’s OpenAI interface, but reliance upon it will be minimized to maintain portability.
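For example, a hedged sketch of what that substitution might look like using the Ollama connector (this assumes the Ollama connector shipped with your semantic-kernel release; parameter names can vary between versions, so consult the documentation for yours):
from semantic_kernel.connectors.ai.ollama import OllamaChatCompletion, OllamaChatPromptExecutionSettings

# Swap the Azure types for their Ollama equivalents; "llama3" is an assumed
# locally-pulled model, and the host below is Ollama's default local endpoint
chat_service = OllamaChatCompletion(
    ai_model_id="llama3",
    host="http://localhost:11434",
)
request_settings = OllamaChatPromptExecutionSettings()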
To help bootstrap a fairly inexpensive AI deployment, I’ve written some Terraform code that can be used to deploy the needed OpenAI infrastructure into Azure. Microsoft’s partnership with OpenAI has led them to offer many OpenAI-derived services through Azure, behind an OpenAI-compatible API.
The code in main.tf deploys the following infrastructure into Azure to support generative AI:
- Azure Resource Group: To logically isolate this project in the Azure tenant
- Azure Cognitive Services Account: Logical grouping object for AI service deployments
- Azure Cognitive Services Deployment: A deployment of an LLM (in this case, OpenAI-compatible) that will be interacted with
A number of input variables are defined in variables.tf, which allow customization of the deployment. Placeholders for the required parameters are in terraform.tfvars.example. This file must be copied to terraform.tfvars and edited to add your Azure Subscription ID plus three unique, descriptive names for the infrastructure defined in main.tf. This is largely intended as a simple example for illustrative purposes, but with an active Azure Subscription it should nonetheless work to quickly bootstrap a functional environment. Note that the deployed infrastructure will cost money per use, so the default model I’ve picked is gpt-4o-mini, which is priced around $0.15 per 1M input tokens and $0.60 per 1M output (generated) tokens. The model supports a context window of 128k tokens, which provides enough room for many large CTI reports. Generally speaking, the more expensive models are targeted at more complex tasks or at satisfying more ambiguous or lengthy queries.
Python Environment
I’ve built the ctigor project using the uv Python project manager. The project should “just work” on a system with uv installed, after cloning the repository. However, for anyone wanting to use their preferred Python environment manager or a more traditional pip workflow, I also maintain a requirements.txt in the repository, and all of the uv details can be ignored if you prefer. uv is nice, as you can run uv run script.py (substituting script.py for the Python script or command you’d like to run within the managed virtual environment) and it will handle the venv details behind the scenes for you.
Similar to the Terraform setup above, for Python I’ve placed a local_settings.py.example file which needs to be copied to local_settings.py and then populated with the appropriate values from Terraform. The terraform output command can be used to retrieve them:
- For local_settings.azure_api_key: terraform output openai_primary_key
- For local_settings.endpoint: terraform output openai_endpoint
Then, make sure that local_settings.deployment matches the name of the Cognitive Services deployment you defined in terraform.tfvars.
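For reference, a populated local_settings.py might look like the following sketch (all values below are placeholders for illustration, not real credentials):
# local_settings.py - placeholder values for illustration only
azure_api_key = "<value of: terraform output openai_primary_key>"
endpoint = "https://<your-cognitive-services-account>.openai.azure.com/"
deployment = "<the deployment name you set in terraform.tfvars>"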
Testing the AI Functionality
A quick test of the Azure OpenAI service is important, to validate that everything is working correctly. The following Python script will prompt the LLM to generate a fact about hedgehogs. The code has been commented to explain each of the steps:
import asyncio
# Import the semantic-kernel objects used by the program
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel
# Import secrets from local_settings.py
import local_settings
def create_azure_chat_completion():
# Define the Azure OpenAI AI Connector and connect to the deployment Terraform provisioned from main.tf
chat_service = AzureChatCompletion(
deployment_name=local_settings.deployment,
api_key=local_settings.azure_api_key,
endpoint=local_settings.endpoint,
)
return chat_service
async def main():
# Instantiate a new AI Kernel
kernel = Kernel()
# Define a ChatHistory object and add a user prompt to it
chat_history = ChatHistory()
chat_history.add_user_message("Tell me a fact about hedgehogs")
# Create a new Azure Chat Completion instance to interact with
chat_service = create_azure_chat_completion()
# Define the request settings to use model defaults
request_settings = AzureChatPromptExecutionSettings()
# Prompt the model with the given chat history
response = await chat_service.get_chat_message_content(
chat_history=chat_history, settings=request_settings, kernel=kernel
)
# Fail if no response
assert response is not None
# Display response on console
print(f"=======\n{response}\n=======")
if __name__ == "__main__":
asyncio.run(main())
When run, it should produce output similar to the following:
=======
Hedgehogs are nocturnal animals, meaning they are primarily active during the night.
They have excellent senses of hearing and smell, which help them navigate and find
food in the dark.
=======
Interaction with the model works by feeding it an incrementally updated ChatHistory, which is appended to any time the user issues a new prompt or the LLM generates an answer. Each new prompt or answer can be appended to the ChatHistory by calling the appropriate ChatHistory.add_user_message or ChatHistory.add_assistant_message method. This ensures that for each subsequent interaction, the instance retains a “memory” of the conversation, allowing earlier parts of the conversation to be referred back to and used in subsequent answers.
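As a minimal sketch of this pattern (reusing the chat_service, request_settings, and kernel objects from the script above, inside an async context), a two-turn exchange looks like this:
chat_history = ChatHistory()
chat_history.add_user_message("Tell me a fact about hedgehogs")
first = await chat_service.get_chat_message_content(
    chat_history=chat_history, settings=request_settings, kernel=kernel
)
# Record the answer so later prompts can refer back to it
chat_history.add_assistant_message(first.content)
# This follow-up only works because the prior turn is in the history
chat_history.add_user_message("Rephrase that fact for a five-year-old")
second = await chat_service.get_chat_message_content(
    chat_history=chat_history, settings=request_settings, kernel=kernel
)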
CTIgor Class
After verifying the above, the semantic-kernel initialization code will next be moved into a new ctiagent.py module:
import asyncio
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion, AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel
# Import secrets from local_settings.py
import local_settings
class CTIgor(object):
def __init__(self):
# Initialize a new Kernel to work with
self.kernel = Kernel()
# Define the Azure OpenAI AI Connector and connect to the deployment Terraform provisioned from main.tf
self.chat_service = AzureChatCompletion(
deployment_name=local_settings.deployment,
api_key=local_settings.azure_api_key,
endpoint=local_settings.endpoint,
)
# Define a ChatHistory object
self.chat_history = ChatHistory()
# Define the request settings to use model defaults
self.request_settings = AzureChatPromptExecutionSettings()
async def prompt(self, input_prompt: str):
self.chat_history.add_user_message(input_prompt)
# Prompt the model with the given chat history, waiting for response
response = await self.chat_service.get_chat_message_content(
chat_history=self.chat_history, settings=self.request_settings, kernel=self.kernel
)
# Ensure response isn't None
assert response is not None
# Append the response to the chat_history
self.chat_history.add_assistant_message(response.content)
return response
The new CTIgor object simplifies the interaction with the model instance to a single prompt(str) call, which performs all the work of:
- Adding the user prompt to the ChatHistory instance
- Sending the updated ChatHistory instance to the LLM
- Waiting on a response
- Validating that the response completed successfully
- Adding the response from the AI assistant to the ChatHistory
- Returning the validated response to the caller
Additionally, all of the variables declared in the main() and global scopes are now cleanly organized inside a data type that handles the AI interactions. With these adjustments, the earlier test_openai.py script, refactored, now looks like this:
import asyncio
from ctiagent import CTIgor
async def main():
ctigor = CTIgor()
response = await ctigor.prompt("Tell me a fact about hedgehogs")
# Display response on console
print(f"=======\n{response}\n=======")
if __name__ == "__main__":
asyncio.run(main())
Successive calls to CTIgor.prompt will append the new input and output to the instance’s internal ChatHistory.
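For example, a brief sketch of two successive prompts, where the second relies on that conversational memory:
ctigor = CTIgor()
await ctigor.prompt("Tell me a fact about hedgehogs")
# This works because both turns of the prior exchange now live in the
# instance's internal ChatHistory
response = await ctigor.prompt("Shorten that fact to a single sentence")
print(response)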
Add a Helper Function Plugin
The semantic_kernel.functions package contains an interface for providing context-aware function plugins to the agent for performing tasks. Declaring function plugins, and supplying English-language metadata with them when registering them with the Kernel, provides the LLM with awareness of code that it can choose to call. You define the conditions under which the LLM will call the utility function(s) much as you would explain to another person how and when to use a tool. Because of this, annotations for semantic-kernel double as documentation for your code: a nice perk!
Consider the following utility function definition: it defines a callable function that takes two input values, a low and a high bound, and returns an int value that falls within that range (inclusive of the bounds):
from random import randint
from typing import Annotated
from semantic_kernel.functions import kernel_function
class RandomNumberPlugin:
"""Generates a Random Number"""
@kernel_function(
name='gen_random',
description="Generate a random number given a low and high bound"
)
async def gen_random(
self,
low: Annotated[int, "Lower bound of the random number"],
high: Annotated[int, "Upper bound of the random number"]
) -> Annotated[int, "Generated random number within the bounds"]:
print(f"Running gen_random with low={low}, high={high}")
return randint(low, high)
In the above code, the @kernel_function decorator is used to give an English-language description that the LLM will use to determine under what conditions to execute the function, as well as a short description of what the inputs mean and what the function will return. The print call isn’t necessary for the logic to work, but provides a useful signal confirming when the agent is calling the function. Furthermore, the typing.Annotated metadata on the input variables and the output will also be utilized by the LLM to decide how and when to use the function.
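As a quick sanity check, a registered kernel function can also be invoked directly, without any LLM in the loop. A minimal sketch, assuming the Kernel.invoke interface available in recent semantic-kernel releases:
import asyncio
from semantic_kernel.kernel import Kernel

async def test_plugin():
    kernel = Kernel()
    # Register the plugin class defined above under a plugin name
    kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
    # Call the kernel function directly; no model is involved here
    result = await kernel.invoke(
        plugin_name="random_number", function_name="gen_random", low=1, high=10
    )
    print(result)

asyncio.run(test_plugin())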
The above can be added to a new ctiagent_functions.py file, and then loaded with from ctiagent_functions import RandomNumberPlugin. However, for the LLM to know to use it, function calling must be enabled and the new plugin class registered with the Kernel. To enable function calling, the following needs to be added to the imports in ctiagent.py:
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from ctiagent_functions import RandomNumberPlugin
And then, in ctiagent.CTIgor, the AzureChatPromptExecutionSettings() call needs to override the default behavior with a FunctionChoiceBehavior; for our purpose, using automatic invocation:
# Define the request settings to use automatic function-choice execution
self.request_settings = AzureChatPromptExecutionSettings(function_choice_behavior=FunctionChoiceBehavior.Auto())
The plugin can then be registered like this inside of ctiagent.CTIgor.__init__
:
# Register the RandomNumberPlugin with the kernel
self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
The new ctiagent.py
code should look like:
import asyncio
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.ai.open_ai import AzureChatPromptExecutionSettings
from semantic_kernel.contents import ChatHistory
from semantic_kernel.kernel import Kernel
from ctiagent_functions import RandomNumberPlugin
# Import secrets from local_settings.py
import local_settings
class CTIgor(object):
def __init__(self):
# Initialize a new Kernel to work with
self.kernel = Kernel()
# Define the Azure OpenAI AI Connector and connect to the deployment Terraform
# provisioned from main.tf
self.chat_service = AzureChatCompletion(
deployment_name=local_settings.deployment,
api_key=local_settings.azure_api_key,
endpoint=local_settings.endpoint,
)
# Define a ChatHistory object
self.chat_history = ChatHistory()
# Define the request settings to use model defaults
self.request_settings = AzureChatPromptExecutionSettings(
function_choice_behavior=FunctionChoiceBehavior.Auto()
)
# Register the RandomNumberPlugin with the kernel
self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
async def prompt(self, input_prompt: str):
self.chat_history.add_user_message(input_prompt)
# Prompt the model with the given chat history, waiting for response
response = await self.chat_service.get_chat_message_content(
chat_history=self.chat_history, settings=self.request_settings, kernel=self.kernel
)
# Ensure response isn't None
assert response is not None
# Append the response to the chat_history
self.chat_history.add_assistant_message(response.content)
return response
Finally, we’ll make a new program named test_chat.py
that implements a simple prompt-response chat loop. The addition of the RandomNumberPlugin
to ctiagent.CTIgor
should provide the glue necessary for the LLM to choose to run it when needed:
import asyncio
from ctiagent import CTIgor
async def main():
ctigor = CTIgor()
# Loop forever while the user has more input
while True:
try:
# Get input from the user, display a prompt to indicate waiting on user input
user_prompt = input('CTIgor> ')
# If user says 'quit' then exit
if user_prompt.lower() in ['quit', 'exit', 'bye']:
raise EOFError
# Send the user's prompt to the LLM and wait for the response
response = await ctigor.prompt(user_prompt)
# Display response on console
print(f"=======\n{response}\n=======")
except EOFError:
# On EOF, exit the program gracefully
print("Thank you, bye!")
break
if __name__ == "__main__":
asyncio.run(main())
Testing the Chat Agent
Now, if you run test_chat.py (uv run test_chat.py if using uv), the LLM can be given prompts like a normal AI chat assistant:
CTIgor> What are three planets in our solar system?
=======
Three planets in our solar system are:
1. Earth
2. Mars
3. Jupiter
=======
Testing the Chat Agent w/ Random Number Generation
Further, prompts that ask it to generate random numbers will trigger the print
statement mentioned above, indicating the helper
function is being run by the agent to generate the random value before providing it to us:
CTIgor> Give me a random number between 723 and 64
Running gen_random with low=64, high=723
=======
The random number between 64 and 723 is 489.
=======
Of note in the above: thanks to the typing.Annotated metadata added to the function definition, regardless of the order in which the bounds are provided to the agent, it will properly identify which is the “low” and which is the “high” bound.
Even more abstract requests for random selection of information can trigger the function call too. For example, combining the prior two prompts:
CTIgor> Give me a random name of one of our solar system's planets
Running gen_random with low=0, high=7
=======
The random name of one of our solar system's planets is Saturn.
=======
From the above, it is clear that the LLM used our random-number helper function to generate a random number between 0 and 7 (inclusive), covering 8 possible options. Evidently, the LLM is aware of Pluto’s demotion.
If you don’t like that behavior, instructing the LLM to consider Pluto a planet may look like this:
CTIgor> Give me another random name of one of our solar system's planets, but assume Pluto is also a planet.
Running gen_random with low=0, high=8
=======
The random name of one of our solar system's planets is Venus.
=======
Add Content-Reading Capability
Let’s start by adding another simple helper function to ctiagent_functions.py:
class FilePlugin:
"""A Plugin to handle work with files"""
@kernel_function(
name="load_file_content",
description="Load a file from disk, given a filename. Returns a bytes object of the file contents."
)
async def load_file_content(
self, file_name: Annotated[str, "The name and path of the file on disk to return the contents of"]
) -> Annotated[bytes, "The contents from the file"]:
with open(file_name, "rb") as txtfile:
return txtfile.read()
The above class implements a function named load_file_content, which takes a file name as input (as a str type) and returns a bytes object containing the file’s content to the caller. The function assumes that the file is a regular text file (trusting user discretion) and performs no validation on the file before loading it.
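If some guardrails are desired, here is a hedged sketch of a hardened variant (the import os, the MAX_REPORT_BYTES constant, and its 1 MiB value are illustrative assumptions, not part of the original project):
import os

# Arbitrary illustrative cap on report size (1 MiB)
MAX_REPORT_BYTES = 1024 * 1024

class FilePlugin:
    """A Plugin to handle work with files"""
    @kernel_function(
        name="load_file_content",
        description="Load a file from disk, given a filename. Returns a bytes object of the file contents."
    )
    async def load_file_content(
        self, file_name: Annotated[str, "The name and path of the file on disk to return the contents of"]
    ) -> Annotated[bytes, "The contents from the file"]:
        # Reject missing paths and oversized files before reading
        if not os.path.isfile(file_name):
            raise FileNotFoundError(f"No such file: {file_name}")
        if os.path.getsize(file_name) > MAX_REPORT_BYTES:
            raise ValueError(f"File exceeds {MAX_REPORT_BYTES} bytes: {file_name}")
        with open(file_name, "rb") as txtfile:
            return txtfile.read()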
This can be connected to the agent by adding the necessary code at the right places in ctiagent.py
where the
references to RandomNumberPlugin
were placed:
from ctiagent_functions import RandomNumberPlugin, FilePlugin
class CTIgor(object):
def __init__(self):
....
# Register the RandomNumberPlugin, FilePlugin with the kernel
self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
self.kernel.add_plugin(FilePlugin(), plugin_name="file")
Once it is properly added to ctiagent.py
, the chat loop inside of test_chat.py
will now be able to take advantage of it,
given appropriate direction. We can render an HTML CTI report into text using the lynx -dump <url>
command. This will
render the HTML to text on stdout
, which can be redirected to a file:
lynx -dump https://cloud.google.com/blog/topics/threat-intelligence/china-nexus-espionage-targets-juniper-routers/ > /tmp/juniper-routers.txt
The file /tmp/juniper-routers.txt
can then be loaded into the agent’s model, to be further analyzed and queried using
natural language prompts. For example (long lines have been manually wrapped):
CTIgor> Summarize the contents of /tmp/juniper-routers.txt for me. I am a technical cyber security analyst.
=======
The document provides an analysis of a cyber espionage campaign conducted by
the China-nexus group UNC3886, targeting Juniper Networks' Junos OS routers and
deploying custom backdoors called TINYSHELL. Here is a summary of key points
relevant to a technical cybersecurity analyst:
1. **Incident Overview**: In mid-2024, Mandiant discovered that threat actors
had deployed **custom backdoors** on Juniper routers, attributed to UNC3886.
These backdoors disable logging and allow for active and passive access to the
compromised systems.
2. **Affected Devices**: The attackers primarily targeted **Juniper MX
routers** operating with end-of-life hardware and software, demonstrating that
outdated systems are more susceptible to such attacks.
3. **Malware Characteristics**:
- The analysis identified **six distinct malware samples**, all based on
TINYSHELL, each with unique functionalities, including:
- `appid`: Active backdoor mimicking a legitimate binary.
- `to`, `irad`, `lmpad`, `jdosd`, and `oemd`: Various active and passive
backdoors with specific operational tasks.
- The malware relies on **process injection techniques** to bypass security
mechanisms, specifically Juniper's verified exec subsystem (Veriexec), by
executing malicious code within legitimate process memory.
4. **Attack Methodology**:
- Attacks exploit legitimate credentials to gain privileged access to the
routers.
- Post-compromise activity involves manipulating process memory and
disabling logging functionalities to maintain stealth and long-term persistence.
- Command and Control (C2) servers are utilized for remote access and
commands, with robust encryption for communications.
5. **Mitigation Recommendations**:
- Organizations should upgrade their Juniper devices to the latest software
images.
- Implement a centralized Identity and Access Management (IAM) system with
robust authentication protocols.
- Enhance network monitoring, and validate configurations against defined
security standards.
- Proactively leverage threat intelligence for ongoing evaluation of
security postures against emerging threats.
6. **Indicators of Compromise (IoCs)**: The document includes specific IoCs
(e.g., filenames, hashes of malware samples, and C2 IP addresses) for detecting
the identified malware.
In conclusion, this incident illustrates the evolving tactics of advanced
persistent threats (APTs) and emphasizes the need for organizations to maintain
updated security measures and conduct regular assessments of their network
infrastructure to defend against such targeted attacks.
=======
Note that in the prompt, I told it my role is a technical cyber security analyst. Providing different roles for yourself (the user) can help guide the LLM to generate output tailored to your audience. You can try I am a CEO or I am an artist running an online studio. My personal favorite is I am a five-year-old child. Experimenting with these is left as an exercise for the reader; I won’t demo them exhaustively here.
Further inquiries about the document can be made, as well:
CTIgor> Show me the C2 IP addresses, formatted as a JSON list of strings.
=======
Here are the C2 IP addresses formatted as a JSON list of strings:
` ` `json
[
"129.126.109.50:22",
"116.88.34.184:22",
"223.25.78.136:22",
"45.77.39.28:22",
"101.100.182.122:22",
"118.189.188.122:22",
"158.140.135.244:22",
"8.222.225.8:22"
]
` ` `
=======
In the above, the C2 IPs were present in the document, but the summary didn’t include them, so we inquired further about them. If we
want to have the agent dig into and explain the details of how Veriexec
was abused, we can do that too:
CTIgor> How exactly did it bypass security using veriexec?
=======
The malware managed to circumvent Veriexec protection through a process
injection technique. Here's a detailed breakdown of how this was achieved:
1. **Understanding Veriexec**: Veriexec is a kernel-based file integrity
subsystem used in Junos OS that prevents unauthorized binaries, libraries, and
scripts from executing. Its primary role is to maintain the integrity of the
operating system by blocking code that has not been verified.
2. **Challenge of Disabling Veriexec**: Directly disabling Veriexec would
trigger alerts. Therefore, the attackers needed a method that allowed them to
run untrusted code without raising alarms.
3. **Process Injection Technique**: Instead of trying to disable Veriexec, the
threat actors executed malicious code within the context of a legitimate,
trusted process. This technique allowed the malware to run without triggering
Veriexec's security measures.
4. **Execution Steps**:
- The threat actor first gained privileged access to a Juniper router using
legitimate credentials.
- They entered the FreeBSD shell from the Junos OS command-line interface
(CLI).
- The malicious actors generated a Base64-encoded payload (named `ldb.b64`)
that was then decoded and extracted into a compressed archive containing the
malware binaries.
- They injected malicious payloads into the memory of a legitimate process
(a newly spawned `cat` process). This involved using the `dd` command to write
the malicious code directly into the memory of the `cat` process.
5. **Impact of the Injection**: By redirecting the executed commands within the
legitimate process, the malware effectively replaced certain function calls.
For instance, they altered the global offset table entry for the `fclose`
function, which allowed the `cat` process to execute the shellcode loader of
the malware instead of its intended function, thus executing the injected code
without alerting the Veriexec system.
6. **Consequences**: This method granted the attackers the ability to run their
malicious payloads while leaving the original legitimate processes untouched,
thereby maintaining a low profile within the compromised system and enabling
continuous access.
In summary, by employing process injection within the context of a trusted
process, the attackers successfully bypassed Veriexec's security measures
without triggering alerts, allowing their malware to execute while minimizing
the risk of detection.
=======
Another example:
CTIgor> Are there any URLs that I should visit with more details about this specific attack?
=======
Here are some URLs to resources that provide detailed information about the
specific attack by UNC3886 on Juniper routers, as well as broader insights into
similar cyber espionage tactics:
1. **Mandiant Blog Post on the Incident**:
- [Ghost in the Router: China-Nexus Espionage Actor UNC3886 Targets Juniper
Routers](https://cloud.google.com/blog/topics/threat-intelligence/china-nexus-espionage-targets-juniper-routers)
- This blog post by Mandiant details the
discovery of the backdoors on Juniper routers, the techniques used, and
recommended actions for organizations.
2. **Mandiant Threat Intelligence Reports**:
- [Mandiant Threat Intelligence
Reports](https://www.mandiant.com/resources/reports) - A collection of threat
intelligence reports, including those that detail activities of cyber espionage
groups like UNC3886.
3. **Juniper Networks Security Advisories**:
- [Juniper Networks Security
Advisories](https://www.juniper.net/support/advisories/) - Here, you can find
security advisories related to vulnerabilities and incidents affecting Juniper
products, including any advisories related to the malware discovered by
Mandiant.
4. **MITRE ATT&CK Framework**:
- [MITRE ATT&CK](https://attack.mitre.org/) - The database of tactics and
techniques that attackers may use. Search for techniques related to "process
injection" or other methods discussed in the report.
5. **Cisco Talos Intelligence Group**:
- [Cisco Talos Intelligence Reports](https://talosintelligence.com/) -
Provides updates and reports on the latest threats, vulnerabilities, and
incidents relevant to Cisco products and the broader cybersecurity landscape.
6. **FireEye Threat Intelligence**:
- [FireEye Intelligence
Reports](https://www.fireeye.com/current-threats/threat-intelligence-reports.html) -
FireEye provides insights into various APT groups and their tactics,
including reports on activities of cyber espionage.
7. **Cybersecurity & Infrastructure Security Agency (CISA)**:
- [CISA Alerts & Technical
Guidance](https://www.cisa.gov/uscert/ncas/current-activity) - CISA regularly
publishes alerts about vulnerabilities and threats affecting critical
infrastructure, which often includes network devices.
These resources will provide you with explicit insights and further context
about the methodologies used in the attack, the implications for organizations,
and general recommendations for improving security.
=======
Adding a Better Front-End
This is helpful, but at its core it is basically an AI chat-bot with a few additional features. As a result, using it for CTI report review is a bit cumbersome. A more purpose-built front end (still command-line, though) is warranted. Let’s add some command-line options, as well as a bootstrap behavior to streamline loading the specified CTI report.
A new program named ctigor_summarize.py is built against the same interface, derived from test_chat.py:
import asyncio
from argparse import ArgumentParser
from ctiagent import CTIgor
class CTIgorReportSummarizer:
def __init__(self):
pass
    @staticmethod
    def argparse():
        ap = ArgumentParser(description="Have the AI agent summarize a CTI report, and optionally provide interactive analysis of it")
ap.add_argument('-f', '--filename', required=False, type=str, help="File path on disk to summarize")
ap.add_argument('-i', '--interactive', required=False, default=False, action='store_true',
help="Provide an interactive prompt for more analysis")
return ap.parse_args()
async def load_file(self):
return await self.ctigor.prompt(f"Summarize the text file {self.args.filename} for me")
async def main(self):
self.args = CTIgorReportSummarizer.argparse()
self.ctigor = CTIgor()
if self.args.filename:
response = await self.load_file()
print(f'Summary of {self.args.filename}')
print('================================')
print(f"{response}\n")
if not self.args.interactive:
# If -i was not specified, then exit early
return
# Loop forever while the user has more input
while True:
try:
# Get input from the user, display a prompt to indicate waiting on user input
user_prompt = input('CTIgor> ')
# If user says 'quit' then exit
if user_prompt.lower() in ['quit', 'exit', 'bye']:
raise EOFError
# Send the user's prompt to the LLM and wait for the response
response = await self.ctigor.prompt(user_prompt)
# Display response on console
print(f"=======\n{response}\n=======")
except EOFError:
# On EOF, exit the program gracefully
print("Thank you, bye!")
break
if __name__ == "__main__":
summarizer = CTIgorReportSummarizer()
asyncio.run(summarizer.main())
Run the above as ctigor_summarize.py -f report.txt if you just want a summary of the report before it exits. Adding the -i argument runs it in interactive mode, which gives the user a prompt allowing further questions to be asked of the LLM. Note that when using online LLMs that are pay-per-request or similar, each prompt counts as additional usage. This new version provides an easily scriptable interface, which can be used to bulk-summarize multiple reports saved as text files.
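For instance, here is a hedged sketch of a bulk-summarization wrapper built on the same CTIgor class (the reports/ input directory and summaries/ output layout are assumptions for illustration):
import asyncio
from pathlib import Path
from ctiagent import CTIgor

async def summarize_all(report_dir: str = "reports", out_dir: str = "summaries"):
    Path(out_dir).mkdir(exist_ok=True)
    for report in sorted(Path(report_dir).glob("*.txt")):
        # Use a fresh CTIgor per report so summaries don't bleed into each other
        ctigor = CTIgor()
        response = await ctigor.prompt(f"Summarize the text file {report} for me")
        (Path(out_dir) / f"{report.stem}.summary.txt").write_text(str(response))

if __name__ == "__main__":
    asyncio.run(summarize_all())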
Integrating Web Retrieval
Next, let’s add a helper function that will handle the web-retrieval step for us, so we don’t have to use an external tool like lynx to get content from the web. To accomplish this, we will use the Python libraries aiohttp (typically already present as a semantic-kernel dependency) and html2text; the latter needs to be added to requirements.txt or added via uv add, if using uv.
Similar to above with FilePlugin
, edit ctiagent_functions.py
to add the new plugin:
class WebPlugin:
    """A Plugin to work with Web URLs"""
    @kernel_function(name="download_report", description="Given a URL, convert the page to markdown text and return it as a string")
    async def load_from_web(
        self, url: Annotated[str, "URL to read from the web into markdown content"]
    ) -> Annotated[str, "The contents from the site, formatted as Markdown"]:
        # Fetch the page, then convert the HTML body to Markdown text
        async with aiohttp.ClientSession() as session:
            resphtml = await session.get(url)
            async with resphtml:
                return html2text.html2text(await resphtml.text())
The above code uses two new modules, which need to be imported in ctiagent_functions.py
:
import aiohttp
import html2text
Then, edit the appropriate lines in ctiagent.py
to ensure it is loaded into the agent’s context:
from ctiagent_functions import RandomNumberPlugin, FilePlugin, WebPlugin
# Register the RandomNumberPlugin, FilePlugin, WebPlugin with the kernel
self.kernel.add_plugin(RandomNumberPlugin(), plugin_name="random_number")
self.kernel.add_plugin(FilePlugin(), plugin_name="file")
self.kernel.add_plugin(WebPlugin(), plugin_name="web") # This line was added
And, finally, we’ll update ctigor_summarize.py to offer an interface that can pull the content from the web or a file:
import asyncio
from argparse import ArgumentParser
from ctiagent import CTIgor
class CTIgorReportSummarizer:
def __init__(self):
pass
    @staticmethod
    def argparse():
        ap = ArgumentParser(description="Have the AI agent summarize a CTI report, and optionally provide interactive analysis of it")
ap.add_argument('-f', '--filename', required=False, type=str, help="File path on disk to summarize")
ap.add_argument('-w', '--webpage', required=False, type=str, help="URL of HTML webpage to summarize")
ap.add_argument('-i', '--interactive', required=False, default=False, action='store_true',
help="Provide an interactive prompt for more analysis")
return ap.parse_args()
# A prompt to instruct the LLM to load an already-converted text file of a report from a file on disk
async def load_file(self):
return await self.ctigor.prompt(f"Summarize the text file {self.args.filename} for me")
# A prompt to instruct the LLM to call WebPlugin to fetch + convert the webpage to Markdown for us
async def load_webpage(self):
return await self.ctigor.prompt(f"Summarize the webpage {self.args.webpage} for me")
async def main(self):
self.args = CTIgorReportSummarizer.argparse()
self.ctigor = CTIgor()
# If both -w and -f are specified, the -w takes precedence and the -f will be ignored
if self.args.webpage:
response = await self.load_webpage()
print(f'Summary of {self.args.webpage}')
print('================================')
print(f"{response}\n")
elif self.args.filename:
response = await self.load_file()
print(f'Summary of {self.args.filename}')
print('================================')
print(f"{response}\n")
if not self.args.interactive:
# If -i was not specified, then exit early
return
# Loop forever while the user has more input
while True:
try:
# Get input from the user, display a prompt to indicate waiting on user input
user_prompt = input('CTIgor> ')
# If user says 'quit' then exit
if user_prompt.lower() in ['quit', 'exit', 'bye']:
raise EOFError
# Send the user's prompt to the LLM and wait for the response
response = await self.ctigor.prompt(user_prompt)
# Display response on console
print(f"=======\n{response}\n=======")
except EOFError:
# On EOF, exit the program gracefully
print("Thank you, bye!")
break
if __name__ == "__main__":
summarizer = CTIgorReportSummarizer()
asyncio.run(summarizer.main())
Example Output
ctigor_summarize.py -w \
https://www.lookout.com/threat-intelligence/article/lookout-discovers-new-spyware-by-north-korean-apt37 \
-i
Summary of https://www.lookout.com/threat-intelligence/article/lookout-discovers-new-spyware-by-north-korean-apt37
================================
The article on Lookout's website discusses the discovery of a new Android
spyware known as KoSpy, attributed to the North Korean advanced persistent
threat (APT) group APT37, also referred to as ScarCruft. Here are the key
points:
1. **Nature of KoSpy**:
- KoSpy is designed to masquerade as legitimate utility applications,
targeting Korean and English-speaking users. It primarily infects devices by
appearing as apps like "File Manager" and "Software Update Utility."
2. **Discovery Timeline**:
- The spyware was first detected in March 2022, with active samples still
observed in March 2024.
3. **Operational Mechanics**:
- It utilizes a two-stage command and control (C2) infrastructure, relying
on a Firebase database to load configurations and update its operations.
- The spyware is capable of collecting sensitive data including SMS
messages, call logs, location information, and files, using dynamically loaded
plugins.
4. **Distribution Channels**:
- KoSpy samples have been found in both Google Play and third-party app
stores, though they have been removed following detection.
5. **Connections to Other Groups**:
- There are indications that the infrastructure behind KoSpy shares
similarities with another North Korean hacking group known as APT43 (Kimsuky).
6. **Language Support**:
- The spyware app displays its user interface in both Korean and English,
indicating targeted attacks on specific demographics.
7. **Attribution and Threat Landscape**:
- The research highlights the challenges in attributing cyber attacks due to
overlapping infrastructures and tactics used by different North Korean threat
actor groups.
The overall analysis underscores the sophistication and ongoing threat of the
KoSpy spyware as part of North Korea's cyber espionage efforts.
Using the interactive mode, further information and/or customized summaries can be generated:
CTIgor> List a timeline of the KoSpy spyware activity described in the report
and include infrastructure like IPs and Domains
=======
Here’s a summarized timeline of the KoSpy spyware activity based on the
information from the report, including infrastructure elements such as domains
and IPs:
### KoSpy Spyware Activity Timeline
1. **March 2022**:
- **Event**: Initial observations of KoSpy spyware samples begin.
- **Infrastructure**: Specific domains or IPs are not detailed for this
early activity.
2. **2022-2023** (Ongoing):
- **Event**: KoSpy remains active, targeting Korean and English-speaking
users with malicious applications, primarily disguised as utility apps.
- **Infrastructure**: The report indicates usage of Firebase Firestore for
configuration retrieval, although exact domains are not specified yet.
3. **March 2024**:
- **Event**: Identification of the most recent samples of KoSpy spyware.
- **Infrastructure**:
- **C2 Domains**:
- `joinupvts.org`
- `resolveissue.org`
- `crowdon.info`
- `st0746.net`
- **IP Address**: `27.255.79.225` (associated with domain `st0746.net`,
located in South Korea).
4. **Ongoing** (post-March 2024):
- **Event**: All KoSpy-related apps discovered on the Google Play Store have
been removed, and related Firebase projects have been deactivated by Google.
5. **Shared Infrastructure**:
- **Event**: Evidence of KoSpy's infrastructure overlaps with another North
Korean group, APT43 (Kimsuky). The shared domains and IPs suggest coordinated
cyber activities.
- **Additional Notes**: The report does not provide specific timestamps for
historical infrastructure but mentions past associations of the listed domains
with malicious activities aimed at Korean users.
This timeline captures key events and infrastructure related to the KoSpy
spyware, including discovery, active operation phases, and notable
infrastructure elements such as domains and IPs.
=======
Cost Analysis
In working through this blog post, using Azure OpenAI and the gpt-4o-mini model, a total of 151 requests were made, consuming 672,000+ tokens, for a total AI usage charge of about $0.10. It is worth remembering that “token count” is roughly comparable to “word count” in the documents, so usage on the input side can be estimated by running wc -w on the inputs.
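As a rough back-of-the-envelope sketch (treating a wc -w word count as a stand-in for token count, and using the gpt-4o-mini input price quoted earlier), the input-side cost of a report can be estimated like this:
# Rough input-cost estimate, using word count as a stand-in for token count
INPUT_PRICE_PER_TOKEN = 0.15 / 1_000_000  # gpt-4o-mini input price quoted above

def estimate_input_cost(path: str) -> float:
    with open(path, "r") as f:
        words = len(f.read().split())  # comparable to `wc -w`
    return words * INPUT_PRICE_PER_TOKEN

print(f"${estimate_input_cost('/tmp/juniper-routers.txt'):.4f}")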
Project URL
A companion GitHub project is available at the following URL, which includes some of the example code from this blog post:
- https://github.com/ckane/ctigor/tree/semantic-kernel-port/ (Semantic Kernel implementation, associated with this blog post)
- https://github.com/ckane/ctigor (AutoGen implementation)
Permanent Link: https://blog.malware.re/2025/03/28/ctigor/index.html